Whamcloud - gitweb
LU-4017 quota: redefine LL_MAXQUOTAS for Lustre
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2015, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_OSC
38
39 #include <libcfs/libcfs.h>
40
41 #include <lustre/lustre_user.h>
42
43 #include <lprocfs_status.h>
44 #include <lustre_debug.h>
45 #include <lustre_dlm.h>
46 #include <lustre_fid.h>
47 #include <lustre_ha.h>
48 #include <lustre_ioctl.h>
49 #include <lustre_net.h>
50 #include <lustre_obdo.h>
51 #include <lustre_param.h>
52 #include <obd.h>
53 #include <obd_cksum.h>
54 #include <obd_class.h>
55
56 #include "osc_cl_internal.h"
57 #include "osc_internal.h"
58
59 atomic_t osc_pool_req_count;
60 unsigned int osc_reqpool_maxreqcount;
61 struct ptlrpc_request_pool *osc_rq_pool;
62
63 /* max memory used for request pool, unit is MB */
64 static unsigned int osc_reqpool_mem_max = 5;
65 module_param(osc_reqpool_mem_max, uint, 0444);
66
67 struct osc_brw_async_args {
68         struct obdo              *aa_oa;
69         int                       aa_requested_nob;
70         int                       aa_nio_count;
71         u32                       aa_page_count;
72         int                       aa_resends;
73         struct brw_page **aa_ppga;
74         struct client_obd        *aa_cli;
75         struct list_head          aa_oaps;
76         struct list_head          aa_exts;
77 };
78
79 #define osc_grant_args osc_brw_async_args
80
81 struct osc_setattr_args {
82         struct obdo             *sa_oa;
83         obd_enqueue_update_f     sa_upcall;
84         void                    *sa_cookie;
85 };
86
87 struct osc_fsync_args {
88         struct osc_object       *fa_obj;
89         struct obdo             *fa_oa;
90         obd_enqueue_update_f    fa_upcall;
91         void                    *fa_cookie;
92 };
93
94 struct osc_ladvise_args {
95         struct obdo             *la_oa;
96         obd_enqueue_update_f     la_upcall;
97         void                    *la_cookie;
98 };
99
100 struct osc_enqueue_args {
101         struct obd_export       *oa_exp;
102         enum ldlm_type          oa_type;
103         enum ldlm_mode          oa_mode;
104         __u64                   *oa_flags;
105         osc_enqueue_upcall_f    oa_upcall;
106         void                    *oa_cookie;
107         struct ost_lvb          *oa_lvb;
108         struct lustre_handle    oa_lockh;
109         unsigned int            oa_agl:1;
110 };
111
112 static void osc_release_ppga(struct brw_page **ppga, size_t count);
113 static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
114                          void *data, int rc);
115
116 void osc_pack_req_body(struct ptlrpc_request *req, struct obdo *oa)
117 {
118         struct ost_body *body;
119
120         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
121         LASSERT(body);
122
123         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
124 }
125
126 static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
127                        struct obdo *oa)
128 {
129         struct ptlrpc_request   *req;
130         struct ost_body         *body;
131         int                      rc;
132
133         ENTRY;
134         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
135         if (req == NULL)
136                 RETURN(-ENOMEM);
137
138         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
139         if (rc) {
140                 ptlrpc_request_free(req);
141                 RETURN(rc);
142         }
143
144         osc_pack_req_body(req, oa);
145
146         ptlrpc_request_set_replen(req);
147
148         rc = ptlrpc_queue_wait(req);
149         if (rc)
150                 GOTO(out, rc);
151
152         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
153         if (body == NULL)
154                 GOTO(out, rc = -EPROTO);
155
156         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
157         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
158
159         oa->o_blksize = cli_brw_size(exp->exp_obd);
160         oa->o_valid |= OBD_MD_FLBLKSZ;
161
162         EXIT;
163 out:
164         ptlrpc_req_finished(req);
165
166         return rc;
167 }
168
169 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
170                        struct obdo *oa)
171 {
172         struct ptlrpc_request   *req;
173         struct ost_body         *body;
174         int                      rc;
175
176         ENTRY;
177         LASSERT(oa->o_valid & OBD_MD_FLGROUP);
178
179         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
180         if (req == NULL)
181                 RETURN(-ENOMEM);
182
183         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
184         if (rc) {
185                 ptlrpc_request_free(req);
186                 RETURN(rc);
187         }
188
189         osc_pack_req_body(req, oa);
190
191         ptlrpc_request_set_replen(req);
192
193         rc = ptlrpc_queue_wait(req);
194         if (rc)
195                 GOTO(out, rc);
196
197         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
198         if (body == NULL)
199                 GOTO(out, rc = -EPROTO);
200
201         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
202
203         EXIT;
204 out:
205         ptlrpc_req_finished(req);
206
207         RETURN(rc);
208 }
209
210 static int osc_setattr_interpret(const struct lu_env *env,
211                                  struct ptlrpc_request *req,
212                                  struct osc_setattr_args *sa, int rc)
213 {
214         struct ost_body *body;
215         ENTRY;
216
217         if (rc != 0)
218                 GOTO(out, rc);
219
220         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
221         if (body == NULL)
222                 GOTO(out, rc = -EPROTO);
223
224         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
225                              &body->oa);
226 out:
227         rc = sa->sa_upcall(sa->sa_cookie, rc);
228         RETURN(rc);
229 }
230
231 int osc_setattr_async(struct obd_export *exp, struct obdo *oa,
232                       obd_enqueue_update_f upcall, void *cookie,
233                       struct ptlrpc_request_set *rqset)
234 {
235         struct ptlrpc_request   *req;
236         struct osc_setattr_args *sa;
237         int                      rc;
238
239         ENTRY;
240
241         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
242         if (req == NULL)
243                 RETURN(-ENOMEM);
244
245         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
246         if (rc) {
247                 ptlrpc_request_free(req);
248                 RETURN(rc);
249         }
250
251         osc_pack_req_body(req, oa);
252
253         ptlrpc_request_set_replen(req);
254
255         /* do mds to ost setattr asynchronously */
256         if (!rqset) {
257                 /* Do not wait for response. */
258                 ptlrpcd_add_req(req);
259         } else {
260                 req->rq_interpret_reply =
261                         (ptlrpc_interpterer_t)osc_setattr_interpret;
262
263                 CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
264                 sa = ptlrpc_req_async_args(req);
265                 sa->sa_oa = oa;
266                 sa->sa_upcall = upcall;
267                 sa->sa_cookie = cookie;
268
269                 if (rqset == PTLRPCD_SET)
270                         ptlrpcd_add_req(req);
271                 else
272                         ptlrpc_set_add_req(rqset, req);
273         }
274
275         RETURN(0);
276 }
277
278 static int osc_ladvise_interpret(const struct lu_env *env,
279                                  struct ptlrpc_request *req,
280                                  void *arg, int rc)
281 {
282         struct osc_ladvise_args *la = arg;
283         struct ost_body *body;
284         ENTRY;
285
286         if (rc != 0)
287                 GOTO(out, rc);
288
289         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
290         if (body == NULL)
291                 GOTO(out, rc = -EPROTO);
292
293         *la->la_oa = body->oa;
294 out:
295         rc = la->la_upcall(la->la_cookie, rc);
296         RETURN(rc);
297 }
298
299 /**
300  * If rqset is NULL, do not wait for response. Upcall and cookie could also
301  * be NULL in this case
302  */
303 int osc_ladvise_base(struct obd_export *exp, struct obdo *oa,
304                      struct ladvise_hdr *ladvise_hdr,
305                      obd_enqueue_update_f upcall, void *cookie,
306                      struct ptlrpc_request_set *rqset)
307 {
308         struct ptlrpc_request   *req;
309         struct ost_body         *body;
310         struct osc_ladvise_args *la;
311         int                      rc;
312         struct lu_ladvise       *req_ladvise;
313         struct lu_ladvise       *ladvise = ladvise_hdr->lah_advise;
314         int                      num_advise = ladvise_hdr->lah_count;
315         struct ladvise_hdr      *req_ladvise_hdr;
316         ENTRY;
317
318         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_LADVISE);
319         if (req == NULL)
320                 RETURN(-ENOMEM);
321
322         req_capsule_set_size(&req->rq_pill, &RMF_OST_LADVISE, RCL_CLIENT,
323                              num_advise * sizeof(*ladvise));
324         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_LADVISE);
325         if (rc != 0) {
326                 ptlrpc_request_free(req);
327                 RETURN(rc);
328         }
329         req->rq_request_portal = OST_IO_PORTAL;
330         ptlrpc_at_set_req_timeout(req);
331
332         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
333         LASSERT(body);
334         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
335                              oa);
336
337         req_ladvise_hdr = req_capsule_client_get(&req->rq_pill,
338                                                  &RMF_OST_LADVISE_HDR);
339         memcpy(req_ladvise_hdr, ladvise_hdr, sizeof(*ladvise_hdr));
340
341         req_ladvise = req_capsule_client_get(&req->rq_pill, &RMF_OST_LADVISE);
342         memcpy(req_ladvise, ladvise, sizeof(*ladvise) * num_advise);
343         ptlrpc_request_set_replen(req);
344
345         if (rqset == NULL) {
346                 /* Do not wait for response. */
347                 ptlrpcd_add_req(req);
348                 RETURN(0);
349         }
350
351         req->rq_interpret_reply = osc_ladvise_interpret;
352         CLASSERT(sizeof(*la) <= sizeof(req->rq_async_args));
353         la = ptlrpc_req_async_args(req);
354         la->la_oa = oa;
355         la->la_upcall = upcall;
356         la->la_cookie = cookie;
357
358         if (rqset == PTLRPCD_SET)
359                 ptlrpcd_add_req(req);
360         else
361                 ptlrpc_set_add_req(rqset, req);
362
363         RETURN(0);
364 }
365
366 static int osc_create(const struct lu_env *env, struct obd_export *exp,
367                       struct obdo *oa)
368 {
369         struct ptlrpc_request *req;
370         struct ost_body       *body;
371         int                    rc;
372         ENTRY;
373
374         LASSERT(oa != NULL);
375         LASSERT(oa->o_valid & OBD_MD_FLGROUP);
376         LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));
377
378         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
379         if (req == NULL)
380                 GOTO(out, rc = -ENOMEM);
381
382         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
383         if (rc) {
384                 ptlrpc_request_free(req);
385                 GOTO(out, rc);
386         }
387
388         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
389         LASSERT(body);
390
391         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
392
393         ptlrpc_request_set_replen(req);
394
395         rc = ptlrpc_queue_wait(req);
396         if (rc)
397                 GOTO(out_req, rc);
398
399         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
400         if (body == NULL)
401                 GOTO(out_req, rc = -EPROTO);
402
403         CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
404         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
405
406         oa->o_blksize = cli_brw_size(exp->exp_obd);
407         oa->o_valid |= OBD_MD_FLBLKSZ;
408
409         CDEBUG(D_HA, "transno: "LPD64"\n",
410                lustre_msg_get_transno(req->rq_repmsg));
411 out_req:
412         ptlrpc_req_finished(req);
413 out:
414         RETURN(rc);
415 }
416
417 int osc_punch_base(struct obd_export *exp, struct obdo *oa,
418                    obd_enqueue_update_f upcall, void *cookie,
419                    struct ptlrpc_request_set *rqset)
420 {
421         struct ptlrpc_request   *req;
422         struct osc_setattr_args *sa;
423         struct ost_body         *body;
424         int                      rc;
425         ENTRY;
426
427         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
428         if (req == NULL)
429                 RETURN(-ENOMEM);
430
431         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
432         if (rc) {
433                 ptlrpc_request_free(req);
434                 RETURN(rc);
435         }
436         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
437         ptlrpc_at_set_req_timeout(req);
438
439         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
440         LASSERT(body);
441         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
442
443         ptlrpc_request_set_replen(req);
444
445         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
446         CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
447         sa = ptlrpc_req_async_args(req);
448         sa->sa_oa = oa;
449         sa->sa_upcall = upcall;
450         sa->sa_cookie = cookie;
451         if (rqset == PTLRPCD_SET)
452                 ptlrpcd_add_req(req);
453         else
454                 ptlrpc_set_add_req(rqset, req);
455
456         RETURN(0);
457 }
458
459 static int osc_sync_interpret(const struct lu_env *env,
460                               struct ptlrpc_request *req,
461                               void *arg, int rc)
462 {
463         struct osc_fsync_args   *fa = arg;
464         struct ost_body         *body;
465         struct cl_attr          *attr = &osc_env_info(env)->oti_attr;
466         unsigned long           valid = 0;
467         struct cl_object        *obj;
468         ENTRY;
469
470         if (rc != 0)
471                 GOTO(out, rc);
472
473         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
474         if (body == NULL) {
475                 CERROR("can't unpack ost_body\n");
476                 GOTO(out, rc = -EPROTO);
477         }
478
479         *fa->fa_oa = body->oa;
480         obj = osc2cl(fa->fa_obj);
481
482         /* Update osc object's blocks attribute */
483         cl_object_attr_lock(obj);
484         if (body->oa.o_valid & OBD_MD_FLBLOCKS) {
485                 attr->cat_blocks = body->oa.o_blocks;
486                 valid |= CAT_BLOCKS;
487         }
488
489         if (valid != 0)
490                 cl_object_attr_update(env, obj, attr, valid);
491         cl_object_attr_unlock(obj);
492
493 out:
494         rc = fa->fa_upcall(fa->fa_cookie, rc);
495         RETURN(rc);
496 }
497
498 int osc_sync_base(struct osc_object *obj, struct obdo *oa,
499                   obd_enqueue_update_f upcall, void *cookie,
500                   struct ptlrpc_request_set *rqset)
501 {
502         struct obd_export     *exp = osc_export(obj);
503         struct ptlrpc_request *req;
504         struct ost_body       *body;
505         struct osc_fsync_args *fa;
506         int                    rc;
507         ENTRY;
508
509         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
510         if (req == NULL)
511                 RETURN(-ENOMEM);
512
513         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
514         if (rc) {
515                 ptlrpc_request_free(req);
516                 RETURN(rc);
517         }
518
519         /* overload the size and blocks fields in the oa with start/end */
520         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
521         LASSERT(body);
522         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
523
524         ptlrpc_request_set_replen(req);
525         req->rq_interpret_reply = osc_sync_interpret;
526
527         CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
528         fa = ptlrpc_req_async_args(req);
529         fa->fa_obj = obj;
530         fa->fa_oa = oa;
531         fa->fa_upcall = upcall;
532         fa->fa_cookie = cookie;
533
534         if (rqset == PTLRPCD_SET)
535                 ptlrpcd_add_req(req);
536         else
537                 ptlrpc_set_add_req(rqset, req);
538
539         RETURN (0);
540 }
541
542 /* Find and cancel locally locks matched by @mode in the resource found by
543  * @objid. Found locks are added into @cancel list. Returns the amount of
544  * locks added to @cancels list. */
545 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
546                                    struct list_head *cancels,
547                                    enum ldlm_mode mode, __u64 lock_flags)
548 {
549         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
550         struct ldlm_res_id res_id;
551         struct ldlm_resource *res;
552         int count;
553         ENTRY;
554
555         /* Return, i.e. cancel nothing, only if ELC is supported (flag in
556          * export) but disabled through procfs (flag in NS).
557          *
558          * This distinguishes from a case when ELC is not supported originally,
559          * when we still want to cancel locks in advance and just cancel them
560          * locally, without sending any RPC. */
561         if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
562                 RETURN(0);
563
564         ostid_build_res_name(&oa->o_oi, &res_id);
565         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
566         if (IS_ERR(res))
567                 RETURN(0);
568
569         LDLM_RESOURCE_ADDREF(res);
570         count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
571                                            lock_flags, 0, NULL);
572         LDLM_RESOURCE_DELREF(res);
573         ldlm_resource_putref(res);
574         RETURN(count);
575 }
576
577 static int osc_destroy_interpret(const struct lu_env *env,
578                                  struct ptlrpc_request *req, void *data,
579                                  int rc)
580 {
581         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
582
583         atomic_dec(&cli->cl_destroy_in_flight);
584         wake_up(&cli->cl_destroy_waitq);
585         return 0;
586 }
587
588 static int osc_can_send_destroy(struct client_obd *cli)
589 {
590         if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
591             cli->cl_max_rpcs_in_flight) {
592                 /* The destroy request can be sent */
593                 return 1;
594         }
595         if (atomic_dec_return(&cli->cl_destroy_in_flight) <
596             cli->cl_max_rpcs_in_flight) {
597                 /*
598                  * The counter has been modified between the two atomic
599                  * operations.
600                  */
601                 wake_up(&cli->cl_destroy_waitq);
602         }
603         return 0;
604 }
605
606 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
607                        struct obdo *oa)
608 {
609         struct client_obd     *cli = &exp->exp_obd->u.cli;
610         struct ptlrpc_request *req;
611         struct ost_body       *body;
612         struct list_head       cancels = LIST_HEAD_INIT(cancels);
613         int rc, count;
614         ENTRY;
615
616         if (!oa) {
617                 CDEBUG(D_INFO, "oa NULL\n");
618                 RETURN(-EINVAL);
619         }
620
621         count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
622                                         LDLM_FL_DISCARD_DATA);
623
624         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
625         if (req == NULL) {
626                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
627                 RETURN(-ENOMEM);
628         }
629
630         rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
631                                0, &cancels, count);
632         if (rc) {
633                 ptlrpc_request_free(req);
634                 RETURN(rc);
635         }
636
637         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
638         ptlrpc_at_set_req_timeout(req);
639
640         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
641         LASSERT(body);
642         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
643
644         ptlrpc_request_set_replen(req);
645
646         req->rq_interpret_reply = osc_destroy_interpret;
647         if (!osc_can_send_destroy(cli)) {
648                 struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
649
650                 /*
651                  * Wait until the number of on-going destroy RPCs drops
652                  * under max_rpc_in_flight
653                  */
654                 l_wait_event_exclusive(cli->cl_destroy_waitq,
655                                        osc_can_send_destroy(cli), &lwi);
656         }
657
658         /* Do not wait for response */
659         ptlrpcd_add_req(req);
660         RETURN(0);
661 }
662
663 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
664                                 long writing_bytes)
665 {
666         u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;
667
668         LASSERT(!(oa->o_valid & bits));
669
670         oa->o_valid |= bits;
671         spin_lock(&cli->cl_loi_list_lock);
672         if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data, GRANT_PARAM))
673                 oa->o_dirty = cli->cl_dirty_grant;
674         else
675                 oa->o_dirty = cli->cl_dirty_pages << PAGE_CACHE_SHIFT;
676         if (unlikely(cli->cl_dirty_pages - cli->cl_dirty_transit >
677                      cli->cl_dirty_max_pages)) {
678                 CERROR("dirty %lu - %lu > dirty_max %lu\n",
679                        cli->cl_dirty_pages, cli->cl_dirty_transit,
680                        cli->cl_dirty_max_pages);
681                 oa->o_undirty = 0;
682         } else if (unlikely(atomic_long_read(&obd_dirty_pages) -
683                             atomic_long_read(&obd_dirty_transit_pages) >
684                             (long)(obd_max_dirty_pages + 1))) {
685                 /* The atomic_read() allowing the atomic_inc() are
686                  * not covered by a lock thus they may safely race and trip
687                  * this CERROR() unless we add in a small fudge factor (+1). */
688                 CERROR("%s: dirty %ld - %ld > system dirty_max %ld\n",
689                        cli_name(cli), atomic_long_read(&obd_dirty_pages),
690                        atomic_long_read(&obd_dirty_transit_pages),
691                        obd_max_dirty_pages);
692                 oa->o_undirty = 0;
693         } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
694                             0x7fffffff)) {
695                 CERROR("dirty %lu - dirty_max %lu too big???\n",
696                        cli->cl_dirty_pages, cli->cl_dirty_max_pages);
697                 oa->o_undirty = 0;
698         } else {
699                 unsigned long nrpages;
700
701                 nrpages = cli->cl_max_pages_per_rpc;
702                 nrpages *= cli->cl_max_rpcs_in_flight + 1;
703                 nrpages = max(nrpages, cli->cl_dirty_max_pages);
704                 oa->o_undirty = nrpages << PAGE_CACHE_SHIFT;
705                 if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data,
706                                  GRANT_PARAM)) {
707                         int nrextents;
708
709                         /* take extent tax into account when asking for more
710                          * grant space */
711                         nrextents = (nrpages + cli->cl_max_extent_pages - 1)  /
712                                      cli->cl_max_extent_pages;
713                         oa->o_undirty += nrextents * cli->cl_grant_extent_tax;
714                 }
715         }
716         oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
717         oa->o_dropped = cli->cl_lost_grant;
718         cli->cl_lost_grant = 0;
719         spin_unlock(&cli->cl_loi_list_lock);
720         CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
721                oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
722 }
723
724 void osc_update_next_shrink(struct client_obd *cli)
725 {
726         cli->cl_next_shrink_grant =
727                 cfs_time_shift(cli->cl_grant_shrink_interval);
728         CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
729                cli->cl_next_shrink_grant);
730 }
731
732 static void __osc_update_grant(struct client_obd *cli, u64 grant)
733 {
734         spin_lock(&cli->cl_loi_list_lock);
735         cli->cl_avail_grant += grant;
736         spin_unlock(&cli->cl_loi_list_lock);
737 }
738
739 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
740 {
741         if (body->oa.o_valid & OBD_MD_FLGRANT) {
742                 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
743                 __osc_update_grant(cli, body->oa.o_grant);
744         }
745 }
746
747 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
748                               u32 keylen, void *key,
749                               u32 vallen, void *val,
750                               struct ptlrpc_request_set *set);
751
752 static int osc_shrink_grant_interpret(const struct lu_env *env,
753                                       struct ptlrpc_request *req,
754                                       void *aa, int rc)
755 {
756         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
757         struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
758         struct ost_body *body;
759
760         if (rc != 0) {
761                 __osc_update_grant(cli, oa->o_grant);
762                 GOTO(out, rc);
763         }
764
765         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
766         LASSERT(body);
767         osc_update_grant(cli, body);
768 out:
769         OBDO_FREE(oa);
770         return rc;
771 }
772
773 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
774 {
775         spin_lock(&cli->cl_loi_list_lock);
776         oa->o_grant = cli->cl_avail_grant / 4;
777         cli->cl_avail_grant -= oa->o_grant;
778         spin_unlock(&cli->cl_loi_list_lock);
779         if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
780                 oa->o_valid |= OBD_MD_FLFLAGS;
781                 oa->o_flags = 0;
782         }
783         oa->o_flags |= OBD_FL_SHRINK_GRANT;
784         osc_update_next_shrink(cli);
785 }
786
787 /* Shrink the current grant, either from some large amount to enough for a
788  * full set of in-flight RPCs, or if we have already shrunk to that limit
789  * then to enough for a single RPC.  This avoids keeping more grant than
790  * needed, and avoids shrinking the grant piecemeal. */
791 static int osc_shrink_grant(struct client_obd *cli)
792 {
793         __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
794                              (cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT);
795
796         spin_lock(&cli->cl_loi_list_lock);
797         if (cli->cl_avail_grant <= target_bytes)
798                 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
799         spin_unlock(&cli->cl_loi_list_lock);
800
801         return osc_shrink_grant_to_target(cli, target_bytes);
802 }
803
804 int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
805 {
806         int                     rc = 0;
807         struct ost_body        *body;
808         ENTRY;
809
810         spin_lock(&cli->cl_loi_list_lock);
811         /* Don't shrink if we are already above or below the desired limit
812          * We don't want to shrink below a single RPC, as that will negatively
813          * impact block allocation and long-term performance. */
814         if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT)
815                 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
816
817         if (target_bytes >= cli->cl_avail_grant) {
818                 spin_unlock(&cli->cl_loi_list_lock);
819                 RETURN(0);
820         }
821         spin_unlock(&cli->cl_loi_list_lock);
822
823         OBD_ALLOC_PTR(body);
824         if (!body)
825                 RETURN(-ENOMEM);
826
827         osc_announce_cached(cli, &body->oa, 0);
828
829         spin_lock(&cli->cl_loi_list_lock);
830         body->oa.o_grant = cli->cl_avail_grant - target_bytes;
831         cli->cl_avail_grant = target_bytes;
832         spin_unlock(&cli->cl_loi_list_lock);
833         if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
834                 body->oa.o_valid |= OBD_MD_FLFLAGS;
835                 body->oa.o_flags = 0;
836         }
837         body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
838         osc_update_next_shrink(cli);
839
840         rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
841                                 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
842                                 sizeof(*body), body, NULL);
843         if (rc != 0)
844                 __osc_update_grant(cli, body->oa.o_grant);
845         OBD_FREE_PTR(body);
846         RETURN(rc);
847 }
848
849 static int osc_should_shrink_grant(struct client_obd *client)
850 {
851         cfs_time_t time = cfs_time_current();
852         cfs_time_t next_shrink = client->cl_next_shrink_grant;
853
854         if ((client->cl_import->imp_connect_data.ocd_connect_flags &
855              OBD_CONNECT_GRANT_SHRINK) == 0)
856                 return 0;
857
858         if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
859                 /* Get the current RPC size directly, instead of going via:
860                  * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
861                  * Keep comment here so that it can be found by searching. */
862                 int brw_size = client->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
863
864                 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
865                     client->cl_avail_grant > brw_size)
866                         return 1;
867                 else
868                         osc_update_next_shrink(client);
869         }
870         return 0;
871 }
872
873 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
874 {
875         struct client_obd *client;
876
877         list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
878                 if (osc_should_shrink_grant(client))
879                         osc_shrink_grant(client);
880         }
881         return 0;
882 }
883
884 static int osc_add_shrink_grant(struct client_obd *client)
885 {
886         int rc;
887
888         rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
889                                        TIMEOUT_GRANT,
890                                        osc_grant_shrink_grant_cb, NULL,
891                                        &client->cl_grant_shrink_list);
892         if (rc) {
893                 CERROR("add grant client %s error %d\n", cli_name(client), rc);
894                 return rc;
895         }
896         CDEBUG(D_CACHE, "add grant client %s\n", cli_name(client));
897         osc_update_next_shrink(client);
898         return 0;
899 }
900
901 static int osc_del_shrink_grant(struct client_obd *client)
902 {
903         return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
904                                          TIMEOUT_GRANT);
905 }
906
907 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
908 {
909         /*
910          * ocd_grant is the total grant amount we're expect to hold: if we've
911          * been evicted, it's the new avail_grant amount, cl_dirty_pages will
912          * drop to 0 as inflight RPCs fail out; otherwise, it's avail_grant +
913          * dirty.
914          *
915          * race is tolerable here: if we're evicted, but imp_state already
916          * left EVICTED state, then cl_dirty_pages must be 0 already.
917          */
918         spin_lock(&cli->cl_loi_list_lock);
919         cli->cl_avail_grant = ocd->ocd_grant;
920         if (cli->cl_import->imp_state != LUSTRE_IMP_EVICTED) {
921                 cli->cl_avail_grant -= cli->cl_reserved_grant;
922                 if (OCD_HAS_FLAG(ocd, GRANT_PARAM))
923                         cli->cl_avail_grant -= cli->cl_dirty_grant;
924                 else
925                         cli->cl_avail_grant -=
926                                         cli->cl_dirty_pages << PAGE_CACHE_SHIFT;
927         }
928
929         if (cli->cl_avail_grant < 0) {
930                 CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
931                       cli_name(cli), cli->cl_avail_grant,
932                       ocd->ocd_grant, cli->cl_dirty_pages << PAGE_CACHE_SHIFT);
933                 /* workaround for servers which do not have the patch from
934                  * LU-2679 */
935                 cli->cl_avail_grant = ocd->ocd_grant;
936         }
937
938         if (OCD_HAS_FLAG(ocd, GRANT_PARAM)) {
939                 u64 size;
940
941                 /* overhead for each extent insertion */
942                 cli->cl_grant_extent_tax = ocd->ocd_grant_tax_kb << 10;
943                 /* determine the appropriate chunk size used by osc_extent. */
944                 cli->cl_chunkbits = max_t(int, PAGE_CACHE_SHIFT,
945                                           ocd->ocd_grant_blkbits);
946                 /* determine maximum extent size, in #pages */
947                 size = (u64)ocd->ocd_grant_max_blks << ocd->ocd_grant_blkbits;
948                 cli->cl_max_extent_pages = size >> PAGE_CACHE_SHIFT;
949                 if (cli->cl_max_extent_pages == 0)
950                         cli->cl_max_extent_pages = 1;
951         } else {
952                 cli->cl_grant_extent_tax = 0;
953                 cli->cl_chunkbits = PAGE_CACHE_SHIFT;
954                 cli->cl_max_extent_pages = DT_MAX_BRW_PAGES;
955         }
956         spin_unlock(&cli->cl_loi_list_lock);
957
958         CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
959                 "chunk bits: %d cl_max_extent_pages: %d\n",
960                 cli_name(cli),
961                 cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits,
962                 cli->cl_max_extent_pages);
963
964         if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
965             list_empty(&cli->cl_grant_shrink_list))
966                 osc_add_shrink_grant(cli);
967 }
968
969 /* We assume that the reason this OSC got a short read is because it read
970  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
971  * via the LOV, and it _knows_ it's reading inside the file, it's just that
972  * this stripe never got written at or beyond this stripe offset yet. */
973 static void handle_short_read(int nob_read, size_t page_count,
974                               struct brw_page **pga)
975 {
976         char *ptr;
977         int i = 0;
978
979         /* skip bytes read OK */
980         while (nob_read > 0) {
981                 LASSERT (page_count > 0);
982
983                 if (pga[i]->count > nob_read) {
984                         /* EOF inside this page */
985                         ptr = kmap(pga[i]->pg) +
986                                 (pga[i]->off & ~PAGE_MASK);
987                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
988                         kunmap(pga[i]->pg);
989                         page_count--;
990                         i++;
991                         break;
992                 }
993
994                 nob_read -= pga[i]->count;
995                 page_count--;
996                 i++;
997         }
998
999         /* zero remaining pages */
1000         while (page_count-- > 0) {
1001                 ptr = kmap(pga[i]->pg) + (pga[i]->off & ~PAGE_MASK);
1002                 memset(ptr, 0, pga[i]->count);
1003                 kunmap(pga[i]->pg);
1004                 i++;
1005         }
1006 }
1007
1008 static int check_write_rcs(struct ptlrpc_request *req,
1009                            int requested_nob, int niocount,
1010                            size_t page_count, struct brw_page **pga)
1011 {
1012         int     i;
1013         __u32   *remote_rcs;
1014
1015         remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1016                                                   sizeof(*remote_rcs) *
1017                                                   niocount);
1018         if (remote_rcs == NULL) {
1019                 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1020                 return(-EPROTO);
1021         }
1022
1023         /* return error if any niobuf was in error */
1024         for (i = 0; i < niocount; i++) {
1025                 if ((int)remote_rcs[i] < 0)
1026                         return(remote_rcs[i]);
1027
1028                 if (remote_rcs[i] != 0) {
1029                         CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1030                                 i, remote_rcs[i], req);
1031                         return(-EPROTO);
1032                 }
1033         }
1034
1035         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1036                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1037                        req->rq_bulk->bd_nob_transferred, requested_nob);
1038                 return(-EPROTO);
1039         }
1040
1041         return (0);
1042 }
1043
1044 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1045 {
1046         if (p1->flag != p2->flag) {
1047                 unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
1048                                   OBD_BRW_SYNC       | OBD_BRW_ASYNC   |
1049                                   OBD_BRW_NOQUOTA    | OBD_BRW_SOFT_SYNC);
1050
1051                 /* warn if we try to combine flags that we don't know to be
1052                  * safe to combine */
1053                 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1054                         CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1055                               "report this at https://jira.hpdd.intel.com/\n",
1056                               p1->flag, p2->flag);
1057                 }
1058                 return 0;
1059         }
1060
1061         return (p1->off + p1->count == p2->off);
1062 }
1063
1064 static u32 osc_checksum_bulk(int nob, size_t pg_count,
1065                              struct brw_page **pga, int opc,
1066                              cksum_type_t cksum_type)
1067 {
1068         u32                             cksum;
1069         int                             i = 0;
1070         struct cfs_crypto_hash_desc     *hdesc;
1071         unsigned int                    bufsize;
1072         int                             err;
1073         unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);
1074
1075         LASSERT(pg_count > 0);
1076
1077         hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1078         if (IS_ERR(hdesc)) {
1079                 CERROR("Unable to initialize checksum hash %s\n",
1080                        cfs_crypto_hash_name(cfs_alg));
1081                 return PTR_ERR(hdesc);
1082         }
1083
1084         while (nob > 0 && pg_count > 0) {
1085                 unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;
1086
1087                 /* corrupt the data before we compute the checksum, to
1088                  * simulate an OST->client data error */
1089                 if (i == 0 && opc == OST_READ &&
1090                     OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
1091                         unsigned char *ptr = kmap(pga[i]->pg);
1092                         int off = pga[i]->off & ~PAGE_MASK;
1093
1094                         memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
1095                         kunmap(pga[i]->pg);
1096                 }
1097                 cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
1098                                             pga[i]->off & ~PAGE_MASK,
1099                                             count);
1100                 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
1101                                (int)(pga[i]->off & ~PAGE_MASK));
1102
1103                 nob -= pga[i]->count;
1104                 pg_count--;
1105                 i++;
1106         }
1107
1108         bufsize = sizeof(cksum);
1109         err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);
1110
1111         /* For sending we only compute the wrong checksum instead
1112          * of corrupting the data so it is still correct on a redo */
1113         if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1114                 cksum++;
1115
1116         return cksum;
1117 }
1118
1119 static int
1120 osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
1121                      u32 page_count, struct brw_page **pga,
1122                      struct ptlrpc_request **reqp, int resend)
1123 {
1124         struct ptlrpc_request   *req;
1125         struct ptlrpc_bulk_desc *desc;
1126         struct ost_body         *body;
1127         struct obd_ioobj        *ioobj;
1128         struct niobuf_remote    *niobuf;
1129         int niocount, i, requested_nob, opc, rc;
1130         struct osc_brw_async_args *aa;
1131         struct req_capsule      *pill;
1132         struct brw_page *pg_prev;
1133
1134         ENTRY;
1135         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1136                 RETURN(-ENOMEM); /* Recoverable */
1137         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1138                 RETURN(-EINVAL); /* Fatal */
1139
1140         if ((cmd & OBD_BRW_WRITE) != 0) {
1141                 opc = OST_WRITE;
1142                 req = ptlrpc_request_alloc_pool(cli->cl_import,
1143                                                 osc_rq_pool,
1144                                                 &RQF_OST_BRW_WRITE);
1145         } else {
1146                 opc = OST_READ;
1147                 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
1148         }
1149         if (req == NULL)
1150                 RETURN(-ENOMEM);
1151
1152         for (niocount = i = 1; i < page_count; i++) {
1153                 if (!can_merge_pages(pga[i - 1], pga[i]))
1154                         niocount++;
1155         }
1156
1157         pill = &req->rq_pill;
1158         req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1159                              sizeof(*ioobj));
1160         req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1161                              niocount * sizeof(*niobuf));
1162
1163         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1164         if (rc) {
1165                 ptlrpc_request_free(req);
1166                 RETURN(rc);
1167         }
1168         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1169         ptlrpc_at_set_req_timeout(req);
1170         /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
1171          * retry logic */
1172         req->rq_no_retry_einprogress = 1;
1173
1174         desc = ptlrpc_prep_bulk_imp(req, page_count,
1175                 cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
1176                 (opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE :
1177                         PTLRPC_BULK_PUT_SINK) |
1178                         PTLRPC_BULK_BUF_KIOV,
1179                 OST_BULK_PORTAL,
1180                 &ptlrpc_bulk_kiov_pin_ops);
1181
1182         if (desc == NULL)
1183                 GOTO(out, rc = -ENOMEM);
1184         /* NB request now owns desc and will free it when it gets freed */
1185
1186         body = req_capsule_client_get(pill, &RMF_OST_BODY);
1187         ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1188         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1189         LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1190
1191         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
1192
1193         obdo_to_ioobj(oa, ioobj);
1194         ioobj->ioo_bufcnt = niocount;
1195         /* The high bits of ioo_max_brw tells server _maximum_ number of bulks
1196          * that might be send for this request.  The actual number is decided
1197          * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
1198          * "max - 1" for old client compatibility sending "0", and also so the
1199          * the actual maximum is a power-of-two number, not one less. LU-1431 */
1200         ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
1201         LASSERT(page_count > 0);
1202         pg_prev = pga[0];
1203         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1204                 struct brw_page *pg = pga[i];
1205                 int poff = pg->off & ~PAGE_MASK;
1206
1207                 LASSERT(pg->count > 0);
1208                 /* make sure there is no gap in the middle of page array */
1209                 LASSERTF(page_count == 1 ||
1210                          (ergo(i == 0, poff + pg->count == PAGE_CACHE_SIZE) &&
1211                           ergo(i > 0 && i < page_count - 1,
1212                                poff == 0 && pg->count == PAGE_CACHE_SIZE)   &&
1213                           ergo(i == page_count - 1, poff == 0)),
1214                          "i: %d/%d pg: %p off: "LPU64", count: %u\n",
1215                          i, page_count, pg, pg->off, pg->count);
1216                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1217                          "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1218                          " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1219                          i, page_count,
1220                          pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1221                          pg_prev->pg, page_private(pg_prev->pg),
1222                          pg_prev->pg->index, pg_prev->off);
1223                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1224                         (pg->flag & OBD_BRW_SRVLOCK));
1225
1226                 desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff, pg->count);
1227                 requested_nob += pg->count;
1228
1229                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1230                         niobuf--;
1231                         niobuf->rnb_len += pg->count;
1232                 } else {
1233                         niobuf->rnb_offset = pg->off;
1234                         niobuf->rnb_len    = pg->count;
1235                         niobuf->rnb_flags  = pg->flag;
1236                 }
1237                 pg_prev = pg;
1238         }
1239
1240         LASSERTF((void *)(niobuf - niocount) ==
1241                 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1242                 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1243                 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1244
1245         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1246         if (resend) {
1247                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1248                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1249                         body->oa.o_flags = 0;
1250                 }
1251                 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1252         }
1253
1254         if (osc_should_shrink_grant(cli))
1255                 osc_shrink_grant_local(cli, &body->oa);
1256
1257         /* size[REQ_REC_OFF] still sizeof (*body) */
1258         if (opc == OST_WRITE) {
1259                 if (cli->cl_checksum &&
1260                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1261                         /* store cl_cksum_type in a local variable since
1262                          * it can be changed via lprocfs */
1263                         cksum_type_t cksum_type = cli->cl_cksum_type;
1264
1265                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1266                                 oa->o_flags &= OBD_FL_LOCAL_MASK;
1267                                 body->oa.o_flags = 0;
1268                         }
1269                         body->oa.o_flags |= cksum_type_pack(cksum_type);
1270                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1271                         body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1272                                                              page_count, pga,
1273                                                              OST_WRITE,
1274                                                              cksum_type);
1275                         CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1276                                body->oa.o_cksum);
1277                         /* save this in 'oa', too, for later checking */
1278                         oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1279                         oa->o_flags |= cksum_type_pack(cksum_type);
1280                 } else {
1281                         /* clear out the checksum flag, in case this is a
1282                          * resend but cl_checksum is no longer set. b=11238 */
1283                         oa->o_valid &= ~OBD_MD_FLCKSUM;
1284                 }
1285                 oa->o_cksum = body->oa.o_cksum;
1286                 /* 1 RC per niobuf */
1287                 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1288                                      sizeof(__u32) * niocount);
1289         } else {
1290                 if (cli->cl_checksum &&
1291                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1292                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1293                                 body->oa.o_flags = 0;
1294                         body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1295                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1296                 }
1297         }
1298         ptlrpc_request_set_replen(req);
1299
1300         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1301         aa = ptlrpc_req_async_args(req);
1302         aa->aa_oa = oa;
1303         aa->aa_requested_nob = requested_nob;
1304         aa->aa_nio_count = niocount;
1305         aa->aa_page_count = page_count;
1306         aa->aa_resends = 0;
1307         aa->aa_ppga = pga;
1308         aa->aa_cli = cli;
1309         INIT_LIST_HEAD(&aa->aa_oaps);
1310
1311         *reqp = req;
1312         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1313         CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
1314                 req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
1315                 niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
1316         RETURN(0);
1317
1318  out:
1319         ptlrpc_req_finished(req);
1320         RETURN(rc);
1321 }
1322
1323 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1324                                 __u32 client_cksum, __u32 server_cksum, int nob,
1325                                 size_t page_count, struct brw_page **pga,
1326                                 cksum_type_t client_cksum_type)
1327 {
1328         __u32 new_cksum;
1329         char *msg;
1330         cksum_type_t cksum_type;
1331
1332         if (server_cksum == client_cksum) {
1333                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1334                 return 0;
1335         }
1336
1337         cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1338                                        oa->o_flags : 0);
1339         new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1340                                       cksum_type);
1341
1342         if (cksum_type != client_cksum_type)
1343                 msg = "the server did not use the checksum type specified in "
1344                       "the original request - likely a protocol problem";
1345         else if (new_cksum == server_cksum)
1346                 msg = "changed on the client after we checksummed it - "
1347                       "likely false positive due to mmap IO (bug 11742)";
1348         else if (new_cksum == client_cksum)
1349                 msg = "changed in transit before arrival at OST";
1350         else
1351                 msg = "changed in transit AND doesn't match the original - "
1352                       "likely false positive due to mmap IO (bug 11742)";
1353
1354         LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1355                            " object "DOSTID" extent ["LPU64"-"LPU64"]\n",
1356                            msg, libcfs_nid2str(peer->nid),
1357                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1358                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1359                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1360                            POSTID(&oa->o_oi), pga[0]->off,
1361                            pga[page_count-1]->off + pga[page_count-1]->count - 1);
1362         CERROR("original client csum %x (type %x), server csum %x (type %x), "
1363                "client csum now %x\n", client_cksum, client_cksum_type,
1364                server_cksum, cksum_type, new_cksum);
1365         return 1;
1366 }
1367
1368 /* Note rc enters this function as number of bytes transferred */
1369 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1370 {
1371         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1372         const lnet_process_id_t *peer =
1373                         &req->rq_import->imp_connection->c_peer;
1374         struct client_obd *cli = aa->aa_cli;
1375         struct ost_body *body;
1376         u32 client_cksum = 0;
1377         ENTRY;
1378
1379         if (rc < 0 && rc != -EDQUOT) {
1380                 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1381                 RETURN(rc);
1382         }
1383
1384         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1385         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1386         if (body == NULL) {
1387                 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1388                 RETURN(-EPROTO);
1389         }
1390
1391         /* set/clear over quota flag for a uid/gid */
1392         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1393             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1394                 unsigned int qid[LL_MAXQUOTAS] =
1395                                         {body->oa.o_uid, body->oa.o_gid};
1396
1397                 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1398                        body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1399                        body->oa.o_flags);
1400                 osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
1401         }
1402
1403         osc_update_grant(cli, body);
1404
1405         if (rc < 0)
1406                 RETURN(rc);
1407
1408         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1409                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1410
1411         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1412                 if (rc > 0) {
1413                         CERROR("Unexpected +ve rc %d\n", rc);
1414                         RETURN(-EPROTO);
1415                 }
1416                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1417
1418                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1419                         RETURN(-EAGAIN);
1420
1421                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1422                     check_write_checksum(&body->oa, peer, client_cksum,
1423                                          body->oa.o_cksum, aa->aa_requested_nob,
1424                                          aa->aa_page_count, aa->aa_ppga,
1425                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1426                         RETURN(-EAGAIN);
1427
1428                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1429                                      aa->aa_page_count, aa->aa_ppga);
1430                 GOTO(out, rc);
1431         }
1432
1433         /* The rest of this function executes only for OST_READs */
1434
1435         /* if unwrap_bulk failed, return -EAGAIN to retry */
1436         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1437         if (rc < 0)
1438                 GOTO(out, rc = -EAGAIN);
1439
1440         if (rc > aa->aa_requested_nob) {
1441                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1442                        aa->aa_requested_nob);
1443                 RETURN(-EPROTO);
1444         }
1445
1446         if (rc != req->rq_bulk->bd_nob_transferred) {
1447                 CERROR ("Unexpected rc %d (%d transferred)\n",
1448                         rc, req->rq_bulk->bd_nob_transferred);
1449                 return (-EPROTO);
1450         }
1451
1452         if (rc < aa->aa_requested_nob)
1453                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1454
1455         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1456                 static int cksum_counter;
1457                 u32        server_cksum = body->oa.o_cksum;
1458                 char      *via = "";
1459                 char      *router = "";
1460                 cksum_type_t cksum_type;
1461
1462                 cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
1463                                                body->oa.o_flags : 0);
1464                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1465                                                  aa->aa_ppga, OST_READ,
1466                                                  cksum_type);
1467
1468                 if (peer->nid != req->rq_bulk->bd_sender) {
1469                         via = " via ";
1470                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1471                 }
1472
1473                 if (server_cksum != client_cksum) {
1474                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1475                                            "%s%s%s inode "DFID" object "DOSTID
1476                                            " extent ["LPU64"-"LPU64"]\n",
1477                                            req->rq_import->imp_obd->obd_name,
1478                                            libcfs_nid2str(peer->nid),
1479                                            via, router,
1480                                            body->oa.o_valid & OBD_MD_FLFID ?
1481                                                 body->oa.o_parent_seq : (__u64)0,
1482                                            body->oa.o_valid & OBD_MD_FLFID ?
1483                                                 body->oa.o_parent_oid : 0,
1484                                            body->oa.o_valid & OBD_MD_FLFID ?
1485                                                 body->oa.o_parent_ver : 0,
1486                                            POSTID(&body->oa.o_oi),
1487                                            aa->aa_ppga[0]->off,
1488                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1489                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1490                                                                         1);
1491                         CERROR("client %x, server %x, cksum_type %x\n",
1492                                client_cksum, server_cksum, cksum_type);
1493                         cksum_counter = 0;
1494                         aa->aa_oa->o_cksum = client_cksum;
1495                         rc = -EAGAIN;
1496                 } else {
1497                         cksum_counter++;
1498                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1499                         rc = 0;
1500                 }
1501         } else if (unlikely(client_cksum)) {
1502                 static int cksum_missed;
1503
1504                 cksum_missed++;
1505                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1506                         CERROR("Checksum %u requested from %s but not sent\n",
1507                                cksum_missed, libcfs_nid2str(peer->nid));
1508         } else {
1509                 rc = 0;
1510         }
1511 out:
1512         if (rc >= 0)
1513                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1514                                      aa->aa_oa, &body->oa);
1515
1516         RETURN(rc);
1517 }
1518
1519 static int osc_brw_redo_request(struct ptlrpc_request *request,
1520                                 struct osc_brw_async_args *aa, int rc)
1521 {
1522         struct ptlrpc_request *new_req;
1523         struct osc_brw_async_args *new_aa;
1524         struct osc_async_page *oap;
1525         ENTRY;
1526
1527         DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1528                   "redo for recoverable error %d", rc);
1529
1530         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1531                                 OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
1532                                   aa->aa_cli, aa->aa_oa, aa->aa_page_count,
1533                                   aa->aa_ppga, &new_req, 1);
1534         if (rc)
1535                 RETURN(rc);
1536
1537         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1538                 if (oap->oap_request != NULL) {
1539                         LASSERTF(request == oap->oap_request,
1540                                  "request %p != oap_request %p\n",
1541                                  request, oap->oap_request);
1542                         if (oap->oap_interrupted) {
1543                                 ptlrpc_req_finished(new_req);
1544                                 RETURN(-EINTR);
1545                         }
1546                 }
1547         }
1548         /* New request takes over pga and oaps from old request.
1549          * Note that copying a list_head doesn't work, need to move it... */
1550         aa->aa_resends++;
1551         new_req->rq_interpret_reply = request->rq_interpret_reply;
1552         new_req->rq_async_args = request->rq_async_args;
1553         new_req->rq_commit_cb = request->rq_commit_cb;
1554         /* cap resend delay to the current request timeout, this is similar to
1555          * what ptlrpc does (see after_reply()) */
1556         if (aa->aa_resends > new_req->rq_timeout)
1557                 new_req->rq_sent = cfs_time_current_sec() + new_req->rq_timeout;
1558         else
1559                 new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1560         new_req->rq_generation_set = 1;
1561         new_req->rq_import_generation = request->rq_import_generation;
1562
1563         new_aa = ptlrpc_req_async_args(new_req);
1564
1565         INIT_LIST_HEAD(&new_aa->aa_oaps);
1566         list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1567         INIT_LIST_HEAD(&new_aa->aa_exts);
1568         list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1569         new_aa->aa_resends = aa->aa_resends;
1570
1571         list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1572                 if (oap->oap_request) {
1573                         ptlrpc_req_finished(oap->oap_request);
1574                         oap->oap_request = ptlrpc_request_addref(new_req);
1575                 }
1576         }
1577
1578         /* XXX: This code will run into problem if we're going to support
1579          * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
1580          * and wait for all of them to be finished. We should inherit request
1581          * set from old request. */
1582         ptlrpcd_add_req(new_req);
1583
1584         DEBUG_REQ(D_INFO, new_req, "new request");
1585         RETURN(0);
1586 }
1587
1588 /*
1589  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1590  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1591  * fine for our small page arrays and doesn't require allocation.  its an
1592  * insertion sort that swaps elements that are strides apart, shrinking the
1593  * stride down until its '1' and the array is sorted.
1594  */
1595 static void sort_brw_pages(struct brw_page **array, int num)
1596 {
1597         int stride, i, j;
1598         struct brw_page *tmp;
1599
1600         if (num == 1)
1601                 return;
1602         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1603                 ;
1604
1605         do {
1606                 stride /= 3;
1607                 for (i = stride ; i < num ; i++) {
1608                         tmp = array[i];
1609                         j = i;
1610                         while (j >= stride && array[j - stride]->off > tmp->off) {
1611                                 array[j] = array[j - stride];
1612                                 j -= stride;
1613                         }
1614                         array[j] = tmp;
1615                 }
1616         } while (stride > 1);
1617 }
1618
1619 static void osc_release_ppga(struct brw_page **ppga, size_t count)
1620 {
1621         LASSERT(ppga != NULL);
1622         OBD_FREE(ppga, sizeof(*ppga) * count);
1623 }
1624
1625 static int brw_interpret(const struct lu_env *env,
1626                          struct ptlrpc_request *req, void *data, int rc)
1627 {
1628         struct osc_brw_async_args *aa = data;
1629         struct osc_extent *ext;
1630         struct osc_extent *tmp;
1631         struct client_obd *cli = aa->aa_cli;
1632         ENTRY;
1633
1634         rc = osc_brw_fini_request(req, rc);
1635         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1636         /* When server return -EINPROGRESS, client should always retry
1637          * regardless of the number of times the bulk was resent already. */
1638         if (osc_recoverable_error(rc)) {
1639                 if (req->rq_import_generation !=
1640                     req->rq_import->imp_generation) {
1641                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
1642                                ""DOSTID", rc = %d.\n",
1643                                req->rq_import->imp_obd->obd_name,
1644                                POSTID(&aa->aa_oa->o_oi), rc);
1645                 } else if (rc == -EINPROGRESS ||
1646                     client_should_resend(aa->aa_resends, aa->aa_cli)) {
1647                         rc = osc_brw_redo_request(req, aa, rc);
1648                 } else {
1649                         CERROR("%s: too many resent retries for object: "
1650                                ""LPU64":"LPU64", rc = %d.\n",
1651                                req->rq_import->imp_obd->obd_name,
1652                                POSTID(&aa->aa_oa->o_oi), rc);
1653                 }
1654
1655                 if (rc == 0)
1656                         RETURN(0);
1657                 else if (rc == -EAGAIN || rc == -EINPROGRESS)
1658                         rc = -EIO;
1659         }
1660
1661         if (rc == 0) {
1662                 struct obdo *oa = aa->aa_oa;
1663                 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
1664                 unsigned long valid = 0;
1665                 struct cl_object *obj;
1666                 struct osc_async_page *last;
1667
1668                 last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
1669                 obj = osc2cl(last->oap_obj);
1670
1671                 cl_object_attr_lock(obj);
1672                 if (oa->o_valid & OBD_MD_FLBLOCKS) {
1673                         attr->cat_blocks = oa->o_blocks;
1674                         valid |= CAT_BLOCKS;
1675                 }
1676                 if (oa->o_valid & OBD_MD_FLMTIME) {
1677                         attr->cat_mtime = oa->o_mtime;
1678                         valid |= CAT_MTIME;
1679                 }
1680                 if (oa->o_valid & OBD_MD_FLATIME) {
1681                         attr->cat_atime = oa->o_atime;
1682                         valid |= CAT_ATIME;
1683                 }
1684                 if (oa->o_valid & OBD_MD_FLCTIME) {
1685                         attr->cat_ctime = oa->o_ctime;
1686                         valid |= CAT_CTIME;
1687                 }
1688
1689                 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1690                         struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
1691                         loff_t last_off = last->oap_count + last->oap_obj_off +
1692                                 last->oap_page_off;
1693
1694                         /* Change file size if this is an out of quota or
1695                          * direct IO write and it extends the file size */
1696                         if (loi->loi_lvb.lvb_size < last_off) {
1697                                 attr->cat_size = last_off;
1698                                 valid |= CAT_SIZE;
1699                         }
1700                         /* Extend KMS if it's not a lockless write */
1701                         if (loi->loi_kms < last_off &&
1702                             oap2osc_page(last)->ops_srvlock == 0) {
1703                                 attr->cat_kms = last_off;
1704                                 valid |= CAT_KMS;
1705                         }
1706                 }
1707
1708                 if (valid != 0)
1709                         cl_object_attr_update(env, obj, attr, valid);
1710                 cl_object_attr_unlock(obj);
1711         }
1712         OBDO_FREE(aa->aa_oa);
1713
1714         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
1715                 osc_inc_unstable_pages(req);
1716
1717         list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
1718                 list_del_init(&ext->oe_link);
1719                 osc_extent_finish(env, ext, 1, rc);
1720         }
1721         LASSERT(list_empty(&aa->aa_exts));
1722         LASSERT(list_empty(&aa->aa_oaps));
1723
1724         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1725         ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
1726
1727         spin_lock(&cli->cl_loi_list_lock);
1728         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1729          * is called so we know whether to go to sync BRWs or wait for more
1730          * RPCs to complete */
1731         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
1732                 cli->cl_w_in_flight--;
1733         else
1734                 cli->cl_r_in_flight--;
1735         osc_wake_cache_waiters(cli);
1736         spin_unlock(&cli->cl_loi_list_lock);
1737
1738         osc_io_unplug(env, cli, NULL);
1739         RETURN(rc);
1740 }
1741
1742 static void brw_commit(struct ptlrpc_request *req)
1743 {
1744         /* If osc_inc_unstable_pages (via osc_extent_finish) races with
1745          * this called via the rq_commit_cb, I need to ensure
1746          * osc_dec_unstable_pages is still called. Otherwise unstable
1747          * pages may be leaked. */
1748         spin_lock(&req->rq_lock);
1749         if (likely(req->rq_unstable)) {
1750                 req->rq_unstable = 0;
1751                 spin_unlock(&req->rq_lock);
1752
1753                 osc_dec_unstable_pages(req);
1754         } else {
1755                 req->rq_committed = 1;
1756                 spin_unlock(&req->rq_lock);
1757         }
1758 }
1759
/**
 * Build an RPC by the list of extent @ext_list. The caller must ensure
 * that the total pages in this list are NOT over max pages per RPC.
 * Extents in the list must be in OES_RPC state.
 *
 * @env:      execution environment
 * @cli:      client the RPC is built for
 * @ext_list: non-empty list of extents (linked through oe_link); on success
 *            the list is spliced into the async args of the request and left
 *            empty, on failure every extent is removed from it and passed to
 *            osc_extent_finish() with the error
 * @cmd:      OBD_BRW_WRITE or OBD_BRW_READ
 *
 * Returns 0 once the request has been handed to ptlrpcd_add_req(), -ENOMEM
 * if the page array or the obdo cannot be allocated, or the error returned
 * by osc_brw_prep_request().
 */
int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
                  struct list_head *ext_list, int cmd)
{
        struct ptlrpc_request           *req = NULL;
        struct osc_extent               *ext;
        struct brw_page                 **pga = NULL;
        struct osc_brw_async_args       *aa = NULL;
        struct obdo                     *oa = NULL;
        struct osc_async_page           *oap;
        struct osc_object               *obj = NULL;
        struct cl_req_attr              *crattr = NULL;
        loff_t                          starting_offset = OBD_OBJECT_EOF;
        loff_t                          ending_offset = 0;
        int                             mpflag = 0;
        int                             mem_tight = 0;
        int                             page_count = 0;
        bool                            soft_sync = false;
        bool                            interrupted = false;
        int                             i;
        int                             grant = 0;
        int                             rc;
        struct list_head                rpc_list = LIST_HEAD_INIT(rpc_list);
        struct ost_body                 *body;
        ENTRY;
        LASSERT(!list_empty(ext_list));

        /* add pages into rpc_list to build BRW rpc */
        /* First pass: only the per-extent totals (memalloc flag, grants,
         * page count) and the object of the first extent are collected. */
        list_for_each_entry(ext, ext_list, oe_link) {
                LASSERT(ext->oe_state == OES_RPC);
                mem_tight |= ext->oe_memalloc;
                grant += ext->oe_grants;
                page_count += ext->oe_nr_pages;
                if (obj == NULL)
                        obj = ext->oe_obj;
        }

        soft_sync = osc_over_unstable_soft_limit(cli);
        /* mpflag keeps the previous state so that it can be restored at
         * "out". */
        if (mem_tight)
                mpflag = cfs_memory_pressure_get_and_set();

        /* One slot per page of every extent in the list. */
        OBD_ALLOC(pga, sizeof(*pga) * page_count);
        if (pga == NULL)
                GOTO(out, rc = -ENOMEM);

        OBDO_ALLOC(oa);
        if (oa == NULL)
                GOTO(out, rc = -ENOMEM);

        /* Second pass: fill the page array, link every page into rpc_list
         * and track the byte range covered by the RPC. */
        i = 0;
        list_for_each_entry(ext, ext_list, oe_link) {
                list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
                        if (mem_tight)
                                oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
                        if (soft_sync)
                                oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
                        pga[i] = &oap->oap_brw_page;
                        pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
                        i++;

                        list_add_tail(&oap->oap_rpc_item, &rpc_list);
                        /* Only the page that lowers the starting offset may
                         * begin in the middle of a page ... */
                        if (starting_offset == OBD_OBJECT_EOF ||
                            starting_offset > oap->oap_obj_off)
                                starting_offset = oap->oap_obj_off;
                        else
                                LASSERT(oap->oap_page_off == 0);
                        /* ... and only the page that raises the ending
                         * offset may stop short of the page end. */
                        if (ending_offset < oap->oap_obj_off + oap->oap_count)
                                ending_offset = oap->oap_obj_off +
                                                oap->oap_count;
                        else
                                LASSERT(oap->oap_page_off + oap->oap_count ==
                                        PAGE_CACHE_SIZE);
                        if (oap->oap_interrupted)
                                interrupted = true;
                }
        }

        /* first page in the list */
        oap = list_entry(rpc_list.next, typeof(*oap), oap_rpc_item);

        /* Let the upper layers fill in the obdo, using the first page as
         * the representative page of the request. */
        crattr = &osc_env_info(env)->oti_req_attr;
        memset(crattr, 0, sizeof(*crattr));
        crattr->cra_type = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
        crattr->cra_flags = ~0ULL;
        crattr->cra_page = oap2cl_page(oap);
        crattr->cra_oa = oa;
        cl_req_attr_set(env, osc2cl(obj), crattr);

        if (cmd == OBD_BRW_WRITE)
                oa->o_grant_used = grant;

        sort_brw_pages(pga, page_count);
        rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req, 0);
        if (rc != 0) {
                CERROR("prep_req failed: %d\n", rc);
                GOTO(out, rc);
        }

        req->rq_commit_cb = brw_commit;
        req->rq_interpret_reply = brw_interpret;
        req->rq_memalloc = mem_tight != 0;
        /* oap is still the first page of rpc_list; it is the only page
         * that takes a reference on the request here. */
        oap->oap_request = ptlrpc_request_addref(req);
        if (interrupted && !req->rq_intr)
                ptlrpc_mark_interrupted(req);

        /* Need to update the timestamps after the request is built in case
         * we race with setattr (locally or in queue at OST).  If OST gets
         * later setattr before earlier BRW (as determined by the request xid),
         * the OST will not use BRW timestamps.  Sadly, there is no obvious
         * way to do this in a single call.  bug 10150 */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        crattr->cra_oa = &body->oa;
        crattr->cra_flags = OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME;
        cl_req_attr_set(env, osc2cl(obj), crattr);
        lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);

        /* The async args live inside the request; move the page and extent
         * lists there, which leaves rpc_list and ext_list empty. */
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        INIT_LIST_HEAD(&aa->aa_oaps);
        list_splice_init(&rpc_list, &aa->aa_oaps);
        INIT_LIST_HEAD(&aa->aa_exts);
        list_splice_init(ext_list, &aa->aa_exts);

        /* Account the RPC as in flight and feed the page-count, RPC-count
         * and offset histograms; starting_offset is a page index from
         * here on. */
        spin_lock(&cli->cl_loi_list_lock);
        starting_offset >>= PAGE_CACHE_SHIFT;
        if (cmd == OBD_BRW_READ) {
                cli->cl_r_in_flight++;
                lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
                lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
                lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
                                      starting_offset + 1);
        } else {
                cli->cl_w_in_flight++;
                lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
                lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
                lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
                                      starting_offset + 1);
        }
        spin_unlock(&cli->cl_loi_list_lock);

        DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %ur/%uw in flight",
                  page_count, aa, cli->cl_r_in_flight,
                  cli->cl_w_in_flight);

        ptlrpcd_add_req(req);
        rc = 0;
        EXIT;

out:
        /* Shared by the success and the failure path. */
        if (mem_tight != 0)
                cfs_memory_pressure_restore(mpflag);

        if (rc != 0) {
                /* Every failing path gets here before a request exists. */
                LASSERT(req == NULL);

                if (oa)
                        OBDO_FREE(oa);
                if (pga)
                        OBD_FREE(pga, sizeof(*pga) * page_count);
                /* this should happen rarely and is pretty bad, it makes the
                 * pending list not follow the dirty order */
                while (!list_empty(ext_list)) {
                        ext = list_entry(ext_list->next, struct osc_extent,
                                         oe_link);
                        list_del_init(&ext->oe_link);
                        osc_extent_finish(env, ext, 0, rc);
                }
        }
        RETURN(rc);
}
1934
1935 static int osc_set_lock_data(struct ldlm_lock *lock, void *data)
1936 {
1937         int set = 0;
1938
1939         LASSERT(lock != NULL);
1940
1941         lock_res_and_lock(lock);
1942
1943         if (lock->l_ast_data == NULL)
1944                 lock->l_ast_data = data;
1945         if (lock->l_ast_data == data)
1946                 set = 1;
1947
1948         unlock_res_and_lock(lock);
1949
1950         return set;
1951 }
1952
1953 static int osc_enqueue_fini(struct ptlrpc_request *req,
1954                             osc_enqueue_upcall_f upcall, void *cookie,
1955                             struct lustre_handle *lockh, enum ldlm_mode mode,
1956                             __u64 *flags, int agl, int errcode)
1957 {
1958         bool intent = *flags & LDLM_FL_HAS_INTENT;
1959         int rc;
1960         ENTRY;
1961
1962         /* The request was created before ldlm_cli_enqueue call. */
1963         if (intent && errcode == ELDLM_LOCK_ABORTED) {
1964                 struct ldlm_reply *rep;
1965
1966                 rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1967                 LASSERT(rep != NULL);
1968
1969                 rep->lock_policy_res1 =
1970                         ptlrpc_status_ntoh(rep->lock_policy_res1);
1971                 if (rep->lock_policy_res1)
1972                         errcode = rep->lock_policy_res1;
1973                 if (!agl)
1974                         *flags |= LDLM_FL_LVB_READY;
1975         } else if (errcode == ELDLM_OK) {
1976                 *flags |= LDLM_FL_LVB_READY;
1977         }
1978
1979         /* Call the update callback. */
1980         rc = (*upcall)(cookie, lockh, errcode);
1981
1982         /* release the reference taken in ldlm_cli_enqueue() */
1983         if (errcode == ELDLM_LOCK_MATCHED)
1984                 errcode = ELDLM_OK;
1985         if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
1986                 ldlm_lock_decref(lockh, mode);
1987
1988         RETURN(rc);
1989 }
1990
1991 static int osc_enqueue_interpret(const struct lu_env *env,
1992                                  struct ptlrpc_request *req,
1993                                  struct osc_enqueue_args *aa, int rc)
1994 {
1995         struct ldlm_lock *lock;
1996         struct lustre_handle *lockh = &aa->oa_lockh;
1997         enum ldlm_mode mode = aa->oa_mode;
1998         struct ost_lvb *lvb = aa->oa_lvb;
1999         __u32 lvb_len = sizeof(*lvb);
2000         __u64 flags = 0;
2001
2002         ENTRY;
2003
2004         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2005          * be valid. */
2006         lock = ldlm_handle2lock(lockh);
2007         LASSERTF(lock != NULL,
2008                  "lockh "LPX64", req %p, aa %p - client evicted?\n",
2009                  lockh->cookie, req, aa);
2010
2011         /* Take an additional reference so that a blocking AST that
2012          * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2013          * to arrive after an upcall has been executed by
2014          * osc_enqueue_fini(). */
2015         ldlm_lock_addref(lockh, mode);
2016
2017         /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
2018         OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
2019
2020         /* Let CP AST to grant the lock first. */
2021         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2022
2023         if (aa->oa_agl) {
2024                 LASSERT(aa->oa_lvb == NULL);
2025                 LASSERT(aa->oa_flags == NULL);
2026                 aa->oa_flags = &flags;
2027         }
2028
2029         /* Complete obtaining the lock procedure. */
2030         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
2031                                    aa->oa_mode, aa->oa_flags, lvb, lvb_len,
2032                                    lockh, rc);
2033         /* Complete osc stuff. */
2034         rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
2035                               aa->oa_flags, aa->oa_agl, rc);
2036
2037         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2038
2039         ldlm_lock_decref(lockh, mode);
2040         LDLM_LOCK_PUT(lock);
2041         RETURN(rc);
2042 }
2043
/* Marker value for a request set argument: osc_enqueue_base() compares its
 * rqset against this pointer and, when they are equal, queues the request
 * with ptlrpcd_add_req() instead of adding it to a real set.
 * NOTE(review): the value 1 is presumably never dereferenced -- confirm. */
struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2045
/* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
 * other synchronous requests, however keeping some locks and trying to obtain
 * others may take a considerable amount of time in a case of ost failure; and
 * when other sync requests do not get released lock from a client, the client
 * is evicted from the cluster -- such scenarios make the life difficult, so
 * release locks just after they are obtained.
 *
 * The function first tries to match an already cached lock (only when
 * kms_valid is set) and falls back to a new enqueue otherwise.
 *
 * exp:       export the lock is requested through
 * res_id:    resource to lock
 * flags:     in/out LDLM flags; LDLM_FL_LVB_READY may be set and
 *            LDLM_FL_BLOCK_GRANTED is cleared before a new enqueue
 * policy:    lock policy; its extent is widened to page boundaries in place
 * lvb:       buffer handed to ldlm_cli_enqueue() for the lock value block
 * kms_valid: when zero, cached locks are not matched at all
 * upcall:    callback run with cookie once the outcome is known
 * cookie:    opaque argument for upcall
 * einfo:     enqueue description (mode, type, callback data)
 * rqset:     request set for an async enqueue, or PTLRPCD_SET
 * async:     when non-zero the request is only queued and completed later
 *            by osc_enqueue_interpret()
 * agl:       non-zero for a speculative AGL enqueue
 *
 * Returns ELDLM_OK when a cached lock satisfied the request, -ECANCELED when
 * an AGL request found an existing lock, -ENOLCK when only a match was asked
 * for (LDLM_FL_TEST_LOCK or LDLM_FL_MATCH_LOCK) and none was found, -ENOMEM
 * or the ldlm_prep_enqueue_req() error when the intent request cannot be set
 * up; otherwise the result of ldlm_cli_enqueue() (async) or of
 * osc_enqueue_fini() (sync). */
int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
                     __u64 *flags, union ldlm_policy_data *policy,
                     struct ost_lvb *lvb, int kms_valid,
                     osc_enqueue_upcall_f upcall, void *cookie,
                     struct ldlm_enqueue_info *einfo,
                     struct ptlrpc_request_set *rqset, int async, int agl)
{
        struct obd_device *obd = exp->exp_obd;
        struct lustre_handle lockh = { 0 };
        struct ptlrpc_request *req = NULL;
        int intent = *flags & LDLM_FL_HAS_INTENT;
        __u64 match_flags = *flags;
        enum ldlm_mode mode;
        int rc;
        ENTRY;

        /* Filesystem lock extents are extended to page boundaries so that
         * dealing with the page cache is a little smoother.  */
        policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
        policy->l_extent.end |= ~PAGE_MASK;

        /*
         * kms is not valid when either object is completely fresh (so that no
         * locks are cached), or object was evicted. In the latter case cached
         * lock cannot be used, because it would prime inode state with
         * potentially stale LVB.
         */
        if (!kms_valid)
                goto no_match;

        /* Next, search for already existing extent locks that will cover us */
        /* If we're trying to read, we also search for an existing PW lock.  The
         * VFS and page cache already protect us locally, so lots of readers/
         * writers can share a single PW lock.
         *
         * There are problems with conversion deadlocks, so instead of
         * converting a read lock to a write lock, we'll just enqueue a new
         * one.
         *
         * At some point we should cancel the read lock instead of making them
         * send us a blocking callback, but there are problems with canceling
         * locks out from other users right now, too. */
        mode = einfo->ei_mode;
        if (einfo->ei_mode == LCK_PR)
                mode |= LCK_PW;
        if (agl == 0)
                match_flags |= LDLM_FL_LVB_READY;
        if (intent != 0)
                match_flags |= LDLM_FL_BLOCK_GRANTED;
        /* From here on "mode" is the mode of the matched lock, or 0 when
         * nothing matched. */
        mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id,
                               einfo->ei_type, policy, mode, &lockh, 0);
        if (mode) {
                struct ldlm_lock *matched;

                if (*flags & LDLM_FL_TEST_LOCK)
                        RETURN(ELDLM_OK);

                matched = ldlm_handle2lock(&lockh);
                if (agl) {
                        /* AGL enqueues DLM locks speculatively. Therefore if
                         * a DLM lock already exists, it will just inform the
                         * caller to cancel the AGL process for this stripe. */
                        ldlm_lock_decref(&lockh, mode);
                        LDLM_LOCK_PUT(matched);
                        RETURN(-ECANCELED);
                } else if (osc_set_lock_data(matched, einfo->ei_cbdata)) {
                        *flags |= LDLM_FL_LVB_READY;

                        /* We already have a lock, and it's referenced. */
                        (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);

                        ldlm_lock_decref(&lockh, mode);
                        LDLM_LOCK_PUT(matched);
                        RETURN(ELDLM_OK);
                } else {
                        /* The matched lock carries different callback data;
                         * release it and fall through to a new enqueue. */
                        ldlm_lock_decref(&lockh, mode);
                        LDLM_LOCK_PUT(matched);
                }
        }

no_match:
        /* The caller only wanted to match an existing lock. */
        if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK))
                RETURN(-ENOLCK);

        /* An intent enqueue needs its request prepared here, with room for
         * the LVB in the reply. */
        if (intent) {
                req = ptlrpc_request_alloc(class_exp2cliimp(exp),
                                           &RQF_LDLM_ENQUEUE_LVB);
                if (req == NULL)
                        RETURN(-ENOMEM);

                rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
                if (rc) {
                        ptlrpc_request_free(req);
                        RETURN(rc);
                }

                req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
                                     sizeof *lvb);
                ptlrpc_request_set_replen(req);
        }

        /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
        *flags &= ~LDLM_FL_BLOCK_GRANTED;

        rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
                              sizeof(*lvb), LVB_T_OST, &lockh, async);
        if (async) {
                if (!rc) {
                        /* Stash everything osc_enqueue_interpret() needs in
                         * the async args of the request. */
                        struct osc_enqueue_args *aa;
                        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
                        aa = ptlrpc_req_async_args(req);
                        aa->oa_exp    = exp;
                        aa->oa_mode   = einfo->ei_mode;
                        aa->oa_type   = einfo->ei_type;
                        lustre_handle_copy(&aa->oa_lockh, &lockh);
                        aa->oa_upcall = upcall;
                        aa->oa_cookie = cookie;
                        aa->oa_agl    = !!agl;
                        if (!agl) {
                                aa->oa_flags  = flags;
                                aa->oa_lvb    = lvb;
                        } else {
                                /* AGL is essentially to enqueue an DLM lock
                                 * in advance, so we don't care about the
                                 * result of AGL enqueue. */
                                aa->oa_lvb    = NULL;
                                aa->oa_flags  = NULL;
                        }

                        req->rq_interpret_reply =
                                (ptlrpc_interpterer_t)osc_enqueue_interpret;
                        if (rqset == PTLRPCD_SET)
                                ptlrpcd_add_req(req);
                        else
                                ptlrpc_set_add_req(rqset, req);
                } else if (intent) {
                        /* The request allocated above for the intent is
                         * dropped again when the enqueue failed. */
                        ptlrpc_req_finished(req);
                }
                RETURN(rc);
        }

        /* Synchronous case: finish the enqueue right away. */
        rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
                              flags, agl, rc);
        if (intent)
                ptlrpc_req_finished(req);

        RETURN(rc);
}
2201
2202 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2203                    enum ldlm_type type, union ldlm_policy_data *policy,
2204                    enum ldlm_mode mode, __u64 *flags, void *data,
2205                    struct lustre_handle *lockh, int unref)
2206 {
2207         struct obd_device *obd = exp->exp_obd;
2208         __u64 lflags = *flags;
2209         enum ldlm_mode rc;
2210         ENTRY;
2211
2212         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2213                 RETURN(-EIO);
2214
2215         /* Filesystem lock extents are extended to page boundaries so that
2216          * dealing with the page cache is a little smoother */
2217         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2218         policy->l_extent.end |= ~PAGE_MASK;
2219
2220         /* Next, search for already existing extent locks that will cover us */
2221         /* If we're trying to read, we also search for an existing PW lock.  The
2222          * VFS and page cache already protect us locally, so lots of readers/
2223          * writers can share a single PW lock. */
2224         rc = mode;
2225         if (mode == LCK_PR)
2226                 rc |= LCK_PW;
2227         rc = ldlm_lock_match(obd->obd_namespace, lflags,
2228                              res_id, type, policy, rc, lockh, unref);
2229         if (rc == 0 || lflags & LDLM_FL_TEST_LOCK)
2230                 RETURN(rc);
2231
2232         if (data != NULL) {
2233                 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2234
2235                 LASSERT(lock != NULL);
2236                 if (!osc_set_lock_data(lock, data)) {
2237                         ldlm_lock_decref(lockh, rc);
2238                         rc = 0;
2239                 }
2240                 LDLM_LOCK_PUT(lock);
2241         }
2242         RETURN(rc);
2243 }
2244
2245 static int osc_statfs_interpret(const struct lu_env *env,
2246                                 struct ptlrpc_request *req,
2247                                 struct osc_async_args *aa, int rc)
2248 {
2249         struct obd_statfs *msfs;
2250         ENTRY;
2251
2252         if (rc == -EBADR)
2253                 /* The request has in fact never been sent
2254                  * due to issues at a higher level (LOV).
2255                  * Exit immediately since the caller is
2256                  * aware of the problem and takes care
2257                  * of the clean up */
2258                  RETURN(rc);
2259
2260         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2261             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2262                 GOTO(out, rc = 0);
2263
2264         if (rc != 0)
2265                 GOTO(out, rc);
2266
2267         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2268         if (msfs == NULL) {
2269                 GOTO(out, rc = -EPROTO);
2270         }
2271
2272         *aa->aa_oi->oi_osfs = *msfs;
2273 out:
2274         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2275         RETURN(rc);
2276 }
2277
2278 static int osc_statfs_async(struct obd_export *exp,
2279                             struct obd_info *oinfo, __u64 max_age,
2280                             struct ptlrpc_request_set *rqset)
2281 {
2282         struct obd_device     *obd = class_exp2obd(exp);
2283         struct ptlrpc_request *req;
2284         struct osc_async_args *aa;
2285         int                    rc;
2286         ENTRY;
2287
2288         /* We could possibly pass max_age in the request (as an absolute
2289          * timestamp or a "seconds.usec ago") so the target can avoid doing
2290          * extra calls into the filesystem if that isn't necessary (e.g.
2291          * during mount that would help a bit).  Having relative timestamps
2292          * is not so great if request processing is slow, while absolute
2293          * timestamps are not ideal because they need time synchronization. */
2294         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2295         if (req == NULL)
2296                 RETURN(-ENOMEM);
2297
2298         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2299         if (rc) {
2300                 ptlrpc_request_free(req);
2301                 RETURN(rc);
2302         }
2303         ptlrpc_request_set_replen(req);
2304         req->rq_request_portal = OST_CREATE_PORTAL;
2305         ptlrpc_at_set_req_timeout(req);
2306
2307         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2308                 /* procfs requests not want stat in wait for avoid deadlock */
2309                 req->rq_no_resend = 1;
2310                 req->rq_no_delay = 1;
2311         }
2312
2313         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2314         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2315         aa = ptlrpc_req_async_args(req);
2316         aa->aa_oi = oinfo;
2317
2318         ptlrpc_set_add_req(rqset, req);
2319         RETURN(0);
2320 }
2321
2322 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2323                       struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2324 {
2325         struct obd_device     *obd = class_exp2obd(exp);
2326         struct obd_statfs     *msfs;
2327         struct ptlrpc_request *req;
2328         struct obd_import     *imp = NULL;
2329         int rc;
2330         ENTRY;
2331
2332         /*Since the request might also come from lprocfs, so we need
2333          *sync this with client_disconnect_export Bug15684*/
2334         down_read(&obd->u.cli.cl_sem);
2335         if (obd->u.cli.cl_import)
2336                 imp = class_import_get(obd->u.cli.cl_import);
2337         up_read(&obd->u.cli.cl_sem);
2338         if (!imp)
2339                 RETURN(-ENODEV);
2340
2341         /* We could possibly pass max_age in the request (as an absolute
2342          * timestamp or a "seconds.usec ago") so the target can avoid doing
2343          * extra calls into the filesystem if that isn't necessary (e.g.
2344          * during mount that would help a bit).  Having relative timestamps
2345          * is not so great if request processing is slow, while absolute
2346          * timestamps are not ideal because they need time synchronization. */
2347         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2348
2349         class_import_put(imp);
2350
2351         if (req == NULL)
2352                 RETURN(-ENOMEM);
2353
2354         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2355         if (rc) {
2356                 ptlrpc_request_free(req);
2357                 RETURN(rc);
2358         }
2359         ptlrpc_request_set_replen(req);
2360         req->rq_request_portal = OST_CREATE_PORTAL;
2361         ptlrpc_at_set_req_timeout(req);
2362
2363         if (flags & OBD_STATFS_NODELAY) {
2364                 /* procfs requests not want stat in wait for avoid deadlock */
2365                 req->rq_no_resend = 1;
2366                 req->rq_no_delay = 1;
2367         }
2368
2369         rc = ptlrpc_queue_wait(req);
2370         if (rc)
2371                 GOTO(out, rc);
2372
2373         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2374         if (msfs == NULL) {
2375                 GOTO(out, rc = -EPROTO);
2376         }
2377
2378         *osfs = *msfs;
2379
2380         EXIT;
2381  out:
2382         ptlrpc_req_finished(req);
2383         return rc;
2384 }
2385
2386 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2387                          void *karg, void __user *uarg)
2388 {
2389         struct obd_device *obd = exp->exp_obd;
2390         struct obd_ioctl_data *data = karg;
2391         int err = 0;
2392         ENTRY;
2393
2394         if (!try_module_get(THIS_MODULE)) {
2395                 CERROR("%s: cannot get module '%s'\n", obd->obd_name,
2396                        module_name(THIS_MODULE));
2397                 return -EINVAL;
2398         }
2399         switch (cmd) {
2400         case OBD_IOC_CLIENT_RECOVER:
2401                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2402                                             data->ioc_inlbuf1, 0);
2403                 if (err > 0)
2404                         err = 0;
2405                 GOTO(out, err);
2406         case IOC_OSC_SET_ACTIVE:
2407                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2408                                                data->ioc_offset);
2409                 GOTO(out, err);
2410         case OBD_IOC_PING_TARGET:
2411                 err = ptlrpc_obd_ping(obd);
2412                 GOTO(out, err);
2413         default:
2414                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2415                        cmd, current_comm());
2416                 GOTO(out, err = -ENOTTY);
2417         }
2418 out:
2419         module_put(THIS_MODULE);
2420         return err;
2421 }
2422
2423 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
2424                               u32 keylen, void *key,
2425                               u32 vallen, void *val,
2426                               struct ptlrpc_request_set *set)
2427 {
2428         struct ptlrpc_request *req;
2429         struct obd_device     *obd = exp->exp_obd;
2430         struct obd_import     *imp = class_exp2cliimp(exp);
2431         char                  *tmp;
2432         int                    rc;
2433         ENTRY;
2434
2435         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
2436
2437         if (KEY_IS(KEY_CHECKSUM)) {
2438                 if (vallen != sizeof(int))
2439                         RETURN(-EINVAL);
2440                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
2441                 RETURN(0);
2442         }
2443
2444         if (KEY_IS(KEY_SPTLRPC_CONF)) {
2445                 sptlrpc_conf_client_adapt(obd);
2446                 RETURN(0);
2447         }
2448
2449         if (KEY_IS(KEY_FLUSH_CTX)) {
2450                 sptlrpc_import_flush_my_ctx(imp);
2451                 RETURN(0);
2452         }
2453
2454         if (KEY_IS(KEY_CACHE_SET)) {
2455                 struct client_obd *cli = &obd->u.cli;
2456
2457                 LASSERT(cli->cl_cache == NULL); /* only once */
2458                 cli->cl_cache = (struct cl_client_cache *)val;
2459                 cl_cache_incref(cli->cl_cache);
2460                 cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
2461
2462                 /* add this osc into entity list */
2463                 LASSERT(list_empty(&cli->cl_lru_osc));
2464                 spin_lock(&cli->cl_cache->ccc_lru_lock);
2465                 list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
2466                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
2467
2468                 RETURN(0);
2469         }
2470
2471         if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
2472                 struct client_obd *cli = &obd->u.cli;
2473                 long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
2474                 long target = *(long *)val;
2475
2476                 nr = osc_lru_shrink(env, cli, min(nr, target), true);
2477                 *(long *)val -= nr;
2478                 RETURN(0);
2479         }
2480
2481         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
2482                 RETURN(-EINVAL);
2483
2484         /* We pass all other commands directly to OST. Since nobody calls osc
2485            methods directly and everybody is supposed to go through LOV, we
2486            assume lov checked invalid values for us.
2487            The only recognised values so far are evict_by_nid and mds_conn.
2488            Even if something bad goes through, we'd get a -EINVAL from OST
2489            anyway. */
2490
2491         req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
2492                                                 &RQF_OST_SET_GRANT_INFO :
2493                                                 &RQF_OBD_SET_INFO);
2494         if (req == NULL)
2495                 RETURN(-ENOMEM);
2496
2497         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2498                              RCL_CLIENT, keylen);
2499         if (!KEY_IS(KEY_GRANT_SHRINK))
2500                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
2501                                      RCL_CLIENT, vallen);
2502         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
2503         if (rc) {
2504                 ptlrpc_request_free(req);
2505                 RETURN(rc);
2506         }
2507
2508         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2509         memcpy(tmp, key, keylen);
2510         tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
2511                                                         &RMF_OST_BODY :
2512                                                         &RMF_SETINFO_VAL);
2513         memcpy(tmp, val, vallen);
2514
2515         if (KEY_IS(KEY_GRANT_SHRINK)) {
2516                 struct osc_grant_args *aa;
2517                 struct obdo *oa;
2518
2519                 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2520                 aa = ptlrpc_req_async_args(req);
2521                 OBDO_ALLOC(oa);
2522                 if (!oa) {
2523                         ptlrpc_req_finished(req);
2524                         RETURN(-ENOMEM);
2525                 }
2526                 *oa = ((struct ost_body *)val)->oa;
2527                 aa->aa_oa = oa;
2528                 req->rq_interpret_reply = osc_shrink_grant_interpret;
2529         }
2530
2531         ptlrpc_request_set_replen(req);
2532         if (!KEY_IS(KEY_GRANT_SHRINK)) {
2533                 LASSERT(set != NULL);
2534                 ptlrpc_set_add_req(set, req);
2535                 ptlrpc_check_set(NULL, set);
2536         } else {
2537                 ptlrpcd_add_req(req);
2538         }
2539
2540         RETURN(0);
2541 }
2542
2543 static int osc_reconnect(const struct lu_env *env,
2544                          struct obd_export *exp, struct obd_device *obd,
2545                          struct obd_uuid *cluuid,
2546                          struct obd_connect_data *data,
2547                          void *localdata)
2548 {
2549         struct client_obd *cli = &obd->u.cli;
2550
2551         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
2552                 long lost_grant;
2553                 long grant;
2554
2555                 spin_lock(&cli->cl_loi_list_lock);
2556                 grant = cli->cl_avail_grant + cli->cl_reserved_grant;
2557                 if (data->ocd_connect_flags & OBD_CONNECT_GRANT_PARAM)
2558                         grant += cli->cl_dirty_grant;
2559                 else
2560                         grant += cli->cl_dirty_pages << PAGE_CACHE_SHIFT;
2561                 data->ocd_grant = grant ? : 2 * cli_brw_size(obd);
2562                 lost_grant = cli->cl_lost_grant;
2563                 cli->cl_lost_grant = 0;
2564                 spin_unlock(&cli->cl_loi_list_lock);
2565
2566                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
2567                        " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
2568                        data->ocd_version, data->ocd_grant, lost_grant);
2569         }
2570
2571         RETURN(0);
2572 }
2573
2574 static int osc_disconnect(struct obd_export *exp)
2575 {
2576         struct obd_device *obd = class_exp2obd(exp);
2577         int rc;
2578
2579         rc = client_disconnect_export(exp);
2580         /**
2581          * Initially we put del_shrink_grant before disconnect_export, but it
2582          * causes the following problem if setup (connect) and cleanup
2583          * (disconnect) are tangled together.
2584          *      connect p1                     disconnect p2
2585          *   ptlrpc_connect_import
2586          *     ...............               class_manual_cleanup
2587          *                                     osc_disconnect
2588          *                                     del_shrink_grant
2589          *   ptlrpc_connect_interrupt
2590          *     init_grant_shrink
2591          *   add this client to shrink list
2592          *                                      cleanup_osc
2593          * Bang! pinger trigger the shrink.
2594          * So the osc should be disconnected from the shrink list, after we
2595          * are sure the import has been destroyed. BUG18662
2596          */
2597         if (obd->u.cli.cl_import == NULL)
2598                 osc_del_shrink_grant(&obd->u.cli);
2599         return rc;
2600 }
2601
2602 static int osc_ldlm_resource_invalidate(struct cfs_hash *hs,
2603         struct cfs_hash_bd *bd, struct hlist_node *hnode, void *arg)
2604 {
2605         struct lu_env *env = arg;
2606         struct ldlm_resource *res = cfs_hash_object(hs, hnode);
2607         struct ldlm_lock *lock;
2608         struct osc_object *osc = NULL;
2609         ENTRY;
2610
2611         lock_res(res);
2612         list_for_each_entry(lock, &res->lr_granted, l_res_link) {
2613                 if (lock->l_ast_data != NULL && osc == NULL) {
2614                         osc = lock->l_ast_data;
2615                         cl_object_get(osc2cl(osc));
2616                 }
2617
2618                 /* clear LDLM_FL_CLEANED flag to make sure it will be canceled
2619                  * by the 2nd round of ldlm_namespace_clean() call in
2620                  * osc_import_event(). */
2621                 ldlm_clear_cleaned(lock);
2622         }
2623         unlock_res(res);
2624
2625         if (osc != NULL) {
2626                 osc_object_invalidate(env, osc);
2627                 cl_object_put(env, osc2cl(osc));
2628         }
2629
2630         RETURN(0);
2631 }
2632
2633 static int osc_import_event(struct obd_device *obd,
2634                             struct obd_import *imp,
2635                             enum obd_import_event event)
2636 {
2637         struct client_obd *cli;
2638         int rc = 0;
2639
2640         ENTRY;
2641         LASSERT(imp->imp_obd == obd);
2642
2643         switch (event) {
2644         case IMP_EVENT_DISCON: {
2645                 cli = &obd->u.cli;
2646                 spin_lock(&cli->cl_loi_list_lock);
2647                 cli->cl_avail_grant = 0;
2648                 cli->cl_lost_grant = 0;
2649                 spin_unlock(&cli->cl_loi_list_lock);
2650                 break;
2651         }
2652         case IMP_EVENT_INACTIVE: {
2653                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
2654                 break;
2655         }
2656         case IMP_EVENT_INVALIDATE: {
2657                 struct ldlm_namespace *ns = obd->obd_namespace;
2658                 struct lu_env         *env;
2659                 __u16                  refcheck;
2660
2661                 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
2662
2663                 env = cl_env_get(&refcheck);
2664                 if (!IS_ERR(env)) {
2665                         osc_io_unplug(env, &obd->u.cli, NULL);
2666
2667                         cfs_hash_for_each_nolock(ns->ns_rs_hash,
2668                                                  osc_ldlm_resource_invalidate,
2669                                                  env, 0);
2670                         cl_env_put(env, &refcheck);
2671
2672                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
2673                 } else
2674                         rc = PTR_ERR(env);
2675                 break;
2676         }
2677         case IMP_EVENT_ACTIVE: {
2678                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
2679                 break;
2680         }
2681         case IMP_EVENT_OCD: {
2682                 struct obd_connect_data *ocd = &imp->imp_connect_data;
2683
2684                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
2685                         osc_init_grant(&obd->u.cli, ocd);
2686
2687                 /* See bug 7198 */
2688                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
2689                         imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
2690
2691                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
2692                 break;
2693         }
2694         case IMP_EVENT_DEACTIVATE: {
2695                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
2696                 break;
2697         }
2698         case IMP_EVENT_ACTIVATE: {
2699                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
2700                 break;
2701         }
2702         default:
2703                 CERROR("Unknown import event %d\n", event);
2704                 LBUG();
2705         }
2706         RETURN(rc);
2707 }
2708
2709 /**
2710  * Determine whether the lock can be canceled before replaying the lock
2711  * during recovery, see bug16774 for detailed information.
2712  *
2713  * \retval zero the lock can't be canceled
2714  * \retval other ok to cancel
2715  */
2716 static int osc_cancel_weight(struct ldlm_lock *lock)
2717 {
2718         /*
2719          * Cancel all unused and granted extent lock.
2720          */
2721         if (lock->l_resource->lr_type == LDLM_EXTENT &&
2722             lock->l_granted_mode == lock->l_req_mode &&
2723             osc_ldlm_weigh_ast(lock) == 0)
2724                 RETURN(1);
2725
2726         RETURN(0);
2727 }
2728
2729 static int brw_queue_work(const struct lu_env *env, void *data)
2730 {
2731         struct client_obd *cli = data;
2732
2733         CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
2734
2735         osc_io_unplug(env, cli, NULL);
2736         RETURN(0);
2737 }
2738
2739 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
2740 {
2741         struct client_obd *cli = &obd->u.cli;
2742         struct obd_type   *type;
2743         void              *handler;
2744         int                rc;
2745         int                adding;
2746         int                added;
2747         int                req_count;
2748         ENTRY;
2749
2750         rc = ptlrpcd_addref();
2751         if (rc)
2752                 RETURN(rc);
2753
2754         rc = client_obd_setup(obd, lcfg);
2755         if (rc)
2756                 GOTO(out_ptlrpcd, rc);
2757
2758         handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
2759         if (IS_ERR(handler))
2760                 GOTO(out_client_setup, rc = PTR_ERR(handler));
2761         cli->cl_writeback_work = handler;
2762
2763         handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
2764         if (IS_ERR(handler))
2765                 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
2766         cli->cl_lru_work = handler;
2767
2768         rc = osc_quota_setup(obd);
2769         if (rc)
2770                 GOTO(out_ptlrpcd_work, rc);
2771
2772         cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
2773
2774 #ifdef CONFIG_PROC_FS
2775         obd->obd_vars = lprocfs_osc_obd_vars;
2776 #endif
2777         /* If this is true then both client (osc) and server (osp) are on the
2778          * same node. The osp layer if loaded first will register the osc proc
2779          * directory. In that case this obd_device will be attached its proc
2780          * tree to type->typ_procsym instead of obd->obd_type->typ_procroot. */
2781         type = class_search_type(LUSTRE_OSP_NAME);
2782         if (type && type->typ_procsym) {
2783                 obd->obd_proc_entry = lprocfs_register(obd->obd_name,
2784                                                        type->typ_procsym,
2785                                                        obd->obd_vars, obd);
2786                 if (IS_ERR(obd->obd_proc_entry)) {
2787                         rc = PTR_ERR(obd->obd_proc_entry);
2788                         CERROR("error %d setting up lprocfs for %s\n", rc,
2789                                obd->obd_name);
2790                         obd->obd_proc_entry = NULL;
2791                 }
2792         } else {
2793                 rc = lprocfs_obd_setup(obd);
2794         }
2795
2796         /* If the basic OSC proc tree construction succeeded then
2797          * lets do the rest. */
2798         if (rc == 0) {
2799                 lproc_osc_attach_seqstat(obd);
2800                 sptlrpc_lprocfs_cliobd_attach(obd);
2801                 ptlrpc_lprocfs_register_obd(obd);
2802         }
2803
2804         /*
2805          * We try to control the total number of requests with a upper limit
2806          * osc_reqpool_maxreqcount. There might be some race which will cause
2807          * over-limit allocation, but it is fine.
2808          */
2809         req_count = atomic_read(&osc_pool_req_count);
2810         if (req_count < osc_reqpool_maxreqcount) {
2811                 adding = cli->cl_max_rpcs_in_flight + 2;
2812                 if (req_count + adding > osc_reqpool_maxreqcount)
2813                         adding = osc_reqpool_maxreqcount - req_count;
2814
2815                 added = ptlrpc_add_rqs_to_pool(osc_rq_pool, adding);
2816                 atomic_add(added, &osc_pool_req_count);
2817         }
2818
2819         INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
2820         ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
2821
2822         spin_lock(&osc_shrink_lock);
2823         list_add_tail(&cli->cl_shrink_list, &osc_shrink_list);
2824         spin_unlock(&osc_shrink_lock);
2825
2826         RETURN(0);
2827
2828 out_ptlrpcd_work:
2829         if (cli->cl_writeback_work != NULL) {
2830                 ptlrpcd_destroy_work(cli->cl_writeback_work);
2831                 cli->cl_writeback_work = NULL;
2832         }
2833         if (cli->cl_lru_work != NULL) {
2834                 ptlrpcd_destroy_work(cli->cl_lru_work);
2835                 cli->cl_lru_work = NULL;
2836         }
2837 out_client_setup:
2838         client_obd_cleanup(obd);
2839 out_ptlrpcd:
2840         ptlrpcd_decref();
2841         RETURN(rc);
2842 }
2843
2844 static int osc_precleanup(struct obd_device *obd)
2845 {
2846         struct client_obd *cli = &obd->u.cli;
2847         ENTRY;
2848
2849         /* LU-464
2850          * for echo client, export may be on zombie list, wait for
2851          * zombie thread to cull it, because cli.cl_import will be
2852          * cleared in client_disconnect_export():
2853          *   class_export_destroy() -> obd_cleanup() ->
2854          *   echo_device_free() -> echo_client_cleanup() ->
2855          *   obd_disconnect() -> osc_disconnect() ->
2856          *   client_disconnect_export()
2857          */
2858         obd_zombie_barrier();
2859         if (cli->cl_writeback_work) {
2860                 ptlrpcd_destroy_work(cli->cl_writeback_work);
2861                 cli->cl_writeback_work = NULL;
2862         }
2863
2864         if (cli->cl_lru_work) {
2865                 ptlrpcd_destroy_work(cli->cl_lru_work);
2866                 cli->cl_lru_work = NULL;
2867         }
2868
2869         obd_cleanup_client_import(obd);
2870         ptlrpc_lprocfs_unregister_obd(obd);
2871         lprocfs_obd_cleanup(obd);
2872         RETURN(0);
2873 }
2874
2875 int osc_cleanup(struct obd_device *obd)
2876 {
2877         struct client_obd *cli = &obd->u.cli;
2878         int rc;
2879
2880         ENTRY;
2881
2882         spin_lock(&osc_shrink_lock);
2883         list_del(&cli->cl_shrink_list);
2884         spin_unlock(&osc_shrink_lock);
2885
2886         /* lru cleanup */
2887         if (cli->cl_cache != NULL) {
2888                 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
2889                 spin_lock(&cli->cl_cache->ccc_lru_lock);
2890                 list_del_init(&cli->cl_lru_osc);
2891                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
2892                 cli->cl_lru_left = NULL;
2893                 cl_cache_decref(cli->cl_cache);
2894                 cli->cl_cache = NULL;
2895         }
2896
2897         /* free memory of osc quota cache */
2898         osc_quota_cleanup(obd);
2899
2900         rc = client_obd_cleanup(obd);
2901
2902         ptlrpcd_decref();
2903         RETURN(rc);
2904 }
2905
2906 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
2907 {
2908         int rc = class_process_proc_param(PARAM_OSC, obd->obd_vars, lcfg, obd);
2909         return rc > 0 ? 0: rc;
2910 }
2911
2912 static int osc_process_config(struct obd_device *obd, size_t len, void *buf)
2913 {
2914         return osc_process_config_base(obd, buf);
2915 }
2916
2917 static struct obd_ops osc_obd_ops = {
2918         .o_owner                = THIS_MODULE,
2919         .o_setup                = osc_setup,
2920         .o_precleanup           = osc_precleanup,
2921         .o_cleanup              = osc_cleanup,
2922         .o_add_conn             = client_import_add_conn,
2923         .o_del_conn             = client_import_del_conn,
2924         .o_connect              = client_connect_import,
2925         .o_reconnect            = osc_reconnect,
2926         .o_disconnect           = osc_disconnect,
2927         .o_statfs               = osc_statfs,
2928         .o_statfs_async         = osc_statfs_async,
2929         .o_create               = osc_create,
2930         .o_destroy              = osc_destroy,
2931         .o_getattr              = osc_getattr,
2932         .o_setattr              = osc_setattr,
2933         .o_iocontrol            = osc_iocontrol,
2934         .o_set_info_async       = osc_set_info_async,
2935         .o_import_event         = osc_import_event,
2936         .o_process_config       = osc_process_config,
2937         .o_quotactl             = osc_quotactl,
2938 };
2939
2940 static struct shrinker *osc_cache_shrinker;
2941 struct list_head osc_shrink_list = LIST_HEAD_INIT(osc_shrink_list);
2942 DEFINE_SPINLOCK(osc_shrink_lock);
2943
2944 #ifndef HAVE_SHRINKER_COUNT
2945 static int osc_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask))
2946 {
2947         struct shrink_control scv = {
2948                 .nr_to_scan = shrink_param(sc, nr_to_scan),
2949                 .gfp_mask   = shrink_param(sc, gfp_mask)
2950         };
2951 #if !defined(HAVE_SHRINKER_WANT_SHRINK_PTR) && !defined(HAVE_SHRINK_CONTROL)
2952         struct shrinker *shrinker = NULL;
2953 #endif
2954
2955         (void)osc_cache_shrink_scan(shrinker, &scv);
2956
2957         return osc_cache_shrink_count(shrinker, &scv);
2958 }
2959 #endif
2960
2961 static int __init osc_init(void)
2962 {
2963         bool enable_proc = true;
2964         struct obd_type *type;
2965         unsigned int reqpool_size;
2966         unsigned int reqsize;
2967         int rc;
2968         DEF_SHRINKER_VAR(osc_shvar, osc_cache_shrink,
2969                          osc_cache_shrink_count, osc_cache_shrink_scan);
2970         ENTRY;
2971
2972         /* print an address of _any_ initialized kernel symbol from this
2973          * module, to allow debugging with gdb that doesn't support data
2974          * symbols from modules.*/
2975         CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
2976
2977         rc = lu_kmem_init(osc_caches);
2978         if (rc)
2979                 RETURN(rc);
2980
2981         type = class_search_type(LUSTRE_OSP_NAME);
2982         if (type != NULL && type->typ_procsym != NULL)
2983                 enable_proc = false;
2984
2985         rc = class_register_type(&osc_obd_ops, NULL, enable_proc, NULL,
2986                                  LUSTRE_OSC_NAME, &osc_device_type);
2987         if (rc)
2988                 GOTO(out_kmem, rc);
2989
2990         osc_cache_shrinker = set_shrinker(DEFAULT_SEEKS, &osc_shvar);
2991
2992         /* This is obviously too much memory, only prevent overflow here */
2993         if (osc_reqpool_mem_max >= 1 << 12 || osc_reqpool_mem_max == 0)
2994                 GOTO(out_type, rc = -EINVAL);
2995
2996         reqpool_size = osc_reqpool_mem_max << 20;
2997
2998         reqsize = 1;
2999         while (reqsize < OST_IO_MAXREQSIZE)
3000                 reqsize = reqsize << 1;
3001
3002         /*
3003          * We don't enlarge the request count in OSC pool according to
3004          * cl_max_rpcs_in_flight. The allocation from the pool will only be
3005          * tried after normal allocation failed. So a small OSC pool won't
3006          * cause much performance degression in most of cases.
3007          */
3008         osc_reqpool_maxreqcount = reqpool_size / reqsize;
3009
3010         atomic_set(&osc_pool_req_count, 0);
3011         osc_rq_pool = ptlrpc_init_rq_pool(0, OST_IO_MAXREQSIZE,
3012                                           ptlrpc_add_rqs_to_pool);
3013
3014         if (osc_rq_pool != NULL)
3015                 GOTO(out, rc);
3016         rc = -ENOMEM;
3017 out_type:
3018         class_unregister_type(LUSTRE_OSC_NAME);
3019 out_kmem:
3020         lu_kmem_fini(osc_caches);
3021 out:
3022         RETURN(rc);
3023 }
3024
3025 static void __exit osc_exit(void)
3026 {
3027         remove_shrinker(osc_cache_shrinker);
3028         class_unregister_type(LUSTRE_OSC_NAME);
3029         lu_kmem_fini(osc_caches);
3030         ptlrpc_free_rq_pool(osc_rq_pool);
3031 }
3032
3033 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
3034 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3035 MODULE_VERSION(LUSTRE_VERSION_STRING);
3036 MODULE_LICENSE("GPL");
3037
3038 module_init(osc_init);
3039 module_exit(osc_exit);