LU-8005 osc: set lock data for readahead lock
[fs/lustre-release.git] lustre/osc/osc_request.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2015, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_OSC
38
39 #include <libcfs/libcfs.h>
40
41 #include <lustre/lustre_user.h>
42
43 #include <lprocfs_status.h>
44 #include <lustre_debug.h>
45 #include <lustre_dlm.h>
46 #include <lustre_fid.h>
47 #include <lustre_ha.h>
48 #include <lustre_ioctl.h>
49 #include <lustre_net.h>
50 #include <lustre_obdo.h>
51 #include <lustre_param.h>
52 #include <obd.h>
53 #include <obd_cksum.h>
54 #include <obd_class.h>
55
56 #include "osc_cl_internal.h"
57 #include "osc_internal.h"
58
59 atomic_t osc_pool_req_count;
60 unsigned int osc_reqpool_maxreqcount;
61 struct ptlrpc_request_pool *osc_rq_pool;
62
63 /* maximum memory used for the request pool, in MB */
64 static unsigned int osc_reqpool_mem_max = 5;
65 module_param(osc_reqpool_mem_max, uint, 0444);
66
67 struct osc_brw_async_args {
68         struct obdo              *aa_oa;
69         int                       aa_requested_nob;
70         int                       aa_nio_count;
71         u32                       aa_page_count;
72         int                       aa_resends;
73         struct brw_page **aa_ppga;
74         struct client_obd        *aa_cli;
75         struct list_head          aa_oaps;
76         struct list_head          aa_exts;
77 };
78
79 #define osc_grant_args osc_brw_async_args
80
81 struct osc_setattr_args {
82         struct obdo             *sa_oa;
83         obd_enqueue_update_f     sa_upcall;
84         void                    *sa_cookie;
85 };
86
87 struct osc_fsync_args {
88         struct osc_object       *fa_obj;
89         struct obdo             *fa_oa;
90         obd_enqueue_update_f    fa_upcall;
91         void                    *fa_cookie;
92 };
93
94 struct osc_ladvise_args {
95         struct obdo             *la_oa;
96         obd_enqueue_update_f     la_upcall;
97         void                    *la_cookie;
98 };
99
100 struct osc_enqueue_args {
101         struct obd_export       *oa_exp;
102         enum ldlm_type          oa_type;
103         enum ldlm_mode          oa_mode;
104         __u64                   *oa_flags;
105         osc_enqueue_upcall_f    oa_upcall;
106         void                    *oa_cookie;
107         struct ost_lvb          *oa_lvb;
108         struct lustre_handle    oa_lockh;
109         unsigned int            oa_agl:1;
110 };
111
112 static void osc_release_ppga(struct brw_page **ppga, size_t count);
113 static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
114                          void *data, int rc);
115
116 void osc_pack_req_body(struct ptlrpc_request *req, struct obdo *oa)
117 {
118         struct ost_body *body;
119
120         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
121         LASSERT(body);
122
123         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
124 }
125
126 static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
127                        struct obdo *oa)
128 {
129         struct ptlrpc_request   *req;
130         struct ost_body         *body;
131         int                      rc;
132
133         ENTRY;
134         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
135         if (req == NULL)
136                 RETURN(-ENOMEM);
137
138         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
139         if (rc) {
140                 ptlrpc_request_free(req);
141                 RETURN(rc);
142         }
143
144         osc_pack_req_body(req, oa);
145
146         ptlrpc_request_set_replen(req);
147
148         rc = ptlrpc_queue_wait(req);
149         if (rc)
150                 GOTO(out, rc);
151
152         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
153         if (body == NULL)
154                 GOTO(out, rc = -EPROTO);
155
156         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
157         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
158
159         oa->o_blksize = cli_brw_size(exp->exp_obd);
160         oa->o_valid |= OBD_MD_FLBLKSZ;
161
162         EXIT;
163 out:
164         ptlrpc_req_finished(req);
165
166         return rc;
167 }
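/*
 * osc_getattr() above shows the synchronous request pattern repeated all
 * through this file: ptlrpc_request_alloc() -> ptlrpc_request_pack() ->
 * fill the request body -> ptlrpc_request_set_replen() ->
 * ptlrpc_queue_wait() -> unpack the reply -> ptlrpc_req_finished().
 * The asynchronous variants below swap the queue-and-wait step for an
 * rq_interpret_reply callback plus ptlrpcd_add_req()/ptlrpc_set_add_req().
 */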
168
169 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
170                        struct obdo *oa)
171 {
172         struct ptlrpc_request   *req;
173         struct ost_body         *body;
174         int                      rc;
175
176         ENTRY;
177         LASSERT(oa->o_valid & OBD_MD_FLGROUP);
178
179         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
180         if (req == NULL)
181                 RETURN(-ENOMEM);
182
183         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
184         if (rc) {
185                 ptlrpc_request_free(req);
186                 RETURN(rc);
187         }
188
189         osc_pack_req_body(req, oa);
190
191         ptlrpc_request_set_replen(req);
192
193         rc = ptlrpc_queue_wait(req);
194         if (rc)
195                 GOTO(out, rc);
196
197         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
198         if (body == NULL)
199                 GOTO(out, rc = -EPROTO);
200
201         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
202
203         EXIT;
204 out:
205         ptlrpc_req_finished(req);
206
207         RETURN(rc);
208 }
209
210 static int osc_setattr_interpret(const struct lu_env *env,
211                                  struct ptlrpc_request *req,
212                                  struct osc_setattr_args *sa, int rc)
213 {
214         struct ost_body *body;
215         ENTRY;
216
217         if (rc != 0)
218                 GOTO(out, rc);
219
220         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
221         if (body == NULL)
222                 GOTO(out, rc = -EPROTO);
223
224         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
225                              &body->oa);
226 out:
227         rc = sa->sa_upcall(sa->sa_cookie, rc);
228         RETURN(rc);
229 }
230
231 int osc_setattr_async(struct obd_export *exp, struct obdo *oa,
232                       obd_enqueue_update_f upcall, void *cookie,
233                       struct ptlrpc_request_set *rqset)
234 {
235         struct ptlrpc_request   *req;
236         struct osc_setattr_args *sa;
237         int                      rc;
238
239         ENTRY;
240
241         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
242         if (req == NULL)
243                 RETURN(-ENOMEM);
244
245         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
246         if (rc) {
247                 ptlrpc_request_free(req);
248                 RETURN(rc);
249         }
250
251         osc_pack_req_body(req, oa);
252
253         ptlrpc_request_set_replen(req);
254
255         /* do the MDS-to-OST setattr asynchronously */
256         if (!rqset) {
257                 /* Do not wait for response. */
258                 ptlrpcd_add_req(req);
259         } else {
260                 req->rq_interpret_reply =
261                         (ptlrpc_interpterer_t)osc_setattr_interpret;
262
263                 CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
264                 sa = ptlrpc_req_async_args(req);
265                 sa->sa_oa = oa;
266                 sa->sa_upcall = upcall;
267                 sa->sa_cookie = cookie;
268
269                 if (rqset == PTLRPCD_SET)
270                         ptlrpcd_add_req(req);
271                 else
272                         ptlrpc_set_add_req(rqset, req);
273         }
274
275         RETURN(0);
276 }
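/*
 * A minimal usage sketch for osc_setattr_async(); the example_* names are
 * invented for illustration and are not part of this file.  With
 * PTLRPCD_SET the request is handed to the ptlrpcd daemon, and the upcall
 * runs from osc_setattr_interpret() when the reply (or an error) comes back.
 */
#if 0	/* illustrative only, not compiled */
static int example_setattr_done(void *cookie, int rc)
{
	/* invoked from osc_setattr_interpret() once the reply is processed */
	CDEBUG(D_INODE, "setattr done: cookie %p rc %d\n", cookie, rc);
	return rc;
}

static int example_setattr(struct obd_export *exp, struct obdo *oa)
{
	/* fire the setattr and let the upcall finish the job */
	return osc_setattr_async(exp, oa, example_setattr_done, NULL,
				 PTLRPCD_SET);
}
#endif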
277
278 static int osc_ladvise_interpret(const struct lu_env *env,
279                                  struct ptlrpc_request *req,
280                                  void *arg, int rc)
281 {
282         struct osc_ladvise_args *la = arg;
283         struct ost_body *body;
284         ENTRY;
285
286         if (rc != 0)
287                 GOTO(out, rc);
288
289         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
290         if (body == NULL)
291                 GOTO(out, rc = -EPROTO);
292
293         *la->la_oa = body->oa;
294 out:
295         rc = la->la_upcall(la->la_cookie, rc);
296         RETURN(rc);
297 }
298
299 /**
300  * If rqset is NULL, do not wait for the response; upcall and cookie may
301  * also be NULL in that case.
302  */
303 int osc_ladvise_base(struct obd_export *exp, struct obdo *oa,
304                      struct ladvise_hdr *ladvise_hdr,
305                      obd_enqueue_update_f upcall, void *cookie,
306                      struct ptlrpc_request_set *rqset)
307 {
308         struct ptlrpc_request   *req;
309         struct ost_body         *body;
310         struct osc_ladvise_args *la;
311         int                      rc;
312         struct lu_ladvise       *req_ladvise;
313         struct lu_ladvise       *ladvise = ladvise_hdr->lah_advise;
314         int                      num_advise = ladvise_hdr->lah_count;
315         struct ladvise_hdr      *req_ladvise_hdr;
316         ENTRY;
317
318         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_LADVISE);
319         if (req == NULL)
320                 RETURN(-ENOMEM);
321
322         req_capsule_set_size(&req->rq_pill, &RMF_OST_LADVISE, RCL_CLIENT,
323                              num_advise * sizeof(*ladvise));
324         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_LADVISE);
325         if (rc != 0) {
326                 ptlrpc_request_free(req);
327                 RETURN(rc);
328         }
329         req->rq_request_portal = OST_IO_PORTAL;
330         ptlrpc_at_set_req_timeout(req);
331
332         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
333         LASSERT(body);
334         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
335                              oa);
336
337         req_ladvise_hdr = req_capsule_client_get(&req->rq_pill,
338                                                  &RMF_OST_LADVISE_HDR);
339         memcpy(req_ladvise_hdr, ladvise_hdr, sizeof(*ladvise_hdr));
340
341         req_ladvise = req_capsule_client_get(&req->rq_pill, &RMF_OST_LADVISE);
342         memcpy(req_ladvise, ladvise, sizeof(*ladvise) * num_advise);
343         ptlrpc_request_set_replen(req);
344
345         if (rqset == NULL) {
346                 /* Do not wait for response. */
347                 ptlrpcd_add_req(req);
348                 RETURN(0);
349         }
350
351         req->rq_interpret_reply = osc_ladvise_interpret;
352         CLASSERT(sizeof(*la) <= sizeof(req->rq_async_args));
353         la = ptlrpc_req_async_args(req);
354         la->la_oa = oa;
355         la->la_upcall = upcall;
356         la->la_cookie = cookie;
357
358         if (rqset == PTLRPCD_SET)
359                 ptlrpcd_add_req(req);
360         else
361                 ptlrpc_set_add_req(rqset, req);
362
363         RETURN(0);
364 }
365
366 static int osc_create(const struct lu_env *env, struct obd_export *exp,
367                       struct obdo *oa)
368 {
369         struct ptlrpc_request *req;
370         struct ost_body       *body;
371         int                    rc;
372         ENTRY;
373
374         LASSERT(oa != NULL);
375         LASSERT(oa->o_valid & OBD_MD_FLGROUP);
376         LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));
377
378         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
379         if (req == NULL)
380                 GOTO(out, rc = -ENOMEM);
381
382         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
383         if (rc) {
384                 ptlrpc_request_free(req);
385                 GOTO(out, rc);
386         }
387
388         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
389         LASSERT(body);
390
391         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
392
393         ptlrpc_request_set_replen(req);
394
395         rc = ptlrpc_queue_wait(req);
396         if (rc)
397                 GOTO(out_req, rc);
398
399         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
400         if (body == NULL)
401                 GOTO(out_req, rc = -EPROTO);
402
403         CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
404         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
405
406         oa->o_blksize = cli_brw_size(exp->exp_obd);
407         oa->o_valid |= OBD_MD_FLBLKSZ;
408
409         CDEBUG(D_HA, "transno: "LPD64"\n",
410                lustre_msg_get_transno(req->rq_repmsg));
411 out_req:
412         ptlrpc_req_finished(req);
413 out:
414         RETURN(rc);
415 }
416
417 int osc_punch_base(struct obd_export *exp, struct obdo *oa,
418                    obd_enqueue_update_f upcall, void *cookie,
419                    struct ptlrpc_request_set *rqset)
420 {
421         struct ptlrpc_request   *req;
422         struct osc_setattr_args *sa;
423         struct ost_body         *body;
424         int                      rc;
425         ENTRY;
426
427         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
428         if (req == NULL)
429                 RETURN(-ENOMEM);
430
431         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
432         if (rc) {
433                 ptlrpc_request_free(req);
434                 RETURN(rc);
435         }
436         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
437         ptlrpc_at_set_req_timeout(req);
438
439         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
440         LASSERT(body);
441         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
442
443         ptlrpc_request_set_replen(req);
444
445         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
446         CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
447         sa = ptlrpc_req_async_args(req);
448         sa->sa_oa = oa;
449         sa->sa_upcall = upcall;
450         sa->sa_cookie = cookie;
451         if (rqset == PTLRPCD_SET)
452                 ptlrpcd_add_req(req);
453         else
454                 ptlrpc_set_add_req(rqset, req);
455
456         RETURN(0);
457 }
458
459 static int osc_sync_interpret(const struct lu_env *env,
460                               struct ptlrpc_request *req,
461                               void *arg, int rc)
462 {
463         struct osc_fsync_args   *fa = arg;
464         struct ost_body         *body;
465         struct cl_attr          *attr = &osc_env_info(env)->oti_attr;
466         unsigned long           valid = 0;
467         struct cl_object        *obj;
468         ENTRY;
469
470         if (rc != 0)
471                 GOTO(out, rc);
472
473         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
474         if (body == NULL) {
475                 CERROR("can't unpack ost_body\n");
476                 GOTO(out, rc = -EPROTO);
477         }
478
479         *fa->fa_oa = body->oa;
480         obj = osc2cl(fa->fa_obj);
481
482         /* Update osc object's blocks attribute */
483         cl_object_attr_lock(obj);
484         if (body->oa.o_valid & OBD_MD_FLBLOCKS) {
485                 attr->cat_blocks = body->oa.o_blocks;
486                 valid |= CAT_BLOCKS;
487         }
488
489         if (valid != 0)
490                 cl_object_attr_update(env, obj, attr, valid);
491         cl_object_attr_unlock(obj);
492
493 out:
494         rc = fa->fa_upcall(fa->fa_cookie, rc);
495         RETURN(rc);
496 }
497
498 int osc_sync_base(struct osc_object *obj, struct obdo *oa,
499                   obd_enqueue_update_f upcall, void *cookie,
500                   struct ptlrpc_request_set *rqset)
501 {
502         struct obd_export     *exp = osc_export(obj);
503         struct ptlrpc_request *req;
504         struct ost_body       *body;
505         struct osc_fsync_args *fa;
506         int                    rc;
507         ENTRY;
508
509         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
510         if (req == NULL)
511                 RETURN(-ENOMEM);
512
513         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
514         if (rc) {
515                 ptlrpc_request_free(req);
516                 RETURN(rc);
517         }
518
519         /* overload the size and blocks fields in the oa with start/end */
520         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
521         LASSERT(body);
522         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
523
524         ptlrpc_request_set_replen(req);
525         req->rq_interpret_reply = osc_sync_interpret;
526
527         CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
528         fa = ptlrpc_req_async_args(req);
529         fa->fa_obj = obj;
530         fa->fa_oa = oa;
531         fa->fa_upcall = upcall;
532         fa->fa_cookie = cookie;
533
534         if (rqset == PTLRPCD_SET)
535                 ptlrpcd_add_req(req);
536         else
537                 ptlrpc_set_add_req(rqset, req);
538
539         RETURN(0);
540 }
541
542 /* Find and cancel locally all locks matching @mode in the resource built
543  * from @oa. Found locks are added to the @cancels list. Returns the number
544  * of locks added to that list. */
545 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
546                                    struct list_head *cancels,
547                                    enum ldlm_mode mode, __u64 lock_flags)
548 {
549         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
550         struct ldlm_res_id res_id;
551         struct ldlm_resource *res;
552         int count;
553         ENTRY;
554
555         /* Return (i.e. cancel nothing) only if ELC is supported (flag in
556          * the export) but disabled through procfs (flag in the namespace).
557          *
558          * This is distinct from the case where ELC is not supported at all:
559          * there we still want to cancel locks in advance, just cancel them
560          * locally without sending any RPC. */
561         if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
562                 RETURN(0);
563
564         ostid_build_res_name(&oa->o_oi, &res_id);
565         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
566         if (IS_ERR(res))
567                 RETURN(0);
568
569         LDLM_RESOURCE_ADDREF(res);
570         count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
571                                            lock_flags, 0, NULL);
572         LDLM_RESOURCE_DELREF(res);
573         ldlm_resource_putref(res);
574         RETURN(count);
575 }
576
577 static int osc_destroy_interpret(const struct lu_env *env,
578                                  struct ptlrpc_request *req, void *data,
579                                  int rc)
580 {
581         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
582
583         atomic_dec(&cli->cl_destroy_in_flight);
584         wake_up(&cli->cl_destroy_waitq);
585         return 0;
586 }
587
588 static int osc_can_send_destroy(struct client_obd *cli)
589 {
590         if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
591             cli->cl_max_rpcs_in_flight) {
592                 /* The destroy request can be sent */
593                 return 1;
594         }
595         if (atomic_dec_return(&cli->cl_destroy_in_flight) <
596             cli->cl_max_rpcs_in_flight) {
597                 /*
598                  * The counter has been modified between the two atomic
599                  * operations.
600                  */
601                 wake_up(&cli->cl_destroy_waitq);
602         }
603         return 0;
604 }
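/*
 * osc_can_send_destroy() implements a lock-free admission gate: increment
 * first, then check, and undo the increment when over the limit.  Below is
 * a stand-alone user-space analogue using C11 atomics; the gate_* names
 * are assumed for illustration and are not Lustre API.  The real code
 * additionally wakes cl_destroy_waitq when the decrement sees the counter
 * drop back below the limit, so a sleeping sender gets another try.
 */
#if 0	/* illustrative only, not compiled */
#include <stdatomic.h>
#include <stdbool.h>

static atomic_long gate_in_flight;
static const long gate_limit = 8;	/* stands in for cl_max_rpcs_in_flight */

static bool gate_try_enter(void)
{
	if (atomic_fetch_add(&gate_in_flight, 1) + 1 <= gate_limit)
		return true;		/* slot acquired, send the RPC */
	/* over the limit: undo the increment; the caller waits and retries */
	atomic_fetch_sub(&gate_in_flight, 1);
	return false;
}
#endif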
605
606 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
607                        struct obdo *oa)
608 {
609         struct client_obd     *cli = &exp->exp_obd->u.cli;
610         struct ptlrpc_request *req;
611         struct ost_body       *body;
612         struct list_head       cancels = LIST_HEAD_INIT(cancels);
613         int rc, count;
614         ENTRY;
615
616         if (!oa) {
617                 CDEBUG(D_INFO, "oa NULL\n");
618                 RETURN(-EINVAL);
619         }
620
621         count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
622                                         LDLM_FL_DISCARD_DATA);
623
624         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
625         if (req == NULL) {
626                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
627                 RETURN(-ENOMEM);
628         }
629
630         rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
631                                0, &cancels, count);
632         if (rc) {
633                 ptlrpc_request_free(req);
634                 RETURN(rc);
635         }
636
637         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
638         ptlrpc_at_set_req_timeout(req);
639
640         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
641         LASSERT(body);
642         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
643
644         ptlrpc_request_set_replen(req);
645
646         req->rq_interpret_reply = osc_destroy_interpret;
647         if (!osc_can_send_destroy(cli)) {
648                 struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
649
650                 /*
651                  * Wait until the number of in-flight destroy RPCs drops
652                  * below cl_max_rpcs_in_flight.
653                  */
654                 l_wait_event_exclusive(cli->cl_destroy_waitq,
655                                        osc_can_send_destroy(cli), &lwi);
656         }
657
658         /* Do not wait for response */
659         ptlrpcd_add_req(req);
660         RETURN(0);
661 }
662
663 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
664                                 long writing_bytes)
665 {
666         u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;
667
668         LASSERT(!(oa->o_valid & bits));
669
670         oa->o_valid |= bits;
671         spin_lock(&cli->cl_loi_list_lock);
672         if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data, GRANT_PARAM))
673                 oa->o_dirty = cli->cl_dirty_grant;
674         else
675                 oa->o_dirty = cli->cl_dirty_pages << PAGE_CACHE_SHIFT;
676         if (unlikely(cli->cl_dirty_pages - cli->cl_dirty_transit >
677                      cli->cl_dirty_max_pages)) {
678                 CERROR("dirty %lu - %lu > dirty_max %lu\n",
679                        cli->cl_dirty_pages, cli->cl_dirty_transit,
680                        cli->cl_dirty_max_pages);
681                 oa->o_undirty = 0;
682         } else if (unlikely(atomic_long_read(&obd_dirty_pages) -
683                             atomic_long_read(&obd_dirty_transit_pages) >
684                             (long)(obd_max_dirty_pages + 1))) {
685                 /* The atomic_read() and the atomic_inc() are not covered
686                  * by a lock, so they may race and trip this CERROR() unless
687                  * we add a small fudge factor (+1). */
688                 CERROR("%s: dirty %ld - %ld > system dirty_max %ld\n",
689                        cli_name(cli), atomic_long_read(&obd_dirty_pages),
690                        atomic_long_read(&obd_dirty_transit_pages),
691                        obd_max_dirty_pages);
692                 oa->o_undirty = 0;
693         } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
694                             0x7fffffff)) {
695                 CERROR("dirty %lu - dirty_max %lu too big???\n",
696                        cli->cl_dirty_pages, cli->cl_dirty_max_pages);
697                 oa->o_undirty = 0;
698         } else {
699                 unsigned long nrpages;
700
701                 nrpages = cli->cl_max_pages_per_rpc;
702                 nrpages *= cli->cl_max_rpcs_in_flight + 1;
703                 nrpages = max(nrpages, cli->cl_dirty_max_pages);
704                 oa->o_undirty = nrpages << PAGE_CACHE_SHIFT;
705                 if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data,
706                                  GRANT_PARAM)) {
707                         int nrextents;
708
709                         /* take extent tax into account when asking for more
710                          * grant space */
711                         nrextents = (nrpages + cli->cl_max_extent_pages - 1)  /
712                                      cli->cl_max_extent_pages;
713                         oa->o_undirty += nrextents * cli->cl_grant_extent_tax;
714                 }
715         }
716         oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
717         oa->o_dropped = cli->cl_lost_grant;
718         cli->cl_lost_grant = 0;
719         spin_unlock(&cli->cl_loi_list_lock);
720         CDEBUG(D_CACHE, "dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
721                oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
722 }
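/*
 * Worked example for the o_undirty computation above (values assumed for
 * illustration): with cl_max_pages_per_rpc = 256, cl_max_rpcs_in_flight = 8,
 * 4 KiB pages and cl_dirty_max_pages below that product,
 * nrpages = 256 * (8 + 1) = 2304 and o_undirty = 2304 << 12 = 9 MiB.
 * With GRANT_PARAM negotiated and cl_max_extent_pages = 2048, the client
 * also asks for (2304 + 2047) / 2048 = 2 extent-tax charges of grant.
 */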
723
724 void osc_update_next_shrink(struct client_obd *cli)
725 {
726         cli->cl_next_shrink_grant =
727                 cfs_time_shift(cli->cl_grant_shrink_interval);
728         CDEBUG(D_CACHE, "next time %ld to shrink grant\n",
729                cli->cl_next_shrink_grant);
730 }
731
732 static void __osc_update_grant(struct client_obd *cli, u64 grant)
733 {
734         spin_lock(&cli->cl_loi_list_lock);
735         cli->cl_avail_grant += grant;
736         spin_unlock(&cli->cl_loi_list_lock);
737 }
738
739 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
740 {
741         if (body->oa.o_valid & OBD_MD_FLGRANT) {
742                 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
743                 __osc_update_grant(cli, body->oa.o_grant);
744         }
745 }
746
747 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
748                               u32 keylen, void *key,
749                               u32 vallen, void *val,
750                               struct ptlrpc_request_set *set);
751
752 static int osc_shrink_grant_interpret(const struct lu_env *env,
753                                       struct ptlrpc_request *req,
754                                       void *aa, int rc)
755 {
756         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
757         struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
758         struct ost_body *body;
759
760         if (rc != 0) {
761                 __osc_update_grant(cli, oa->o_grant);
762                 GOTO(out, rc);
763         }
764
765         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
766         LASSERT(body);
767         osc_update_grant(cli, body);
768 out:
769         OBDO_FREE(oa);
770         return rc;
771 }
772
773 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
774 {
775         spin_lock(&cli->cl_loi_list_lock);
776         oa->o_grant = cli->cl_avail_grant / 4;
777         cli->cl_avail_grant -= oa->o_grant;
778         spin_unlock(&cli->cl_loi_list_lock);
779         if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
780                 oa->o_valid |= OBD_MD_FLFLAGS;
781                 oa->o_flags = 0;
782         }
783         oa->o_flags |= OBD_FL_SHRINK_GRANT;
784         osc_update_next_shrink(cli);
785 }
786
787 /* Shrink the current grant, either from some large amount to enough for a
788  * full set of in-flight RPCs, or if we have already shrunk to that limit
789  * then to enough for a single RPC.  This avoids keeping more grant than
790  * needed, and avoids shrinking the grant piecemeal. */
791 static int osc_shrink_grant(struct client_obd *cli)
792 {
793         __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
794                              (cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT);
795
796         spin_lock(&cli->cl_loi_list_lock);
797         if (cli->cl_avail_grant <= target_bytes)
798                 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
799         spin_unlock(&cli->cl_loi_list_lock);
800
801         return osc_shrink_grant_to_target(cli, target_bytes);
802 }
803
804 int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
805 {
806         int                     rc = 0;
807         struct ost_body        *body;
808         ENTRY;
809
810         spin_lock(&cli->cl_loi_list_lock);
811         /* Don't shrink if we are already above or below the desired limit.
812          * We don't want to shrink below a single RPC, as that will negatively
813          * impact block allocation and long-term performance. */
814         if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT)
815                 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
816
817         if (target_bytes >= cli->cl_avail_grant) {
818                 spin_unlock(&cli->cl_loi_list_lock);
819                 RETURN(0);
820         }
821         spin_unlock(&cli->cl_loi_list_lock);
822
823         OBD_ALLOC_PTR(body);
824         if (!body)
825                 RETURN(-ENOMEM);
826
827         osc_announce_cached(cli, &body->oa, 0);
828
829         spin_lock(&cli->cl_loi_list_lock);
830         body->oa.o_grant = cli->cl_avail_grant - target_bytes;
831         cli->cl_avail_grant = target_bytes;
832         spin_unlock(&cli->cl_loi_list_lock);
833         if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
834                 body->oa.o_valid |= OBD_MD_FLFLAGS;
835                 body->oa.o_flags = 0;
836         }
837         body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
838         osc_update_next_shrink(cli);
839
840         rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
841                                 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
842                                 sizeof(*body), body, NULL);
843         if (rc != 0)
844                 __osc_update_grant(cli, body->oa.o_grant);
845         OBD_FREE_PTR(body);
846         RETURN(rc);
847 }
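/*
 * Example of the shrink targets above (values assumed): with 256 pages per
 * RPC and 4 KiB pages, one RPC is worth 1 MiB, so with 8 RPCs in flight
 * osc_shrink_grant() first aims for (8 + 1) * 1 MiB = 9 MiB; once
 * cl_avail_grant is already at or below that, it retargets a single RPC,
 * and osc_shrink_grant_to_target() refuses to go below that 1 MiB floor
 * or to "shrink" when the target is not below cl_avail_grant.
 */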
848
849 static int osc_should_shrink_grant(struct client_obd *client)
850 {
851         cfs_time_t time = cfs_time_current();
852         cfs_time_t next_shrink = client->cl_next_shrink_grant;
853
854         if ((client->cl_import->imp_connect_data.ocd_connect_flags &
855              OBD_CONNECT_GRANT_SHRINK) == 0)
856                 return 0;
857
858         if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
859                 /* Get the current RPC size directly, instead of going via:
860                  * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
861                  * Keep comment here so that it can be found by searching. */
862                 int brw_size = client->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
863
864                 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
865                     client->cl_avail_grant > brw_size)
866                         return 1;
867                 else
868                         osc_update_next_shrink(client);
869         }
870         return 0;
871 }
872
873 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
874 {
875         struct client_obd *client;
876
877         list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
878                 if (osc_should_shrink_grant(client))
879                         osc_shrink_grant(client);
880         }
881         return 0;
882 }
883
884 static int osc_add_shrink_grant(struct client_obd *client)
885 {
886         int rc;
887
888         rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
889                                        TIMEOUT_GRANT,
890                                        osc_grant_shrink_grant_cb, NULL,
891                                        &client->cl_grant_shrink_list);
892         if (rc) {
893                 CERROR("add grant client %s error %d\n", cli_name(client), rc);
894                 return rc;
895         }
896         CDEBUG(D_CACHE, "add grant client %s\n", cli_name(client));
897         osc_update_next_shrink(client);
898         return 0;
899 }
900
901 static int osc_del_shrink_grant(struct client_obd *client)
902 {
903         return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
904                                          TIMEOUT_GRANT);
905 }
906
907 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
908 {
909         /*
910          * ocd_grant is the total grant amount we expect to hold: if we've
911          * been evicted, it's the new avail_grant amount, and cl_dirty_pages
912          * will drop to 0 as in-flight RPCs fail out; otherwise, it's
913          * avail_grant + dirty.
914          *
915          * A race is tolerable here: if we're evicted but imp_state has
916          * already left the EVICTED state, cl_dirty_pages must be 0 already.
917          */
918         spin_lock(&cli->cl_loi_list_lock);
919         cli->cl_avail_grant = ocd->ocd_grant;
920         if (cli->cl_import->imp_state != LUSTRE_IMP_EVICTED) {
921                 cli->cl_avail_grant -= cli->cl_reserved_grant;
922                 if (OCD_HAS_FLAG(ocd, GRANT_PARAM))
923                         cli->cl_avail_grant -= cli->cl_dirty_grant;
924                 else
925                         cli->cl_avail_grant -=
926                                         cli->cl_dirty_pages << PAGE_CACHE_SHIFT;
927         }
928
929         if (cli->cl_avail_grant < 0) {
930                 CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
931                       cli_name(cli), cli->cl_avail_grant,
932                       ocd->ocd_grant, cli->cl_dirty_pages << PAGE_CACHE_SHIFT);
933                 /* workaround for servers which do not have the patch from
934                  * LU-2679 */
935                 cli->cl_avail_grant = ocd->ocd_grant;
936         }
937
938         if (OCD_HAS_FLAG(ocd, GRANT_PARAM)) {
939                 u64 size;
940
941                 /* overhead for each extent insertion */
942                 cli->cl_grant_extent_tax = ocd->ocd_grant_tax_kb << 10;
943                 /* determine the appropriate chunk size used by osc_extent. */
944                 cli->cl_chunkbits = max_t(int, PAGE_CACHE_SHIFT,
945                                           ocd->ocd_grant_blkbits);
946                 /* determine maximum extent size, in #pages */
947                 size = (u64)ocd->ocd_grant_max_blks << ocd->ocd_grant_blkbits;
948                 cli->cl_max_extent_pages = size >> PAGE_CACHE_SHIFT;
949                 if (cli->cl_max_extent_pages == 0)
950                         cli->cl_max_extent_pages = 1;
951         } else {
952                 cli->cl_grant_extent_tax = 0;
953                 cli->cl_chunkbits = PAGE_CACHE_SHIFT;
954                 cli->cl_max_extent_pages = DT_MAX_BRW_PAGES;
955         }
956         spin_unlock(&cli->cl_loi_list_lock);
957
958         CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld "
959                 "chunk bits: %d cl_max_extent_pages: %d\n",
960                 cli_name(cli),
961                 cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits,
962                 cli->cl_max_extent_pages);
963
964         if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
965             list_empty(&cli->cl_grant_shrink_list))
966                 osc_add_shrink_grant(cli);
967 }
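/*
 * Example of the GRANT_PARAM sizing above (server values assumed): with
 * ocd_grant_blkbits = 12 and ocd_grant_max_blks = 2048, the maximum extent
 * size is 2048 << 12 = 8 MiB, giving cl_max_extent_pages = 2048 with 4 KiB
 * pages; cl_chunkbits is never below PAGE_CACHE_SHIFT, so a chunk is at
 * least one page, and cl_grant_extent_tax = ocd_grant_tax_kb << 10 bytes.
 */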
968
969 /* We assume this OSC got a short read because it read beyond the end of
970  * a stripe object; i.e. Lustre is reading a sparse file via the LOV and
971  * _knows_ it is reading inside the file; it's just that this stripe was
972  * never written at or beyond this offset. */
973 static void handle_short_read(int nob_read, size_t page_count,
974                               struct brw_page **pga)
975 {
976         char *ptr;
977         int i = 0;
978
979         /* skip bytes read OK */
980         while (nob_read > 0) {
981                 LASSERT(page_count > 0);
982
983                 if (pga[i]->count > nob_read) {
984                         /* EOF inside this page */
985                         ptr = kmap(pga[i]->pg) +
986                                 (pga[i]->off & ~PAGE_MASK);
987                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
988                         kunmap(pga[i]->pg);
989                         page_count--;
990                         i++;
991                         break;
992                 }
993
994                 nob_read -= pga[i]->count;
995                 page_count--;
996                 i++;
997         }
998
999         /* zero remaining pages */
1000         while (page_count-- > 0) {
1001                 ptr = kmap(pga[i]->pg) + (pga[i]->off & ~PAGE_MASK);
1002                 memset(ptr, 0, pga[i]->count);
1003                 kunmap(pga[i]->pg);
1004                 i++;
1005         }
1006 }
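/*
 * Worked example for handle_short_read() (sizes assumed): an 8192-byte,
 * 2-page read that returns nob_read = 5000 consumes all 4096 bytes of
 * page 0, zeroes bytes 904..4095 of page 1 (EOF inside that page), and
 * leaves nothing for the trailing zero-fill loop since page_count is
 * already 0 by then.
 */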
1007
1008 static int check_write_rcs(struct ptlrpc_request *req,
1009                            int requested_nob, int niocount,
1010                            size_t page_count, struct brw_page **pga)
1011 {
1012         int     i;
1013         __u32   *remote_rcs;
1014
1015         remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1016                                                   sizeof(*remote_rcs) *
1017                                                   niocount);
1018         if (remote_rcs == NULL) {
1019                 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1020                 return(-EPROTO);
1021         }
1022
1023         /* return error if any niobuf was in error */
1024         for (i = 0; i < niocount; i++) {
1025                 if ((int)remote_rcs[i] < 0)
1026                         return(remote_rcs[i]);
1027
1028                 if (remote_rcs[i] != 0) {
1029                         CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1030                                 i, remote_rcs[i], req);
1031                         return(-EPROTO);
1032                 }
1033         }
1034
1035         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1036                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1037                        req->rq_bulk->bd_nob_transferred, requested_nob);
1038                 return(-EPROTO);
1039         }
1040
1041         return (0);
1042 }
1043
1044 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1045 {
1046         if (p1->flag != p2->flag) {
1047                 unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
1048                                   OBD_BRW_SYNC       | OBD_BRW_ASYNC   |
1049                                   OBD_BRW_NOQUOTA    | OBD_BRW_SOFT_SYNC);
1050
1051                 /* warn if we try to combine flags that we don't know to be
1052                  * safe to combine */
1053                 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1054                         CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1055                               "report this at https://jira.hpdd.intel.com/\n",
1056                               p1->flag, p2->flag);
1057                 }
1058                 return 0;
1059         }
1060
1061         return (p1->off + p1->count == p2->off);
1062 }
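/*
 * Example of the merge rule above: pages (off = 0, count = 4096) and
 * (off = 4096, count = 4096) with identical flags merge into one niobuf.
 * Any flag difference prevents the merge; a difference outside the masked
 * OBD_BRW_* bits additionally triggers the CWARN, since such combinations
 * are not known to be safe.
 */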
1063
1064 static u32 osc_checksum_bulk(int nob, size_t pg_count,
1065                              struct brw_page **pga, int opc,
1066                              cksum_type_t cksum_type)
1067 {
1068         u32                             cksum;
1069         int                             i = 0;
1070         struct cfs_crypto_hash_desc     *hdesc;
1071         unsigned int                    bufsize;
1072         int                             err;
1073         unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);
1074
1075         LASSERT(pg_count > 0);
1076
1077         hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1078         if (IS_ERR(hdesc)) {
1079                 CERROR("Unable to initialize checksum hash %s\n",
1080                        cfs_crypto_hash_name(cfs_alg));
1081                 return PTR_ERR(hdesc);
1082         }
1083
1084         while (nob > 0 && pg_count > 0) {
1085                 unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;
1086
1087                 /* corrupt the data before we compute the checksum, to
1088                  * simulate an OST->client data error */
1089                 if (i == 0 && opc == OST_READ &&
1090                     OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
1091                         unsigned char *ptr = kmap(pga[i]->pg);
1092                         int off = pga[i]->off & ~PAGE_MASK;
1093
1094                         memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
1095                         kunmap(pga[i]->pg);
1096                 }
1097                 cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
1098                                             pga[i]->off & ~PAGE_MASK,
1099                                             count);
1100                 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
1101                                (int)(pga[i]->off & ~PAGE_MASK));
1102
1103                 nob -= pga[i]->count;
1104                 pg_count--;
1105                 i++;
1106         }
1107
1108         bufsize = sizeof(cksum);
1109         err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);
1110
1111         /* On send we only compute a wrong checksum instead of corrupting
1112          * the data, so the data is still correct on a resend. */
1113         if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1114                 cksum++;
1115
1116         return cksum;
1117 }
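/*
 * Example of the clamping in osc_checksum_bulk() (sizes assumed): hashing
 * nob = 6000 bytes over two 4096-byte pages feeds all of page 0 and only
 * min(4096, 6000 - 4096) = 1904 bytes of page 1, so bytes beyond the
 * requested transfer never influence the checksum.
 */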
1118
1119 static int
1120 osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
1121                      u32 page_count, struct brw_page **pga,
1122                      struct ptlrpc_request **reqp, int resend)
1123 {
1124         struct ptlrpc_request   *req;
1125         struct ptlrpc_bulk_desc *desc;
1126         struct ost_body         *body;
1127         struct obd_ioobj        *ioobj;
1128         struct niobuf_remote    *niobuf;
1129         int niocount, i, requested_nob, opc, rc;
1130         struct osc_brw_async_args *aa;
1131         struct req_capsule      *pill;
1132         struct brw_page *pg_prev;
1133
1134         ENTRY;
1135         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1136                 RETURN(-ENOMEM); /* Recoverable */
1137         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1138                 RETURN(-EINVAL); /* Fatal */
1139
1140         if ((cmd & OBD_BRW_WRITE) != 0) {
1141                 opc = OST_WRITE;
1142                 req = ptlrpc_request_alloc_pool(cli->cl_import,
1143                                                 osc_rq_pool,
1144                                                 &RQF_OST_BRW_WRITE);
1145         } else {
1146                 opc = OST_READ;
1147                 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
1148         }
1149         if (req == NULL)
1150                 RETURN(-ENOMEM);
1151
1152         for (niocount = i = 1; i < page_count; i++) {
1153                 if (!can_merge_pages(pga[i - 1], pga[i]))
1154                         niocount++;
1155         }
1156
1157         pill = &req->rq_pill;
1158         req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1159                              sizeof(*ioobj));
1160         req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1161                              niocount * sizeof(*niobuf));
1162
1163         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1164         if (rc) {
1165                 ptlrpc_request_free(req);
1166                 RETURN(rc);
1167         }
1168         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1169         ptlrpc_at_set_req_timeout(req);
1170         /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
1171          * retry logic */
1172         req->rq_no_retry_einprogress = 1;
1173
1174         desc = ptlrpc_prep_bulk_imp(req, page_count,
1175                 cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
1176                 (opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE :
1177                         PTLRPC_BULK_PUT_SINK) |
1178                         PTLRPC_BULK_BUF_KIOV,
1179                 OST_BULK_PORTAL,
1180                 &ptlrpc_bulk_kiov_pin_ops);
1181
1182         if (desc == NULL)
1183                 GOTO(out, rc = -ENOMEM);
1184         /* NB request now owns desc and will free it when it gets freed */
1185
1186         body = req_capsule_client_get(pill, &RMF_OST_BODY);
1187         ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1188         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1189         LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1190
1191         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
1192
1193         obdo_to_ioobj(oa, ioobj);
1194         ioobj->ioo_bufcnt = niocount;
1195         /* The high bits of ioo_max_brw tell the server the _maximum_ number
1196          * of bulks that might be sent for this request.  The actual number is
1197          * decided when the RPC is finally sent in ptlrpc_register_bulk().  It
1198          * sends "max - 1" for compatibility with old clients that send "0",
1199          * and so that the actual maximum is a power-of-two, not one less. LU-1431 */
1200         ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
1201         LASSERT(page_count > 0);
1202         pg_prev = pga[0];
1203         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1204                 struct brw_page *pg = pga[i];
1205                 int poff = pg->off & ~PAGE_MASK;
1206
1207                 LASSERT(pg->count > 0);
1208                 /* make sure there is no gap in the middle of page array */
1209                 LASSERTF(page_count == 1 ||
1210                          (ergo(i == 0, poff + pg->count == PAGE_CACHE_SIZE) &&
1211                           ergo(i > 0 && i < page_count - 1,
1212                                poff == 0 && pg->count == PAGE_CACHE_SIZE)   &&
1213                           ergo(i == page_count - 1, poff == 0)),
1214                          "i: %d/%d pg: %p off: "LPU64", count: %u\n",
1215                          i, page_count, pg, pg->off, pg->count);
1216                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1217                          "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1218                          " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1219                          i, page_count,
1220                          pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1221                          pg_prev->pg, page_private(pg_prev->pg),
1222                          pg_prev->pg->index, pg_prev->off);
1223                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1224                         (pg->flag & OBD_BRW_SRVLOCK));
1225
1226                 desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff, pg->count);
1227                 requested_nob += pg->count;
1228
1229                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1230                         niobuf--;
1231                         niobuf->rnb_len += pg->count;
1232                 } else {
1233                         niobuf->rnb_offset = pg->off;
1234                         niobuf->rnb_len    = pg->count;
1235                         niobuf->rnb_flags  = pg->flag;
1236                 }
1237                 pg_prev = pg;
1238         }
1239
1240         LASSERTF((void *)(niobuf - niocount) ==
1241                 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1242                 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1243                 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1244
1245         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1246         if (resend) {
1247                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1248                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1249                         body->oa.o_flags = 0;
1250                 }
1251                 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1252         }
1253
1254         if (osc_should_shrink_grant(cli))
1255                 osc_shrink_grant_local(cli, &body->oa);
1256
1257         /* size[REQ_REC_OFF] is still sizeof(*body) */
1258         if (opc == OST_WRITE) {
1259                 if (cli->cl_checksum &&
1260                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1261                         /* store cl_cksum_type in a local variable since
1262                          * it can be changed via lprocfs */
1263                         cksum_type_t cksum_type = cli->cl_cksum_type;
1264
1265                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1266                                 oa->o_flags &= OBD_FL_LOCAL_MASK;
1267                                 body->oa.o_flags = 0;
1268                         }
1269                         body->oa.o_flags |= cksum_type_pack(cksum_type);
1270                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1271                         body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1272                                                              page_count, pga,
1273                                                              OST_WRITE,
1274                                                              cksum_type);
1275                         CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1276                                body->oa.o_cksum);
1277                         /* save this in 'oa', too, for later checking */
1278                         oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1279                         oa->o_flags |= cksum_type_pack(cksum_type);
1280                 } else {
1281                         /* clear out the checksum flag, in case this is a
1282                          * resend but cl_checksum is no longer set. b=11238 */
1283                         oa->o_valid &= ~OBD_MD_FLCKSUM;
1284                 }
1285                 oa->o_cksum = body->oa.o_cksum;
1286                 /* 1 RC per niobuf */
1287                 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1288                                      sizeof(__u32) * niocount);
1289         } else {
1290                 if (cli->cl_checksum &&
1291                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1292                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1293                                 body->oa.o_flags = 0;
1294                         body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1295                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1296                 }
1297         }
1298         ptlrpc_request_set_replen(req);
1299
1300         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1301         aa = ptlrpc_req_async_args(req);
1302         aa->aa_oa = oa;
1303         aa->aa_requested_nob = requested_nob;
1304         aa->aa_nio_count = niocount;
1305         aa->aa_page_count = page_count;
1306         aa->aa_resends = 0;
1307         aa->aa_ppga = pga;
1308         aa->aa_cli = cli;
1309         INIT_LIST_HEAD(&aa->aa_oaps);
1310
1311         *reqp = req;
1312         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1313         CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
1314                 req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
1315                 niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
1316         RETURN(0);
1317
1318  out:
1319         ptlrpc_req_finished(req);
1320         RETURN(rc);
1321 }
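/*
 * Example of the niobuf packing above (layout assumed): three contiguous
 * 4096-byte pages at offsets 0, 4096 and 8192 with equal flags give
 * niocount = 1 and a single niobuf with rnb_len = 12288, while any gap or
 * flag change starts a new niobuf, matching the niocount precomputed with
 * can_merge_pages() before the capsule sizes were set.
 */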
1322
1323 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1324                                 __u32 client_cksum, __u32 server_cksum, int nob,
1325                                 size_t page_count, struct brw_page **pga,
1326                                 cksum_type_t client_cksum_type)
1327 {
1328         __u32 new_cksum;
1329         char *msg;
1330         cksum_type_t cksum_type;
1331
1332         if (server_cksum == client_cksum) {
1333                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1334                 return 0;
1335         }
1336
1337         cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1338                                        oa->o_flags : 0);
1339         new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1340                                       cksum_type);
1341
1342         if (cksum_type != client_cksum_type)
1343                 msg = "the server did not use the checksum type specified in "
1344                       "the original request - likely a protocol problem";
1345         else if (new_cksum == server_cksum)
1346                 msg = "changed on the client after we checksummed it - "
1347                       "likely false positive due to mmap IO (bug 11742)";
1348         else if (new_cksum == client_cksum)
1349                 msg = "changed in transit before arrival at OST";
1350         else
1351                 msg = "changed in transit AND doesn't match the original - "
1352                       "likely false positive due to mmap IO (bug 11742)";
1353
1354         LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1355                            " object "DOSTID" extent ["LPU64"-"LPU64"]\n",
1356                            msg, libcfs_nid2str(peer->nid),
1357                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1358                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1359                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1360                            POSTID(&oa->o_oi), pga[0]->off,
1361                            pga[page_count-1]->off + pga[page_count-1]->count - 1);
1362         CERROR("original client csum %x (type %x), server csum %x (type %x), "
1363                "client csum now %x\n", client_cksum, client_cksum_type,
1364                server_cksum, cksum_type, new_cksum);
1365         return 1;
1366 }
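/*
 * The recomputed checksum above separates four diagnoses, in the order
 * they are tested:
 *   cksum_type != client_cksum_type -> server used a different checksum
 *                                      type, likely a protocol problem;
 *   new_cksum == server_cksum       -> pages changed on the client after
 *                                      checksumming (mmap IO, bug 11742);
 *   new_cksum == client_cksum       -> data changed in transit before
 *                                      arrival at the OST;
 *   otherwise                       -> changed in transit and no longer
 *                                      matches the original.
 */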
1367
1368 /* Note: rc enters this function as the number of bytes transferred */
1369 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1370 {
1371         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1372         const lnet_process_id_t *peer =
1373                         &req->rq_import->imp_connection->c_peer;
1374         struct client_obd *cli = aa->aa_cli;
1375         struct ost_body *body;
1376         u32 client_cksum = 0;
1377         ENTRY;
1378
1379         if (rc < 0 && rc != -EDQUOT) {
1380                 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1381                 RETURN(rc);
1382         }
1383
1384         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1385         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1386         if (body == NULL) {
1387                 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1388                 RETURN(-EPROTO);
1389         }
1390
1391         /* set/clear over quota flag for a uid/gid */
1392         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1393             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1394                 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1395
1396                 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1397                        body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1398                        body->oa.o_flags);
1399                 osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
1400         }
1401
1402         osc_update_grant(cli, body);
1403
1404         if (rc < 0)
1405                 RETURN(rc);
1406
1407         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1408                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1409
1410         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1411                 if (rc > 0) {
1412                         CERROR("Unexpected +ve rc %d\n", rc);
1413                         RETURN(-EPROTO);
1414                 }
1415                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1416
1417                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1418                         RETURN(-EAGAIN);
1419
1420                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1421                     check_write_checksum(&body->oa, peer, client_cksum,
1422                                          body->oa.o_cksum, aa->aa_requested_nob,
1423                                          aa->aa_page_count, aa->aa_ppga,
1424                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1425                         RETURN(-EAGAIN);
1426
1427                 rc = check_write_rcs(req, aa->aa_requested_nob,
1428                                      aa->aa_nio_count, aa->aa_page_count, aa->aa_ppga);
1429                 GOTO(out, rc);
1430         }
1431
1432         /* The rest of this function executes only for OST_READs */
1433
1434         /* if unwrap_bulk failed, return -EAGAIN to retry */
1435         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1436         if (rc < 0)
1437                 GOTO(out, rc = -EAGAIN);
1438
1439         if (rc > aa->aa_requested_nob) {
1440                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1441                        aa->aa_requested_nob);
1442                 RETURN(-EPROTO);
1443         }
1444
1445         if (rc != req->rq_bulk->bd_nob_transferred) {
1446                 CERROR("Unexpected rc %d (%d transferred)\n",
1447                        rc, req->rq_bulk->bd_nob_transferred);
1448                 RETURN(-EPROTO);
1449         }
1450
1451         if (rc < aa->aa_requested_nob)
1452                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1453
1454         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1455                 static int cksum_counter;
1456                 u32        server_cksum = body->oa.o_cksum;
1457                 char      *via = "";
1458                 char      *router = "";
1459                 cksum_type_t cksum_type;
1460
1461                 cksum_type = cksum_type_unpack(body->oa.o_valid &
1462                                 OBD_MD_FLFLAGS ? body->oa.o_flags : 0);
1463                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1464                                                  aa->aa_ppga, OST_READ,
1465                                                  cksum_type);
1466
1467                 if (peer->nid != req->rq_bulk->bd_sender) {
1468                         via = " via ";
1469                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1470                 }
1471
1472                 if (server_cksum != client_cksum) {
1473                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1474                                            "%s%s%s inode "DFID" object "DOSTID
1475                                            " extent ["LPU64"-"LPU64"]\n",
1476                                            req->rq_import->imp_obd->obd_name,
1477                                            libcfs_nid2str(peer->nid),
1478                                            via, router,
1479                                            body->oa.o_valid & OBD_MD_FLFID ?
1480                                                 body->oa.o_parent_seq : (__u64)0,
1481                                            body->oa.o_valid & OBD_MD_FLFID ?
1482                                                 body->oa.o_parent_oid : 0,
1483                                            body->oa.o_valid & OBD_MD_FLFID ?
1484                                                 body->oa.o_parent_ver : 0,
1485                                            POSTID(&body->oa.o_oi),
1486                                            aa->aa_ppga[0]->off,
1487                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1488                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1489                                                                         1);
1490                         CERROR("client %x, server %x, cksum_type %x\n",
1491                                client_cksum, server_cksum, cksum_type);
1492                         cksum_counter = 0;
1493                         aa->aa_oa->o_cksum = client_cksum;
1494                         rc = -EAGAIN;
1495                 } else {
1496                         cksum_counter++;
1497                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1498                         rc = 0;
1499                 }
1500         } else if (unlikely(client_cksum)) {
1501                 static int cksum_missed;
1502
1503                 cksum_missed++;
1504                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1505                         CERROR("Checksum %u requested from %s but not sent\n",
1506                                cksum_missed, libcfs_nid2str(peer->nid));
1507         } else {
1508                 rc = 0;
1509         }
1510 out:
1511         if (rc >= 0)
1512                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1513                                      aa->aa_oa, &body->oa);
1514
1515         RETURN(rc);
1516 }
1517
1518 static int osc_brw_redo_request(struct ptlrpc_request *request,
1519                                 struct osc_brw_async_args *aa, int rc)
1520 {
1521         struct ptlrpc_request *new_req;
1522         struct osc_brw_async_args *new_aa;
1523         struct osc_async_page *oap;
1524         ENTRY;
1525
1526         DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1527                   "redo for recoverable error %d", rc);
1528
1529         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1530                                 OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
1531                                   aa->aa_cli, aa->aa_oa, aa->aa_page_count,
1532                                   aa->aa_ppga, &new_req, 1);
1533         if (rc)
1534                 RETURN(rc);
1535
1536         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1537                 if (oap->oap_request != NULL) {
1538                         LASSERTF(request == oap->oap_request,
1539                                  "request %p != oap_request %p\n",
1540                                  request, oap->oap_request);
1541                         if (oap->oap_interrupted) {
1542                                 ptlrpc_req_finished(new_req);
1543                                 RETURN(-EINTR);
1544                         }
1545                 }
1546         }
1547         /* The new request takes over pga and oaps from the old request.
1548          * Note that copying a list_head doesn't work; it must be moved... */
1549         aa->aa_resends++;
1550         new_req->rq_interpret_reply = request->rq_interpret_reply;
1551         new_req->rq_async_args = request->rq_async_args;
1552         new_req->rq_commit_cb = request->rq_commit_cb;
1553         /* Cap the resend delay at the current request timeout; this is
1554          * similar to what ptlrpc does (see after_reply()). */
1555         if (aa->aa_resends > new_req->rq_timeout)
1556                 new_req->rq_sent = cfs_time_current_sec() + new_req->rq_timeout;
1557         else
1558                 new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1559         new_req->rq_generation_set = 1;
1560         new_req->rq_import_generation = request->rq_import_generation;
1561
1562         new_aa = ptlrpc_req_async_args(new_req);
1563
1564         INIT_LIST_HEAD(&new_aa->aa_oaps);
1565         list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1566         INIT_LIST_HEAD(&new_aa->aa_exts);
1567         list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1568         new_aa->aa_resends = aa->aa_resends;
1569
1570         list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1571                 if (oap->oap_request) {
1572                         ptlrpc_req_finished(oap->oap_request);
1573                         oap->oap_request = ptlrpc_request_addref(new_req);
1574                 }
1575         }
1576
1577         /* XXX: This code will run into problems if we ever support adding
1578          * a series of BRW RPCs to a self-defined ptlrpc_request_set and
1579          * waiting for all of them to finish. We should inherit the request
1580          * set from the old request. */
1581         ptlrpcd_add_req(new_req);
1582
1583         DEBUG_REQ(D_INFO, new_req, "new request");
1584         RETURN(0);
1585 }
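
/*
 * Illustrative sketch: the resend backoff above is just "wait one more
 * second per resend, capped at the request timeout", mirroring what
 * after_reply() does.  The helper name next_resend_time() is hypothetical.
 */
#if 0	/* example only, never compiled */
static long next_resend_time(long now, int resends, int rq_timeout)
{
	/* resends = 3,   rq_timeout = 100 -> now + 3
	 * resends = 150, rq_timeout = 100 -> now + 100 */
	return now + (resends > rq_timeout ? rq_timeout : resends);
}
#endif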
1586
1587 /*
1588  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1589  * follow Sedgewick's advice and stick to the dead simple shellsort -- it'll do
1590  * fine for our small page arrays and doesn't require allocation.  it's an
1591  * insertion sort that swaps elements that are strides apart, shrinking the
1592  * stride down until it's '1' and the array is sorted.
1593  */
1594 static void sort_brw_pages(struct brw_page **array, int num)
1595 {
1596         int stride, i, j;
1597         struct brw_page *tmp;
1598
1599         if (num == 1)
1600                 return;
1601         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1602                 ;
1603
1604         do {
1605                 stride /= 3;
1606                 for (i = stride ; i < num ; i++) {
1607                         tmp = array[i];
1608                         j = i;
1609                         while (j >= stride && array[j - stride]->off > tmp->off) {
1610                                 array[j] = array[j - stride];
1611                                 j -= stride;
1612                         }
1613                         array[j] = tmp;
1614                 }
1615         } while (stride > 1);
1616 }
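
/*
 * Worked example: for num = 10 the pre-loop above grows the stride through
 * Knuth's 3h+1 sequence and stops at the first value >= num, after which
 * gapped insertion-sort passes run at each successive stride/3.  The helper
 * name initial_stride() is hypothetical.
 */
#if 0	/* example only, never compiled */
static int initial_stride(int num)
{
	int stride;

	for (stride = 1; stride < num; stride = stride * 3 + 1)
		;		/* num = 10: stride goes 1 -> 4 -> 13 */
	return stride;		/* passes then run at 13/3 = 4 and 4/3 = 1 */
}
#endif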
1617
1618 static void osc_release_ppga(struct brw_page **ppga, size_t count)
1619 {
1620         LASSERT(ppga != NULL);
1621         OBD_FREE(ppga, sizeof(*ppga) * count);
1622 }
1623
1624 static int brw_interpret(const struct lu_env *env,
1625                          struct ptlrpc_request *req, void *data, int rc)
1626 {
1627         struct osc_brw_async_args *aa = data;
1628         struct osc_extent *ext;
1629         struct osc_extent *tmp;
1630         struct client_obd *cli = aa->aa_cli;
1631         ENTRY;
1632
1633         rc = osc_brw_fini_request(req, rc);
1634         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1635         /* When the server returns -EINPROGRESS, the client should always
1636          * retry regardless of how many times the bulk was already resent. */
1637         if (osc_recoverable_error(rc)) {
1638                 if (req->rq_import_generation !=
1639                     req->rq_import->imp_generation) {
1640                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
1641                                ""DOSTID", rc = %d.\n",
1642                                req->rq_import->imp_obd->obd_name,
1643                                POSTID(&aa->aa_oa->o_oi), rc);
1644                 } else if (rc == -EINPROGRESS ||
1645                     client_should_resend(aa->aa_resends, aa->aa_cli)) {
1646                         rc = osc_brw_redo_request(req, aa, rc);
1647                 } else {
1648                         CERROR("%s: too many resent retries for object: "
1649                                ""LPU64":"LPU64", rc = %d.\n",
1650                                req->rq_import->imp_obd->obd_name,
1651                                POSTID(&aa->aa_oa->o_oi), rc);
1652                 }
1653
1654                 if (rc == 0)
1655                         RETURN(0);
1656                 else if (rc == -EAGAIN || rc == -EINPROGRESS)
1657                         rc = -EIO;
1658         }
1659
1660         if (rc == 0) {
1661                 struct obdo *oa = aa->aa_oa;
1662                 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
1663                 unsigned long valid = 0;
1664                 struct cl_object *obj;
1665                 struct osc_async_page *last;
1666
1667                 last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
1668                 obj = osc2cl(last->oap_obj);
1669
1670                 cl_object_attr_lock(obj);
1671                 if (oa->o_valid & OBD_MD_FLBLOCKS) {
1672                         attr->cat_blocks = oa->o_blocks;
1673                         valid |= CAT_BLOCKS;
1674                 }
1675                 if (oa->o_valid & OBD_MD_FLMTIME) {
1676                         attr->cat_mtime = oa->o_mtime;
1677                         valid |= CAT_MTIME;
1678                 }
1679                 if (oa->o_valid & OBD_MD_FLATIME) {
1680                         attr->cat_atime = oa->o_atime;
1681                         valid |= CAT_ATIME;
1682                 }
1683                 if (oa->o_valid & OBD_MD_FLCTIME) {
1684                         attr->cat_ctime = oa->o_ctime;
1685                         valid |= CAT_CTIME;
1686                 }
1687
1688                 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1689                         struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
1690                         loff_t last_off = last->oap_count + last->oap_obj_off +
1691                                 last->oap_page_off;
1692
1693                         /* Update the file size if this is an out-of-quota
1694                          * or direct I/O write that extends the file. */
1695                         if (loi->loi_lvb.lvb_size < last_off) {
1696                                 attr->cat_size = last_off;
1697                                 valid |= CAT_SIZE;
1698                         }
1699                         /* Extend KMS if it's not a lockless write */
1700                         if (loi->loi_kms < last_off &&
1701                             oap2osc_page(last)->ops_srvlock == 0) {
1702                                 attr->cat_kms = last_off;
1703                                 valid |= CAT_KMS;
1704                         }
1705                 }
1706
1707                 if (valid != 0)
1708                         cl_object_attr_update(env, obj, attr, valid);
1709                 cl_object_attr_unlock(obj);
1710         }
1711         OBDO_FREE(aa->aa_oa);
1712
1713         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
1714                 osc_inc_unstable_pages(req);
1715
1716         list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
1717                 list_del_init(&ext->oe_link);
1718                 osc_extent_finish(env, ext, 1, rc);
1719         }
1720         LASSERT(list_empty(&aa->aa_exts));
1721         LASSERT(list_empty(&aa->aa_oaps));
1722
1723         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1724         ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
1725
1726         spin_lock(&cli->cl_loi_list_lock);
1727         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1728          * is called so we know whether to go to sync BRWs or wait for more
1729          * RPCs to complete */
1730         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
1731                 cli->cl_w_in_flight--;
1732         else
1733                 cli->cl_r_in_flight--;
1734         osc_wake_cache_waiters(cli);
1735         spin_unlock(&cli->cl_loi_list_lock);
1736
1737         osc_io_unplug(env, cli, NULL);
1738         RETURN(rc);
1739 }
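
/*
 * Illustrative sketch of the size/KMS update above: if the last byte a
 * write touched lies past the attribute we knew, grow the attribute;
 * lockless (srvlock) writes never move the known minimum size (KMS).
 * The helper name maybe_extend_attrs() is hypothetical.
 */
#if 0	/* example only, never compiled */
static void maybe_extend_attrs(loff_t *size, loff_t *kms, loff_t obj_off,
			       loff_t page_off, loff_t count, int srvlock)
{
	loff_t last_off = obj_off + page_off + count;	/* end of last chunk */

	if (*size < last_off)			/* write ran past known size */
		*size = last_off;
	if (!srvlock && *kms < last_off)	/* srvlock writes skip KMS */
		*kms = last_off;
}
#endif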
1740
1741 static void brw_commit(struct ptlrpc_request *req)
1742 {
1743         /* If osc_inc_unstable_pages (via osc_extent_finish) races with
1744          * this function (called via rq_commit_cb), we need to ensure
1745          * osc_dec_unstable_pages is still called. Otherwise unstable
1746          * pages may be leaked. */
1747         spin_lock(&req->rq_lock);
1748         if (likely(req->rq_unstable)) {
1749                 req->rq_unstable = 0;
1750                 spin_unlock(&req->rq_lock);
1751
1752                 osc_dec_unstable_pages(req);
1753         } else {
1754                 req->rq_committed = 1;
1755                 spin_unlock(&req->rq_lock);
1756         }
1757 }
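
/*
 * Pattern sketch (generic, not the exact Lustre types): two racing
 * completion paths agree under a spinlock on which one performs a
 * one-time cleanup.  Whoever finds the flag still armed takes the work;
 * the other only leaves a marker.  The helper name one_shot_cleanup()
 * is hypothetical.
 */
#if 0	/* example only, never compiled */
static void one_shot_cleanup(spinlock_t *lock, int *armed, int *committed)
{
	spin_lock(lock);
	if (*armed) {		/* we won the race: do the one-time work */
		*armed = 0;
		spin_unlock(lock);
		/* ... e.g. decrement the unstable-page accounting ... */
	} else {		/* the other path will (or did) handle it */
		*committed = 1;
		spin_unlock(lock);
	}
}
#endif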
1758
1759 /**
1760  * Build an RPC from the list of extents @ext_list. The caller must ensure
1761  * that the total pages in this list are NOT over max pages per RPC.
1762  * Extents in the list must be in OES_RPC state.
1763  */
1764 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
1765                   struct list_head *ext_list, int cmd)
1766 {
1767         struct ptlrpc_request           *req = NULL;
1768         struct osc_extent               *ext;
1769         struct brw_page                 **pga = NULL;
1770         struct osc_brw_async_args       *aa = NULL;
1771         struct obdo                     *oa = NULL;
1772         struct osc_async_page           *oap;
1773         struct osc_object               *obj = NULL;
1774         struct cl_req_attr              *crattr = NULL;
1775         loff_t                          starting_offset = OBD_OBJECT_EOF;
1776         loff_t                          ending_offset = 0;
1777         int                             mpflag = 0;
1778         int                             mem_tight = 0;
1779         int                             page_count = 0;
1780         bool                            soft_sync = false;
1781         bool                            interrupted = false;
1782         int                             i;
1783         int                             grant = 0;
1784         int                             rc;
1785         struct list_head                rpc_list = LIST_HEAD_INIT(rpc_list);
1786         struct ost_body                 *body;
1787         ENTRY;
1788         LASSERT(!list_empty(ext_list));
1789
1790         /* add pages into rpc_list to build BRW rpc */
1791         list_for_each_entry(ext, ext_list, oe_link) {
1792                 LASSERT(ext->oe_state == OES_RPC);
1793                 mem_tight |= ext->oe_memalloc;
1794                 grant += ext->oe_grants;
1795                 page_count += ext->oe_nr_pages;
1796                 if (obj == NULL)
1797                         obj = ext->oe_obj;
1798         }
1799
1800         soft_sync = osc_over_unstable_soft_limit(cli);
1801         if (mem_tight)
1802                 mpflag = cfs_memory_pressure_get_and_set();
1803
1804         OBD_ALLOC(pga, sizeof(*pga) * page_count);
1805         if (pga == NULL)
1806                 GOTO(out, rc = -ENOMEM);
1807
1808         OBDO_ALLOC(oa);
1809         if (oa == NULL)
1810                 GOTO(out, rc = -ENOMEM);
1811
1812         i = 0;
1813         list_for_each_entry(ext, ext_list, oe_link) {
1814                 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
1815                         if (mem_tight)
1816                                 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
1817                         if (soft_sync)
1818                                 oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
1819                         pga[i] = &oap->oap_brw_page;
1820                         pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
1821                         i++;
1822
1823                         list_add_tail(&oap->oap_rpc_item, &rpc_list);
1824                         if (starting_offset == OBD_OBJECT_EOF ||
1825                             starting_offset > oap->oap_obj_off)
1826                                 starting_offset = oap->oap_obj_off;
1827                         else
1828                                 LASSERT(oap->oap_page_off == 0);
1829                         if (ending_offset < oap->oap_obj_off + oap->oap_count)
1830                                 ending_offset = oap->oap_obj_off +
1831                                                 oap->oap_count;
1832                         else
1833                                 LASSERT(oap->oap_page_off + oap->oap_count ==
1834                                         PAGE_CACHE_SIZE);
1835                         if (oap->oap_interrupted)
1836                                 interrupted = true;
1837                 }
1838         }
1839
1840         /* first page in the list */
1841         oap = list_entry(rpc_list.next, typeof(*oap), oap_rpc_item);
1842
1843         crattr = &osc_env_info(env)->oti_req_attr;
1844         memset(crattr, 0, sizeof(*crattr));
1845         crattr->cra_type = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
1846         crattr->cra_flags = ~0ULL;
1847         crattr->cra_page = oap2cl_page(oap);
1848         crattr->cra_oa = oa;
1849         cl_req_attr_set(env, osc2cl(obj), crattr);
1850
1851         if (cmd == OBD_BRW_WRITE)
1852                 oa->o_grant_used = grant;
1853
1854         sort_brw_pages(pga, page_count);
1855         rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req, 0);
1856         if (rc != 0) {
1857                 CERROR("prep_req failed: %d\n", rc);
1858                 GOTO(out, rc);
1859         }
1860
1861         req->rq_commit_cb = brw_commit;
1862         req->rq_interpret_reply = brw_interpret;
1863         req->rq_memalloc = mem_tight != 0;
1864         oap->oap_request = ptlrpc_request_addref(req);
1865         if (interrupted && !req->rq_intr)
1866                 ptlrpc_mark_interrupted(req);
1867
1868         /* Need to update the timestamps after the request is built in case
1869          * we race with setattr (locally or in the queue at the OST).  If the
1870          * OST gets a later setattr before an earlier BRW (by request xid),
1871          * the OST will not use the BRW timestamps.  Sadly, there is no
1872          * obvious way to do this in a single call.  bug 10150 */
1873         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
1874         crattr->cra_oa = &body->oa;
1875         crattr->cra_flags = OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME;
1876         cl_req_attr_set(env, osc2cl(obj), crattr);
1877         lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
1878
1879         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1880         aa = ptlrpc_req_async_args(req);
1881         INIT_LIST_HEAD(&aa->aa_oaps);
1882         list_splice_init(&rpc_list, &aa->aa_oaps);
1883         INIT_LIST_HEAD(&aa->aa_exts);
1884         list_splice_init(ext_list, &aa->aa_exts);
1885
1886         spin_lock(&cli->cl_loi_list_lock);
1887         starting_offset >>= PAGE_CACHE_SHIFT;
1888         if (cmd == OBD_BRW_READ) {
1889                 cli->cl_r_in_flight++;
1890                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
1891                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
1892                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
1893                                       starting_offset + 1);
1894         } else {
1895                 cli->cl_w_in_flight++;
1896                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
1897                 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
1898                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
1899                                       starting_offset + 1);
1900         }
1901         spin_unlock(&cli->cl_loi_list_lock);
1902
1903         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %ur/%uw in flight",
1904                   page_count, aa, cli->cl_r_in_flight,
1905                   cli->cl_w_in_flight);
1906
1907         ptlrpcd_add_req(req);
1908         rc = 0;
1909         EXIT;
1910
1911 out:
1912         if (mem_tight != 0)
1913                 cfs_memory_pressure_restore(mpflag);
1914
1915         if (rc != 0) {
1916                 LASSERT(req == NULL);
1917
1918                 if (oa)
1919                         OBDO_FREE(oa);
1920                 if (pga)
1921                         OBD_FREE(pga, sizeof(*pga) * page_count);
1922                 /* This should happen rarely and is pretty bad; it makes the
1923                  * pending list not follow the dirty order. */
1924                 while (!list_empty(ext_list)) {
1925                         ext = list_entry(ext_list->next, struct osc_extent,
1926                                          oe_link);
1927                         list_del_init(&ext->oe_link);
1928                         osc_extent_finish(env, ext, 0, rc);
1929                 }
1930         }
1931         RETURN(rc);
1932 }
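
/*
 * Illustrative arithmetic for the histogram accounting above: the RPC's
 * starting byte offset is converted to a page index before being tallied
 * (plus one) into a log2 histogram.  The 4 KiB page size below is an
 * assumption for the example; brw_start_page() is a hypothetical name.
 */
#if 0	/* example only, never compiled */
static unsigned long brw_start_page(loff_t start_byte)
{
	/* with 4 KiB pages, a write starting at 1 MiB -> page index 256 */
	return start_byte >> PAGE_CACHE_SHIFT;
}
#endif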
1933
1934 static int osc_set_lock_data(struct ldlm_lock *lock, void *data)
1935 {
1936         int set = 0;
1937
1938         LASSERT(lock != NULL);
1939
1940         lock_res_and_lock(lock);
1941
1942         if (lock->l_ast_data == NULL)
1943                 lock->l_ast_data = data;
1944         if (lock->l_ast_data == data)
1945                 set = 1;
1946
1947         unlock_res_and_lock(lock);
1948
1949         return set;
1950 }
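
/*
 * Usage sketch: osc_set_lock_data() is an atomic "set if unset, then test"
 * on l_ast_data.  It returns 1 both when it installs @data and when @data
 * was already there, and 0 when the lock is tied to a different object.
 * try_use_lock() and its callees are hypothetical names.
 */
#if 0	/* example only, never compiled */
static void try_use_lock(struct ldlm_lock *lock, void *obj)
{
	if (osc_set_lock_data(lock, obj))
		reuse_lock(lock);	/* lock now (or already) tied to obj */
	else
		skip_lock(lock);	/* another object owns this lock */
}
#endif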
1951
1952 static int osc_enqueue_fini(struct ptlrpc_request *req,
1953                             osc_enqueue_upcall_f upcall, void *cookie,
1954                             struct lustre_handle *lockh, enum ldlm_mode mode,
1955                             __u64 *flags, int agl, int errcode)
1956 {
1957         bool intent = *flags & LDLM_FL_HAS_INTENT;
1958         int rc;
1959         ENTRY;
1960
1961         /* The request was created before ldlm_cli_enqueue call. */
1962         if (intent && errcode == ELDLM_LOCK_ABORTED) {
1963                 struct ldlm_reply *rep;
1964
1965                 rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1966                 LASSERT(rep != NULL);
1967
1968                 rep->lock_policy_res1 =
1969                         ptlrpc_status_ntoh(rep->lock_policy_res1);
1970                 if (rep->lock_policy_res1)
1971                         errcode = rep->lock_policy_res1;
1972                 if (!agl)
1973                         *flags |= LDLM_FL_LVB_READY;
1974         } else if (errcode == ELDLM_OK) {
1975                 *flags |= LDLM_FL_LVB_READY;
1976         }
1977
1978         /* Call the update callback. */
1979         rc = (*upcall)(cookie, lockh, errcode);
1980
1981         /* release the reference taken in ldlm_cli_enqueue() */
1982         if (errcode == ELDLM_LOCK_MATCHED)
1983                 errcode = ELDLM_OK;
1984         if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
1985                 ldlm_lock_decref(lockh, mode);
1986
1987         RETURN(rc);
1988 }
1989
1990 static int osc_enqueue_interpret(const struct lu_env *env,
1991                                  struct ptlrpc_request *req,
1992                                  struct osc_enqueue_args *aa, int rc)
1993 {
1994         struct ldlm_lock *lock;
1995         struct lustre_handle *lockh = &aa->oa_lockh;
1996         enum ldlm_mode mode = aa->oa_mode;
1997         struct ost_lvb *lvb = aa->oa_lvb;
1998         __u32 lvb_len = sizeof(*lvb);
1999         __u64 flags = 0;
2000
2001         ENTRY;
2002
2003         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2004          * be valid. */
2005         lock = ldlm_handle2lock(lockh);
2006         LASSERTF(lock != NULL,
2007                  "lockh "LPX64", req %p, aa %p - client evicted?\n",
2008                  lockh->cookie, req, aa);
2009
2010         /* Take an additional reference so that a blocking AST that
2011          * ldlm_cli_enqueue_fini() might post for a failed lock is guaranteed
2012          * to arrive after an upcall has been executed by
2013          * osc_enqueue_fini(). */
2014         ldlm_lock_addref(lockh, mode);
2015
2016         /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
2017         OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
2018
2019         /* Let the CP AST grant the lock first. */
2020         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2021
2022         if (aa->oa_agl) {
2023                 LASSERT(aa->oa_lvb == NULL);
2024                 LASSERT(aa->oa_flags == NULL);
2025                 aa->oa_flags = &flags;
2026         }
2027
2028         /* Complete the lock acquisition procedure. */
2029         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
2030                                    aa->oa_mode, aa->oa_flags, lvb, lvb_len,
2031                                    lockh, rc);
2032         /* Complete osc stuff. */
2033         rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
2034                               aa->oa_flags, aa->oa_agl, rc);
2035
2036         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2037
2038         ldlm_lock_decref(lockh, mode);
2039         LDLM_LOCK_PUT(lock);
2040         RETURN(rc);
2041 }
2042
2043 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2044
2045 /* When enqueuing asynchronously, locks are not ordered; we can obtain a lock
2046  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2047  * other synchronous requests; however, holding some locks while trying to
2048  * obtain others may take a considerable amount of time in the case of OST
2049  * failure, and when a client does not release a lock that other sync requests
2050  * are waiting for, the client is evicted from the cluster -- such scenarios
2051  * make life difficult, so release locks just after they are obtained. */
2052 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2053                      __u64 *flags, union ldlm_policy_data *policy,
2054                      struct ost_lvb *lvb, int kms_valid,
2055                      osc_enqueue_upcall_f upcall, void *cookie,
2056                      struct ldlm_enqueue_info *einfo,
2057                      struct ptlrpc_request_set *rqset, int async, int agl)
2058 {
2059         struct obd_device *obd = exp->exp_obd;
2060         struct lustre_handle lockh = { 0 };
2061         struct ptlrpc_request *req = NULL;
2062         int intent = *flags & LDLM_FL_HAS_INTENT;
2063         __u64 match_flags = *flags;
2064         enum ldlm_mode mode;
2065         int rc;
2066         ENTRY;
2067
2068         /* Filesystem lock extents are extended to page boundaries so that
2069          * dealing with the page cache is a little smoother.  */
2070         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2071         policy->l_extent.end |= ~PAGE_MASK;
2072
2073         /*
2074          * kms is not valid when either object is completely fresh (so that no
2075          * locks are cached), or object was evicted. In the latter case cached
2076          * lock cannot be used, because it would prime inode state with
2077          * potentially stale LVB.
2078          */
2079         if (!kms_valid)
2080                 goto no_match;
2081
2082         /* Next, search for already existing extent locks that will cover us */
2083         /* If we're trying to read, we also search for an existing PW lock.  The
2084          * VFS and page cache already protect us locally, so lots of readers/
2085          * writers can share a single PW lock.
2086          *
2087          * There are problems with conversion deadlocks, so instead of
2088          * converting a read lock to a write lock, we'll just enqueue a new
2089          * one.
2090          *
2091          * At some point we should cancel the read lock instead of making them
2092          * send us a blocking callback, but there are problems with canceling
2093          * locks out from other users right now, too. */
2094         mode = einfo->ei_mode;
2095         if (einfo->ei_mode == LCK_PR)
2096                 mode |= LCK_PW;
2097         if (agl == 0)
2098                 match_flags |= LDLM_FL_LVB_READY;
2099         if (intent != 0)
2100                 match_flags |= LDLM_FL_BLOCK_GRANTED;
2101         mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id,
2102                                einfo->ei_type, policy, mode, &lockh, 0);
2103         if (mode) {
2104                 struct ldlm_lock *matched;
2105
2106                 if (*flags & LDLM_FL_TEST_LOCK)
2107                         RETURN(ELDLM_OK);
2108
2109                 matched = ldlm_handle2lock(&lockh);
2110                 if (agl) {
2111                         /* AGL enqueues DLM locks speculatively. Therefore,
2112                          * if a DLM lock already exists, just inform the
2113                          * caller to cancel the AGL process for this stripe. */
2114                         ldlm_lock_decref(&lockh, mode);
2115                         LDLM_LOCK_PUT(matched);
2116                         RETURN(-ECANCELED);
2117                 } else if (osc_set_lock_data(matched, einfo->ei_cbdata)) {
2118                         *flags |= LDLM_FL_LVB_READY;
2119
2120                         /* We already have a lock, and it's referenced. */
2121                         (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);
2122
2123                         ldlm_lock_decref(&lockh, mode);
2124                         LDLM_LOCK_PUT(matched);
2125                         RETURN(ELDLM_OK);
2126                 } else {
2127                         ldlm_lock_decref(&lockh, mode);
2128                         LDLM_LOCK_PUT(matched);
2129                 }
2130         }
2131
2132 no_match:
2133         if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK))
2134                 RETURN(-ENOLCK);
2135
2136         if (intent) {
2137                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2138                                            &RQF_LDLM_ENQUEUE_LVB);
2139                 if (req == NULL)
2140                         RETURN(-ENOMEM);
2141
2142                 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
2143                 if (rc) {
2144                         ptlrpc_request_free(req);
2145                         RETURN(rc);
2146                 }
2147
2148                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2149                              sizeof(*lvb));
2150                 ptlrpc_request_set_replen(req);
2151         }
2152
2153         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2154         *flags &= ~LDLM_FL_BLOCK_GRANTED;
2155
2156         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2157                               sizeof(*lvb), LVB_T_OST, &lockh, async);
2158         if (async) {
2159                 if (!rc) {
2160                         struct osc_enqueue_args *aa;
2161                         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2162                         aa = ptlrpc_req_async_args(req);
2163                         aa->oa_exp    = exp;
2164                         aa->oa_mode   = einfo->ei_mode;
2165                         aa->oa_type   = einfo->ei_type;
2166                         lustre_handle_copy(&aa->oa_lockh, &lockh);
2167                         aa->oa_upcall = upcall;
2168                         aa->oa_cookie = cookie;
2169                         aa->oa_agl    = !!agl;
2170                         if (!agl) {
2171                                 aa->oa_flags  = flags;
2172                                 aa->oa_lvb    = lvb;
2173                         } else {
2174                                 /* AGL essentially enqueues a DLM lock in
2175                                  * advance, so we don't care about the
2176                                  * result of the AGL enqueue. */
2177                                 aa->oa_lvb    = NULL;
2178                                 aa->oa_flags  = NULL;
2179                         }
2180
2181                         req->rq_interpret_reply =
2182                                 (ptlrpc_interpterer_t)osc_enqueue_interpret;
2183                         if (rqset == PTLRPCD_SET)
2184                                 ptlrpcd_add_req(req);
2185                         else
2186                                 ptlrpc_set_add_req(rqset, req);
2187                 } else if (intent) {
2188                         ptlrpc_req_finished(req);
2189                 }
2190                 RETURN(rc);
2191         }
2192
2193         rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
2194                               flags, agl, rc);
2195         if (intent)
2196                 ptlrpc_req_finished(req);
2197
2198         RETURN(rc);
2199 }
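
/*
 * Worked example for the extent rounding at the top of osc_enqueue_base()
 * (and osc_match_base() below), assuming 4 KiB pages so that ~PAGE_MASK
 * is 0xfff.  The helper name page_align_extent() is hypothetical.
 */
#if 0	/* example only, never compiled */
static void page_align_extent(__u64 *start, __u64 *end)
{
	/* e.g. [5000, 9000] becomes [4096, 12287] */
	*start -= *start & ~PAGE_MASK;	/* round down to page start      */
	*end   |= ~PAGE_MASK;		/* round up to last byte of page */
}
#endif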
2200
2201 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2202                    enum ldlm_type type, union ldlm_policy_data *policy,
2203                    enum ldlm_mode mode, __u64 *flags, void *data,
2204                    struct lustre_handle *lockh, int unref)
2205 {
2206         struct obd_device *obd = exp->exp_obd;
2207         __u64 lflags = *flags;
2208         enum ldlm_mode rc;
2209         ENTRY;
2210
2211         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2212                 RETURN(-EIO);
2213
2214         /* Filesystem lock extents are extended to page boundaries so that
2215          * dealing with the page cache is a little smoother */
2216         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2217         policy->l_extent.end |= ~PAGE_MASK;
2218
2219         /* Next, search for already existing extent locks that will cover us */
2220         /* If we're trying to read, we also search for an existing PW lock.  The
2221          * VFS and page cache already protect us locally, so lots of readers/
2222          * writers can share a single PW lock. */
2223         rc = mode;
2224         if (mode == LCK_PR)
2225                 rc |= LCK_PW;
2226         rc = ldlm_lock_match(obd->obd_namespace, lflags,
2227                              res_id, type, policy, rc, lockh, unref);
2228         if (rc == 0 || lflags & LDLM_FL_TEST_LOCK)
2229                 RETURN(rc);
2230
2231         if (data != NULL) {
2232                 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2233
2234                 LASSERT(lock != NULL);
2235                 if (!osc_set_lock_data(lock, data)) {
2236                         ldlm_lock_decref(lockh, rc);
2237                         rc = 0;
2238                 }
2239                 LDLM_LOCK_PUT(lock);
2240         }
2241         RETURN(rc);
2242 }
2243
2244 static int osc_statfs_interpret(const struct lu_env *env,
2245                                 struct ptlrpc_request *req,
2246                                 struct osc_async_args *aa, int rc)
2247 {
2248         struct obd_statfs *msfs;
2249         ENTRY;
2250
2251         if (rc == -EBADR)
2252                 /* The request has in fact never been sent
2253                  * due to issues at a higher level (LOV).
2254                  * Exit immediately since the caller is
2255                  * aware of the problem and takes care
2256                  * of the cleanup. */
2257                 RETURN(rc);
2258
2259         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2260             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2261                 GOTO(out, rc = 0);
2262
2263         if (rc != 0)
2264                 GOTO(out, rc);
2265
2266         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2267         if (msfs == NULL) {
2268                 GOTO(out, rc = -EPROTO);
2269         }
2270
2271         *aa->aa_oi->oi_osfs = *msfs;
2272 out:
2273         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2274         RETURN(rc);
2275 }
2276
2277 static int osc_statfs_async(struct obd_export *exp,
2278                             struct obd_info *oinfo, __u64 max_age,
2279                             struct ptlrpc_request_set *rqset)
2280 {
2281         struct obd_device     *obd = class_exp2obd(exp);
2282         struct ptlrpc_request *req;
2283         struct osc_async_args *aa;
2284         int                    rc;
2285         ENTRY;
2286
2287         /* We could possibly pass max_age in the request (as an absolute
2288          * timestamp or a "seconds.usec ago") so the target can avoid doing
2289          * extra calls into the filesystem if that isn't necessary (e.g.
2290          * during mount that would help a bit).  Having relative timestamps
2291          * is not so great if request processing is slow, while absolute
2292          * timestamps are not ideal because they need time synchronization. */
2293         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2294         if (req == NULL)
2295                 RETURN(-ENOMEM);
2296
2297         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2298         if (rc) {
2299                 ptlrpc_request_free(req);
2300                 RETURN(rc);
2301         }
2302         ptlrpc_request_set_replen(req);
2303         req->rq_request_portal = OST_CREATE_PORTAL;
2304         ptlrpc_at_set_req_timeout(req);
2305
2306         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2307                 /* procfs requests must not wait for recovery, to avoid deadlock */
2308                 req->rq_no_resend = 1;
2309                 req->rq_no_delay = 1;
2310         }
2311
2312         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2313         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2314         aa = ptlrpc_req_async_args(req);
2315         aa->aa_oi = oinfo;
2316
2317         ptlrpc_set_add_req(rqset, req);
2318         RETURN(0);
2319 }
2320
2321 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2322                       struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2323 {
2324         struct obd_device     *obd = class_exp2obd(exp);
2325         struct obd_statfs     *msfs;
2326         struct ptlrpc_request *req;
2327         struct obd_import     *imp = NULL;
2328         int rc;
2329         ENTRY;
2330
2331         /* Since the request might also come from lprocfs, we need to
2332          * sync this with client_disconnect_export() (Bug 15684) */
2333         down_read(&obd->u.cli.cl_sem);
2334         if (obd->u.cli.cl_import)
2335                 imp = class_import_get(obd->u.cli.cl_import);
2336         up_read(&obd->u.cli.cl_sem);
2337         if (!imp)
2338                 RETURN(-ENODEV);
2339
2340         /* We could possibly pass max_age in the request (as an absolute
2341          * timestamp or a "seconds.usec ago") so the target can avoid doing
2342          * extra calls into the filesystem if that isn't necessary (e.g.
2343          * during mount that would help a bit).  Having relative timestamps
2344          * is not so great if request processing is slow, while absolute
2345          * timestamps are not ideal because they need time synchronization. */
2346         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2347
2348         class_import_put(imp);
2349
2350         if (req == NULL)
2351                 RETURN(-ENOMEM);
2352
2353         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2354         if (rc) {
2355                 ptlrpc_request_free(req);
2356                 RETURN(rc);
2357         }
2358         ptlrpc_request_set_replen(req);
2359         req->rq_request_portal = OST_CREATE_PORTAL;
2360         ptlrpc_at_set_req_timeout(req);
2361
2362         if (flags & OBD_STATFS_NODELAY) {
2363         /* procfs requests must not wait for recovery, to avoid deadlock */
2364                 req->rq_no_resend = 1;
2365                 req->rq_no_delay = 1;
2366         }
2367
2368         rc = ptlrpc_queue_wait(req);
2369         if (rc)
2370                 GOTO(out, rc);
2371
2372         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2373         if (msfs == NULL) {
2374                 GOTO(out, rc = -EPROTO);
2375         }
2376
2377         *osfs = *msfs;
2378
2379         EXIT;
2380  out:
2381         ptlrpc_req_finished(req);
2382         return rc;
2383 }
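
/*
 * Pattern sketch of the import handling in osc_statfs() above: take a
 * reference under cl_sem so a concurrent client_disconnect_export()
 * cannot free the import, then drop the semaphore before the slow RPC.
 * The helper name statfs_get_import() is hypothetical.
 */
#if 0	/* example only, never compiled */
static int statfs_get_import(struct obd_device *obd, struct obd_import **imp)
{
	*imp = NULL;
	down_read(&obd->u.cli.cl_sem);
	if (obd->u.cli.cl_import)
		*imp = class_import_get(obd->u.cli.cl_import);	/* +1 ref */
	up_read(&obd->u.cli.cl_sem);

	/* caller sends the RPC, then drops the ref via class_import_put() */
	return *imp != NULL ? 0 : -ENODEV;	/* already disconnected */
}
#endif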
2384
2385 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2386                          void *karg, void __user *uarg)
2387 {
2388         struct obd_device *obd = exp->exp_obd;
2389         struct obd_ioctl_data *data = karg;
2390         int err = 0;
2391         ENTRY;
2392
2393         if (!try_module_get(THIS_MODULE)) {
2394                 CERROR("%s: cannot get module '%s'\n", obd->obd_name,
2395                        module_name(THIS_MODULE));
2396                 return -EINVAL;
2397         }
2398         switch (cmd) {
2399         case OBD_IOC_CLIENT_RECOVER:
2400                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2401                                             data->ioc_inlbuf1, 0);
2402                 if (err > 0)
2403                         err = 0;
2404                 GOTO(out, err);
2405         case IOC_OSC_SET_ACTIVE:
2406                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2407                                                data->ioc_offset);
2408                 GOTO(out, err);
2409         case OBD_IOC_PING_TARGET:
2410                 err = ptlrpc_obd_ping(obd);
2411                 GOTO(out, err);
2412         default:
2413                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2414                        cmd, current_comm());
2415                 GOTO(out, err = -ENOTTY);
2416         }
2417 out:
2418         module_put(THIS_MODULE);
2419         return err;
2420 }
2421
2422 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
2423                               u32 keylen, void *key,
2424                               u32 vallen, void *val,
2425                               struct ptlrpc_request_set *set)
2426 {
2427         struct ptlrpc_request *req;
2428         struct obd_device     *obd = exp->exp_obd;
2429         struct obd_import     *imp = class_exp2cliimp(exp);
2430         char                  *tmp;
2431         int                    rc;
2432         ENTRY;
2433
2434         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
2435
2436         if (KEY_IS(KEY_CHECKSUM)) {
2437                 if (vallen != sizeof(int))
2438                         RETURN(-EINVAL);
2439                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
2440                 RETURN(0);
2441         }
2442
2443         if (KEY_IS(KEY_SPTLRPC_CONF)) {
2444                 sptlrpc_conf_client_adapt(obd);
2445                 RETURN(0);
2446         }
2447
2448         if (KEY_IS(KEY_FLUSH_CTX)) {
2449                 sptlrpc_import_flush_my_ctx(imp);
2450                 RETURN(0);
2451         }
2452
2453         if (KEY_IS(KEY_CACHE_SET)) {
2454                 struct client_obd *cli = &obd->u.cli;
2455
2456                 LASSERT(cli->cl_cache == NULL); /* only once */
2457                 cli->cl_cache = (struct cl_client_cache *)val;
2458                 cl_cache_incref(cli->cl_cache);
2459                 cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
2460
2461                 /* add this osc into entity list */
2462                 LASSERT(list_empty(&cli->cl_lru_osc));
2463                 spin_lock(&cli->cl_cache->ccc_lru_lock);
2464                 list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
2465                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
2466
2467                 RETURN(0);
2468         }
2469
2470         if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
2471                 struct client_obd *cli = &obd->u.cli;
2472                 long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
2473                 long target = *(long *)val;
2474
2475                 nr = osc_lru_shrink(env, cli, min(nr, target), true);
2476                 *(long *)val -= nr;
2477                 RETURN(0);
2478         }
2479
2480         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
2481                 RETURN(-EINVAL);
2482
2483         /* We pass all other commands directly to OST. Since nobody calls osc
2484            methods directly and everybody is supposed to go through LOV, we
2485            assume lov checked invalid values for us.
2486            The only recognised values so far are evict_by_nid and mds_conn.
2487            Even if something bad goes through, we'd get a -EINVAL from OST
2488            anyway. */
2489
2490         req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
2491                                                 &RQF_OST_SET_GRANT_INFO :
2492                                                 &RQF_OBD_SET_INFO);
2493         if (req == NULL)
2494                 RETURN(-ENOMEM);
2495
2496         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2497                              RCL_CLIENT, keylen);
2498         if (!KEY_IS(KEY_GRANT_SHRINK))
2499                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
2500                                      RCL_CLIENT, vallen);
2501         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
2502         if (rc) {
2503                 ptlrpc_request_free(req);
2504                 RETURN(rc);
2505         }
2506
2507         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2508         memcpy(tmp, key, keylen);
2509         tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
2510                                                         &RMF_OST_BODY :
2511                                                         &RMF_SETINFO_VAL);
2512         memcpy(tmp, val, vallen);
2513
2514         if (KEY_IS(KEY_GRANT_SHRINK)) {
2515                 struct osc_grant_args *aa;
2516                 struct obdo *oa;
2517
2518                 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2519                 aa = ptlrpc_req_async_args(req);
2520                 OBDO_ALLOC(oa);
2521                 if (!oa) {
2522                         ptlrpc_req_finished(req);
2523                         RETURN(-ENOMEM);
2524                 }
2525                 *oa = ((struct ost_body *)val)->oa;
2526                 aa->aa_oa = oa;
2527                 req->rq_interpret_reply = osc_shrink_grant_interpret;
2528         }
2529
2530         ptlrpc_request_set_replen(req);
2531         if (!KEY_IS(KEY_GRANT_SHRINK)) {
2532                 LASSERT(set != NULL);
2533                 ptlrpc_set_add_req(set, req);
2534                 ptlrpc_check_set(NULL, set);
2535         } else {
2536                 ptlrpcd_add_req(req);
2537         }
2538
2539         RETURN(0);
2540 }
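
/*
 * Arithmetic sketch for the KEY_CACHE_LRU_SHRINK branch above, with
 * hypothetical numbers and the simplifying assumption that the shrinker
 * frees everything it is asked for: each OSC offers at most half of its
 * LRU pages and decrements the caller's remaining target.
 */
#if 0	/* example only, never compiled */
static long lru_shrink_share(long lru_in_list, long *target)
{
	long nr = lru_in_list >> 1;		/* offer at most half */
	long shrunk = min(nr, *target);		/* assume all get freed */

	*target -= shrunk;	/* 1000 in list, target 300 -> 0 remaining */
	return shrunk;
}
#endif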
2541
2542 static int osc_reconnect(const struct lu_env *env,
2543                          struct obd_export *exp, struct obd_device *obd,
2544                          struct obd_uuid *cluuid,
2545                          struct obd_connect_data *data,
2546                          void *localdata)
2547 {
2548         struct client_obd *cli = &obd->u.cli;
2549
2550         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
2551                 long lost_grant;
2552                 long grant;
2553
2554                 spin_lock(&cli->cl_loi_list_lock);
2555                 grant = cli->cl_avail_grant + cli->cl_reserved_grant;
2556                 if (data->ocd_connect_flags & OBD_CONNECT_GRANT_PARAM)
2557                         grant += cli->cl_dirty_grant;
2558                 else
2559                         grant += cli->cl_dirty_pages << PAGE_CACHE_SHIFT;
2560                 data->ocd_grant = grant ? : 2 * cli_brw_size(obd);
2561                 lost_grant = cli->cl_lost_grant;
2562                 cli->cl_lost_grant = 0;
2563                 spin_unlock(&cli->cl_loi_list_lock);
2564
2565                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
2566                        " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
2567                        data->ocd_version, data->ocd_grant, lost_grant);
2568         }
2569
2570         RETURN(0);
2571 }
2572
2573 static int osc_disconnect(struct obd_export *exp)
2574 {
2575         struct obd_device *obd = class_exp2obd(exp);
2576         int rc;
2577
2578         rc = client_disconnect_export(exp);
2579         /**
2580          * Initially we put del_shrink_grant before disconnect_export, but it
2581          * causes the following problem if setup (connect) and cleanup
2582          * (disconnect) are tangled together.
2583          *      connect p1                     disconnect p2
2584          *   ptlrpc_connect_import
2585          *     ...............               class_manual_cleanup
2586          *                                     osc_disconnect
2587          *                                     del_shrink_grant
2588          *   ptlrpc_connect_interrupt
2589          *     init_grant_shrink
2590          *   add this client to shrink list
2591          *                                      cleanup_osc
2592          * Bang! The pinger triggers the shrink.
2593          * So the osc should be removed from the shrink list only after we
2594          * are sure the import has been destroyed. BUG18662
2595          */
2596         if (obd->u.cli.cl_import == NULL)
2597                 osc_del_shrink_grant(&obd->u.cli);
2598         return rc;
2599 }
2600
2601 static int osc_ldlm_resource_invalidate(struct cfs_hash *hs,
2602         struct cfs_hash_bd *bd, struct hlist_node *hnode, void *arg)
2603 {
2604         struct lu_env *env = arg;
2605         struct ldlm_resource *res = cfs_hash_object(hs, hnode);
2606         struct ldlm_lock *lock;
2607         struct osc_object *osc = NULL;
2608         ENTRY;
2609
2610         lock_res(res);
2611         list_for_each_entry(lock, &res->lr_granted, l_res_link) {
2612                 if (lock->l_ast_data != NULL && osc == NULL) {
2613                         osc = lock->l_ast_data;
2614                         cl_object_get(osc2cl(osc));
2615                 }
2616
2617                 /* Clear the LDLM_FL_CLEANED flag to make sure the lock will
2618                  * be canceled by the 2nd round of the ldlm_namespace_cleanup()
2619                  * call in osc_import_event(). */
2620                 ldlm_clear_cleaned(lock);
2621         }
2622         unlock_res(res);
2623
2624         if (osc != NULL) {
2625                 osc_object_invalidate(env, osc);
2626                 cl_object_put(env, osc2cl(osc));
2627         }
2628
2629         RETURN(0);
2630 }
2631
2632 static int osc_import_event(struct obd_device *obd,
2633                             struct obd_import *imp,
2634                             enum obd_import_event event)
2635 {
2636         struct client_obd *cli;
2637         int rc = 0;
2638
2639         ENTRY;
2640         LASSERT(imp->imp_obd == obd);
2641
2642         switch (event) {
2643         case IMP_EVENT_DISCON: {
2644                 cli = &obd->u.cli;
2645                 spin_lock(&cli->cl_loi_list_lock);
2646                 cli->cl_avail_grant = 0;
2647                 cli->cl_lost_grant = 0;
2648                 spin_unlock(&cli->cl_loi_list_lock);
2649                 break;
2650         }
2651         case IMP_EVENT_INACTIVE: {
2652                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
2653                 break;
2654         }
2655         case IMP_EVENT_INVALIDATE: {
2656                 struct ldlm_namespace *ns = obd->obd_namespace;
2657                 struct lu_env         *env;
2658                 __u16                  refcheck;
2659
2660                 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
2661
2662                 env = cl_env_get(&refcheck);
2663                 if (!IS_ERR(env)) {
2664                         osc_io_unplug(env, &obd->u.cli, NULL);
2665
2666                         cfs_hash_for_each_nolock(ns->ns_rs_hash,
2667                                                  osc_ldlm_resource_invalidate,
2668                                                  env, 0);
2669                         cl_env_put(env, &refcheck);
2670
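                        /* 2nd round: cancel the locks whose LDLM_FL_CLEANED
                         * flag was cleared by osc_ldlm_resource_invalidate()
                         * above */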
2671                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
2672                 } else
2673                         rc = PTR_ERR(env);
2674                 break;
2675         }
2676         case IMP_EVENT_ACTIVE: {
2677                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
2678                 break;
2679         }
2680         case IMP_EVENT_OCD: {
2681                 struct obd_connect_data *ocd = &imp->imp_connect_data;
2682
2683                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
2684                         osc_init_grant(&obd->u.cli, ocd);
2685
2686                 /* See bug 7198 */
2687                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
2688                         imp->imp_client->cli_request_portal = OST_REQUEST_PORTAL;
2689
2690                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
2691                 break;
2692         }
2693         case IMP_EVENT_DEACTIVATE: {
2694                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
2695                 break;
2696         }
2697         case IMP_EVENT_ACTIVATE: {
2698                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
2699                 break;
2700         }
2701         default:
2702                 CERROR("Unknown import event %d\n", event);
2703                 LBUG();
2704         }
2705         RETURN(rc);
2706 }
2707
2708 /**
2709  * Determine whether the lock can be canceled before replaying it
2710  * during recovery; see bug 16774 for details.
2711  *
2712  * \retval zero the lock can't be canceled
2713  * \retval other ok to cancel
2714  */
2715 static int osc_cancel_weight(struct ldlm_lock *lock)
2716 {
2717         /*
2718          * Cancel all unused and granted extent locks.
2719          */
2720         if (lock->l_resource->lr_type == LDLM_EXTENT &&
2721             lock->l_granted_mode == lock->l_req_mode &&
2722             osc_ldlm_weigh_ast(lock) == 0)
2723                 RETURN(1);
2724
2725         RETURN(0);
2726 }
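/*
 * Note: osc_cancel_weight() is registered with the LDLM namespace via
 * ns_register_cancel() in osc_setup() below.
 */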
2727
2728 static int brw_queue_work(const struct lu_env *env, void *data)
2729 {
2730         struct client_obd *cli = data;
2731
2732         CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
2733
2734         osc_io_unplug(env, cli, NULL);
2735         RETURN(0);
2736 }
2737
2738 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
2739 {
2740         struct client_obd *cli = &obd->u.cli;
2741         struct obd_type   *type;
2742         void              *handler;
2743         int                rc;
2744         int                adding;
2745         int                added;
2746         int                req_count;
2747         ENTRY;
2748
2749         rc = ptlrpcd_addref();
2750         if (rc)
2751                 RETURN(rc);
2752
2753         rc = client_obd_setup(obd, lcfg);
2754         if (rc)
2755                 GOTO(out_ptlrpcd, rc);
2756
2757         handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
2758         if (IS_ERR(handler))
2759                 GOTO(out_client_setup, rc = PTR_ERR(handler));
2760         cli->cl_writeback_work = handler;
2761
2762         handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
2763         if (IS_ERR(handler))
2764                 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
2765         cli->cl_lru_work = handler;
2766
2767         rc = osc_quota_setup(obd);
2768         if (rc)
2769                 GOTO(out_ptlrpcd_work, rc);
2770
2771         cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
2772
2773 #ifdef CONFIG_PROC_FS
2774         obd->obd_vars = lprocfs_osc_obd_vars;
2775 #endif
2776         /* If this is true then both the client (osc) and server (osp) are on
2777          * the same node. If the osp layer is loaded first, it registers the
2778          * osc proc directory. In that case this obd_device attaches its proc
2779          * tree to type->typ_procsym instead of obd->obd_type->typ_procroot. */
2780         type = class_search_type(LUSTRE_OSP_NAME);
2781         if (type && type->typ_procsym) {
2782                 obd->obd_proc_entry = lprocfs_register(obd->obd_name,
2783                                                        type->typ_procsym,
2784                                                        obd->obd_vars, obd);
2785                 if (IS_ERR(obd->obd_proc_entry)) {
2786                         rc = PTR_ERR(obd->obd_proc_entry);
2787                         CERROR("error %d setting up lprocfs for %s\n", rc,
2788                                obd->obd_name);
2789                         obd->obd_proc_entry = NULL;
2790                 }
2791         } else {
2792                 rc = lprocfs_obd_setup(obd);
2793         }
2794
2795         /* If the basic OSC proc tree construction succeeded then
2796          * let's do the rest. */
2797         if (rc == 0) {
2798                 lproc_osc_attach_seqstat(obd);
2799                 sptlrpc_lprocfs_cliobd_attach(obd);
2800                 ptlrpc_lprocfs_register_obd(obd);
2801         }
2802
2803         /*
2804          * We try to control the total number of requests with an upper limit,
2805          * osc_reqpool_maxreqcount. A race here may occasionally allocate past
2806          * that limit, but this is harmless.
2807          */
2808         req_count = atomic_read(&osc_pool_req_count);
2809         if (req_count < osc_reqpool_maxreqcount) {
2810                 adding = cli->cl_max_rpcs_in_flight + 2;
2811                 if (req_count + adding > osc_reqpool_maxreqcount)
2812                         adding = osc_reqpool_maxreqcount - req_count;
2813
2814                 added = ptlrpc_add_rqs_to_pool(osc_rq_pool, adding);
2815                 atomic_add(added, &osc_pool_req_count);
2816         }
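        /*
         * Worked example (numbers are illustrative only): with
         * cl_max_rpcs_in_flight = 8, req_count = 95 and
         * osc_reqpool_maxreqcount = 100, "adding" starts at 8 + 2 = 10
         * and is then clamped to 100 - 95 = 5 before the pool is grown.
         */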
2817
2818         INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
2819         ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
2820
2821         spin_lock(&osc_shrink_lock);
2822         list_add_tail(&cli->cl_shrink_list, &osc_shrink_list);
2823         spin_unlock(&osc_shrink_lock);
2824
2825         RETURN(0);
2826
2827 out_ptlrpcd_work:
2828         if (cli->cl_writeback_work != NULL) {
2829                 ptlrpcd_destroy_work(cli->cl_writeback_work);
2830                 cli->cl_writeback_work = NULL;
2831         }
2832         if (cli->cl_lru_work != NULL) {
2833                 ptlrpcd_destroy_work(cli->cl_lru_work);
2834                 cli->cl_lru_work = NULL;
2835         }
2836 out_client_setup:
2837         client_obd_cleanup(obd);
2838 out_ptlrpcd:
2839         ptlrpcd_decref();
2840         RETURN(rc);
2841 }
2842
2843 static int osc_precleanup(struct obd_device *obd)
2844 {
2845         struct client_obd *cli = &obd->u.cli;
2846         ENTRY;
2847
2848         /* LU-464
2849          * For the echo client the export may be on the zombie list; wait
2850          * for the zombie thread to cull it, because cli.cl_import will be
2851          * cleared in client_disconnect_export():
2852          *   class_export_destroy() -> obd_cleanup() ->
2853          *   echo_device_free() -> echo_client_cleanup() ->
2854          *   obd_disconnect() -> osc_disconnect() ->
2855          *   client_disconnect_export()
2856          */
2857         obd_zombie_barrier();
2858         if (cli->cl_writeback_work) {
2859                 ptlrpcd_destroy_work(cli->cl_writeback_work);
2860                 cli->cl_writeback_work = NULL;
2861         }
2862
2863         if (cli->cl_lru_work) {
2864                 ptlrpcd_destroy_work(cli->cl_lru_work);
2865                 cli->cl_lru_work = NULL;
2866         }
2867
2868         obd_cleanup_client_import(obd);
2869         ptlrpc_lprocfs_unregister_obd(obd);
2870         lprocfs_obd_cleanup(obd);
2871         RETURN(0);
2872 }
2873
2874 int osc_cleanup(struct obd_device *obd)
2875 {
2876         struct client_obd *cli = &obd->u.cli;
2877         int rc;
2878
2879         ENTRY;
2880
2881         spin_lock(&osc_shrink_lock);
2882         list_del(&cli->cl_shrink_list);
2883         spin_unlock(&osc_shrink_lock);
2884
2885         /* lru cleanup */
2886         if (cli->cl_cache != NULL) {
2887                 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
2888                 spin_lock(&cli->cl_cache->ccc_lru_lock);
2889                 list_del_init(&cli->cl_lru_osc);
2890                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
2891                 cli->cl_lru_left = NULL;
2892                 cl_cache_decref(cli->cl_cache);
2893                 cli->cl_cache = NULL;
2894         }
2895
2896         /* free memory of osc quota cache */
2897         osc_quota_cleanup(obd);
2898
2899         rc = client_obd_cleanup(obd);
2900
2901         ptlrpcd_decref();
2902         RETURN(rc);
2903 }
2904
2905 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
2906 {
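        /* a positive return from class_process_proc_param() means the
         * parameter was handled; fold it to 0 and pass errors through */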
2907         int rc = class_process_proc_param(PARAM_OSC, obd->obd_vars, lcfg, obd);
2908         return rc > 0 ? 0 : rc;
2909 }
2910
2911 static int osc_process_config(struct obd_device *obd, size_t len, void *buf)
2912 {
2913         return osc_process_config_base(obd, buf);
2914 }
2915
2916 static struct obd_ops osc_obd_ops = {
2917         .o_owner                = THIS_MODULE,
2918         .o_setup                = osc_setup,
2919         .o_precleanup           = osc_precleanup,
2920         .o_cleanup              = osc_cleanup,
2921         .o_add_conn             = client_import_add_conn,
2922         .o_del_conn             = client_import_del_conn,
2923         .o_connect              = client_connect_import,
2924         .o_reconnect            = osc_reconnect,
2925         .o_disconnect           = osc_disconnect,
2926         .o_statfs               = osc_statfs,
2927         .o_statfs_async         = osc_statfs_async,
2928         .o_create               = osc_create,
2929         .o_destroy              = osc_destroy,
2930         .o_getattr              = osc_getattr,
2931         .o_setattr              = osc_setattr,
2932         .o_iocontrol            = osc_iocontrol,
2933         .o_set_info_async       = osc_set_info_async,
2934         .o_import_event         = osc_import_event,
2935         .o_process_config       = osc_process_config,
2936         .o_quotactl             = osc_quotactl,
2937 };
2938
2939 static struct shrinker *osc_cache_shrinker;
2940 struct list_head osc_shrink_list = LIST_HEAD_INIT(osc_shrink_list);
2941 DEFINE_SPINLOCK(osc_shrink_lock);
2942
2943 #ifndef HAVE_SHRINKER_COUNT
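/*
 * Compatibility wrapper for kernels that predate the split count/scan
 * shrinker API: emulate the old single-callback interface by running a
 * scan pass first and then returning the remaining object count.
 */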
2944 static int osc_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask))
2945 {
2946         struct shrink_control scv = {
2947                 .nr_to_scan = shrink_param(sc, nr_to_scan),
2948                 .gfp_mask   = shrink_param(sc, gfp_mask)
2949         };
2950 #if !defined(HAVE_SHRINKER_WANT_SHRINK_PTR) && !defined(HAVE_SHRINK_CONTROL)
2951         struct shrinker *shrinker = NULL;
2952 #endif
2953
2954         (void)osc_cache_shrink_scan(shrinker, &scv);
2955
2956         return osc_cache_shrink_count(shrinker, &scv);
2957 }
2958 #endif
2959
2960 static int __init osc_init(void)
2961 {
2962         bool enable_proc = true;
2963         struct obd_type *type;
2964         unsigned int reqpool_size;
2965         unsigned int reqsize;
2966         int rc;
2967         DEF_SHRINKER_VAR(osc_shvar, osc_cache_shrink,
2968                          osc_cache_shrink_count, osc_cache_shrink_scan);
2969         ENTRY;
2970
2971         /* Print the address of _any_ initialized kernel symbol from this
2972          * module, to allow debugging with a gdb that doesn't support data
2973          * symbols from modules. */
2974         CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
2975
2976         rc = lu_kmem_init(osc_caches);
2977         if (rc)
2978                 RETURN(rc);
2979
2980         type = class_search_type(LUSTRE_OSP_NAME);
2981         if (type != NULL && type->typ_procsym != NULL)
2982                 enable_proc = false;
2983
2984         rc = class_register_type(&osc_obd_ops, NULL, enable_proc, NULL,
2985                                  LUSTRE_OSC_NAME, &osc_device_type);
2986         if (rc)
2987                 GOTO(out_kmem, rc);
2988
2989         osc_cache_shrinker = set_shrinker(DEFAULT_SEEKS, &osc_shvar);
2990
2991         /* This is obviously too much memory; only guard against overflow here */
2992         if (osc_reqpool_mem_max >= 1 << 12 || osc_reqpool_mem_max == 0)
2993                 GOTO(out_type, rc = -EINVAL);
2994
2995         reqpool_size = osc_reqpool_mem_max << 20;
2996
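        /* round the per-request buffer size up to the smallest power of
         * two that can hold OST_IO_MAXREQSIZE */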
2997         reqsize = 1;
2998         while (reqsize < OST_IO_MAXREQSIZE)
2999                 reqsize = reqsize << 1;
3000
3001         /*
3002          * We don't enlarge the request count in the OSC pool according to
3003          * cl_max_rpcs_in_flight. Allocation from the pool is only tried
3004          * after a normal allocation has failed, so a small OSC pool won't
3005          * cause much performance degradation in most cases.
3006          */
3007         osc_reqpool_maxreqcount = reqpool_size / reqsize;
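        /*
         * Worked example (values are illustrative, not the real constants):
         * with the default osc_reqpool_mem_max of 5 MB, reqpool_size is
         * 5 << 20 = 5242880 bytes. If OST_IO_MAXREQSIZE were 40960, reqsize
         * would round up to 65536 and osc_reqpool_maxreqcount would be
         * 5242880 / 65536 = 80 requests.
         */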
3008
3009         atomic_set(&osc_pool_req_count, 0);
3010         osc_rq_pool = ptlrpc_init_rq_pool(0, OST_IO_MAXREQSIZE,
3011                                           ptlrpc_add_rqs_to_pool);
3012
3013         if (osc_rq_pool != NULL)
3014                 GOTO(out, rc);
3015         rc = -ENOMEM;
3016 out_type:
3017         class_unregister_type(LUSTRE_OSC_NAME);
3018 out_kmem:
3019         lu_kmem_fini(osc_caches);
3020 out:
3021         RETURN(rc);
3022 }
3023
3024 static void __exit osc_exit(void)
3025 {
3026         remove_shrinker(osc_cache_shrinker);
3027         class_unregister_type(LUSTRE_OSC_NAME);
3028         lu_kmem_fini(osc_caches);
3029         ptlrpc_free_rq_pool(osc_rq_pool);
3030 }
3031
3032 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
3033 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3034 MODULE_VERSION(LUSTRE_VERSION_STRING);
3035 MODULE_LICENSE("GPL");
3036
3037 module_init(osc_init);
3038 module_exit(osc_exit);