/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.gnu.org/licenses/gpl-2.0.html
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, 2016, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 */

#define DEBUG_SUBSYSTEM S_OSC

#include <libcfs/libcfs.h>

#include <lustre/lustre_user.h>

#include <lprocfs_status.h>
#include <lustre_debug.h>
#include <lustre_dlm.h>
#include <lustre_fid.h>
#include <lustre_ha.h>
#include <lustre_ioctl.h>
#include <lustre_net.h>
#include <lustre_obdo.h>
#include <lustre_param.h>
#include <obd.h>
#include <obd_cksum.h>
#include <obd_class.h>

#include "osc_cl_internal.h"
#include "osc_internal.h"

atomic_t osc_pool_req_count;
unsigned int osc_reqpool_maxreqcount;
struct ptlrpc_request_pool *osc_rq_pool;

/* max memory used for request pool, unit is MB */
static unsigned int osc_reqpool_mem_max = 5;
module_param(osc_reqpool_mem_max, uint, 0444);

struct osc_brw_async_args {
        struct obdo              *aa_oa;
        int                       aa_requested_nob;
        int                       aa_nio_count;
        u32                       aa_page_count;
        int                       aa_resends;
        struct brw_page         **aa_ppga;
        struct client_obd        *aa_cli;
        struct list_head          aa_oaps;
        struct list_head          aa_exts;
};

#define osc_grant_args osc_brw_async_args

struct osc_setattr_args {
        struct obdo             *sa_oa;
        obd_enqueue_update_f     sa_upcall;
        void                    *sa_cookie;
};

struct osc_fsync_args {
        struct osc_object       *fa_obj;
        struct obdo             *fa_oa;
        obd_enqueue_update_f    fa_upcall;
        void                    *fa_cookie;
};

struct osc_ladvise_args {
        struct obdo             *la_oa;
        obd_enqueue_update_f     la_upcall;
        void                    *la_cookie;
};

struct osc_enqueue_args {
        struct obd_export       *oa_exp;
        enum ldlm_type          oa_type;
        enum ldlm_mode          oa_mode;
        __u64                   *oa_flags;
        osc_enqueue_upcall_f    oa_upcall;
        void                    *oa_cookie;
        struct ost_lvb          *oa_lvb;
        struct lustre_handle    oa_lockh;
        unsigned int            oa_agl:1;
};

static void osc_release_ppga(struct brw_page **ppga, size_t count);
static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
                         void *data, int rc);

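/* Pack the obdo @oa into the OST_BODY field of @req, converting it to the
 * wire format dictated by the import's connect data. */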
void osc_pack_req_body(struct ptlrpc_request *req, struct obdo *oa)
{
        struct ost_body *body;

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
}

static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        int                      rc;

        ENTRY;
        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;

        EXIT;
out:
        ptlrpc_req_finished(req);

        return rc;
}

static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        int                      rc;

        ENTRY;
        LASSERT(oa->o_valid & OBD_MD_FLGROUP);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        EXIT;
out:
        ptlrpc_req_finished(req);

        RETURN(rc);
}

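/* Completion callback for asynchronous setattr: on success, unpack the
 * reply obdo into sa_oa, then hand the final status to the caller's
 * upcall. */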
static int osc_setattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_setattr_args *sa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
                             &body->oa);
out:
        rc = sa->sa_upcall(sa->sa_cookie, rc);
        RETURN(rc);
}

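/* Send an OST_SETATTR without blocking.  If @rqset is NULL the request is
 * handed to ptlrpcd with no reply handling; otherwise @upcall is invoked
 * with @cookie once the reply (or an error) arrives. */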
int osc_setattr_async(struct obd_export *exp, struct obdo *oa,
                      obd_enqueue_update_f upcall, void *cookie,
                      struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct osc_setattr_args *sa;
        int                      rc;

        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        /* Do MDS-to-OST setattr asynchronously. */
        if (!rqset) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req);
        } else {
                req->rq_interpret_reply =
                        (ptlrpc_interpterer_t)osc_setattr_interpret;

                CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
                sa = ptlrpc_req_async_args(req);
                sa->sa_oa = oa;
                sa->sa_upcall = upcall;
                sa->sa_cookie = cookie;

                if (rqset == PTLRPCD_SET)
                        ptlrpcd_add_req(req);
                else
                        ptlrpc_set_add_req(rqset, req);
        }

        RETURN(0);
}


static int osc_ladvise_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 void *arg, int rc)
{
        struct osc_ladvise_args *la = arg;
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        *la->la_oa = body->oa;
out:
        rc = la->la_upcall(la->la_cookie, rc);
        RETURN(rc);
}

/**
 * If rqset is NULL, do not wait for the response. Upcall and cookie may
 * also be NULL in that case.
 */
int osc_ladvise_base(struct obd_export *exp, struct obdo *oa,
                     struct ladvise_hdr *ladvise_hdr,
                     obd_enqueue_update_f upcall, void *cookie,
                     struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        struct osc_ladvise_args *la;
        int                      rc;
        struct lu_ladvise       *req_ladvise;
        struct lu_ladvise       *ladvise = ladvise_hdr->lah_advise;
        int                      num_advise = ladvise_hdr->lah_count;
        struct ladvise_hdr      *req_ladvise_hdr;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_LADVISE);
        if (req == NULL)
                RETURN(-ENOMEM);

        req_capsule_set_size(&req->rq_pill, &RMF_OST_LADVISE, RCL_CLIENT,
                             num_advise * sizeof(*ladvise));
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_LADVISE);
        if (rc != 0) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL;
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
                             oa);

        req_ladvise_hdr = req_capsule_client_get(&req->rq_pill,
                                                 &RMF_OST_LADVISE_HDR);
        memcpy(req_ladvise_hdr, ladvise_hdr, sizeof(*ladvise_hdr));

        req_ladvise = req_capsule_client_get(&req->rq_pill, &RMF_OST_LADVISE);
        memcpy(req_ladvise, ladvise, sizeof(*ladvise) * num_advise);
        ptlrpc_request_set_replen(req);

        if (rqset == NULL) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req);
                RETURN(0);
        }

        req->rq_interpret_reply = osc_ladvise_interpret;
        CLASSERT(sizeof(*la) <= sizeof(req->rq_async_args));
        la = ptlrpc_req_async_args(req);
        la->la_oa = oa;
        la->la_upcall = upcall;
        la->la_cookie = cookie;

        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}

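/* Create an OST object via a synchronous OST_CREATE RPC and copy the
 * returned attributes back into @oa.  Only used for echo sequences here,
 * as the LASSERT below enforces. */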
static int osc_create(const struct lu_env *env, struct obd_export *exp,
                      struct obdo *oa)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        LASSERT(oa != NULL);
        LASSERT(oa->o_valid & OBD_MD_FLGROUP);
        LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
        if (req == NULL)
                GOTO(out, rc = -ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
        if (rc) {
                ptlrpc_request_free(req);
                GOTO(out, rc);
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out_req, rc = -EPROTO);

        CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;

        CDEBUG(D_HA, "transno: %lld\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        RETURN(rc);
}

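/* Punch (truncate) an OST object asynchronously; completion is reported
 * through @upcall via osc_setattr_interpret(). */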
int osc_punch_base(struct obd_export *exp, struct obdo *oa,
                   obd_enqueue_update_f upcall, void *cookie,
                   struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct osc_setattr_args *sa;
        struct ost_body         *body;
        int                      rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
        CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
        sa = ptlrpc_req_async_args(req);
        sa->sa_oa = oa;
        sa->sa_upcall = upcall;
        sa->sa_cookie = cookie;
        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}

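/* Completion callback for OST_SYNC: copy the reply obdo back to the
 * caller and refresh the osc object's cached blocks attribute before
 * invoking the upcall. */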
static int osc_sync_interpret(const struct lu_env *env,
                              struct ptlrpc_request *req,
                              void *arg, int rc)
{
        struct osc_fsync_args   *fa = arg;
        struct ost_body         *body;
        struct cl_attr          *attr = &osc_env_info(env)->oti_attr;
        unsigned long           valid = 0;
        struct cl_object        *obj;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL) {
                CERROR("can't unpack ost_body\n");
                GOTO(out, rc = -EPROTO);
        }

        *fa->fa_oa = body->oa;
        obj = osc2cl(fa->fa_obj);

        /* Update osc object's blocks attribute */
        cl_object_attr_lock(obj);
        if (body->oa.o_valid & OBD_MD_FLBLOCKS) {
                attr->cat_blocks = body->oa.o_blocks;
                valid |= CAT_BLOCKS;
        }

        if (valid != 0)
                cl_object_attr_update(env, obj, attr, valid);
        cl_object_attr_unlock(obj);

out:
        rc = fa->fa_upcall(fa->fa_cookie, rc);
        RETURN(rc);
}

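/* Flush @obj to stable storage with an OST_SYNC RPC; the range to sync
 * travels in @oa (see the packing comment below). */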
int osc_sync_base(struct osc_object *obj, struct obdo *oa,
                  obd_enqueue_update_f upcall, void *cookie,
                  struct ptlrpc_request_set *rqset)
{
        struct obd_export     *exp = osc_export(obj);
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct osc_fsync_args *fa;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        /* overload the size and blocks fields in the oa with start/end */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = osc_sync_interpret;

        CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
        fa = ptlrpc_req_async_args(req);
        fa->fa_obj = obj;
        fa->fa_oa = oa;
        fa->fa_upcall = upcall;
        fa->fa_cookie = cookie;

        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}

/* Find and cancel locally all locks matched by @mode in the resource
 * determined from @oa. Found locks are added to the @cancels list.
 * Returns the number of locks added to @cancels. */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
                                   struct list_head *cancels,
                                   enum ldlm_mode mode, __u64 lock_flags)
{
        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        struct ldlm_res_id res_id;
        struct ldlm_resource *res;
        int count;
        ENTRY;

        /* Return (i.e. cancel nothing) only if ELC is supported (flag in
         * export) but disabled through procfs (flag in NS).
         *
         * This is distinct from the case when ELC is not supported at all,
         * where we still want to cancel locks in advance; we just cancel
         * them locally, without sending any RPC. */
        if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
                RETURN(0);

        ostid_build_res_name(&oa->o_oi, &res_id);
        res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
        if (IS_ERR(res))
                RETURN(0);

        LDLM_RESOURCE_ADDREF(res);
        count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
                                           lock_flags, 0, NULL);
        LDLM_RESOURCE_DELREF(res);
        ldlm_resource_putref(res);
        RETURN(count);
}

static int osc_destroy_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req, void *data,
                                 int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;

        atomic_dec(&cli->cl_destroy_in_flight);
        wake_up(&cli->cl_destroy_waitq);
        return 0;
}

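/* Atomically reserve a slot for one more destroy RPC.  Returns 1 if the
 * in-flight counter was successfully bumped and the caller may send;
 * returns 0 if cl_max_rpcs_in_flight has been reached, in which case the
 * caller must wait on cl_destroy_waitq. */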
static int osc_can_send_destroy(struct client_obd *cli)
{
        if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
            cli->cl_max_rpcs_in_flight) {
                /* The destroy request can be sent */
                return 1;
        }
        if (atomic_dec_return(&cli->cl_destroy_in_flight) <
            cli->cl_max_rpcs_in_flight) {
                /*
                 * The counter has been modified between the two atomic
                 * operations.
                 */
                wake_up(&cli->cl_destroy_waitq);
        }
        return 0;
}

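/* Destroy an OST object: cancel any local PW locks on it (discarding
 * dirty data), then queue an OST_DESTROY through ptlrpcd, throttled so
 * that no more than cl_max_rpcs_in_flight destroys are outstanding. */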
static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct client_obd     *cli = &exp->exp_obd->u.cli;
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct list_head       cancels = LIST_HEAD_INIT(cancels);
        int rc, count;
        ENTRY;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
        if (req == NULL) {
                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                RETURN(-ENOMEM);
        }

        rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
                               0, &cancels, count);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = osc_destroy_interpret;
        if (!osc_can_send_destroy(cli)) {
                struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);

                /*
                 * Wait until the number of on-going destroy RPCs drops
                 * below cl_max_rpcs_in_flight.
                 */
                l_wait_event_exclusive(cli->cl_destroy_waitq,
                                       osc_can_send_destroy(cli), &lwi);
        }

        /* Do not wait for response */
        ptlrpcd_add_req(req);
        RETURN(0);
}

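/* Fill in the dirty/grant accounting fields of @oa (o_dirty, o_undirty,
 * o_grant, o_dropped) so that each RPC tells the OST how much cache the
 * client holds and how much additional grant it would like. */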
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;

        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        spin_lock(&cli->cl_loi_list_lock);
        if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data, GRANT_PARAM))
                oa->o_dirty = cli->cl_dirty_grant;
        else
                oa->o_dirty = cli->cl_dirty_pages << PAGE_SHIFT;
        if (unlikely(cli->cl_dirty_pages - cli->cl_dirty_transit >
                     cli->cl_dirty_max_pages)) {
                CERROR("dirty %lu - %lu > dirty_max %lu\n",
                       cli->cl_dirty_pages, cli->cl_dirty_transit,
                       cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else if (unlikely(atomic_long_read(&obd_dirty_pages) -
                            atomic_long_read(&obd_dirty_transit_pages) >
                            (long)(obd_max_dirty_pages + 1))) {
                /* The atomic_read() and atomic_inc() are not covered by a
                 * lock, so they may safely race and trip this CERROR()
                 * unless we add in a small fudge factor (+1). */
                CERROR("%s: dirty %ld - %ld > system dirty_max %ld\n",
                       cli_name(cli), atomic_long_read(&obd_dirty_pages),
                       atomic_long_read(&obd_dirty_transit_pages),
                       obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
                            0x7fffffff)) {
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty_pages, cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else {
                unsigned long nrpages;

                nrpages = cli->cl_max_pages_per_rpc;
                nrpages *= cli->cl_max_rpcs_in_flight + 1;
                nrpages = max(nrpages, cli->cl_dirty_max_pages);
                oa->o_undirty = nrpages << PAGE_SHIFT;
                if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data,
                                 GRANT_PARAM)) {
                        int nrextents;

                        /* take extent tax into account when asking for more
                         * grant space */
                        nrextents = (nrpages + cli->cl_max_extent_pages - 1) /
                                     cli->cl_max_extent_pages;
                        oa->o_undirty += nrextents * cli->cl_grant_extent_tax;
                }
        }
        oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
        oa->o_dropped = cli->cl_lost_grant;
        cli->cl_lost_grant = 0;
        spin_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "dirty: %llu undirty: %u dropped %u grant: %llu\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
}


void osc_update_next_shrink(struct client_obd *cli)
{
        cli->cl_next_shrink_grant =
                cfs_time_shift(cli->cl_grant_shrink_interval);
        CDEBUG(D_CACHE, "next time %ld to shrink grant\n",
               cli->cl_next_shrink_grant);
}

static void __osc_update_grant(struct client_obd *cli, u64 grant)
{
        spin_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant += grant;
        spin_unlock(&cli->cl_loi_list_lock);
}

static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
{
        if (body->oa.o_valid & OBD_MD_FLGRANT) {
                CDEBUG(D_CACHE, "got %llu extra grant\n", body->oa.o_grant);
                __osc_update_grant(cli, body->oa.o_grant);
        }
}

static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
                              u32 keylen, void *key,
                              u32 vallen, void *val,
                              struct ptlrpc_request_set *set);

static int osc_shrink_grant_interpret(const struct lu_env *env,
                                      struct ptlrpc_request *req,
                                      void *aa, int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
        struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
        struct ost_body *body;

        if (rc != 0) {
                __osc_update_grant(cli, oa->o_grant);
                GOTO(out, rc);
        }

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        osc_update_grant(cli, body);
out:
        OBDO_FREE(oa);
        return rc;
}

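/* Carve a quarter of the currently available grant out of @cli and pack
 * it into @oa, flagged with OBD_FL_SHRINK_GRANT so the server reclaims
 * it. */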
static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
{
        spin_lock(&cli->cl_loi_list_lock);
        oa->o_grant = cli->cl_avail_grant / 4;
        cli->cl_avail_grant -= oa->o_grant;
        spin_unlock(&cli->cl_loi_list_lock);
        if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
                oa->o_valid |= OBD_MD_FLFLAGS;
                oa->o_flags = 0;
        }
        oa->o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);
}

/* Shrink the current grant, either from some large amount to enough for a
 * full set of in-flight RPCs, or if we have already shrunk to that limit
 * then to enough for a single RPC.  This avoids keeping more grant than
 * needed, and avoids shrinking the grant piecemeal. */
static int osc_shrink_grant(struct client_obd *cli)
{
        __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
                             (cli->cl_max_pages_per_rpc << PAGE_SHIFT);

        spin_lock(&cli->cl_loi_list_lock);
        if (cli->cl_avail_grant <= target_bytes)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
        spin_unlock(&cli->cl_loi_list_lock);

        return osc_shrink_grant_to_target(cli, target_bytes);
}

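/* Give grant back to the server until only @target_bytes remain locally,
 * sending the surplus via a KEY_GRANT_SHRINK set_info RPC.  The target is
 * clamped so we never drop below what a single RPC needs. */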
int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
{
        int                     rc = 0;
        struct ost_body        *body;
        ENTRY;

        spin_lock(&cli->cl_loi_list_lock);
        /* Don't shrink if we are already above or below the desired limit.
         * We don't want to shrink below a single RPC, as that will negatively
         * impact block allocation and long-term performance. */
        if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_SHIFT)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;

        if (target_bytes >= cli->cl_avail_grant) {
                spin_unlock(&cli->cl_loi_list_lock);
                RETURN(0);
        }
        spin_unlock(&cli->cl_loi_list_lock);

        OBD_ALLOC_PTR(body);
        if (!body)
                RETURN(-ENOMEM);

        osc_announce_cached(cli, &body->oa, 0);

        spin_lock(&cli->cl_loi_list_lock);
        body->oa.o_grant = cli->cl_avail_grant - target_bytes;
        cli->cl_avail_grant = target_bytes;
        spin_unlock(&cli->cl_loi_list_lock);
        if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
                body->oa.o_valid |= OBD_MD_FLFLAGS;
                body->oa.o_flags = 0;
        }
        body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);

        rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
                                sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
                                sizeof(*body), body, NULL);
        if (rc != 0)
                __osc_update_grant(cli, body->oa.o_grant);
        OBD_FREE_PTR(body);
        RETURN(rc);
}

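/* Decide whether it is time to return surplus grant: the server must
 * support grant shrinking, the shrink interval must have elapsed, the
 * import must be FULL, and more grant must be available than a single
 * full-sized RPC uses. */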
static int osc_should_shrink_grant(struct client_obd *client)
{
        cfs_time_t time = cfs_time_current();
        cfs_time_t next_shrink = client->cl_next_shrink_grant;

        if ((client->cl_import->imp_connect_data.ocd_connect_flags &
             OBD_CONNECT_GRANT_SHRINK) == 0)
                return 0;

        if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
                /* Get the current RPC size directly, instead of going via:
                 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
                 * Keep comment here so that it can be found by searching. */
                int brw_size = client->cl_max_pages_per_rpc << PAGE_SHIFT;

                if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
                    client->cl_avail_grant > brw_size)
                        return 1;
                else
                        osc_update_next_shrink(client);
        }
        return 0;
}

static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
{
        struct client_obd *client;

        list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
                if (osc_should_shrink_grant(client))
                        osc_shrink_grant(client);
        }
        return 0;
}

static int osc_add_shrink_grant(struct client_obd *client)
{
        int rc;

        rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
                                       TIMEOUT_GRANT,
                                       osc_grant_shrink_grant_cb, NULL,
                                       &client->cl_grant_shrink_list);
        if (rc) {
                CERROR("add grant client %s error %d\n", cli_name(client), rc);
                return rc;
        }
        CDEBUG(D_CACHE, "add grant client %s\n", cli_name(client));
        osc_update_next_shrink(client);
        return 0;
}

static int osc_del_shrink_grant(struct client_obd *client)
{
        return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
                                         TIMEOUT_GRANT);
}

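/* Initialize the client's grant state from the connect data returned by
 * the server, including chunk size, extent tax and maximum extent size
 * when the GRANT_PARAM flag was negotiated. */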
static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        /*
         * ocd_grant is the total grant amount we expect to hold: if we have
         * been evicted, it's the new avail_grant amount, and cl_dirty_pages
         * will drop to 0 as in-flight RPCs fail out; otherwise, it's
         * avail_grant + dirty.
         *
         * A race is tolerable here: if we're evicted, but imp_state already
         * left EVICTED state, then cl_dirty_pages must be 0 already.
         */
        spin_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant = ocd->ocd_grant;
        if (cli->cl_import->imp_state != LUSTRE_IMP_EVICTED) {
                cli->cl_avail_grant -= cli->cl_reserved_grant;
                if (OCD_HAS_FLAG(ocd, GRANT_PARAM))
                        cli->cl_avail_grant -= cli->cl_dirty_grant;
                else
                        cli->cl_avail_grant -=
                                        cli->cl_dirty_pages << PAGE_SHIFT;
        }

        if (OCD_HAS_FLAG(ocd, GRANT_PARAM)) {
                u64 size;
                int chunk_mask;

                /* overhead for each extent insertion */
                cli->cl_grant_extent_tax = ocd->ocd_grant_tax_kb << 10;
                /* determine the appropriate chunk size used by osc_extent. */
                cli->cl_chunkbits = max_t(int, PAGE_SHIFT,
                                          ocd->ocd_grant_blkbits);
                /* max_pages_per_rpc must be chunk aligned */
                chunk_mask = ~((1 << (cli->cl_chunkbits - PAGE_SHIFT)) - 1);
                cli->cl_max_pages_per_rpc = (cli->cl_max_pages_per_rpc +
                                             ~chunk_mask) & chunk_mask;
                /* determine maximum extent size, in #pages */
                size = (u64)ocd->ocd_grant_max_blks << ocd->ocd_grant_blkbits;
                cli->cl_max_extent_pages = size >> PAGE_SHIFT;
                if (cli->cl_max_extent_pages == 0)
                        cli->cl_max_extent_pages = 1;
        } else {
                cli->cl_grant_extent_tax = 0;
                cli->cl_chunkbits = PAGE_SHIFT;
                cli->cl_max_extent_pages = DT_MAX_BRW_PAGES;
        }
        spin_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld, "
                "chunk bits: %d cl_max_extent_pages: %d\n",
                cli_name(cli),
                cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits,
                cli->cl_max_extent_pages);

        if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
            list_empty(&cli->cl_grant_shrink_list))
                osc_add_shrink_grant(cli);
}


/* We assume that the reason this OSC got a short read is because it read
 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
 * via the LOV, and it _knows_ it's reading inside the file, it's just that
 * this stripe never got written at or beyond this stripe offset yet. */
static void handle_short_read(int nob_read, size_t page_count,
                              struct brw_page **pga)
{
        char *ptr;
        int i = 0;

        /* skip bytes read OK */
        while (nob_read > 0) {
                LASSERT(page_count > 0);

                if (pga[i]->count > nob_read) {
                        /* EOF inside this page */
                        ptr = kmap(pga[i]->pg) +
                                (pga[i]->off & ~PAGE_MASK);
                        memset(ptr + nob_read, 0, pga[i]->count - nob_read);
                        kunmap(pga[i]->pg);
                        page_count--;
                        i++;
                        break;
                }

                nob_read -= pga[i]->count;
                page_count--;
                i++;
        }

        /* zero remaining pages */
        while (page_count-- > 0) {
                ptr = kmap(pga[i]->pg) + (pga[i]->off & ~PAGE_MASK);
                memset(ptr, 0, pga[i]->count);
                kunmap(pga[i]->pg);
                i++;
        }
}

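/* Validate the per-niobuf return codes in a BRW_WRITE reply and confirm
 * that the bulk transferred exactly the number of bytes requested. */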
static int check_write_rcs(struct ptlrpc_request *req,
                           int requested_nob, int niocount,
                           size_t page_count, struct brw_page **pga)
{
        int     i;
        __u32   *remote_rcs;

        remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
                                                  sizeof(*remote_rcs) *
                                                  niocount);
        if (remote_rcs == NULL) {
                CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
                return -EPROTO;
        }

        /* return error if any niobuf was in error */
        for (i = 0; i < niocount; i++) {
                if ((int)remote_rcs[i] < 0)
                        return remote_rcs[i];

                if (remote_rcs[i] != 0) {
                        CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
                                i, remote_rcs[i], req);
                        return -EPROTO;
                }
        }

        if (req->rq_bulk->bd_nob_transferred != requested_nob) {
                CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
                       req->rq_bulk->bd_nob_transferred, requested_nob);
                return -EPROTO;
        }

        return 0;
}

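/* Two brw_pages may share one niobuf only when they are adjacent (p1 ends
 * where p2 begins) and carry identical flags; flags differing outside the
 * known-safe set additionally trigger a warning. */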
static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
{
        if (p1->flag != p2->flag) {
                unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
                                  OBD_BRW_SYNC       | OBD_BRW_ASYNC   |
                                  OBD_BRW_NOQUOTA    | OBD_BRW_SOFT_SYNC);

                /* warn if we try to combine flags that we don't know to be
                 * safe to combine */
                if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
                        CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
                              "report this at https://jira.hpdd.intel.com/\n",
                              p1->flag, p2->flag);
                }
                return 0;
        }

        return (p1->off + p1->count == p2->off);
}

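/* Compute the bulk checksum over the first @nob bytes of @pga using the
 * hash algorithm selected by @cksum_type.  Fault-injection hooks below
 * deliberately corrupt the data (reads) or the checksum (writes) for
 * testing. */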
static u32 osc_checksum_bulk(int nob, size_t pg_count,
                             struct brw_page **pga, int opc,
                             cksum_type_t cksum_type)
{
        u32                             cksum;
        int                             i = 0;
        struct cfs_crypto_hash_desc     *hdesc;
        unsigned int                    bufsize;
        unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);

        LASSERT(pg_count > 0);

        hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
        if (IS_ERR(hdesc)) {
                CERROR("Unable to initialize checksum hash %s\n",
                       cfs_crypto_hash_name(cfs_alg));
                return PTR_ERR(hdesc);
        }

        while (nob > 0 && pg_count > 0) {
                unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 && opc == OST_READ &&
                    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
                        unsigned char *ptr = kmap(pga[i]->pg);
                        int off = pga[i]->off & ~PAGE_MASK;

                        memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
                        kunmap(pga[i]->pg);
                }
                cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
                                            pga[i]->off & ~PAGE_MASK,
                                            count);
                LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
                               (int)(pga[i]->off & ~PAGE_MASK));

                nob -= pga[i]->count;
                pg_count--;
                i++;
        }

        bufsize = sizeof(cksum);
        cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);

        /* For sending we only compute the wrong checksum instead
         * of corrupting the data so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                cksum++;

        return cksum;
}

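/* Build a BRW (bulk read/write) request for @page_count pages described
 * by @pga: merge contiguous pages into niobufs, attach the bulk
 * descriptor, announce cached/dirty state, and checksum the payload when
 * enabled. */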
static int
osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
                     u32 page_count, struct brw_page **pga,
                     struct ptlrpc_request **reqp, int resend)
{
        struct ptlrpc_request   *req;
        struct ptlrpc_bulk_desc *desc;
        struct ost_body         *body;
        struct obd_ioobj        *ioobj;
        struct niobuf_remote    *niobuf;
        int niocount, i, requested_nob, opc, rc;
        struct osc_brw_async_args *aa;
        struct req_capsule      *pill;
        struct brw_page *pg_prev;

        ENTRY;
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
                RETURN(-ENOMEM); /* Recoverable */
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
                RETURN(-EINVAL); /* Fatal */

        if ((cmd & OBD_BRW_WRITE) != 0) {
                opc = OST_WRITE;
                req = ptlrpc_request_alloc_pool(cli->cl_import,
                                                osc_rq_pool,
                                                &RQF_OST_BRW_WRITE);
        } else {
                opc = OST_READ;
                req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
        }
        if (req == NULL)
                RETURN(-ENOMEM);

        for (niocount = i = 1; i < page_count; i++) {
                if (!can_merge_pages(pga[i - 1], pga[i]))
                        niocount++;
        }

        pill = &req->rq_pill;
        req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
                             sizeof(*ioobj));
        req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
                             niocount * sizeof(*niobuf));

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);
        /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
         * retry logic */
        req->rq_no_retry_einprogress = 1;

        desc = ptlrpc_prep_bulk_imp(req, page_count,
                cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
                (opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE :
                        PTLRPC_BULK_PUT_SINK) |
                        PTLRPC_BULK_BUF_KIOV,
                OST_BULK_PORTAL,
                &ptlrpc_bulk_kiov_pin_ops);

        if (desc == NULL)
                GOTO(out, rc = -ENOMEM);
        /* NB request now owns desc and will free it when it gets freed */

        body = req_capsule_client_get(pill, &RMF_OST_BODY);
        ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;
        /* The high bits of ioo_max_brw tell the server the _maximum_ number
         * of bulks that might be sent for this request.  The actual number
         * is decided when the RPC is finally sent in ptlrpc_register_bulk().
         * It sends "max - 1" for compatibility with old clients that send
         * "0", and also so that the actual maximum is a power-of-two number,
         * not one less. LU-1431 */
        ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
        LASSERT(page_count > 0);
        pg_prev = pga[0];
        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                struct brw_page *pg = pga[i];
                int poff = pg->off & ~PAGE_MASK;

                LASSERT(pg->count > 0);
                /* make sure there is no gap in the middle of page array */
                LASSERTF(page_count == 1 ||
                         (ergo(i == 0, poff + pg->count == PAGE_SIZE) &&
                          ergo(i > 0 && i < page_count - 1,
                               poff == 0 && pg->count == PAGE_SIZE)   &&
                          ergo(i == page_count - 1, poff == 0)),
                         "i: %d/%d pg: %p off: %llu, count: %u\n",
                         i, page_count, pg, pg->off, pg->count);
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u pg %p [pri %lu ind %lu] off %llu"
                         " prev_pg %p [pri %lu ind %lu] off %llu\n",
                         i, page_count,
                         pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
                         pg_prev->pg, page_private(pg_prev->pg),
                         pg_prev->pg->index, pg_prev->off);
                LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
                        (pg->flag & OBD_BRW_SRVLOCK));

                desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff, pg->count);
                requested_nob += pg->count;

                if (i > 0 && can_merge_pages(pg_prev, pg)) {
                        niobuf--;
                        niobuf->rnb_len += pg->count;
                } else {
                        niobuf->rnb_offset = pg->off;
                        niobuf->rnb_len    = pg->count;
                        niobuf->rnb_flags  = pg->flag;
                }
                pg_prev = pg;
        }

        LASSERTF((void *)(niobuf - niocount) ==
                req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
                "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
                &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));

        osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob : 0);
        if (resend) {
                if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                        body->oa.o_valid |= OBD_MD_FLFLAGS;
                        body->oa.o_flags = 0;
                }
                body->oa.o_flags |= OBD_FL_RECOV_RESEND;
        }

        if (osc_should_shrink_grant(cli))
                osc_shrink_grant_local(cli, &body->oa);

        /* size[REQ_REC_OFF] still sizeof (*body) */
        if (opc == OST_WRITE) {
                if (cli->cl_checksum &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        /* store cl_cksum_type in a local variable since
                         * it can be changed via lprocfs */
                        cksum_type_t cksum_type = cli->cl_cksum_type;

                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                                oa->o_flags &= OBD_FL_LOCAL_MASK;
                                body->oa.o_flags = 0;
                        }
                        body->oa.o_flags |= cksum_type_pack(cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        body->oa.o_cksum = osc_checksum_bulk(requested_nob,
                                                             page_count, pga,
                                                             OST_WRITE,
                                                             cksum_type);
                        CDEBUG(D_PAGE, "checksum at write origin: %x\n",
                               body->oa.o_cksum);
                        /* save this in 'oa', too, for later checking */
                        oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        oa->o_flags |= cksum_type_pack(cksum_type);
                } else {
                        /* clear out the checksum flag, in case this is a
                         * resend but cl_checksum is no longer set. b=11238 */
                        oa->o_valid &= ~OBD_MD_FLCKSUM;
                }
                oa->o_cksum = body->oa.o_cksum;
                /* 1 RC per niobuf */
                req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
                                     sizeof(__u32) * niocount);
        } else {
                if (cli->cl_checksum &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                body->oa.o_flags = 0;
                        body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                }
        }
        ptlrpc_request_set_replen(req);

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oa = oa;
        aa->aa_requested_nob = requested_nob;
        aa->aa_nio_count = niocount;
        aa->aa_page_count = page_count;
        aa->aa_resends = 0;
        aa->aa_ppga = pga;
        aa->aa_cli = cli;
        INIT_LIST_HEAD(&aa->aa_oaps);

        *reqp = req;
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
                req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
                niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
        RETURN(0);

out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}

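/* Compare client and server write checksums; on mismatch, recompute the
 * checksum locally to classify where the corruption happened and log a
 * detailed console error.  Returns 1 on mismatch, after which the caller
 * resends the write. */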
static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
                                __u32 client_cksum, __u32 server_cksum, int nob,
                                size_t page_count, struct brw_page **pga,
                                cksum_type_t client_cksum_type)
{
        __u32 new_cksum;
        char *msg;
        cksum_type_t cksum_type;

        if (server_cksum == client_cksum) {
                CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
                return 0;
        }

        cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
                                       oa->o_flags : 0);
        new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
                                      cksum_type);

        if (cksum_type != client_cksum_type)
                msg = "the server did not use the checksum type specified in "
                      "the original request - likely a protocol problem";
        else if (new_cksum == server_cksum)
                msg = "changed on the client after we checksummed it - "
                      "likely false positive due to mmap IO (bug 11742)";
        else if (new_cksum == client_cksum)
                msg = "changed in transit before arrival at OST";
        else
                msg = "changed in transit AND doesn't match the original - "
                      "likely false positive due to mmap IO (bug 11742)";

        LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
                           " object "DOSTID" extent [%llu-%llu]\n",
                           msg, libcfs_nid2str(peer->nid),
                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
                           POSTID(&oa->o_oi), pga[0]->off,
                           pga[page_count-1]->off + pga[page_count-1]->count - 1);
        CERROR("original client csum %x (type %x), server csum %x (type %x), "
               "client csum now %x\n", client_cksum, client_cksum_type,
               server_cksum, cksum_type, new_cksum);
        return 1;
}

1358
1359 /* Note rc enters this function as number of bytes transferred */
1360 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1361 {
1362         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1363         const lnet_process_id_t *peer =
1364                         &req->rq_import->imp_connection->c_peer;
1365         struct client_obd *cli = aa->aa_cli;
1366         struct ost_body *body;
1367         u32 client_cksum = 0;
1368         ENTRY;
1369
1370         if (rc < 0 && rc != -EDQUOT) {
1371                 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1372                 RETURN(rc);
1373         }
1374
1375         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1376         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1377         if (body == NULL) {
1378                 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1379                 RETURN(-EPROTO);
1380         }
1381
1382         /* set/clear over quota flag for a uid/gid */
1383         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1384             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1385                 unsigned int qid[LL_MAXQUOTAS] =
1386                                         {body->oa.o_uid, body->oa.o_gid};
1387
1388                 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid %#llx, flags %x\n",
1389                        body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1390                        body->oa.o_flags);
1391                 osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
1392         }
1393
1394         osc_update_grant(cli, body);
1395
1396         if (rc < 0)
1397                 RETURN(rc);
1398
1399         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1400                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1401
1402         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1403                 if (rc > 0) {
1404                         CERROR("Unexpected +ve rc %d\n", rc);
1405                         RETURN(-EPROTO);
1406                 }
1407                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1408
1409                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1410                         RETURN(-EAGAIN);
1411
1412                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1413                     check_write_checksum(&body->oa, peer, client_cksum,
1414                                          body->oa.o_cksum, aa->aa_requested_nob,
1415                                          aa->aa_page_count, aa->aa_ppga,
1416                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1417                         RETURN(-EAGAIN);
1418
1419                 rc = check_write_rcs(req, aa->aa_requested_nob, aa->aa_nio_count,
1420                                      aa->aa_page_count, aa->aa_ppga);
1421                 GOTO(out, rc);
1422         }
1423
1424         /* The rest of this function executes only for OST_READs */
1425
1426         /* if unwrap_bulk failed, return -EAGAIN to retry */
1427         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1428         if (rc < 0)
1429                 GOTO(out, rc = -EAGAIN);
1430
1431         if (rc > aa->aa_requested_nob) {
1432                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1433                        aa->aa_requested_nob);
1434                 RETURN(-EPROTO);
1435         }
1436
1437         if (rc != req->rq_bulk->bd_nob_transferred) {
1438                 CERROR("Unexpected rc %d (%d transferred)\n",
1439                        rc, req->rq_bulk->bd_nob_transferred);
1440                 RETURN(-EPROTO);
1441         }
1442
1443         if (rc < aa->aa_requested_nob)
1444                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1445
1446         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1447                 static int cksum_counter;
1448                 u32        server_cksum = body->oa.o_cksum;
1449                 char      *via = "";
1450                 char      *router = "";
1451                 cksum_type_t cksum_type;
1452
1453                 cksum_type = cksum_type_unpack(body->oa.o_valid & OBD_MD_FLFLAGS ?
1454                                                body->oa.o_flags : 0);
1455                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1456                                                  aa->aa_ppga, OST_READ,
1457                                                  cksum_type);
1458
1459                 if (peer->nid != req->rq_bulk->bd_sender) {
1460                         via = " via ";
1461                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1462                 }
1463
1464                 if (server_cksum != client_cksum) {
1465                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1466                                            "%s%s%s inode "DFID" object "DOSTID
1467                                            " extent [%llu-%llu]\n",
1468                                            req->rq_import->imp_obd->obd_name,
1469                                            libcfs_nid2str(peer->nid),
1470                                            via, router,
1471                                            body->oa.o_valid & OBD_MD_FLFID ?
1472                                                 body->oa.o_parent_seq : (__u64)0,
1473                                            body->oa.o_valid & OBD_MD_FLFID ?
1474                                                 body->oa.o_parent_oid : 0,
1475                                            body->oa.o_valid & OBD_MD_FLFID ?
1476                                                 body->oa.o_parent_ver : 0,
1477                                            POSTID(&body->oa.o_oi),
1478                                            aa->aa_ppga[0]->off,
1479                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1480                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1481                                                                         1);
1482                         CERROR("client %x, server %x, cksum_type %x\n",
1483                                client_cksum, server_cksum, cksum_type);
1484                         cksum_counter = 0;
1485                         aa->aa_oa->o_cksum = client_cksum;
1486                         rc = -EAGAIN;
1487                 } else {
1488                         cksum_counter++;
1489                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1490                         rc = 0;
1491                 }
1492         } else if (unlikely(client_cksum)) {
1493                 static int cksum_missed;
1494
1495                 cksum_missed++;
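                /* log only on power-of-two counts (1st, 2nd, 4th, ...)
                 * to limit console noise */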
1496                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1497                         CERROR("Checksum %u requested from %s but not sent\n",
1498                                cksum_missed, libcfs_nid2str(peer->nid));
1499         } else {
1500                 rc = 0;
1501         }
1502 out:
1503         if (rc >= 0)
1504                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1505                                      aa->aa_oa, &body->oa);
1506
1507         RETURN(rc);
1508 }
1509
1510 static int osc_brw_redo_request(struct ptlrpc_request *request,
1511                                 struct osc_brw_async_args *aa, int rc)
1512 {
1513         struct ptlrpc_request *new_req;
1514         struct osc_brw_async_args *new_aa;
1515         struct osc_async_page *oap;
1516         ENTRY;
1517
1518         DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1519                   "redo for recoverable error %d", rc);
1520
1521         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1522                                 OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
1523                                   aa->aa_cli, aa->aa_oa, aa->aa_page_count,
1524                                   aa->aa_ppga, &new_req, 1);
1525         if (rc)
1526                 RETURN(rc);
1527
1528         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1529                 if (oap->oap_request != NULL) {
1530                         LASSERTF(request == oap->oap_request,
1531                                  "request %p != oap_request %p\n",
1532                                  request, oap->oap_request);
1533                         if (oap->oap_interrupted) {
1534                                 ptlrpc_req_finished(new_req);
1535                                 RETURN(-EINTR);
1536                         }
1537                 }
1538         }
1539         /* The new request takes over pga and oaps from the old request.
1540          * Note that copying a list_head doesn't work; we need to move it... */
1541         aa->aa_resends++;
1542         new_req->rq_interpret_reply = request->rq_interpret_reply;
1543         new_req->rq_async_args = request->rq_async_args;
1544         new_req->rq_commit_cb = request->rq_commit_cb;
1545         /* cap the resend delay to the current request timeout; this is
1546          * similar to what ptlrpc does (see after_reply()) */
1547         if (aa->aa_resends > new_req->rq_timeout)
1548                 new_req->rq_sent = cfs_time_current_sec() + new_req->rq_timeout;
1549         else
1550                 new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1551         new_req->rq_generation_set = 1;
1552         new_req->rq_import_generation = request->rq_import_generation;
1553
1554         new_aa = ptlrpc_req_async_args(new_req);
1555
1556         INIT_LIST_HEAD(&new_aa->aa_oaps);
1557         list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1558         INIT_LIST_HEAD(&new_aa->aa_exts);
1559         list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1560         new_aa->aa_resends = aa->aa_resends;
1561
1562         list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1563                 if (oap->oap_request) {
1564                         ptlrpc_req_finished(oap->oap_request);
1565                         oap->oap_request = ptlrpc_request_addref(new_req);
1566                 }
1567         }
1568
1569         /* XXX: This code will run into problems if we ever support adding
1570          * a series of BRW RPCs into a self-defined ptlrpc_request_set and
1571          * waiting for all of them to finish. We should inherit the request
1572          * set from the old request. */
1573         ptlrpcd_add_req(new_req);
1574
1575         DEBUG_REQ(D_INFO, new_req, "new request");
1576         RETURN(0);
1577 }
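
/* The resend pacing above is a linear back-off capped at the request
 * timeout.  A minimal sketch of the equivalent computation, using the
 * kernel's min_t() (illustration only):
 */
#if 0
	int delay = min_t(int, aa->aa_resends, new_req->rq_timeout);

	new_req->rq_sent = cfs_time_current_sec() + delay;
#endif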
1578
1579 /*
1580  * Ugh, we want disk allocation on the target to happen in offset order.  We'll
1581  * follow Sedgewick's advice and stick to the dead simple shellsort -- it'll do
1582  * fine for our small page arrays and doesn't require allocation.  It's an
1583  * insertion sort that swaps elements that are strides apart, shrinking the
1584  * stride down until it's 1 and the array is sorted.
1585  */
1586 static void sort_brw_pages(struct brw_page **array, int num)
1587 {
1588         int stride, i, j;
1589         struct brw_page *tmp;
1590
1591         if (num == 1)
1592                 return;
1593         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1594                 ;
1595
1596         do {
1597                 stride /= 3;
1598                 for (i = stride ; i < num ; i++) {
1599                         tmp = array[i];
1600                         j = i;
1601                         while (j >= stride && array[j - stride]->off > tmp->off) {
1602                                 array[j] = array[j - stride];
1603                                 j -= stride;
1604                         }
1605                         array[j] = tmp;
1606                 }
1607         } while (stride > 1);
1608 }
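
/* For reference, the same Knuth (3x+1 stride) shellsort on a plain int
 * array; a standalone sketch for illustration, not used by this module:
 */
#if 0
static void shellsort_int(int *a, int n)
{
	int stride, i, j, tmp;

	if (n <= 1)
		return;
	/* grow the stride to the largest 3x+1 value below n */
	for (stride = 1; stride < n; stride = stride * 3 + 1)
		;
	do {
		stride /= 3;
		/* insertion sort of elements that are `stride` apart */
		for (i = stride; i < n; i++) {
			tmp = a[i];
			for (j = i; j >= stride && a[j - stride] > tmp;
			     j -= stride)
				a[j] = a[j - stride];
			a[j] = tmp;
		}
	} while (stride > 1);
}
#endif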
1609
1610 static void osc_release_ppga(struct brw_page **ppga, size_t count)
1611 {
1612         LASSERT(ppga != NULL);
1613         OBD_FREE(ppga, sizeof(*ppga) * count);
1614 }
1615
1616 static int brw_interpret(const struct lu_env *env,
1617                          struct ptlrpc_request *req, void *data, int rc)
1618 {
1619         struct osc_brw_async_args *aa = data;
1620         struct osc_extent *ext;
1621         struct osc_extent *tmp;
1622         struct client_obd *cli = aa->aa_cli;
1623         ENTRY;
1624
1625         rc = osc_brw_fini_request(req, rc);
1626         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1627         /* When the server returns -EINPROGRESS, the client should always
1628          * retry, regardless of how many times the bulk was already resent. */
1629         if (osc_recoverable_error(rc)) {
1630                 if (req->rq_import_generation !=
1631                     req->rq_import->imp_generation) {
1632                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
1633                                ""DOSTID", rc = %d.\n",
1634                                req->rq_import->imp_obd->obd_name,
1635                                POSTID(&aa->aa_oa->o_oi), rc);
1636                 } else if (rc == -EINPROGRESS ||
1637                     client_should_resend(aa->aa_resends, aa->aa_cli)) {
1638                         rc = osc_brw_redo_request(req, aa, rc);
1639                 } else {
1640                         CERROR("%s: too many resend retries for object: "
1641                                "%llu:%llu, rc = %d.\n",
1642                                req->rq_import->imp_obd->obd_name,
1643                                POSTID(&aa->aa_oa->o_oi), rc);
1644                 }
1645
1646                 if (rc == 0)
1647                         RETURN(0);
1648                 else if (rc == -EAGAIN || rc == -EINPROGRESS)
1649                         rc = -EIO;
1650         }
1651
1652         if (rc == 0) {
1653                 struct obdo *oa = aa->aa_oa;
1654                 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
1655                 unsigned long valid = 0;
1656                 struct cl_object *obj;
1657                 struct osc_async_page *last;
1658
1659                 last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
1660                 obj = osc2cl(last->oap_obj);
1661
1662                 cl_object_attr_lock(obj);
1663                 if (oa->o_valid & OBD_MD_FLBLOCKS) {
1664                         attr->cat_blocks = oa->o_blocks;
1665                         valid |= CAT_BLOCKS;
1666                 }
1667                 if (oa->o_valid & OBD_MD_FLMTIME) {
1668                         attr->cat_mtime = oa->o_mtime;
1669                         valid |= CAT_MTIME;
1670                 }
1671                 if (oa->o_valid & OBD_MD_FLATIME) {
1672                         attr->cat_atime = oa->o_atime;
1673                         valid |= CAT_ATIME;
1674                 }
1675                 if (oa->o_valid & OBD_MD_FLCTIME) {
1676                         attr->cat_ctime = oa->o_ctime;
1677                         valid |= CAT_CTIME;
1678                 }
1679
1680                 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1681                         struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
1682                         loff_t last_off = last->oap_count + last->oap_obj_off +
1683                                 last->oap_page_off;
1684
1685                         /* Change the file size if this is an out-of-quota
1686                          * or direct IO write and it extends the file size */
1687                         if (loi->loi_lvb.lvb_size < last_off) {
1688                                 attr->cat_size = last_off;
1689                                 valid |= CAT_SIZE;
1690                         }
1691                         /* Extend KMS if it's not a lockless write */
1692                         if (loi->loi_kms < last_off &&
1693                             oap2osc_page(last)->ops_srvlock == 0) {
1694                                 attr->cat_kms = last_off;
1695                                 valid |= CAT_KMS;
1696                         }
1697                 }
1698
1699                 if (valid != 0)
1700                         cl_object_attr_update(env, obj, attr, valid);
1701                 cl_object_attr_unlock(obj);
1702         }
1703         OBDO_FREE(aa->aa_oa);
1704
1705         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
1706                 osc_inc_unstable_pages(req);
1707
1708         list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
1709                 list_del_init(&ext->oe_link);
1710                 osc_extent_finish(env, ext, 1, rc);
1711         }
1712         LASSERT(list_empty(&aa->aa_exts));
1713         LASSERT(list_empty(&aa->aa_oaps));
1714
1715         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1716         ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
1717
1718         spin_lock(&cli->cl_loi_list_lock);
1719         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1720          * is called so we know whether to go to sync BRWs or wait for more
1721          * RPCs to complete */
1722         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
1723                 cli->cl_w_in_flight--;
1724         else
1725                 cli->cl_r_in_flight--;
1726         osc_wake_cache_waiters(cli);
1727         spin_unlock(&cli->cl_loi_list_lock);
1728
1729         osc_io_unplug(env, cli, NULL);
1730         RETURN(rc);
1731 }
1732
1733 static void brw_commit(struct ptlrpc_request *req)
1734 {
1735         /* If osc_inc_unstable_pages (via osc_extent_finish) races with
1736          * this function, called via rq_commit_cb, we need to ensure that
1737          * osc_dec_unstable_pages is still called. Otherwise unstable
1738          * pages may be leaked. */
1739         spin_lock(&req->rq_lock);
1740         if (likely(req->rq_unstable)) {
1741                 req->rq_unstable = 0;
1742                 spin_unlock(&req->rq_lock);
1743
1744                 osc_dec_unstable_pages(req);
1745         } else {
1746                 req->rq_committed = 1;
1747                 spin_unlock(&req->rq_lock);
1748         }
1749 }
1750
1751 /**
1752  * Build an RPC from the list of extents @ext_list. The caller must ensure
1753  * that the total number of pages in this list does not exceed the maximum
1754  * pages per RPC. Extents in the list must be in OES_RPC state.
1755  */
1756 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
1757                   struct list_head *ext_list, int cmd)
1758 {
1759         struct ptlrpc_request           *req = NULL;
1760         struct osc_extent               *ext;
1761         struct brw_page                 **pga = NULL;
1762         struct osc_brw_async_args       *aa = NULL;
1763         struct obdo                     *oa = NULL;
1764         struct osc_async_page           *oap;
1765         struct osc_object               *obj = NULL;
1766         struct cl_req_attr              *crattr = NULL;
1767         loff_t                          starting_offset = OBD_OBJECT_EOF;
1768         loff_t                          ending_offset = 0;
1769         int                             mpflag = 0;
1770         int                             mem_tight = 0;
1771         int                             page_count = 0;
1772         bool                            soft_sync = false;
1773         bool                            interrupted = false;
1774         int                             i;
1775         int                             grant = 0;
1776         int                             rc;
1777         struct list_head                rpc_list = LIST_HEAD_INIT(rpc_list);
1778         struct ost_body                 *body;
1779         ENTRY;
1780         LASSERT(!list_empty(ext_list));
1781
1782         /* add pages into rpc_list to build BRW rpc */
1783         list_for_each_entry(ext, ext_list, oe_link) {
1784                 LASSERT(ext->oe_state == OES_RPC);
1785                 mem_tight |= ext->oe_memalloc;
1786                 grant += ext->oe_grants;
1787                 page_count += ext->oe_nr_pages;
1788                 if (obj == NULL)
1789                         obj = ext->oe_obj;
1790         }
1791
1792         soft_sync = osc_over_unstable_soft_limit(cli);
1793         if (mem_tight)
1794                 mpflag = cfs_memory_pressure_get_and_set();
1795
1796         OBD_ALLOC(pga, sizeof(*pga) * page_count);
1797         if (pga == NULL)
1798                 GOTO(out, rc = -ENOMEM);
1799
1800         OBDO_ALLOC(oa);
1801         if (oa == NULL)
1802                 GOTO(out, rc = -ENOMEM);
1803
1804         i = 0;
1805         list_for_each_entry(ext, ext_list, oe_link) {
1806                 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
1807                         if (mem_tight)
1808                                 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
1809                         if (soft_sync)
1810                                 oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
1811                         pga[i] = &oap->oap_brw_page;
1812                         pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
1813                         i++;
1814
1815                         list_add_tail(&oap->oap_rpc_item, &rpc_list);
1816                         if (starting_offset == OBD_OBJECT_EOF ||
1817                             starting_offset > oap->oap_obj_off)
1818                                 starting_offset = oap->oap_obj_off;
1819                         else
1820                                 LASSERT(oap->oap_page_off == 0);
1821                         if (ending_offset < oap->oap_obj_off + oap->oap_count)
1822                                 ending_offset = oap->oap_obj_off +
1823                                                 oap->oap_count;
1824                         else
1825                                 LASSERT(oap->oap_page_off + oap->oap_count ==
1826                                         PAGE_SIZE);
1827                         if (oap->oap_interrupted)
1828                                 interrupted = true;
1829                 }
1830         }
1831
1832         /* first page in the list */
1833         oap = list_entry(rpc_list.next, typeof(*oap), oap_rpc_item);
1834
1835         crattr = &osc_env_info(env)->oti_req_attr;
1836         memset(crattr, 0, sizeof(*crattr));
1837         crattr->cra_type = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
1838         crattr->cra_flags = ~0ULL;
1839         crattr->cra_page = oap2cl_page(oap);
1840         crattr->cra_oa = oa;
1841         cl_req_attr_set(env, osc2cl(obj), crattr);
1842
1843         if (cmd == OBD_BRW_WRITE)
1844                 oa->o_grant_used = grant;
1845
1846         sort_brw_pages(pga, page_count);
1847         rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req, 0);
1848         if (rc != 0) {
1849                 CERROR("prep_req failed: %d\n", rc);
1850                 GOTO(out, rc);
1851         }
1852
1853         req->rq_commit_cb = brw_commit;
1854         req->rq_interpret_reply = brw_interpret;
1855         req->rq_memalloc = mem_tight != 0;
1856         oap->oap_request = ptlrpc_request_addref(req);
1857         if (interrupted && !req->rq_intr)
1858                 ptlrpc_mark_interrupted(req);
1859
1860         /* Need to update the timestamps after the request is built in case
1861          * we race with setattr (locally or in queue at the OST).  If the OST
1862          * gets a later setattr before an earlier BRW (as determined by the
1863          * request xid), the OST will not use the BRW timestamps.  Sadly, there
1864          * is no obvious way to do this in a single call.  bug 10150 */
1865         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
1866         crattr->cra_oa = &body->oa;
1867         crattr->cra_flags = OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME;
1868         cl_req_attr_set(env, osc2cl(obj), crattr);
1869         lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
1870
1871         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1872         aa = ptlrpc_req_async_args(req);
1873         INIT_LIST_HEAD(&aa->aa_oaps);
1874         list_splice_init(&rpc_list, &aa->aa_oaps);
1875         INIT_LIST_HEAD(&aa->aa_exts);
1876         list_splice_init(ext_list, &aa->aa_exts);
1877
1878         spin_lock(&cli->cl_loi_list_lock);
1879         starting_offset >>= PAGE_SHIFT;
1880         if (cmd == OBD_BRW_READ) {
1881                 cli->cl_r_in_flight++;
1882                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
1883                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
1884                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
1885                                       starting_offset + 1);
1886         } else {
1887                 cli->cl_w_in_flight++;
1888                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
1889                 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
1890                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
1891                                       starting_offset + 1);
1892         }
1893         spin_unlock(&cli->cl_loi_list_lock);
1894
1895         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %ur/%uw in flight",
1896                   page_count, aa, cli->cl_r_in_flight,
1897                   cli->cl_w_in_flight);
1898         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_IO, cfs_fail_val);
1899
1900         ptlrpcd_add_req(req);
1901         rc = 0;
1902         EXIT;
1903
1904 out:
1905         if (mem_tight != 0)
1906                 cfs_memory_pressure_restore(mpflag);
1907
1908         if (rc != 0) {
1909                 LASSERT(req == NULL);
1910
1911                 if (oa)
1912                         OBDO_FREE(oa);
1913                 if (pga)
1914                         OBD_FREE(pga, sizeof(*pga) * page_count);
1915                 /* This should happen rarely and is pretty bad; it makes
1916                  * the pending list not follow the dirty order */
1917                 while (!list_empty(ext_list)) {
1918                         ext = list_entry(ext_list->next, struct osc_extent,
1919                                          oe_link);
1920                         list_del_init(&ext->oe_link);
1921                         osc_extent_finish(env, ext, 0, rc);
1922                 }
1923         }
1924         RETURN(rc);
1925 }
1926
1927 static int osc_set_lock_data(struct ldlm_lock *lock, void *data)
1928 {
1929         int set = 0;
1930
1931         LASSERT(lock != NULL);
1932
1933         lock_res_and_lock(lock);
1934
1935         if (lock->l_ast_data == NULL)
1936                 lock->l_ast_data = data;
1937         if (lock->l_ast_data == data)
1938                 set = 1;
1939
1940         unlock_res_and_lock(lock);
1941
1942         return set;
1943 }
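
/* A minimal usage sketch: either our data is installed, or the lock already
 * carries the same data and can be reused; any other value means the lock
 * belongs to a different object.  (my_obj and use_cached_lock() are
 * hypothetical, for illustration only.)
 */
#if 0
	struct ldlm_lock *lock = ldlm_handle2lock(&lockh);

	if (lock != NULL) {
		if (osc_set_lock_data(lock, my_obj))
			use_cached_lock(lock);
		LDLM_LOCK_PUT(lock);
	}
#endif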
1944
1945 static int osc_enqueue_fini(struct ptlrpc_request *req,
1946                             osc_enqueue_upcall_f upcall, void *cookie,
1947                             struct lustre_handle *lockh, enum ldlm_mode mode,
1948                             __u64 *flags, int agl, int errcode)
1949 {
1950         bool intent = *flags & LDLM_FL_HAS_INTENT;
1951         int rc;
1952         ENTRY;
1953
1954         /* The request was created before ldlm_cli_enqueue call. */
1955         if (intent && errcode == ELDLM_LOCK_ABORTED) {
1956                 struct ldlm_reply *rep;
1957
1958                 rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1959                 LASSERT(rep != NULL);
1960
1961                 rep->lock_policy_res1 =
1962                         ptlrpc_status_ntoh(rep->lock_policy_res1);
1963                 if (rep->lock_policy_res1)
1964                         errcode = rep->lock_policy_res1;
1965                 if (!agl)
1966                         *flags |= LDLM_FL_LVB_READY;
1967         } else if (errcode == ELDLM_OK) {
1968                 *flags |= LDLM_FL_LVB_READY;
1969         }
1970
1971         /* Call the update callback. */
1972         rc = (*upcall)(cookie, lockh, errcode);
1973
1974         /* release the reference taken in ldlm_cli_enqueue() */
1975         if (errcode == ELDLM_LOCK_MATCHED)
1976                 errcode = ELDLM_OK;
1977         if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
1978                 ldlm_lock_decref(lockh, mode);
1979
1980         RETURN(rc);
1981 }
1982
1983 static int osc_enqueue_interpret(const struct lu_env *env,
1984                                  struct ptlrpc_request *req,
1985                                  struct osc_enqueue_args *aa, int rc)
1986 {
1987         struct ldlm_lock *lock;
1988         struct lustre_handle *lockh = &aa->oa_lockh;
1989         enum ldlm_mode mode = aa->oa_mode;
1990         struct ost_lvb *lvb = aa->oa_lvb;
1991         __u32 lvb_len = sizeof(*lvb);
1992         __u64 flags = 0;
1993
1994         ENTRY;
1995
1996         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
1997          * be valid. */
1998         lock = ldlm_handle2lock(lockh);
1999         LASSERTF(lock != NULL,
2000                  "lockh %#llx, req %p, aa %p - client evicted?\n",
2001                  lockh->cookie, req, aa);
2002
2003         /* Take an additional reference so that a blocking AST, which
2004          * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2005          * to arrive after an upcall has been executed by
2006          * osc_enqueue_fini(). */
2007         ldlm_lock_addref(lockh, mode);
2008
2009         /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
2010         OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
2011
2012         /* Let the CP AST grant the lock first. */
2013         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2014
2015         if (aa->oa_agl) {
2016                 LASSERT(aa->oa_lvb == NULL);
2017                 LASSERT(aa->oa_flags == NULL);
2018                 aa->oa_flags = &flags;
2019         }
2020
2021         /* Complete obtaining the lock procedure. */
2022         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
2023                                    aa->oa_mode, aa->oa_flags, lvb, lvb_len,
2024                                    lockh, rc);
2025         /* Complete osc stuff. */
2026         rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
2027                               aa->oa_flags, aa->oa_agl, rc);
2028
2029         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2030
2031         ldlm_lock_decref(lockh, mode);
2032         LDLM_LOCK_PUT(lock);
2033         RETURN(rc);
2034 }
2035
2036 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2037
2038 /* When enqueuing asynchronously, locks are not ordered: we can obtain a lock
2039  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2040  * other synchronous requests; however, keeping some locks while trying to
2041  * obtain others may take a considerable amount of time if an OST fails, and
2042  * when other sync requests waiting on a lock never see it released by the
2043  * client, the client is evicted from the cluster -- such scenarios make life
2044  * difficult, so release locks just after they are obtained. */
2045 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2046                      __u64 *flags, union ldlm_policy_data *policy,
2047                      struct ost_lvb *lvb, int kms_valid,
2048                      osc_enqueue_upcall_f upcall, void *cookie,
2049                      struct ldlm_enqueue_info *einfo,
2050                      struct ptlrpc_request_set *rqset, int async, int agl)
2051 {
2052         struct obd_device *obd = exp->exp_obd;
2053         struct lustre_handle lockh = { 0 };
2054         struct ptlrpc_request *req = NULL;
2055         int intent = *flags & LDLM_FL_HAS_INTENT;
2056         __u64 match_flags = *flags;
2057         enum ldlm_mode mode;
2058         int rc;
2059         ENTRY;
2060
2061         /* Filesystem lock extents are extended to page boundaries so that
2062          * dealing with the page cache is a little smoother.  */
2063         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2064         policy->l_extent.end |= ~PAGE_MASK;
2065
2066         /*
2067          * kms is not valid when either object is completely fresh (so that no
2068          * locks are cached), or object was evicted. In the latter case cached
2069          * lock cannot be used, because it would prime inode state with
2070          * potentially stale LVB.
2071          */
2072         if (!kms_valid)
2073                 goto no_match;
2074
2075         /* Next, search for already existing extent locks that will cover us */
2076         /* If we're trying to read, we also search for an existing PW lock.  The
2077          * VFS and page cache already protect us locally, so lots of readers/
2078          * writers can share a single PW lock.
2079          *
2080          * There are problems with conversion deadlocks, so instead of
2081          * converting a read lock to a write lock, we'll just enqueue a new
2082          * one.
2083          *
2084          * At some point we should cancel the read lock instead of making them
2085          * send us a blocking callback, but there are problems with canceling
2086          * locks out from other users right now, too. */
2087         mode = einfo->ei_mode;
2088         if (einfo->ei_mode == LCK_PR)
2089                 mode |= LCK_PW;
2090         if (agl == 0)
2091                 match_flags |= LDLM_FL_LVB_READY;
2092         if (intent != 0)
2093                 match_flags |= LDLM_FL_BLOCK_GRANTED;
2094         mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id,
2095                                einfo->ei_type, policy, mode, &lockh, 0);
2096         if (mode) {
2097                 struct ldlm_lock *matched;
2098
2099                 if (*flags & LDLM_FL_TEST_LOCK)
2100                         RETURN(ELDLM_OK);
2101
2102                 matched = ldlm_handle2lock(&lockh);
2103                 if (agl) {
2104                         /* AGL enqueues DLM locks speculatively. Therefore,
2105                          * if a DLM lock already exists, just inform the
2106                          * caller to cancel the AGL process for this stripe. */
2107                         ldlm_lock_decref(&lockh, mode);
2108                         LDLM_LOCK_PUT(matched);
2109                         RETURN(-ECANCELED);
2110                 } else if (osc_set_lock_data(matched, einfo->ei_cbdata)) {
2111                         *flags |= LDLM_FL_LVB_READY;
2112
2113                         /* We already have a lock, and it's referenced. */
2114                         (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);
2115
2116                         ldlm_lock_decref(&lockh, mode);
2117                         LDLM_LOCK_PUT(matched);
2118                         RETURN(ELDLM_OK);
2119                 } else {
2120                         ldlm_lock_decref(&lockh, mode);
2121                         LDLM_LOCK_PUT(matched);
2122                 }
2123         }
2124
2125 no_match:
2126         if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK))
2127                 RETURN(-ENOLCK);
2128
2129         if (intent) {
2130                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2131                                            &RQF_LDLM_ENQUEUE_LVB);
2132                 if (req == NULL)
2133                         RETURN(-ENOMEM);
2134
2135                 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
2136                 if (rc) {
2137                         ptlrpc_request_free(req);
2138                         RETURN(rc);
2139                 }
2140
2141                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2142                                      sizeof(*lvb));
2143                 ptlrpc_request_set_replen(req);
2144         }
2145
2146         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2147         *flags &= ~LDLM_FL_BLOCK_GRANTED;
2148
2149         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2150                               sizeof(*lvb), LVB_T_OST, &lockh, async);
2151         if (async) {
2152                 if (!rc) {
2153                         struct osc_enqueue_args *aa;
2154                         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2155                         aa = ptlrpc_req_async_args(req);
2156                         aa->oa_exp    = exp;
2157                         aa->oa_mode   = einfo->ei_mode;
2158                         aa->oa_type   = einfo->ei_type;
2159                         lustre_handle_copy(&aa->oa_lockh, &lockh);
2160                         aa->oa_upcall = upcall;
2161                         aa->oa_cookie = cookie;
2162                         aa->oa_agl    = !!agl;
2163                         if (!agl) {
2164                                 aa->oa_flags  = flags;
2165                                 aa->oa_lvb    = lvb;
2166                         } else {
2167                                 /* AGL essentially enqueues a DLM lock in
2168                                  * advance, so we don't care about the
2169                                  * result of the AGL enqueue. */
2170                                 aa->oa_lvb    = NULL;
2171                                 aa->oa_flags  = NULL;
2172                         }
2173
2174                         req->rq_interpret_reply =
2175                                 (ptlrpc_interpterer_t)osc_enqueue_interpret;
2176                         if (rqset == PTLRPCD_SET)
2177                                 ptlrpcd_add_req(req);
2178                         else
2179                                 ptlrpc_set_add_req(rqset, req);
2180                 } else if (intent) {
2181                         ptlrpc_req_finished(req);
2182                 }
2183                 RETURN(rc);
2184         }
2185
2186         rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
2187                               flags, agl, rc);
2188         if (intent)
2189                 ptlrpc_req_finished(req);
2190
2191         RETURN(rc);
2192 }
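
/* In brief, the enqueue flow above is (pseudo-code summary, for reading
 * convenience only):
 *
 *	if (kms_valid && ldlm_lock_match(...))
 *		reuse the cached lock (upcall with ELDLM_LOCK_MATCHED);
 *	else
 *		ldlm_cli_enqueue(...);  // sync, or async via interpret callback
 */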
2193
2194 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2195                    enum ldlm_type type, union ldlm_policy_data *policy,
2196                    enum ldlm_mode mode, __u64 *flags, void *data,
2197                    struct lustre_handle *lockh, int unref)
2198 {
2199         struct obd_device *obd = exp->exp_obd;
2200         __u64 lflags = *flags;
2201         enum ldlm_mode rc;
2202         ENTRY;
2203
2204         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2205                 RETURN(-EIO);
2206
2207         /* Filesystem lock extents are extended to page boundaries so that
2208          * dealing with the page cache is a little smoother */
2209         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2210         policy->l_extent.end |= ~PAGE_MASK;
2211
2212         /* Next, search for already existing extent locks that will cover us */
2213         /* If we're trying to read, we also search for an existing PW lock.  The
2214          * VFS and page cache already protect us locally, so lots of readers/
2215          * writers can share a single PW lock. */
2216         rc = mode;
2217         if (mode == LCK_PR)
2218                 rc |= LCK_PW;
2219         rc = ldlm_lock_match(obd->obd_namespace, lflags,
2220                              res_id, type, policy, rc, lockh, unref);
2221         if (rc == 0 || lflags & LDLM_FL_TEST_LOCK)
2222                 RETURN(rc);
2223
2224         if (data != NULL) {
2225                 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2226
2227                 LASSERT(lock != NULL);
2228                 if (!osc_set_lock_data(lock, data)) {
2229                         ldlm_lock_decref(lockh, rc);
2230                         rc = 0;
2231                 }
2232                 LDLM_LOCK_PUT(lock);
2233         }
2234         RETURN(rc);
2235 }
2236
2237 static int osc_statfs_interpret(const struct lu_env *env,
2238                                 struct ptlrpc_request *req,
2239                                 struct osc_async_args *aa, int rc)
2240 {
2241         struct obd_statfs *msfs;
2242         ENTRY;
2243
2244         if (rc == -EBADR)
2245                 /* The request has in fact never been sent
2246                  * due to issues at a higher level (LOV).
2247                  * Exit immediately since the caller is
2248                  * aware of the problem and takes care
2249                  * of the cleanup */
2250                 RETURN(rc);
2251
2252         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2253             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2254                 GOTO(out, rc = 0);
2255
2256         if (rc != 0)
2257                 GOTO(out, rc);
2258
2259         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2260         if (msfs == NULL) {
2261                 GOTO(out, rc = -EPROTO);
2262         }
2263
2264         *aa->aa_oi->oi_osfs = *msfs;
2265 out:
2266         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2267         RETURN(rc);
2268 }
2269
2270 static int osc_statfs_async(struct obd_export *exp,
2271                             struct obd_info *oinfo, __u64 max_age,
2272                             struct ptlrpc_request_set *rqset)
2273 {
2274         struct obd_device     *obd = class_exp2obd(exp);
2275         struct ptlrpc_request *req;
2276         struct osc_async_args *aa;
2277         int                    rc;
2278         ENTRY;
2279
2280         /* We could possibly pass max_age in the request (as an absolute
2281          * timestamp or a "seconds.usec ago") so the target can avoid doing
2282          * extra calls into the filesystem if that isn't necessary (e.g.
2283          * during mount that would help a bit).  Having relative timestamps
2284          * is not so great if request processing is slow, while absolute
2285          * timestamps are not ideal because they need time synchronization. */
2286         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2287         if (req == NULL)
2288                 RETURN(-ENOMEM);
2289
2290         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2291         if (rc) {
2292                 ptlrpc_request_free(req);
2293                 RETURN(rc);
2294         }
2295         ptlrpc_request_set_replen(req);
2296         req->rq_request_portal = OST_CREATE_PORTAL;
2297         ptlrpc_at_set_req_timeout(req);
2298
2299         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2300                 /* procfs requests must not wait or be resent, to avoid deadlock */
2301                 req->rq_no_resend = 1;
2302                 req->rq_no_delay = 1;
2303         }
2304
2305         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2306         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2307         aa = ptlrpc_req_async_args(req);
2308         aa->aa_oi = oinfo;
2309
2310         ptlrpc_set_add_req(rqset, req);
2311         RETURN(0);
2312 }
2313
2314 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2315                       struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2316 {
2317         struct obd_device     *obd = class_exp2obd(exp);
2318         struct obd_statfs     *msfs;
2319         struct ptlrpc_request *req;
2320         struct obd_import     *imp = NULL;
2321         int rc;
2322         ENTRY;
2323
2324         /* Since the request might also come from lprocfs, we need to
2325          * sync this with client_disconnect_export(). Bug 15684 */
2326         down_read(&obd->u.cli.cl_sem);
2327         if (obd->u.cli.cl_import)
2328                 imp = class_import_get(obd->u.cli.cl_import);
2329         up_read(&obd->u.cli.cl_sem);
2330         if (!imp)
2331                 RETURN(-ENODEV);
2332
2333         /* We could possibly pass max_age in the request (as an absolute
2334          * timestamp or a "seconds.usec ago") so the target can avoid doing
2335          * extra calls into the filesystem if that isn't necessary (e.g.
2336          * during mount that would help a bit).  Having relative timestamps
2337          * is not so great if request processing is slow, while absolute
2338          * timestamps are not ideal because they need time synchronization. */
2339         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2340
2341         class_import_put(imp);
2342
2343         if (req == NULL)
2344                 RETURN(-ENOMEM);
2345
2346         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2347         if (rc) {
2348                 ptlrpc_request_free(req);
2349                 RETURN(rc);
2350         }
2351         ptlrpc_request_set_replen(req);
2352         req->rq_request_portal = OST_CREATE_PORTAL;
2353         ptlrpc_at_set_req_timeout(req);
2354
2355         if (flags & OBD_STATFS_NODELAY) {
2356                 /* procfs requests must not wait or be resent, to avoid deadlock */
2357                 req->rq_no_resend = 1;
2358                 req->rq_no_delay = 1;
2359         }
2360
2361         rc = ptlrpc_queue_wait(req);
2362         if (rc)
2363                 GOTO(out, rc);
2364
2365         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2366         if (msfs == NULL) {
2367                 GOTO(out, rc = -EPROTO);
2368         }
2369
2370         *osfs = *msfs;
2371
2372         EXIT;
2373  out:
2374         ptlrpc_req_finished(req);
2375         return rc;
2376 }
2377
2378 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2379                          void *karg, void __user *uarg)
2380 {
2381         struct obd_device *obd = exp->exp_obd;
2382         struct obd_ioctl_data *data = karg;
2383         int err = 0;
2384         ENTRY;
2385
2386         if (!try_module_get(THIS_MODULE)) {
2387                 CERROR("%s: cannot get module '%s'\n", obd->obd_name,
2388                        module_name(THIS_MODULE));
2389                 return -EINVAL;
2390         }
2391         switch (cmd) {
2392         case OBD_IOC_CLIENT_RECOVER:
2393                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2394                                             data->ioc_inlbuf1, 0);
2395                 if (err > 0)
2396                         err = 0;
2397                 GOTO(out, err);
2398         case IOC_OSC_SET_ACTIVE:
2399                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2400                                                data->ioc_offset);
2401                 GOTO(out, err);
2402         case OBD_IOC_PING_TARGET:
2403                 err = ptlrpc_obd_ping(obd);
2404                 GOTO(out, err);
2405         default:
2406                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2407                        cmd, current_comm());
2408                 GOTO(out, err = -ENOTTY);
2409         }
2410 out:
2411         module_put(THIS_MODULE);
2412         return err;
2413 }
2414
2415 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
2416                               u32 keylen, void *key,
2417                               u32 vallen, void *val,
2418                               struct ptlrpc_request_set *set)
2419 {
2420         struct ptlrpc_request *req;
2421         struct obd_device     *obd = exp->exp_obd;
2422         struct obd_import     *imp = class_exp2cliimp(exp);
2423         char                  *tmp;
2424         int                    rc;
2425         ENTRY;
2426
2427         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
2428
2429         if (KEY_IS(KEY_CHECKSUM)) {
2430                 if (vallen != sizeof(int))
2431                         RETURN(-EINVAL);
2432                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
2433                 RETURN(0);
2434         }
2435
2436         if (KEY_IS(KEY_SPTLRPC_CONF)) {
2437                 sptlrpc_conf_client_adapt(obd);
2438                 RETURN(0);
2439         }
2440
2441         if (KEY_IS(KEY_FLUSH_CTX)) {
2442                 sptlrpc_import_flush_my_ctx(imp);
2443                 RETURN(0);
2444         }
2445
2446         if (KEY_IS(KEY_CACHE_SET)) {
2447                 struct client_obd *cli = &obd->u.cli;
2448
2449                 LASSERT(cli->cl_cache == NULL); /* only once */
2450                 cli->cl_cache = (struct cl_client_cache *)val;
2451                 cl_cache_incref(cli->cl_cache);
2452                 cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
2453
2454                 /* add this osc into entity list */
2455                 LASSERT(list_empty(&cli->cl_lru_osc));
2456                 spin_lock(&cli->cl_cache->ccc_lru_lock);
2457                 list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
2458                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
2459
2460                 RETURN(0);
2461         }
2462
2463         if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
2464                 struct client_obd *cli = &obd->u.cli;
2465                 long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
2466                 long target = *(long *)val;
2467
2468                 nr = osc_lru_shrink(env, cli, min(nr, target), true);
2469                 *(long *)val -= nr;
2470                 RETURN(0);
2471         }
2472
2473         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
2474                 RETURN(-EINVAL);
2475
2476         /* We pass all other commands directly to OST. Since nobody calls osc
2477            methods directly and everybody is supposed to go through LOV, we
2478            assume lov checked invalid values for us.
2479            The only recognised values so far are evict_by_nid and mds_conn.
2480            Even if something bad goes through, we'd get a -EINVAL from OST
2481            anyway. */
2482
2483         req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
2484                                                 &RQF_OST_SET_GRANT_INFO :
2485                                                 &RQF_OBD_SET_INFO);
2486         if (req == NULL)
2487                 RETURN(-ENOMEM);
2488
2489         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2490                              RCL_CLIENT, keylen);
2491         if (!KEY_IS(KEY_GRANT_SHRINK))
2492                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
2493                                      RCL_CLIENT, vallen);
2494         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
2495         if (rc) {
2496                 ptlrpc_request_free(req);
2497                 RETURN(rc);
2498         }
2499
2500         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2501         memcpy(tmp, key, keylen);
2502         tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
2503                                                         &RMF_OST_BODY :
2504                                                         &RMF_SETINFO_VAL);
2505         memcpy(tmp, val, vallen);
2506
2507         if (KEY_IS(KEY_GRANT_SHRINK)) {
2508                 struct osc_grant_args *aa;
2509                 struct obdo *oa;
2510
2511                 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2512                 aa = ptlrpc_req_async_args(req);
2513                 OBDO_ALLOC(oa);
2514                 if (!oa) {
2515                         ptlrpc_req_finished(req);
2516                         RETURN(-ENOMEM);
2517                 }
2518                 *oa = ((struct ost_body *)val)->oa;
2519                 aa->aa_oa = oa;
2520                 req->rq_interpret_reply = osc_shrink_grant_interpret;
2521         }
2522
2523         ptlrpc_request_set_replen(req);
2524         if (!KEY_IS(KEY_GRANT_SHRINK)) {
2525                 LASSERT(set != NULL);
2526                 ptlrpc_set_add_req(set, req);
2527                 ptlrpc_check_set(NULL, set);
2528         } else {
2529                 ptlrpcd_add_req(req);
2530         }
2531
2532         RETURN(0);
2533 }
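
/* A sketch of how a caller might drive the KEY_CACHE_LRU_SHRINK path above;
 * the target is decremented in place by the number of pages actually shrunk.
 * (The value 256 is an arbitrary example.)
 */
#if 0
	long target = 256;	/* pages we would like freed */
	int rc;

	rc = obd_set_info_async(env, exp, sizeof(KEY_CACHE_LRU_SHRINK),
				KEY_CACHE_LRU_SHRINK, sizeof(target),
				&target, NULL);
	/* on return, target holds the pages still left to shrink */
#endif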
2534
2535 static int osc_reconnect(const struct lu_env *env,
2536                          struct obd_export *exp, struct obd_device *obd,
2537                          struct obd_uuid *cluuid,
2538                          struct obd_connect_data *data,
2539                          void *localdata)
2540 {
2541         struct client_obd *cli = &obd->u.cli;
2542
2543         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
2544                 long lost_grant;
2545                 long grant;
2546
2547                 spin_lock(&cli->cl_loi_list_lock);
2548                 grant = cli->cl_avail_grant + cli->cl_reserved_grant;
2549                 if (data->ocd_connect_flags & OBD_CONNECT_GRANT_PARAM)
2550                         grant += cli->cl_dirty_grant;
2551                 else
2552                         grant += cli->cl_dirty_pages << PAGE_SHIFT;
2553                 data->ocd_grant = grant ? : 2 * cli_brw_size(obd);
2554                 lost_grant = cli->cl_lost_grant;
2555                 cli->cl_lost_grant = 0;
2556                 spin_unlock(&cli->cl_loi_list_lock);
2557
2558                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: %#llx ocd_version: %d"
2559                        " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
2560                        data->ocd_version, data->ocd_grant, lost_grant);
2561         }
2562
2563         RETURN(0);
2564 }
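
/* In effect, the grant requested at reconnect is (pseudo-code summary of the
 * block above, for illustration only):
 *
 *	grant = cl_avail_grant + cl_reserved_grant +
 *		(GRANT_PARAM ? cl_dirty_grant : cl_dirty_pages << PAGE_SHIFT);
 *	if (grant == 0)
 *		grant = 2 * cli_brw_size(obd);	// sane non-zero minimum
 */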
2565
2566 static int osc_disconnect(struct obd_export *exp)
2567 {
2568         struct obd_device *obd = class_exp2obd(exp);
2569         int rc;
2570
2571         rc = client_disconnect_export(exp);
2572         /**
2573          * Initially we put del_shrink_grant before disconnect_export, but it
2574          * causes the following problem if setup (connect) and cleanup
2575          * (disconnect) are tangled together.
2576          *      connect p1                     disconnect p2
2577          *   ptlrpc_connect_import
2578          *     ...............               class_manual_cleanup
2579          *                                     osc_disconnect
2580          *                                     del_shrink_grant
2581          *   ptlrpc_connect_interrupt
2582          *     init_grant_shrink
2583          *   add this client to shrink list
2584          *                                      cleanup_osc
2585          * Bang! The pinger triggers the shrink.
2586          * So the osc should be disconnected from the shrink list only after
2587          * we are sure the import has been destroyed. BUG18662
2588          */
2589         if (obd->u.cli.cl_import == NULL)
2590                 osc_del_shrink_grant(&obd->u.cli);
2591         return rc;
2592 }
2593
2594 static int osc_ldlm_resource_invalidate(struct cfs_hash *hs,
2595         struct cfs_hash_bd *bd, struct hlist_node *hnode, void *arg)
2596 {
2597         struct lu_env *env = arg;
2598         struct ldlm_resource *res = cfs_hash_object(hs, hnode);
2599         struct ldlm_lock *lock;
2600         struct osc_object *osc = NULL;
2601         ENTRY;
2602
2603         lock_res(res);
2604         list_for_each_entry(lock, &res->lr_granted, l_res_link) {
2605                 if (lock->l_ast_data != NULL && osc == NULL) {
2606                         osc = lock->l_ast_data;
2607                         cl_object_get(osc2cl(osc));
2608                 }
2609
2610                 /* clear LDLM_FL_CLEANED flag to make sure it will be canceled
2611          * by the 2nd round of the ldlm_namespace_cleanup() call in
2612                  * osc_import_event(). */
2613                 ldlm_clear_cleaned(lock);
2614         }
2615         unlock_res(res);
2616
2617         if (osc != NULL) {
2618                 osc_object_invalidate(env, osc);
2619                 cl_object_put(env, osc2cl(osc));
2620         }
2621
2622         RETURN(0);
2623 }
2624
static int osc_import_event(struct obd_device *obd,
                            struct obd_import *imp,
                            enum obd_import_event event)
{
        struct client_obd *cli;
        int rc = 0;

        ENTRY;
        LASSERT(imp->imp_obd == obd);

        switch (event) {
        case IMP_EVENT_DISCON: {
                cli = &obd->u.cli;
                spin_lock(&cli->cl_loi_list_lock);
                cli->cl_avail_grant = 0;
                cli->cl_lost_grant = 0;
                spin_unlock(&cli->cl_loi_list_lock);
                break;
        }
        case IMP_EVENT_INACTIVE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
                break;
        }
        case IMP_EVENT_INVALIDATE: {
                struct ldlm_namespace *ns = obd->obd_namespace;
                struct lu_env         *env;
                __u16                  refcheck;

                ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);

                env = cl_env_get(&refcheck);
                if (!IS_ERR(env)) {
                        osc_io_unplug(env, &obd->u.cli, NULL);

                        cfs_hash_for_each_nolock(ns->ns_rs_hash,
                                                 osc_ldlm_resource_invalidate,
                                                 env, 0);
                        cl_env_put(env, &refcheck);

                        ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
                } else {
                        rc = PTR_ERR(env);
                }
                break;
        }
        case IMP_EVENT_ACTIVE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
                break;
        }
        case IMP_EVENT_OCD: {
                struct obd_connect_data *ocd = &imp->imp_connect_data;

                if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
                        osc_init_grant(&obd->u.cli, ocd);

                /* See bug 7198 */
                if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
                        imp->imp_client->cli_request_portal =
                                OST_REQUEST_PORTAL;

                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
                break;
        }
        case IMP_EVENT_DEACTIVATE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
                break;
        }
        case IMP_EVENT_ACTIVATE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
                break;
        }
        default:
                CERROR("Unknown import event %d\n", event);
                LBUG();
        }
        RETURN(rc);
}

/**
 * Determine whether the lock can be canceled before replaying it
 * during recovery, see bug 16774 for detailed information.
 *
 * \retval zero the lock can't be canceled
 * \retval other ok to cancel
 */
static int osc_cancel_weight(struct ldlm_lock *lock)
{
        /*
         * Cancel all unused and granted extent locks.
         */
        if (lock->l_resource->lr_type == LDLM_EXTENT &&
            lock->l_granted_mode == lock->l_req_mode &&
            osc_ldlm_weigh_ast(lock) == 0)
                RETURN(1);

        RETURN(0);
}

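/**
 * ptlrpcd work callback: flush queued writeback pages for this client
 * via osc_io_unplug().
 */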
static int brw_queue_work(const struct lu_env *env, void *data)
{
        struct client_obd *cli = data;

        CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);

        osc_io_unplug(env, cli, NULL);
        RETURN(0);
}

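/**
 * Set up an OSC device: create the client import, allocate the
 * writeback and LRU ptlrpcd work items, initialize quota and the grant
 * shrink interval, register proc entries, top up the shared request
 * pool and add this client to the cache shrinker list.
 */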
int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
{
        struct client_obd *cli = &obd->u.cli;
        struct obd_type   *type;
        void              *handler;
        int                rc;
        int                adding;
        int                added;
        int                req_count;
        ENTRY;

        rc = ptlrpcd_addref();
        if (rc)
                RETURN(rc);

        rc = client_obd_setup(obd, lcfg);
        if (rc)
                GOTO(out_ptlrpcd, rc);

        handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
        if (IS_ERR(handler))
                GOTO(out_client_setup, rc = PTR_ERR(handler));
        cli->cl_writeback_work = handler;

        handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
        if (IS_ERR(handler))
                GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
        cli->cl_lru_work = handler;

        rc = osc_quota_setup(obd);
        if (rc)
                GOTO(out_ptlrpcd_work, rc);

        cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;

#ifdef CONFIG_PROC_FS
        obd->obd_vars = lprocfs_osc_obd_vars;
#endif
        /* If this is true then both client (osc) and server (osp) are on the
         * same node. The osp layer, if loaded first, will have registered the
         * osc proc directory. In that case this obd_device attaches its proc
         * tree to type->typ_procsym instead of obd->obd_type->typ_procroot. */
        type = class_search_type(LUSTRE_OSP_NAME);
        if (type && type->typ_procsym) {
                obd->obd_proc_entry = lprocfs_register(obd->obd_name,
                                                       type->typ_procsym,
                                                       obd->obd_vars, obd);
                if (IS_ERR(obd->obd_proc_entry)) {
                        rc = PTR_ERR(obd->obd_proc_entry);
                        CERROR("error %d setting up lprocfs for %s\n", rc,
                               obd->obd_name);
                        obd->obd_proc_entry = NULL;
                }
        } else {
                rc = lprocfs_obd_setup(obd);
        }

        /* If the basic OSC proc tree construction succeeded then
         * let's do the rest. */
        if (rc == 0) {
                lproc_osc_attach_seqstat(obd);
                sptlrpc_lprocfs_cliobd_attach(obd);
                ptlrpc_lprocfs_register_obd(obd);
        }

        /*
         * We try to control the total number of requests with an upper limit,
         * osc_reqpool_maxreqcount. A race may occasionally push the count
         * over the limit, but that is harmless.
         */
        req_count = atomic_read(&osc_pool_req_count);
        if (req_count < osc_reqpool_maxreqcount) {
                adding = cli->cl_max_rpcs_in_flight + 2;
                if (req_count + adding > osc_reqpool_maxreqcount)
                        adding = osc_reqpool_maxreqcount - req_count;

                added = ptlrpc_add_rqs_to_pool(osc_rq_pool, adding);
                atomic_add(added, &osc_pool_req_count);
        }

        INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
        ns_register_cancel(obd->obd_namespace, osc_cancel_weight);

        spin_lock(&osc_shrink_lock);
        list_add_tail(&cli->cl_shrink_list, &osc_shrink_list);
        spin_unlock(&osc_shrink_lock);

        RETURN(0);

out_ptlrpcd_work:
        if (cli->cl_writeback_work != NULL) {
                ptlrpcd_destroy_work(cli->cl_writeback_work);
                cli->cl_writeback_work = NULL;
        }
        if (cli->cl_lru_work != NULL) {
                ptlrpcd_destroy_work(cli->cl_lru_work);
                cli->cl_lru_work = NULL;
        }
out_client_setup:
        client_obd_cleanup(obd);
out_ptlrpcd:
        ptlrpcd_decref();
        RETURN(rc);
}

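/**
 * First stage of device shutdown: destroy the ptlrpcd work items,
 * clean up the client import and unregister proc entries while the
 * device is still fully set up.
 */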
static int osc_precleanup(struct obd_device *obd)
{
        struct client_obd *cli = &obd->u.cli;
        ENTRY;

        /* LU-464
         * For the echo client, the export may be on the zombie list; wait
         * for the zombie thread to cull it, because cli.cl_import will be
         * cleared in client_disconnect_export():
         *   class_export_destroy() -> obd_cleanup() ->
         *   echo_device_free() -> echo_client_cleanup() ->
         *   obd_disconnect() -> osc_disconnect() ->
         *   client_disconnect_export()
         */
        obd_zombie_barrier();
        if (cli->cl_writeback_work) {
                ptlrpcd_destroy_work(cli->cl_writeback_work);
                cli->cl_writeback_work = NULL;
        }

        if (cli->cl_lru_work) {
                ptlrpcd_destroy_work(cli->cl_lru_work);
                cli->cl_lru_work = NULL;
        }

        obd_cleanup_client_import(obd);
        ptlrpc_lprocfs_unregister_obd(obd);
        lprocfs_obd_cleanup(obd);
        RETURN(0);
}

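/**
 * Final stage of device shutdown: drop this client from the cache
 * shrinker list and the shared LRU cache, free the quota cache and
 * release the generic client state and the ptlrpcd reference.
 */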
int osc_cleanup(struct obd_device *obd)
{
        struct client_obd *cli = &obd->u.cli;
        int rc;

        ENTRY;

        spin_lock(&osc_shrink_lock);
        list_del(&cli->cl_shrink_list);
        spin_unlock(&osc_shrink_lock);

        /* lru cleanup */
        if (cli->cl_cache != NULL) {
                LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
                spin_lock(&cli->cl_cache->ccc_lru_lock);
                list_del_init(&cli->cl_lru_osc);
                spin_unlock(&cli->cl_cache->ccc_lru_lock);
                cli->cl_lru_left = NULL;
                cl_cache_decref(cli->cl_cache);
                cli->cl_cache = NULL;
        }

        /* free memory of osc quota cache */
        osc_quota_cleanup(obd);

        rc = client_obd_cleanup(obd);

        ptlrpcd_decref();
        RETURN(rc);
}

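/**
 * Apply a configuration record to this device's proc tunables; a
 * positive return from class_process_proc_param() indicates the
 * parameter was handled and is mapped to success.
 */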
int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
{
        int rc = class_process_proc_param(PARAM_OSC, obd->obd_vars, lcfg, obd);
        return rc > 0 ? 0 : rc;
}

static int osc_process_config(struct obd_device *obd, size_t len, void *buf)
{
        return osc_process_config_base(obd, buf);
}

static struct obd_ops osc_obd_ops = {
        .o_owner                = THIS_MODULE,
        .o_setup                = osc_setup,
        .o_precleanup           = osc_precleanup,
        .o_cleanup              = osc_cleanup,
        .o_add_conn             = client_import_add_conn,
        .o_del_conn             = client_import_del_conn,
        .o_connect              = client_connect_import,
        .o_reconnect            = osc_reconnect,
        .o_disconnect           = osc_disconnect,
        .o_statfs               = osc_statfs,
        .o_statfs_async         = osc_statfs_async,
        .o_create               = osc_create,
        .o_destroy              = osc_destroy,
        .o_getattr              = osc_getattr,
        .o_setattr              = osc_setattr,
        .o_iocontrol            = osc_iocontrol,
        .o_set_info_async       = osc_set_info_async,
        .o_import_event         = osc_import_event,
        .o_process_config       = osc_process_config,
        .o_quotactl             = osc_quotactl,
};

static struct shrinker *osc_cache_shrinker;
struct list_head osc_shrink_list = LIST_HEAD_INIT(osc_shrink_list);
DEFINE_SPINLOCK(osc_shrink_lock);

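/*
 * Compatibility wrapper for kernels where struct shrinker still has a
 * single ->shrink() method instead of the separate ->count_objects()/
 * ->scan_objects() callbacks: perform one scan pass, then report the
 * remaining object count.
 */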
#ifndef HAVE_SHRINKER_COUNT
static int osc_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask))
{
        struct shrink_control scv = {
                .nr_to_scan = shrink_param(sc, nr_to_scan),
                .gfp_mask   = shrink_param(sc, gfp_mask)
        };
#if !defined(HAVE_SHRINKER_WANT_SHRINK_PTR) && !defined(HAVE_SHRINK_CONTROL)
        struct shrinker *shrinker = NULL;
#endif

        (void)osc_cache_shrink_scan(shrinker, &scv);

        return osc_cache_shrink_count(shrinker, &scv);
}
#endif

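/**
 * Module init: register the OSC device type (skipping the type proc
 * directory when the OSP layer already provides it), set up the page
 * cache shrinker and size the shared request pool from the
 * osc_reqpool_mem_max module parameter.
 */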
static int __init osc_init(void)
{
        bool enable_proc = true;
        struct obd_type *type;
        unsigned int reqpool_size;
        unsigned int reqsize;
        int rc;
        DEF_SHRINKER_VAR(osc_shvar, osc_cache_shrink,
                         osc_cache_shrink_count, osc_cache_shrink_scan);
        ENTRY;

        /* print an address of _any_ initialized kernel symbol from this
         * module, to allow debugging with gdb that doesn't support data
         * symbols from modules. */
        CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);

        rc = lu_kmem_init(osc_caches);
        if (rc)
                RETURN(rc);

        type = class_search_type(LUSTRE_OSP_NAME);
        if (type != NULL && type->typ_procsym != NULL)
                enable_proc = false;

        rc = class_register_type(&osc_obd_ops, NULL, enable_proc, NULL,
                                 LUSTRE_OSC_NAME, &osc_device_type);
        if (rc)
                GOTO(out_kmem, rc);

        osc_cache_shrinker = set_shrinker(DEFAULT_SEEKS, &osc_shvar);

        /* This is obviously too much memory; only guard against overflow
         * here. */
        if (osc_reqpool_mem_max >= 1 << 12 || osc_reqpool_mem_max == 0)
                GOTO(out_type, rc = -EINVAL);

        reqpool_size = osc_reqpool_mem_max << 20;

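        /* Round the request buffer size up to the smallest power of two
         * that is >= OST_IO_MAXREQSIZE; osc_reqpool_maxreqcount below is
         * derived from this rounded size. */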
        reqsize = 1;
        while (reqsize < OST_IO_MAXREQSIZE)
                reqsize = reqsize << 1;

        /*
         * We don't enlarge the request count in the OSC pool according to
         * cl_max_rpcs_in_flight. Allocation from the pool is only tried
         * after a normal allocation has failed, so a small OSC pool won't
         * cause much performance degradation in most cases.
         */
        osc_reqpool_maxreqcount = reqpool_size / reqsize;

        atomic_set(&osc_pool_req_count, 0);
        osc_rq_pool = ptlrpc_init_rq_pool(0, OST_IO_MAXREQSIZE,
                                          ptlrpc_add_rqs_to_pool);

        if (osc_rq_pool != NULL)
                GOTO(out, rc);
        rc = -ENOMEM;
out_type:
        class_unregister_type(LUSTRE_OSC_NAME);
out_kmem:
        lu_kmem_fini(osc_caches);
out:
        RETURN(rc);
}

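/**
 * Module exit: undo everything osc_init() set up, then release the
 * shared request pool.
 */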
static void __exit osc_exit(void)
{
        remove_shrinker(osc_cache_shrinker);
        class_unregister_type(LUSTRE_OSC_NAME);
        lu_kmem_fini(osc_caches);
        ptlrpc_free_rq_pool(osc_rq_pool);
}

MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
MODULE_VERSION(LUSTRE_VERSION_STRING);
MODULE_LICENSE("GPL");

module_init(osc_init);
module_exit(osc_exit);