Whamcloud - gitweb
c6ccb6871be09113ea435aed10cfc6999e4d474a
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2014, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_OSC
38
39 #include <libcfs/libcfs.h>
40
41 #include <lustre_dlm.h>
42 #include <lustre_net.h>
43 #include <lustre/lustre_user.h>
44 #include <obd_cksum.h>
45 #include <lustre_ha.h>
46 #include <lprocfs_status.h>
47 #include <lustre_ioctl.h>
48 #include <lustre_debug.h>
49 #include <lustre_param.h>
50 #include <lustre_fid.h>
51 #include <obd_class.h>
52 #include <obd.h>
53 #include <lustre_net.h>
54 #include "osc_internal.h"
55 #include "osc_cl_internal.h"
56
57 atomic_t osc_pool_req_count;
58 unsigned int osc_reqpool_maxreqcount;
59 struct ptlrpc_request_pool *osc_rq_pool;
60
61 /* max memory used for request pool, unit is MB */
62 static unsigned int osc_reqpool_mem_max = 5;
63 module_param(osc_reqpool_mem_max, uint, 0444);
64
65 struct osc_brw_async_args {
66         struct obdo              *aa_oa;
67         int                       aa_requested_nob;
68         int                       aa_nio_count;
69         u32                       aa_page_count;
70         int                       aa_resends;
71         struct brw_page **aa_ppga;
72         struct client_obd        *aa_cli;
73         struct list_head          aa_oaps;
74         struct list_head          aa_exts;
75         struct cl_req            *aa_clerq;
76 };
77
78 #define osc_grant_args osc_brw_async_args
79
80 struct osc_setattr_args {
81         struct obdo             *sa_oa;
82         obd_enqueue_update_f     sa_upcall;
83         void                    *sa_cookie;
84 };
85
86 struct osc_fsync_args {
87         struct osc_object       *fa_obj;
88         struct obdo             *fa_oa;
89         obd_enqueue_update_f    fa_upcall;
90         void                    *fa_cookie;
91 };
92
93 struct osc_enqueue_args {
94         struct obd_export       *oa_exp;
95         ldlm_type_t             oa_type;
96         ldlm_mode_t             oa_mode;
97         __u64                   *oa_flags;
98         osc_enqueue_upcall_f    oa_upcall;
99         void                    *oa_cookie;
100         struct ost_lvb          *oa_lvb;
101         struct lustre_handle    oa_lockh;
102         unsigned int            oa_agl:1;
103 };
104
105 static void osc_release_ppga(struct brw_page **ppga, size_t count);
106 static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
107                          void *data, int rc);
108
109 void osc_pack_req_body(struct ptlrpc_request *req, struct obdo *oa)
110 {
111         struct ost_body *body;
112
113         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
114         LASSERT(body);
115
116         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
117 }
118
119 static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
120                        struct obdo *oa)
121 {
122         struct ptlrpc_request   *req;
123         struct ost_body         *body;
124         int                      rc;
125
126         ENTRY;
127         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
128         if (req == NULL)
129                 RETURN(-ENOMEM);
130
131         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
132         if (rc) {
133                 ptlrpc_request_free(req);
134                 RETURN(rc);
135         }
136
137         osc_pack_req_body(req, oa);
138
139         ptlrpc_request_set_replen(req);
140
141         rc = ptlrpc_queue_wait(req);
142         if (rc)
143                 GOTO(out, rc);
144
145         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
146         if (body == NULL)
147                 GOTO(out, rc = -EPROTO);
148
149         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
150         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
151
152         oa->o_blksize = cli_brw_size(exp->exp_obd);
153         oa->o_valid |= OBD_MD_FLBLKSZ;
154
155         EXIT;
156 out:
157         ptlrpc_req_finished(req);
158
159         return rc;
160 }
161
162 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
163                        struct obdo *oa)
164 {
165         struct ptlrpc_request   *req;
166         struct ost_body         *body;
167         int                      rc;
168
169         ENTRY;
170         LASSERT(oa->o_valid & OBD_MD_FLGROUP);
171
172         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
173         if (req == NULL)
174                 RETURN(-ENOMEM);
175
176         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
177         if (rc) {
178                 ptlrpc_request_free(req);
179                 RETURN(rc);
180         }
181
182         osc_pack_req_body(req, oa);
183
184         ptlrpc_request_set_replen(req);
185
186         rc = ptlrpc_queue_wait(req);
187         if (rc)
188                 GOTO(out, rc);
189
190         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
191         if (body == NULL)
192                 GOTO(out, rc = -EPROTO);
193
194         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
195
196         EXIT;
197 out:
198         ptlrpc_req_finished(req);
199
200         RETURN(rc);
201 }
202
203 static int osc_setattr_interpret(const struct lu_env *env,
204                                  struct ptlrpc_request *req,
205                                  struct osc_setattr_args *sa, int rc)
206 {
207         struct ost_body *body;
208         ENTRY;
209
210         if (rc != 0)
211                 GOTO(out, rc);
212
213         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
214         if (body == NULL)
215                 GOTO(out, rc = -EPROTO);
216
217         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
218                              &body->oa);
219 out:
220         rc = sa->sa_upcall(sa->sa_cookie, rc);
221         RETURN(rc);
222 }
223
224 int osc_setattr_async(struct obd_export *exp, struct obdo *oa,
225                       obd_enqueue_update_f upcall, void *cookie,
226                       struct ptlrpc_request_set *rqset)
227 {
228         struct ptlrpc_request   *req;
229         struct osc_setattr_args *sa;
230         int                      rc;
231
232         ENTRY;
233
234         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
235         if (req == NULL)
236                 RETURN(-ENOMEM);
237
238         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
239         if (rc) {
240                 ptlrpc_request_free(req);
241                 RETURN(rc);
242         }
243
244         osc_pack_req_body(req, oa);
245
246         ptlrpc_request_set_replen(req);
247
248         /* do mds to ost setattr asynchronously */
249         if (!rqset) {
250                 /* Do not wait for response. */
251                 ptlrpcd_add_req(req);
252         } else {
253                 req->rq_interpret_reply =
254                         (ptlrpc_interpterer_t)osc_setattr_interpret;
255
256                 CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
257                 sa = ptlrpc_req_async_args(req);
258                 sa->sa_oa = oa;
259                 sa->sa_upcall = upcall;
260                 sa->sa_cookie = cookie;
261
262                 if (rqset == PTLRPCD_SET)
263                         ptlrpcd_add_req(req);
264                 else
265                         ptlrpc_set_add_req(rqset, req);
266         }
267
268         RETURN(0);
269 }
270
271 static int osc_create(const struct lu_env *env, struct obd_export *exp,
272                       struct obdo *oa)
273 {
274         struct ptlrpc_request *req;
275         struct ost_body       *body;
276         int                    rc;
277         ENTRY;
278
279         LASSERT(oa != NULL);
280         LASSERT(oa->o_valid & OBD_MD_FLGROUP);
281         LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));
282
283         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
284         if (req == NULL)
285                 GOTO(out, rc = -ENOMEM);
286
287         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
288         if (rc) {
289                 ptlrpc_request_free(req);
290                 GOTO(out, rc);
291         }
292
293         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
294         LASSERT(body);
295
296         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
297
298         ptlrpc_request_set_replen(req);
299
300         rc = ptlrpc_queue_wait(req);
301         if (rc)
302                 GOTO(out_req, rc);
303
304         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
305         if (body == NULL)
306                 GOTO(out_req, rc = -EPROTO);
307
308         CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
309         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
310
311         oa->o_blksize = cli_brw_size(exp->exp_obd);
312         oa->o_valid |= OBD_MD_FLBLKSZ;
313
314         CDEBUG(D_HA, "transno: "LPD64"\n",
315                lustre_msg_get_transno(req->rq_repmsg));
316 out_req:
317         ptlrpc_req_finished(req);
318 out:
319         RETURN(rc);
320 }
321
322 int osc_punch_base(struct obd_export *exp, struct obdo *oa,
323                    obd_enqueue_update_f upcall, void *cookie,
324                    struct ptlrpc_request_set *rqset)
325 {
326         struct ptlrpc_request   *req;
327         struct osc_setattr_args *sa;
328         struct ost_body         *body;
329         int                      rc;
330         ENTRY;
331
332         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
333         if (req == NULL)
334                 RETURN(-ENOMEM);
335
336         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
337         if (rc) {
338                 ptlrpc_request_free(req);
339                 RETURN(rc);
340         }
341         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
342         ptlrpc_at_set_req_timeout(req);
343
344         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
345         LASSERT(body);
346         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
347
348         ptlrpc_request_set_replen(req);
349
350         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
351         CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
352         sa = ptlrpc_req_async_args(req);
353         sa->sa_oa = oa;
354         sa->sa_upcall = upcall;
355         sa->sa_cookie = cookie;
356         if (rqset == PTLRPCD_SET)
357                 ptlrpcd_add_req(req);
358         else
359                 ptlrpc_set_add_req(rqset, req);
360
361         RETURN(0);
362 }
363
364 static int osc_sync_interpret(const struct lu_env *env,
365                               struct ptlrpc_request *req,
366                               void *arg, int rc)
367 {
368         struct osc_fsync_args   *fa = arg;
369         struct ost_body         *body;
370         struct cl_attr          *attr = &osc_env_info(env)->oti_attr;
371         unsigned long           valid = 0;
372         struct cl_object        *obj;
373         ENTRY;
374
375         if (rc != 0)
376                 GOTO(out, rc);
377
378         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
379         if (body == NULL) {
380                 CERROR("can't unpack ost_body\n");
381                 GOTO(out, rc = -EPROTO);
382         }
383
384         *fa->fa_oa = body->oa;
385         obj = osc2cl(fa->fa_obj);
386
387         /* Update osc object's blocks attribute */
388         cl_object_attr_lock(obj);
389         if (body->oa.o_valid & OBD_MD_FLBLOCKS) {
390                 attr->cat_blocks = body->oa.o_blocks;
391                 valid |= CAT_BLOCKS;
392         }
393
394         if (valid != 0)
395                 cl_object_attr_update(env, obj, attr, valid);
396         cl_object_attr_unlock(obj);
397
398 out:
399         rc = fa->fa_upcall(fa->fa_cookie, rc);
400         RETURN(rc);
401 }
402
403 int osc_sync_base(struct osc_object *obj, struct obdo *oa,
404                   obd_enqueue_update_f upcall, void *cookie,
405                   struct ptlrpc_request_set *rqset)
406 {
407         struct obd_export     *exp = osc_export(obj);
408         struct ptlrpc_request *req;
409         struct ost_body       *body;
410         struct osc_fsync_args *fa;
411         int                    rc;
412         ENTRY;
413
414         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
415         if (req == NULL)
416                 RETURN(-ENOMEM);
417
418         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
419         if (rc) {
420                 ptlrpc_request_free(req);
421                 RETURN(rc);
422         }
423
424         /* overload the size and blocks fields in the oa with start/end */
425         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
426         LASSERT(body);
427         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
428
429         ptlrpc_request_set_replen(req);
430         req->rq_interpret_reply = osc_sync_interpret;
431
432         CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
433         fa = ptlrpc_req_async_args(req);
434         fa->fa_obj = obj;
435         fa->fa_oa = oa;
436         fa->fa_upcall = upcall;
437         fa->fa_cookie = cookie;
438
439         if (rqset == PTLRPCD_SET)
440                 ptlrpcd_add_req(req);
441         else
442                 ptlrpc_set_add_req(rqset, req);
443
444         RETURN (0);
445 }
446
447 /* Find and cancel locally locks matched by @mode in the resource found by
448  * @objid. Found locks are added into @cancel list. Returns the amount of
449  * locks added to @cancels list. */
450 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
451                                    struct list_head *cancels,
452                                    ldlm_mode_t mode, __u64 lock_flags)
453 {
454         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
455         struct ldlm_res_id res_id;
456         struct ldlm_resource *res;
457         int count;
458         ENTRY;
459
460         /* Return, i.e. cancel nothing, only if ELC is supported (flag in
461          * export) but disabled through procfs (flag in NS).
462          *
463          * This distinguishes from a case when ELC is not supported originally,
464          * when we still want to cancel locks in advance and just cancel them
465          * locally, without sending any RPC. */
466         if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
467                 RETURN(0);
468
469         ostid_build_res_name(&oa->o_oi, &res_id);
470         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
471         if (IS_ERR(res))
472                 RETURN(0);
473
474         LDLM_RESOURCE_ADDREF(res);
475         count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
476                                            lock_flags, 0, NULL);
477         LDLM_RESOURCE_DELREF(res);
478         ldlm_resource_putref(res);
479         RETURN(count);
480 }
481
482 static int osc_destroy_interpret(const struct lu_env *env,
483                                  struct ptlrpc_request *req, void *data,
484                                  int rc)
485 {
486         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
487
488         atomic_dec(&cli->cl_destroy_in_flight);
489         wake_up(&cli->cl_destroy_waitq);
490         return 0;
491 }
492
493 static int osc_can_send_destroy(struct client_obd *cli)
494 {
495         if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
496             cli->cl_max_rpcs_in_flight) {
497                 /* The destroy request can be sent */
498                 return 1;
499         }
500         if (atomic_dec_return(&cli->cl_destroy_in_flight) <
501             cli->cl_max_rpcs_in_flight) {
502                 /*
503                  * The counter has been modified between the two atomic
504                  * operations.
505                  */
506                 wake_up(&cli->cl_destroy_waitq);
507         }
508         return 0;
509 }
510
511 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
512                        struct obdo *oa)
513 {
514         struct client_obd     *cli = &exp->exp_obd->u.cli;
515         struct ptlrpc_request *req;
516         struct ost_body       *body;
517         struct list_head       cancels = LIST_HEAD_INIT(cancels);
518         int rc, count;
519         ENTRY;
520
521         if (!oa) {
522                 CDEBUG(D_INFO, "oa NULL\n");
523                 RETURN(-EINVAL);
524         }
525
526         count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
527                                         LDLM_FL_DISCARD_DATA);
528
529         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
530         if (req == NULL) {
531                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
532                 RETURN(-ENOMEM);
533         }
534
535         rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
536                                0, &cancels, count);
537         if (rc) {
538                 ptlrpc_request_free(req);
539                 RETURN(rc);
540         }
541
542         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
543         ptlrpc_at_set_req_timeout(req);
544
545         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
546         LASSERT(body);
547         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
548
549         ptlrpc_request_set_replen(req);
550
551         req->rq_interpret_reply = osc_destroy_interpret;
552         if (!osc_can_send_destroy(cli)) {
553                 struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
554
555                 /*
556                  * Wait until the number of on-going destroy RPCs drops
557                  * under max_rpc_in_flight
558                  */
559                 l_wait_event_exclusive(cli->cl_destroy_waitq,
560                                        osc_can_send_destroy(cli), &lwi);
561         }
562
563         /* Do not wait for response */
564         ptlrpcd_add_req(req);
565         RETURN(0);
566 }
567
568 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
569                                 long writing_bytes)
570 {
571         u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;
572
573         LASSERT(!(oa->o_valid & bits));
574
575         oa->o_valid |= bits;
576         spin_lock(&cli->cl_loi_list_lock);
577         oa->o_dirty = cli->cl_dirty_pages << PAGE_CACHE_SHIFT;
578         if (unlikely(cli->cl_dirty_pages - cli->cl_dirty_transit >
579                      cli->cl_dirty_max_pages)) {
580                 CERROR("dirty %lu - %lu > dirty_max %lu\n",
581                        cli->cl_dirty_pages, cli->cl_dirty_transit,
582                        cli->cl_dirty_max_pages);
583                 oa->o_undirty = 0;
584         } else if (unlikely(atomic_long_read(&obd_dirty_pages) -
585                             atomic_long_read(&obd_dirty_transit_pages) >
586                             (obd_max_dirty_pages + 1))) {
587                 /* The atomic_read() allowing the atomic_inc() are
588                  * not covered by a lock thus they may safely race and trip
589                  * this CERROR() unless we add in a small fudge factor (+1). */
590                 CERROR("%s: dirty %ld - %ld > system dirty_max %lu\n",
591                        cli->cl_import->imp_obd->obd_name,
592                        atomic_long_read(&obd_dirty_pages),
593                        atomic_long_read(&obd_dirty_transit_pages),
594                        obd_max_dirty_pages);
595                 oa->o_undirty = 0;
596         } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
597                             0x7fffffff)) {
598                 CERROR("dirty %lu - dirty_max %lu too big???\n",
599                        cli->cl_dirty_pages, cli->cl_dirty_max_pages);
600                 oa->o_undirty = 0;
601         } else {
602                 unsigned long max_in_flight = (cli->cl_max_pages_per_rpc <<
603                                       PAGE_CACHE_SHIFT) *
604                                      (cli->cl_max_rpcs_in_flight + 1);
605                 oa->o_undirty = max(cli->cl_dirty_max_pages << PAGE_CACHE_SHIFT,
606                                     max_in_flight);
607         }
608         oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
609         oa->o_dropped = cli->cl_lost_grant;
610         cli->cl_lost_grant = 0;
611         spin_unlock(&cli->cl_loi_list_lock);
612         CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
613                oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
614
615 }
616
617 void osc_update_next_shrink(struct client_obd *cli)
618 {
619         cli->cl_next_shrink_grant =
620                 cfs_time_shift(cli->cl_grant_shrink_interval);
621         CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
622                cli->cl_next_shrink_grant);
623 }
624
625 static void __osc_update_grant(struct client_obd *cli, u64 grant)
626 {
627         spin_lock(&cli->cl_loi_list_lock);
628         cli->cl_avail_grant += grant;
629         spin_unlock(&cli->cl_loi_list_lock);
630 }
631
632 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
633 {
634         if (body->oa.o_valid & OBD_MD_FLGRANT) {
635                 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
636                 __osc_update_grant(cli, body->oa.o_grant);
637         }
638 }
639
640 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
641                               u32 keylen, void *key,
642                               u32 vallen, void *val,
643                               struct ptlrpc_request_set *set);
644
645 static int osc_shrink_grant_interpret(const struct lu_env *env,
646                                       struct ptlrpc_request *req,
647                                       void *aa, int rc)
648 {
649         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
650         struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
651         struct ost_body *body;
652
653         if (rc != 0) {
654                 __osc_update_grant(cli, oa->o_grant);
655                 GOTO(out, rc);
656         }
657
658         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
659         LASSERT(body);
660         osc_update_grant(cli, body);
661 out:
662         OBDO_FREE(oa);
663         return rc;
664 }
665
666 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
667 {
668         spin_lock(&cli->cl_loi_list_lock);
669         oa->o_grant = cli->cl_avail_grant / 4;
670         cli->cl_avail_grant -= oa->o_grant;
671         spin_unlock(&cli->cl_loi_list_lock);
672         if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
673                 oa->o_valid |= OBD_MD_FLFLAGS;
674                 oa->o_flags = 0;
675         }
676         oa->o_flags |= OBD_FL_SHRINK_GRANT;
677         osc_update_next_shrink(cli);
678 }
679
680 /* Shrink the current grant, either from some large amount to enough for a
681  * full set of in-flight RPCs, or if we have already shrunk to that limit
682  * then to enough for a single RPC.  This avoids keeping more grant than
683  * needed, and avoids shrinking the grant piecemeal. */
684 static int osc_shrink_grant(struct client_obd *cli)
685 {
686         __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
687                              (cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT);
688
689         spin_lock(&cli->cl_loi_list_lock);
690         if (cli->cl_avail_grant <= target_bytes)
691                 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
692         spin_unlock(&cli->cl_loi_list_lock);
693
694         return osc_shrink_grant_to_target(cli, target_bytes);
695 }
696
697 int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
698 {
699         int                     rc = 0;
700         struct ost_body        *body;
701         ENTRY;
702
703         spin_lock(&cli->cl_loi_list_lock);
704         /* Don't shrink if we are already above or below the desired limit
705          * We don't want to shrink below a single RPC, as that will negatively
706          * impact block allocation and long-term performance. */
707         if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT)
708                 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
709
710         if (target_bytes >= cli->cl_avail_grant) {
711                 spin_unlock(&cli->cl_loi_list_lock);
712                 RETURN(0);
713         }
714         spin_unlock(&cli->cl_loi_list_lock);
715
716         OBD_ALLOC_PTR(body);
717         if (!body)
718                 RETURN(-ENOMEM);
719
720         osc_announce_cached(cli, &body->oa, 0);
721
722         spin_lock(&cli->cl_loi_list_lock);
723         body->oa.o_grant = cli->cl_avail_grant - target_bytes;
724         cli->cl_avail_grant = target_bytes;
725         spin_unlock(&cli->cl_loi_list_lock);
726         if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
727                 body->oa.o_valid |= OBD_MD_FLFLAGS;
728                 body->oa.o_flags = 0;
729         }
730         body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
731         osc_update_next_shrink(cli);
732
733         rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
734                                 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
735                                 sizeof(*body), body, NULL);
736         if (rc != 0)
737                 __osc_update_grant(cli, body->oa.o_grant);
738         OBD_FREE_PTR(body);
739         RETURN(rc);
740 }
741
742 static int osc_should_shrink_grant(struct client_obd *client)
743 {
744         cfs_time_t time = cfs_time_current();
745         cfs_time_t next_shrink = client->cl_next_shrink_grant;
746
747         if ((client->cl_import->imp_connect_data.ocd_connect_flags &
748              OBD_CONNECT_GRANT_SHRINK) == 0)
749                 return 0;
750
751         if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
752                 /* Get the current RPC size directly, instead of going via:
753                  * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
754                  * Keep comment here so that it can be found by searching. */
755                 int brw_size = client->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
756
757                 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
758                     client->cl_avail_grant > brw_size)
759                         return 1;
760                 else
761                         osc_update_next_shrink(client);
762         }
763         return 0;
764 }
765
766 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
767 {
768         struct client_obd *client;
769
770         list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
771                 if (osc_should_shrink_grant(client))
772                         osc_shrink_grant(client);
773         }
774         return 0;
775 }
776
777 static int osc_add_shrink_grant(struct client_obd *client)
778 {
779         int rc;
780
781         rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
782                                        TIMEOUT_GRANT,
783                                        osc_grant_shrink_grant_cb, NULL,
784                                        &client->cl_grant_shrink_list);
785         if (rc) {
786                 CERROR("add grant client %s error %d\n",
787                         client->cl_import->imp_obd->obd_name, rc);
788                 return rc;
789         }
790         CDEBUG(D_CACHE, "add grant client %s \n",
791                client->cl_import->imp_obd->obd_name);
792         osc_update_next_shrink(client);
793         return 0;
794 }
795
796 static int osc_del_shrink_grant(struct client_obd *client)
797 {
798         return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
799                                          TIMEOUT_GRANT);
800 }
801
802 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
803 {
804         /*
805          * ocd_grant is the total grant amount we're expect to hold: if we've
806          * been evicted, it's the new avail_grant amount, cl_dirty_pages will
807          * drop to 0 as inflight RPCs fail out; otherwise, it's avail_grant +
808          * dirty.
809          *
810          * race is tolerable here: if we're evicted, but imp_state already
811          * left EVICTED state, then cl_dirty_pages must be 0 already.
812          */
813         spin_lock(&cli->cl_loi_list_lock);
814         if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
815                 cli->cl_avail_grant = ocd->ocd_grant;
816         else
817                 cli->cl_avail_grant = ocd->ocd_grant -
818                                       (cli->cl_dirty_pages << PAGE_CACHE_SHIFT);
819
820         if (cli->cl_avail_grant < 0) {
821                 CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
822                       cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant,
823                       ocd->ocd_grant, cli->cl_dirty_pages << PAGE_CACHE_SHIFT);
824                 /* workaround for servers which do not have the patch from
825                  * LU-2679 */
826                 cli->cl_avail_grant = ocd->ocd_grant;
827         }
828
829         /* determine the appropriate chunk size used by osc_extent. */
830         cli->cl_chunkbits = max_t(int, PAGE_CACHE_SHIFT, ocd->ocd_blocksize);
831         spin_unlock(&cli->cl_loi_list_lock);
832
833         CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
834                 "chunk bits: %d.\n", cli->cl_import->imp_obd->obd_name,
835                 cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits);
836
837         if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
838             list_empty(&cli->cl_grant_shrink_list))
839                 osc_add_shrink_grant(cli);
840 }
841
842 /* We assume that the reason this OSC got a short read is because it read
843  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
844  * via the LOV, and it _knows_ it's reading inside the file, it's just that
845  * this stripe never got written at or beyond this stripe offset yet. */
846 static void handle_short_read(int nob_read, size_t page_count,
847                               struct brw_page **pga)
848 {
849         char *ptr;
850         int i = 0;
851
852         /* skip bytes read OK */
853         while (nob_read > 0) {
854                 LASSERT (page_count > 0);
855
856                 if (pga[i]->count > nob_read) {
857                         /* EOF inside this page */
858                         ptr = kmap(pga[i]->pg) +
859                                 (pga[i]->off & ~PAGE_MASK);
860                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
861                         kunmap(pga[i]->pg);
862                         page_count--;
863                         i++;
864                         break;
865                 }
866
867                 nob_read -= pga[i]->count;
868                 page_count--;
869                 i++;
870         }
871
872         /* zero remaining pages */
873         while (page_count-- > 0) {
874                 ptr = kmap(pga[i]->pg) + (pga[i]->off & ~PAGE_MASK);
875                 memset(ptr, 0, pga[i]->count);
876                 kunmap(pga[i]->pg);
877                 i++;
878         }
879 }
880
881 static int check_write_rcs(struct ptlrpc_request *req,
882                            int requested_nob, int niocount,
883                            size_t page_count, struct brw_page **pga)
884 {
885         int     i;
886         __u32   *remote_rcs;
887
888         remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
889                                                   sizeof(*remote_rcs) *
890                                                   niocount);
891         if (remote_rcs == NULL) {
892                 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
893                 return(-EPROTO);
894         }
895
896         /* return error if any niobuf was in error */
897         for (i = 0; i < niocount; i++) {
898                 if ((int)remote_rcs[i] < 0)
899                         return(remote_rcs[i]);
900
901                 if (remote_rcs[i] != 0) {
902                         CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
903                                 i, remote_rcs[i], req);
904                         return(-EPROTO);
905                 }
906         }
907
908         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
909                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
910                        req->rq_bulk->bd_nob_transferred, requested_nob);
911                 return(-EPROTO);
912         }
913
914         return (0);
915 }
916
917 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
918 {
919         if (p1->flag != p2->flag) {
920                 unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
921                                   OBD_BRW_SYNC       | OBD_BRW_ASYNC   |
922                                   OBD_BRW_NOQUOTA    | OBD_BRW_SOFT_SYNC);
923
924                 /* warn if we try to combine flags that we don't know to be
925                  * safe to combine */
926                 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
927                         CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
928                               "report this at https://jira.hpdd.intel.com/\n",
929                               p1->flag, p2->flag);
930                 }
931                 return 0;
932         }
933
934         return (p1->off + p1->count == p2->off);
935 }
936
937 static u32 osc_checksum_bulk(int nob, size_t pg_count,
938                              struct brw_page **pga, int opc,
939                              cksum_type_t cksum_type)
940 {
941         u32                             cksum;
942         int                             i = 0;
943         struct cfs_crypto_hash_desc     *hdesc;
944         unsigned int                    bufsize;
945         int                             err;
946         unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);
947
948         LASSERT(pg_count > 0);
949
950         hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
951         if (IS_ERR(hdesc)) {
952                 CERROR("Unable to initialize checksum hash %s\n",
953                        cfs_crypto_hash_name(cfs_alg));
954                 return PTR_ERR(hdesc);
955         }
956
957         while (nob > 0 && pg_count > 0) {
958                 unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;
959
960                 /* corrupt the data before we compute the checksum, to
961                  * simulate an OST->client data error */
962                 if (i == 0 && opc == OST_READ &&
963                     OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
964                         unsigned char *ptr = kmap(pga[i]->pg);
965                         int off = pga[i]->off & ~PAGE_MASK;
966
967                         memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
968                         kunmap(pga[i]->pg);
969                 }
970                 cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
971                                             pga[i]->off & ~PAGE_MASK,
972                                             count);
973                 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
974                                (int)(pga[i]->off & ~PAGE_MASK));
975
976                 nob -= pga[i]->count;
977                 pg_count--;
978                 i++;
979         }
980
981         bufsize = sizeof(cksum);
982         err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);
983
984         /* For sending we only compute the wrong checksum instead
985          * of corrupting the data so it is still correct on a redo */
986         if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
987                 cksum++;
988
989         return cksum;
990 }
991
992 static int
993 osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
994                      u32 page_count, struct brw_page **pga,
995                      struct ptlrpc_request **reqp, int resend)
996 {
997         struct ptlrpc_request   *req;
998         struct ptlrpc_bulk_desc *desc;
999         struct ost_body         *body;
1000         struct obd_ioobj        *ioobj;
1001         struct niobuf_remote    *niobuf;
1002         int niocount, i, requested_nob, opc, rc;
1003         struct osc_brw_async_args *aa;
1004         struct req_capsule      *pill;
1005         struct brw_page *pg_prev;
1006
1007         ENTRY;
1008         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1009                 RETURN(-ENOMEM); /* Recoverable */
1010         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1011                 RETURN(-EINVAL); /* Fatal */
1012
1013         if ((cmd & OBD_BRW_WRITE) != 0) {
1014                 opc = OST_WRITE;
1015                 req = ptlrpc_request_alloc_pool(cli->cl_import,
1016                                                 osc_rq_pool,
1017                                                 &RQF_OST_BRW_WRITE);
1018         } else {
1019                 opc = OST_READ;
1020                 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
1021         }
1022         if (req == NULL)
1023                 RETURN(-ENOMEM);
1024
1025         for (niocount = i = 1; i < page_count; i++) {
1026                 if (!can_merge_pages(pga[i - 1], pga[i]))
1027                         niocount++;
1028         }
1029
1030         pill = &req->rq_pill;
1031         req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1032                              sizeof(*ioobj));
1033         req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1034                              niocount * sizeof(*niobuf));
1035
1036         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1037         if (rc) {
1038                 ptlrpc_request_free(req);
1039                 RETURN(rc);
1040         }
1041         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1042         ptlrpc_at_set_req_timeout(req);
1043         /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
1044          * retry logic */
1045         req->rq_no_retry_einprogress = 1;
1046
1047         desc = ptlrpc_prep_bulk_imp(req, page_count,
1048                 cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
1049                 (opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE :
1050                         PTLRPC_BULK_PUT_SINK) |
1051                         PTLRPC_BULK_BUF_KIOV,
1052                 OST_BULK_PORTAL,
1053                 &ptlrpc_bulk_kiov_pin_ops);
1054
1055         if (desc == NULL)
1056                 GOTO(out, rc = -ENOMEM);
1057         /* NB request now owns desc and will free it when it gets freed */
1058
1059         body = req_capsule_client_get(pill, &RMF_OST_BODY);
1060         ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1061         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1062         LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1063
1064         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
1065
1066         obdo_to_ioobj(oa, ioobj);
1067         ioobj->ioo_bufcnt = niocount;
1068         /* The high bits of ioo_max_brw tells server _maximum_ number of bulks
1069          * that might be send for this request.  The actual number is decided
1070          * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
1071          * "max - 1" for old client compatibility sending "0", and also so the
1072          * the actual maximum is a power-of-two number, not one less. LU-1431 */
1073         ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
1074         LASSERT(page_count > 0);
1075         pg_prev = pga[0];
1076         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1077                 struct brw_page *pg = pga[i];
1078                 int poff = pg->off & ~PAGE_MASK;
1079
1080                 LASSERT(pg->count > 0);
1081                 /* make sure there is no gap in the middle of page array */
1082                 LASSERTF(page_count == 1 ||
1083                          (ergo(i == 0, poff + pg->count == PAGE_CACHE_SIZE) &&
1084                           ergo(i > 0 && i < page_count - 1,
1085                                poff == 0 && pg->count == PAGE_CACHE_SIZE)   &&
1086                           ergo(i == page_count - 1, poff == 0)),
1087                          "i: %d/%d pg: %p off: "LPU64", count: %u\n",
1088                          i, page_count, pg, pg->off, pg->count);
1089                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1090                          "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1091                          " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1092                          i, page_count,
1093                          pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1094                          pg_prev->pg, page_private(pg_prev->pg),
1095                          pg_prev->pg->index, pg_prev->off);
1096                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1097                         (pg->flag & OBD_BRW_SRVLOCK));
1098
1099                 desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff, pg->count);
1100                 requested_nob += pg->count;
1101
1102                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1103                         niobuf--;
1104                         niobuf->rnb_len += pg->count;
1105                 } else {
1106                         niobuf->rnb_offset = pg->off;
1107                         niobuf->rnb_len    = pg->count;
1108                         niobuf->rnb_flags  = pg->flag;
1109                 }
1110                 pg_prev = pg;
1111         }
1112
1113         LASSERTF((void *)(niobuf - niocount) ==
1114                 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1115                 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1116                 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1117
1118         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1119         if (resend) {
1120                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1121                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1122                         body->oa.o_flags = 0;
1123                 }
1124                 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1125         }
1126
1127         if (osc_should_shrink_grant(cli))
1128                 osc_shrink_grant_local(cli, &body->oa);
1129
1130         /* size[REQ_REC_OFF] still sizeof (*body) */
1131         if (opc == OST_WRITE) {
1132                 if (cli->cl_checksum &&
1133                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1134                         /* store cl_cksum_type in a local variable since
1135                          * it can be changed via lprocfs */
1136                         cksum_type_t cksum_type = cli->cl_cksum_type;
1137
1138                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1139                                 oa->o_flags &= OBD_FL_LOCAL_MASK;
1140                                 body->oa.o_flags = 0;
1141                         }
1142                         body->oa.o_flags |= cksum_type_pack(cksum_type);
1143                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1144                         body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1145                                                              page_count, pga,
1146                                                              OST_WRITE,
1147                                                              cksum_type);
1148                         CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1149                                body->oa.o_cksum);
1150                         /* save this in 'oa', too, for later checking */
1151                         oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1152                         oa->o_flags |= cksum_type_pack(cksum_type);
1153                 } else {
1154                         /* clear out the checksum flag, in case this is a
1155                          * resend but cl_checksum is no longer set. b=11238 */
1156                         oa->o_valid &= ~OBD_MD_FLCKSUM;
1157                 }
1158                 oa->o_cksum = body->oa.o_cksum;
1159                 /* 1 RC per niobuf */
1160                 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1161                                      sizeof(__u32) * niocount);
1162         } else {
1163                 if (cli->cl_checksum &&
1164                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1165                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1166                                 body->oa.o_flags = 0;
1167                         body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1168                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1169                 }
1170         }
1171         ptlrpc_request_set_replen(req);
1172
1173         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1174         aa = ptlrpc_req_async_args(req);
1175         aa->aa_oa = oa;
1176         aa->aa_requested_nob = requested_nob;
1177         aa->aa_nio_count = niocount;
1178         aa->aa_page_count = page_count;
1179         aa->aa_resends = 0;
1180         aa->aa_ppga = pga;
1181         aa->aa_cli = cli;
1182         INIT_LIST_HEAD(&aa->aa_oaps);
1183
1184         *reqp = req;
1185         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1186         CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
1187                 req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
1188                 niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
1189         RETURN(0);
1190
1191  out:
1192         ptlrpc_req_finished(req);
1193         RETURN(rc);
1194 }
1195
1196 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1197                                 __u32 client_cksum, __u32 server_cksum, int nob,
1198                                 size_t page_count, struct brw_page **pga,
1199                                 cksum_type_t client_cksum_type)
1200 {
1201         __u32 new_cksum;
1202         char *msg;
1203         cksum_type_t cksum_type;
1204
1205         if (server_cksum == client_cksum) {
1206                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1207                 return 0;
1208         }
1209
1210         cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1211                                        oa->o_flags : 0);
1212         new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1213                                       cksum_type);
1214
1215         if (cksum_type != client_cksum_type)
1216                 msg = "the server did not use the checksum type specified in "
1217                       "the original request - likely a protocol problem";
1218         else if (new_cksum == server_cksum)
1219                 msg = "changed on the client after we checksummed it - "
1220                       "likely false positive due to mmap IO (bug 11742)";
1221         else if (new_cksum == client_cksum)
1222                 msg = "changed in transit before arrival at OST";
1223         else
1224                 msg = "changed in transit AND doesn't match the original - "
1225                       "likely false positive due to mmap IO (bug 11742)";
1226
1227         LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1228                            " object "DOSTID" extent ["LPU64"-"LPU64"]\n",
1229                            msg, libcfs_nid2str(peer->nid),
1230                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1231                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1232                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1233                            POSTID(&oa->o_oi), pga[0]->off,
1234                            pga[page_count-1]->off + pga[page_count-1]->count - 1);
1235         CERROR("original client csum %x (type %x), server csum %x (type %x), "
1236                "client csum now %x\n", client_cksum, client_cksum_type,
1237                server_cksum, cksum_type, new_cksum);
1238         return 1;
1239 }
1240
1241 /* Note rc enters this function as number of bytes transferred */
1242 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1243 {
1244         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1245         const lnet_process_id_t *peer =
1246                         &req->rq_import->imp_connection->c_peer;
1247         struct client_obd *cli = aa->aa_cli;
1248         struct ost_body *body;
1249         u32 client_cksum = 0;
1250         ENTRY;
1251
1252         if (rc < 0 && rc != -EDQUOT) {
1253                 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1254                 RETURN(rc);
1255         }
1256
1257         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1258         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1259         if (body == NULL) {
1260                 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1261                 RETURN(-EPROTO);
1262         }
1263
1264         /* set/clear over quota flag for a uid/gid */
1265         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1266             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1267                 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1268
1269                 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1270                        body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1271                        body->oa.o_flags);
1272                 osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
1273         }
1274
1275         osc_update_grant(cli, body);
1276
1277         if (rc < 0)
1278                 RETURN(rc);
1279
1280         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1281                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1282
1283         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1284                 if (rc > 0) {
1285                         CERROR("Unexpected +ve rc %d\n", rc);
1286                         RETURN(-EPROTO);
1287                 }
1288                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1289
1290                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1291                         RETURN(-EAGAIN);
1292
1293                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1294                     check_write_checksum(&body->oa, peer, client_cksum,
1295                                          body->oa.o_cksum, aa->aa_requested_nob,
1296                                          aa->aa_page_count, aa->aa_ppga,
1297                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1298                         RETURN(-EAGAIN);
1299
1300                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1301                                      aa->aa_page_count, aa->aa_ppga);
1302                 GOTO(out, rc);
1303         }
1304
1305         /* The rest of this function executes only for OST_READs */
1306
1307         /* if unwrap_bulk failed, return -EAGAIN to retry */
1308         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1309         if (rc < 0)
1310                 GOTO(out, rc = -EAGAIN);
1311
1312         if (rc > aa->aa_requested_nob) {
1313                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1314                        aa->aa_requested_nob);
1315                 RETURN(-EPROTO);
1316         }
1317
1318         if (rc != req->rq_bulk->bd_nob_transferred) {
1319                 CERROR ("Unexpected rc %d (%d transferred)\n",
1320                         rc, req->rq_bulk->bd_nob_transferred);
1321                 return (-EPROTO);
1322         }
1323
1324         if (rc < aa->aa_requested_nob)
1325                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1326
1327         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1328                 static int cksum_counter;
1329                 u32        server_cksum = body->oa.o_cksum;
1330                 char      *via = "";
1331                 char      *router = "";
1332                 cksum_type_t cksum_type;
1333
1334                 cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
1335                                                body->oa.o_flags : 0);
1336                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1337                                                  aa->aa_ppga, OST_READ,
1338                                                  cksum_type);
1339
1340                 if (peer->nid != req->rq_bulk->bd_sender) {
1341                         via = " via ";
1342                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1343                 }
1344
1345                 if (server_cksum != client_cksum) {
1346                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1347                                            "%s%s%s inode "DFID" object "DOSTID
1348                                            " extent ["LPU64"-"LPU64"]\n",
1349                                            req->rq_import->imp_obd->obd_name,
1350                                            libcfs_nid2str(peer->nid),
1351                                            via, router,
1352                                            body->oa.o_valid & OBD_MD_FLFID ?
1353                                                 body->oa.o_parent_seq : (__u64)0,
1354                                            body->oa.o_valid & OBD_MD_FLFID ?
1355                                                 body->oa.o_parent_oid : 0,
1356                                            body->oa.o_valid & OBD_MD_FLFID ?
1357                                                 body->oa.o_parent_ver : 0,
1358                                            POSTID(&body->oa.o_oi),
1359                                            aa->aa_ppga[0]->off,
1360                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1361                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1362                                                                         1);
1363                         CERROR("client %x, server %x, cksum_type %x\n",
1364                                client_cksum, server_cksum, cksum_type);
1365                         cksum_counter = 0;
1366                         aa->aa_oa->o_cksum = client_cksum;
1367                         rc = -EAGAIN;
1368                 } else {
1369                         cksum_counter++;
1370                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1371                         rc = 0;
1372                 }
1373         } else if (unlikely(client_cksum)) {
1374                 static int cksum_missed;
1375
1376                 cksum_missed++;
1377                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1378                         CERROR("Checksum %u requested from %s but not sent\n",
1379                                cksum_missed, libcfs_nid2str(peer->nid));
1380         } else {
1381                 rc = 0;
1382         }
1383 out:
1384         if (rc >= 0)
1385                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1386                                      aa->aa_oa, &body->oa);
1387
1388         RETURN(rc);
1389 }
1390
1391 static int osc_brw_redo_request(struct ptlrpc_request *request,
1392                                 struct osc_brw_async_args *aa, int rc)
1393 {
1394         struct ptlrpc_request *new_req;
1395         struct osc_brw_async_args *new_aa;
1396         struct osc_async_page *oap;
1397         ENTRY;
1398
1399         DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1400                   "redo for recoverable error %d", rc);
1401
1402         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1403                                 OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
1404                                   aa->aa_cli, aa->aa_oa, aa->aa_page_count,
1405                                   aa->aa_ppga, &new_req, 1);
1406         if (rc)
1407                 RETURN(rc);
1408
1409         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1410                 if (oap->oap_request != NULL) {
1411                         LASSERTF(request == oap->oap_request,
1412                                  "request %p != oap_request %p\n",
1413                                  request, oap->oap_request);
1414                         if (oap->oap_interrupted) {
1415                                 ptlrpc_req_finished(new_req);
1416                                 RETURN(-EINTR);
1417                         }
1418                 }
1419         }
1420         /* New request takes over pga and oaps from old request.
1421          * Note that copying a list_head doesn't work, need to move it... */
1422         aa->aa_resends++;
1423         new_req->rq_interpret_reply = request->rq_interpret_reply;
1424         new_req->rq_async_args = request->rq_async_args;
1425         new_req->rq_commit_cb = request->rq_commit_cb;
1426         /* cap resend delay to the current request timeout, this is similar to
1427          * what ptlrpc does (see after_reply()) */
1428         if (aa->aa_resends > new_req->rq_timeout)
1429                 new_req->rq_sent = cfs_time_current_sec() + new_req->rq_timeout;
1430         else
1431                 new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1432         new_req->rq_generation_set = 1;
1433         new_req->rq_import_generation = request->rq_import_generation;
1434
1435         new_aa = ptlrpc_req_async_args(new_req);
1436
1437         INIT_LIST_HEAD(&new_aa->aa_oaps);
1438         list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1439         INIT_LIST_HEAD(&new_aa->aa_exts);
1440         list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1441         new_aa->aa_resends = aa->aa_resends;
1442
1443         list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1444                 if (oap->oap_request) {
1445                         ptlrpc_req_finished(oap->oap_request);
1446                         oap->oap_request = ptlrpc_request_addref(new_req);
1447                 }
1448         }
1449
1450         /* XXX: This code will run into problem if we're going to support
1451          * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
1452          * and wait for all of them to be finished. We should inherit request
1453          * set from old request. */
1454         ptlrpcd_add_req(new_req);
1455
1456         DEBUG_REQ(D_INFO, new_req, "new request");
1457         RETURN(0);
1458 }
1459
1460 /*
1461  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1462  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1463  * fine for our small page arrays and doesn't require allocation.  its an
1464  * insertion sort that swaps elements that are strides apart, shrinking the
1465  * stride down until its '1' and the array is sorted.
1466  */
1467 static void sort_brw_pages(struct brw_page **array, int num)
1468 {
1469         int stride, i, j;
1470         struct brw_page *tmp;
1471
1472         if (num == 1)
1473                 return;
1474         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1475                 ;
1476
1477         do {
1478                 stride /= 3;
1479                 for (i = stride ; i < num ; i++) {
1480                         tmp = array[i];
1481                         j = i;
1482                         while (j >= stride && array[j - stride]->off > tmp->off) {
1483                                 array[j] = array[j - stride];
1484                                 j -= stride;
1485                         }
1486                         array[j] = tmp;
1487                 }
1488         } while (stride > 1);
1489 }
1490
/*
 * Free a page-pointer array.
 *
 * ppga:  array to free; must not be NULL
 * count: number of pointers the array was sized for
 */
static void osc_release_ppga(struct brw_page **ppga, size_t count)
{
	size_t bytes = sizeof(*ppga) * count;

	LASSERT(ppga != NULL);
	OBD_FREE(ppga, bytes);
}
1496
1497 static int brw_interpret(const struct lu_env *env,
1498                          struct ptlrpc_request *req, void *data, int rc)
1499 {
1500         struct osc_brw_async_args *aa = data;
1501         struct osc_extent *ext;
1502         struct osc_extent *tmp;
1503         struct client_obd *cli = aa->aa_cli;
1504         ENTRY;
1505
1506         rc = osc_brw_fini_request(req, rc);
1507         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1508         /* When server return -EINPROGRESS, client should always retry
1509          * regardless of the number of times the bulk was resent already. */
1510         if (osc_recoverable_error(rc)) {
1511                 if (req->rq_import_generation !=
1512                     req->rq_import->imp_generation) {
1513                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
1514                                ""DOSTID", rc = %d.\n",
1515                                req->rq_import->imp_obd->obd_name,
1516                                POSTID(&aa->aa_oa->o_oi), rc);
1517                 } else if (rc == -EINPROGRESS ||
1518                     client_should_resend(aa->aa_resends, aa->aa_cli)) {
1519                         rc = osc_brw_redo_request(req, aa, rc);
1520                 } else {
1521                         CERROR("%s: too many resent retries for object: "
1522                                ""LPU64":"LPU64", rc = %d.\n",
1523                                req->rq_import->imp_obd->obd_name,
1524                                POSTID(&aa->aa_oa->o_oi), rc);
1525                 }
1526
1527                 if (rc == 0)
1528                         RETURN(0);
1529                 else if (rc == -EAGAIN || rc == -EINPROGRESS)
1530                         rc = -EIO;
1531         }
1532
1533         if (rc == 0) {
1534                 struct obdo *oa = aa->aa_oa;
1535                 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
1536                 unsigned long valid = 0;
1537                 struct cl_object *obj;
1538                 struct osc_async_page *last;
1539
1540                 last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
1541                 obj = osc2cl(last->oap_obj);
1542
1543                 cl_object_attr_lock(obj);
1544                 if (oa->o_valid & OBD_MD_FLBLOCKS) {
1545                         attr->cat_blocks = oa->o_blocks;
1546                         valid |= CAT_BLOCKS;
1547                 }
1548                 if (oa->o_valid & OBD_MD_FLMTIME) {
1549                         attr->cat_mtime = oa->o_mtime;
1550                         valid |= CAT_MTIME;
1551                 }
1552                 if (oa->o_valid & OBD_MD_FLATIME) {
1553                         attr->cat_atime = oa->o_atime;
1554                         valid |= CAT_ATIME;
1555                 }
1556                 if (oa->o_valid & OBD_MD_FLCTIME) {
1557                         attr->cat_ctime = oa->o_ctime;
1558                         valid |= CAT_CTIME;
1559                 }
1560
1561                 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1562                         struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
1563                         loff_t last_off = last->oap_count + last->oap_obj_off +
1564                                 last->oap_page_off;
1565
1566                         /* Change file size if this is an out of quota or
1567                          * direct IO write and it extends the file size */
1568                         if (loi->loi_lvb.lvb_size < last_off) {
1569                                 attr->cat_size = last_off;
1570                                 valid |= CAT_SIZE;
1571                         }
1572                         /* Extend KMS if it's not a lockless write */
1573                         if (loi->loi_kms < last_off &&
1574                             oap2osc_page(last)->ops_srvlock == 0) {
1575                                 attr->cat_kms = last_off;
1576                                 valid |= CAT_KMS;
1577                         }
1578                 }
1579
1580                 if (valid != 0)
1581                         cl_object_attr_update(env, obj, attr, valid);
1582                 cl_object_attr_unlock(obj);
1583         }
1584         OBDO_FREE(aa->aa_oa);
1585
1586         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
1587                 osc_inc_unstable_pages(req);
1588
1589         list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
1590                 list_del_init(&ext->oe_link);
1591                 osc_extent_finish(env, ext, 1, rc);
1592         }
1593         LASSERT(list_empty(&aa->aa_exts));
1594         LASSERT(list_empty(&aa->aa_oaps));
1595
1596         cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
1597                           req->rq_bulk->bd_nob_transferred);
1598         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1599         ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
1600
1601         spin_lock(&cli->cl_loi_list_lock);
1602         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1603          * is called so we know whether to go to sync BRWs or wait for more
1604          * RPCs to complete */
1605         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
1606                 cli->cl_w_in_flight--;
1607         else
1608                 cli->cl_r_in_flight--;
1609         osc_wake_cache_waiters(cli);
1610         spin_unlock(&cli->cl_loi_list_lock);
1611
1612         osc_io_unplug(env, cli, NULL);
1613         RETURN(rc);
1614 }
1615
1616 static void brw_commit(struct ptlrpc_request *req)
1617 {
1618         /* If osc_inc_unstable_pages (via osc_extent_finish) races with
1619          * this called via the rq_commit_cb, I need to ensure
1620          * osc_dec_unstable_pages is still called. Otherwise unstable
1621          * pages may be leaked. */
1622         spin_lock(&req->rq_lock);
1623         if (likely(req->rq_unstable)) {
1624                 req->rq_unstable = 0;
1625                 spin_unlock(&req->rq_lock);
1626
1627                 osc_dec_unstable_pages(req);
1628         } else {
1629                 req->rq_committed = 1;
1630                 spin_unlock(&req->rq_lock);
1631         }
1632 }
1633
1634 /**
1635  * Build an RPC by the list of extent @ext_list. The caller must ensure
1636  * that the total pages in this list are NOT over max pages per RPC.
1637  * Extents in the list must be in OES_RPC state.
1638  */
1639 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
1640                   struct list_head *ext_list, int cmd)
1641 {
1642         struct ptlrpc_request           *req = NULL;
1643         struct osc_extent               *ext;
1644         struct brw_page                 **pga = NULL;
1645         struct osc_brw_async_args       *aa = NULL;
1646         struct obdo                     *oa = NULL;
1647         struct osc_async_page           *oap;
1648         struct osc_async_page           *tmp;
1649         struct cl_req                   *clerq = NULL;
1650         enum cl_req_type                crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE :
1651                                                                       CRT_READ;
1652         struct cl_req_attr              *crattr = NULL;
1653         loff_t                          starting_offset = OBD_OBJECT_EOF;
1654         loff_t                          ending_offset = 0;
1655         int                             mpflag = 0;
1656         int                             mem_tight = 0;
1657         int                             page_count = 0;
1658         bool                            soft_sync = false;
1659         int                             i;
1660         int                             rc;
1661         struct list_head                rpc_list = LIST_HEAD_INIT(rpc_list);
1662         struct ost_body                 *body;
1663         ENTRY;
1664         LASSERT(!list_empty(ext_list));
1665
1666         /* add pages into rpc_list to build BRW rpc */
1667         list_for_each_entry(ext, ext_list, oe_link) {
1668                 LASSERT(ext->oe_state == OES_RPC);
1669                 mem_tight |= ext->oe_memalloc;
1670                 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
1671                         ++page_count;
1672                         list_add_tail(&oap->oap_rpc_item, &rpc_list);
1673                         if (starting_offset == OBD_OBJECT_EOF ||
1674                             starting_offset > oap->oap_obj_off)
1675                                 starting_offset = oap->oap_obj_off;
1676                         else
1677                                 LASSERT(oap->oap_page_off == 0);
1678                         if (ending_offset < oap->oap_obj_off + oap->oap_count)
1679                                 ending_offset = oap->oap_obj_off +
1680                                                 oap->oap_count;
1681                         else
1682                                 LASSERT(oap->oap_page_off + oap->oap_count ==
1683                                         PAGE_CACHE_SIZE);
1684                 }
1685         }
1686
1687         soft_sync = osc_over_unstable_soft_limit(cli);
1688         if (mem_tight)
1689                 mpflag = cfs_memory_pressure_get_and_set();
1690
1691         OBD_ALLOC(crattr, sizeof(*crattr));
1692         if (crattr == NULL)
1693                 GOTO(out, rc = -ENOMEM);
1694
1695         OBD_ALLOC(pga, sizeof(*pga) * page_count);
1696         if (pga == NULL)
1697                 GOTO(out, rc = -ENOMEM);
1698
1699         OBDO_ALLOC(oa);
1700         if (oa == NULL)
1701                 GOTO(out, rc = -ENOMEM);
1702
1703         i = 0;
1704         list_for_each_entry(oap, &rpc_list, oap_rpc_item) {
1705                 struct cl_page *page = oap2cl_page(oap);
1706                 if (clerq == NULL) {
1707                         clerq = cl_req_alloc(env, page, crt,
1708                                              1 /* only 1-object rpcs for now */);
1709                         if (IS_ERR(clerq))
1710                                 GOTO(out, rc = PTR_ERR(clerq));
1711                 }
1712                 if (mem_tight)
1713                         oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
1714                 if (soft_sync)
1715                         oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
1716                 pga[i] = &oap->oap_brw_page;
1717                 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
1718                 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
1719                        pga[i]->pg, page_index(oap->oap_page), oap,
1720                        pga[i]->flag);
1721                 i++;
1722                 cl_req_page_add(env, clerq, page);
1723         }
1724
1725         /* always get the data for the obdo for the rpc */
1726         LASSERT(clerq != NULL);
1727         crattr->cra_oa = oa;
1728         cl_req_attr_set(env, clerq, crattr, ~0ULL);
1729
1730         rc = cl_req_prep(env, clerq);
1731         if (rc != 0) {
1732                 CERROR("cl_req_prep failed: %d\n", rc);
1733                 GOTO(out, rc);
1734         }
1735
1736         sort_brw_pages(pga, page_count);
1737         rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req, 0);
1738         if (rc != 0) {
1739                 CERROR("prep_req failed: %d\n", rc);
1740                 GOTO(out, rc);
1741         }
1742
1743         req->rq_commit_cb = brw_commit;
1744         req->rq_interpret_reply = brw_interpret;
1745
1746         if (mem_tight != 0)
1747                 req->rq_memalloc = 1;
1748
1749         /* Need to update the timestamps after the request is built in case
1750          * we race with setattr (locally or in queue at OST).  If OST gets
1751          * later setattr before earlier BRW (as determined by the request xid),
1752          * the OST will not use BRW timestamps.  Sadly, there is no obvious
1753          * way to do this in a single call.  bug 10150 */
1754         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
1755         crattr->cra_oa = &body->oa;
1756         cl_req_attr_set(env, clerq, crattr,
1757                         OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
1758
1759         lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
1760
1761         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1762         aa = ptlrpc_req_async_args(req);
1763         INIT_LIST_HEAD(&aa->aa_oaps);
1764         list_splice_init(&rpc_list, &aa->aa_oaps);
1765         INIT_LIST_HEAD(&aa->aa_exts);
1766         list_splice_init(ext_list, &aa->aa_exts);
1767         aa->aa_clerq = clerq;
1768
1769         /* queued sync pages can be torn down while the pages
1770          * were between the pending list and the rpc */
1771         tmp = NULL;
1772         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1773                 /* only one oap gets a request reference */
1774                 if (tmp == NULL)
1775                         tmp = oap;
1776                 if (oap->oap_interrupted && !req->rq_intr) {
1777                         CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
1778                                         oap, req);
1779                         ptlrpc_mark_interrupted(req);
1780                 }
1781         }
1782         if (tmp != NULL)
1783                 tmp->oap_request = ptlrpc_request_addref(req);
1784
1785         spin_lock(&cli->cl_loi_list_lock);
1786         starting_offset >>= PAGE_CACHE_SHIFT;
1787         if (cmd == OBD_BRW_READ) {
1788                 cli->cl_r_in_flight++;
1789                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
1790                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
1791                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
1792                                       starting_offset + 1);
1793         } else {
1794                 cli->cl_w_in_flight++;
1795                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
1796                 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
1797                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
1798                                       starting_offset + 1);
1799         }
1800         spin_unlock(&cli->cl_loi_list_lock);
1801
1802         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %ur/%uw in flight",
1803                   page_count, aa, cli->cl_r_in_flight,
1804                   cli->cl_w_in_flight);
1805
1806         ptlrpcd_add_req(req);
1807         rc = 0;
1808         EXIT;
1809
1810 out:
1811         if (mem_tight != 0)
1812                 cfs_memory_pressure_restore(mpflag);
1813
1814         if (crattr != NULL)
1815                 OBD_FREE(crattr, sizeof(*crattr));
1816
1817         if (rc != 0) {
1818                 LASSERT(req == NULL);
1819
1820                 if (oa)
1821                         OBDO_FREE(oa);
1822                 if (pga)
1823                         OBD_FREE(pga, sizeof(*pga) * page_count);
1824                 /* this should happen rarely and is pretty bad, it makes the
1825                  * pending list not follow the dirty order */
1826                 while (!list_empty(ext_list)) {
1827                         ext = list_entry(ext_list->next, struct osc_extent,
1828                                          oe_link);
1829                         list_del_init(&ext->oe_link);
1830                         osc_extent_finish(env, ext, 0, rc);
1831                 }
1832                 if (clerq && !IS_ERR(clerq))
1833                         cl_req_completion(env, clerq, rc);
1834         }
1835         RETURN(rc);
1836 }
1837
1838 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
1839                                         struct ldlm_enqueue_info *einfo)
1840 {
1841         void *data = einfo->ei_cbdata;
1842         int set = 0;
1843
1844         LASSERT(lock != NULL);
1845         LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
1846         LASSERT(lock->l_resource->lr_type == einfo->ei_type);
1847         LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
1848         LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
1849
1850         lock_res_and_lock(lock);
1851
1852         if (lock->l_ast_data == NULL)
1853                 lock->l_ast_data = data;
1854         if (lock->l_ast_data == data)
1855                 set = 1;
1856
1857         unlock_res_and_lock(lock);
1858
1859         return set;
1860 }
1861
1862 static int osc_set_data_with_check(struct lustre_handle *lockh,
1863                                    struct ldlm_enqueue_info *einfo)
1864 {
1865         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
1866         int set = 0;
1867
1868         if (lock != NULL) {
1869                 set = osc_set_lock_data_with_check(lock, einfo);
1870                 LDLM_LOCK_PUT(lock);
1871         } else
1872                 CERROR("lockh %p, data %p - client evicted?\n",
1873                        lockh, einfo->ei_cbdata);
1874         return set;
1875 }
1876
1877 static int osc_enqueue_fini(struct ptlrpc_request *req,
1878                             osc_enqueue_upcall_f upcall, void *cookie,
1879                             struct lustre_handle *lockh, ldlm_mode_t mode,
1880                             __u64 *flags, int agl, int errcode)
1881 {
1882         bool intent = *flags & LDLM_FL_HAS_INTENT;
1883         int rc;
1884         ENTRY;
1885
1886         /* The request was created before ldlm_cli_enqueue call. */
1887         if (intent && errcode == ELDLM_LOCK_ABORTED) {
1888                 struct ldlm_reply *rep;
1889
1890                 rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1891                 LASSERT(rep != NULL);
1892
1893                 rep->lock_policy_res1 =
1894                         ptlrpc_status_ntoh(rep->lock_policy_res1);
1895                 if (rep->lock_policy_res1)
1896                         errcode = rep->lock_policy_res1;
1897                 if (!agl)
1898                         *flags |= LDLM_FL_LVB_READY;
1899         } else if (errcode == ELDLM_OK) {
1900                 *flags |= LDLM_FL_LVB_READY;
1901         }
1902
1903         /* Call the update callback. */
1904         rc = (*upcall)(cookie, lockh, errcode);
1905
1906         /* release the reference taken in ldlm_cli_enqueue() */
1907         if (errcode == ELDLM_LOCK_MATCHED)
1908                 errcode = ELDLM_OK;
1909         if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
1910                 ldlm_lock_decref(lockh, mode);
1911
1912         RETURN(rc);
1913 }
1914
1915 static int osc_enqueue_interpret(const struct lu_env *env,
1916                                  struct ptlrpc_request *req,
1917                                  struct osc_enqueue_args *aa, int rc)
1918 {
1919         struct ldlm_lock *lock;
1920         struct lustre_handle *lockh = &aa->oa_lockh;
1921         ldlm_mode_t mode = aa->oa_mode;
1922         struct ost_lvb *lvb = aa->oa_lvb;
1923         __u32 lvb_len = sizeof(*lvb);
1924         __u64 flags = 0;
1925
1926         ENTRY;
1927
1928         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
1929          * be valid. */
1930         lock = ldlm_handle2lock(lockh);
1931         LASSERTF(lock != NULL,
1932                  "lockh "LPX64", req %p, aa %p - client evicted?\n",
1933                  lockh->cookie, req, aa);
1934
1935         /* Take an additional reference so that a blocking AST that
1936          * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
1937          * to arrive after an upcall has been executed by
1938          * osc_enqueue_fini(). */
1939         ldlm_lock_addref(lockh, mode);
1940
1941         /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
1942         OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
1943
1944         /* Let CP AST to grant the lock first. */
1945         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
1946
1947         if (aa->oa_agl) {
1948                 LASSERT(aa->oa_lvb == NULL);
1949                 LASSERT(aa->oa_flags == NULL);
1950                 aa->oa_flags = &flags;
1951         }
1952
1953         /* Complete obtaining the lock procedure. */
1954         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
1955                                    aa->oa_mode, aa->oa_flags, lvb, lvb_len,
1956                                    lockh, rc);
1957         /* Complete osc stuff. */
1958         rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
1959                               aa->oa_flags, aa->oa_agl, rc);
1960
1961         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
1962
1963         ldlm_lock_decref(lockh, mode);
1964         LDLM_LOCK_PUT(lock);
1965         RETURN(rc);
1966 }
1967
1968 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
1969
1970 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
1971  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
1972  * other synchronous requests, however keeping some locks and trying to obtain
1973  * others may take a considerable amount of time in a case of ost failure; and
1974  * when other sync requests do not get released lock from a client, the client
1975  * is evicted from the cluster -- such scenarious make the life difficult, so
1976  * release locks just after they are obtained. */
1977 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
1978                      __u64 *flags, ldlm_policy_data_t *policy,
1979                      struct ost_lvb *lvb, int kms_valid,
1980                      osc_enqueue_upcall_f upcall, void *cookie,
1981                      struct ldlm_enqueue_info *einfo,
1982                      struct ptlrpc_request_set *rqset, int async, int agl)
1983 {
1984         struct obd_device *obd = exp->exp_obd;
1985         struct lustre_handle lockh = { 0 };
1986         struct ptlrpc_request *req = NULL;
1987         int intent = *flags & LDLM_FL_HAS_INTENT;
1988         __u64 match_lvb = agl ? 0 : LDLM_FL_LVB_READY;
1989         ldlm_mode_t mode;
1990         int rc;
1991         ENTRY;
1992
1993         /* Filesystem lock extents are extended to page boundaries so that
1994          * dealing with the page cache is a little smoother.  */
1995         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
1996         policy->l_extent.end |= ~PAGE_MASK;
1997
1998         /*
1999          * kms is not valid when either object is completely fresh (so that no
2000          * locks are cached), or object was evicted. In the latter case cached
2001          * lock cannot be used, because it would prime inode state with
2002          * potentially stale LVB.
2003          */
2004         if (!kms_valid)
2005                 goto no_match;
2006
2007         /* Next, search for already existing extent locks that will cover us */
2008         /* If we're trying to read, we also search for an existing PW lock.  The
2009          * VFS and page cache already protect us locally, so lots of readers/
2010          * writers can share a single PW lock.
2011          *
2012          * There are problems with conversion deadlocks, so instead of
2013          * converting a read lock to a write lock, we'll just enqueue a new
2014          * one.
2015          *
2016          * At some point we should cancel the read lock instead of making them
2017          * send us a blocking callback, but there are problems with canceling
2018          * locks out from other users right now, too. */
2019         mode = einfo->ei_mode;
2020         if (einfo->ei_mode == LCK_PR)
2021                 mode |= LCK_PW;
2022         mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
2023                                einfo->ei_type, policy, mode, &lockh, 0);
2024         if (mode) {
2025                 struct ldlm_lock *matched;
2026
2027                 if (*flags & LDLM_FL_TEST_LOCK)
2028                         RETURN(ELDLM_OK);
2029
2030                 matched = ldlm_handle2lock(&lockh);
2031                 if (agl) {
2032                         /* AGL enqueues DLM locks speculatively. Therefore if
2033                          * it already exists a DLM lock, it wll just inform the
2034                          * caller to cancel the AGL process for this stripe. */
2035                         ldlm_lock_decref(&lockh, mode);
2036                         LDLM_LOCK_PUT(matched);
2037                         RETURN(-ECANCELED);
2038                 } else if (osc_set_lock_data_with_check(matched, einfo)) {
2039                         *flags |= LDLM_FL_LVB_READY;
2040
2041                         /* We already have a lock, and it's referenced. */
2042                         (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);
2043
2044                         ldlm_lock_decref(&lockh, mode);
2045                         LDLM_LOCK_PUT(matched);
2046                         RETURN(ELDLM_OK);
2047                 } else {
2048                         ldlm_lock_decref(&lockh, mode);
2049                         LDLM_LOCK_PUT(matched);
2050                 }
2051         }
2052
2053 no_match:
2054         if (*flags & LDLM_FL_TEST_LOCK)
2055                 RETURN(-ENOLCK);
2056
2057         if (intent) {
2058                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2059                                            &RQF_LDLM_ENQUEUE_LVB);
2060                 if (req == NULL)
2061                         RETURN(-ENOMEM);
2062
2063                 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
2064                 if (rc) {
2065                         ptlrpc_request_free(req);
2066                         RETURN(rc);
2067                 }
2068
2069                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2070                                      sizeof *lvb);
2071                 ptlrpc_request_set_replen(req);
2072         }
2073
2074         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2075         *flags &= ~LDLM_FL_BLOCK_GRANTED;
2076
2077         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2078                               sizeof(*lvb), LVB_T_OST, &lockh, async);
2079         if (async) {
2080                 if (!rc) {
2081                         struct osc_enqueue_args *aa;
2082                         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2083                         aa = ptlrpc_req_async_args(req);
2084                         aa->oa_exp    = exp;
2085                         aa->oa_mode   = einfo->ei_mode;
2086                         aa->oa_type   = einfo->ei_type;
2087                         lustre_handle_copy(&aa->oa_lockh, &lockh);
2088                         aa->oa_upcall = upcall;
2089                         aa->oa_cookie = cookie;
2090                         aa->oa_agl    = !!agl;
2091                         if (!agl) {
2092                                 aa->oa_flags  = flags;
2093                                 aa->oa_lvb    = lvb;
2094                         } else {
2095                                 /* AGL is essentially to enqueue an DLM lock
2096                                  * in advance, so we don't care about the
2097                                  * result of AGL enqueue. */
2098                                 aa->oa_lvb    = NULL;
2099                                 aa->oa_flags  = NULL;
2100                         }
2101
2102                         req->rq_interpret_reply =
2103                                 (ptlrpc_interpterer_t)osc_enqueue_interpret;
2104                         if (rqset == PTLRPCD_SET)
2105                                 ptlrpcd_add_req(req);
2106                         else
2107                                 ptlrpc_set_add_req(rqset, req);
2108                 } else if (intent) {
2109                         ptlrpc_req_finished(req);
2110                 }
2111                 RETURN(rc);
2112         }
2113
2114         rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
2115                               flags, agl, rc);
2116         if (intent)
2117                 ptlrpc_req_finished(req);
2118
2119         RETURN(rc);
2120 }
2121
2122 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2123                    __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2124                    __u64 *flags, void *data, struct lustre_handle *lockh,
2125                    int unref)
2126 {
2127         struct obd_device *obd = exp->exp_obd;
2128         __u64 lflags = *flags;
2129         ldlm_mode_t rc;
2130         ENTRY;
2131
2132         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2133                 RETURN(-EIO);
2134
2135         /* Filesystem lock extents are extended to page boundaries so that
2136          * dealing with the page cache is a little smoother */
2137         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2138         policy->l_extent.end |= ~PAGE_MASK;
2139
2140         /* Next, search for already existing extent locks that will cover us */
2141         /* If we're trying to read, we also search for an existing PW lock.  The
2142          * VFS and page cache already protect us locally, so lots of readers/
2143          * writers can share a single PW lock. */
2144         rc = mode;
2145         if (mode == LCK_PR)
2146                 rc |= LCK_PW;
2147         rc = ldlm_lock_match(obd->obd_namespace, lflags,
2148                              res_id, type, policy, rc, lockh, unref);
2149         if (rc) {
2150                 if (data != NULL) {
2151                         if (!osc_set_data_with_check(lockh, data)) {
2152                                 if (!(lflags & LDLM_FL_TEST_LOCK))
2153                                         ldlm_lock_decref(lockh, rc);
2154                                 RETURN(0);
2155                         }
2156                 }
2157                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
2158                         ldlm_lock_addref(lockh, LCK_PR);
2159                         ldlm_lock_decref(lockh, LCK_PW);
2160                 }
2161                 RETURN(rc);
2162         }
2163         RETURN(rc);
2164 }
2165
2166 static int osc_statfs_interpret(const struct lu_env *env,
2167                                 struct ptlrpc_request *req,
2168                                 struct osc_async_args *aa, int rc)
2169 {
2170         struct obd_statfs *msfs;
2171         ENTRY;
2172
2173         if (rc == -EBADR)
2174                 /* The request has in fact never been sent
2175                  * due to issues at a higher level (LOV).
2176                  * Exit immediately since the caller is
2177                  * aware of the problem and takes care
2178                  * of the clean up */
2179                  RETURN(rc);
2180
2181         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2182             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2183                 GOTO(out, rc = 0);
2184
2185         if (rc != 0)
2186                 GOTO(out, rc);
2187
2188         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2189         if (msfs == NULL) {
2190                 GOTO(out, rc = -EPROTO);
2191         }
2192
2193         *aa->aa_oi->oi_osfs = *msfs;
2194 out:
2195         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2196         RETURN(rc);
2197 }
2198
2199 static int osc_statfs_async(struct obd_export *exp,
2200                             struct obd_info *oinfo, __u64 max_age,
2201                             struct ptlrpc_request_set *rqset)
2202 {
2203         struct obd_device     *obd = class_exp2obd(exp);
2204         struct ptlrpc_request *req;
2205         struct osc_async_args *aa;
2206         int                    rc;
2207         ENTRY;
2208
2209         /* We could possibly pass max_age in the request (as an absolute
2210          * timestamp or a "seconds.usec ago") so the target can avoid doing
2211          * extra calls into the filesystem if that isn't necessary (e.g.
2212          * during mount that would help a bit).  Having relative timestamps
2213          * is not so great if request processing is slow, while absolute
2214          * timestamps are not ideal because they need time synchronization. */
2215         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2216         if (req == NULL)
2217                 RETURN(-ENOMEM);
2218
2219         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2220         if (rc) {
2221                 ptlrpc_request_free(req);
2222                 RETURN(rc);
2223         }
2224         ptlrpc_request_set_replen(req);
2225         req->rq_request_portal = OST_CREATE_PORTAL;
2226         ptlrpc_at_set_req_timeout(req);
2227
2228         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2229                 /* procfs requests not want stat in wait for avoid deadlock */
2230                 req->rq_no_resend = 1;
2231                 req->rq_no_delay = 1;
2232         }
2233
2234         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2235         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2236         aa = ptlrpc_req_async_args(req);
2237         aa->aa_oi = oinfo;
2238
2239         ptlrpc_set_add_req(rqset, req);
2240         RETURN(0);
2241 }
2242
2243 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2244                       struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2245 {
2246         struct obd_device     *obd = class_exp2obd(exp);
2247         struct obd_statfs     *msfs;
2248         struct ptlrpc_request *req;
2249         struct obd_import     *imp = NULL;
2250         int rc;
2251         ENTRY;
2252
2253         /*Since the request might also come from lprocfs, so we need
2254          *sync this with client_disconnect_export Bug15684*/
2255         down_read(&obd->u.cli.cl_sem);
2256         if (obd->u.cli.cl_import)
2257                 imp = class_import_get(obd->u.cli.cl_import);
2258         up_read(&obd->u.cli.cl_sem);
2259         if (!imp)
2260                 RETURN(-ENODEV);
2261
2262         /* We could possibly pass max_age in the request (as an absolute
2263          * timestamp or a "seconds.usec ago") so the target can avoid doing
2264          * extra calls into the filesystem if that isn't necessary (e.g.
2265          * during mount that would help a bit).  Having relative timestamps
2266          * is not so great if request processing is slow, while absolute
2267          * timestamps are not ideal because they need time synchronization. */
2268         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2269
2270         class_import_put(imp);
2271
2272         if (req == NULL)
2273                 RETURN(-ENOMEM);
2274
2275         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2276         if (rc) {
2277                 ptlrpc_request_free(req);
2278                 RETURN(rc);
2279         }
2280         ptlrpc_request_set_replen(req);
2281         req->rq_request_portal = OST_CREATE_PORTAL;
2282         ptlrpc_at_set_req_timeout(req);
2283
2284         if (flags & OBD_STATFS_NODELAY) {
2285                 /* procfs requests not want stat in wait for avoid deadlock */
2286                 req->rq_no_resend = 1;
2287                 req->rq_no_delay = 1;
2288         }
2289
2290         rc = ptlrpc_queue_wait(req);
2291         if (rc)
2292                 GOTO(out, rc);
2293
2294         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2295         if (msfs == NULL) {
2296                 GOTO(out, rc = -EPROTO);
2297         }
2298
2299         *osfs = *msfs;
2300
2301         EXIT;
2302  out:
2303         ptlrpc_req_finished(req);
2304         return rc;
2305 }
2306
2307 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2308                          void *karg, void *uarg)
2309 {
2310         struct obd_device *obd = exp->exp_obd;
2311         struct obd_ioctl_data *data = karg;
2312         int err = 0;
2313         ENTRY;
2314
2315         if (!try_module_get(THIS_MODULE)) {
2316                 CERROR("%s: cannot get module '%s'\n", obd->obd_name,
2317                        module_name(THIS_MODULE));
2318                 return -EINVAL;
2319         }
2320         switch (cmd) {
2321         case OBD_IOC_CLIENT_RECOVER:
2322                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2323                                             data->ioc_inlbuf1, 0);
2324                 if (err > 0)
2325                         err = 0;
2326                 GOTO(out, err);
2327         case IOC_OSC_SET_ACTIVE:
2328                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2329                                                data->ioc_offset);
2330                 GOTO(out, err);
2331         case OBD_IOC_PING_TARGET:
2332                 err = ptlrpc_obd_ping(obd);
2333                 GOTO(out, err);
2334         default:
2335                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2336                        cmd, current_comm());
2337                 GOTO(out, err = -ENOTTY);
2338         }
2339 out:
2340         module_put(THIS_MODULE);
2341         return err;
2342 }
2343
2344 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
2345                               u32 keylen, void *key,
2346                               u32 vallen, void *val,
2347                               struct ptlrpc_request_set *set)
2348 {
2349         struct ptlrpc_request *req;
2350         struct obd_device     *obd = exp->exp_obd;
2351         struct obd_import     *imp = class_exp2cliimp(exp);
2352         char                  *tmp;
2353         int                    rc;
2354         ENTRY;
2355
2356         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
2357
2358         if (KEY_IS(KEY_CHECKSUM)) {
2359                 if (vallen != sizeof(int))
2360                         RETURN(-EINVAL);
2361                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
2362                 RETURN(0);
2363         }
2364
2365         if (KEY_IS(KEY_SPTLRPC_CONF)) {
2366                 sptlrpc_conf_client_adapt(obd);
2367                 RETURN(0);
2368         }
2369
2370         if (KEY_IS(KEY_FLUSH_CTX)) {
2371                 sptlrpc_import_flush_my_ctx(imp);
2372                 RETURN(0);
2373         }
2374
2375         if (KEY_IS(KEY_CACHE_SET)) {
2376                 struct client_obd *cli = &obd->u.cli;
2377
2378                 LASSERT(cli->cl_cache == NULL); /* only once */
2379                 cli->cl_cache = (struct cl_client_cache *)val;
2380                 cl_cache_incref(cli->cl_cache);
2381                 cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
2382
2383                 /* add this osc into entity list */
2384                 LASSERT(list_empty(&cli->cl_lru_osc));
2385                 spin_lock(&cli->cl_cache->ccc_lru_lock);
2386                 list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
2387                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
2388
2389                 RETURN(0);
2390         }
2391
2392         if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
2393                 struct client_obd *cli = &obd->u.cli;
2394                 long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
2395                 long target = *(long *)val;
2396
2397                 nr = osc_lru_shrink(env, cli, min(nr, target), true);
2398                 *(long *)val -= nr;
2399                 RETURN(0);
2400         }
2401
2402         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
2403                 RETURN(-EINVAL);
2404
2405         /* We pass all other commands directly to OST. Since nobody calls osc
2406            methods directly and everybody is supposed to go through LOV, we
2407            assume lov checked invalid values for us.
2408            The only recognised values so far are evict_by_nid and mds_conn.
2409            Even if something bad goes through, we'd get a -EINVAL from OST
2410            anyway. */
2411
2412         req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
2413                                                 &RQF_OST_SET_GRANT_INFO :
2414                                                 &RQF_OBD_SET_INFO);
2415         if (req == NULL)
2416                 RETURN(-ENOMEM);
2417
2418         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2419                              RCL_CLIENT, keylen);
2420         if (!KEY_IS(KEY_GRANT_SHRINK))
2421                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
2422                                      RCL_CLIENT, vallen);
2423         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
2424         if (rc) {
2425                 ptlrpc_request_free(req);
2426                 RETURN(rc);
2427         }
2428
2429         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2430         memcpy(tmp, key, keylen);
2431         tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
2432                                                         &RMF_OST_BODY :
2433                                                         &RMF_SETINFO_VAL);
2434         memcpy(tmp, val, vallen);
2435
2436         if (KEY_IS(KEY_GRANT_SHRINK)) {
2437                 struct osc_grant_args *aa;
2438                 struct obdo *oa;
2439
2440                 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2441                 aa = ptlrpc_req_async_args(req);
2442                 OBDO_ALLOC(oa);
2443                 if (!oa) {
2444                         ptlrpc_req_finished(req);
2445                         RETURN(-ENOMEM);
2446                 }
2447                 *oa = ((struct ost_body *)val)->oa;
2448                 aa->aa_oa = oa;
2449                 req->rq_interpret_reply = osc_shrink_grant_interpret;
2450         }
2451
2452         ptlrpc_request_set_replen(req);
2453         if (!KEY_IS(KEY_GRANT_SHRINK)) {
2454                 LASSERT(set != NULL);
2455                 ptlrpc_set_add_req(set, req);
2456                 ptlrpc_check_set(NULL, set);
2457         } else {
2458                 ptlrpcd_add_req(req);
2459         }
2460
2461         RETURN(0);
2462 }
2463
2464 static int osc_reconnect(const struct lu_env *env,
2465                          struct obd_export *exp, struct obd_device *obd,
2466                          struct obd_uuid *cluuid,
2467                          struct obd_connect_data *data,
2468                          void *localdata)
2469 {
2470         struct client_obd *cli = &obd->u.cli;
2471
2472         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
2473                 long lost_grant;
2474
2475                 spin_lock(&cli->cl_loi_list_lock);
2476                 data->ocd_grant = (cli->cl_avail_grant +
2477                                   (cli->cl_dirty_pages << PAGE_CACHE_SHIFT)) ?:
2478                                   2 * cli_brw_size(obd);
2479                 lost_grant = cli->cl_lost_grant;
2480                 cli->cl_lost_grant = 0;
2481                 spin_unlock(&cli->cl_loi_list_lock);
2482
2483                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
2484                        " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
2485                        data->ocd_version, data->ocd_grant, lost_grant);
2486         }
2487
2488         RETURN(0);
2489 }
2490
2491 static int osc_disconnect(struct obd_export *exp)
2492 {
2493         struct obd_device *obd = class_exp2obd(exp);
2494         int rc;
2495
2496         rc = client_disconnect_export(exp);
2497         /**
2498          * Initially we put del_shrink_grant before disconnect_export, but it
2499          * causes the following problem if setup (connect) and cleanup
2500          * (disconnect) are tangled together.
2501          *      connect p1                     disconnect p2
2502          *   ptlrpc_connect_import
2503          *     ...............               class_manual_cleanup
2504          *                                     osc_disconnect
2505          *                                     del_shrink_grant
2506          *   ptlrpc_connect_interrupt
2507          *     init_grant_shrink
2508          *   add this client to shrink list
2509          *                                      cleanup_osc
2510          * Bang! pinger trigger the shrink.
2511          * So the osc should be disconnected from the shrink list, after we
2512          * are sure the import has been destroyed. BUG18662
2513          */
2514         if (obd->u.cli.cl_import == NULL)
2515                 osc_del_shrink_grant(&obd->u.cli);
2516         return rc;
2517 }
2518
2519 static int osc_import_event(struct obd_device *obd,
2520                             struct obd_import *imp,
2521                             enum obd_import_event event)
2522 {
2523         struct client_obd *cli;
2524         int rc = 0;
2525
2526         ENTRY;
2527         LASSERT(imp->imp_obd == obd);
2528
2529         switch (event) {
2530         case IMP_EVENT_DISCON: {
2531                 cli = &obd->u.cli;
2532                 spin_lock(&cli->cl_loi_list_lock);
2533                 cli->cl_avail_grant = 0;
2534                 cli->cl_lost_grant = 0;
2535                 spin_unlock(&cli->cl_loi_list_lock);
2536                 break;
2537         }
2538         case IMP_EVENT_INACTIVE: {
2539                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
2540                 break;
2541         }
2542         case IMP_EVENT_INVALIDATE: {
2543                 struct ldlm_namespace *ns = obd->obd_namespace;
2544                 struct lu_env         *env;
2545                 int                    refcheck;
2546
2547                 env = cl_env_get(&refcheck);
2548                 if (!IS_ERR(env)) {
2549                         /* Reset grants */
2550                         cli = &obd->u.cli;
2551                         /* all pages go to failing rpcs due to the invalid
2552                          * import */
2553                         osc_io_unplug(env, cli, NULL);
2554
2555                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
2556                         cl_env_put(env, &refcheck);
2557                 } else
2558                         rc = PTR_ERR(env);
2559                 break;
2560         }
2561         case IMP_EVENT_ACTIVE: {
2562                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
2563                 break;
2564         }
2565         case IMP_EVENT_OCD: {
2566                 struct obd_connect_data *ocd = &imp->imp_connect_data;
2567
2568                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
2569                         osc_init_grant(&obd->u.cli, ocd);
2570
2571                 /* See bug 7198 */
2572                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
2573                         imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
2574
2575                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
2576                 break;
2577         }
2578         case IMP_EVENT_DEACTIVATE: {
2579                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
2580                 break;
2581         }
2582         case IMP_EVENT_ACTIVATE: {
2583                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
2584                 break;
2585         }
2586         default:
2587                 CERROR("Unknown import event %d\n", event);
2588                 LBUG();
2589         }
2590         RETURN(rc);
2591 }
2592
2593 /**
2594  * Determine whether the lock can be canceled before replaying the lock
2595  * during recovery, see bug16774 for detailed information.
2596  *
2597  * \retval zero the lock can't be canceled
2598  * \retval other ok to cancel
2599  */
2600 static int osc_cancel_weight(struct ldlm_lock *lock)
2601 {
2602         /*
2603          * Cancel all unused and granted extent lock.
2604          */
2605         if (lock->l_resource->lr_type == LDLM_EXTENT &&
2606             lock->l_granted_mode == lock->l_req_mode &&
2607             osc_ldlm_weigh_ast(lock) == 0)
2608                 RETURN(1);
2609
2610         RETURN(0);
2611 }
2612
2613 static int brw_queue_work(const struct lu_env *env, void *data)
2614 {
2615         struct client_obd *cli = data;
2616
2617         CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
2618
2619         osc_io_unplug(env, cli, NULL);
2620         RETURN(0);
2621 }
2622
2623 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
2624 {
2625         struct client_obd *cli = &obd->u.cli;
2626         struct obd_type   *type;
2627         void              *handler;
2628         int                rc;
2629         int                adding;
2630         int                added;
2631         int                req_count;
2632         ENTRY;
2633
2634         rc = ptlrpcd_addref();
2635         if (rc)
2636                 RETURN(rc);
2637
2638         rc = client_obd_setup(obd, lcfg);
2639         if (rc)
2640                 GOTO(out_ptlrpcd, rc);
2641
2642         handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
2643         if (IS_ERR(handler))
2644                 GOTO(out_client_setup, rc = PTR_ERR(handler));
2645         cli->cl_writeback_work = handler;
2646
2647         handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
2648         if (IS_ERR(handler))
2649                 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
2650         cli->cl_lru_work = handler;
2651
2652         rc = osc_quota_setup(obd);
2653         if (rc)
2654                 GOTO(out_ptlrpcd_work, rc);
2655
2656         cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
2657
2658 #ifdef CONFIG_PROC_FS
2659         obd->obd_vars = lprocfs_osc_obd_vars;
2660 #endif
2661         /* If this is true then both client (osc) and server (osp) are on the
2662          * same node. The osp layer if loaded first will register the osc proc
2663          * directory. In that case this obd_device will be attached its proc
2664          * tree to type->typ_procsym instead of obd->obd_type->typ_procroot. */
2665         type = class_search_type(LUSTRE_OSP_NAME);
2666         if (type && type->typ_procsym) {
2667                 obd->obd_proc_entry = lprocfs_register(obd->obd_name,
2668                                                        type->typ_procsym,
2669                                                        obd->obd_vars, obd);
2670                 if (IS_ERR(obd->obd_proc_entry)) {
2671                         rc = PTR_ERR(obd->obd_proc_entry);
2672                         CERROR("error %d setting up lprocfs for %s\n", rc,
2673                                obd->obd_name);
2674                         obd->obd_proc_entry = NULL;
2675                 }
2676         } else {
2677                 rc = lprocfs_obd_setup(obd);
2678         }
2679
2680         /* If the basic OSC proc tree construction succeeded then
2681          * lets do the rest. */
2682         if (rc == 0) {
2683                 lproc_osc_attach_seqstat(obd);
2684                 sptlrpc_lprocfs_cliobd_attach(obd);
2685                 ptlrpc_lprocfs_register_obd(obd);
2686         }
2687
2688         /*
2689          * We try to control the total number of requests with a upper limit
2690          * osc_reqpool_maxreqcount. There might be some race which will cause
2691          * over-limit allocation, but it is fine.
2692          */
2693         req_count = atomic_read(&osc_pool_req_count);
2694         if (req_count < osc_reqpool_maxreqcount) {
2695                 adding = cli->cl_max_rpcs_in_flight + 2;
2696                 if (req_count + adding > osc_reqpool_maxreqcount)
2697                         adding = osc_reqpool_maxreqcount - req_count;
2698
2699                 added = ptlrpc_add_rqs_to_pool(osc_rq_pool, adding);
2700                 atomic_add(added, &osc_pool_req_count);
2701         }
2702
2703         INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
2704         ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
2705         RETURN(0);
2706
2707 out_ptlrpcd_work:
2708         if (cli->cl_writeback_work != NULL) {
2709                 ptlrpcd_destroy_work(cli->cl_writeback_work);
2710                 cli->cl_writeback_work = NULL;
2711         }
2712         if (cli->cl_lru_work != NULL) {
2713                 ptlrpcd_destroy_work(cli->cl_lru_work);
2714                 cli->cl_lru_work = NULL;
2715         }
2716 out_client_setup:
2717         client_obd_cleanup(obd);
2718 out_ptlrpcd:
2719         ptlrpcd_decref();
2720         RETURN(rc);
2721 }
2722
2723 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
2724 {
2725         int rc = 0;
2726         ENTRY;
2727
2728         switch (stage) {
2729         case OBD_CLEANUP_EARLY: {
2730                 struct obd_import *imp;
2731                 imp = obd->u.cli.cl_import;
2732                 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
2733                 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
2734                 ptlrpc_deactivate_import(imp);
2735                 spin_lock(&imp->imp_lock);
2736                 imp->imp_pingable = 0;
2737                 spin_unlock(&imp->imp_lock);
2738                 break;
2739         }
2740         case OBD_CLEANUP_EXPORTS: {
2741                 struct client_obd *cli = &obd->u.cli;
2742                 /* LU-464
2743                  * for echo client, export may be on zombie list, wait for
2744                  * zombie thread to cull it, because cli.cl_import will be
2745                  * cleared in client_disconnect_export():
2746                  *   class_export_destroy() -> obd_cleanup() ->
2747                  *   echo_device_free() -> echo_client_cleanup() ->
2748                  *   obd_disconnect() -> osc_disconnect() ->
2749                  *   client_disconnect_export()
2750                  */
2751                 obd_zombie_barrier();
2752                 if (cli->cl_writeback_work) {
2753                         ptlrpcd_destroy_work(cli->cl_writeback_work);
2754                         cli->cl_writeback_work = NULL;
2755                 }
2756                 if (cli->cl_lru_work) {
2757                         ptlrpcd_destroy_work(cli->cl_lru_work);
2758                         cli->cl_lru_work = NULL;
2759                 }
2760                 obd_cleanup_client_import(obd);
2761                 ptlrpc_lprocfs_unregister_obd(obd);
2762                 lprocfs_obd_cleanup(obd);
2763                 break;
2764                 }
2765         }
2766         RETURN(rc);
2767 }
2768
2769 int osc_cleanup(struct obd_device *obd)
2770 {
2771         struct client_obd *cli = &obd->u.cli;
2772         int rc;
2773
2774         ENTRY;
2775
2776         /* lru cleanup */
2777         if (cli->cl_cache != NULL) {
2778                 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
2779                 spin_lock(&cli->cl_cache->ccc_lru_lock);
2780                 list_del_init(&cli->cl_lru_osc);
2781                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
2782                 cli->cl_lru_left = NULL;
2783                 cl_cache_decref(cli->cl_cache);
2784                 cli->cl_cache = NULL;
2785         }
2786
2787         /* free memory of osc quota cache */
2788         osc_quota_cleanup(obd);
2789
2790         rc = client_obd_cleanup(obd);
2791
2792         ptlrpcd_decref();
2793         RETURN(rc);
2794 }
2795
2796 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
2797 {
2798         int rc = class_process_proc_param(PARAM_OSC, obd->obd_vars, lcfg, obd);
2799         return rc > 0 ? 0: rc;
2800 }
2801
2802 static int osc_process_config(struct obd_device *obd, size_t len, void *buf)
2803 {
2804         return osc_process_config_base(obd, buf);
2805 }
2806
2807 static struct obd_ops osc_obd_ops = {
2808         .o_owner                = THIS_MODULE,
2809         .o_setup                = osc_setup,
2810         .o_precleanup           = osc_precleanup,
2811         .o_cleanup              = osc_cleanup,
2812         .o_add_conn             = client_import_add_conn,
2813         .o_del_conn             = client_import_del_conn,
2814         .o_connect              = client_connect_import,
2815         .o_reconnect            = osc_reconnect,
2816         .o_disconnect           = osc_disconnect,
2817         .o_statfs               = osc_statfs,
2818         .o_statfs_async         = osc_statfs_async,
2819         .o_create               = osc_create,
2820         .o_destroy              = osc_destroy,
2821         .o_getattr              = osc_getattr,
2822         .o_setattr              = osc_setattr,
2823         .o_iocontrol            = osc_iocontrol,
2824         .o_set_info_async       = osc_set_info_async,
2825         .o_import_event         = osc_import_event,
2826         .o_process_config       = osc_process_config,
2827         .o_quotactl             = osc_quotactl,
2828 };
2829
2830 static int __init osc_init(void)
2831 {
2832         bool enable_proc = true;
2833         struct obd_type *type;
2834         unsigned int reqpool_size;
2835         unsigned int reqsize;
2836         int rc;
2837
2838         ENTRY;
2839
2840         /* print an address of _any_ initialized kernel symbol from this
2841          * module, to allow debugging with gdb that doesn't support data
2842          * symbols from modules.*/
2843         CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
2844
2845         rc = lu_kmem_init(osc_caches);
2846         if (rc)
2847                 RETURN(rc);
2848
2849         type = class_search_type(LUSTRE_OSP_NAME);
2850         if (type != NULL && type->typ_procsym != NULL)
2851                 enable_proc = false;
2852
2853         rc = class_register_type(&osc_obd_ops, NULL, enable_proc, NULL,
2854                                  LUSTRE_OSC_NAME, &osc_device_type);
2855         if (rc)
2856                 GOTO(out_kmem, rc);
2857
2858         /* This is obviously too much memory, only prevent overflow here */
2859         if (osc_reqpool_mem_max >= 1 << 12 || osc_reqpool_mem_max == 0)
2860                 GOTO(out_type, rc = -EINVAL);
2861
2862         reqpool_size = osc_reqpool_mem_max << 20;
2863
2864         reqsize = 1;
2865         while (reqsize < OST_IO_MAXREQSIZE)
2866                 reqsize = reqsize << 1;
2867
2868         /*
2869          * We don't enlarge the request count in OSC pool according to
2870          * cl_max_rpcs_in_flight. The allocation from the pool will only be
2871          * tried after normal allocation failed. So a small OSC pool won't
2872          * cause much performance degression in most of cases.
2873          */
2874         osc_reqpool_maxreqcount = reqpool_size / reqsize;
2875
2876         atomic_set(&osc_pool_req_count, 0);
2877         osc_rq_pool = ptlrpc_init_rq_pool(0, OST_IO_MAXREQSIZE,
2878                                           ptlrpc_add_rqs_to_pool);
2879
2880         if (osc_rq_pool != NULL)
2881                 GOTO(out, rc);
2882         rc = -ENOMEM;
2883 out_type:
2884         class_unregister_type(LUSTRE_OSC_NAME);
2885 out_kmem:
2886         lu_kmem_fini(osc_caches);
2887 out:
2888         RETURN(rc);
2889 }
2890
2891 static void /*__exit*/ osc_exit(void)
2892 {
2893         class_unregister_type(LUSTRE_OSC_NAME);
2894         lu_kmem_fini(osc_caches);
2895         ptlrpc_free_rq_pool(osc_rq_pool);
2896 }
2897
2898 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
2899 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
2900 MODULE_VERSION(LUSTRE_VERSION_STRING);
2901 MODULE_LICENSE("GPL");
2902
2903 module_init(osc_init);
2904 module_exit(osc_exit);