Whamcloud - gitweb
LU-6401 uapi: turn lustre_ioctl.h into a proper UAPI header
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2016, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  */
32
33 #define DEBUG_SUBSYSTEM S_OSC
34
35 #include <libcfs/libcfs.h>
36
37 #include <lustre/lustre_user.h>
38
39 #include <lprocfs_status.h>
40 #include <lustre_debug.h>
41 #include <lustre_dlm.h>
42 #include <lustre_fid.h>
43 #include <lustre_ha.h>
44 #include <uapi/linux/lustre_ioctl.h>
45 #include <lustre_net.h>
46 #include <lustre_obdo.h>
47 #include <lustre_param.h>
48 #include <obd.h>
49 #include <obd_cksum.h>
50 #include <obd_class.h>
51
52 #include "osc_cl_internal.h"
53 #include "osc_internal.h"
54
55 atomic_t osc_pool_req_count;
56 unsigned int osc_reqpool_maxreqcount;
57 struct ptlrpc_request_pool *osc_rq_pool;
58
59 /* max memory used for request pool, unit is MB */
60 static unsigned int osc_reqpool_mem_max = 5;
61 module_param(osc_reqpool_mem_max, uint, 0444);
62
63 struct osc_brw_async_args {
64         struct obdo              *aa_oa;
65         int                       aa_requested_nob;
66         int                       aa_nio_count;
67         u32                       aa_page_count;
68         int                       aa_resends;
69         struct brw_page **aa_ppga;
70         struct client_obd        *aa_cli;
71         struct list_head          aa_oaps;
72         struct list_head          aa_exts;
73 };
74
75 #define osc_grant_args osc_brw_async_args
76
77 struct osc_setattr_args {
78         struct obdo             *sa_oa;
79         obd_enqueue_update_f     sa_upcall;
80         void                    *sa_cookie;
81 };
82
83 struct osc_fsync_args {
84         struct osc_object       *fa_obj;
85         struct obdo             *fa_oa;
86         obd_enqueue_update_f    fa_upcall;
87         void                    *fa_cookie;
88 };
89
90 struct osc_ladvise_args {
91         struct obdo             *la_oa;
92         obd_enqueue_update_f     la_upcall;
93         void                    *la_cookie;
94 };
95
96 struct osc_enqueue_args {
97         struct obd_export       *oa_exp;
98         enum ldlm_type          oa_type;
99         enum ldlm_mode          oa_mode;
100         __u64                   *oa_flags;
101         osc_enqueue_upcall_f    oa_upcall;
102         void                    *oa_cookie;
103         struct ost_lvb          *oa_lvb;
104         struct lustre_handle    oa_lockh;
105         unsigned int            oa_agl:1;
106 };
107
108 static void osc_release_ppga(struct brw_page **ppga, size_t count);
109 static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
110                          void *data, int rc);
111
112 void osc_pack_req_body(struct ptlrpc_request *req, struct obdo *oa)
113 {
114         struct ost_body *body;
115
116         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
117         LASSERT(body);
118
119         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
120 }
121
122 static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
123                        struct obdo *oa)
124 {
125         struct ptlrpc_request   *req;
126         struct ost_body         *body;
127         int                      rc;
128
129         ENTRY;
130         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
131         if (req == NULL)
132                 RETURN(-ENOMEM);
133
134         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
135         if (rc) {
136                 ptlrpc_request_free(req);
137                 RETURN(rc);
138         }
139
140         osc_pack_req_body(req, oa);
141
142         ptlrpc_request_set_replen(req);
143
144         rc = ptlrpc_queue_wait(req);
145         if (rc)
146                 GOTO(out, rc);
147
148         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
149         if (body == NULL)
150                 GOTO(out, rc = -EPROTO);
151
152         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
153         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
154
155         oa->o_blksize = cli_brw_size(exp->exp_obd);
156         oa->o_valid |= OBD_MD_FLBLKSZ;
157
158         EXIT;
159 out:
160         ptlrpc_req_finished(req);
161
162         return rc;
163 }
164
165 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
166                        struct obdo *oa)
167 {
168         struct ptlrpc_request   *req;
169         struct ost_body         *body;
170         int                      rc;
171
172         ENTRY;
173         LASSERT(oa->o_valid & OBD_MD_FLGROUP);
174
175         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
176         if (req == NULL)
177                 RETURN(-ENOMEM);
178
179         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
180         if (rc) {
181                 ptlrpc_request_free(req);
182                 RETURN(rc);
183         }
184
185         osc_pack_req_body(req, oa);
186
187         ptlrpc_request_set_replen(req);
188
189         rc = ptlrpc_queue_wait(req);
190         if (rc)
191                 GOTO(out, rc);
192
193         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
194         if (body == NULL)
195                 GOTO(out, rc = -EPROTO);
196
197         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
198
199         EXIT;
200 out:
201         ptlrpc_req_finished(req);
202
203         RETURN(rc);
204 }
205
206 static int osc_setattr_interpret(const struct lu_env *env,
207                                  struct ptlrpc_request *req,
208                                  struct osc_setattr_args *sa, int rc)
209 {
210         struct ost_body *body;
211         ENTRY;
212
213         if (rc != 0)
214                 GOTO(out, rc);
215
216         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
217         if (body == NULL)
218                 GOTO(out, rc = -EPROTO);
219
220         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
221                              &body->oa);
222 out:
223         rc = sa->sa_upcall(sa->sa_cookie, rc);
224         RETURN(rc);
225 }
226
227 int osc_setattr_async(struct obd_export *exp, struct obdo *oa,
228                       obd_enqueue_update_f upcall, void *cookie,
229                       struct ptlrpc_request_set *rqset)
230 {
231         struct ptlrpc_request   *req;
232         struct osc_setattr_args *sa;
233         int                      rc;
234
235         ENTRY;
236
237         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
238         if (req == NULL)
239                 RETURN(-ENOMEM);
240
241         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
242         if (rc) {
243                 ptlrpc_request_free(req);
244                 RETURN(rc);
245         }
246
247         osc_pack_req_body(req, oa);
248
249         ptlrpc_request_set_replen(req);
250
251         /* do mds to ost setattr asynchronously */
252         if (!rqset) {
253                 /* Do not wait for response. */
254                 ptlrpcd_add_req(req);
255         } else {
256                 req->rq_interpret_reply =
257                         (ptlrpc_interpterer_t)osc_setattr_interpret;
258
259                 CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
260                 sa = ptlrpc_req_async_args(req);
261                 sa->sa_oa = oa;
262                 sa->sa_upcall = upcall;
263                 sa->sa_cookie = cookie;
264
265                 if (rqset == PTLRPCD_SET)
266                         ptlrpcd_add_req(req);
267                 else
268                         ptlrpc_set_add_req(rqset, req);
269         }
270
271         RETURN(0);
272 }
273
274 static int osc_ladvise_interpret(const struct lu_env *env,
275                                  struct ptlrpc_request *req,
276                                  void *arg, int rc)
277 {
278         struct osc_ladvise_args *la = arg;
279         struct ost_body *body;
280         ENTRY;
281
282         if (rc != 0)
283                 GOTO(out, rc);
284
285         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
286         if (body == NULL)
287                 GOTO(out, rc = -EPROTO);
288
289         *la->la_oa = body->oa;
290 out:
291         rc = la->la_upcall(la->la_cookie, rc);
292         RETURN(rc);
293 }
294
295 /**
296  * If rqset is NULL, do not wait for response. Upcall and cookie could also
297  * be NULL in this case
298  */
299 int osc_ladvise_base(struct obd_export *exp, struct obdo *oa,
300                      struct ladvise_hdr *ladvise_hdr,
301                      obd_enqueue_update_f upcall, void *cookie,
302                      struct ptlrpc_request_set *rqset)
303 {
304         struct ptlrpc_request   *req;
305         struct ost_body         *body;
306         struct osc_ladvise_args *la;
307         int                      rc;
308         struct lu_ladvise       *req_ladvise;
309         struct lu_ladvise       *ladvise = ladvise_hdr->lah_advise;
310         int                      num_advise = ladvise_hdr->lah_count;
311         struct ladvise_hdr      *req_ladvise_hdr;
312         ENTRY;
313
314         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_LADVISE);
315         if (req == NULL)
316                 RETURN(-ENOMEM);
317
318         req_capsule_set_size(&req->rq_pill, &RMF_OST_LADVISE, RCL_CLIENT,
319                              num_advise * sizeof(*ladvise));
320         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_LADVISE);
321         if (rc != 0) {
322                 ptlrpc_request_free(req);
323                 RETURN(rc);
324         }
325         req->rq_request_portal = OST_IO_PORTAL;
326         ptlrpc_at_set_req_timeout(req);
327
328         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
329         LASSERT(body);
330         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
331                              oa);
332
333         req_ladvise_hdr = req_capsule_client_get(&req->rq_pill,
334                                                  &RMF_OST_LADVISE_HDR);
335         memcpy(req_ladvise_hdr, ladvise_hdr, sizeof(*ladvise_hdr));
336
337         req_ladvise = req_capsule_client_get(&req->rq_pill, &RMF_OST_LADVISE);
338         memcpy(req_ladvise, ladvise, sizeof(*ladvise) * num_advise);
339         ptlrpc_request_set_replen(req);
340
341         if (rqset == NULL) {
342                 /* Do not wait for response. */
343                 ptlrpcd_add_req(req);
344                 RETURN(0);
345         }
346
347         req->rq_interpret_reply = osc_ladvise_interpret;
348         CLASSERT(sizeof(*la) <= sizeof(req->rq_async_args));
349         la = ptlrpc_req_async_args(req);
350         la->la_oa = oa;
351         la->la_upcall = upcall;
352         la->la_cookie = cookie;
353
354         if (rqset == PTLRPCD_SET)
355                 ptlrpcd_add_req(req);
356         else
357                 ptlrpc_set_add_req(rqset, req);
358
359         RETURN(0);
360 }
361
362 static int osc_create(const struct lu_env *env, struct obd_export *exp,
363                       struct obdo *oa)
364 {
365         struct ptlrpc_request *req;
366         struct ost_body       *body;
367         int                    rc;
368         ENTRY;
369
370         LASSERT(oa != NULL);
371         LASSERT(oa->o_valid & OBD_MD_FLGROUP);
372         LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));
373
374         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
375         if (req == NULL)
376                 GOTO(out, rc = -ENOMEM);
377
378         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
379         if (rc) {
380                 ptlrpc_request_free(req);
381                 GOTO(out, rc);
382         }
383
384         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
385         LASSERT(body);
386
387         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
388
389         ptlrpc_request_set_replen(req);
390
391         rc = ptlrpc_queue_wait(req);
392         if (rc)
393                 GOTO(out_req, rc);
394
395         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
396         if (body == NULL)
397                 GOTO(out_req, rc = -EPROTO);
398
399         CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
400         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
401
402         oa->o_blksize = cli_brw_size(exp->exp_obd);
403         oa->o_valid |= OBD_MD_FLBLKSZ;
404
405         CDEBUG(D_HA, "transno: %lld\n",
406                lustre_msg_get_transno(req->rq_repmsg));
407 out_req:
408         ptlrpc_req_finished(req);
409 out:
410         RETURN(rc);
411 }
412
413 int osc_punch_base(struct obd_export *exp, struct obdo *oa,
414                    obd_enqueue_update_f upcall, void *cookie,
415                    struct ptlrpc_request_set *rqset)
416 {
417         struct ptlrpc_request   *req;
418         struct osc_setattr_args *sa;
419         struct ost_body         *body;
420         int                      rc;
421         ENTRY;
422
423         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
424         if (req == NULL)
425                 RETURN(-ENOMEM);
426
427         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
428         if (rc) {
429                 ptlrpc_request_free(req);
430                 RETURN(rc);
431         }
432         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
433         ptlrpc_at_set_req_timeout(req);
434
435         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
436         LASSERT(body);
437         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
438
439         ptlrpc_request_set_replen(req);
440
441         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
442         CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
443         sa = ptlrpc_req_async_args(req);
444         sa->sa_oa = oa;
445         sa->sa_upcall = upcall;
446         sa->sa_cookie = cookie;
447         if (rqset == PTLRPCD_SET)
448                 ptlrpcd_add_req(req);
449         else
450                 ptlrpc_set_add_req(rqset, req);
451
452         RETURN(0);
453 }
454
455 static int osc_sync_interpret(const struct lu_env *env,
456                               struct ptlrpc_request *req,
457                               void *arg, int rc)
458 {
459         struct osc_fsync_args   *fa = arg;
460         struct ost_body         *body;
461         struct cl_attr          *attr = &osc_env_info(env)->oti_attr;
462         unsigned long           valid = 0;
463         struct cl_object        *obj;
464         ENTRY;
465
466         if (rc != 0)
467                 GOTO(out, rc);
468
469         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
470         if (body == NULL) {
471                 CERROR("can't unpack ost_body\n");
472                 GOTO(out, rc = -EPROTO);
473         }
474
475         *fa->fa_oa = body->oa;
476         obj = osc2cl(fa->fa_obj);
477
478         /* Update osc object's blocks attribute */
479         cl_object_attr_lock(obj);
480         if (body->oa.o_valid & OBD_MD_FLBLOCKS) {
481                 attr->cat_blocks = body->oa.o_blocks;
482                 valid |= CAT_BLOCKS;
483         }
484
485         if (valid != 0)
486                 cl_object_attr_update(env, obj, attr, valid);
487         cl_object_attr_unlock(obj);
488
489 out:
490         rc = fa->fa_upcall(fa->fa_cookie, rc);
491         RETURN(rc);
492 }
493
494 int osc_sync_base(struct osc_object *obj, struct obdo *oa,
495                   obd_enqueue_update_f upcall, void *cookie,
496                   struct ptlrpc_request_set *rqset)
497 {
498         struct obd_export     *exp = osc_export(obj);
499         struct ptlrpc_request *req;
500         struct ost_body       *body;
501         struct osc_fsync_args *fa;
502         int                    rc;
503         ENTRY;
504
505         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
506         if (req == NULL)
507                 RETURN(-ENOMEM);
508
509         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
510         if (rc) {
511                 ptlrpc_request_free(req);
512                 RETURN(rc);
513         }
514
515         /* overload the size and blocks fields in the oa with start/end */
516         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
517         LASSERT(body);
518         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
519
520         ptlrpc_request_set_replen(req);
521         req->rq_interpret_reply = osc_sync_interpret;
522
523         CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
524         fa = ptlrpc_req_async_args(req);
525         fa->fa_obj = obj;
526         fa->fa_oa = oa;
527         fa->fa_upcall = upcall;
528         fa->fa_cookie = cookie;
529
530         if (rqset == PTLRPCD_SET)
531                 ptlrpcd_add_req(req);
532         else
533                 ptlrpc_set_add_req(rqset, req);
534
535         RETURN (0);
536 }
537
538 /* Find and cancel locally locks matched by @mode in the resource found by
539  * @objid. Found locks are added into @cancel list. Returns the amount of
540  * locks added to @cancels list. */
541 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
542                                    struct list_head *cancels,
543                                    enum ldlm_mode mode, __u64 lock_flags)
544 {
545         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
546         struct ldlm_res_id res_id;
547         struct ldlm_resource *res;
548         int count;
549         ENTRY;
550
551         /* Return, i.e. cancel nothing, only if ELC is supported (flag in
552          * export) but disabled through procfs (flag in NS).
553          *
554          * This distinguishes from a case when ELC is not supported originally,
555          * when we still want to cancel locks in advance and just cancel them
556          * locally, without sending any RPC. */
557         if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
558                 RETURN(0);
559
560         ostid_build_res_name(&oa->o_oi, &res_id);
561         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
562         if (IS_ERR(res))
563                 RETURN(0);
564
565         LDLM_RESOURCE_ADDREF(res);
566         count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
567                                            lock_flags, 0, NULL);
568         LDLM_RESOURCE_DELREF(res);
569         ldlm_resource_putref(res);
570         RETURN(count);
571 }
572
573 static int osc_destroy_interpret(const struct lu_env *env,
574                                  struct ptlrpc_request *req, void *data,
575                                  int rc)
576 {
577         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
578
579         atomic_dec(&cli->cl_destroy_in_flight);
580         wake_up(&cli->cl_destroy_waitq);
581         return 0;
582 }
583
584 static int osc_can_send_destroy(struct client_obd *cli)
585 {
586         if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
587             cli->cl_max_rpcs_in_flight) {
588                 /* The destroy request can be sent */
589                 return 1;
590         }
591         if (atomic_dec_return(&cli->cl_destroy_in_flight) <
592             cli->cl_max_rpcs_in_flight) {
593                 /*
594                  * The counter has been modified between the two atomic
595                  * operations.
596                  */
597                 wake_up(&cli->cl_destroy_waitq);
598         }
599         return 0;
600 }
601
602 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
603                        struct obdo *oa)
604 {
605         struct client_obd     *cli = &exp->exp_obd->u.cli;
606         struct ptlrpc_request *req;
607         struct ost_body       *body;
608         struct list_head       cancels = LIST_HEAD_INIT(cancels);
609         int rc, count;
610         ENTRY;
611
612         if (!oa) {
613                 CDEBUG(D_INFO, "oa NULL\n");
614                 RETURN(-EINVAL);
615         }
616
617         count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
618                                         LDLM_FL_DISCARD_DATA);
619
620         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
621         if (req == NULL) {
622                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
623                 RETURN(-ENOMEM);
624         }
625
626         rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
627                                0, &cancels, count);
628         if (rc) {
629                 ptlrpc_request_free(req);
630                 RETURN(rc);
631         }
632
633         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
634         ptlrpc_at_set_req_timeout(req);
635
636         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
637         LASSERT(body);
638         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
639
640         ptlrpc_request_set_replen(req);
641
642         req->rq_interpret_reply = osc_destroy_interpret;
643         if (!osc_can_send_destroy(cli)) {
644                 struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
645
646                 /*
647                  * Wait until the number of on-going destroy RPCs drops
648                  * under max_rpc_in_flight
649                  */
650                 rc = l_wait_event_exclusive(cli->cl_destroy_waitq,
651                                             osc_can_send_destroy(cli), &lwi);
652                 if (rc) {
653                         ptlrpc_req_finished(req);
654                         RETURN(rc);
655                 }
656         }
657
658         /* Do not wait for response */
659         ptlrpcd_add_req(req);
660         RETURN(0);
661 }
662
663 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
664                                 long writing_bytes)
665 {
666         u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;
667
668         LASSERT(!(oa->o_valid & bits));
669
670         oa->o_valid |= bits;
671         spin_lock(&cli->cl_loi_list_lock);
672         if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data, GRANT_PARAM))
673                 oa->o_dirty = cli->cl_dirty_grant;
674         else
675                 oa->o_dirty = cli->cl_dirty_pages << PAGE_SHIFT;
676         if (unlikely(cli->cl_dirty_pages - cli->cl_dirty_transit >
677                      cli->cl_dirty_max_pages)) {
678                 CERROR("dirty %lu - %lu > dirty_max %lu\n",
679                        cli->cl_dirty_pages, cli->cl_dirty_transit,
680                        cli->cl_dirty_max_pages);
681                 oa->o_undirty = 0;
682         } else if (unlikely(atomic_long_read(&obd_dirty_pages) -
683                             atomic_long_read(&obd_dirty_transit_pages) >
684                             (long)(obd_max_dirty_pages + 1))) {
685                 /* The atomic_read() allowing the atomic_inc() are
686                  * not covered by a lock thus they may safely race and trip
687                  * this CERROR() unless we add in a small fudge factor (+1). */
688                 CERROR("%s: dirty %ld - %ld > system dirty_max %ld\n",
689                        cli_name(cli), atomic_long_read(&obd_dirty_pages),
690                        atomic_long_read(&obd_dirty_transit_pages),
691                        obd_max_dirty_pages);
692                 oa->o_undirty = 0;
693         } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
694                             0x7fffffff)) {
695                 CERROR("dirty %lu - dirty_max %lu too big???\n",
696                        cli->cl_dirty_pages, cli->cl_dirty_max_pages);
697                 oa->o_undirty = 0;
698         } else {
699                 unsigned long nrpages;
700
701                 nrpages = cli->cl_max_pages_per_rpc;
702                 nrpages *= cli->cl_max_rpcs_in_flight + 1;
703                 nrpages = max(nrpages, cli->cl_dirty_max_pages);
704                 oa->o_undirty = nrpages << PAGE_SHIFT;
705                 if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data,
706                                  GRANT_PARAM)) {
707                         int nrextents;
708
709                         /* take extent tax into account when asking for more
710                          * grant space */
711                         nrextents = (nrpages + cli->cl_max_extent_pages - 1)  /
712                                      cli->cl_max_extent_pages;
713                         oa->o_undirty += nrextents * cli->cl_grant_extent_tax;
714                 }
715         }
716         oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
717         oa->o_dropped = cli->cl_lost_grant;
718         cli->cl_lost_grant = 0;
719         spin_unlock(&cli->cl_loi_list_lock);
720         CDEBUG(D_CACHE, "dirty: %llu undirty: %u dropped %u grant: %llu\n",
721                oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
722 }
723
724 void osc_update_next_shrink(struct client_obd *cli)
725 {
726         cli->cl_next_shrink_grant =
727                 cfs_time_shift(cli->cl_grant_shrink_interval);
728         CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
729                cli->cl_next_shrink_grant);
730 }
731
732 static void __osc_update_grant(struct client_obd *cli, u64 grant)
733 {
734         spin_lock(&cli->cl_loi_list_lock);
735         cli->cl_avail_grant += grant;
736         spin_unlock(&cli->cl_loi_list_lock);
737 }
738
739 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
740 {
741         if (body->oa.o_valid & OBD_MD_FLGRANT) {
742                 CDEBUG(D_CACHE, "got %llu extra grant\n", body->oa.o_grant);
743                 __osc_update_grant(cli, body->oa.o_grant);
744         }
745 }
746
747 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
748                               u32 keylen, void *key,
749                               u32 vallen, void *val,
750                               struct ptlrpc_request_set *set);
751
752 static int osc_shrink_grant_interpret(const struct lu_env *env,
753                                       struct ptlrpc_request *req,
754                                       void *aa, int rc)
755 {
756         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
757         struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
758         struct ost_body *body;
759
760         if (rc != 0) {
761                 __osc_update_grant(cli, oa->o_grant);
762                 GOTO(out, rc);
763         }
764
765         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
766         LASSERT(body);
767         osc_update_grant(cli, body);
768 out:
769         OBDO_FREE(oa);
770         return rc;
771 }
772
773 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
774 {
775         spin_lock(&cli->cl_loi_list_lock);
776         oa->o_grant = cli->cl_avail_grant / 4;
777         cli->cl_avail_grant -= oa->o_grant;
778         spin_unlock(&cli->cl_loi_list_lock);
779         if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
780                 oa->o_valid |= OBD_MD_FLFLAGS;
781                 oa->o_flags = 0;
782         }
783         oa->o_flags |= OBD_FL_SHRINK_GRANT;
784         osc_update_next_shrink(cli);
785 }
786
787 /* Shrink the current grant, either from some large amount to enough for a
788  * full set of in-flight RPCs, or if we have already shrunk to that limit
789  * then to enough for a single RPC.  This avoids keeping more grant than
790  * needed, and avoids shrinking the grant piecemeal. */
791 static int osc_shrink_grant(struct client_obd *cli)
792 {
793         __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
794                              (cli->cl_max_pages_per_rpc << PAGE_SHIFT);
795
796         spin_lock(&cli->cl_loi_list_lock);
797         if (cli->cl_avail_grant <= target_bytes)
798                 target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
799         spin_unlock(&cli->cl_loi_list_lock);
800
801         return osc_shrink_grant_to_target(cli, target_bytes);
802 }
803
804 int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
805 {
806         int                     rc = 0;
807         struct ost_body        *body;
808         ENTRY;
809
810         spin_lock(&cli->cl_loi_list_lock);
811         /* Don't shrink if we are already above or below the desired limit
812          * We don't want to shrink below a single RPC, as that will negatively
813          * impact block allocation and long-term performance. */
814         if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_SHIFT)
815                 target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
816
817         if (target_bytes >= cli->cl_avail_grant) {
818                 spin_unlock(&cli->cl_loi_list_lock);
819                 RETURN(0);
820         }
821         spin_unlock(&cli->cl_loi_list_lock);
822
823         OBD_ALLOC_PTR(body);
824         if (!body)
825                 RETURN(-ENOMEM);
826
827         osc_announce_cached(cli, &body->oa, 0);
828
829         spin_lock(&cli->cl_loi_list_lock);
830         body->oa.o_grant = cli->cl_avail_grant - target_bytes;
831         cli->cl_avail_grant = target_bytes;
832         spin_unlock(&cli->cl_loi_list_lock);
833         if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
834                 body->oa.o_valid |= OBD_MD_FLFLAGS;
835                 body->oa.o_flags = 0;
836         }
837         body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
838         osc_update_next_shrink(cli);
839
840         rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
841                                 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
842                                 sizeof(*body), body, NULL);
843         if (rc != 0)
844                 __osc_update_grant(cli, body->oa.o_grant);
845         OBD_FREE_PTR(body);
846         RETURN(rc);
847 }
848
849 static int osc_should_shrink_grant(struct client_obd *client)
850 {
851         cfs_time_t time = cfs_time_current();
852         cfs_time_t next_shrink = client->cl_next_shrink_grant;
853
854         if ((client->cl_import->imp_connect_data.ocd_connect_flags &
855              OBD_CONNECT_GRANT_SHRINK) == 0)
856                 return 0;
857
858         if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
859                 /* Get the current RPC size directly, instead of going via:
860                  * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
861                  * Keep comment here so that it can be found by searching. */
862                 int brw_size = client->cl_max_pages_per_rpc << PAGE_SHIFT;
863
864                 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
865                     client->cl_avail_grant > brw_size)
866                         return 1;
867                 else
868                         osc_update_next_shrink(client);
869         }
870         return 0;
871 }
872
873 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
874 {
875         struct client_obd *client;
876
877         list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
878                 if (osc_should_shrink_grant(client))
879                         osc_shrink_grant(client);
880         }
881         return 0;
882 }
883
884 static int osc_add_shrink_grant(struct client_obd *client)
885 {
886         int rc;
887
888         rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
889                                        TIMEOUT_GRANT,
890                                        osc_grant_shrink_grant_cb, NULL,
891                                        &client->cl_grant_shrink_list);
892         if (rc) {
893                 CERROR("add grant client %s error %d\n", cli_name(client), rc);
894                 return rc;
895         }
896         CDEBUG(D_CACHE, "add grant client %s\n", cli_name(client));
897         osc_update_next_shrink(client);
898         return 0;
899 }
900
901 static int osc_del_shrink_grant(struct client_obd *client)
902 {
903         return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
904                                          TIMEOUT_GRANT);
905 }
906
907 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
908 {
909         /*
910          * ocd_grant is the total grant amount we're expect to hold: if we've
911          * been evicted, it's the new avail_grant amount, cl_dirty_pages will
912          * drop to 0 as inflight RPCs fail out; otherwise, it's avail_grant +
913          * dirty.
914          *
915          * race is tolerable here: if we're evicted, but imp_state already
916          * left EVICTED state, then cl_dirty_pages must be 0 already.
917          */
918         spin_lock(&cli->cl_loi_list_lock);
919         cli->cl_avail_grant = ocd->ocd_grant;
920         if (cli->cl_import->imp_state != LUSTRE_IMP_EVICTED) {
921                 cli->cl_avail_grant -= cli->cl_reserved_grant;
922                 if (OCD_HAS_FLAG(ocd, GRANT_PARAM))
923                         cli->cl_avail_grant -= cli->cl_dirty_grant;
924                 else
925                         cli->cl_avail_grant -=
926                                         cli->cl_dirty_pages << PAGE_SHIFT;
927         }
928
929         if (OCD_HAS_FLAG(ocd, GRANT_PARAM)) {
930                 u64 size;
931                 int chunk_mask;
932
933                 /* overhead for each extent insertion */
934                 cli->cl_grant_extent_tax = ocd->ocd_grant_tax_kb << 10;
935                 /* determine the appropriate chunk size used by osc_extent. */
936                 cli->cl_chunkbits = max_t(int, PAGE_SHIFT,
937                                           ocd->ocd_grant_blkbits);
938                 /* max_pages_per_rpc must be chunk aligned */
939                 chunk_mask = ~((1 << (cli->cl_chunkbits - PAGE_SHIFT)) - 1);
940                 cli->cl_max_pages_per_rpc = (cli->cl_max_pages_per_rpc +
941                                              ~chunk_mask) & chunk_mask;
942                 /* determine maximum extent size, in #pages */
943                 size = (u64)ocd->ocd_grant_max_blks << ocd->ocd_grant_blkbits;
944                 cli->cl_max_extent_pages = size >> PAGE_SHIFT;
945                 if (cli->cl_max_extent_pages == 0)
946                         cli->cl_max_extent_pages = 1;
947         } else {
948                 cli->cl_grant_extent_tax = 0;
949                 cli->cl_chunkbits = PAGE_SHIFT;
950                 cli->cl_max_extent_pages = DT_MAX_BRW_PAGES;
951         }
952         spin_unlock(&cli->cl_loi_list_lock);
953
954         CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
955                 "chunk bits: %d cl_max_extent_pages: %d\n",
956                 cli_name(cli),
957                 cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits,
958                 cli->cl_max_extent_pages);
959
960         if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
961             list_empty(&cli->cl_grant_shrink_list))
962                 osc_add_shrink_grant(cli);
963 }
964
965 /* We assume that the reason this OSC got a short read is because it read
966  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
967  * via the LOV, and it _knows_ it's reading inside the file, it's just that
968  * this stripe never got written at or beyond this stripe offset yet. */
969 static void handle_short_read(int nob_read, size_t page_count,
970                               struct brw_page **pga)
971 {
972         char *ptr;
973         int i = 0;
974
975         /* skip bytes read OK */
976         while (nob_read > 0) {
977                 LASSERT (page_count > 0);
978
979                 if (pga[i]->count > nob_read) {
980                         /* EOF inside this page */
981                         ptr = kmap(pga[i]->pg) +
982                                 (pga[i]->off & ~PAGE_MASK);
983                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
984                         kunmap(pga[i]->pg);
985                         page_count--;
986                         i++;
987                         break;
988                 }
989
990                 nob_read -= pga[i]->count;
991                 page_count--;
992                 i++;
993         }
994
995         /* zero remaining pages */
996         while (page_count-- > 0) {
997                 ptr = kmap(pga[i]->pg) + (pga[i]->off & ~PAGE_MASK);
998                 memset(ptr, 0, pga[i]->count);
999                 kunmap(pga[i]->pg);
1000                 i++;
1001         }
1002 }
1003
1004 static int check_write_rcs(struct ptlrpc_request *req,
1005                            int requested_nob, int niocount,
1006                            size_t page_count, struct brw_page **pga)
1007 {
1008         int     i;
1009         __u32   *remote_rcs;
1010
1011         remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1012                                                   sizeof(*remote_rcs) *
1013                                                   niocount);
1014         if (remote_rcs == NULL) {
1015                 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1016                 return(-EPROTO);
1017         }
1018
1019         /* return error if any niobuf was in error */
1020         for (i = 0; i < niocount; i++) {
1021                 if ((int)remote_rcs[i] < 0)
1022                         return(remote_rcs[i]);
1023
1024                 if (remote_rcs[i] != 0) {
1025                         CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1026                                 i, remote_rcs[i], req);
1027                         return(-EPROTO);
1028                 }
1029         }
1030
1031         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1032                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1033                        req->rq_bulk->bd_nob_transferred, requested_nob);
1034                 return(-EPROTO);
1035         }
1036
1037         return (0);
1038 }
1039
1040 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1041 {
1042         if (p1->flag != p2->flag) {
1043                 unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
1044                                   OBD_BRW_SYNC       | OBD_BRW_ASYNC   |
1045                                   OBD_BRW_NOQUOTA    | OBD_BRW_SOFT_SYNC);
1046
1047                 /* warn if we try to combine flags that we don't know to be
1048                  * safe to combine */
1049                 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1050                         CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1051                               "report this at https://jira.hpdd.intel.com/\n",
1052                               p1->flag, p2->flag);
1053                 }
1054                 return 0;
1055         }
1056
1057         return (p1->off + p1->count == p2->off);
1058 }
1059
1060 static u32 osc_checksum_bulk(int nob, size_t pg_count,
1061                              struct brw_page **pga, int opc,
1062                              cksum_type_t cksum_type)
1063 {
1064         u32                             cksum;
1065         int                             i = 0;
1066         struct cfs_crypto_hash_desc     *hdesc;
1067         unsigned int                    bufsize;
1068         unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);
1069
1070         LASSERT(pg_count > 0);
1071
1072         hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1073         if (IS_ERR(hdesc)) {
1074                 CERROR("Unable to initialize checksum hash %s\n",
1075                        cfs_crypto_hash_name(cfs_alg));
1076                 return PTR_ERR(hdesc);
1077         }
1078
1079         while (nob > 0 && pg_count > 0) {
1080                 unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;
1081
1082                 /* corrupt the data before we compute the checksum, to
1083                  * simulate an OST->client data error */
1084                 if (i == 0 && opc == OST_READ &&
1085                     OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
1086                         unsigned char *ptr = kmap(pga[i]->pg);
1087                         int off = pga[i]->off & ~PAGE_MASK;
1088
1089                         memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
1090                         kunmap(pga[i]->pg);
1091                 }
1092                 cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
1093                                             pga[i]->off & ~PAGE_MASK,
1094                                             count);
1095                 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
1096                                (int)(pga[i]->off & ~PAGE_MASK));
1097
1098                 nob -= pga[i]->count;
1099                 pg_count--;
1100                 i++;
1101         }
1102
1103         bufsize = sizeof(cksum);
1104         cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);
1105
1106         /* For sending we only compute the wrong checksum instead
1107          * of corrupting the data so it is still correct on a redo */
1108         if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1109                 cksum++;
1110
1111         return cksum;
1112 }
1113
1114 static int
1115 osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
1116                      u32 page_count, struct brw_page **pga,
1117                      struct ptlrpc_request **reqp, int resend)
1118 {
1119         struct ptlrpc_request   *req;
1120         struct ptlrpc_bulk_desc *desc;
1121         struct ost_body         *body;
1122         struct obd_ioobj        *ioobj;
1123         struct niobuf_remote    *niobuf;
1124         int niocount, i, requested_nob, opc, rc;
1125         struct osc_brw_async_args *aa;
1126         struct req_capsule      *pill;
1127         struct brw_page *pg_prev;
1128
1129         ENTRY;
1130         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1131                 RETURN(-ENOMEM); /* Recoverable */
1132         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1133                 RETURN(-EINVAL); /* Fatal */
1134
1135         if ((cmd & OBD_BRW_WRITE) != 0) {
1136                 opc = OST_WRITE;
1137                 req = ptlrpc_request_alloc_pool(cli->cl_import,
1138                                                 osc_rq_pool,
1139                                                 &RQF_OST_BRW_WRITE);
1140         } else {
1141                 opc = OST_READ;
1142                 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
1143         }
1144         if (req == NULL)
1145                 RETURN(-ENOMEM);
1146
1147         for (niocount = i = 1; i < page_count; i++) {
1148                 if (!can_merge_pages(pga[i - 1], pga[i]))
1149                         niocount++;
1150         }
1151
1152         pill = &req->rq_pill;
1153         req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1154                              sizeof(*ioobj));
1155         req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1156                              niocount * sizeof(*niobuf));
1157
1158         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1159         if (rc) {
1160                 ptlrpc_request_free(req);
1161                 RETURN(rc);
1162         }
1163         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1164         ptlrpc_at_set_req_timeout(req);
1165         /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
1166          * retry logic */
1167         req->rq_no_retry_einprogress = 1;
1168
1169         desc = ptlrpc_prep_bulk_imp(req, page_count,
1170                 cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
1171                 (opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE :
1172                         PTLRPC_BULK_PUT_SINK) |
1173                         PTLRPC_BULK_BUF_KIOV,
1174                 OST_BULK_PORTAL,
1175                 &ptlrpc_bulk_kiov_pin_ops);
1176
1177         if (desc == NULL)
1178                 GOTO(out, rc = -ENOMEM);
1179         /* NB request now owns desc and will free it when it gets freed */
1180
1181         body = req_capsule_client_get(pill, &RMF_OST_BODY);
1182         ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1183         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1184         LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1185
1186         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
1187
1188         obdo_to_ioobj(oa, ioobj);
1189         ioobj->ioo_bufcnt = niocount;
1190         /* The high bits of ioo_max_brw tells server _maximum_ number of bulks
1191          * that might be send for this request.  The actual number is decided
1192          * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
1193          * "max - 1" for old client compatibility sending "0", and also so the
1194          * the actual maximum is a power-of-two number, not one less. LU-1431 */
1195         ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
1196         LASSERT(page_count > 0);
1197         pg_prev = pga[0];
1198         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1199                 struct brw_page *pg = pga[i];
1200                 int poff = pg->off & ~PAGE_MASK;
1201
1202                 LASSERT(pg->count > 0);
1203                 /* make sure there is no gap in the middle of page array */
1204                 LASSERTF(page_count == 1 ||
1205                          (ergo(i == 0, poff + pg->count == PAGE_SIZE) &&
1206                           ergo(i > 0 && i < page_count - 1,
1207                                poff == 0 && pg->count == PAGE_SIZE)   &&
1208                           ergo(i == page_count - 1, poff == 0)),
1209                          "i: %d/%d pg: %p off: %llu, count: %u\n",
1210                          i, page_count, pg, pg->off, pg->count);
1211                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1212                          "i %d p_c %u pg %p [pri %lu ind %lu] off %llu"
1213                          " prev_pg %p [pri %lu ind %lu] off %llu\n",
1214                          i, page_count,
1215                          pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1216                          pg_prev->pg, page_private(pg_prev->pg),
1217                          pg_prev->pg->index, pg_prev->off);
1218                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1219                         (pg->flag & OBD_BRW_SRVLOCK));
1220
1221                 desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff, pg->count);
1222                 requested_nob += pg->count;
1223
1224                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1225                         niobuf--;
1226                         niobuf->rnb_len += pg->count;
1227                 } else {
1228                         niobuf->rnb_offset = pg->off;
1229                         niobuf->rnb_len    = pg->count;
1230                         niobuf->rnb_flags  = pg->flag;
1231                 }
1232                 pg_prev = pg;
1233         }
1234
1235         LASSERTF((void *)(niobuf - niocount) ==
1236                 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1237                 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1238                 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1239
1240         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1241         if (resend) {
1242                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1243                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1244                         body->oa.o_flags = 0;
1245                 }
1246                 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1247         }
1248
1249         if (osc_should_shrink_grant(cli))
1250                 osc_shrink_grant_local(cli, &body->oa);
1251
1252         /* size[REQ_REC_OFF] still sizeof (*body) */
1253         if (opc == OST_WRITE) {
1254                 if (cli->cl_checksum &&
1255                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1256                         /* store cl_cksum_type in a local variable since
1257                          * it can be changed via lprocfs */
1258                         cksum_type_t cksum_type = cli->cl_cksum_type;
1259
1260                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1261                                 oa->o_flags &= OBD_FL_LOCAL_MASK;
1262                                 body->oa.o_flags = 0;
1263                         }
1264                         body->oa.o_flags |= cksum_type_pack(cksum_type);
1265                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1266                         body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1267                                                              page_count, pga,
1268                                                              OST_WRITE,
1269                                                              cksum_type);
1270                         CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1271                                body->oa.o_cksum);
1272                         /* save this in 'oa', too, for later checking */
1273                         oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1274                         oa->o_flags |= cksum_type_pack(cksum_type);
1275                 } else {
1276                         /* clear out the checksum flag, in case this is a
1277                          * resend but cl_checksum is no longer set. b=11238 */
1278                         oa->o_valid &= ~OBD_MD_FLCKSUM;
1279                 }
1280                 oa->o_cksum = body->oa.o_cksum;
1281                 /* 1 RC per niobuf */
1282                 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1283                                      sizeof(__u32) * niocount);
1284         } else {
1285                 if (cli->cl_checksum &&
1286                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1287                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1288                                 body->oa.o_flags = 0;
1289                         body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1290                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1291                 }
1292         }
1293         ptlrpc_request_set_replen(req);
1294
1295         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1296         aa = ptlrpc_req_async_args(req);
1297         aa->aa_oa = oa;
1298         aa->aa_requested_nob = requested_nob;
1299         aa->aa_nio_count = niocount;
1300         aa->aa_page_count = page_count;
1301         aa->aa_resends = 0;
1302         aa->aa_ppga = pga;
1303         aa->aa_cli = cli;
1304         INIT_LIST_HEAD(&aa->aa_oaps);
1305
1306         *reqp = req;
1307         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1308         CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
1309                 req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
1310                 niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
1311         RETURN(0);
1312
1313  out:
1314         ptlrpc_req_finished(req);
1315         RETURN(rc);
1316 }
1317
1318 static int
1319 check_write_checksum(struct obdo *oa, const struct lnet_process_id *peer,
1320                      __u32 client_cksum, __u32 server_cksum, int nob,
1321                      size_t page_count, struct brw_page **pga,
1322                      cksum_type_t client_cksum_type)
1323 {
1324         __u32 new_cksum;
1325         char *msg;
1326         cksum_type_t cksum_type;
1327
1328         if (server_cksum == client_cksum) {
1329                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1330                 return 0;
1331         }
1332
1333         cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1334                                        oa->o_flags : 0);
1335         new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1336                                       cksum_type);
1337
1338         if (cksum_type != client_cksum_type)
1339                 msg = "the server did not use the checksum type specified in "
1340                       "the original request - likely a protocol problem";
1341         else if (new_cksum == server_cksum)
1342                 msg = "changed on the client after we checksummed it - "
1343                       "likely false positive due to mmap IO (bug 11742)";
1344         else if (new_cksum == client_cksum)
1345                 msg = "changed in transit before arrival at OST";
1346         else
1347                 msg = "changed in transit AND doesn't match the original - "
1348                       "likely false positive due to mmap IO (bug 11742)";
1349
1350         LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1351                            " object "DOSTID" extent [%llu-%llu]\n",
1352                            msg, libcfs_nid2str(peer->nid),
1353                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1354                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1355                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1356                            POSTID(&oa->o_oi), pga[0]->off,
1357                            pga[page_count-1]->off + pga[page_count-1]->count - 1);
1358         CERROR("original client csum %x (type %x), server csum %x (type %x), "
1359                "client csum now %x\n", client_cksum, client_cksum_type,
1360                server_cksum, cksum_type, new_cksum);
1361         return 1;
1362 }
1363
1364 /* Note rc enters this function as number of bytes transferred */
1365 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1366 {
1367         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1368         const struct lnet_process_id *peer =
1369                         &req->rq_import->imp_connection->c_peer;
1370         struct client_obd *cli = aa->aa_cli;
1371         struct ost_body *body;
1372         u32 client_cksum = 0;
1373         ENTRY;
1374
1375         if (rc < 0 && rc != -EDQUOT) {
1376                 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1377                 RETURN(rc);
1378         }
1379
1380         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1381         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1382         if (body == NULL) {
1383                 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1384                 RETURN(-EPROTO);
1385         }
1386
1387         /* set/clear over quota flag for a uid/gid/projid */
1388         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1389             body->oa.o_valid & (OBD_MD_FLALLQUOTA)) {
1390                 unsigned qid[LL_MAXQUOTAS] =
1391                                         { body->oa.o_uid, body->oa.o_gid,
1392                                          body->oa.o_projid };
1393
1394                 CDEBUG(D_QUOTA, "setdq for [%u %u %u] with valid %#llx, flags %x\n",
1395                        body->oa.o_uid, body->oa.o_gid, body->oa.o_projid,
1396                        body->oa.o_valid, body->oa.o_flags);
1397                 osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
1398         }
1399
1400         osc_update_grant(cli, body);
1401
1402         if (rc < 0)
1403                 RETURN(rc);
1404
1405         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1406                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1407
1408         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1409                 if (rc > 0) {
1410                         CERROR("Unexpected +ve rc %d\n", rc);
1411                         RETURN(-EPROTO);
1412                 }
1413                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1414
1415                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1416                         RETURN(-EAGAIN);
1417
1418                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1419                     check_write_checksum(&body->oa, peer, client_cksum,
1420                                          body->oa.o_cksum, aa->aa_requested_nob,
1421                                          aa->aa_page_count, aa->aa_ppga,
1422                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1423                         RETURN(-EAGAIN);
1424
1425                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1426                                      aa->aa_page_count, aa->aa_ppga);
1427                 GOTO(out, rc);
1428         }
1429
1430         /* The rest of this function executes only for OST_READs */
1431
1432         /* if unwrap_bulk failed, return -EAGAIN to retry */
1433         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1434         if (rc < 0)
1435                 GOTO(out, rc = -EAGAIN);
1436
1437         if (rc > aa->aa_requested_nob) {
1438                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1439                        aa->aa_requested_nob);
1440                 RETURN(-EPROTO);
1441         }
1442
1443         if (rc != req->rq_bulk->bd_nob_transferred) {
1444                 CERROR ("Unexpected rc %d (%d transferred)\n",
1445                         rc, req->rq_bulk->bd_nob_transferred);
1446                 return (-EPROTO);
1447         }
1448
1449         if (rc < aa->aa_requested_nob)
1450                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1451
1452         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1453                 static int cksum_counter;
1454                 u32        server_cksum = body->oa.o_cksum;
1455                 char      *via = "";
1456                 char      *router = "";
1457                 cksum_type_t cksum_type;
1458
1459                 cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
1460                                                body->oa.o_flags : 0);
1461                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1462                                                  aa->aa_ppga, OST_READ,
1463                                                  cksum_type);
1464
1465                 if (peer->nid != req->rq_bulk->bd_sender) {
1466                         via = " via ";
1467                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1468                 }
1469
1470                 if (server_cksum != client_cksum) {
1471                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1472                                            "%s%s%s inode "DFID" object "DOSTID
1473                                            " extent [%llu-%llu]\n",
1474                                            req->rq_import->imp_obd->obd_name,
1475                                            libcfs_nid2str(peer->nid),
1476                                            via, router,
1477                                            body->oa.o_valid & OBD_MD_FLFID ?
1478                                                 body->oa.o_parent_seq : (__u64)0,
1479                                            body->oa.o_valid & OBD_MD_FLFID ?
1480                                                 body->oa.o_parent_oid : 0,
1481                                            body->oa.o_valid & OBD_MD_FLFID ?
1482                                                 body->oa.o_parent_ver : 0,
1483                                            POSTID(&body->oa.o_oi),
1484                                            aa->aa_ppga[0]->off,
1485                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1486                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1487                                                                         1);
1488                         CERROR("client %x, server %x, cksum_type %x\n",
1489                                client_cksum, server_cksum, cksum_type);
1490                         cksum_counter = 0;
1491                         aa->aa_oa->o_cksum = client_cksum;
1492                         rc = -EAGAIN;
1493                 } else {
1494                         cksum_counter++;
1495                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1496                         rc = 0;
1497                 }
1498         } else if (unlikely(client_cksum)) {
1499                 static int cksum_missed;
1500
1501                 cksum_missed++;
1502                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1503                         CERROR("Checksum %u requested from %s but not sent\n",
1504                                cksum_missed, libcfs_nid2str(peer->nid));
1505         } else {
1506                 rc = 0;
1507         }
1508 out:
1509         if (rc >= 0)
1510                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1511                                      aa->aa_oa, &body->oa);
1512
1513         RETURN(rc);
1514 }
1515
1516 static int osc_brw_redo_request(struct ptlrpc_request *request,
1517                                 struct osc_brw_async_args *aa, int rc)
1518 {
1519         struct ptlrpc_request *new_req;
1520         struct osc_brw_async_args *new_aa;
1521         struct osc_async_page *oap;
1522         ENTRY;
1523
1524         DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1525                   "redo for recoverable error %d", rc);
1526
1527         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1528                                 OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
1529                                   aa->aa_cli, aa->aa_oa, aa->aa_page_count,
1530                                   aa->aa_ppga, &new_req, 1);
1531         if (rc)
1532                 RETURN(rc);
1533
1534         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1535                 if (oap->oap_request != NULL) {
1536                         LASSERTF(request == oap->oap_request,
1537                                  "request %p != oap_request %p\n",
1538                                  request, oap->oap_request);
1539                         if (oap->oap_interrupted) {
1540                                 ptlrpc_req_finished(new_req);
1541                                 RETURN(-EINTR);
1542                         }
1543                 }
1544         }
1545         /* New request takes over pga and oaps from old request.
1546          * Note that copying a list_head doesn't work, need to move it... */
1547         aa->aa_resends++;
1548         new_req->rq_interpret_reply = request->rq_interpret_reply;
1549         new_req->rq_async_args = request->rq_async_args;
1550         new_req->rq_commit_cb = request->rq_commit_cb;
1551         /* cap resend delay to the current request timeout, this is similar to
1552          * what ptlrpc does (see after_reply()) */
1553         if (aa->aa_resends > new_req->rq_timeout)
1554                 new_req->rq_sent = cfs_time_current_sec() + new_req->rq_timeout;
1555         else
1556                 new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1557         new_req->rq_generation_set = 1;
1558         new_req->rq_import_generation = request->rq_import_generation;
1559
1560         new_aa = ptlrpc_req_async_args(new_req);
1561
1562         INIT_LIST_HEAD(&new_aa->aa_oaps);
1563         list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1564         INIT_LIST_HEAD(&new_aa->aa_exts);
1565         list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1566         new_aa->aa_resends = aa->aa_resends;
1567
1568         list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1569                 if (oap->oap_request) {
1570                         ptlrpc_req_finished(oap->oap_request);
1571                         oap->oap_request = ptlrpc_request_addref(new_req);
1572                 }
1573         }
1574
1575         /* XXX: This code will run into problem if we're going to support
1576          * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
1577          * and wait for all of them to be finished. We should inherit request
1578          * set from old request. */
1579         ptlrpcd_add_req(new_req);
1580
1581         DEBUG_REQ(D_INFO, new_req, "new request");
1582         RETURN(0);
1583 }
1584
1585 /*
1586  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1587  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1588  * fine for our small page arrays and doesn't require allocation.  its an
1589  * insertion sort that swaps elements that are strides apart, shrinking the
1590  * stride down until its '1' and the array is sorted.
1591  */
1592 static void sort_brw_pages(struct brw_page **array, int num)
1593 {
1594         int stride, i, j;
1595         struct brw_page *tmp;
1596
1597         if (num == 1)
1598                 return;
1599         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1600                 ;
1601
1602         do {
1603                 stride /= 3;
1604                 for (i = stride ; i < num ; i++) {
1605                         tmp = array[i];
1606                         j = i;
1607                         while (j >= stride && array[j - stride]->off > tmp->off) {
1608                                 array[j] = array[j - stride];
1609                                 j -= stride;
1610                         }
1611                         array[j] = tmp;
1612                 }
1613         } while (stride > 1);
1614 }
1615
1616 static void osc_release_ppga(struct brw_page **ppga, size_t count)
1617 {
1618         LASSERT(ppga != NULL);
1619         OBD_FREE(ppga, sizeof(*ppga) * count);
1620 }
1621
1622 static int brw_interpret(const struct lu_env *env,
1623                          struct ptlrpc_request *req, void *data, int rc)
1624 {
1625         struct osc_brw_async_args *aa = data;
1626         struct osc_extent *ext;
1627         struct osc_extent *tmp;
1628         struct client_obd *cli = aa->aa_cli;
1629         ENTRY;
1630
1631         rc = osc_brw_fini_request(req, rc);
1632         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1633         /* When server return -EINPROGRESS, client should always retry
1634          * regardless of the number of times the bulk was resent already. */
1635         if (osc_recoverable_error(rc)) {
1636                 if (req->rq_import_generation !=
1637                     req->rq_import->imp_generation) {
1638                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
1639                                ""DOSTID", rc = %d.\n",
1640                                req->rq_import->imp_obd->obd_name,
1641                                POSTID(&aa->aa_oa->o_oi), rc);
1642                 } else if (rc == -EINPROGRESS ||
1643                     client_should_resend(aa->aa_resends, aa->aa_cli)) {
1644                         rc = osc_brw_redo_request(req, aa, rc);
1645                 } else {
1646                         CERROR("%s: too many resent retries for object: "
1647                                "%llu:%llu, rc = %d.\n",
1648                                req->rq_import->imp_obd->obd_name,
1649                                POSTID(&aa->aa_oa->o_oi), rc);
1650                 }
1651
1652                 if (rc == 0)
1653                         RETURN(0);
1654                 else if (rc == -EAGAIN || rc == -EINPROGRESS)
1655                         rc = -EIO;
1656         }
1657
1658         if (rc == 0) {
1659                 struct obdo *oa = aa->aa_oa;
1660                 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
1661                 unsigned long valid = 0;
1662                 struct cl_object *obj;
1663                 struct osc_async_page *last;
1664
1665                 last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
1666                 obj = osc2cl(last->oap_obj);
1667
1668                 cl_object_attr_lock(obj);
1669                 if (oa->o_valid & OBD_MD_FLBLOCKS) {
1670                         attr->cat_blocks = oa->o_blocks;
1671                         valid |= CAT_BLOCKS;
1672                 }
1673                 if (oa->o_valid & OBD_MD_FLMTIME) {
1674                         attr->cat_mtime = oa->o_mtime;
1675                         valid |= CAT_MTIME;
1676                 }
1677                 if (oa->o_valid & OBD_MD_FLATIME) {
1678                         attr->cat_atime = oa->o_atime;
1679                         valid |= CAT_ATIME;
1680                 }
1681                 if (oa->o_valid & OBD_MD_FLCTIME) {
1682                         attr->cat_ctime = oa->o_ctime;
1683                         valid |= CAT_CTIME;
1684                 }
1685
1686                 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1687                         struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
1688                         loff_t last_off = last->oap_count + last->oap_obj_off +
1689                                 last->oap_page_off;
1690
1691                         /* Change file size if this is an out of quota or
1692                          * direct IO write and it extends the file size */
1693                         if (loi->loi_lvb.lvb_size < last_off) {
1694                                 attr->cat_size = last_off;
1695                                 valid |= CAT_SIZE;
1696                         }
1697                         /* Extend KMS if it's not a lockless write */
1698                         if (loi->loi_kms < last_off &&
1699                             oap2osc_page(last)->ops_srvlock == 0) {
1700                                 attr->cat_kms = last_off;
1701                                 valid |= CAT_KMS;
1702                         }
1703                 }
1704
1705                 if (valid != 0)
1706                         cl_object_attr_update(env, obj, attr, valid);
1707                 cl_object_attr_unlock(obj);
1708         }
1709         OBDO_FREE(aa->aa_oa);
1710
1711         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
1712                 osc_inc_unstable_pages(req);
1713
1714         list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
1715                 list_del_init(&ext->oe_link);
1716                 osc_extent_finish(env, ext, 1, rc);
1717         }
1718         LASSERT(list_empty(&aa->aa_exts));
1719         LASSERT(list_empty(&aa->aa_oaps));
1720
1721         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1722         ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
1723
1724         spin_lock(&cli->cl_loi_list_lock);
1725         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1726          * is called so we know whether to go to sync BRWs or wait for more
1727          * RPCs to complete */
1728         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
1729                 cli->cl_w_in_flight--;
1730         else
1731                 cli->cl_r_in_flight--;
1732         osc_wake_cache_waiters(cli);
1733         spin_unlock(&cli->cl_loi_list_lock);
1734
1735         osc_io_unplug(env, cli, NULL);
1736         RETURN(rc);
1737 }
1738
1739 static void brw_commit(struct ptlrpc_request *req)
1740 {
1741         /* If osc_inc_unstable_pages (via osc_extent_finish) races with
1742          * this called via the rq_commit_cb, I need to ensure
1743          * osc_dec_unstable_pages is still called. Otherwise unstable
1744          * pages may be leaked. */
1745         spin_lock(&req->rq_lock);
1746         if (likely(req->rq_unstable)) {
1747                 req->rq_unstable = 0;
1748                 spin_unlock(&req->rq_lock);
1749
1750                 osc_dec_unstable_pages(req);
1751         } else {
1752                 req->rq_committed = 1;
1753                 spin_unlock(&req->rq_lock);
1754         }
1755 }
1756
1757 /**
1758  * Build an RPC by the list of extent @ext_list. The caller must ensure
1759  * that the total pages in this list are NOT over max pages per RPC.
1760  * Extents in the list must be in OES_RPC state.
1761  */
1762 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
1763                   struct list_head *ext_list, int cmd)
1764 {
1765         struct ptlrpc_request           *req = NULL;
1766         struct osc_extent               *ext;
1767         struct brw_page                 **pga = NULL;
1768         struct osc_brw_async_args       *aa = NULL;
1769         struct obdo                     *oa = NULL;
1770         struct osc_async_page           *oap;
1771         struct osc_object               *obj = NULL;
1772         struct cl_req_attr              *crattr = NULL;
1773         loff_t                          starting_offset = OBD_OBJECT_EOF;
1774         loff_t                          ending_offset = 0;
1775         int                             mpflag = 0;
1776         int                             mem_tight = 0;
1777         int                             page_count = 0;
1778         bool                            soft_sync = false;
1779         bool                            interrupted = false;
1780         int                             i;
1781         int                             grant = 0;
1782         int                             rc;
1783         struct list_head                rpc_list = LIST_HEAD_INIT(rpc_list);
1784         struct ost_body                 *body;
1785         ENTRY;
1786         LASSERT(!list_empty(ext_list));
1787
1788         /* add pages into rpc_list to build BRW rpc */
1789         list_for_each_entry(ext, ext_list, oe_link) {
1790                 LASSERT(ext->oe_state == OES_RPC);
1791                 mem_tight |= ext->oe_memalloc;
1792                 grant += ext->oe_grants;
1793                 page_count += ext->oe_nr_pages;
1794                 if (obj == NULL)
1795                         obj = ext->oe_obj;
1796         }
1797
1798         soft_sync = osc_over_unstable_soft_limit(cli);
1799         if (mem_tight)
1800                 mpflag = cfs_memory_pressure_get_and_set();
1801
1802         OBD_ALLOC(pga, sizeof(*pga) * page_count);
1803         if (pga == NULL)
1804                 GOTO(out, rc = -ENOMEM);
1805
1806         OBDO_ALLOC(oa);
1807         if (oa == NULL)
1808                 GOTO(out, rc = -ENOMEM);
1809
1810         i = 0;
1811         list_for_each_entry(ext, ext_list, oe_link) {
1812                 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
1813                         if (mem_tight)
1814                                 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
1815                         if (soft_sync)
1816                                 oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
1817                         pga[i] = &oap->oap_brw_page;
1818                         pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
1819                         i++;
1820
1821                         list_add_tail(&oap->oap_rpc_item, &rpc_list);
1822                         if (starting_offset == OBD_OBJECT_EOF ||
1823                             starting_offset > oap->oap_obj_off)
1824                                 starting_offset = oap->oap_obj_off;
1825                         else
1826                                 LASSERT(oap->oap_page_off == 0);
1827                         if (ending_offset < oap->oap_obj_off + oap->oap_count)
1828                                 ending_offset = oap->oap_obj_off +
1829                                                 oap->oap_count;
1830                         else
1831                                 LASSERT(oap->oap_page_off + oap->oap_count ==
1832                                         PAGE_SIZE);
1833                         if (oap->oap_interrupted)
1834                                 interrupted = true;
1835                 }
1836         }
1837
1838         /* first page in the list */
1839         oap = list_entry(rpc_list.next, typeof(*oap), oap_rpc_item);
1840
1841         crattr = &osc_env_info(env)->oti_req_attr;
1842         memset(crattr, 0, sizeof(*crattr));
1843         crattr->cra_type = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
1844         crattr->cra_flags = ~0ULL;
1845         crattr->cra_page = oap2cl_page(oap);
1846         crattr->cra_oa = oa;
1847         cl_req_attr_set(env, osc2cl(obj), crattr);
1848
1849         if (cmd == OBD_BRW_WRITE)
1850                 oa->o_grant_used = grant;
1851
1852         sort_brw_pages(pga, page_count);
1853         rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req, 0);
1854         if (rc != 0) {
1855                 CERROR("prep_req failed: %d\n", rc);
1856                 GOTO(out, rc);
1857         }
1858
1859         req->rq_commit_cb = brw_commit;
1860         req->rq_interpret_reply = brw_interpret;
1861         req->rq_memalloc = mem_tight != 0;
1862         oap->oap_request = ptlrpc_request_addref(req);
1863         if (interrupted && !req->rq_intr)
1864                 ptlrpc_mark_interrupted(req);
1865
1866         /* Need to update the timestamps after the request is built in case
1867          * we race with setattr (locally or in queue at OST).  If OST gets
1868          * later setattr before earlier BRW (as determined by the request xid),
1869          * the OST will not use BRW timestamps.  Sadly, there is no obvious
1870          * way to do this in a single call.  bug 10150 */
1871         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
1872         crattr->cra_oa = &body->oa;
1873         crattr->cra_flags = OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME;
1874         cl_req_attr_set(env, osc2cl(obj), crattr);
1875         lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
1876
1877         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1878         aa = ptlrpc_req_async_args(req);
1879         INIT_LIST_HEAD(&aa->aa_oaps);
1880         list_splice_init(&rpc_list, &aa->aa_oaps);
1881         INIT_LIST_HEAD(&aa->aa_exts);
1882         list_splice_init(ext_list, &aa->aa_exts);
1883
1884         spin_lock(&cli->cl_loi_list_lock);
1885         starting_offset >>= PAGE_SHIFT;
1886         if (cmd == OBD_BRW_READ) {
1887                 cli->cl_r_in_flight++;
1888                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
1889                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
1890                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
1891                                       starting_offset + 1);
1892         } else {
1893                 cli->cl_w_in_flight++;
1894                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
1895                 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
1896                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
1897                                       starting_offset + 1);
1898         }
1899         spin_unlock(&cli->cl_loi_list_lock);
1900
1901         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %ur/%uw in flight",
1902                   page_count, aa, cli->cl_r_in_flight,
1903                   cli->cl_w_in_flight);
1904         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_IO, cfs_fail_val);
1905
1906         ptlrpcd_add_req(req);
1907         rc = 0;
1908         EXIT;
1909
1910 out:
1911         if (mem_tight != 0)
1912                 cfs_memory_pressure_restore(mpflag);
1913
1914         if (rc != 0) {
1915                 LASSERT(req == NULL);
1916
1917                 if (oa)
1918                         OBDO_FREE(oa);
1919                 if (pga)
1920                         OBD_FREE(pga, sizeof(*pga) * page_count);
1921                 /* this should happen rarely and is pretty bad, it makes the
1922                  * pending list not follow the dirty order */
1923                 while (!list_empty(ext_list)) {
1924                         ext = list_entry(ext_list->next, struct osc_extent,
1925                                          oe_link);
1926                         list_del_init(&ext->oe_link);
1927                         osc_extent_finish(env, ext, 0, rc);
1928                 }
1929         }
1930         RETURN(rc);
1931 }
1932
1933 static int osc_set_lock_data(struct ldlm_lock *lock, void *data)
1934 {
1935         int set = 0;
1936
1937         LASSERT(lock != NULL);
1938
1939         lock_res_and_lock(lock);
1940
1941         if (lock->l_ast_data == NULL)
1942                 lock->l_ast_data = data;
1943         if (lock->l_ast_data == data)
1944                 set = 1;
1945
1946         unlock_res_and_lock(lock);
1947
1948         return set;
1949 }
1950
1951 static int osc_enqueue_fini(struct ptlrpc_request *req,
1952                             osc_enqueue_upcall_f upcall, void *cookie,
1953                             struct lustre_handle *lockh, enum ldlm_mode mode,
1954                             __u64 *flags, int agl, int errcode)
1955 {
1956         bool intent = *flags & LDLM_FL_HAS_INTENT;
1957         int rc;
1958         ENTRY;
1959
1960         /* The request was created before ldlm_cli_enqueue call. */
1961         if (intent && errcode == ELDLM_LOCK_ABORTED) {
1962                 struct ldlm_reply *rep;
1963
1964                 rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1965                 LASSERT(rep != NULL);
1966
1967                 rep->lock_policy_res1 =
1968                         ptlrpc_status_ntoh(rep->lock_policy_res1);
1969                 if (rep->lock_policy_res1)
1970                         errcode = rep->lock_policy_res1;
1971                 if (!agl)
1972                         *flags |= LDLM_FL_LVB_READY;
1973         } else if (errcode == ELDLM_OK) {
1974                 *flags |= LDLM_FL_LVB_READY;
1975         }
1976
1977         /* Call the update callback. */
1978         rc = (*upcall)(cookie, lockh, errcode);
1979
1980         /* release the reference taken in ldlm_cli_enqueue() */
1981         if (errcode == ELDLM_LOCK_MATCHED)
1982                 errcode = ELDLM_OK;
1983         if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
1984                 ldlm_lock_decref(lockh, mode);
1985
1986         RETURN(rc);
1987 }
1988
1989 static int osc_enqueue_interpret(const struct lu_env *env,
1990                                  struct ptlrpc_request *req,
1991                                  struct osc_enqueue_args *aa, int rc)
1992 {
1993         struct ldlm_lock *lock;
1994         struct lustre_handle *lockh = &aa->oa_lockh;
1995         enum ldlm_mode mode = aa->oa_mode;
1996         struct ost_lvb *lvb = aa->oa_lvb;
1997         __u32 lvb_len = sizeof(*lvb);
1998         __u64 flags = 0;
1999
2000         ENTRY;
2001
2002         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2003          * be valid. */
2004         lock = ldlm_handle2lock(lockh);
2005         LASSERTF(lock != NULL,
2006                  "lockh %#llx, req %p, aa %p - client evicted?\n",
2007                  lockh->cookie, req, aa);
2008
2009         /* Take an additional reference so that a blocking AST that
2010          * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2011          * to arrive after an upcall has been executed by
2012          * osc_enqueue_fini(). */
2013         ldlm_lock_addref(lockh, mode);
2014
2015         /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
2016         OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
2017
2018         /* Let CP AST to grant the lock first. */
2019         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2020
2021         if (aa->oa_agl) {
2022                 LASSERT(aa->oa_lvb == NULL);
2023                 LASSERT(aa->oa_flags == NULL);
2024                 aa->oa_flags = &flags;
2025         }
2026
2027         /* Complete obtaining the lock procedure. */
2028         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
2029                                    aa->oa_mode, aa->oa_flags, lvb, lvb_len,
2030                                    lockh, rc);
2031         /* Complete osc stuff. */
2032         rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
2033                               aa->oa_flags, aa->oa_agl, rc);
2034
2035         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2036
2037         ldlm_lock_decref(lockh, mode);
2038         LDLM_LOCK_PUT(lock);
2039         RETURN(rc);
2040 }
2041
2042 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2043
2044 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
2045  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2046  * other synchronous requests, however keeping some locks and trying to obtain
2047  * others may take a considerable amount of time in a case of ost failure; and
2048  * when other sync requests do not get released lock from a client, the client
2049  * is evicted from the cluster -- such scenarious make the life difficult, so
2050  * release locks just after they are obtained. */
2051 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2052                      __u64 *flags, union ldlm_policy_data *policy,
2053                      struct ost_lvb *lvb, int kms_valid,
2054                      osc_enqueue_upcall_f upcall, void *cookie,
2055                      struct ldlm_enqueue_info *einfo,
2056                      struct ptlrpc_request_set *rqset, int async, int agl)
2057 {
2058         struct obd_device *obd = exp->exp_obd;
2059         struct lustre_handle lockh = { 0 };
2060         struct ptlrpc_request *req = NULL;
2061         int intent = *flags & LDLM_FL_HAS_INTENT;
2062         __u64 match_flags = *flags;
2063         enum ldlm_mode mode;
2064         int rc;
2065         ENTRY;
2066
2067         /* Filesystem lock extents are extended to page boundaries so that
2068          * dealing with the page cache is a little smoother.  */
2069         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2070         policy->l_extent.end |= ~PAGE_MASK;
2071
2072         /*
2073          * kms is not valid when either object is completely fresh (so that no
2074          * locks are cached), or object was evicted. In the latter case cached
2075          * lock cannot be used, because it would prime inode state with
2076          * potentially stale LVB.
2077          */
2078         if (!kms_valid)
2079                 goto no_match;
2080
2081         /* Next, search for already existing extent locks that will cover us */
2082         /* If we're trying to read, we also search for an existing PW lock.  The
2083          * VFS and page cache already protect us locally, so lots of readers/
2084          * writers can share a single PW lock.
2085          *
2086          * There are problems with conversion deadlocks, so instead of
2087          * converting a read lock to a write lock, we'll just enqueue a new
2088          * one.
2089          *
2090          * At some point we should cancel the read lock instead of making them
2091          * send us a blocking callback, but there are problems with canceling
2092          * locks out from other users right now, too. */
2093         mode = einfo->ei_mode;
2094         if (einfo->ei_mode == LCK_PR)
2095                 mode |= LCK_PW;
2096         if (agl == 0)
2097                 match_flags |= LDLM_FL_LVB_READY;
2098         if (intent != 0)
2099                 match_flags |= LDLM_FL_BLOCK_GRANTED;
2100         mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id,
2101                                einfo->ei_type, policy, mode, &lockh, 0);
2102         if (mode) {
2103                 struct ldlm_lock *matched;
2104
2105                 if (*flags & LDLM_FL_TEST_LOCK)
2106                         RETURN(ELDLM_OK);
2107
2108                 matched = ldlm_handle2lock(&lockh);
2109                 if (agl) {
2110                         /* AGL enqueues DLM locks speculatively. Therefore if
2111                          * it already exists a DLM lock, it wll just inform the
2112                          * caller to cancel the AGL process for this stripe. */
2113                         ldlm_lock_decref(&lockh, mode);
2114                         LDLM_LOCK_PUT(matched);
2115                         RETURN(-ECANCELED);
2116                 } else if (osc_set_lock_data(matched, einfo->ei_cbdata)) {
2117                         *flags |= LDLM_FL_LVB_READY;
2118
2119                         /* We already have a lock, and it's referenced. */
2120                         (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);
2121
2122                         ldlm_lock_decref(&lockh, mode);
2123                         LDLM_LOCK_PUT(matched);
2124                         RETURN(ELDLM_OK);
2125                 } else {
2126                         ldlm_lock_decref(&lockh, mode);
2127                         LDLM_LOCK_PUT(matched);
2128                 }
2129         }
2130
2131 no_match:
2132         if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK))
2133                 RETURN(-ENOLCK);
2134
2135         if (intent) {
2136                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2137                                            &RQF_LDLM_ENQUEUE_LVB);
2138                 if (req == NULL)
2139                         RETURN(-ENOMEM);
2140
2141                 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
2142                 if (rc) {
2143                         ptlrpc_request_free(req);
2144                         RETURN(rc);
2145                 }
2146
2147                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2148                                      sizeof *lvb);
2149                 ptlrpc_request_set_replen(req);
2150         }
2151
2152         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2153         *flags &= ~LDLM_FL_BLOCK_GRANTED;
2154
2155         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2156                               sizeof(*lvb), LVB_T_OST, &lockh, async);
2157         if (async) {
2158                 if (!rc) {
2159                         struct osc_enqueue_args *aa;
2160                         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2161                         aa = ptlrpc_req_async_args(req);
2162                         aa->oa_exp    = exp;
2163                         aa->oa_mode   = einfo->ei_mode;
2164                         aa->oa_type   = einfo->ei_type;
2165                         lustre_handle_copy(&aa->oa_lockh, &lockh);
2166                         aa->oa_upcall = upcall;
2167                         aa->oa_cookie = cookie;
2168                         aa->oa_agl    = !!agl;
2169                         if (!agl) {
2170                                 aa->oa_flags  = flags;
2171                                 aa->oa_lvb    = lvb;
2172                         } else {
2173                                 /* AGL is essentially to enqueue an DLM lock
2174                                  * in advance, so we don't care about the
2175                                  * result of AGL enqueue. */
2176                                 aa->oa_lvb    = NULL;
2177                                 aa->oa_flags  = NULL;
2178                         }
2179
2180                         req->rq_interpret_reply =
2181                                 (ptlrpc_interpterer_t)osc_enqueue_interpret;
2182                         if (rqset == PTLRPCD_SET)
2183                                 ptlrpcd_add_req(req);
2184                         else
2185                                 ptlrpc_set_add_req(rqset, req);
2186                 } else if (intent) {
2187                         ptlrpc_req_finished(req);
2188                 }
2189                 RETURN(rc);
2190         }
2191
2192         rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
2193                               flags, agl, rc);
2194         if (intent)
2195                 ptlrpc_req_finished(req);
2196
2197         RETURN(rc);
2198 }
2199
2200 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2201                    enum ldlm_type type, union ldlm_policy_data *policy,
2202                    enum ldlm_mode mode, __u64 *flags, void *data,
2203                    struct lustre_handle *lockh, int unref)
2204 {
2205         struct obd_device *obd = exp->exp_obd;
2206         __u64 lflags = *flags;
2207         enum ldlm_mode rc;
2208         ENTRY;
2209
2210         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2211                 RETURN(-EIO);
2212
2213         /* Filesystem lock extents are extended to page boundaries so that
2214          * dealing with the page cache is a little smoother */
2215         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2216         policy->l_extent.end |= ~PAGE_MASK;
2217
2218         /* Next, search for already existing extent locks that will cover us */
2219         /* If we're trying to read, we also search for an existing PW lock.  The
2220          * VFS and page cache already protect us locally, so lots of readers/
2221          * writers can share a single PW lock. */
2222         rc = mode;
2223         if (mode == LCK_PR)
2224                 rc |= LCK_PW;
2225         rc = ldlm_lock_match(obd->obd_namespace, lflags,
2226                              res_id, type, policy, rc, lockh, unref);
2227         if (rc == 0 || lflags & LDLM_FL_TEST_LOCK)
2228                 RETURN(rc);
2229
2230         if (data != NULL) {
2231                 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2232
2233                 LASSERT(lock != NULL);
2234                 if (!osc_set_lock_data(lock, data)) {
2235                         ldlm_lock_decref(lockh, rc);
2236                         rc = 0;
2237                 }
2238                 LDLM_LOCK_PUT(lock);
2239         }
2240         RETURN(rc);
2241 }
2242
2243 static int osc_statfs_interpret(const struct lu_env *env,
2244                                 struct ptlrpc_request *req,
2245                                 struct osc_async_args *aa, int rc)
2246 {
2247         struct obd_statfs *msfs;
2248         ENTRY;
2249
2250         if (rc == -EBADR)
2251                 /* The request has in fact never been sent
2252                  * due to issues at a higher level (LOV).
2253                  * Exit immediately since the caller is
2254                  * aware of the problem and takes care
2255                  * of the clean up */
2256                  RETURN(rc);
2257
2258         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2259             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2260                 GOTO(out, rc = 0);
2261
2262         if (rc != 0)
2263                 GOTO(out, rc);
2264
2265         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2266         if (msfs == NULL) {
2267                 GOTO(out, rc = -EPROTO);
2268         }
2269
2270         *aa->aa_oi->oi_osfs = *msfs;
2271 out:
2272         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2273         RETURN(rc);
2274 }
2275
2276 static int osc_statfs_async(struct obd_export *exp,
2277                             struct obd_info *oinfo, __u64 max_age,
2278                             struct ptlrpc_request_set *rqset)
2279 {
2280         struct obd_device     *obd = class_exp2obd(exp);
2281         struct ptlrpc_request *req;
2282         struct osc_async_args *aa;
2283         int                    rc;
2284         ENTRY;
2285
2286         /* We could possibly pass max_age in the request (as an absolute
2287          * timestamp or a "seconds.usec ago") so the target can avoid doing
2288          * extra calls into the filesystem if that isn't necessary (e.g.
2289          * during mount that would help a bit).  Having relative timestamps
2290          * is not so great if request processing is slow, while absolute
2291          * timestamps are not ideal because they need time synchronization. */
2292         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2293         if (req == NULL)
2294                 RETURN(-ENOMEM);
2295
2296         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2297         if (rc) {
2298                 ptlrpc_request_free(req);
2299                 RETURN(rc);
2300         }
2301         ptlrpc_request_set_replen(req);
2302         req->rq_request_portal = OST_CREATE_PORTAL;
2303         ptlrpc_at_set_req_timeout(req);
2304
2305         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2306                 /* procfs requests not want stat in wait for avoid deadlock */
2307                 req->rq_no_resend = 1;
2308                 req->rq_no_delay = 1;
2309         }
2310
2311         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2312         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2313         aa = ptlrpc_req_async_args(req);
2314         aa->aa_oi = oinfo;
2315
2316         ptlrpc_set_add_req(rqset, req);
2317         RETURN(0);
2318 }
2319
2320 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2321                       struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2322 {
2323         struct obd_device     *obd = class_exp2obd(exp);
2324         struct obd_statfs     *msfs;
2325         struct ptlrpc_request *req;
2326         struct obd_import     *imp = NULL;
2327         int rc;
2328         ENTRY;
2329
2330         /*Since the request might also come from lprocfs, so we need
2331          *sync this with client_disconnect_export Bug15684*/
2332         down_read(&obd->u.cli.cl_sem);
2333         if (obd->u.cli.cl_import)
2334                 imp = class_import_get(obd->u.cli.cl_import);
2335         up_read(&obd->u.cli.cl_sem);
2336         if (!imp)
2337                 RETURN(-ENODEV);
2338
2339         /* We could possibly pass max_age in the request (as an absolute
2340          * timestamp or a "seconds.usec ago") so the target can avoid doing
2341          * extra calls into the filesystem if that isn't necessary (e.g.
2342          * during mount that would help a bit).  Having relative timestamps
2343          * is not so great if request processing is slow, while absolute
2344          * timestamps are not ideal because they need time synchronization. */
2345         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2346
2347         class_import_put(imp);
2348
2349         if (req == NULL)
2350                 RETURN(-ENOMEM);
2351
2352         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2353         if (rc) {
2354                 ptlrpc_request_free(req);
2355                 RETURN(rc);
2356         }
2357         ptlrpc_request_set_replen(req);
2358         req->rq_request_portal = OST_CREATE_PORTAL;
2359         ptlrpc_at_set_req_timeout(req);
2360
2361         if (flags & OBD_STATFS_NODELAY) {
2362                 /* procfs requests not want stat in wait for avoid deadlock */
2363                 req->rq_no_resend = 1;
2364                 req->rq_no_delay = 1;
2365         }
2366
2367         rc = ptlrpc_queue_wait(req);
2368         if (rc)
2369                 GOTO(out, rc);
2370
2371         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2372         if (msfs == NULL) {
2373                 GOTO(out, rc = -EPROTO);
2374         }
2375
2376         *osfs = *msfs;
2377
2378         EXIT;
2379  out:
2380         ptlrpc_req_finished(req);
2381         return rc;
2382 }
2383
2384 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2385                          void *karg, void __user *uarg)
2386 {
2387         struct obd_device *obd = exp->exp_obd;
2388         struct obd_ioctl_data *data = karg;
2389         int err = 0;
2390         ENTRY;
2391
2392         if (!try_module_get(THIS_MODULE)) {
2393                 CERROR("%s: cannot get module '%s'\n", obd->obd_name,
2394                        module_name(THIS_MODULE));
2395                 return -EINVAL;
2396         }
2397         switch (cmd) {
2398         case OBD_IOC_CLIENT_RECOVER:
2399                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2400                                             data->ioc_inlbuf1, 0);
2401                 if (err > 0)
2402                         err = 0;
2403                 GOTO(out, err);
2404         case IOC_OSC_SET_ACTIVE:
2405                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2406                                                data->ioc_offset);
2407                 GOTO(out, err);
2408         case OBD_IOC_PING_TARGET:
2409                 err = ptlrpc_obd_ping(obd);
2410                 GOTO(out, err);
2411         default:
2412                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2413                        cmd, current_comm());
2414                 GOTO(out, err = -ENOTTY);
2415         }
2416 out:
2417         module_put(THIS_MODULE);
2418         return err;
2419 }
2420
2421 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
2422                               u32 keylen, void *key,
2423                               u32 vallen, void *val,
2424                               struct ptlrpc_request_set *set)
2425 {
2426         struct ptlrpc_request *req;
2427         struct obd_device     *obd = exp->exp_obd;
2428         struct obd_import     *imp = class_exp2cliimp(exp);
2429         char                  *tmp;
2430         int                    rc;
2431         ENTRY;
2432
2433         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
2434
2435         if (KEY_IS(KEY_CHECKSUM)) {
2436                 if (vallen != sizeof(int))
2437                         RETURN(-EINVAL);
2438                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
2439                 RETURN(0);
2440         }
2441
2442         if (KEY_IS(KEY_SPTLRPC_CONF)) {
2443                 sptlrpc_conf_client_adapt(obd);
2444                 RETURN(0);
2445         }
2446
2447         if (KEY_IS(KEY_FLUSH_CTX)) {
2448                 sptlrpc_import_flush_my_ctx(imp);
2449                 RETURN(0);
2450         }
2451
2452         if (KEY_IS(KEY_CACHE_SET)) {
2453                 struct client_obd *cli = &obd->u.cli;
2454
2455                 LASSERT(cli->cl_cache == NULL); /* only once */
2456                 cli->cl_cache = (struct cl_client_cache *)val;
2457                 cl_cache_incref(cli->cl_cache);
2458                 cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
2459
2460                 /* add this osc into entity list */
2461                 LASSERT(list_empty(&cli->cl_lru_osc));
2462                 spin_lock(&cli->cl_cache->ccc_lru_lock);
2463                 list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
2464                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
2465
2466                 RETURN(0);
2467         }
2468
2469         if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
2470                 struct client_obd *cli = &obd->u.cli;
2471                 long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
2472                 long target = *(long *)val;
2473
2474                 nr = osc_lru_shrink(env, cli, min(nr, target), true);
2475                 *(long *)val -= nr;
2476                 RETURN(0);
2477         }
2478
2479         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
2480                 RETURN(-EINVAL);
2481
2482         /* We pass all other commands directly to OST. Since nobody calls osc
2483            methods directly and everybody is supposed to go through LOV, we
2484            assume lov checked invalid values for us.
2485            The only recognised values so far are evict_by_nid and mds_conn.
2486            Even if something bad goes through, we'd get a -EINVAL from OST
2487            anyway. */
2488
2489         req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
2490                                                 &RQF_OST_SET_GRANT_INFO :
2491                                                 &RQF_OBD_SET_INFO);
2492         if (req == NULL)
2493                 RETURN(-ENOMEM);
2494
2495         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2496                              RCL_CLIENT, keylen);
2497         if (!KEY_IS(KEY_GRANT_SHRINK))
2498                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
2499                                      RCL_CLIENT, vallen);
2500         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
2501         if (rc) {
2502                 ptlrpc_request_free(req);
2503                 RETURN(rc);
2504         }
2505
2506         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2507         memcpy(tmp, key, keylen);
2508         tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
2509                                                         &RMF_OST_BODY :
2510                                                         &RMF_SETINFO_VAL);
2511         memcpy(tmp, val, vallen);
2512
2513         if (KEY_IS(KEY_GRANT_SHRINK)) {
2514                 struct osc_grant_args *aa;
2515                 struct obdo *oa;
2516
2517                 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2518                 aa = ptlrpc_req_async_args(req);
2519                 OBDO_ALLOC(oa);
2520                 if (!oa) {
2521                         ptlrpc_req_finished(req);
2522                         RETURN(-ENOMEM);
2523                 }
2524                 *oa = ((struct ost_body *)val)->oa;
2525                 aa->aa_oa = oa;
2526                 req->rq_interpret_reply = osc_shrink_grant_interpret;
2527         }
2528
2529         ptlrpc_request_set_replen(req);
2530         if (!KEY_IS(KEY_GRANT_SHRINK)) {
2531                 LASSERT(set != NULL);
2532                 ptlrpc_set_add_req(set, req);
2533                 ptlrpc_check_set(NULL, set);
2534         } else {
2535                 ptlrpcd_add_req(req);
2536         }
2537
2538         RETURN(0);
2539 }
2540
2541 static int osc_reconnect(const struct lu_env *env,
2542                          struct obd_export *exp, struct obd_device *obd,
2543                          struct obd_uuid *cluuid,
2544                          struct obd_connect_data *data,
2545                          void *localdata)
2546 {
2547         struct client_obd *cli = &obd->u.cli;
2548
2549         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
2550                 long lost_grant;
2551                 long grant;
2552
2553                 spin_lock(&cli->cl_loi_list_lock);
2554                 grant = cli->cl_avail_grant + cli->cl_reserved_grant;
2555                 if (data->ocd_connect_flags & OBD_CONNECT_GRANT_PARAM)
2556                         grant += cli->cl_dirty_grant;
2557                 else
2558                         grant += cli->cl_dirty_pages << PAGE_SHIFT;
2559                 data->ocd_grant = grant ? : 2 * cli_brw_size(obd);
2560                 lost_grant = cli->cl_lost_grant;
2561                 cli->cl_lost_grant = 0;
2562                 spin_unlock(&cli->cl_loi_list_lock);
2563
2564                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: %#llx ocd_version: %d"
2565                        " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
2566                        data->ocd_version, data->ocd_grant, lost_grant);
2567         }
2568
2569         RETURN(0);
2570 }
2571
2572 static int osc_disconnect(struct obd_export *exp)
2573 {
2574         struct obd_device *obd = class_exp2obd(exp);
2575         int rc;
2576
2577         rc = client_disconnect_export(exp);
2578         /**
2579          * Initially we put del_shrink_grant before disconnect_export, but it
2580          * causes the following problem if setup (connect) and cleanup
2581          * (disconnect) are tangled together.
2582          *      connect p1                     disconnect p2
2583          *   ptlrpc_connect_import
2584          *     ...............               class_manual_cleanup
2585          *                                     osc_disconnect
2586          *                                     del_shrink_grant
2587          *   ptlrpc_connect_interrupt
2588          *     init_grant_shrink
2589          *   add this client to shrink list
2590          *                                      cleanup_osc
2591          * Bang! pinger trigger the shrink.
2592          * So the osc should be disconnected from the shrink list, after we
2593          * are sure the import has been destroyed. BUG18662
2594          */
2595         if (obd->u.cli.cl_import == NULL)
2596                 osc_del_shrink_grant(&obd->u.cli);
2597         return rc;
2598 }
2599
2600 static int osc_ldlm_resource_invalidate(struct cfs_hash *hs,
2601         struct cfs_hash_bd *bd, struct hlist_node *hnode, void *arg)
2602 {
2603         struct lu_env *env = arg;
2604         struct ldlm_resource *res = cfs_hash_object(hs, hnode);
2605         struct ldlm_lock *lock;
2606         struct osc_object *osc = NULL;
2607         ENTRY;
2608
2609         lock_res(res);
2610         list_for_each_entry(lock, &res->lr_granted, l_res_link) {
2611                 if (lock->l_ast_data != NULL && osc == NULL) {
2612                         osc = lock->l_ast_data;
2613                         cl_object_get(osc2cl(osc));
2614                 }
2615
2616                 /* clear LDLM_FL_CLEANED flag to make sure it will be canceled
2617                  * by the 2nd round of ldlm_namespace_clean() call in
2618                  * osc_import_event(). */
2619                 ldlm_clear_cleaned(lock);
2620         }
2621         unlock_res(res);
2622
2623         if (osc != NULL) {
2624                 osc_object_invalidate(env, osc);
2625                 cl_object_put(env, osc2cl(osc));
2626         }
2627
2628         RETURN(0);
2629 }
2630
2631 static int osc_import_event(struct obd_device *obd,
2632                             struct obd_import *imp,
2633                             enum obd_import_event event)
2634 {
2635         struct client_obd *cli;
2636         int rc = 0;
2637
2638         ENTRY;
2639         LASSERT(imp->imp_obd == obd);
2640
2641         switch (event) {
2642         case IMP_EVENT_DISCON: {
2643                 cli = &obd->u.cli;
2644                 spin_lock(&cli->cl_loi_list_lock);
2645                 cli->cl_avail_grant = 0;
2646                 cli->cl_lost_grant = 0;
2647                 spin_unlock(&cli->cl_loi_list_lock);
2648                 break;
2649         }
2650         case IMP_EVENT_INACTIVE: {
2651                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE);
2652                 break;
2653         }
2654         case IMP_EVENT_INVALIDATE: {
2655                 struct ldlm_namespace *ns = obd->obd_namespace;
2656                 struct lu_env         *env;
2657                 __u16                  refcheck;
2658
2659                 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
2660
2661                 env = cl_env_get(&refcheck);
2662                 if (!IS_ERR(env)) {
2663                         osc_io_unplug(env, &obd->u.cli, NULL);
2664
2665                         cfs_hash_for_each_nolock(ns->ns_rs_hash,
2666                                                  osc_ldlm_resource_invalidate,
2667                                                  env, 0);
2668                         cl_env_put(env, &refcheck);
2669
2670                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
2671                 } else
2672                         rc = PTR_ERR(env);
2673                 break;
2674         }
2675         case IMP_EVENT_ACTIVE: {
2676                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE);
2677                 break;
2678         }
2679         case IMP_EVENT_OCD: {
2680                 struct obd_connect_data *ocd = &imp->imp_connect_data;
2681
2682                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
2683                         osc_init_grant(&obd->u.cli, ocd);
2684
2685                 /* See bug 7198 */
2686                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
2687                         imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
2688
2689                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD);
2690                 break;
2691         }
2692         case IMP_EVENT_DEACTIVATE: {
2693                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE);
2694                 break;
2695         }
2696         case IMP_EVENT_ACTIVATE: {
2697                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE);
2698                 break;
2699         }
2700         default:
2701                 CERROR("Unknown import event %d\n", event);
2702                 LBUG();
2703         }
2704         RETURN(rc);
2705 }
2706
2707 /**
2708  * Determine whether the lock can be canceled before replaying the lock
2709  * during recovery, see bug16774 for detailed information.
2710  *
2711  * \retval zero the lock can't be canceled
2712  * \retval other ok to cancel
2713  */
2714 static int osc_cancel_weight(struct ldlm_lock *lock)
2715 {
2716         /*
2717          * Cancel all unused and granted extent lock.
2718          */
2719         if (lock->l_resource->lr_type == LDLM_EXTENT &&
2720             lock->l_granted_mode == lock->l_req_mode &&
2721             osc_ldlm_weigh_ast(lock) == 0)
2722                 RETURN(1);
2723
2724         RETURN(0);
2725 }
2726
2727 static int brw_queue_work(const struct lu_env *env, void *data)
2728 {
2729         struct client_obd *cli = data;
2730
2731         CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
2732
2733         osc_io_unplug(env, cli, NULL);
2734         RETURN(0);
2735 }
2736
2737 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
2738 {
2739         struct client_obd *cli = &obd->u.cli;
2740         struct obd_type   *type;
2741         void              *handler;
2742         int                rc;
2743         int                adding;
2744         int                added;
2745         int                req_count;
2746         ENTRY;
2747
2748         rc = ptlrpcd_addref();
2749         if (rc)
2750                 RETURN(rc);
2751
2752         rc = client_obd_setup(obd, lcfg);
2753         if (rc)
2754                 GOTO(out_ptlrpcd, rc);
2755
2756         handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
2757         if (IS_ERR(handler))
2758                 GOTO(out_client_setup, rc = PTR_ERR(handler));
2759         cli->cl_writeback_work = handler;
2760
2761         handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
2762         if (IS_ERR(handler))
2763                 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
2764         cli->cl_lru_work = handler;
2765
2766         rc = osc_quota_setup(obd);
2767         if (rc)
2768                 GOTO(out_ptlrpcd_work, rc);
2769
2770         cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
2771
2772 #ifdef CONFIG_PROC_FS
2773         obd->obd_vars = lprocfs_osc_obd_vars;
2774 #endif
2775         /* If this is true then both client (osc) and server (osp) are on the
2776          * same node. The osp layer if loaded first will register the osc proc
2777          * directory. In that case this obd_device will be attached its proc
2778          * tree to type->typ_procsym instead of obd->obd_type->typ_procroot. */
2779         type = class_search_type(LUSTRE_OSP_NAME);
2780         if (type && type->typ_procsym) {
2781                 obd->obd_proc_entry = lprocfs_register(obd->obd_name,
2782                                                        type->typ_procsym,
2783                                                        obd->obd_vars, obd);
2784                 if (IS_ERR(obd->obd_proc_entry)) {
2785                         rc = PTR_ERR(obd->obd_proc_entry);
2786                         CERROR("error %d setting up lprocfs for %s\n", rc,
2787                                obd->obd_name);
2788                         obd->obd_proc_entry = NULL;
2789                 }
2790         } else {
2791                 rc = lprocfs_obd_setup(obd);
2792         }
2793
2794         /* If the basic OSC proc tree construction succeeded then
2795          * lets do the rest. */
2796         if (rc == 0) {
2797                 lproc_osc_attach_seqstat(obd);
2798                 sptlrpc_lprocfs_cliobd_attach(obd);
2799                 ptlrpc_lprocfs_register_obd(obd);
2800         }
2801
2802         /*
2803          * We try to control the total number of requests with a upper limit
2804          * osc_reqpool_maxreqcount. There might be some race which will cause
2805          * over-limit allocation, but it is fine.
2806          */
2807         req_count = atomic_read(&osc_pool_req_count);
2808         if (req_count < osc_reqpool_maxreqcount) {
2809                 adding = cli->cl_max_rpcs_in_flight + 2;
2810                 if (req_count + adding > osc_reqpool_maxreqcount)
2811                         adding = osc_reqpool_maxreqcount - req_count;
2812
2813                 added = ptlrpc_add_rqs_to_pool(osc_rq_pool, adding);
2814                 atomic_add(added, &osc_pool_req_count);
2815         }
2816
2817         INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
2818         ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
2819
2820         spin_lock(&osc_shrink_lock);
2821         list_add_tail(&cli->cl_shrink_list, &osc_shrink_list);
2822         spin_unlock(&osc_shrink_lock);
2823
2824         RETURN(0);
2825
2826 out_ptlrpcd_work:
2827         if (cli->cl_writeback_work != NULL) {
2828                 ptlrpcd_destroy_work(cli->cl_writeback_work);
2829                 cli->cl_writeback_work = NULL;
2830         }
2831         if (cli->cl_lru_work != NULL) {
2832                 ptlrpcd_destroy_work(cli->cl_lru_work);
2833                 cli->cl_lru_work = NULL;
2834         }
2835 out_client_setup:
2836         client_obd_cleanup(obd);
2837 out_ptlrpcd:
2838         ptlrpcd_decref();
2839         RETURN(rc);
2840 }
2841
2842 static int osc_precleanup(struct obd_device *obd)
2843 {
2844         struct client_obd *cli = &obd->u.cli;
2845         ENTRY;
2846
2847         /* LU-464
2848          * for echo client, export may be on zombie list, wait for
2849          * zombie thread to cull it, because cli.cl_import will be
2850          * cleared in client_disconnect_export():
2851          *   class_export_destroy() -> obd_cleanup() ->
2852          *   echo_device_free() -> echo_client_cleanup() ->
2853          *   obd_disconnect() -> osc_disconnect() ->
2854          *   client_disconnect_export()
2855          */
2856         obd_zombie_barrier();
2857         if (cli->cl_writeback_work) {
2858                 ptlrpcd_destroy_work(cli->cl_writeback_work);
2859                 cli->cl_writeback_work = NULL;
2860         }
2861
2862         if (cli->cl_lru_work) {
2863                 ptlrpcd_destroy_work(cli->cl_lru_work);
2864                 cli->cl_lru_work = NULL;
2865         }
2866
2867         obd_cleanup_client_import(obd);
2868         ptlrpc_lprocfs_unregister_obd(obd);
2869         lprocfs_obd_cleanup(obd);
2870         RETURN(0);
2871 }
2872
2873 int osc_cleanup(struct obd_device *obd)
2874 {
2875         struct client_obd *cli = &obd->u.cli;
2876         int rc;
2877
2878         ENTRY;
2879
2880         spin_lock(&osc_shrink_lock);
2881         list_del(&cli->cl_shrink_list);
2882         spin_unlock(&osc_shrink_lock);
2883
2884         /* lru cleanup */
2885         if (cli->cl_cache != NULL) {
2886                 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
2887                 spin_lock(&cli->cl_cache->ccc_lru_lock);
2888                 list_del_init(&cli->cl_lru_osc);
2889                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
2890                 cli->cl_lru_left = NULL;
2891                 cl_cache_decref(cli->cl_cache);
2892                 cli->cl_cache = NULL;
2893         }
2894
2895         /* free memory of osc quota cache */
2896         osc_quota_cleanup(obd);
2897
2898         rc = client_obd_cleanup(obd);
2899
2900         ptlrpcd_decref();
2901         RETURN(rc);
2902 }
2903
2904 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
2905 {
2906         int rc = class_process_proc_param(PARAM_OSC, obd->obd_vars, lcfg, obd);
2907         return rc > 0 ? 0: rc;
2908 }
2909
2910 static int osc_process_config(struct obd_device *obd, size_t len, void *buf)
2911 {
2912         return osc_process_config_base(obd, buf);
2913 }
2914
2915 static struct obd_ops osc_obd_ops = {
2916         .o_owner                = THIS_MODULE,
2917         .o_setup                = osc_setup,
2918         .o_precleanup           = osc_precleanup,
2919         .o_cleanup              = osc_cleanup,
2920         .o_add_conn             = client_import_add_conn,
2921         .o_del_conn             = client_import_del_conn,
2922         .o_connect              = client_connect_import,
2923         .o_reconnect            = osc_reconnect,
2924         .o_disconnect           = osc_disconnect,
2925         .o_statfs               = osc_statfs,
2926         .o_statfs_async         = osc_statfs_async,
2927         .o_create               = osc_create,
2928         .o_destroy              = osc_destroy,
2929         .o_getattr              = osc_getattr,
2930         .o_setattr              = osc_setattr,
2931         .o_iocontrol            = osc_iocontrol,
2932         .o_set_info_async       = osc_set_info_async,
2933         .o_import_event         = osc_import_event,
2934         .o_process_config       = osc_process_config,
2935         .o_quotactl             = osc_quotactl,
2936 };
2937
2938 static struct shrinker *osc_cache_shrinker;
2939 struct list_head osc_shrink_list = LIST_HEAD_INIT(osc_shrink_list);
2940 DEFINE_SPINLOCK(osc_shrink_lock);
2941
2942 #ifndef HAVE_SHRINKER_COUNT
2943 static int osc_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask))
2944 {
2945         struct shrink_control scv = {
2946                 .nr_to_scan = shrink_param(sc, nr_to_scan),
2947                 .gfp_mask   = shrink_param(sc, gfp_mask)
2948         };
2949 #if !defined(HAVE_SHRINKER_WANT_SHRINK_PTR) && !defined(HAVE_SHRINK_CONTROL)
2950         struct shrinker *shrinker = NULL;
2951 #endif
2952
2953         (void)osc_cache_shrink_scan(shrinker, &scv);
2954
2955         return osc_cache_shrink_count(shrinker, &scv);
2956 }
2957 #endif
2958
2959 static int __init osc_init(void)
2960 {
2961         bool enable_proc = true;
2962         struct obd_type *type;
2963         unsigned int reqpool_size;
2964         unsigned int reqsize;
2965         int rc;
2966         DEF_SHRINKER_VAR(osc_shvar, osc_cache_shrink,
2967                          osc_cache_shrink_count, osc_cache_shrink_scan);
2968         ENTRY;
2969
2970         /* print an address of _any_ initialized kernel symbol from this
2971          * module, to allow debugging with gdb that doesn't support data
2972          * symbols from modules.*/
2973         CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
2974
2975         rc = lu_kmem_init(osc_caches);
2976         if (rc)
2977                 RETURN(rc);
2978
2979         type = class_search_type(LUSTRE_OSP_NAME);
2980         if (type != NULL && type->typ_procsym != NULL)
2981                 enable_proc = false;
2982
2983         rc = class_register_type(&osc_obd_ops, NULL, enable_proc, NULL,
2984                                  LUSTRE_OSC_NAME, &osc_device_type);
2985         if (rc)
2986                 GOTO(out_kmem, rc);
2987
2988         osc_cache_shrinker = set_shrinker(DEFAULT_SEEKS, &osc_shvar);
2989
2990         /* This is obviously too much memory, only prevent overflow here */
2991         if (osc_reqpool_mem_max >= 1 << 12 || osc_reqpool_mem_max == 0)
2992                 GOTO(out_type, rc = -EINVAL);
2993
2994         reqpool_size = osc_reqpool_mem_max << 20;
2995
2996         reqsize = 1;
2997         while (reqsize < OST_IO_MAXREQSIZE)
2998                 reqsize = reqsize << 1;
2999
3000         /*
3001          * We don't enlarge the request count in OSC pool according to
3002          * cl_max_rpcs_in_flight. The allocation from the pool will only be
3003          * tried after normal allocation failed. So a small OSC pool won't
3004          * cause much performance degression in most of cases.
3005          */
3006         osc_reqpool_maxreqcount = reqpool_size / reqsize;
3007
3008         atomic_set(&osc_pool_req_count, 0);
3009         osc_rq_pool = ptlrpc_init_rq_pool(0, OST_IO_MAXREQSIZE,
3010                                           ptlrpc_add_rqs_to_pool);
3011
3012         if (osc_rq_pool != NULL)
3013                 GOTO(out, rc);
3014         rc = -ENOMEM;
3015 out_type:
3016         class_unregister_type(LUSTRE_OSC_NAME);
3017 out_kmem:
3018         lu_kmem_fini(osc_caches);
3019 out:
3020         RETURN(rc);
3021 }
3022
3023 static void __exit osc_exit(void)
3024 {
3025         remove_shrinker(osc_cache_shrinker);
3026         class_unregister_type(LUSTRE_OSC_NAME);
3027         lu_kmem_fini(osc_caches);
3028         ptlrpc_free_rq_pool(osc_rq_pool);
3029 }
3030
3031 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
3032 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3033 MODULE_VERSION(LUSTRE_VERSION_STRING);
3034 MODULE_LICENSE("GPL");
3035
3036 module_init(osc_init);
3037 module_exit(osc_exit);