Whamcloud - gitweb
4e9cf07216999a060ec8f94144d1420537048059
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2015, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_OSC
38
39 #include <libcfs/libcfs.h>
40
41 #include <lustre/lustre_user.h>
42
43 #include <lprocfs_status.h>
44 #include <lustre_debug.h>
45 #include <lustre_dlm.h>
46 #include <lustre_fid.h>
47 #include <lustre_ha.h>
48 #include <lustre_ioctl.h>
49 #include <lustre_net.h>
50 #include <lustre_obdo.h>
51 #include <lustre_param.h>
52 #include <obd.h>
53 #include <obd_cksum.h>
54 #include <obd_class.h>
55
56 #include "osc_cl_internal.h"
57 #include "osc_internal.h"
58
59 atomic_t osc_pool_req_count;
60 unsigned int osc_reqpool_maxreqcount;
61 struct ptlrpc_request_pool *osc_rq_pool;
62
63 /* max memory used for request pool, unit is MB */
64 static unsigned int osc_reqpool_mem_max = 5;
65 module_param(osc_reqpool_mem_max, uint, 0444);
66
67 struct osc_brw_async_args {
68         struct obdo              *aa_oa;
69         int                       aa_requested_nob;
70         int                       aa_nio_count;
71         u32                       aa_page_count;
72         int                       aa_resends;
73         struct brw_page **aa_ppga;
74         struct client_obd        *aa_cli;
75         struct list_head          aa_oaps;
76         struct list_head          aa_exts;
77 };
78
79 #define osc_grant_args osc_brw_async_args
80
81 struct osc_setattr_args {
82         struct obdo             *sa_oa;
83         obd_enqueue_update_f     sa_upcall;
84         void                    *sa_cookie;
85 };
86
87 struct osc_fsync_args {
88         struct osc_object       *fa_obj;
89         struct obdo             *fa_oa;
90         obd_enqueue_update_f    fa_upcall;
91         void                    *fa_cookie;
92 };
93
94 struct osc_ladvise_args {
95         struct obdo             *la_oa;
96         obd_enqueue_update_f     la_upcall;
97         void                    *la_cookie;
98 };
99
100 struct osc_enqueue_args {
101         struct obd_export       *oa_exp;
102         enum ldlm_type          oa_type;
103         enum ldlm_mode          oa_mode;
104         __u64                   *oa_flags;
105         osc_enqueue_upcall_f    oa_upcall;
106         void                    *oa_cookie;
107         struct ost_lvb          *oa_lvb;
108         struct lustre_handle    oa_lockh;
109         unsigned int            oa_agl:1;
110 };
111
112 static void osc_release_ppga(struct brw_page **ppga, size_t count);
113 static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
114                          void *data, int rc);
115
116 void osc_pack_req_body(struct ptlrpc_request *req, struct obdo *oa)
117 {
118         struct ost_body *body;
119
120         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
121         LASSERT(body);
122
123         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
124 }
125
126 static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
127                        struct obdo *oa)
128 {
129         struct ptlrpc_request   *req;
130         struct ost_body         *body;
131         int                      rc;
132
133         ENTRY;
134         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
135         if (req == NULL)
136                 RETURN(-ENOMEM);
137
138         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
139         if (rc) {
140                 ptlrpc_request_free(req);
141                 RETURN(rc);
142         }
143
144         osc_pack_req_body(req, oa);
145
146         ptlrpc_request_set_replen(req);
147
148         rc = ptlrpc_queue_wait(req);
149         if (rc)
150                 GOTO(out, rc);
151
152         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
153         if (body == NULL)
154                 GOTO(out, rc = -EPROTO);
155
156         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
157         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
158
159         oa->o_blksize = cli_brw_size(exp->exp_obd);
160         oa->o_valid |= OBD_MD_FLBLKSZ;
161
162         EXIT;
163 out:
164         ptlrpc_req_finished(req);
165
166         return rc;
167 }
168
169 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
170                        struct obdo *oa)
171 {
172         struct ptlrpc_request   *req;
173         struct ost_body         *body;
174         int                      rc;
175
176         ENTRY;
177         LASSERT(oa->o_valid & OBD_MD_FLGROUP);
178
179         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
180         if (req == NULL)
181                 RETURN(-ENOMEM);
182
183         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
184         if (rc) {
185                 ptlrpc_request_free(req);
186                 RETURN(rc);
187         }
188
189         osc_pack_req_body(req, oa);
190
191         ptlrpc_request_set_replen(req);
192
193         rc = ptlrpc_queue_wait(req);
194         if (rc)
195                 GOTO(out, rc);
196
197         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
198         if (body == NULL)
199                 GOTO(out, rc = -EPROTO);
200
201         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
202
203         EXIT;
204 out:
205         ptlrpc_req_finished(req);
206
207         RETURN(rc);
208 }
209
210 static int osc_setattr_interpret(const struct lu_env *env,
211                                  struct ptlrpc_request *req,
212                                  struct osc_setattr_args *sa, int rc)
213 {
214         struct ost_body *body;
215         ENTRY;
216
217         if (rc != 0)
218                 GOTO(out, rc);
219
220         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
221         if (body == NULL)
222                 GOTO(out, rc = -EPROTO);
223
224         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
225                              &body->oa);
226 out:
227         rc = sa->sa_upcall(sa->sa_cookie, rc);
228         RETURN(rc);
229 }
230
231 int osc_setattr_async(struct obd_export *exp, struct obdo *oa,
232                       obd_enqueue_update_f upcall, void *cookie,
233                       struct ptlrpc_request_set *rqset)
234 {
235         struct ptlrpc_request   *req;
236         struct osc_setattr_args *sa;
237         int                      rc;
238
239         ENTRY;
240
241         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
242         if (req == NULL)
243                 RETURN(-ENOMEM);
244
245         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
246         if (rc) {
247                 ptlrpc_request_free(req);
248                 RETURN(rc);
249         }
250
251         osc_pack_req_body(req, oa);
252
253         ptlrpc_request_set_replen(req);
254
255         /* do mds to ost setattr asynchronously */
256         if (!rqset) {
257                 /* Do not wait for response. */
258                 ptlrpcd_add_req(req);
259         } else {
260                 req->rq_interpret_reply =
261                         (ptlrpc_interpterer_t)osc_setattr_interpret;
262
263                 CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
264                 sa = ptlrpc_req_async_args(req);
265                 sa->sa_oa = oa;
266                 sa->sa_upcall = upcall;
267                 sa->sa_cookie = cookie;
268
269                 if (rqset == PTLRPCD_SET)
270                         ptlrpcd_add_req(req);
271                 else
272                         ptlrpc_set_add_req(rqset, req);
273         }
274
275         RETURN(0);
276 }
277
278 static int osc_ladvise_interpret(const struct lu_env *env,
279                                  struct ptlrpc_request *req,
280                                  void *arg, int rc)
281 {
282         struct osc_ladvise_args *la = arg;
283         struct ost_body *body;
284         ENTRY;
285
286         if (rc != 0)
287                 GOTO(out, rc);
288
289         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
290         if (body == NULL)
291                 GOTO(out, rc = -EPROTO);
292
293         *la->la_oa = body->oa;
294 out:
295         rc = la->la_upcall(la->la_cookie, rc);
296         RETURN(rc);
297 }
298
299 /**
300  * If rqset is NULL, do not wait for response. Upcall and cookie could also
301  * be NULL in this case
302  */
303 int osc_ladvise_base(struct obd_export *exp, struct obdo *oa,
304                      struct ladvise_hdr *ladvise_hdr,
305                      obd_enqueue_update_f upcall, void *cookie,
306                      struct ptlrpc_request_set *rqset)
307 {
308         struct ptlrpc_request   *req;
309         struct ost_body         *body;
310         struct osc_ladvise_args *la;
311         int                      rc;
312         struct lu_ladvise       *req_ladvise;
313         struct lu_ladvise       *ladvise = ladvise_hdr->lah_advise;
314         int                      num_advise = ladvise_hdr->lah_count;
315         struct ladvise_hdr      *req_ladvise_hdr;
316         ENTRY;
317
318         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_LADVISE);
319         if (req == NULL)
320                 RETURN(-ENOMEM);
321
322         req_capsule_set_size(&req->rq_pill, &RMF_OST_LADVISE, RCL_CLIENT,
323                              num_advise * sizeof(*ladvise));
324         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_LADVISE);
325         if (rc != 0) {
326                 ptlrpc_request_free(req);
327                 RETURN(rc);
328         }
329         req->rq_request_portal = OST_IO_PORTAL;
330         ptlrpc_at_set_req_timeout(req);
331
332         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
333         LASSERT(body);
334         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
335                              oa);
336
337         req_ladvise_hdr = req_capsule_client_get(&req->rq_pill,
338                                                  &RMF_OST_LADVISE_HDR);
339         memcpy(req_ladvise_hdr, ladvise_hdr, sizeof(*ladvise_hdr));
340
341         req_ladvise = req_capsule_client_get(&req->rq_pill, &RMF_OST_LADVISE);
342         memcpy(req_ladvise, ladvise, sizeof(*ladvise) * num_advise);
343         ptlrpc_request_set_replen(req);
344
345         if (rqset == NULL) {
346                 /* Do not wait for response. */
347                 ptlrpcd_add_req(req);
348                 RETURN(0);
349         }
350
351         req->rq_interpret_reply = osc_ladvise_interpret;
352         CLASSERT(sizeof(*la) <= sizeof(req->rq_async_args));
353         la = ptlrpc_req_async_args(req);
354         la->la_oa = oa;
355         la->la_upcall = upcall;
356         la->la_cookie = cookie;
357
358         if (rqset == PTLRPCD_SET)
359                 ptlrpcd_add_req(req);
360         else
361                 ptlrpc_set_add_req(rqset, req);
362
363         RETURN(0);
364 }
365
366 static int osc_create(const struct lu_env *env, struct obd_export *exp,
367                       struct obdo *oa)
368 {
369         struct ptlrpc_request *req;
370         struct ost_body       *body;
371         int                    rc;
372         ENTRY;
373
374         LASSERT(oa != NULL);
375         LASSERT(oa->o_valid & OBD_MD_FLGROUP);
376         LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));
377
378         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
379         if (req == NULL)
380                 GOTO(out, rc = -ENOMEM);
381
382         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
383         if (rc) {
384                 ptlrpc_request_free(req);
385                 GOTO(out, rc);
386         }
387
388         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
389         LASSERT(body);
390
391         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
392
393         ptlrpc_request_set_replen(req);
394
395         rc = ptlrpc_queue_wait(req);
396         if (rc)
397                 GOTO(out_req, rc);
398
399         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
400         if (body == NULL)
401                 GOTO(out_req, rc = -EPROTO);
402
403         CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
404         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
405
406         oa->o_blksize = cli_brw_size(exp->exp_obd);
407         oa->o_valid |= OBD_MD_FLBLKSZ;
408
409         CDEBUG(D_HA, "transno: "LPD64"\n",
410                lustre_msg_get_transno(req->rq_repmsg));
411 out_req:
412         ptlrpc_req_finished(req);
413 out:
414         RETURN(rc);
415 }
416
417 int osc_punch_base(struct obd_export *exp, struct obdo *oa,
418                    obd_enqueue_update_f upcall, void *cookie,
419                    struct ptlrpc_request_set *rqset)
420 {
421         struct ptlrpc_request   *req;
422         struct osc_setattr_args *sa;
423         struct ost_body         *body;
424         int                      rc;
425         ENTRY;
426
427         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
428         if (req == NULL)
429                 RETURN(-ENOMEM);
430
431         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
432         if (rc) {
433                 ptlrpc_request_free(req);
434                 RETURN(rc);
435         }
436         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
437         ptlrpc_at_set_req_timeout(req);
438
439         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
440         LASSERT(body);
441         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
442
443         ptlrpc_request_set_replen(req);
444
445         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
446         CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
447         sa = ptlrpc_req_async_args(req);
448         sa->sa_oa = oa;
449         sa->sa_upcall = upcall;
450         sa->sa_cookie = cookie;
451         if (rqset == PTLRPCD_SET)
452                 ptlrpcd_add_req(req);
453         else
454                 ptlrpc_set_add_req(rqset, req);
455
456         RETURN(0);
457 }
458
459 static int osc_sync_interpret(const struct lu_env *env,
460                               struct ptlrpc_request *req,
461                               void *arg, int rc)
462 {
463         struct osc_fsync_args   *fa = arg;
464         struct ost_body         *body;
465         struct cl_attr          *attr = &osc_env_info(env)->oti_attr;
466         unsigned long           valid = 0;
467         struct cl_object        *obj;
468         ENTRY;
469
470         if (rc != 0)
471                 GOTO(out, rc);
472
473         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
474         if (body == NULL) {
475                 CERROR("can't unpack ost_body\n");
476                 GOTO(out, rc = -EPROTO);
477         }
478
479         *fa->fa_oa = body->oa;
480         obj = osc2cl(fa->fa_obj);
481
482         /* Update osc object's blocks attribute */
483         cl_object_attr_lock(obj);
484         if (body->oa.o_valid & OBD_MD_FLBLOCKS) {
485                 attr->cat_blocks = body->oa.o_blocks;
486                 valid |= CAT_BLOCKS;
487         }
488
489         if (valid != 0)
490                 cl_object_attr_update(env, obj, attr, valid);
491         cl_object_attr_unlock(obj);
492
493 out:
494         rc = fa->fa_upcall(fa->fa_cookie, rc);
495         RETURN(rc);
496 }
497
498 int osc_sync_base(struct osc_object *obj, struct obdo *oa,
499                   obd_enqueue_update_f upcall, void *cookie,
500                   struct ptlrpc_request_set *rqset)
501 {
502         struct obd_export     *exp = osc_export(obj);
503         struct ptlrpc_request *req;
504         struct ost_body       *body;
505         struct osc_fsync_args *fa;
506         int                    rc;
507         ENTRY;
508
509         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
510         if (req == NULL)
511                 RETURN(-ENOMEM);
512
513         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
514         if (rc) {
515                 ptlrpc_request_free(req);
516                 RETURN(rc);
517         }
518
519         /* overload the size and blocks fields in the oa with start/end */
520         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
521         LASSERT(body);
522         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
523
524         ptlrpc_request_set_replen(req);
525         req->rq_interpret_reply = osc_sync_interpret;
526
527         CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
528         fa = ptlrpc_req_async_args(req);
529         fa->fa_obj = obj;
530         fa->fa_oa = oa;
531         fa->fa_upcall = upcall;
532         fa->fa_cookie = cookie;
533
534         if (rqset == PTLRPCD_SET)
535                 ptlrpcd_add_req(req);
536         else
537                 ptlrpc_set_add_req(rqset, req);
538
539         RETURN (0);
540 }
541
542 /* Find and cancel locally locks matched by @mode in the resource found by
543  * @objid. Found locks are added into @cancel list. Returns the amount of
544  * locks added to @cancels list. */
545 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
546                                    struct list_head *cancels,
547                                    enum ldlm_mode mode, __u64 lock_flags)
548 {
549         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
550         struct ldlm_res_id res_id;
551         struct ldlm_resource *res;
552         int count;
553         ENTRY;
554
555         /* Return, i.e. cancel nothing, only if ELC is supported (flag in
556          * export) but disabled through procfs (flag in NS).
557          *
558          * This distinguishes from a case when ELC is not supported originally,
559          * when we still want to cancel locks in advance and just cancel them
560          * locally, without sending any RPC. */
561         if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
562                 RETURN(0);
563
564         ostid_build_res_name(&oa->o_oi, &res_id);
565         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
566         if (IS_ERR(res))
567                 RETURN(0);
568
569         LDLM_RESOURCE_ADDREF(res);
570         count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
571                                            lock_flags, 0, NULL);
572         LDLM_RESOURCE_DELREF(res);
573         ldlm_resource_putref(res);
574         RETURN(count);
575 }
576
577 static int osc_destroy_interpret(const struct lu_env *env,
578                                  struct ptlrpc_request *req, void *data,
579                                  int rc)
580 {
581         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
582
583         atomic_dec(&cli->cl_destroy_in_flight);
584         wake_up(&cli->cl_destroy_waitq);
585         return 0;
586 }
587
588 static int osc_can_send_destroy(struct client_obd *cli)
589 {
590         if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
591             cli->cl_max_rpcs_in_flight) {
592                 /* The destroy request can be sent */
593                 return 1;
594         }
595         if (atomic_dec_return(&cli->cl_destroy_in_flight) <
596             cli->cl_max_rpcs_in_flight) {
597                 /*
598                  * The counter has been modified between the two atomic
599                  * operations.
600                  */
601                 wake_up(&cli->cl_destroy_waitq);
602         }
603         return 0;
604 }
605
606 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
607                        struct obdo *oa)
608 {
609         struct client_obd     *cli = &exp->exp_obd->u.cli;
610         struct ptlrpc_request *req;
611         struct ost_body       *body;
612         struct list_head       cancels = LIST_HEAD_INIT(cancels);
613         int rc, count;
614         ENTRY;
615
616         if (!oa) {
617                 CDEBUG(D_INFO, "oa NULL\n");
618                 RETURN(-EINVAL);
619         }
620
621         count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
622                                         LDLM_FL_DISCARD_DATA);
623
624         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
625         if (req == NULL) {
626                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
627                 RETURN(-ENOMEM);
628         }
629
630         rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
631                                0, &cancels, count);
632         if (rc) {
633                 ptlrpc_request_free(req);
634                 RETURN(rc);
635         }
636
637         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
638         ptlrpc_at_set_req_timeout(req);
639
640         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
641         LASSERT(body);
642         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
643
644         ptlrpc_request_set_replen(req);
645
646         req->rq_interpret_reply = osc_destroy_interpret;
647         if (!osc_can_send_destroy(cli)) {
648                 struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
649
650                 /*
651                  * Wait until the number of on-going destroy RPCs drops
652                  * under max_rpc_in_flight
653                  */
654                 l_wait_event_exclusive(cli->cl_destroy_waitq,
655                                        osc_can_send_destroy(cli), &lwi);
656         }
657
658         /* Do not wait for response */
659         ptlrpcd_add_req(req);
660         RETURN(0);
661 }
662
663 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
664                                 long writing_bytes)
665 {
666         u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;
667
668         LASSERT(!(oa->o_valid & bits));
669
670         oa->o_valid |= bits;
671         spin_lock(&cli->cl_loi_list_lock);
672         if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data, GRANT_PARAM))
673                 oa->o_dirty = cli->cl_dirty_grant;
674         else
675                 oa->o_dirty = cli->cl_dirty_pages << PAGE_CACHE_SHIFT;
676         if (unlikely(cli->cl_dirty_pages - cli->cl_dirty_transit >
677                      cli->cl_dirty_max_pages)) {
678                 CERROR("dirty %lu - %lu > dirty_max %lu\n",
679                        cli->cl_dirty_pages, cli->cl_dirty_transit,
680                        cli->cl_dirty_max_pages);
681                 oa->o_undirty = 0;
682         } else if (unlikely(atomic_long_read(&obd_dirty_pages) -
683                             atomic_long_read(&obd_dirty_transit_pages) >
684                             (long)(obd_max_dirty_pages + 1))) {
685                 /* The atomic_read() allowing the atomic_inc() are
686                  * not covered by a lock thus they may safely race and trip
687                  * this CERROR() unless we add in a small fudge factor (+1). */
688                 CERROR("%s: dirty %ld - %ld > system dirty_max %ld\n",
689                        cli_name(cli), atomic_long_read(&obd_dirty_pages),
690                        atomic_long_read(&obd_dirty_transit_pages),
691                        obd_max_dirty_pages);
692                 oa->o_undirty = 0;
693         } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
694                             0x7fffffff)) {
695                 CERROR("dirty %lu - dirty_max %lu too big???\n",
696                        cli->cl_dirty_pages, cli->cl_dirty_max_pages);
697                 oa->o_undirty = 0;
698         } else {
699                 unsigned long nrpages;
700
701                 nrpages = cli->cl_max_pages_per_rpc;
702                 nrpages *= cli->cl_max_rpcs_in_flight + 1;
703                 nrpages = max(nrpages, cli->cl_dirty_max_pages);
704                 oa->o_undirty = nrpages << PAGE_CACHE_SHIFT;
705                 if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data,
706                                  GRANT_PARAM)) {
707                         int nrextents;
708
709                         /* take extent tax into account when asking for more
710                          * grant space */
711                         nrextents = (nrpages + cli->cl_max_extent_pages - 1)  /
712                                      cli->cl_max_extent_pages;
713                         oa->o_undirty += nrextents * cli->cl_grant_extent_tax;
714                 }
715         }
716         oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
717         oa->o_dropped = cli->cl_lost_grant;
718         cli->cl_lost_grant = 0;
719         spin_unlock(&cli->cl_loi_list_lock);
720         CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
721                oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
722 }
723
724 void osc_update_next_shrink(struct client_obd *cli)
725 {
726         cli->cl_next_shrink_grant =
727                 cfs_time_shift(cli->cl_grant_shrink_interval);
728         CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
729                cli->cl_next_shrink_grant);
730 }
731
732 static void __osc_update_grant(struct client_obd *cli, u64 grant)
733 {
734         spin_lock(&cli->cl_loi_list_lock);
735         cli->cl_avail_grant += grant;
736         spin_unlock(&cli->cl_loi_list_lock);
737 }
738
739 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
740 {
741         if (body->oa.o_valid & OBD_MD_FLGRANT) {
742                 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
743                 __osc_update_grant(cli, body->oa.o_grant);
744         }
745 }
746
747 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
748                               u32 keylen, void *key,
749                               u32 vallen, void *val,
750                               struct ptlrpc_request_set *set);
751
752 static int osc_shrink_grant_interpret(const struct lu_env *env,
753                                       struct ptlrpc_request *req,
754                                       void *aa, int rc)
755 {
756         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
757         struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
758         struct ost_body *body;
759
760         if (rc != 0) {
761                 __osc_update_grant(cli, oa->o_grant);
762                 GOTO(out, rc);
763         }
764
765         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
766         LASSERT(body);
767         osc_update_grant(cli, body);
768 out:
769         OBDO_FREE(oa);
770         return rc;
771 }
772
773 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
774 {
775         spin_lock(&cli->cl_loi_list_lock);
776         oa->o_grant = cli->cl_avail_grant / 4;
777         cli->cl_avail_grant -= oa->o_grant;
778         spin_unlock(&cli->cl_loi_list_lock);
779         if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
780                 oa->o_valid |= OBD_MD_FLFLAGS;
781                 oa->o_flags = 0;
782         }
783         oa->o_flags |= OBD_FL_SHRINK_GRANT;
784         osc_update_next_shrink(cli);
785 }
786
787 /* Shrink the current grant, either from some large amount to enough for a
788  * full set of in-flight RPCs, or if we have already shrunk to that limit
789  * then to enough for a single RPC.  This avoids keeping more grant than
790  * needed, and avoids shrinking the grant piecemeal. */
791 static int osc_shrink_grant(struct client_obd *cli)
792 {
793         __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
794                              (cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT);
795
796         spin_lock(&cli->cl_loi_list_lock);
797         if (cli->cl_avail_grant <= target_bytes)
798                 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
799         spin_unlock(&cli->cl_loi_list_lock);
800
801         return osc_shrink_grant_to_target(cli, target_bytes);
802 }
803
804 int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
805 {
806         int                     rc = 0;
807         struct ost_body        *body;
808         ENTRY;
809
810         spin_lock(&cli->cl_loi_list_lock);
811         /* Don't shrink if we are already above or below the desired limit
812          * We don't want to shrink below a single RPC, as that will negatively
813          * impact block allocation and long-term performance. */
814         if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT)
815                 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
816
817         if (target_bytes >= cli->cl_avail_grant) {
818                 spin_unlock(&cli->cl_loi_list_lock);
819                 RETURN(0);
820         }
821         spin_unlock(&cli->cl_loi_list_lock);
822
823         OBD_ALLOC_PTR(body);
824         if (!body)
825                 RETURN(-ENOMEM);
826
827         osc_announce_cached(cli, &body->oa, 0);
828
829         spin_lock(&cli->cl_loi_list_lock);
830         body->oa.o_grant = cli->cl_avail_grant - target_bytes;
831         cli->cl_avail_grant = target_bytes;
832         spin_unlock(&cli->cl_loi_list_lock);
833         if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
834                 body->oa.o_valid |= OBD_MD_FLFLAGS;
835                 body->oa.o_flags = 0;
836         }
837         body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
838         osc_update_next_shrink(cli);
839
840         rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
841                                 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
842                                 sizeof(*body), body, NULL);
843         if (rc != 0)
844                 __osc_update_grant(cli, body->oa.o_grant);
845         OBD_FREE_PTR(body);
846         RETURN(rc);
847 }
848
849 static int osc_should_shrink_grant(struct client_obd *client)
850 {
851         cfs_time_t time = cfs_time_current();
852         cfs_time_t next_shrink = client->cl_next_shrink_grant;
853
854         if ((client->cl_import->imp_connect_data.ocd_connect_flags &
855              OBD_CONNECT_GRANT_SHRINK) == 0)
856                 return 0;
857
858         if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
859                 /* Get the current RPC size directly, instead of going via:
860                  * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
861                  * Keep comment here so that it can be found by searching. */
862                 int brw_size = client->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
863
864                 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
865                     client->cl_avail_grant > brw_size)
866                         return 1;
867                 else
868                         osc_update_next_shrink(client);
869         }
870         return 0;
871 }
872
873 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
874 {
875         struct client_obd *client;
876
877         list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
878                 if (osc_should_shrink_grant(client))
879                         osc_shrink_grant(client);
880         }
881         return 0;
882 }
883
884 static int osc_add_shrink_grant(struct client_obd *client)
885 {
886         int rc;
887
888         rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
889                                        TIMEOUT_GRANT,
890                                        osc_grant_shrink_grant_cb, NULL,
891                                        &client->cl_grant_shrink_list);
892         if (rc) {
893                 CERROR("add grant client %s error %d\n", cli_name(client), rc);
894                 return rc;
895         }
896         CDEBUG(D_CACHE, "add grant client %s\n", cli_name(client));
897         osc_update_next_shrink(client);
898         return 0;
899 }
900
901 static int osc_del_shrink_grant(struct client_obd *client)
902 {
903         return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
904                                          TIMEOUT_GRANT);
905 }
906
907 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
908 {
909         /*
910          * ocd_grant is the total grant amount we're expect to hold: if we've
911          * been evicted, it's the new avail_grant amount, cl_dirty_pages will
912          * drop to 0 as inflight RPCs fail out; otherwise, it's avail_grant +
913          * dirty.
914          *
915          * race is tolerable here: if we're evicted, but imp_state already
916          * left EVICTED state, then cl_dirty_pages must be 0 already.
917          */
918         spin_lock(&cli->cl_loi_list_lock);
919         cli->cl_avail_grant = ocd->ocd_grant;
920         if (cli->cl_import->imp_state != LUSTRE_IMP_EVICTED) {
921                 cli->cl_avail_grant -= cli->cl_reserved_grant;
922                 if (OCD_HAS_FLAG(ocd, GRANT_PARAM))
923                         cli->cl_avail_grant -= cli->cl_dirty_grant;
924                 else
925                         cli->cl_avail_grant -=
926                                         cli->cl_dirty_pages << PAGE_CACHE_SHIFT;
927         }
928
929         if (cli->cl_avail_grant < 0) {
930                 CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
931                       cli_name(cli), cli->cl_avail_grant,
932                       ocd->ocd_grant, cli->cl_dirty_pages << PAGE_CACHE_SHIFT);
933                 /* workaround for servers which do not have the patch from
934                  * LU-2679 */
935                 cli->cl_avail_grant = ocd->ocd_grant;
936         }
937
938         if (OCD_HAS_FLAG(ocd, GRANT_PARAM)) {
939                 u64 size;
940
941                 /* overhead for each extent insertion */
942                 cli->cl_grant_extent_tax = ocd->ocd_grant_tax_kb << 10;
943                 /* determine the appropriate chunk size used by osc_extent. */
944                 cli->cl_chunkbits = max_t(int, PAGE_CACHE_SHIFT,
945                                           ocd->ocd_grant_blkbits);
946                 /* determine maximum extent size, in #pages */
947                 size = (u64)ocd->ocd_grant_max_blks << ocd->ocd_grant_blkbits;
948                 cli->cl_max_extent_pages = size >> PAGE_CACHE_SHIFT;
949                 if (cli->cl_max_extent_pages == 0)
950                         cli->cl_max_extent_pages = 1;
951         } else {
952                 cli->cl_grant_extent_tax = 0;
953                 cli->cl_chunkbits = PAGE_CACHE_SHIFT;
954                 cli->cl_max_extent_pages = DT_MAX_BRW_PAGES;
955         }
956         spin_unlock(&cli->cl_loi_list_lock);
957
958         CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
959                 "chunk bits: %d cl_max_extent_pages: %d\n",
960                 cli_name(cli),
961                 cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits,
962                 cli->cl_max_extent_pages);
963
964         if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
965             list_empty(&cli->cl_grant_shrink_list))
966                 osc_add_shrink_grant(cli);
967 }
968
969 /* We assume that the reason this OSC got a short read is because it read
970  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
971  * via the LOV, and it _knows_ it's reading inside the file, it's just that
972  * this stripe never got written at or beyond this stripe offset yet. */
973 static void handle_short_read(int nob_read, size_t page_count,
974                               struct brw_page **pga)
975 {
976         char *ptr;
977         int i = 0;
978
979         /* skip bytes read OK */
980         while (nob_read > 0) {
981                 LASSERT (page_count > 0);
982
983                 if (pga[i]->count > nob_read) {
984                         /* EOF inside this page */
985                         ptr = kmap(pga[i]->pg) +
986                                 (pga[i]->off & ~PAGE_MASK);
987                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
988                         kunmap(pga[i]->pg);
989                         page_count--;
990                         i++;
991                         break;
992                 }
993
994                 nob_read -= pga[i]->count;
995                 page_count--;
996                 i++;
997         }
998
999         /* zero remaining pages */
1000         while (page_count-- > 0) {
1001                 ptr = kmap(pga[i]->pg) + (pga[i]->off & ~PAGE_MASK);
1002                 memset(ptr, 0, pga[i]->count);
1003                 kunmap(pga[i]->pg);
1004                 i++;
1005         }
1006 }
1007
1008 static int check_write_rcs(struct ptlrpc_request *req,
1009                            int requested_nob, int niocount,
1010                            size_t page_count, struct brw_page **pga)
1011 {
1012         int     i;
1013         __u32   *remote_rcs;
1014
1015         remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1016                                                   sizeof(*remote_rcs) *
1017                                                   niocount);
1018         if (remote_rcs == NULL) {
1019                 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1020                 return(-EPROTO);
1021         }
1022
1023         /* return error if any niobuf was in error */
1024         for (i = 0; i < niocount; i++) {
1025                 if ((int)remote_rcs[i] < 0)
1026                         return(remote_rcs[i]);
1027
1028                 if (remote_rcs[i] != 0) {
1029                         CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1030                                 i, remote_rcs[i], req);
1031                         return(-EPROTO);
1032                 }
1033         }
1034
1035         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1036                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1037                        req->rq_bulk->bd_nob_transferred, requested_nob);
1038                 return(-EPROTO);
1039         }
1040
1041         return (0);
1042 }
1043
1044 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1045 {
1046         if (p1->flag != p2->flag) {
1047                 unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
1048                                   OBD_BRW_SYNC       | OBD_BRW_ASYNC   |
1049                                   OBD_BRW_NOQUOTA    | OBD_BRW_SOFT_SYNC);
1050
1051                 /* warn if we try to combine flags that we don't know to be
1052                  * safe to combine */
1053                 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1054                         CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1055                               "report this at https://jira.hpdd.intel.com/\n",
1056                               p1->flag, p2->flag);
1057                 }
1058                 return 0;
1059         }
1060
1061         return (p1->off + p1->count == p2->off);
1062 }
1063
1064 static u32 osc_checksum_bulk(int nob, size_t pg_count,
1065                              struct brw_page **pga, int opc,
1066                              cksum_type_t cksum_type)
1067 {
1068         u32                             cksum;
1069         int                             i = 0;
1070         struct cfs_crypto_hash_desc     *hdesc;
1071         unsigned int                    bufsize;
1072         int                             err;
1073         unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);
1074
1075         LASSERT(pg_count > 0);
1076
1077         hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1078         if (IS_ERR(hdesc)) {
1079                 CERROR("Unable to initialize checksum hash %s\n",
1080                        cfs_crypto_hash_name(cfs_alg));
1081                 return PTR_ERR(hdesc);
1082         }
1083
1084         while (nob > 0 && pg_count > 0) {
1085                 unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;
1086
1087                 /* corrupt the data before we compute the checksum, to
1088                  * simulate an OST->client data error */
1089                 if (i == 0 && opc == OST_READ &&
1090                     OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
1091                         unsigned char *ptr = kmap(pga[i]->pg);
1092                         int off = pga[i]->off & ~PAGE_MASK;
1093
1094                         memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
1095                         kunmap(pga[i]->pg);
1096                 }
1097                 cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
1098                                             pga[i]->off & ~PAGE_MASK,
1099                                             count);
1100                 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
1101                                (int)(pga[i]->off & ~PAGE_MASK));
1102
1103                 nob -= pga[i]->count;
1104                 pg_count--;
1105                 i++;
1106         }
1107
1108         bufsize = sizeof(cksum);
1109         err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);
1110
1111         /* For sending we only compute the wrong checksum instead
1112          * of corrupting the data so it is still correct on a redo */
1113         if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1114                 cksum++;
1115
1116         return cksum;
1117 }
1118
/*
 * Build (but do not send) a bulk read or write RPC for an array of pages.
 *
 * cmd:        OST_WRITE is used when the OBD_BRW_WRITE bit is set,
 *             OST_READ otherwise
 * cli:        client the request is issued for
 * oa:         object attributes; copied into the wire body and, for
 *             writes, updated with the checksum flags and value
 * page_count: number of entries in pga (asserted to be > 0)
 * pga:        pages to transfer; asserted to be in increasing offset order
 *             with no gap in the middle of the array
 * reqp:       receives the prepared request on success
 * resend:     non-zero marks the request with OBD_FL_RECOV_RESEND
 *
 * Returns 0 on success; -ENOMEM if the request or bulk descriptor cannot
 * be allocated (or by fault injection); -EINVAL by fault injection; or
 * the error returned by ptlrpc_request_pack().
 */
1119 static int
1120 osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
1121                      u32 page_count, struct brw_page **pga,
1122                      struct ptlrpc_request **reqp, int resend)
1123 {
1124         struct ptlrpc_request   *req;
1125         struct ptlrpc_bulk_desc *desc;
1126         struct ost_body         *body;
1127         struct obd_ioobj        *ioobj;
1128         struct niobuf_remote    *niobuf;
1129         int niocount, i, requested_nob, opc, rc;
1130         struct osc_brw_async_args *aa;
1131         struct req_capsule      *pill;
1132         struct brw_page *pg_prev;
1133
1134         ENTRY;
1135         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1136                 RETURN(-ENOMEM); /* Recoverable */
1137         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1138                 RETURN(-EINVAL); /* Fatal */
1139
             /* writes are allocated from osc_rq_pool, reads with the plain
              * allocator */
1140         if ((cmd & OBD_BRW_WRITE) != 0) {
1141                 opc = OST_WRITE;
1142                 req = ptlrpc_request_alloc_pool(cli->cl_import,
1143                                                 osc_rq_pool,
1144                                                 &RQF_OST_BRW_WRITE);
1145         } else {
1146                 opc = OST_READ;
1147                 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
1148         }
1149         if (req == NULL)
1150                 RETURN(-ENOMEM);
1151
             /* count the remote niobufs needed: a new one starts at every
              * page that cannot be merged with its predecessor */
1152         for (niocount = i = 1; i < page_count; i++) {
1153                 if (!can_merge_pages(pga[i - 1], pga[i]))
1154                         niocount++;
1155         }
1156
             /* the variable-size request fields are sized before packing */
1157         pill = &req->rq_pill;
1158         req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1159                              sizeof(*ioobj));
1160         req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1161                              niocount * sizeof(*niobuf));
1162
1163         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1164         if (rc) {
1165                 ptlrpc_request_free(req);
1166                 RETURN(rc);
1167         }
1168         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1169         ptlrpc_at_set_req_timeout(req);
1170         /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
1171          * retry logic */
1172         req->rq_no_retry_einprogress = 1;
1173
1174         desc = ptlrpc_prep_bulk_imp(req, page_count,
1175                 cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
1176                 (opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE :
1177                         PTLRPC_BULK_PUT_SINK) |
1178                         PTLRPC_BULK_BUF_KIOV,
1179                 OST_BULK_PORTAL,
1180                 &ptlrpc_bulk_kiov_pin_ops);
1181
1182         if (desc == NULL)
1183                 GOTO(out, rc = -ENOMEM);
1184         /* NB request now owns desc and will free it when it gets freed */
1185
1186         body = req_capsule_client_get(pill, &RMF_OST_BODY);
1187         ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1188         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1189         LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1190
1191         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
1192
1193         obdo_to_ioobj(oa, ioobj);
1194         ioobj->ioo_bufcnt = niocount;
1195         /* The high bits of ioo_max_brw tells server _maximum_ number of bulks
1196          * that might be send for this request.  The actual number is decided
1197          * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
1198          * "max - 1" for old client compatibility sending "0", and also so the
1199          * the actual maximum is a power-of-two number, not one less. LU-1431 */
1200         ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
1201         LASSERT(page_count > 0);
1202         pg_prev = pga[0];
             /* add every page to the bulk descriptor and fill in the remote
              * niobufs; niobuf advances once per page and is stepped back
              * whenever a page is merged into the previous niobuf */
1203         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1204                 struct brw_page *pg = pga[i];
1205                 int poff = pg->off & ~PAGE_MASK;
1206
1207                 LASSERT(pg->count > 0);
1208                 /* make sure there is no gap in the middle of page array */
1209                 LASSERTF(page_count == 1 ||
1210                          (ergo(i == 0, poff + pg->count == PAGE_CACHE_SIZE) &&
1211                           ergo(i > 0 && i < page_count - 1,
1212                                poff == 0 && pg->count == PAGE_CACHE_SIZE)   &&
1213                           ergo(i == page_count - 1, poff == 0)),
1214                          "i: %d/%d pg: %p off: "LPU64", count: %u\n",
1215                          i, page_count, pg, pg->off, pg->count);
1216                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1217                          "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1218                          " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1219                          i, page_count,
1220                          pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1221                          pg_prev->pg, page_private(pg_prev->pg),
1222                          pg_prev->pg->index, pg_prev->off);
1223                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1224                         (pg->flag & OBD_BRW_SRVLOCK));
1225
1226                 desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff, pg->count);
1227                 requested_nob += pg->count;
1228
1229                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1230                         niobuf--;
1231                         niobuf->rnb_len += pg->count;
1232                 } else {
1233                         niobuf->rnb_offset = pg->off;
1234                         niobuf->rnb_len    = pg->count;
1235                         niobuf->rnb_flags  = pg->flag;
1236                 }
1237                 pg_prev = pg;
1238         }
1239
             /* niobuf must now sit exactly niocount entries past the start
              * of the array, i.e. the count computed earlier was right */
1240         LASSERTF((void *)(niobuf - niocount) ==
1241                 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1242                 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1243                 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1244
1245         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1246         if (resend) {
1247                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1248                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1249                         body->oa.o_flags = 0;
1250                 }
1251                 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1252         }
1253
1254         if (osc_should_shrink_grant(cli))
1255                 osc_shrink_grant_local(cli, &body->oa);
1256
1257         /* size[REQ_REC_OFF] still sizeof (*body) */
1258         if (opc == OST_WRITE) {
1259                 if (cli->cl_checksum &&
1260                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1261                         /* store cl_cksum_type in a local variable since
1262                          * it can be changed via lprocfs */
1263                         cksum_type_t cksum_type = cli->cl_cksum_type;
1264
1265                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1266                                 oa->o_flags &= OBD_FL_LOCAL_MASK;
1267                                 body->oa.o_flags = 0;
1268                         }
1269                         body->oa.o_flags |= cksum_type_pack(cksum_type);
1270                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1271                         body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1272                                                              page_count, pga,
1273                                                              OST_WRITE,
1274                                                              cksum_type);
1275                         CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1276                                body->oa.o_cksum);
1277                         /* save this in 'oa', too, for later checking */
1278                         oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1279                         oa->o_flags |= cksum_type_pack(cksum_type);
1280                 } else {
1281                         /* clear out the checksum flag, in case this is a
1282                          * resend but cl_checksum is no longer set. b=11238 */
1283                         oa->o_valid &= ~OBD_MD_FLCKSUM;
1284                 }
1285                 oa->o_cksum = body->oa.o_cksum;
1286                 /* 1 RC per niobuf */
1287                 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1288                                      sizeof(__u32) * niocount);
1289         } else {
                     /* for reads only the checksum type is put on the wire;
                      * no checksum is computed here */
1290                 if (cli->cl_checksum &&
1291                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1292                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1293                                 body->oa.o_flags = 0;
1294                         body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1295                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1296                 }
1297         }
1298         ptlrpc_request_set_replen(req);
1299
             /* record in the request's async args what this function
              * computed and was given */
1300         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1301         aa = ptlrpc_req_async_args(req);
1302         aa->aa_oa = oa;
1303         aa->aa_requested_nob = requested_nob;
1304         aa->aa_nio_count = niocount;
1305         aa->aa_page_count = page_count;
1306         aa->aa_resends = 0;
1307         aa->aa_ppga = pga;
1308         aa->aa_cli = cli;
1309         INIT_LIST_HEAD(&aa->aa_oaps);
1310
1311         *reqp = req;
1312         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1313         CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
1314                 req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
1315                 niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
1316         RETURN(0);
1317
1318  out:
1319         ptlrpc_req_finished(req);
1320         RETURN(rc);
1321 }
1322
1323 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1324                                 __u32 client_cksum, __u32 server_cksum, int nob,
1325                                 size_t page_count, struct brw_page **pga,
1326                                 cksum_type_t client_cksum_type)
1327 {
1328         __u32 new_cksum;
1329         char *msg;
1330         cksum_type_t cksum_type;
1331
1332         if (server_cksum == client_cksum) {
1333                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1334                 return 0;
1335         }
1336
1337         cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1338                                        oa->o_flags : 0);
1339         new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1340                                       cksum_type);
1341
1342         if (cksum_type != client_cksum_type)
1343                 msg = "the server did not use the checksum type specified in "
1344                       "the original request - likely a protocol problem";
1345         else if (new_cksum == server_cksum)
1346                 msg = "changed on the client after we checksummed it - "
1347                       "likely false positive due to mmap IO (bug 11742)";
1348         else if (new_cksum == client_cksum)
1349                 msg = "changed in transit before arrival at OST";
1350         else
1351                 msg = "changed in transit AND doesn't match the original - "
1352                       "likely false positive due to mmap IO (bug 11742)";
1353
1354         LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1355                            " object "DOSTID" extent ["LPU64"-"LPU64"]\n",
1356                            msg, libcfs_nid2str(peer->nid),
1357                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1358                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1359                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1360                            POSTID(&oa->o_oi), pga[0]->off,
1361                            pga[page_count-1]->off + pga[page_count-1]->count - 1);
1362         CERROR("original client csum %x (type %x), server csum %x (type %x), "
1363                "client csum now %x\n", client_cksum, client_cksum_type,
1364                server_cksum, cksum_type, new_cksum);
1365         return 1;
1366 }
1367
1368 /* Note rc enters this function as number of bytes transferred */
1369 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1370 {
1371         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1372         const lnet_process_id_t *peer =
1373                         &req->rq_import->imp_connection->c_peer;
1374         struct client_obd *cli = aa->aa_cli;
1375         struct ost_body *body;
1376         u32 client_cksum = 0;
1377         ENTRY;
1378
1379         if (rc < 0 && rc != -EDQUOT) {
1380                 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1381                 RETURN(rc);
1382         }
1383
1384         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1385         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1386         if (body == NULL) {
1387                 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1388                 RETURN(-EPROTO);
1389         }
1390
1391         /* set/clear over quota flag for a uid/gid */
1392         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1393             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1394                 unsigned int qid[LL_MAXQUOTAS] =
1395                                         {body->oa.o_uid, body->oa.o_gid};
1396
1397                 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1398                        body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1399                        body->oa.o_flags);
1400                 osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
1401         }
1402
1403         osc_update_grant(cli, body);
1404
1405         if (rc < 0)
1406                 RETURN(rc);
1407
1408         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1409                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1410
1411         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1412                 if (rc > 0) {
1413                         CERROR("Unexpected +ve rc %d\n", rc);
1414                         RETURN(-EPROTO);
1415                 }
1416                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1417
1418                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1419                         RETURN(-EAGAIN);
1420
1421                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1422                     check_write_checksum(&body->oa, peer, client_cksum,
1423                                          body->oa.o_cksum, aa->aa_requested_nob,
1424                                          aa->aa_page_count, aa->aa_ppga,
1425                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1426                         RETURN(-EAGAIN);
1427
1428                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1429                                      aa->aa_page_count, aa->aa_ppga);
1430                 GOTO(out, rc);
1431         }
1432
1433         /* The rest of this function executes only for OST_READs */
1434
1435         /* if unwrap_bulk failed, return -EAGAIN to retry */
1436         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1437         if (rc < 0)
1438                 GOTO(out, rc = -EAGAIN);
1439
1440         if (rc > aa->aa_requested_nob) {
1441                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1442                        aa->aa_requested_nob);
1443                 RETURN(-EPROTO);
1444         }
1445
1446         if (rc != req->rq_bulk->bd_nob_transferred) {
1447                 CERROR ("Unexpected rc %d (%d transferred)\n",
1448                         rc, req->rq_bulk->bd_nob_transferred);
1449                 return (-EPROTO);
1450         }
1451
1452         if (rc < aa->aa_requested_nob)
1453                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1454
1455         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1456                 static int cksum_counter;
1457                 u32        server_cksum = body->oa.o_cksum;
1458                 char      *via = "";
1459                 char      *router = "";
1460                 cksum_type_t cksum_type;
1461
1462                 cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
1463                                                body->oa.o_flags : 0);
1464                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1465                                                  aa->aa_ppga, OST_READ,
1466                                                  cksum_type);
1467
1468                 if (peer->nid != req->rq_bulk->bd_sender) {
1469                         via = " via ";
1470                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1471                 }
1472
1473                 if (server_cksum != client_cksum) {
1474                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1475                                            "%s%s%s inode "DFID" object "DOSTID
1476                                            " extent ["LPU64"-"LPU64"]\n",
1477                                            req->rq_import->imp_obd->obd_name,
1478                                            libcfs_nid2str(peer->nid),
1479                                            via, router,
1480                                            body->oa.o_valid & OBD_MD_FLFID ?
1481                                                 body->oa.o_parent_seq : (__u64)0,
1482                                            body->oa.o_valid & OBD_MD_FLFID ?
1483                                                 body->oa.o_parent_oid : 0,
1484                                            body->oa.o_valid & OBD_MD_FLFID ?
1485                                                 body->oa.o_parent_ver : 0,
1486                                            POSTID(&body->oa.o_oi),
1487                                            aa->aa_ppga[0]->off,
1488                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1489                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1490                                                                         1);
1491                         CERROR("client %x, server %x, cksum_type %x\n",
1492                                client_cksum, server_cksum, cksum_type);
1493                         cksum_counter = 0;
1494                         aa->aa_oa->o_cksum = client_cksum;
1495                         rc = -EAGAIN;
1496                 } else {
1497                         cksum_counter++;
1498                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1499                         rc = 0;
1500                 }
1501         } else if (unlikely(client_cksum)) {
1502                 static int cksum_missed;
1503
1504                 cksum_missed++;
1505                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1506                         CERROR("Checksum %u requested from %s but not sent\n",
1507                                cksum_missed, libcfs_nid2str(peer->nid));
1508         } else {
1509                 rc = 0;
1510         }
1511 out:
1512         if (rc >= 0)
1513                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1514                                      aa->aa_oa, &body->oa);
1515
1516         RETURN(rc);
1517 }
1518
1519 static int osc_brw_redo_request(struct ptlrpc_request *request,
1520                                 struct osc_brw_async_args *aa, int rc)
1521 {
1522         struct ptlrpc_request *new_req;
1523         struct osc_brw_async_args *new_aa;
1524         struct osc_async_page *oap;
1525         ENTRY;
1526
1527         DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1528                   "redo for recoverable error %d", rc);
1529
1530         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1531                                 OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
1532                                   aa->aa_cli, aa->aa_oa, aa->aa_page_count,
1533                                   aa->aa_ppga, &new_req, 1);
1534         if (rc)
1535                 RETURN(rc);
1536
1537         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1538                 if (oap->oap_request != NULL) {
1539                         LASSERTF(request == oap->oap_request,
1540                                  "request %p != oap_request %p\n",
1541                                  request, oap->oap_request);
1542                         if (oap->oap_interrupted) {
1543                                 ptlrpc_req_finished(new_req);
1544                                 RETURN(-EINTR);
1545                         }
1546                 }
1547         }
1548         /* New request takes over pga and oaps from old request.
1549          * Note that copying a list_head doesn't work, need to move it... */
1550         aa->aa_resends++;
1551         new_req->rq_interpret_reply = request->rq_interpret_reply;
1552         new_req->rq_async_args = request->rq_async_args;
1553         new_req->rq_commit_cb = request->rq_commit_cb;
1554         /* cap resend delay to the current request timeout, this is similar to
1555          * what ptlrpc does (see after_reply()) */
1556         if (aa->aa_resends > new_req->rq_timeout)
1557                 new_req->rq_sent = cfs_time_current_sec() + new_req->rq_timeout;
1558         else
1559                 new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1560         new_req->rq_generation_set = 1;
1561         new_req->rq_import_generation = request->rq_import_generation;
1562
1563         new_aa = ptlrpc_req_async_args(new_req);
1564
1565         INIT_LIST_HEAD(&new_aa->aa_oaps);
1566         list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1567         INIT_LIST_HEAD(&new_aa->aa_exts);
1568         list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1569         new_aa->aa_resends = aa->aa_resends;
1570
1571         list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1572                 if (oap->oap_request) {
1573                         ptlrpc_req_finished(oap->oap_request);
1574                         oap->oap_request = ptlrpc_request_addref(new_req);
1575                 }
1576         }
1577
1578         /* XXX: This code will run into problem if we're going to support
1579          * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
1580          * and wait for all of them to be finished. We should inherit request
1581          * set from old request. */
1582         ptlrpcd_add_req(new_req);
1583
1584         DEBUG_REQ(D_INFO, new_req, "new request");
1585         RETURN(0);
1586 }
1587
1588 /*
1589  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1590  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1591  * fine for our small page arrays and doesn't require allocation.  its an
1592  * insertion sort that swaps elements that are strides apart, shrinking the
1593  * stride down until its '1' and the array is sorted.
1594  */
1595 static void sort_brw_pages(struct brw_page **array, int num)
1596 {
1597         int stride, i, j;
1598         struct brw_page *tmp;
1599
1600         if (num == 1)
1601                 return;
1602         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1603                 ;
1604
1605         do {
1606                 stride /= 3;
1607                 for (i = stride ; i < num ; i++) {
1608                         tmp = array[i];
1609                         j = i;
1610                         while (j >= stride && array[j - stride]->off > tmp->off) {
1611                                 array[j] = array[j - stride];
1612                                 j -= stride;
1613                         }
1614                         array[j] = tmp;
1615                 }
1616         } while (stride > 1);
1617 }
1618
1619 static void osc_release_ppga(struct brw_page **ppga, size_t count)
1620 {
1621         LASSERT(ppga != NULL);
1622         OBD_FREE(ppga, sizeof(*ppga) * count);
1623 }
1624
1625 static int brw_interpret(const struct lu_env *env,
1626                          struct ptlrpc_request *req, void *data, int rc)
1627 {
1628         struct osc_brw_async_args *aa = data;
1629         struct osc_extent *ext;
1630         struct osc_extent *tmp;
1631         struct client_obd *cli = aa->aa_cli;
1632         ENTRY;
1633
1634         rc = osc_brw_fini_request(req, rc);
1635         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1636         /* When server return -EINPROGRESS, client should always retry
1637          * regardless of the number of times the bulk was resent already. */
1638         if (osc_recoverable_error(rc)) {
1639                 if (req->rq_import_generation !=
1640                     req->rq_import->imp_generation) {
1641                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
1642                                ""DOSTID", rc = %d.\n",
1643                                req->rq_import->imp_obd->obd_name,
1644                                POSTID(&aa->aa_oa->o_oi), rc);
1645                 } else if (rc == -EINPROGRESS ||
1646                     client_should_resend(aa->aa_resends, aa->aa_cli)) {
1647                         rc = osc_brw_redo_request(req, aa, rc);
1648                 } else {
1649                         CERROR("%s: too many resent retries for object: "
1650                                ""LPU64":"LPU64", rc = %d.\n",
1651                                req->rq_import->imp_obd->obd_name,
1652                                POSTID(&aa->aa_oa->o_oi), rc);
1653                 }
1654
1655                 if (rc == 0)
1656                         RETURN(0);
1657                 else if (rc == -EAGAIN || rc == -EINPROGRESS)
1658                         rc = -EIO;
1659         }
1660
1661         if (rc == 0) {
1662                 struct obdo *oa = aa->aa_oa;
1663                 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
1664                 unsigned long valid = 0;
1665                 struct cl_object *obj;
1666                 struct osc_async_page *last;
1667
1668                 last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
1669                 obj = osc2cl(last->oap_obj);
1670
1671                 cl_object_attr_lock(obj);
1672                 if (oa->o_valid & OBD_MD_FLBLOCKS) {
1673                         attr->cat_blocks = oa->o_blocks;
1674                         valid |= CAT_BLOCKS;
1675                 }
1676                 if (oa->o_valid & OBD_MD_FLMTIME) {
1677                         attr->cat_mtime = oa->o_mtime;
1678                         valid |= CAT_MTIME;
1679                 }
1680                 if (oa->o_valid & OBD_MD_FLATIME) {
1681                         attr->cat_atime = oa->o_atime;
1682                         valid |= CAT_ATIME;
1683                 }
1684                 if (oa->o_valid & OBD_MD_FLCTIME) {
1685                         attr->cat_ctime = oa->o_ctime;
1686                         valid |= CAT_CTIME;
1687                 }
1688
1689                 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1690                         struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
1691                         loff_t last_off = last->oap_count + last->oap_obj_off +
1692                                 last->oap_page_off;
1693
1694                         /* Change file size if this is an out of quota or
1695                          * direct IO write and it extends the file size */
1696                         if (loi->loi_lvb.lvb_size < last_off) {
1697                                 attr->cat_size = last_off;
1698                                 valid |= CAT_SIZE;
1699                         }
1700                         /* Extend KMS if it's not a lockless write */
1701                         if (loi->loi_kms < last_off &&
1702                             oap2osc_page(last)->ops_srvlock == 0) {
1703                                 attr->cat_kms = last_off;
1704                                 valid |= CAT_KMS;
1705                         }
1706                 }
1707
1708                 if (valid != 0)
1709                         cl_object_attr_update(env, obj, attr, valid);
1710                 cl_object_attr_unlock(obj);
1711         }
1712         OBDO_FREE(aa->aa_oa);
1713
1714         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
1715                 osc_inc_unstable_pages(req);
1716
1717         list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
1718                 list_del_init(&ext->oe_link);
1719                 osc_extent_finish(env, ext, 1, rc);
1720         }
1721         LASSERT(list_empty(&aa->aa_exts));
1722         LASSERT(list_empty(&aa->aa_oaps));
1723
1724         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1725         ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
1726
1727         spin_lock(&cli->cl_loi_list_lock);
1728         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1729          * is called so we know whether to go to sync BRWs or wait for more
1730          * RPCs to complete */
1731         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
1732                 cli->cl_w_in_flight--;
1733         else
1734                 cli->cl_r_in_flight--;
1735         osc_wake_cache_waiters(cli);
1736         spin_unlock(&cli->cl_loi_list_lock);
1737
1738         osc_io_unplug(env, cli, NULL);
1739         RETURN(rc);
1740 }
1741
1742 static void brw_commit(struct ptlrpc_request *req)
1743 {
1744         /* If osc_inc_unstable_pages (via osc_extent_finish) races with
1745          * this called via the rq_commit_cb, I need to ensure
1746          * osc_dec_unstable_pages is still called. Otherwise unstable
1747          * pages may be leaked. */
1748         spin_lock(&req->rq_lock);
1749         if (likely(req->rq_unstable)) {
1750                 req->rq_unstable = 0;
1751                 spin_unlock(&req->rq_lock);
1752
1753                 osc_dec_unstable_pages(req);
1754         } else {
1755                 req->rq_committed = 1;
1756                 spin_unlock(&req->rq_lock);
1757         }
1758 }
1759
1760 /**
1761  * Build an RPC by the list of extent @ext_list. The caller must ensure
1762  * that the total pages in this list are NOT over max pages per RPC.
1763  * Extents in the list must be in OES_RPC state.
1764  */
1765 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
1766                   struct list_head *ext_list, int cmd)
1767 {
1768         struct ptlrpc_request           *req = NULL;
1769         struct osc_extent               *ext;
1770         struct brw_page                 **pga = NULL;
1771         struct osc_brw_async_args       *aa = NULL;
1772         struct obdo                     *oa = NULL;
1773         struct osc_async_page           *oap;
1774         struct osc_object               *obj = NULL;
1775         struct cl_req_attr              *crattr = NULL;
1776         loff_t                          starting_offset = OBD_OBJECT_EOF;
1777         loff_t                          ending_offset = 0;
1778         int                             mpflag = 0;
1779         int                             mem_tight = 0;
1780         int                             page_count = 0;
1781         bool                            soft_sync = false;
1782         bool                            interrupted = false;
1783         int                             i;
1784         int                             grant = 0;
1785         int                             rc;
1786         struct list_head                rpc_list = LIST_HEAD_INIT(rpc_list);
1787         struct ost_body                 *body;
1788         ENTRY;
1789         LASSERT(!list_empty(ext_list));
1790
1791         /* add pages into rpc_list to build BRW rpc */
1792         list_for_each_entry(ext, ext_list, oe_link) {
1793                 LASSERT(ext->oe_state == OES_RPC);
1794                 mem_tight |= ext->oe_memalloc;
1795                 grant += ext->oe_grants;
1796                 page_count += ext->oe_nr_pages;
1797                 if (obj == NULL)
1798                         obj = ext->oe_obj;
1799         }
1800
1801         soft_sync = osc_over_unstable_soft_limit(cli);
1802         if (mem_tight)
1803                 mpflag = cfs_memory_pressure_get_and_set();
1804
1805         OBD_ALLOC(pga, sizeof(*pga) * page_count);
1806         if (pga == NULL)
1807                 GOTO(out, rc = -ENOMEM);
1808
1809         OBDO_ALLOC(oa);
1810         if (oa == NULL)
1811                 GOTO(out, rc = -ENOMEM);
1812
1813         i = 0;
1814         list_for_each_entry(ext, ext_list, oe_link) {
1815                 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
1816                         if (mem_tight)
1817                                 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
1818                         if (soft_sync)
1819                                 oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
1820                         pga[i] = &oap->oap_brw_page;
1821                         pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
1822                         i++;
1823
1824                         list_add_tail(&oap->oap_rpc_item, &rpc_list);
1825                         if (starting_offset == OBD_OBJECT_EOF ||
1826                             starting_offset > oap->oap_obj_off)
1827                                 starting_offset = oap->oap_obj_off;
1828                         else
1829                                 LASSERT(oap->oap_page_off == 0);
1830                         if (ending_offset < oap->oap_obj_off + oap->oap_count)
1831                                 ending_offset = oap->oap_obj_off +
1832                                                 oap->oap_count;
1833                         else
1834                                 LASSERT(oap->oap_page_off + oap->oap_count ==
1835                                         PAGE_CACHE_SIZE);
1836                         if (oap->oap_interrupted)
1837                                 interrupted = true;
1838                 }
1839         }
1840
1841         /* first page in the list */
1842         oap = list_entry(rpc_list.next, typeof(*oap), oap_rpc_item);
1843
1844         crattr = &osc_env_info(env)->oti_req_attr;
1845         memset(crattr, 0, sizeof(*crattr));
1846         crattr->cra_type = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
1847         crattr->cra_flags = ~0ULL;
1848         crattr->cra_page = oap2cl_page(oap);
1849         crattr->cra_oa = oa;
1850         cl_req_attr_set(env, osc2cl(obj), crattr);
1851
1852         if (cmd == OBD_BRW_WRITE)
1853                 oa->o_grant_used = grant;
1854
1855         sort_brw_pages(pga, page_count);
1856         rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req, 0);
1857         if (rc != 0) {
1858                 CERROR("prep_req failed: %d\n", rc);
1859                 GOTO(out, rc);
1860         }
1861
1862         req->rq_commit_cb = brw_commit;
1863         req->rq_interpret_reply = brw_interpret;
1864         req->rq_memalloc = mem_tight != 0;
1865         oap->oap_request = ptlrpc_request_addref(req);
1866         if (interrupted && !req->rq_intr)
1867                 ptlrpc_mark_interrupted(req);
1868
1869         /* Need to update the timestamps after the request is built in case
1870          * we race with setattr (locally or in queue at OST).  If OST gets
1871          * later setattr before earlier BRW (as determined by the request xid),
1872          * the OST will not use BRW timestamps.  Sadly, there is no obvious
1873          * way to do this in a single call.  bug 10150 */
1874         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
1875         crattr->cra_oa = &body->oa;
1876         crattr->cra_flags = OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME;
1877         cl_req_attr_set(env, osc2cl(obj), crattr);
1878         lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
1879
1880         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1881         aa = ptlrpc_req_async_args(req);
1882         INIT_LIST_HEAD(&aa->aa_oaps);
1883         list_splice_init(&rpc_list, &aa->aa_oaps);
1884         INIT_LIST_HEAD(&aa->aa_exts);
1885         list_splice_init(ext_list, &aa->aa_exts);
1886
1887         spin_lock(&cli->cl_loi_list_lock);
1888         starting_offset >>= PAGE_CACHE_SHIFT;
1889         if (cmd == OBD_BRW_READ) {
1890                 cli->cl_r_in_flight++;
1891                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
1892                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
1893                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
1894                                       starting_offset + 1);
1895         } else {
1896                 cli->cl_w_in_flight++;
1897                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
1898                 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
1899                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
1900                                       starting_offset + 1);
1901         }
1902         spin_unlock(&cli->cl_loi_list_lock);
1903
1904         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %ur/%uw in flight",
1905                   page_count, aa, cli->cl_r_in_flight,
1906                   cli->cl_w_in_flight);
1907         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_IO, 4);
1908
1909         ptlrpcd_add_req(req);
1910         rc = 0;
1911         EXIT;
1912
1913 out:
1914         if (mem_tight != 0)
1915                 cfs_memory_pressure_restore(mpflag);
1916
1917         if (rc != 0) {
1918                 LASSERT(req == NULL);
1919
1920                 if (oa)
1921                         OBDO_FREE(oa);
1922                 if (pga)
1923                         OBD_FREE(pga, sizeof(*pga) * page_count);
1924                 /* this should happen rarely and is pretty bad, it makes the
1925                  * pending list not follow the dirty order */
1926                 while (!list_empty(ext_list)) {
1927                         ext = list_entry(ext_list->next, struct osc_extent,
1928                                          oe_link);
1929                         list_del_init(&ext->oe_link);
1930                         osc_extent_finish(env, ext, 0, rc);
1931                 }
1932         }
1933         RETURN(rc);
1934 }
1935
1936 static int osc_set_lock_data(struct ldlm_lock *lock, void *data)
1937 {
1938         int set = 0;
1939
1940         LASSERT(lock != NULL);
1941
1942         lock_res_and_lock(lock);
1943
1944         if (lock->l_ast_data == NULL)
1945                 lock->l_ast_data = data;
1946         if (lock->l_ast_data == data)
1947                 set = 1;
1948
1949         unlock_res_and_lock(lock);
1950
1951         return set;
1952 }
1953
1954 static int osc_enqueue_fini(struct ptlrpc_request *req,
1955                             osc_enqueue_upcall_f upcall, void *cookie,
1956                             struct lustre_handle *lockh, enum ldlm_mode mode,
1957                             __u64 *flags, int agl, int errcode)
1958 {
1959         bool intent = *flags & LDLM_FL_HAS_INTENT;
1960         int rc;
1961         ENTRY;
1962
1963         /* The request was created before ldlm_cli_enqueue call. */
1964         if (intent && errcode == ELDLM_LOCK_ABORTED) {
1965                 struct ldlm_reply *rep;
1966
1967                 rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1968                 LASSERT(rep != NULL);
1969
1970                 rep->lock_policy_res1 =
1971                         ptlrpc_status_ntoh(rep->lock_policy_res1);
1972                 if (rep->lock_policy_res1)
1973                         errcode = rep->lock_policy_res1;
1974                 if (!agl)
1975                         *flags |= LDLM_FL_LVB_READY;
1976         } else if (errcode == ELDLM_OK) {
1977                 *flags |= LDLM_FL_LVB_READY;
1978         }
1979
1980         /* Call the update callback. */
1981         rc = (*upcall)(cookie, lockh, errcode);
1982
1983         /* release the reference taken in ldlm_cli_enqueue() */
1984         if (errcode == ELDLM_LOCK_MATCHED)
1985                 errcode = ELDLM_OK;
1986         if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
1987                 ldlm_lock_decref(lockh, mode);
1988
1989         RETURN(rc);
1990 }
1991
1992 static int osc_enqueue_interpret(const struct lu_env *env,
1993                                  struct ptlrpc_request *req,
1994                                  struct osc_enqueue_args *aa, int rc)
1995 {
1996         struct ldlm_lock *lock;
1997         struct lustre_handle *lockh = &aa->oa_lockh;
1998         enum ldlm_mode mode = aa->oa_mode;
1999         struct ost_lvb *lvb = aa->oa_lvb;
2000         __u32 lvb_len = sizeof(*lvb);
2001         __u64 flags = 0;
2002
2003         ENTRY;
2004
2005         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2006          * be valid. */
2007         lock = ldlm_handle2lock(lockh);
2008         LASSERTF(lock != NULL,
2009                  "lockh "LPX64", req %p, aa %p - client evicted?\n",
2010                  lockh->cookie, req, aa);
2011
2012         /* Take an additional reference so that a blocking AST that
2013          * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2014          * to arrive after an upcall has been executed by
2015          * osc_enqueue_fini(). */
2016         ldlm_lock_addref(lockh, mode);
2017
2018         /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
2019         OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
2020
2021         /* Let CP AST to grant the lock first. */
2022         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2023
2024         if (aa->oa_agl) {
2025                 LASSERT(aa->oa_lvb == NULL);
2026                 LASSERT(aa->oa_flags == NULL);
2027                 aa->oa_flags = &flags;
2028         }
2029
2030         /* Complete obtaining the lock procedure. */
2031         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
2032                                    aa->oa_mode, aa->oa_flags, lvb, lvb_len,
2033                                    lockh, rc);
2034         /* Complete osc stuff. */
2035         rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
2036                               aa->oa_flags, aa->oa_agl, rc);
2037
2038         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2039
2040         ldlm_lock_decref(lockh, mode);
2041         LDLM_LOCK_PUT(lock);
2042         RETURN(rc);
2043 }
2044
/*
 * Sentinel request-set value.  osc_enqueue_base() compares its rqset
 * argument against this pointer and, when they are equal, hands the request
 * to ptlrpcd_add_req() instead of adding it to a real request set.
 */
struct ptlrpc_request_set *PTLRPCD_SET = (struct ptlrpc_request_set *)1;
2046
2047 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
2048  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2049  * other synchronous requests, however keeping some locks and trying to obtain
2050  * others may take a considerable amount of time in a case of ost failure; and
2051  * when other sync requests do not get released lock from a client, the client
2052  * is evicted from the cluster -- such scenarious make the life difficult, so
2053  * release locks just after they are obtained. */
2054 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2055                      __u64 *flags, union ldlm_policy_data *policy,
2056                      struct ost_lvb *lvb, int kms_valid,
2057                      osc_enqueue_upcall_f upcall, void *cookie,
2058                      struct ldlm_enqueue_info *einfo,
2059                      struct ptlrpc_request_set *rqset, int async, int agl)
2060 {
2061         struct obd_device *obd = exp->exp_obd;
2062         struct lustre_handle lockh = { 0 };
2063         struct ptlrpc_request *req = NULL;
2064         int intent = *flags & LDLM_FL_HAS_INTENT;
2065         __u64 match_flags = *flags;
2066         enum ldlm_mode mode;
2067         int rc;
2068         ENTRY;
2069
2070         /* Filesystem lock extents are extended to page boundaries so that
2071          * dealing with the page cache is a little smoother.  */
2072         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2073         policy->l_extent.end |= ~PAGE_MASK;
2074
2075         /*
2076          * kms is not valid when either object is completely fresh (so that no
2077          * locks are cached), or object was evicted. In the latter case cached
2078          * lock cannot be used, because it would prime inode state with
2079          * potentially stale LVB.
2080          */
2081         if (!kms_valid)
2082                 goto no_match;
2083
2084         /* Next, search for already existing extent locks that will cover us */
2085         /* If we're trying to read, we also search for an existing PW lock.  The
2086          * VFS and page cache already protect us locally, so lots of readers/
2087          * writers can share a single PW lock.
2088          *
2089          * There are problems with conversion deadlocks, so instead of
2090          * converting a read lock to a write lock, we'll just enqueue a new
2091          * one.
2092          *
2093          * At some point we should cancel the read lock instead of making them
2094          * send us a blocking callback, but there are problems with canceling
2095          * locks out from other users right now, too. */
2096         mode = einfo->ei_mode;
2097         if (einfo->ei_mode == LCK_PR)
2098                 mode |= LCK_PW;
2099         if (agl == 0)
2100                 match_flags |= LDLM_FL_LVB_READY;
2101         if (intent != 0)
2102                 match_flags |= LDLM_FL_BLOCK_GRANTED;
2103         mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id,
2104                                einfo->ei_type, policy, mode, &lockh, 0);
2105         if (mode) {
2106                 struct ldlm_lock *matched;
2107
2108                 if (*flags & LDLM_FL_TEST_LOCK)
2109                         RETURN(ELDLM_OK);
2110
2111                 matched = ldlm_handle2lock(&lockh);
2112                 if (agl) {
2113                         /* AGL enqueues DLM locks speculatively. Therefore if
2114                          * it already exists a DLM lock, it wll just inform the
2115                          * caller to cancel the AGL process for this stripe. */
2116                         ldlm_lock_decref(&lockh, mode);
2117                         LDLM_LOCK_PUT(matched);
2118                         RETURN(-ECANCELED);
2119                 } else if (osc_set_lock_data(matched, einfo->ei_cbdata)) {
2120                         *flags |= LDLM_FL_LVB_READY;
2121
2122                         /* We already have a lock, and it's referenced. */
2123                         (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);
2124
2125                         ldlm_lock_decref(&lockh, mode);
2126                         LDLM_LOCK_PUT(matched);
2127                         RETURN(ELDLM_OK);
2128                 } else {
2129                         ldlm_lock_decref(&lockh, mode);
2130                         LDLM_LOCK_PUT(matched);
2131                 }
2132         }
2133
2134 no_match:
2135         if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK))
2136                 RETURN(-ENOLCK);
2137
2138         if (intent) {
2139                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2140                                            &RQF_LDLM_ENQUEUE_LVB);
2141                 if (req == NULL)
2142                         RETURN(-ENOMEM);
2143
2144                 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
2145                 if (rc) {
2146                         ptlrpc_request_free(req);
2147                         RETURN(rc);
2148                 }
2149
2150                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2151                                      sizeof *lvb);
2152                 ptlrpc_request_set_replen(req);
2153         }
2154
2155         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2156         *flags &= ~LDLM_FL_BLOCK_GRANTED;
2157
2158         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2159                               sizeof(*lvb), LVB_T_OST, &lockh, async);
2160         if (async) {
2161                 if (!rc) {
2162                         struct osc_enqueue_args *aa;
2163                         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2164                         aa = ptlrpc_req_async_args(req);
2165                         aa->oa_exp    = exp;
2166                         aa->oa_mode   = einfo->ei_mode;
2167                         aa->oa_type   = einfo->ei_type;
2168                         lustre_handle_copy(&aa->oa_lockh, &lockh);
2169                         aa->oa_upcall = upcall;
2170                         aa->oa_cookie = cookie;
2171                         aa->oa_agl    = !!agl;
2172                         if (!agl) {
2173                                 aa->oa_flags  = flags;
2174                                 aa->oa_lvb    = lvb;
2175                         } else {
2176                                 /* AGL is essentially to enqueue an DLM lock
2177                                  * in advance, so we don't care about the
2178                                  * result of AGL enqueue. */
2179                                 aa->oa_lvb    = NULL;
2180                                 aa->oa_flags  = NULL;
2181                         }
2182
2183                         req->rq_interpret_reply =
2184                                 (ptlrpc_interpterer_t)osc_enqueue_interpret;
2185                         if (rqset == PTLRPCD_SET)
2186                                 ptlrpcd_add_req(req);
2187                         else
2188                                 ptlrpc_set_add_req(rqset, req);
2189                 } else if (intent) {
2190                         ptlrpc_req_finished(req);
2191                 }
2192                 RETURN(rc);
2193         }
2194
2195         rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
2196                               flags, agl, rc);
2197         if (intent)
2198                 ptlrpc_req_finished(req);
2199
2200         RETURN(rc);
2201 }
2202
2203 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2204                    enum ldlm_type type, union ldlm_policy_data *policy,
2205                    enum ldlm_mode mode, __u64 *flags, void *data,
2206                    struct lustre_handle *lockh, int unref)
2207 {
2208         struct obd_device *obd = exp->exp_obd;
2209         __u64 lflags = *flags;
2210         enum ldlm_mode rc;
2211         ENTRY;
2212
2213         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2214                 RETURN(-EIO);
2215
2216         /* Filesystem lock extents are extended to page boundaries so that
2217          * dealing with the page cache is a little smoother */
2218         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2219         policy->l_extent.end |= ~PAGE_MASK;
2220
2221         /* Next, search for already existing extent locks that will cover us */
2222         /* If we're trying to read, we also search for an existing PW lock.  The
2223          * VFS and page cache already protect us locally, so lots of readers/
2224          * writers can share a single PW lock. */
2225         rc = mode;
2226         if (mode == LCK_PR)
2227                 rc |= LCK_PW;
2228         rc = ldlm_lock_match(obd->obd_namespace, lflags,
2229                              res_id, type, policy, rc, lockh, unref);
2230         if (rc == 0 || lflags & LDLM_FL_TEST_LOCK)
2231                 RETURN(rc);
2232
2233         if (data != NULL) {
2234                 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2235
2236                 LASSERT(lock != NULL);
2237                 if (!osc_set_lock_data(lock, data)) {
2238                         ldlm_lock_decref(lockh, rc);
2239                         rc = 0;
2240                 }
2241                 LDLM_LOCK_PUT(lock);
2242         }
2243         RETURN(rc);
2244 }
2245
2246 static int osc_statfs_interpret(const struct lu_env *env,
2247                                 struct ptlrpc_request *req,
2248                                 struct osc_async_args *aa, int rc)
2249 {
2250         struct obd_statfs *msfs;
2251         ENTRY;
2252
2253         if (rc == -EBADR)
2254                 /* The request has in fact never been sent
2255                  * due to issues at a higher level (LOV).
2256                  * Exit immediately since the caller is
2257                  * aware of the problem and takes care
2258                  * of the clean up */
2259                  RETURN(rc);
2260
2261         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2262             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2263                 GOTO(out, rc = 0);
2264
2265         if (rc != 0)
2266                 GOTO(out, rc);
2267
2268         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2269         if (msfs == NULL) {
2270                 GOTO(out, rc = -EPROTO);
2271         }
2272
2273         *aa->aa_oi->oi_osfs = *msfs;
2274 out:
2275         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2276         RETURN(rc);
2277 }
2278
2279 static int osc_statfs_async(struct obd_export *exp,
2280                             struct obd_info *oinfo, __u64 max_age,
2281                             struct ptlrpc_request_set *rqset)
2282 {
2283         struct obd_device     *obd = class_exp2obd(exp);
2284         struct ptlrpc_request *req;
2285         struct osc_async_args *aa;
2286         int                    rc;
2287         ENTRY;
2288
2289         /* We could possibly pass max_age in the request (as an absolute
2290          * timestamp or a "seconds.usec ago") so the target can avoid doing
2291          * extra calls into the filesystem if that isn't necessary (e.g.
2292          * during mount that would help a bit).  Having relative timestamps
2293          * is not so great if request processing is slow, while absolute
2294          * timestamps are not ideal because they need time synchronization. */
2295         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2296         if (req == NULL)
2297                 RETURN(-ENOMEM);
2298
2299         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2300         if (rc) {
2301                 ptlrpc_request_free(req);
2302                 RETURN(rc);
2303         }
2304         ptlrpc_request_set_replen(req);
2305         req->rq_request_portal = OST_CREATE_PORTAL;
2306         ptlrpc_at_set_req_timeout(req);
2307
2308         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2309                 /* procfs requests not want stat in wait for avoid deadlock */
2310                 req->rq_no_resend = 1;
2311                 req->rq_no_delay = 1;
2312         }
2313
2314         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2315         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2316         aa = ptlrpc_req_async_args(req);
2317         aa->aa_oi = oinfo;
2318
2319         ptlrpc_set_add_req(rqset, req);
2320         RETURN(0);
2321 }
2322
2323 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2324                       struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2325 {
2326         struct obd_device     *obd = class_exp2obd(exp);
2327         struct obd_statfs     *msfs;
2328         struct ptlrpc_request *req;
2329         struct obd_import     *imp = NULL;
2330         int rc;
2331         ENTRY;
2332
2333         /*Since the request might also come from lprocfs, so we need
2334          *sync this with client_disconnect_export Bug15684*/
2335         down_read(&obd->u.cli.cl_sem);
2336         if (obd->u.cli.cl_import)
2337                 imp = class_import_get(obd->u.cli.cl_import);
2338         up_read(&obd->u.cli.cl_sem);
2339         if (!imp)
2340                 RETURN(-ENODEV);
2341
2342         /* We could possibly pass max_age in the request (as an absolute
2343          * timestamp or a "seconds.usec ago") so the target can avoid doing
2344          * extra calls into the filesystem if that isn't necessary (e.g.
2345          * during mount that would help a bit).  Having relative timestamps
2346          * is not so great if request processing is slow, while absolute
2347          * timestamps are not ideal because they need time synchronization. */
2348         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2349
2350         class_import_put(imp);
2351
2352         if (req == NULL)
2353                 RETURN(-ENOMEM);
2354
2355         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2356         if (rc) {
2357                 ptlrpc_request_free(req);
2358                 RETURN(rc);
2359         }
2360         ptlrpc_request_set_replen(req);
2361         req->rq_request_portal = OST_CREATE_PORTAL;
2362         ptlrpc_at_set_req_timeout(req);
2363
2364         if (flags & OBD_STATFS_NODELAY) {
2365                 /* procfs requests not want stat in wait for avoid deadlock */
2366                 req->rq_no_resend = 1;
2367                 req->rq_no_delay = 1;
2368         }
2369
2370         rc = ptlrpc_queue_wait(req);
2371         if (rc)
2372                 GOTO(out, rc);
2373
2374         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2375         if (msfs == NULL) {
2376                 GOTO(out, rc = -EPROTO);
2377         }
2378
2379         *osfs = *msfs;
2380
2381         EXIT;
2382  out:
2383         ptlrpc_req_finished(req);
2384         return rc;
2385 }
2386
2387 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2388                          void *karg, void __user *uarg)
2389 {
2390         struct obd_device *obd = exp->exp_obd;
2391         struct obd_ioctl_data *data = karg;
2392         int err = 0;
2393         ENTRY;
2394
2395         if (!try_module_get(THIS_MODULE)) {
2396                 CERROR("%s: cannot get module '%s'\n", obd->obd_name,
2397                        module_name(THIS_MODULE));
2398                 return -EINVAL;
2399         }
2400         switch (cmd) {
2401         case OBD_IOC_CLIENT_RECOVER:
2402                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2403                                             data->ioc_inlbuf1, 0);
2404                 if (err > 0)
2405                         err = 0;
2406                 GOTO(out, err);
2407         case IOC_OSC_SET_ACTIVE:
2408                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2409                                                data->ioc_offset);
2410                 GOTO(out, err);
2411         case OBD_IOC_PING_TARGET:
2412                 err = ptlrpc_obd_ping(obd);
2413                 GOTO(out, err);
2414         default:
2415                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2416                        cmd, current_comm());
2417                 GOTO(out, err = -ENOTTY);
2418         }
2419 out:
2420         module_put(THIS_MODULE);
2421         return err;
2422 }
2423
2424 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
2425                               u32 keylen, void *key,
2426                               u32 vallen, void *val,
2427                               struct ptlrpc_request_set *set)
2428 {
2429         struct ptlrpc_request *req;
2430         struct obd_device     *obd = exp->exp_obd;
2431         struct obd_import     *imp = class_exp2cliimp(exp);
2432         char                  *tmp;
2433         int                    rc;
2434         ENTRY;
2435
2436         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
2437
2438         if (KEY_IS(KEY_CHECKSUM)) {
2439                 if (vallen != sizeof(int))
2440                         RETURN(-EINVAL);
2441                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
2442                 RETURN(0);
2443         }
2444
2445         if (KEY_IS(KEY_SPTLRPC_CONF)) {
2446                 sptlrpc_conf_client_adapt(obd);
2447                 RETURN(0);
2448         }
2449
2450         if (KEY_IS(KEY_FLUSH_CTX)) {
2451                 sptlrpc_import_flush_my_ctx(imp);
2452                 RETURN(0);
2453         }
2454
2455         if (KEY_IS(KEY_CACHE_SET)) {
2456                 struct client_obd *cli = &obd->u.cli;
2457
2458                 LASSERT(cli->cl_cache == NULL); /* only once */
2459                 cli->cl_cache = (struct cl_client_cache *)val;
2460                 cl_cache_incref(cli->cl_cache);
2461                 cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
2462
2463                 /* add this osc into entity list */
2464                 LASSERT(list_empty(&cli->cl_lru_osc));
2465                 spin_lock(&cli->cl_cache->ccc_lru_lock);
2466                 list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
2467                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
2468
2469                 RETURN(0);
2470         }
2471
2472         if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
2473                 struct client_obd *cli = &obd->u.cli;
2474                 long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
2475                 long target = *(long *)val;
2476
2477                 nr = osc_lru_shrink(env, cli, min(nr, target), true);
2478                 *(long *)val -= nr;
2479                 RETURN(0);
2480         }
2481
2482         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
2483                 RETURN(-EINVAL);
2484
2485         /* We pass all other commands directly to OST. Since nobody calls osc
2486            methods directly and everybody is supposed to go through LOV, we
2487            assume lov checked invalid values for us.
2488            The only recognised values so far are evict_by_nid and mds_conn.
2489            Even if something bad goes through, we'd get a -EINVAL from OST
2490            anyway. */
2491
2492         req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
2493                                                 &RQF_OST_SET_GRANT_INFO :
2494                                                 &RQF_OBD_SET_INFO);
2495         if (req == NULL)
2496                 RETURN(-ENOMEM);
2497
2498         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2499                              RCL_CLIENT, keylen);
2500         if (!KEY_IS(KEY_GRANT_SHRINK))
2501                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
2502                                      RCL_CLIENT, vallen);
2503         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
2504         if (rc) {
2505                 ptlrpc_request_free(req);
2506                 RETURN(rc);
2507         }
2508
2509         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2510         memcpy(tmp, key, keylen);
2511         tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
2512                                                         &RMF_OST_BODY :
2513                                                         &RMF_SETINFO_VAL);
2514         memcpy(tmp, val, vallen);
2515
2516         if (KEY_IS(KEY_GRANT_SHRINK)) {
2517                 struct osc_grant_args *aa;
2518                 struct obdo *oa;
2519
2520                 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2521                 aa = ptlrpc_req_async_args(req);
2522                 OBDO_ALLOC(oa);
2523                 if (!oa) {
2524                         ptlrpc_req_finished(req);
2525                         RETURN(-ENOMEM);
2526                 }
2527                 *oa = ((struct ost_body *)val)->oa;
2528                 aa->aa_oa = oa;
2529                 req->rq_interpret_reply = osc_shrink_grant_interpret;
2530         }
2531
2532         ptlrpc_request_set_replen(req);
2533         if (!KEY_IS(KEY_GRANT_SHRINK)) {
2534                 LASSERT(set != NULL);
2535                 ptlrpc_set_add_req(set, req);
2536                 ptlrpc_check_set(NULL, set);
2537         } else {
2538                 ptlrpcd_add_req(req);
2539         }
2540
2541         RETURN(0);
2542 }
2543
2544 static int osc_reconnect(const struct lu_env *env,
2545                          struct obd_export *exp, struct obd_device *obd,
2546                          struct obd_uuid *cluuid,
2547                          struct obd_connect_data *data,
2548                          void *localdata)
2549 {
2550         struct client_obd *cli = &obd->u.cli;
2551
2552         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
2553                 long lost_grant;
2554                 long grant;
2555
2556                 spin_lock(&cli->cl_loi_list_lock);
2557                 grant = cli->cl_avail_grant + cli->cl_reserved_grant;
2558                 if (data->ocd_connect_flags & OBD_CONNECT_GRANT_PARAM)
2559                         grant += cli->cl_dirty_grant;
2560                 else
2561                         grant += cli->cl_dirty_pages << PAGE_CACHE_SHIFT;
2562                 data->ocd_grant = grant ? : 2 * cli_brw_size(obd);
2563                 lost_grant = cli->cl_lost_grant;
2564                 cli->cl_lost_grant = 0;
2565                 spin_unlock(&cli->cl_loi_list_lock);
2566
2567                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
2568                        " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
2569                        data->ocd_version, data->ocd_grant, lost_grant);
2570         }
2571
2572         RETURN(0);
2573 }
2574
2575 static int osc_disconnect(struct obd_export *exp)
2576 {
2577         struct obd_device *obd = class_exp2obd(exp);
2578         int rc;
2579
2580         rc = client_disconnect_export(exp);
2581         /**
2582          * Initially we put del_shrink_grant before disconnect_export, but it
2583          * causes the following problem if setup (connect) and cleanup
2584          * (disconnect) are tangled together.
2585          *      connect p1                     disconnect p2
2586          *   ptlrpc_connect_import
2587          *     ...............               class_manual_cleanup
2588          *                                     osc_disconnect
2589          *                                     del_shrink_grant
2590          *   ptlrpc_connect_interrupt
2591          *     init_grant_shrink
2592          *   add this client to shrink list
2593          *                                      cleanup_osc
2594          * Bang! pinger trigger the shrink.
2595          * So the osc should be disconnected from the shrink list, after we
2596          * are sure the import has been destroyed. BUG18662
2597          */
2598         if (obd->u.cli.cl_import == NULL)
2599                 osc_del_shrink_grant(&obd->u.cli);
2600         return rc;
2601 }
2602
2603 static int osc_ldlm_resource_invalidate(struct cfs_hash *hs,
2604         struct cfs_hash_bd *bd, struct hlist_node *hnode, void *arg)
2605 {
2606         struct lu_env *env = arg;
2607         struct ldlm_resource *res = cfs_hash_object(hs, hnode);
2608         struct ldlm_lock *lock;
2609         struct osc_object *osc = NULL;
2610         ENTRY;
2611
2612         lock_res(res);
2613         list_for_each_entry(lock, &res->lr_granted, l_res_link) {
2614                 if (lock->l_ast_data != NULL && osc == NULL) {
2615                         osc = lock->l_ast_data;
2616                         cl_object_get(osc2cl(osc));
2617                 }
2618
2619                 /* clear LDLM_FL_CLEANED flag to make sure it will be canceled
2620                  * by the 2nd round of ldlm_namespace_clean() call in
2621                  * osc_import_event(). */
2622                 ldlm_clear_cleaned(lock);
2623         }
2624         unlock_res(res);
2625
2626         if (osc != NULL) {
2627                 osc_object_invalidate(env, osc);
2628                 cl_object_put(env, osc2cl(osc));
2629         }
2630
2631         RETURN(0);
2632 }
2633
2634 static int osc_import_event(struct obd_device *obd,
2635                             struct obd_import *imp,
2636                             enum obd_import_event event)
2637 {
2638         struct client_obd *cli;
2639         int rc = 0;
2640
2641         ENTRY;
2642         LASSERT(imp->imp_obd == obd);
2643
2644         switch (event) {
2645         case IMP_EVENT_DISCON: {
2646                 cli = &obd->u.cli;
2647                 spin_lock(&cli->cl_loi_list_lock);
2648                 cli->cl_avail_grant = 0;
2649                 cli->cl_lost_grant = 0;
2650                 spin_unlock(&cli->cl_loi_list_lock);
2651                 break;
2652         }
2653         case IMP_EVENT_INACTIVE: {
2654                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
2655                 break;
2656         }
2657         case IMP_EVENT_INVALIDATE: {
2658                 struct ldlm_namespace *ns = obd->obd_namespace;
2659                 struct lu_env         *env;
2660                 __u16                  refcheck;
2661
2662                 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
2663
2664                 env = cl_env_get(&refcheck);
2665                 if (!IS_ERR(env)) {
2666                         osc_io_unplug(env, &obd->u.cli, NULL);
2667
2668                         cfs_hash_for_each_nolock(ns->ns_rs_hash,
2669                                                  osc_ldlm_resource_invalidate,
2670                                                  env, 0);
2671                         cl_env_put(env, &refcheck);
2672
2673                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
2674                 } else
2675                         rc = PTR_ERR(env);
2676                 break;
2677         }
2678         case IMP_EVENT_ACTIVE: {
2679                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
2680                 break;
2681         }
2682         case IMP_EVENT_OCD: {
2683                 struct obd_connect_data *ocd = &imp->imp_connect_data;
2684
2685                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
2686                         osc_init_grant(&obd->u.cli, ocd);
2687
2688                 /* See bug 7198 */
2689                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
2690                         imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
2691
2692                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
2693                 break;
2694         }
2695         case IMP_EVENT_DEACTIVATE: {
2696                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
2697                 break;
2698         }
2699         case IMP_EVENT_ACTIVATE: {
2700                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
2701                 break;
2702         }
2703         default:
2704                 CERROR("Unknown import event %d\n", event);
2705                 LBUG();
2706         }
2707         RETURN(rc);
2708 }
2709
2710 /**
2711  * Determine whether the lock can be canceled before replaying the lock
2712  * during recovery, see bug16774 for detailed information.
2713  *
2714  * \retval zero the lock can't be canceled
2715  * \retval other ok to cancel
2716  */
2717 static int osc_cancel_weight(struct ldlm_lock *lock)
2718 {
2719         /*
2720          * Cancel all unused and granted extent lock.
2721          */
2722         if (lock->l_resource->lr_type == LDLM_EXTENT &&
2723             lock->l_granted_mode == lock->l_req_mode &&
2724             osc_ldlm_weigh_ast(lock) == 0)
2725                 RETURN(1);
2726
2727         RETURN(0);
2728 }
2729
2730 static int brw_queue_work(const struct lu_env *env, void *data)
2731 {
2732         struct client_obd *cli = data;
2733
2734         CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
2735
2736         osc_io_unplug(env, cli, NULL);
2737         RETURN(0);
2738 }
2739
2740 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
2741 {
2742         struct client_obd *cli = &obd->u.cli;
2743         struct obd_type   *type;
2744         void              *handler;
2745         int                rc;
2746         int                adding;
2747         int                added;
2748         int                req_count;
2749         ENTRY;
2750
2751         rc = ptlrpcd_addref();
2752         if (rc)
2753                 RETURN(rc);
2754
2755         rc = client_obd_setup(obd, lcfg);
2756         if (rc)
2757                 GOTO(out_ptlrpcd, rc);
2758
2759         handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
2760         if (IS_ERR(handler))
2761                 GOTO(out_client_setup, rc = PTR_ERR(handler));
2762         cli->cl_writeback_work = handler;
2763
2764         handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
2765         if (IS_ERR(handler))
2766                 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
2767         cli->cl_lru_work = handler;
2768
2769         rc = osc_quota_setup(obd);
2770         if (rc)
2771                 GOTO(out_ptlrpcd_work, rc);
2772
2773         cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
2774
2775 #ifdef CONFIG_PROC_FS
2776         obd->obd_vars = lprocfs_osc_obd_vars;
2777 #endif
2778         /* If this is true then both client (osc) and server (osp) are on the
2779          * same node. The osp layer if loaded first will register the osc proc
2780          * directory. In that case this obd_device will be attached its proc
2781          * tree to type->typ_procsym instead of obd->obd_type->typ_procroot. */
2782         type = class_search_type(LUSTRE_OSP_NAME);
2783         if (type && type->typ_procsym) {
2784                 obd->obd_proc_entry = lprocfs_register(obd->obd_name,
2785                                                        type->typ_procsym,
2786                                                        obd->obd_vars, obd);
2787                 if (IS_ERR(obd->obd_proc_entry)) {
2788                         rc = PTR_ERR(obd->obd_proc_entry);
2789                         CERROR("error %d setting up lprocfs for %s\n", rc,
2790                                obd->obd_name);
2791                         obd->obd_proc_entry = NULL;
2792                 }
2793         } else {
2794                 rc = lprocfs_obd_setup(obd);
2795         }
2796
2797         /* If the basic OSC proc tree construction succeeded then
2798          * lets do the rest. */
2799         if (rc == 0) {
2800                 lproc_osc_attach_seqstat(obd);
2801                 sptlrpc_lprocfs_cliobd_attach(obd);
2802                 ptlrpc_lprocfs_register_obd(obd);
2803         }
2804
2805         /*
2806          * We try to control the total number of requests with a upper limit
2807          * osc_reqpool_maxreqcount. There might be some race which will cause
2808          * over-limit allocation, but it is fine.
2809          */
2810         req_count = atomic_read(&osc_pool_req_count);
2811         if (req_count < osc_reqpool_maxreqcount) {
2812                 adding = cli->cl_max_rpcs_in_flight + 2;
2813                 if (req_count + adding > osc_reqpool_maxreqcount)
2814                         adding = osc_reqpool_maxreqcount - req_count;
2815
2816                 added = ptlrpc_add_rqs_to_pool(osc_rq_pool, adding);
2817                 atomic_add(added, &osc_pool_req_count);
2818         }
2819
2820         INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
2821         ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
2822
2823         spin_lock(&osc_shrink_lock);
2824         list_add_tail(&cli->cl_shrink_list, &osc_shrink_list);
2825         spin_unlock(&osc_shrink_lock);
2826
2827         RETURN(0);
2828
2829 out_ptlrpcd_work:
2830         if (cli->cl_writeback_work != NULL) {
2831                 ptlrpcd_destroy_work(cli->cl_writeback_work);
2832                 cli->cl_writeback_work = NULL;
2833         }
2834         if (cli->cl_lru_work != NULL) {
2835                 ptlrpcd_destroy_work(cli->cl_lru_work);
2836                 cli->cl_lru_work = NULL;
2837         }
2838 out_client_setup:
2839         client_obd_cleanup(obd);
2840 out_ptlrpcd:
2841         ptlrpcd_decref();
2842         RETURN(rc);
2843 }
2844
2845 static int osc_precleanup(struct obd_device *obd)
2846 {
2847         struct client_obd *cli = &obd->u.cli;
2848         ENTRY;
2849
2850         /* LU-464
2851          * for echo client, export may be on zombie list, wait for
2852          * zombie thread to cull it, because cli.cl_import will be
2853          * cleared in client_disconnect_export():
2854          *   class_export_destroy() -> obd_cleanup() ->
2855          *   echo_device_free() -> echo_client_cleanup() ->
2856          *   obd_disconnect() -> osc_disconnect() ->
2857          *   client_disconnect_export()
2858          */
2859         obd_zombie_barrier();
2860         if (cli->cl_writeback_work) {
2861                 ptlrpcd_destroy_work(cli->cl_writeback_work);
2862                 cli->cl_writeback_work = NULL;
2863         }
2864
2865         if (cli->cl_lru_work) {
2866                 ptlrpcd_destroy_work(cli->cl_lru_work);
2867                 cli->cl_lru_work = NULL;
2868         }
2869
2870         obd_cleanup_client_import(obd);
2871         ptlrpc_lprocfs_unregister_obd(obd);
2872         lprocfs_obd_cleanup(obd);
2873         RETURN(0);
2874 }
2875
2876 int osc_cleanup(struct obd_device *obd)
2877 {
2878         struct client_obd *cli = &obd->u.cli;
2879         int rc;
2880
2881         ENTRY;
2882
2883         spin_lock(&osc_shrink_lock);
2884         list_del(&cli->cl_shrink_list);
2885         spin_unlock(&osc_shrink_lock);
2886
2887         /* lru cleanup */
2888         if (cli->cl_cache != NULL) {
2889                 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
2890                 spin_lock(&cli->cl_cache->ccc_lru_lock);
2891                 list_del_init(&cli->cl_lru_osc);
2892                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
2893                 cli->cl_lru_left = NULL;
2894                 cl_cache_decref(cli->cl_cache);
2895                 cli->cl_cache = NULL;
2896         }
2897
2898         /* free memory of osc quota cache */
2899         osc_quota_cleanup(obd);
2900
2901         rc = client_obd_cleanup(obd);
2902
2903         ptlrpcd_decref();
2904         RETURN(rc);
2905 }
2906
2907 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
2908 {
2909         int rc = class_process_proc_param(PARAM_OSC, obd->obd_vars, lcfg, obd);
2910         return rc > 0 ? 0: rc;
2911 }
2912
2913 static int osc_process_config(struct obd_device *obd, size_t len, void *buf)
2914 {
2915         return osc_process_config_base(obd, buf);
2916 }
2917
2918 static struct obd_ops osc_obd_ops = {
2919         .o_owner                = THIS_MODULE,
2920         .o_setup                = osc_setup,
2921         .o_precleanup           = osc_precleanup,
2922         .o_cleanup              = osc_cleanup,
2923         .o_add_conn             = client_import_add_conn,
2924         .o_del_conn             = client_import_del_conn,
2925         .o_connect              = client_connect_import,
2926         .o_reconnect            = osc_reconnect,
2927         .o_disconnect           = osc_disconnect,
2928         .o_statfs               = osc_statfs,
2929         .o_statfs_async         = osc_statfs_async,
2930         .o_create               = osc_create,
2931         .o_destroy              = osc_destroy,
2932         .o_getattr              = osc_getattr,
2933         .o_setattr              = osc_setattr,
2934         .o_iocontrol            = osc_iocontrol,
2935         .o_set_info_async       = osc_set_info_async,
2936         .o_import_event         = osc_import_event,
2937         .o_process_config       = osc_process_config,
2938         .o_quotactl             = osc_quotactl,
2939 };
2940
/* Shrinker handle: set in osc_init(), removed in osc_exit(). */
static struct shrinker *osc_cache_shrinker;
/* Clients are added in osc_setup() and removed in osc_cleanup(), both
 * under osc_shrink_lock. */
LIST_HEAD(osc_shrink_list);
DEFINE_SPINLOCK(osc_shrink_lock);
2944
2945 #ifndef HAVE_SHRINKER_COUNT
2946 static int osc_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask))
2947 {
2948         struct shrink_control scv = {
2949                 .nr_to_scan = shrink_param(sc, nr_to_scan),
2950                 .gfp_mask   = shrink_param(sc, gfp_mask)
2951         };
2952 #if !defined(HAVE_SHRINKER_WANT_SHRINK_PTR) && !defined(HAVE_SHRINK_CONTROL)
2953         struct shrinker *shrinker = NULL;
2954 #endif
2955
2956         (void)osc_cache_shrink_scan(shrinker, &scv);
2957
2958         return osc_cache_shrink_count(shrinker, &scv);
2959 }
2960 #endif
2961
2962 static int __init osc_init(void)
2963 {
2964         bool enable_proc = true;
2965         struct obd_type *type;
2966         unsigned int reqpool_size;
2967         unsigned int reqsize;
2968         int rc;
2969         DEF_SHRINKER_VAR(osc_shvar, osc_cache_shrink,
2970                          osc_cache_shrink_count, osc_cache_shrink_scan);
2971         ENTRY;
2972
2973         /* print an address of _any_ initialized kernel symbol from this
2974          * module, to allow debugging with gdb that doesn't support data
2975          * symbols from modules.*/
2976         CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
2977
2978         rc = lu_kmem_init(osc_caches);
2979         if (rc)
2980                 RETURN(rc);
2981
2982         type = class_search_type(LUSTRE_OSP_NAME);
2983         if (type != NULL && type->typ_procsym != NULL)
2984                 enable_proc = false;
2985
2986         rc = class_register_type(&osc_obd_ops, NULL, enable_proc, NULL,
2987                                  LUSTRE_OSC_NAME, &osc_device_type);
2988         if (rc)
2989                 GOTO(out_kmem, rc);
2990
2991         osc_cache_shrinker = set_shrinker(DEFAULT_SEEKS, &osc_shvar);
2992
2993         /* This is obviously too much memory, only prevent overflow here */
2994         if (osc_reqpool_mem_max >= 1 << 12 || osc_reqpool_mem_max == 0)
2995                 GOTO(out_type, rc = -EINVAL);
2996
2997         reqpool_size = osc_reqpool_mem_max << 20;
2998
2999         reqsize = 1;
3000         while (reqsize < OST_IO_MAXREQSIZE)
3001                 reqsize = reqsize << 1;
3002
3003         /*
3004          * We don't enlarge the request count in OSC pool according to
3005          * cl_max_rpcs_in_flight. The allocation from the pool will only be
3006          * tried after normal allocation failed. So a small OSC pool won't
3007          * cause much performance degression in most of cases.
3008          */
3009         osc_reqpool_maxreqcount = reqpool_size / reqsize;
3010
3011         atomic_set(&osc_pool_req_count, 0);
3012         osc_rq_pool = ptlrpc_init_rq_pool(0, OST_IO_MAXREQSIZE,
3013                                           ptlrpc_add_rqs_to_pool);
3014
3015         if (osc_rq_pool != NULL)
3016                 GOTO(out, rc);
3017         rc = -ENOMEM;
3018 out_type:
3019         class_unregister_type(LUSTRE_OSC_NAME);
3020 out_kmem:
3021         lu_kmem_fini(osc_caches);
3022 out:
3023         RETURN(rc);
3024 }
3025
3026 static void __exit osc_exit(void)
3027 {
3028         remove_shrinker(osc_cache_shrinker);
3029         class_unregister_type(LUSTRE_OSC_NAME);
3030         lu_kmem_fini(osc_caches);
3031         ptlrpc_free_rq_pool(osc_rq_pool);
3032 }
3033
3034 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
3035 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3036 MODULE_VERSION(LUSTRE_VERSION_STRING);
3037 MODULE_LICENSE("GPL");
3038
3039 module_init(osc_init);
3040 module_exit(osc_exit);