/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.gnu.org/licenses/gpl-2.0.html
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, 2016, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 */

#define DEBUG_SUBSYSTEM S_OSC

#include <libcfs/libcfs.h>

#include <lprocfs_status.h>
#include <lustre_debug.h>
#include <lustre_dlm.h>
#include <lustre_fid.h>
#include <lustre_ha.h>
#include <uapi/linux/lustre/lustre_ioctl.h>
#include <lustre_net.h>
#include <lustre_obdo.h>
#include <uapi/linux/lustre/lustre_param.h>
#include <obd.h>
#include <obd_cksum.h>
#include <obd_class.h>
#include <lustre_osc.h>

#include "osc_internal.h"

atomic_t osc_pool_req_count;
unsigned int osc_reqpool_maxreqcount;
struct ptlrpc_request_pool *osc_rq_pool;

/* max memory used for request pool, unit is MB */
static unsigned int osc_reqpool_mem_max = 5;
module_param(osc_reqpool_mem_max, uint, 0444);

struct osc_brw_async_args {
        struct obdo              *aa_oa;
        int                       aa_requested_nob;
        int                       aa_nio_count;
        u32                       aa_page_count;
        int                       aa_resends;
        struct brw_page **aa_ppga;
        struct client_obd        *aa_cli;
        struct list_head          aa_oaps;
        struct list_head          aa_exts;
};

#define osc_grant_args osc_brw_async_args

struct osc_setattr_args {
        struct obdo             *sa_oa;
        obd_enqueue_update_f     sa_upcall;
        void                    *sa_cookie;
};

struct osc_fsync_args {
        struct osc_object       *fa_obj;
        struct obdo             *fa_oa;
        obd_enqueue_update_f    fa_upcall;
        void                    *fa_cookie;
};

struct osc_ladvise_args {
        struct obdo             *la_oa;
        obd_enqueue_update_f     la_upcall;
        void                    *la_cookie;
};

struct osc_enqueue_args {
        struct obd_export       *oa_exp;
        enum ldlm_type          oa_type;
        enum ldlm_mode          oa_mode;
        __u64                   *oa_flags;
        osc_enqueue_upcall_f    oa_upcall;
        void                    *oa_cookie;
        struct ost_lvb          *oa_lvb;
        struct lustre_handle    oa_lockh;
        bool                    oa_speculative;
};

static void osc_release_ppga(struct brw_page **ppga, size_t count);
static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
                         void *data, int rc);

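/* Pack @oa into the OST_BODY buffer of @req, converting it to the wire
 * format negotiated in the import's connect data. */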
void osc_pack_req_body(struct ptlrpc_request *req, struct obdo *oa)
{
        struct ost_body *body;

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
}

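/* Synchronous getattr: send OST_GETATTR and wait for the reply, copying
 * the returned attributes back into @oa.  The block size is filled in
 * locally from the client's maximum BRW size. */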
static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        int                      rc;

        ENTRY;
        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;

        EXIT;
out:
        ptlrpc_req_finished(req);

        return rc;
}

static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        int                      rc;

        ENTRY;
        LASSERT(oa->o_valid & OBD_MD_FLGROUP);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        EXIT;
out:
        ptlrpc_req_finished(req);

        RETURN(rc);
}

static int osc_setattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_setattr_args *sa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
                             &body->oa);
out:
        rc = sa->sa_upcall(sa->sa_cookie, rc);
        RETURN(rc);
}

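/* Asynchronous setattr.  With a NULL @rqset the request is simply handed
 * to ptlrpcd and no reply is awaited; otherwise osc_setattr_interpret()
 * will invoke @upcall with @cookie when the reply arrives. */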
int osc_setattr_async(struct obd_export *exp, struct obdo *oa,
                      obd_enqueue_update_f upcall, void *cookie,
                      struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct osc_setattr_args *sa;
        int                      rc;

        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        /* do mds to ost setattr asynchronously */
        if (!rqset) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req);
        } else {
                req->rq_interpret_reply =
                        (ptlrpc_interpterer_t)osc_setattr_interpret;

                CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
                sa = ptlrpc_req_async_args(req);
                sa->sa_oa = oa;
                sa->sa_upcall = upcall;
                sa->sa_cookie = cookie;

                if (rqset == PTLRPCD_SET)
                        ptlrpcd_add_req(req);
                else
                        ptlrpc_set_add_req(rqset, req);
        }

        RETURN(0);
}

static int osc_ladvise_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 void *arg, int rc)
{
        struct osc_ladvise_args *la = arg;
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        *la->la_oa = body->oa;
out:
        rc = la->la_upcall(la->la_cookie, rc);
        RETURN(rc);
}

/**
 * If rqset is NULL, do not wait for the response.  Upcall and cookie may
 * also be NULL in this case.
 */
int osc_ladvise_base(struct obd_export *exp, struct obdo *oa,
                     struct ladvise_hdr *ladvise_hdr,
                     obd_enqueue_update_f upcall, void *cookie,
                     struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        struct osc_ladvise_args *la;
        int                      rc;
        struct lu_ladvise       *req_ladvise;
        struct lu_ladvise       *ladvise = ladvise_hdr->lah_advise;
        int                      num_advise = ladvise_hdr->lah_count;
        struct ladvise_hdr      *req_ladvise_hdr;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_LADVISE);
        if (req == NULL)
                RETURN(-ENOMEM);

        req_capsule_set_size(&req->rq_pill, &RMF_OST_LADVISE, RCL_CLIENT,
                             num_advise * sizeof(*ladvise));
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_LADVISE);
        if (rc != 0) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL;
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
                             oa);

        req_ladvise_hdr = req_capsule_client_get(&req->rq_pill,
                                                 &RMF_OST_LADVISE_HDR);
        memcpy(req_ladvise_hdr, ladvise_hdr, sizeof(*ladvise_hdr));

        req_ladvise = req_capsule_client_get(&req->rq_pill, &RMF_OST_LADVISE);
        memcpy(req_ladvise, ladvise, sizeof(*ladvise) * num_advise);
        ptlrpc_request_set_replen(req);

        if (rqset == NULL) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req);
                RETURN(0);
        }

        req->rq_interpret_reply = osc_ladvise_interpret;
        CLASSERT(sizeof(*la) <= sizeof(req->rq_async_args));
        la = ptlrpc_req_async_args(req);
        la->la_oa = oa;
        la->la_upcall = upcall;
        la->la_cookie = cookie;

        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}

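/* Synchronous object create.  Note the assertion below: this path only
 * handles objects in the echo sequence.  It sends OST_CREATE and copies
 * the returned attributes back into @oa. */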
static int osc_create(const struct lu_env *env, struct obd_export *exp,
                      struct obdo *oa)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        LASSERT(oa != NULL);
        LASSERT(oa->o_valid & OBD_MD_FLGROUP);
        LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
        if (req == NULL)
                GOTO(out, rc = -ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
        if (rc) {
                ptlrpc_request_free(req);
                GOTO(out, rc);
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out_req, rc = -EPROTO);

        CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;

        CDEBUG(D_HA, "transno: %lld\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        RETURN(rc);
}

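/* Asynchronous truncate (punch).  The affected range is carried in @oa
 * (presumably in the size/blocks fields, as for OST_SYNC below);
 * completion is reported through osc_setattr_interpret(), which invokes
 * @upcall with @cookie. */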
int osc_punch_base(struct obd_export *exp, struct obdo *oa,
                   obd_enqueue_update_f upcall, void *cookie,
                   struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct osc_setattr_args *sa;
        struct ost_body         *body;
        int                      rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
        CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
        sa = ptlrpc_req_async_args(req);
        sa->sa_oa = oa;
        sa->sa_upcall = upcall;
        sa->sa_cookie = cookie;
        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}

static int osc_sync_interpret(const struct lu_env *env,
                              struct ptlrpc_request *req,
                              void *arg, int rc)
{
        struct osc_fsync_args   *fa = arg;
        struct ost_body         *body;
        struct cl_attr          *attr = &osc_env_info(env)->oti_attr;
        unsigned long           valid = 0;
        struct cl_object        *obj;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL) {
                CERROR("can't unpack ost_body\n");
                GOTO(out, rc = -EPROTO);
        }

        *fa->fa_oa = body->oa;
        obj = osc2cl(fa->fa_obj);

        /* Update osc object's blocks attribute */
        cl_object_attr_lock(obj);
        if (body->oa.o_valid & OBD_MD_FLBLOCKS) {
                attr->cat_blocks = body->oa.o_blocks;
                valid |= CAT_BLOCKS;
        }

        if (valid != 0)
                cl_object_attr_update(env, obj, attr, valid);
        cl_object_attr_unlock(obj);

out:
        rc = fa->fa_upcall(fa->fa_cookie, rc);
        RETURN(rc);
}

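/* Send an OST_SYNC RPC for object @obj.  The byte range to flush travels
 * in @oa (see the size/blocks comment below); osc_sync_interpret()
 * updates the object's blocks attribute from the reply and then calls
 * @upcall with @cookie. */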
int osc_sync_base(struct osc_object *obj, struct obdo *oa,
                  obd_enqueue_update_f upcall, void *cookie,
                  struct ptlrpc_request_set *rqset)
{
        struct obd_export     *exp = osc_export(obj);
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct osc_fsync_args *fa;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        /* overload the size and blocks fields in the oa with start/end */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = osc_sync_interpret;

        CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
        fa = ptlrpc_req_async_args(req);
        fa->fa_obj = obj;
        fa->fa_oa = oa;
        fa->fa_upcall = upcall;
        fa->fa_cookie = cookie;

        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}

/* Find and locally cancel locks matching @mode in the resource named by
 * @oa.  Found locks are added to the @cancels list.  Returns the number
 * of locks added. */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
                                   struct list_head *cancels,
                                   enum ldlm_mode mode, __u64 lock_flags)
{
        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        struct ldlm_res_id res_id;
        struct ldlm_resource *res;
        int count;
        ENTRY;

        /* Return, i.e. cancel nothing, only if ELC is supported (flag in
         * export) but disabled through procfs (flag in NS).
         *
         * This distinguishes from the case when ELC is not supported at all,
         * where we still want to cancel locks in advance and just cancel them
         * locally, without sending any RPC. */
        if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
                RETURN(0);

        ostid_build_res_name(&oa->o_oi, &res_id);
        res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
        if (IS_ERR(res))
                RETURN(0);

        LDLM_RESOURCE_ADDREF(res);
        count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
                                           lock_flags, 0, NULL);
        LDLM_RESOURCE_DELREF(res);
        ldlm_resource_putref(res);
        RETURN(count);
}

static int osc_destroy_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req, void *data,
                                 int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;

        atomic_dec(&cli->cl_destroy_in_flight);
        wake_up(&cli->cl_destroy_waitq);
        return 0;
}

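/* Throttle destroy RPCs: return 1 (leaving the in-flight counter raised)
 * if another destroy may be sent, or 0 after undoing the increment once
 * cl_max_rpcs_in_flight destroys are already outstanding. */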
static int osc_can_send_destroy(struct client_obd *cli)
{
        if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
            cli->cl_max_rpcs_in_flight) {
                /* The destroy request can be sent */
                return 1;
        }
        if (atomic_dec_return(&cli->cl_destroy_in_flight) <
            cli->cl_max_rpcs_in_flight) {
                /*
                 * The counter has been modified between the two atomic
                 * operations.
                 */
                wake_up(&cli->cl_destroy_waitq);
        }
        return 0;
}

static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct client_obd     *cli = &exp->exp_obd->u.cli;
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct list_head       cancels = LIST_HEAD_INIT(cancels);
        int rc, count;
        ENTRY;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
        if (req == NULL) {
                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                RETURN(-ENOMEM);
        }

        rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
                               0, &cancels, count);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = osc_destroy_interpret;
        if (!osc_can_send_destroy(cli)) {
                struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);

                /*
                 * Wait until the number of on-going destroy RPCs drops
                 * under max_rpc_in_flight
                 */
                rc = l_wait_event_exclusive(cli->cl_destroy_waitq,
                                            osc_can_send_destroy(cli), &lwi);
                if (rc) {
                        ptlrpc_req_finished(req);
                        RETURN(rc);
                }
        }

        /* Do not wait for response */
        ptlrpcd_add_req(req);
        RETURN(0);
}

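/* Fill the dirty/grant accounting fields of @oa so the server can see
 * how much dirty cache the client holds and how much additional grant it
 * would like; takes cl_loi_list_lock internally. */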
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;

        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        spin_lock(&cli->cl_loi_list_lock);
        if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data, GRANT_PARAM))
                oa->o_dirty = cli->cl_dirty_grant;
        else
                oa->o_dirty = cli->cl_dirty_pages << PAGE_SHIFT;
        if (unlikely(cli->cl_dirty_pages - cli->cl_dirty_transit >
                     cli->cl_dirty_max_pages)) {
                CERROR("dirty %lu - %lu > dirty_max %lu\n",
                       cli->cl_dirty_pages, cli->cl_dirty_transit,
                       cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else if (unlikely(atomic_long_read(&obd_dirty_pages) -
                            atomic_long_read(&obd_dirty_transit_pages) >
                            (long)(obd_max_dirty_pages + 1))) {
                /* The atomic_read() and atomic_inc() are not covered by a
                 * lock, thus they may safely race and trip this CERROR()
                 * unless we add in a small fudge factor (+1). */
                CERROR("%s: dirty %ld - %ld > system dirty_max %ld\n",
                       cli_name(cli), atomic_long_read(&obd_dirty_pages),
                       atomic_long_read(&obd_dirty_transit_pages),
                       obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
                            0x7fffffff)) {
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty_pages, cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else {
                unsigned long nrpages;

                nrpages = cli->cl_max_pages_per_rpc;
                nrpages *= cli->cl_max_rpcs_in_flight + 1;
                nrpages = max(nrpages, cli->cl_dirty_max_pages);
                oa->o_undirty = nrpages << PAGE_SHIFT;
                if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data,
                                 GRANT_PARAM)) {
                        int nrextents;

                        /* take extent tax into account when asking for more
                         * grant space */
                        nrextents = (nrpages + cli->cl_max_extent_pages - 1)  /
                                     cli->cl_max_extent_pages;
                        oa->o_undirty += nrextents * cli->cl_grant_extent_tax;
                }
        }
        oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
        oa->o_dropped = cli->cl_lost_grant;
        cli->cl_lost_grant = 0;
        spin_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "dirty: %llu undirty: %u dropped %u grant: %llu\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
}

void osc_update_next_shrink(struct client_obd *cli)
{
        cli->cl_next_shrink_grant =
                cfs_time_shift(cli->cl_grant_shrink_interval);
        CDEBUG(D_CACHE, "next time %ld to shrink grant\n",
               cli->cl_next_shrink_grant);
}

static void __osc_update_grant(struct client_obd *cli, u64 grant)
{
        spin_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant += grant;
        spin_unlock(&cli->cl_loi_list_lock);
}

static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
{
        if (body->oa.o_valid & OBD_MD_FLGRANT) {
                CDEBUG(D_CACHE, "got %llu extra grant\n", body->oa.o_grant);
                __osc_update_grant(cli, body->oa.o_grant);
        }
}

static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
                              u32 keylen, void *key,
                              u32 vallen, void *val,
                              struct ptlrpc_request_set *set);

static int osc_shrink_grant_interpret(const struct lu_env *env,
                                      struct ptlrpc_request *req,
                                      void *aa, int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
        struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
        struct ost_body *body;

        if (rc != 0) {
                __osc_update_grant(cli, oa->o_grant);
                GOTO(out, rc);
        }

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        osc_update_grant(cli, body);
out:
        OBDO_FREE(oa);
        return rc;
}

static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
{
        spin_lock(&cli->cl_loi_list_lock);
        oa->o_grant = cli->cl_avail_grant / 4;
        cli->cl_avail_grant -= oa->o_grant;
        spin_unlock(&cli->cl_loi_list_lock);
        if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
                oa->o_valid |= OBD_MD_FLFLAGS;
                oa->o_flags = 0;
        }
        oa->o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);
}

/* Shrink the current grant, either from some large amount to enough for a
 * full set of in-flight RPCs, or if we have already shrunk to that limit
 * then to enough for a single RPC.  This avoids keeping more grant than
 * needed, and avoids shrinking the grant piecemeal. */
static int osc_shrink_grant(struct client_obd *cli)
{
        __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
                             (cli->cl_max_pages_per_rpc << PAGE_SHIFT);

        spin_lock(&cli->cl_loi_list_lock);
        if (cli->cl_avail_grant <= target_bytes)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
        spin_unlock(&cli->cl_loi_list_lock);

        return osc_shrink_grant_to_target(cli, target_bytes);
}

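/* Hand unused grant back to the OST, reducing cl_avail_grant to
 * @target_bytes (never below a single RPC worth of grant).  The released
 * amount is reported through a KEY_GRANT_SHRINK set_info RPC; if that
 * fails, the grant is re-added locally. */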
int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
{
        int                     rc = 0;
        struct ost_body        *body;
        ENTRY;

        spin_lock(&cli->cl_loi_list_lock);
        /* Don't shrink if we are already above or below the desired limit.
         * We don't want to shrink below a single RPC, as that will negatively
         * impact block allocation and long-term performance. */
        if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_SHIFT)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;

        if (target_bytes >= cli->cl_avail_grant) {
                spin_unlock(&cli->cl_loi_list_lock);
                RETURN(0);
        }
        spin_unlock(&cli->cl_loi_list_lock);

        OBD_ALLOC_PTR(body);
        if (!body)
                RETURN(-ENOMEM);

        osc_announce_cached(cli, &body->oa, 0);

        spin_lock(&cli->cl_loi_list_lock);
        body->oa.o_grant = cli->cl_avail_grant - target_bytes;
        cli->cl_avail_grant = target_bytes;
        spin_unlock(&cli->cl_loi_list_lock);
        if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
                body->oa.o_valid |= OBD_MD_FLFLAGS;
                body->oa.o_flags = 0;
        }
        body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);

        rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
                                sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
                                sizeof(*body), body, NULL);
        if (rc != 0)
                __osc_update_grant(cli, body->oa.o_grant);
        OBD_FREE_PTR(body);
        RETURN(rc);
}

static int osc_should_shrink_grant(struct client_obd *client)
{
        cfs_time_t time = cfs_time_current();
        cfs_time_t next_shrink = client->cl_next_shrink_grant;

        if ((client->cl_import->imp_connect_data.ocd_connect_flags &
             OBD_CONNECT_GRANT_SHRINK) == 0)
                return 0;

        if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
                /* Get the current RPC size directly, instead of going via:
                 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
                 * Keep comment here so that it can be found by searching. */
                int brw_size = client->cl_max_pages_per_rpc << PAGE_SHIFT;

                if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
                    client->cl_avail_grant > brw_size)
                        return 1;
                else
                        osc_update_next_shrink(client);
        }
        return 0;
}

static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
{
        struct client_obd *client;

        list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
                if (osc_should_shrink_grant(client))
                        osc_shrink_grant(client);
        }
        return 0;
}

static int osc_add_shrink_grant(struct client_obd *client)
{
        int rc;

        rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
                                       TIMEOUT_GRANT,
                                       osc_grant_shrink_grant_cb, NULL,
                                       &client->cl_grant_shrink_list);
        if (rc) {
                CERROR("add grant client %s error %d\n", cli_name(client), rc);
                return rc;
        }
        CDEBUG(D_CACHE, "add grant client %s\n", cli_name(client));
        osc_update_next_shrink(client);
        return 0;
}

static int osc_del_shrink_grant(struct client_obd *client)
{
        return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
                                         TIMEOUT_GRANT);
}

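/* Seed the client grant accounting from the server's connect data, and,
 * when the server supports GRANT_PARAM, derive the extent tax, chunk
 * size, and maximum extent size as well. */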
static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        /*
         * ocd_grant is the total grant amount we expect to hold: if we have
         * been evicted, it's the new avail_grant amount, and cl_dirty_pages
         * will drop to 0 as in-flight RPCs fail out; otherwise, it's
         * avail_grant + dirty.
         *
         * The race is tolerable here: if we're evicted, but imp_state already
         * left EVICTED state, then cl_dirty_pages must be 0 already.
         */
        spin_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant = ocd->ocd_grant;
        if (cli->cl_import->imp_state != LUSTRE_IMP_EVICTED) {
                cli->cl_avail_grant -= cli->cl_reserved_grant;
                if (OCD_HAS_FLAG(ocd, GRANT_PARAM))
                        cli->cl_avail_grant -= cli->cl_dirty_grant;
                else
                        cli->cl_avail_grant -=
                                        cli->cl_dirty_pages << PAGE_SHIFT;
        }

        if (OCD_HAS_FLAG(ocd, GRANT_PARAM)) {
                u64 size;
                int chunk_mask;

                /* overhead for each extent insertion */
                cli->cl_grant_extent_tax = ocd->ocd_grant_tax_kb << 10;
                /* determine the appropriate chunk size used by osc_extent. */
                cli->cl_chunkbits = max_t(int, PAGE_SHIFT,
                                          ocd->ocd_grant_blkbits);
                /* max_pages_per_rpc must be chunk aligned */
                chunk_mask = ~((1 << (cli->cl_chunkbits - PAGE_SHIFT)) - 1);
                cli->cl_max_pages_per_rpc = (cli->cl_max_pages_per_rpc +
                                             ~chunk_mask) & chunk_mask;
                /* determine maximum extent size, in #pages */
                size = (u64)ocd->ocd_grant_max_blks << ocd->ocd_grant_blkbits;
                cli->cl_max_extent_pages = size >> PAGE_SHIFT;
                if (cli->cl_max_extent_pages == 0)
                        cli->cl_max_extent_pages = 1;
        } else {
                cli->cl_grant_extent_tax = 0;
                cli->cl_chunkbits = PAGE_SHIFT;
                cli->cl_max_extent_pages = DT_MAX_BRW_PAGES;
        }
        spin_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld. "
                "chunk bits: %d cl_max_extent_pages: %d\n",
                cli_name(cli),
                cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits,
                cli->cl_max_extent_pages);

        if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
            list_empty(&cli->cl_grant_shrink_list))
                osc_add_shrink_grant(cli);
}

/* We assume that the reason this OSC got a short read is that it read
 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
 * via the LOV, and it _knows_ it's reading inside the file, it's just that
 * this stripe never got written at or beyond this stripe offset yet. */
static void handle_short_read(int nob_read, size_t page_count,
                              struct brw_page **pga)
{
        char *ptr;
        int i = 0;

        /* skip bytes read OK */
        while (nob_read > 0) {
                LASSERT(page_count > 0);

                if (pga[i]->count > nob_read) {
                        /* EOF inside this page */
                        ptr = kmap(pga[i]->pg) +
                                (pga[i]->off & ~PAGE_MASK);
                        memset(ptr + nob_read, 0, pga[i]->count - nob_read);
                        kunmap(pga[i]->pg);
                        page_count--;
                        i++;
                        break;
                }

                nob_read -= pga[i]->count;
                page_count--;
                i++;
        }

        /* zero remaining pages */
        while (page_count-- > 0) {
                ptr = kmap(pga[i]->pg) + (pga[i]->off & ~PAGE_MASK);
                memset(ptr, 0, pga[i]->count);
                kunmap(pga[i]->pg);
                i++;
        }
}

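/* Sanity-check a BRW_WRITE reply: verify the per-niobuf return codes and
 * that the bulk layer transferred exactly the number of bytes requested.
 * Returns 0 on success, a server rc or -EPROTO on inconsistency. */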
static int check_write_rcs(struct ptlrpc_request *req,
                           int requested_nob, int niocount,
                           size_t page_count, struct brw_page **pga)
{
        int     i;
        __u32   *remote_rcs;

        remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
                                                  sizeof(*remote_rcs) *
                                                  niocount);
        if (remote_rcs == NULL) {
                CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
                return(-EPROTO);
        }

        /* return error if any niobuf was in error */
        for (i = 0; i < niocount; i++) {
                if ((int)remote_rcs[i] < 0)
                        return(remote_rcs[i]);

                if (remote_rcs[i] != 0) {
                        CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
                                i, remote_rcs[i], req);
                        return(-EPROTO);
                }
        }

        if (req->rq_bulk->bd_nob_transferred != requested_nob) {
                CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
                       req->rq_bulk->bd_nob_transferred, requested_nob);
                return(-EPROTO);
        }

        return (0);
}

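/* Two brw_pages may share a single niobuf only if they are contiguous in
 * the file and carry identical flags; differing flags outside the
 * known-safe set additionally trigger a warning. */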
static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
{
        if (p1->flag != p2->flag) {
                unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
                                  OBD_BRW_SYNC       | OBD_BRW_ASYNC   |
                                  OBD_BRW_NOQUOTA    | OBD_BRW_SOFT_SYNC);

                /* warn if we try to combine flags that we don't know to be
                 * safe to combine */
                if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
                        CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
                              "report this at https://jira.hpdd.intel.com/\n",
                              p1->flag, p2->flag);
                }
                return 0;
        }

        return (p1->off + p1->count == p2->off);
}

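/* Checksum the first @nob bytes of the bulk described by @pga with the
 * algorithm selected by @cksum_type.  The OBD_FAIL_OSC_CHECKSUM_* hooks
 * deliberately corrupt the data (reads) or the checksum (writes) for
 * fault-injection testing. */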
static u32 osc_checksum_bulk(int nob, size_t pg_count,
                             struct brw_page **pga, int opc,
                             enum cksum_types cksum_type)
{
        u32                             cksum;
        int                             i = 0;
        struct cfs_crypto_hash_desc     *hdesc;
        unsigned int                    bufsize;
        unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);

        LASSERT(pg_count > 0);

        hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
        if (IS_ERR(hdesc)) {
                CERROR("Unable to initialize checksum hash %s\n",
                       cfs_crypto_hash_name(cfs_alg));
                return PTR_ERR(hdesc);
        }

        while (nob > 0 && pg_count > 0) {
                unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 && opc == OST_READ &&
                    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
                        unsigned char *ptr = kmap(pga[i]->pg);
                        int off = pga[i]->off & ~PAGE_MASK;

                        memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
                        kunmap(pga[i]->pg);
                }
                cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
                                            pga[i]->off & ~PAGE_MASK,
                                            count);
                LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
                               (int)(pga[i]->off & ~PAGE_MASK));

                nob -= pga[i]->count;
                pg_count--;
                i++;
        }

        bufsize = sizeof(cksum);
        cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);

        /* For sending we only compute the wrong checksum instead
         * of corrupting the data so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                cksum++;

        return cksum;
}

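/* Assemble a bulk read or write RPC: count the niobufs needed (merging
 * contiguous pages), pack the obdo/ioobj/niobufs, attach the bulk
 * descriptor, and compute the bulk checksum when enabled.  On success,
 * *reqp holds the prepared request and the async args carry the state
 * later consumed by brw_interpret(). */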
static int
osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
                     u32 page_count, struct brw_page **pga,
                     struct ptlrpc_request **reqp, int resend)
{
        struct ptlrpc_request   *req;
        struct ptlrpc_bulk_desc *desc;
        struct ost_body         *body;
        struct obd_ioobj        *ioobj;
        struct niobuf_remote    *niobuf;
        int niocount, i, requested_nob, opc, rc;
        struct osc_brw_async_args *aa;
        struct req_capsule      *pill;
        struct brw_page *pg_prev;

        ENTRY;
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
                RETURN(-ENOMEM); /* Recoverable */
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
                RETURN(-EINVAL); /* Fatal */

        if ((cmd & OBD_BRW_WRITE) != 0) {
                opc = OST_WRITE;
                req = ptlrpc_request_alloc_pool(cli->cl_import,
                                                osc_rq_pool,
                                                &RQF_OST_BRW_WRITE);
        } else {
                opc = OST_READ;
                req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
        }
        if (req == NULL)
                RETURN(-ENOMEM);

        for (niocount = i = 1; i < page_count; i++) {
                if (!can_merge_pages(pga[i - 1], pga[i]))
                        niocount++;
        }

        pill = &req->rq_pill;
        req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
                             sizeof(*ioobj));
        req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
                             niocount * sizeof(*niobuf));

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);
        /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
         * retry logic */
        req->rq_no_retry_einprogress = 1;

        desc = ptlrpc_prep_bulk_imp(req, page_count,
                cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
                (opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE :
                        PTLRPC_BULK_PUT_SINK) |
                        PTLRPC_BULK_BUF_KIOV,
                OST_BULK_PORTAL,
                &ptlrpc_bulk_kiov_pin_ops);

        if (desc == NULL)
                GOTO(out, rc = -ENOMEM);
        /* NB request now owns desc and will free it when it gets freed */

        body = req_capsule_client_get(pill, &RMF_OST_BODY);
        ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;
        /* The high bits of ioo_max_brw tell the server the _maximum_ number
         * of bulks that might be sent for this request.  The actual number
         * is decided when the RPC is finally sent in ptlrpc_register_bulk().
         * It sends "max - 1" for old client compatibility sending "0", and
         * also so the actual maximum is a power-of-two number, not one less.
         * LU-1431 */
        ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
        LASSERT(page_count > 0);
        pg_prev = pga[0];
        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                struct brw_page *pg = pga[i];
                int poff = pg->off & ~PAGE_MASK;

                LASSERT(pg->count > 0);
                /* make sure there is no gap in the middle of page array */
                LASSERTF(page_count == 1 ||
                         (ergo(i == 0, poff + pg->count == PAGE_SIZE) &&
                          ergo(i > 0 && i < page_count - 1,
                               poff == 0 && pg->count == PAGE_SIZE)   &&
                          ergo(i == page_count - 1, poff == 0)),
                         "i: %d/%d pg: %p off: %llu, count: %u\n",
                         i, page_count, pg, pg->off, pg->count);
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u pg %p [pri %lu ind %lu] off %llu"
                         " prev_pg %p [pri %lu ind %lu] off %llu\n",
                         i, page_count,
                         pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
                         pg_prev->pg, page_private(pg_prev->pg),
                         pg_prev->pg->index, pg_prev->off);
                LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
                        (pg->flag & OBD_BRW_SRVLOCK));

                desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff, pg->count);
                requested_nob += pg->count;

                if (i > 0 && can_merge_pages(pg_prev, pg)) {
                        niobuf--;
                        niobuf->rnb_len += pg->count;
                } else {
                        niobuf->rnb_offset = pg->off;
                        niobuf->rnb_len    = pg->count;
                        niobuf->rnb_flags  = pg->flag;
                }
                pg_prev = pg;
        }

        LASSERTF((void *)(niobuf - niocount) ==
                req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
                "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
                &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));

        osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
        if (resend) {
                if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                        body->oa.o_valid |= OBD_MD_FLFLAGS;
                        body->oa.o_flags = 0;
                }
                body->oa.o_flags |= OBD_FL_RECOV_RESEND;
        }

        if (osc_should_shrink_grant(cli))
                osc_shrink_grant_local(cli, &body->oa);

        /* size[REQ_REC_OFF] still sizeof (*body) */
        if (opc == OST_WRITE) {
                if (cli->cl_checksum &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        /* store cl_cksum_type in a local variable since
                         * it can be changed via lprocfs */
                        enum cksum_types cksum_type = cli->cl_cksum_type;

                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                body->oa.o_flags = 0;

                        body->oa.o_flags |= cksum_type_pack(cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        body->oa.o_cksum = osc_checksum_bulk(requested_nob,
                                                             page_count, pga,
                                                             OST_WRITE,
                                                             cksum_type);
                        CDEBUG(D_PAGE, "checksum at write origin: %x\n",
                               body->oa.o_cksum);
                        /* save this in 'oa', too, for later checking */
                        oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        oa->o_flags |= cksum_type_pack(cksum_type);
                } else {
                        /* clear out the checksum flag, in case this is a
                         * resend but cl_checksum is no longer set. b=11238 */
                        oa->o_valid &= ~OBD_MD_FLCKSUM;
                }
                oa->o_cksum = body->oa.o_cksum;
                /* 1 RC per niobuf */
                req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
                                     sizeof(__u32) * niocount);
        } else {
                if (cli->cl_checksum &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                body->oa.o_flags = 0;
                        body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                }

                /* The client cksum has already been copied to the wire obdo
                 * in the previous lustre_set_wire_obdo(); in case a bulk read
                 * is resent due to a cksum error, this lets the server
                 * check+dump the pages on its side */
        }
        ptlrpc_request_set_replen(req);

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oa = oa;
        aa->aa_requested_nob = requested_nob;
        aa->aa_nio_count = niocount;
        aa->aa_page_count = page_count;
        aa->aa_resends = 0;
        aa->aa_ppga = pga;
        aa->aa_cli = cli;
        INIT_LIST_HEAD(&aa->aa_oaps);

        *reqp = req;
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
                req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
                niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
        RETURN(0);

 out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}

char dbgcksum_file_name[PATH_MAX];

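/* On a checksum mismatch, dump the bulk pages to a file under the debug
 * path, named after the fid, range and checksums, for later inspection.
 * O_EXCL ensures only the first failure for a given range is kept. */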
static void dump_all_bulk_pages(struct obdo *oa, __u32 page_count,
                                struct brw_page **pga, __u32 server_cksum,
                                __u32 client_cksum)
{
        struct file *filp;
        int rc, i;
        unsigned int len;
        char *buf;
        mm_segment_t oldfs;

        /* only keep a dump of pages for the first error on a given range in
         * a file/fid, not for subsequent resends/retries. */
        snprintf(dbgcksum_file_name, sizeof(dbgcksum_file_name),
                 "%s-checksum_dump-osc-"DFID":[%llu-%llu]-%x-%x",
                 (strncmp(libcfs_debug_file_path_arr, "NONE", 4) != 0 ?
                  libcfs_debug_file_path_arr :
                  LIBCFS_DEBUG_FILE_PATH_DEFAULT),
                 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : 0ULL,
                 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
                 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
                 pga[0]->off,
                 pga[page_count-1]->off + pga[page_count-1]->count - 1,
                 client_cksum, server_cksum);
        filp = filp_open(dbgcksum_file_name,
                         O_CREAT | O_EXCL | O_WRONLY | O_LARGEFILE, 0600);
        if (IS_ERR(filp)) {
                rc = PTR_ERR(filp);
                if (rc == -EEXIST)
                        CDEBUG(D_INFO, "%s: can't open to dump pages with "
                               "checksum error: rc = %d\n", dbgcksum_file_name,
                               rc);
                else
                        CERROR("%s: can't open to dump pages with checksum "
                               "error: rc = %d\n", dbgcksum_file_name, rc);
                return;
        }

        oldfs = get_fs();
        set_fs(KERNEL_DS);
        for (i = 0; i < page_count; i++) {
                len = pga[i]->count;
                buf = kmap(pga[i]->pg);
                while (len != 0) {
                        rc = vfs_write(filp, (__force const char __user *)buf,
                                       len, &filp->f_pos);
                        if (rc < 0) {
                                CERROR("%s: wanted to write %u but got %d "
                                       "error\n", dbgcksum_file_name, len, rc);
                                break;
                        }
                        len -= rc;
                        buf += rc;
                        CDEBUG(D_INFO, "%s: wrote %d bytes\n",
                               dbgcksum_file_name, rc);
                }
                kunmap(pga[i]->pg);
        }
        set_fs(oldfs);

        rc = ll_vfs_fsync_range(filp, 0, LLONG_MAX, 1);
        if (rc)
                CERROR("%s: sync returns %d\n", dbgcksum_file_name, rc);
        filp_close(filp, NULL);
        return;
}

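/* The OST reported a write checksum mismatch: recompute the checksum over
 * the pages still held locally to judge whether the data changed on the
 * client (e.g. under mmap IO), in transit, or both, and log the verdict.
 * Returns 1 for a genuine mismatch, 0 if the checksums agree. */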
1388 static int
1389 check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1390                                 __u32 client_cksum, __u32 server_cksum,
1391                                 struct osc_brw_async_args *aa)
1392 {
1393         __u32 new_cksum;
1394         char *msg;
1395         enum cksum_types cksum_type;
1396
1397         if (server_cksum == client_cksum) {
1398                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1399                 return 0;
1400         }
1401
1402         if (aa->aa_cli->cl_checksum_dump)
1403                 dump_all_bulk_pages(oa, aa->aa_page_count, aa->aa_ppga,
1404                                     server_cksum, client_cksum);
1405
1406         cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1407                                        oa->o_flags : 0);
1408         new_cksum = osc_checksum_bulk(aa->aa_requested_nob, aa->aa_page_count,
1409                                       aa->aa_ppga, OST_WRITE, cksum_type);
1410
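        /* Compare the freshly recomputed checksum against both the client's
         * original value and the server's reported value to localize where
         * the data changed (see the message selection below). */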
1411         if (cksum_type != cksum_type_unpack(aa->aa_oa->o_flags))
1412                 msg = "the server did not use the checksum type specified in "
1413                       "the original request - likely a protocol problem";
1414         else if (new_cksum == server_cksum)
1415                 msg = "changed on the client after we checksummed it - "
1416                       "likely false positive due to mmap IO (bug 11742)";
1417         else if (new_cksum == client_cksum)
1418                 msg = "changed in transit before arrival at OST";
1419         else
1420                 msg = "changed in transit AND doesn't match the original - "
1421                       "likely false positive due to mmap IO (bug 11742)";
1422
1423         LCONSOLE_ERROR_MSG(0x132, "%s: BAD WRITE CHECKSUM: %s: from %s inode "
1424                            DFID " object "DOSTID" extent [%llu-%llu], original "
1425                            "client csum %x (type %x), server csum %x (type %x),"
1426                            " client csum now %x\n",
1427                            aa->aa_cli->cl_import->imp_obd->obd_name,
1428                            msg, libcfs_nid2str(peer->nid),
1429                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1430                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1431                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1432                            POSTID(&oa->o_oi), aa->aa_ppga[0]->off,
1433                            aa->aa_ppga[aa->aa_page_count - 1]->off +
1434                                 aa->aa_ppga[aa->aa_page_count-1]->count - 1,
1435                            client_cksum, cksum_type_unpack(aa->aa_oa->o_flags),
1436                            server_cksum, cksum_type, new_cksum);
1437         return 1;
1438 }
1439
1440 /* Note: rc enters this function as the number of bytes transferred */
1441 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1442 {
1443         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1444         const struct lnet_process_id *peer =
1445                         &req->rq_import->imp_connection->c_peer;
1446         struct client_obd *cli = aa->aa_cli;
1447         struct ost_body *body;
1448         u32 client_cksum = 0;
1449         ENTRY;
1450
1451         if (rc < 0 && rc != -EDQUOT) {
1452                 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1453                 RETURN(rc);
1454         }
1455
1456         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1457         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1458         if (body == NULL) {
1459                 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1460                 RETURN(-EPROTO);
1461         }
1462
1463         /* set/clear over quota flag for a uid/gid/projid */
1464         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1465             body->oa.o_valid & (OBD_MD_FLALLQUOTA)) {
1466                 unsigned qid[LL_MAXQUOTAS] = {
1467                                          body->oa.o_uid, body->oa.o_gid,
1468                                          body->oa.o_projid };
1469                 CDEBUG(D_QUOTA, "setdq for [%u %u %u] with valid %#llx, flags %x\n",
1470                        body->oa.o_uid, body->oa.o_gid, body->oa.o_projid,
1471                        body->oa.o_valid, body->oa.o_flags);
1472                 osc_quota_setdq(cli, qid, body->oa.o_valid,
1473                                 body->oa.o_flags);
1474         }
1475
1476         osc_update_grant(cli, body);
1477
1478         if (rc < 0)
1479                 RETURN(rc);
1480
1481         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1482                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1483
1484         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1485                 if (rc > 0) {
1486                         CERROR("Unexpected positive rc %d\n", rc);
1487                         RETURN(-EPROTO);
1488                 }
1489                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1490
1491                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1492                         RETURN(-EAGAIN);
1493
1494                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1495                     check_write_checksum(&body->oa, peer, client_cksum,
1496                                          body->oa.o_cksum, aa))
1497                         RETURN(-EAGAIN);
1498
1499                 rc = check_write_rcs(req, aa->aa_requested_nob, aa->aa_nio_count,
1500                                      aa->aa_page_count, aa->aa_ppga);
1501                 GOTO(out, rc);
1502         }
1503
1504         /* The rest of this function executes only for OST_READs */
1505
1506         /* if unwrap_bulk failed, return -EAGAIN to retry */
1507         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1508         if (rc < 0)
1509                 GOTO(out, rc = -EAGAIN);
1510
1511         if (rc > aa->aa_requested_nob) {
1512                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1513                        aa->aa_requested_nob);
1514                 RETURN(-EPROTO);
1515         }
1516
1517         if (rc != req->rq_bulk->bd_nob_transferred) {
1518                 CERROR("Unexpected rc %d (%d transferred)\n",
1519                        rc, req->rq_bulk->bd_nob_transferred);
1520                 RETURN(-EPROTO);
1521         }
1522
1523         if (rc < aa->aa_requested_nob)
1524                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1525
1526         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1527                 static int cksum_counter;
1528                 u32        server_cksum = body->oa.o_cksum;
1529                 char      *via = "";
1530                 char      *router = "";
1531                 enum cksum_types cksum_type;
1532
1533                 cksum_type = cksum_type_unpack(body->oa.o_valid & OBD_MD_FLFLAGS ?
1534                                                body->oa.o_flags : 0);
1535                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1536                                                  aa->aa_ppga, OST_READ,
1537                                                  cksum_type);
1538
1539                 if (peer->nid != req->rq_bulk->bd_sender) {
1540                         via = " via ";
1541                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1542                 }
1543
1544                 if (server_cksum != client_cksum) {
1545                         struct ost_body *clbody;
1546                         u32 page_count = aa->aa_page_count;
1547
1548                         clbody = req_capsule_client_get(&req->rq_pill,
1549                                                         &RMF_OST_BODY);
1550                         if (cli->cl_checksum_dump)
1551                                 dump_all_bulk_pages(&clbody->oa, page_count,
1552                                                     aa->aa_ppga, server_cksum,
1553                                                     client_cksum);
1554
1555                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1556                                            "%s%s%s inode "DFID" object "DOSTID
1557                                            " extent [%llu-%llu], client %x, "
1558                                            "server %x, cksum_type %x\n",
1559                                            req->rq_import->imp_obd->obd_name,
1560                                            libcfs_nid2str(peer->nid),
1561                                            via, router,
1562                                            clbody->oa.o_valid & OBD_MD_FLFID ?
1563                                                 clbody->oa.o_parent_seq : 0ULL,
1564                                            clbody->oa.o_valid & OBD_MD_FLFID ?
1565                                                 clbody->oa.o_parent_oid : 0,
1566                                            clbody->oa.o_valid & OBD_MD_FLFID ?
1567                                                 clbody->oa.o_parent_ver : 0,
1568                                            POSTID(&body->oa.o_oi),
1569                                            aa->aa_ppga[0]->off,
1570                                            aa->aa_ppga[page_count-1]->off +
1571                                            aa->aa_ppga[page_count-1]->count - 1,
1572                                            client_cksum, server_cksum,
1573                                            cksum_type);
1574                         cksum_counter = 0;
1575                         aa->aa_oa->o_cksum = client_cksum;
1576                         rc = -EAGAIN;
1577                 } else {
1578                         cksum_counter++;
1579                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1580                         rc = 0;
1581                 }
1582         } else if (unlikely(client_cksum)) {
1583                 static int cksum_missed;
1584
1585                 cksum_missed++;
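                /* Rate-limit this console error: x & -x == x only when
                 * cksum_missed is a power of two (1, 2, 4, 8, ...). */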
1586                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1587                         CERROR("Checksum %u requested from %s but not sent\n",
1588                                cksum_missed, libcfs_nid2str(peer->nid));
1589         } else {
1590                 rc = 0;
1591         }
1592 out:
1593         if (rc >= 0)
1594                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1595                                      aa->aa_oa, &body->oa);
1596
1597         RETURN(rc);
1598 }
1599
1600 static int osc_brw_redo_request(struct ptlrpc_request *request,
1601                                 struct osc_brw_async_args *aa, int rc)
1602 {
1603         struct ptlrpc_request *new_req;
1604         struct osc_brw_async_args *new_aa;
1605         struct osc_async_page *oap;
1606         ENTRY;
1607
1608         DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1609                   "redo for recoverable error %d", rc);
1610
1611         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1612                                 OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
1613                                   aa->aa_cli, aa->aa_oa, aa->aa_page_count,
1614                                   aa->aa_ppga, &new_req, 1);
1615         if (rc)
1616                 RETURN(rc);
1617
1618         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1619                 if (oap->oap_request != NULL) {
1620                         LASSERTF(request == oap->oap_request,
1621                                  "request %p != oap_request %p\n",
1622                                  request, oap->oap_request);
1623                         if (oap->oap_interrupted) {
1624                                 ptlrpc_req_finished(new_req);
1625                                 RETURN(-EINTR);
1626                         }
1627                 }
1628         }
1629         /* The new request takes over pga and oaps from the old request.
1630          * Note that copying a list_head does not work; it must be moved. */
1631         aa->aa_resends++;
1632         new_req->rq_interpret_reply = request->rq_interpret_reply;
1633         new_req->rq_async_args = request->rq_async_args;
1634         new_req->rq_commit_cb = request->rq_commit_cb;
1635         /* Cap the resend delay at the current request timeout; this is
1636          * similar to what ptlrpc does (see after_reply()). */
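        /* e.g. after 3 resends the new request is delayed roughly 3 seconds;
         * the delay never grows beyond rq_timeout (illustrative reading of
         * the min implemented below). */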
1637         if (aa->aa_resends > new_req->rq_timeout)
1638                 new_req->rq_sent = ktime_get_real_seconds() + new_req->rq_timeout;
1639         else
1640                 new_req->rq_sent = ktime_get_real_seconds() + aa->aa_resends;
1641         new_req->rq_generation_set = 1;
1642         new_req->rq_import_generation = request->rq_import_generation;
1643
1644         new_aa = ptlrpc_req_async_args(new_req);
1645
1646         INIT_LIST_HEAD(&new_aa->aa_oaps);
1647         list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1648         INIT_LIST_HEAD(&new_aa->aa_exts);
1649         list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1650         new_aa->aa_resends = aa->aa_resends;
1651
1652         list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1653                 if (oap->oap_request) {
1654                         ptlrpc_req_finished(oap->oap_request);
1655                         oap->oap_request = ptlrpc_request_addref(new_req);
1656                 }
1657         }
1658
1659         /* XXX: This code will run into problems if we ever support adding
1660          * a series of BRW RPCs into a self-defined ptlrpc_request_set and
1661          * waiting for all of them to finish. We should inherit the request
1662          * set from the old request. */
1663         ptlrpcd_add_req(new_req);
1664
1665         DEBUG_REQ(D_INFO, new_req, "new request");
1666         RETURN(0);
1667 }
1668
1669 /*
1670  * Ugh, we want disk allocation on the target to happen in offset order.  We'll
1671  * follow Sedgewick's advice and stick to the dead simple shellsort -- it'll do
1672  * fine for our small page arrays and doesn't require allocation.  It's an
1673  * insertion sort that swaps elements that are strides apart, shrinking the
1674  * stride down until it's 1 and the array is sorted.
1675  */
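/*
 * A quick sketch of the stride sequence below (Knuth's 3x+1 series): for
 * num = 100 the first loop grows the stride 1, 4, 13, 40, 121, and the
 * do-while then sorts with strides 40, 13, 4, 1, each pass insertion-sorting
 * elements that are 'stride' apart.
 */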
1676 static void sort_brw_pages(struct brw_page **array, int num)
1677 {
1678         int stride, i, j;
1679         struct brw_page *tmp;
1680
1681         if (num == 1)
1682                 return;
1683         for (stride = 1; stride < num; stride = (stride * 3) + 1)
1684                 ;
1685 
1686         do {
1687                 stride /= 3;
1688                 for (i = stride; i < num; i++) {
1689                         tmp = array[i];
1690                         j = i;
1691                         while (j >= stride && array[j - stride]->off > tmp->off) {
1692                                 array[j] = array[j - stride];
1693                                 j -= stride;
1694                         }
1695                         array[j] = tmp;
1696                 }
1697         } while (stride > 1);
1698 }
1699
1700 static void osc_release_ppga(struct brw_page **ppga, size_t count)
1701 {
1702         LASSERT(ppga != NULL);
1703         OBD_FREE(ppga, sizeof(*ppga) * count);
1704 }
1705
1706 static int brw_interpret(const struct lu_env *env,
1707                          struct ptlrpc_request *req, void *data, int rc)
1708 {
1709         struct osc_brw_async_args *aa = data;
1710         struct osc_extent *ext;
1711         struct osc_extent *tmp;
1712         struct client_obd *cli = aa->aa_cli;
1713         ENTRY;
1714
1715         rc = osc_brw_fini_request(req, rc);
1716         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1717         /* When the server returns -EINPROGRESS, the client should always
1718          * retry regardless of how many times the bulk was already resent. */
1719         if (osc_recoverable_error(rc)) {
1720                 if (req->rq_import_generation !=
1721                     req->rq_import->imp_generation) {
1722                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
1723                                ""DOSTID", rc = %d.\n",
1724                                req->rq_import->imp_obd->obd_name,
1725                                POSTID(&aa->aa_oa->o_oi), rc);
1726                 } else if (rc == -EINPROGRESS ||
1727                     client_should_resend(aa->aa_resends, aa->aa_cli)) {
1728                         rc = osc_brw_redo_request(req, aa, rc);
1729                 } else {
1730                         CERROR("%s: too many resend retries for object: "
1731                                "%llu:%llu, rc = %d.\n",
1732                                req->rq_import->imp_obd->obd_name,
1733                                POSTID(&aa->aa_oa->o_oi), rc);
1734                 }
1735
1736                 if (rc == 0)
1737                         RETURN(0);
1738                 else if (rc == -EAGAIN || rc == -EINPROGRESS)
1739                         rc = -EIO;
1740         }
1741
1742         if (rc == 0) {
1743                 struct obdo *oa = aa->aa_oa;
1744                 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
1745                 unsigned long valid = 0;
1746                 struct cl_object *obj;
1747                 struct osc_async_page *last;
1748
1749                 last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
1750                 obj = osc2cl(last->oap_obj);
1751
1752                 cl_object_attr_lock(obj);
1753                 if (oa->o_valid & OBD_MD_FLBLOCKS) {
1754                         attr->cat_blocks = oa->o_blocks;
1755                         valid |= CAT_BLOCKS;
1756                 }
1757                 if (oa->o_valid & OBD_MD_FLMTIME) {
1758                         attr->cat_mtime = oa->o_mtime;
1759                         valid |= CAT_MTIME;
1760                 }
1761                 if (oa->o_valid & OBD_MD_FLATIME) {
1762                         attr->cat_atime = oa->o_atime;
1763                         valid |= CAT_ATIME;
1764                 }
1765                 if (oa->o_valid & OBD_MD_FLCTIME) {
1766                         attr->cat_ctime = oa->o_ctime;
1767                         valid |= CAT_CTIME;
1768                 }
1769
1770                 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1771                         struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
1772                         loff_t last_off = last->oap_count + last->oap_obj_off +
1773                                 last->oap_page_off;
1774
1775                         /* Change the file size if this is an out-of-quota
1776                          * or direct IO write and it extends the file size */
1777                         if (loi->loi_lvb.lvb_size < last_off) {
1778                                 attr->cat_size = last_off;
1779                                 valid |= CAT_SIZE;
1780                         }
1781                         /* Extend KMS if it's not a lockless write */
1782                         if (loi->loi_kms < last_off &&
1783                             oap2osc_page(last)->ops_srvlock == 0) {
1784                                 attr->cat_kms = last_off;
1785                                 valid |= CAT_KMS;
1786                         }
1787                 }
1788
1789                 if (valid != 0)
1790                         cl_object_attr_update(env, obj, attr, valid);
1791                 cl_object_attr_unlock(obj);
1792         }
1793         OBDO_FREE(aa->aa_oa);
1794
1795         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
1796                 osc_inc_unstable_pages(req);
1797
1798         list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
1799                 list_del_init(&ext->oe_link);
1800                 osc_extent_finish(env, ext, 1, rc);
1801         }
1802         LASSERT(list_empty(&aa->aa_exts));
1803         LASSERT(list_empty(&aa->aa_oaps));
1804
1805         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1806         ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
1807
1808         spin_lock(&cli->cl_loi_list_lock);
1809         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1810          * is called so we know whether to go to sync BRWs or wait for more
1811          * RPCs to complete */
1812         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
1813                 cli->cl_w_in_flight--;
1814         else
1815                 cli->cl_r_in_flight--;
1816         osc_wake_cache_waiters(cli);
1817         spin_unlock(&cli->cl_loi_list_lock);
1818
1819         osc_io_unplug(env, cli, NULL);
1820         RETURN(rc);
1821 }
1822
1823 static void brw_commit(struct ptlrpc_request *req)
1824 {
1825         /* If osc_inc_unstable_pages (via osc_extent_finish) races with
1826          * this callback (invoked via rq_commit_cb), we need to ensure
1827          * osc_dec_unstable_pages is still called. Otherwise unstable
1828          * pages may be leaked. */
1829         spin_lock(&req->rq_lock);
1830         if (likely(req->rq_unstable)) {
1831                 req->rq_unstable = 0;
1832                 spin_unlock(&req->rq_lock);
1833
1834                 osc_dec_unstable_pages(req);
1835         } else {
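                /* The commit callback ran before the pages were accounted
                 * unstable; record rq_committed so the unstable-page
                 * accounting path can presumably detect the completed
                 * commit and still drop the pages (see comment above). */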
1836                 req->rq_committed = 1;
1837                 spin_unlock(&req->rq_lock);
1838         }
1839 }
1840
1841 /**
1842  * Build an RPC from the list of extents @ext_list. The caller must ensure
1843  * that the total number of pages in this list is NOT over max pages per RPC.
1844  * Extents in the list must be in OES_RPC state.
1845  */
1846 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
1847                   struct list_head *ext_list, int cmd)
1848 {
1849         struct ptlrpc_request           *req = NULL;
1850         struct osc_extent               *ext;
1851         struct brw_page                 **pga = NULL;
1852         struct osc_brw_async_args       *aa = NULL;
1853         struct obdo                     *oa = NULL;
1854         struct osc_async_page           *oap;
1855         struct osc_object               *obj = NULL;
1856         struct cl_req_attr              *crattr = NULL;
1857         loff_t                          starting_offset = OBD_OBJECT_EOF;
1858         loff_t                          ending_offset = 0;
1859         int                             mpflag = 0;
1860         int                             mem_tight = 0;
1861         int                             page_count = 0;
1862         bool                            soft_sync = false;
1863         bool                            interrupted = false;
1864         int                             i;
1865         int                             grant = 0;
1866         int                             rc;
1867         struct list_head                rpc_list = LIST_HEAD_INIT(rpc_list);
1868         struct ost_body                 *body;
1869         ENTRY;
1870         LASSERT(!list_empty(ext_list));
1871
1872         /* add pages into rpc_list to build the BRW RPC */
1873         list_for_each_entry(ext, ext_list, oe_link) {
1874                 LASSERT(ext->oe_state == OES_RPC);
1875                 mem_tight |= ext->oe_memalloc;
1876                 grant += ext->oe_grants;
1877                 page_count += ext->oe_nr_pages;
1878                 if (obj == NULL)
1879                         obj = ext->oe_obj;
1880         }
1881
1882         soft_sync = osc_over_unstable_soft_limit(cli);
1883         if (mem_tight)
1884                 mpflag = cfs_memory_pressure_get_and_set();
1885
1886         OBD_ALLOC(pga, sizeof(*pga) * page_count);
1887         if (pga == NULL)
1888                 GOTO(out, rc = -ENOMEM);
1889
1890         OBDO_ALLOC(oa);
1891         if (oa == NULL)
1892                 GOTO(out, rc = -ENOMEM);
1893
1894         i = 0;
1895         list_for_each_entry(ext, ext_list, oe_link) {
1896                 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
1897                         if (mem_tight)
1898                                 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
1899                         if (soft_sync)
1900                                 oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
1901                         pga[i] = &oap->oap_brw_page;
1902                         pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
1903                         i++;
1904
1905                         list_add_tail(&oap->oap_rpc_item, &rpc_list);
1906                         if (starting_offset == OBD_OBJECT_EOF ||
1907                             starting_offset > oap->oap_obj_off)
1908                                 starting_offset = oap->oap_obj_off;
1909                         else
1910                                 LASSERT(oap->oap_page_off == 0);
1911                         if (ending_offset < oap->oap_obj_off + oap->oap_count)
1912                                 ending_offset = oap->oap_obj_off +
1913                                                 oap->oap_count;
1914                         else
1915                                 LASSERT(oap->oap_page_off + oap->oap_count ==
1916                                         PAGE_SIZE);
1917                         if (oap->oap_interrupted)
1918                                 interrupted = true;
1919                 }
1920         }
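        /* Note the asserts in the loops above: every page except the one
         * with the lowest offset must start at page offset 0, and every
         * page except the one with the highest end must cover a full
         * PAGE_SIZE, i.e. only the two boundary pages may be partial. */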
1921
1922         /* first page in the list */
1923         oap = list_entry(rpc_list.next, typeof(*oap), oap_rpc_item);
1924
1925         crattr = &osc_env_info(env)->oti_req_attr;
1926         memset(crattr, 0, sizeof(*crattr));
1927         crattr->cra_type = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
1928         crattr->cra_flags = ~0ULL;
1929         crattr->cra_page = oap2cl_page(oap);
1930         crattr->cra_oa = oa;
1931         cl_req_attr_set(env, osc2cl(obj), crattr);
1932
1933         if (cmd == OBD_BRW_WRITE)
1934                 oa->o_grant_used = grant;
1935
1936         sort_brw_pages(pga, page_count);
1937         rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req, 0);
1938         if (rc != 0) {
1939                 CERROR("prep_req failed: %d\n", rc);
1940                 GOTO(out, rc);
1941         }
1942
1943         req->rq_commit_cb = brw_commit;
1944         req->rq_interpret_reply = brw_interpret;
1945         req->rq_memalloc = mem_tight != 0;
1946         oap->oap_request = ptlrpc_request_addref(req);
1947         if (interrupted && !req->rq_intr)
1948                 ptlrpc_mark_interrupted(req);
1949
1950         /* Need to update the timestamps after the request is built in case
1951          * we race with setattr (locally or in queue at the OST).  If the OST
1952          * gets the later setattr before the earlier BRW (as determined by the
1953          * request xid), the OST will not use the BRW timestamps.  Sadly, there
1954          * is no obvious way to do this in a single call.  bug 10150 */
1955         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
1956         crattr->cra_oa = &body->oa;
1957         crattr->cra_flags = OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME;
1958         cl_req_attr_set(env, osc2cl(obj), crattr);
1959         lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
1960
1961         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1962         aa = ptlrpc_req_async_args(req);
1963         INIT_LIST_HEAD(&aa->aa_oaps);
1964         list_splice_init(&rpc_list, &aa->aa_oaps);
1965         INIT_LIST_HEAD(&aa->aa_exts);
1966         list_splice_init(ext_list, &aa->aa_exts);
1967
1968         spin_lock(&cli->cl_loi_list_lock);
1969         starting_offset >>= PAGE_SHIFT;
1970         if (cmd == OBD_BRW_READ) {
1971                 cli->cl_r_in_flight++;
1972                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
1973                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
1974                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
1975                                       starting_offset + 1);
1976         } else {
1977                 cli->cl_w_in_flight++;
1978                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
1979                 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
1980                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
1981                                       starting_offset + 1);
1982         }
1983         spin_unlock(&cli->cl_loi_list_lock);
1984
1985         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %ur/%uw in flight",
1986                   page_count, aa, cli->cl_r_in_flight,
1987                   cli->cl_w_in_flight);
1988         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_IO, cfs_fail_val);
1989
1990         ptlrpcd_add_req(req);
1991         rc = 0;
1992         EXIT;
1993
1994 out:
1995         if (mem_tight != 0)
1996                 cfs_memory_pressure_restore(mpflag);
1997
1998         if (rc != 0) {
1999                 LASSERT(req == NULL);
2000
2001                 if (oa)
2002                         OBDO_FREE(oa);
2003                 if (pga)
2004                         OBD_FREE(pga, sizeof(*pga) * page_count);
2005                 /* This should happen rarely and is pretty bad: it makes
2006                  * the pending list not follow the dirty order */
2007                 while (!list_empty(ext_list)) {
2008                         ext = list_entry(ext_list->next, struct osc_extent,
2009                                          oe_link);
2010                         list_del_init(&ext->oe_link);
2011                         osc_extent_finish(env, ext, 0, rc);
2012                 }
2013         }
2014         RETURN(rc);
2015 }
2016
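/* Attach @data to @lock->l_ast_data if it is not set yet. Returns 1 if the
 * lock now refers to @data (either just set here or already matching),
 * 0 if the lock already carries different ast_data. */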
2017 static int osc_set_lock_data(struct ldlm_lock *lock, void *data)
2018 {
2019         int set = 0;
2020
2021         LASSERT(lock != NULL);
2022
2023         lock_res_and_lock(lock);
2024
2025         if (lock->l_ast_data == NULL)
2026                 lock->l_ast_data = data;
2027         if (lock->l_ast_data == data)
2028                 set = 1;
2029
2030         unlock_res_and_lock(lock);
2031
2032         return set;
2033 }
2034
2035 static int osc_enqueue_fini(struct ptlrpc_request *req,
2036                             osc_enqueue_upcall_f upcall, void *cookie,
2037                             struct lustre_handle *lockh, enum ldlm_mode mode,
2038                             __u64 *flags, bool speculative, int errcode)
2039 {
2040         bool intent = *flags & LDLM_FL_HAS_INTENT;
2041         int rc;
2042         ENTRY;
2043
2044         /* The request was created before the ldlm_cli_enqueue() call. */
2045         if (intent && errcode == ELDLM_LOCK_ABORTED) {
2046                 struct ldlm_reply *rep;
2047
2048                 rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
2049                 LASSERT(rep != NULL);
2050
2051                 rep->lock_policy_res1 =
2052                         ptlrpc_status_ntoh(rep->lock_policy_res1);
2053                 if (rep->lock_policy_res1)
2054                         errcode = rep->lock_policy_res1;
2055                 if (!speculative)
2056                         *flags |= LDLM_FL_LVB_READY;
2057         } else if (errcode == ELDLM_OK) {
2058                 *flags |= LDLM_FL_LVB_READY;
2059         }
2060
2061         /* Call the update callback. */
2062         rc = (*upcall)(cookie, lockh, errcode);
2063
2064         /* release the reference taken in ldlm_cli_enqueue() */
2065         if (errcode == ELDLM_LOCK_MATCHED)
2066                 errcode = ELDLM_OK;
2067         if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
2068                 ldlm_lock_decref(lockh, mode);
2069
2070         RETURN(rc);
2071 }
2072
2073 static int osc_enqueue_interpret(const struct lu_env *env,
2074                                  struct ptlrpc_request *req,
2075                                  struct osc_enqueue_args *aa, int rc)
2076 {
2077         struct ldlm_lock *lock;
2078         struct lustre_handle *lockh = &aa->oa_lockh;
2079         enum ldlm_mode mode = aa->oa_mode;
2080         struct ost_lvb *lvb = aa->oa_lvb;
2081         __u32 lvb_len = sizeof(*lvb);
2082         __u64 flags = 0;
2083
2084         ENTRY;
2085
2086         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2087          * be valid. */
2088         lock = ldlm_handle2lock(lockh);
2089         LASSERTF(lock != NULL,
2090                  "lockh %#llx, req %p, aa %p - client evicted?\n",
2091                  lockh->cookie, req, aa);
2092
2093         /* Take an additional reference so that a blocking AST that
2094          * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2095          * to arrive after an upcall has been executed by
2096          * osc_enqueue_fini(). */
2097         ldlm_lock_addref(lockh, mode);
2098
2099         /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
2100         OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
2101
2102         /* Let the CP AST grant the lock first. */
2103         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2104
2105         if (aa->oa_speculative) {
2106                 LASSERT(aa->oa_lvb == NULL);
2107                 LASSERT(aa->oa_flags == NULL);
2108                 aa->oa_flags = &flags;
2109         }
2110
2111         /* Complete obtaining the lock procedure. */
2112         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
2113                                    aa->oa_mode, aa->oa_flags, lvb, lvb_len,
2114                                    lockh, rc);
2115         /* Complete osc stuff. */
2116         rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
2117                               aa->oa_flags, aa->oa_speculative, rc);
2118
2119         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2120
2121         ldlm_lock_decref(lockh, mode);
2122         LDLM_LOCK_PUT(lock);
2123         RETURN(rc);
2124 }
2125
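/* Sentinel request set: callers pass PTLRPCD_SET (compared by pointer
 * identity below) to have the request driven by ptlrpcd instead of an
 * actual ptlrpc_request_set. */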
2126 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2127
2128 /* When enqueuing asynchronously, locks are not ordered; we can obtain a lock
2129  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2130  * other synchronous requests; however, keeping some locks while trying to
2131  * obtain others may take a considerable amount of time in case of OST failure,
2132  * and when a client does not release a lock that other sync requests wait on,
2133  * the client is evicted from the cluster -- such scenarios make life
2134  * difficult, so release locks just after they are obtained. */
2135 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2136                      __u64 *flags, union ldlm_policy_data *policy,
2137                      struct ost_lvb *lvb, int kms_valid,
2138                      osc_enqueue_upcall_f upcall, void *cookie,
2139                      struct ldlm_enqueue_info *einfo,
2140                      struct ptlrpc_request_set *rqset, int async,
2141                      bool speculative)
2142 {
2143         struct obd_device *obd = exp->exp_obd;
2144         struct lustre_handle lockh = { 0 };
2145         struct ptlrpc_request *req = NULL;
2146         int intent = *flags & LDLM_FL_HAS_INTENT;
2147         __u64 match_flags = *flags;
2148         enum ldlm_mode mode;
2149         int rc;
2150         ENTRY;
2151
2152         /* Filesystem lock extents are extended to page boundaries so that
2153          * dealing with the page cache is a little smoother.  */
2154         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2155         policy->l_extent.end |= ~PAGE_MASK;
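        /* e.g. with 4 KiB pages, a request for bytes [5000, 6000] is widened
         * to [4096, 8191]: the start is rounded down and the end rounded up
         * to the enclosing page boundaries. */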
2156
2157         /*
2158          * kms is not valid when either object is completely fresh (so that no
2159          * locks are cached), or object was evicted. In the latter case cached
2160          * lock cannot be used, because it would prime inode state with
2161          * potentially stale LVB.
2162          */
2163         if (!kms_valid)
2164                 goto no_match;
2165
2166         /* Next, search for already existing extent locks that will cover us */
2167         /* If we're trying to read, we also search for an existing PW lock.  The
2168          * VFS and page cache already protect us locally, so lots of readers/
2169          * writers can share a single PW lock.
2170          *
2171          * There are problems with conversion deadlocks, so instead of
2172          * converting a read lock to a write lock, we'll just enqueue a new
2173          * one.
2174          *
2175          * At some point we should cancel the read lock instead of making them
2176          * send us a blocking callback, but there are problems with canceling
2177          * locks out from other users right now, too. */
2178         mode = einfo->ei_mode;
2179         if (einfo->ei_mode == LCK_PR)
2180                 mode |= LCK_PW;
2181         /* Normal lock requests must wait for the LVB to be ready before
2182          * matching a lock; speculative lock requests do not need to,
2183          * because they will not actually use the lock. */
2184         if (!speculative)
2185                 match_flags |= LDLM_FL_LVB_READY;
2186         if (intent != 0)
2187                 match_flags |= LDLM_FL_BLOCK_GRANTED;
2188         mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id,
2189                                einfo->ei_type, policy, mode, &lockh, 0);
2190         if (mode) {
2191                 struct ldlm_lock *matched;
2192
2193                 if (*flags & LDLM_FL_TEST_LOCK)
2194                         RETURN(ELDLM_OK);
2195
2196                 matched = ldlm_handle2lock(&lockh);
2197                 if (speculative) {
2198                         /* This DLM lock request is speculative, and does
2199                          * not have an associated IO request. Therefore, if
2200                          * there is already a DLM lock, we will just inform
2201                          * the caller to cancel the request for this stripe. */
2202                         lock_res_and_lock(matched);
2203                         if (ldlm_extent_equal(&policy->l_extent,
2204                             &matched->l_policy_data.l_extent))
2205                                 rc = -EEXIST;
2206                         else
2207                                 rc = -ECANCELED;
2208                         unlock_res_and_lock(matched);
2209
2210                         ldlm_lock_decref(&lockh, mode);
2211                         LDLM_LOCK_PUT(matched);
2212                         RETURN(rc);
2213                 } else if (osc_set_lock_data(matched, einfo->ei_cbdata)) {
2214                         *flags |= LDLM_FL_LVB_READY;
2215
2216                         /* We already have a lock, and it's referenced. */
2217                         (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);
2218
2219                         ldlm_lock_decref(&lockh, mode);
2220                         LDLM_LOCK_PUT(matched);
2221                         RETURN(ELDLM_OK);
2222                 } else {
2223                         ldlm_lock_decref(&lockh, mode);
2224                         LDLM_LOCK_PUT(matched);
2225                 }
2226         }
2227
2228 no_match:
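        /* Callers that only wanted to match an existing lock must not
         * trigger a fresh enqueue; bail out with -ENOLCK instead. */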
2229         if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK))
2230                 RETURN(-ENOLCK);
2231
2232         if (intent) {
2233                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2234                                            &RQF_LDLM_ENQUEUE_LVB);
2235                 if (req == NULL)
2236                         RETURN(-ENOMEM);
2237
2238                 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
2239                 if (rc) {
2240                         ptlrpc_request_free(req);
2241                         RETURN(rc);
2242                 }
2243
2244                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2245                                      sizeof(*lvb));
2246                 ptlrpc_request_set_replen(req);
2247         }
2248
2249         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2250         *flags &= ~LDLM_FL_BLOCK_GRANTED;
2251
2252         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2253                               sizeof(*lvb), LVB_T_OST, &lockh, async);
2254         if (async) {
2255                 if (!rc) {
2256                         struct osc_enqueue_args *aa;
2257                         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2258                         aa = ptlrpc_req_async_args(req);
2259                         aa->oa_exp         = exp;
2260                         aa->oa_mode        = einfo->ei_mode;
2261                         aa->oa_type        = einfo->ei_type;
2262                         lustre_handle_copy(&aa->oa_lockh, &lockh);
2263                         aa->oa_upcall      = upcall;
2264                         aa->oa_cookie      = cookie;
2265                         aa->oa_speculative = speculative;
2266                         if (!speculative) {
2267                                 aa->oa_flags  = flags;
2268                                 aa->oa_lvb    = lvb;
2269                         } else {
2270                                 /* Speculative locks essentially enqueue
2271                                  * a DLM lock in advance, so we don't care
2272                                  * about the result of the enqueue. */
2273                                 aa->oa_lvb    = NULL;
2274                                 aa->oa_flags  = NULL;
2275                         }
2276
2277                         req->rq_interpret_reply =
2278                                 (ptlrpc_interpterer_t)osc_enqueue_interpret;
2279                         if (rqset == PTLRPCD_SET)
2280                                 ptlrpcd_add_req(req);
2281                         else
2282                                 ptlrpc_set_add_req(rqset, req);
2283                 } else if (intent) {
2284                         ptlrpc_req_finished(req);
2285                 }
2286                 RETURN(rc);
2287         }
2288
2289         rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
2290                               flags, speculative, rc);
2291         if (intent)
2292                 ptlrpc_req_finished(req);
2293
2294         RETURN(rc);
2295 }
2296
2297 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2298                    enum ldlm_type type, union ldlm_policy_data *policy,
2299                    enum ldlm_mode mode, __u64 *flags, void *data,
2300                    struct lustre_handle *lockh, int unref)
2301 {
2302         struct obd_device *obd = exp->exp_obd;
2303         __u64 lflags = *flags;
2304         enum ldlm_mode rc;
2305         ENTRY;
2306
2307         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2308                 RETURN(-EIO);
2309
2310         /* Filesystem lock extents are extended to page boundaries so that
2311          * dealing with the page cache is a little smoother */
2312         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2313         policy->l_extent.end |= ~PAGE_MASK;
2314
2315         /* Next, search for already existing extent locks that will cover us */
2316         /* If we're trying to read, we also search for an existing PW lock.  The
2317          * VFS and page cache already protect us locally, so lots of readers/
2318          * writers can share a single PW lock. */
2319         rc = mode;
2320         if (mode == LCK_PR)
2321                 rc |= LCK_PW;
2322         rc = ldlm_lock_match(obd->obd_namespace, lflags,
2323                              res_id, type, policy, rc, lockh, unref);
2324         if (rc == 0 || lflags & LDLM_FL_TEST_LOCK)
2325                 RETURN(rc);
2326
2327         if (data != NULL) {
2328                 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2329
2330                 LASSERT(lock != NULL);
2331                 if (!osc_set_lock_data(lock, data)) {
2332                         ldlm_lock_decref(lockh, rc);
2333                         rc = 0;
2334                 }
2335                 LDLM_LOCK_PUT(lock);
2336         }
2337         RETURN(rc);
2338 }
2339
2340 static int osc_statfs_interpret(const struct lu_env *env,
2341                                 struct ptlrpc_request *req,
2342                                 struct osc_async_args *aa, int rc)
2343 {
2344         struct obd_statfs *msfs;
2345         ENTRY;
2346
2347         if (rc == -EBADR)
2348                 /* The request has in fact never been sent
2349                  * due to issues at a higher level (LOV).
2350                  * Exit immediately since the caller is
2351                  * aware of the problem and takes care
2352                  * of the cleanup. */
2353                 RETURN(rc);
2354
2355         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2356             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2357                 GOTO(out, rc = 0);
2358
2359         if (rc != 0)
2360                 GOTO(out, rc);
2361
2362         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2363         if (msfs == NULL) {
2364                 GOTO(out, rc = -EPROTO);
2365         }
2366
2367         *aa->aa_oi->oi_osfs = *msfs;
2368 out:
2369         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2370         RETURN(rc);
2371 }
2372
2373 static int osc_statfs_async(struct obd_export *exp,
2374                             struct obd_info *oinfo, __u64 max_age,
2375                             struct ptlrpc_request_set *rqset)
2376 {
2377         struct obd_device     *obd = class_exp2obd(exp);
2378         struct ptlrpc_request *req;
2379         struct osc_async_args *aa;
2380         int                    rc;
2381         ENTRY;
2382
2383         /* We could possibly pass max_age in the request (as an absolute
2384          * timestamp or a "seconds.usec ago") so the target can avoid doing
2385          * extra calls into the filesystem if that isn't necessary (e.g.
2386          * during mount that would help a bit).  Having relative timestamps
2387          * is not so great if request processing is slow, while absolute
2388          * timestamps are not ideal because they need time synchronization. */
2389         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2390         if (req == NULL)
2391                 RETURN(-ENOMEM);
2392
2393         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2394         if (rc) {
2395                 ptlrpc_request_free(req);
2396                 RETURN(rc);
2397         }
2398         ptlrpc_request_set_replen(req);
2399         req->rq_request_portal = OST_CREATE_PORTAL;
2400         ptlrpc_at_set_req_timeout(req);
2401
2402         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2403                 /* procfs requests must not wait on a stuck OST, to avoid deadlock */
2404                 req->rq_no_resend = 1;
2405                 req->rq_no_delay = 1;
2406         }
2407
2408         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2409         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2410         aa = ptlrpc_req_async_args(req);
2411         aa->aa_oi = oinfo;
2412
2413         ptlrpc_set_add_req(rqset, req);
2414         RETURN(0);
2415 }
2416
2417 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2418                       struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2419 {
2420         struct obd_device     *obd = class_exp2obd(exp);
2421         struct obd_statfs     *msfs;
2422         struct ptlrpc_request *req;
2423         struct obd_import     *imp = NULL;
2424         int rc;
2425         ENTRY;
2426
2427         /* Since the request might also come from lprocfs, we need to
2428          * sync this with client_disconnect_export() (bug 15684) */
2429         down_read(&obd->u.cli.cl_sem);
2430         if (obd->u.cli.cl_import)
2431                 imp = class_import_get(obd->u.cli.cl_import);
2432         up_read(&obd->u.cli.cl_sem);
2433         if (!imp)
2434                 RETURN(-ENODEV);
2435
2436         /* We could possibly pass max_age in the request (as an absolute
2437          * timestamp or a "seconds.usec ago") so the target can avoid doing
2438          * extra calls into the filesystem if that isn't necessary (e.g.
2439          * during mount that would help a bit).  Having relative timestamps
2440          * is not so great if request processing is slow, while absolute
2441          * timestamps are not ideal because they need time synchronization. */
2442         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2443
2444         class_import_put(imp);
2445
2446         if (req == NULL)
2447                 RETURN(-ENOMEM);
2448
2449         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2450         if (rc) {
2451                 ptlrpc_request_free(req);
2452                 RETURN(rc);
2453         }
2454         ptlrpc_request_set_replen(req);
2455         req->rq_request_portal = OST_CREATE_PORTAL;
2456         ptlrpc_at_set_req_timeout(req);
2457
2458         if (flags & OBD_STATFS_NODELAY) {
2459                 /* procfs requests must not wait on a stuck OST, to avoid deadlock */
2460                 req->rq_no_resend = 1;
2461                 req->rq_no_delay = 1;
2462         }
2463
2464         rc = ptlrpc_queue_wait(req);
2465         if (rc)
2466                 GOTO(out, rc);
2467
2468         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2469         if (msfs == NULL) {
2470                 GOTO(out, rc = -EPROTO);
2471         }
2472
2473         *osfs = *msfs;
2474
2475         EXIT;
2476  out:
2477         ptlrpc_req_finished(req);
2478         return rc;
2479 }
2480
2481 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2482                          void *karg, void __user *uarg)
2483 {
2484         struct obd_device *obd = exp->exp_obd;
2485         struct obd_ioctl_data *data = karg;
2486         int err = 0;
2487         ENTRY;
2488
2489         if (!try_module_get(THIS_MODULE)) {
2490                 CERROR("%s: cannot get module '%s'\n", obd->obd_name,
2491                        module_name(THIS_MODULE));
2492                 return -EINVAL;
2493         }
2494         switch (cmd) {
2495         case OBD_IOC_CLIENT_RECOVER:
2496                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2497                                             data->ioc_inlbuf1, 0);
2498                 if (err > 0)
2499                         err = 0;
2500                 GOTO(out, err);
2501         case IOC_OSC_SET_ACTIVE:
2502                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2503                                                data->ioc_offset);
2504                 GOTO(out, err);
2505         case OBD_IOC_PING_TARGET:
2506                 err = ptlrpc_obd_ping(obd);
2507                 GOTO(out, err);
2508         default:
2509                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2510                        cmd, current_comm());
2511                 GOTO(out, err = -ENOTTY);
2512         }
2513 out:
2514         module_put(THIS_MODULE);
2515         return err;
2516 }
2517
2518 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
2519                               u32 keylen, void *key,
2520                               u32 vallen, void *val,
2521                               struct ptlrpc_request_set *set)
2522 {
2523         struct ptlrpc_request *req;
2524         struct obd_device     *obd = exp->exp_obd;
2525         struct obd_import     *imp = class_exp2cliimp(exp);
2526         char                  *tmp;
2527         int                    rc;
2528         ENTRY;
2529
2530         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
2531
2532         if (KEY_IS(KEY_CHECKSUM)) {
2533                 if (vallen != sizeof(int))
2534                         RETURN(-EINVAL);
2535                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
2536                 RETURN(0);
2537         }
2538
2539         if (KEY_IS(KEY_SPTLRPC_CONF)) {
2540                 sptlrpc_conf_client_adapt(obd);
2541                 RETURN(0);
2542         }
2543
2544         if (KEY_IS(KEY_FLUSH_CTX)) {
2545                 sptlrpc_import_flush_my_ctx(imp);
2546                 RETURN(0);
2547         }
2548
2549         if (KEY_IS(KEY_CACHE_SET)) {
2550                 struct client_obd *cli = &obd->u.cli;
2551
2552                 LASSERT(cli->cl_cache == NULL); /* only once */
2553                 cli->cl_cache = (struct cl_client_cache *)val;
2554                 cl_cache_incref(cli->cl_cache);
2555                 cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
2556
2557                 /* add this osc into entity list */
2558                 LASSERT(list_empty(&cli->cl_lru_osc));
2559                 spin_lock(&cli->cl_cache->ccc_lru_lock);
2560                 list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
2561                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
2562
2563                 RETURN(0);
2564         }
2565
2566         if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
2567                 struct client_obd *cli = &obd->u.cli;
2568                 long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
2569                 long target = *(long *)val;
2570
2571                 nr = osc_lru_shrink(env, cli, min(nr, target), true);
2572                 *(long *)val -= nr;
2573                 RETURN(0);
2574         }
2575
2576         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
2577                 RETURN(-EINVAL);
2578
2579         /* We pass all other commands directly to OST. Since nobody calls osc
2580          * methods directly and everybody is supposed to go through LOV, we
2581          * assume lov checked invalid values for us.
2582          * The only recognised values so far are evict_by_nid and mds_conn.
2583          * Even if something bad goes through, we'd get a -EINVAL from OST
2584          * anyway. */
2585
2586         req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
2587                                                 &RQF_OST_SET_GRANT_INFO :
2588                                                 &RQF_OBD_SET_INFO);
2589         if (req == NULL)
2590                 RETURN(-ENOMEM);
2591
2592         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2593                              RCL_CLIENT, keylen);
2594         if (!KEY_IS(KEY_GRANT_SHRINK))
2595                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
2596                                      RCL_CLIENT, vallen);
2597         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
2598         if (rc) {
2599                 ptlrpc_request_free(req);
2600                 RETURN(rc);
2601         }
2602
2603         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2604         memcpy(tmp, key, keylen);
2605         tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
2606                                                         &RMF_OST_BODY :
2607                                                         &RMF_SETINFO_VAL);
2608         memcpy(tmp, val, vallen);
2609
2610         if (KEY_IS(KEY_GRANT_SHRINK)) {
2611                 struct osc_grant_args *aa;
2612                 struct obdo *oa;
2613
2614                 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2615                 aa = ptlrpc_req_async_args(req);
2616                 OBDO_ALLOC(oa);
2617                 if (!oa) {
2618                         ptlrpc_req_finished(req);
2619                         RETURN(-ENOMEM);
2620                 }
2621                 *oa = ((struct ost_body *)val)->oa;
2622                 aa->aa_oa = oa;
2623                 req->rq_interpret_reply = osc_shrink_grant_interpret;
2624         }
2625
2626         ptlrpc_request_set_replen(req);
2627         if (!KEY_IS(KEY_GRANT_SHRINK)) {
2628                 LASSERT(set != NULL);
2629                 ptlrpc_set_add_req(set, req);
2630                 ptlrpc_check_set(NULL, set);
2631         } else {
2632                 ptlrpcd_add_req(req);
2633         }
2634
2635         RETURN(0);
2636 }
2637
2638 static int osc_reconnect(const struct lu_env *env,
2639                          struct obd_export *exp, struct obd_device *obd,
2640                          struct obd_uuid *cluuid,
2641                          struct obd_connect_data *data,
2642                          void *localdata)
2643 {
2644         struct client_obd *cli = &obd->u.cli;
2645
2646         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
2647                 long lost_grant;
2648                 long grant;
2649
2650                 spin_lock(&cli->cl_loi_list_lock);
2651                 grant = cli->cl_avail_grant + cli->cl_reserved_grant;
2652                 if (data->ocd_connect_flags & OBD_CONNECT_GRANT_PARAM)
2653                         grant += cli->cl_dirty_grant;
2654                 else
2655                         grant += cli->cl_dirty_pages << PAGE_SHIFT;
2656                 data->ocd_grant = grant ? : 2 * cli_brw_size(obd);
2657                 lost_grant = cli->cl_lost_grant;
2658                 cli->cl_lost_grant = 0;
2659                 spin_unlock(&cli->cl_loi_list_lock);
2660
2661                 CDEBUG(D_RPCTRACE,
2662                        "ocd_connect_flags: %#llx ocd_version: %d ocd_grant: %d, lost: %ld.\n",
2663                        data->ocd_connect_flags, data->ocd_version, data->ocd_grant, lost_grant);
2664         }
2665
2666         RETURN(0);
2667 }
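/*
 * Worked example of the grant advertised above at reconnect time
 * (numbers are illustrative): with cl_avail_grant = 1 MiB,
 * cl_reserved_grant = 0 and 256 dirty pages on a 4 KiB-page client
 * without OBD_CONNECT_GRANT_PARAM, ocd_grant = 1 MiB + (256 << 12) =
 * 2 MiB.  Only when the sum is zero does the client fall back to
 * requesting 2 * cli_brw_size(obd) from the server.
 */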
2668
2669 static int osc_disconnect(struct obd_export *exp)
2670 {
2671         struct obd_device *obd = class_exp2obd(exp);
2672         int rc;
2673
2674         rc = client_disconnect_export(exp);
2675         /**
2676          * Initially we put del_shrink_grant before disconnect_export, but it
2677          * causes the following problem if setup (connect) and cleanup
2678          * (disconnect) are tangled together.
2679          *      connect p1                     disconnect p2
2680          *   ptlrpc_connect_import
2681          *     ...............               class_manual_cleanup
2682          *                                     osc_disconnect
2683          *                                     del_shrink_grant
2684          *   ptlrpc_connect_interpret
2685          *     init_grant_shrink
2686          *   add this client to shrink list
2687          *                                      cleanup_osc
2688          * Bang! the pinger triggers the shrink.
2689          * So the osc should be removed from the shrink list only after we
2690          * are sure the import has been destroyed. BUG18662
2691          */
2692         if (obd->u.cli.cl_import == NULL)
2693                 osc_del_shrink_grant(&obd->u.cli);
2694         return rc;
2695 }
2696
2697 static int osc_ldlm_resource_invalidate(struct cfs_hash *hs,
2698         struct cfs_hash_bd *bd, struct hlist_node *hnode, void *arg)
2699 {
2700         struct lu_env *env = arg;
2701         struct ldlm_resource *res = cfs_hash_object(hs, hnode);
2702         struct ldlm_lock *lock;
2703         struct osc_object *osc = NULL;
2704         ENTRY;
2705
2706         lock_res(res);
2707         list_for_each_entry(lock, &res->lr_granted, l_res_link) {
2708                 if (lock->l_ast_data != NULL && osc == NULL) {
2709                         osc = lock->l_ast_data;
2710                         cl_object_get(osc2cl(osc));
2711                 }
2712
2713                 /* Clear the LDLM_FL_CLEANED flag to make sure the lock is
2714                  * canceled by the second ldlm_namespace_cleanup() call in
2715                  * osc_import_event(). */
2716                 ldlm_clear_cleaned(lock);
2717         }
2718         unlock_res(res);
2719
2720         if (osc != NULL) {
2721                 osc_object_invalidate(env, osc);
2722                 cl_object_put(env, osc2cl(osc));
2723         }
2724
2725         RETURN(0);
2726 }
2727
2728 static int osc_import_event(struct obd_device *obd,
2729                             struct obd_import *imp,
2730                             enum obd_import_event event)
2731 {
2732         struct client_obd *cli;
2733         int rc = 0;
2734
2735         ENTRY;
2736         LASSERT(imp->imp_obd == obd);
2737
2738         switch (event) {
2739         case IMP_EVENT_DISCON: {
2740                 cli = &obd->u.cli;
2741                 spin_lock(&cli->cl_loi_list_lock);
2742                 cli->cl_avail_grant = 0;
2743                 cli->cl_lost_grant = 0;
2744                 spin_unlock(&cli->cl_loi_list_lock);
2745                 break;
2746         }
2747         case IMP_EVENT_INACTIVE: {
2748                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE);
2749                 break;
2750         }
2751         case IMP_EVENT_INVALIDATE: {
2752                 struct ldlm_namespace *ns = obd->obd_namespace;
2753                 struct lu_env         *env;
2754                 __u16                  refcheck;
2755
2756                 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
2757
2758                 env = cl_env_get(&refcheck);
2759                 if (!IS_ERR(env)) {
2760                         osc_io_unplug(env, &obd->u.cli, NULL);
2761
2762                         cfs_hash_for_each_nolock(ns->ns_rs_hash,
2763                                                  osc_ldlm_resource_invalidate,
2764                                                  env, 0);
2765                         cl_env_put(env, &refcheck);
2766
2767                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
2768                 } else
2769                         rc = PTR_ERR(env);
2770                 break;
2771         }
2772         case IMP_EVENT_ACTIVE: {
2773                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE);
2774                 break;
2775         }
2776         case IMP_EVENT_OCD: {
2777                 struct obd_connect_data *ocd = &imp->imp_connect_data;
2778
2779                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
2780                         osc_init_grant(&obd->u.cli, ocd);
2781
2782                 /* See bug 7198 */
2783                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
2784                         imp->imp_client->cli_request_portal = OST_REQUEST_PORTAL;
2785
2786                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD);
2787                 break;
2788         }
2789         case IMP_EVENT_DEACTIVATE: {
2790                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE);
2791                 break;
2792         }
2793         case IMP_EVENT_ACTIVATE: {
2794                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE);
2795                 break;
2796         }
2797         default:
2798                 CERROR("Unknown import event %d\n", event);
2799                 LBUG();
2800         }
2801         RETURN(rc);
2802 }
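/*
 * For orientation: a typical failover cycle, as inferred from the import
 * state machine (not spelled out in this file), delivers
 * IMP_EVENT_DISCON when the connection drops, IMP_EVENT_INACTIVE (and
 * possibly IMP_EVENT_INVALIDATE) while the target is unreachable, then
 * IMP_EVENT_OCD followed by IMP_EVENT_ACTIVE once a new connection has
 * been established and the connect data processed.
 */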
2803
2804 /**
2805  * Determine whether the lock can be canceled before replaying the lock
2806  * during recovery, see bug16774 for detailed information.
2807  *
2808  * \retval zero the lock can't be canceled
2809  * \retval other ok to cancel
2810  */
2811 static int osc_cancel_weight(struct ldlm_lock *lock)
2812 {
2813         /*
2814          * Cancel all unused, granted extent locks.
2815          */
2816         if (lock->l_resource->lr_type == LDLM_EXTENT &&
2817             lock->l_granted_mode == lock->l_req_mode &&
2818             osc_ldlm_weigh_ast(lock) == 0)
2819                 RETURN(1);
2820
2821         RETURN(0);
2822 }
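/*
 * This weight callback is registered with the namespace below in
 * osc_setup() via ns_register_cancel(), so ldlm consults it when
 * deciding whether a lock can be canceled rather than replayed during
 * recovery.
 */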
2823
2824 static int brw_queue_work(const struct lu_env *env, void *data)
2825 {
2826         struct client_obd *cli = data;
2827
2828         CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
2829
2830         osc_io_unplug(env, cli, NULL);
2831         RETURN(0);
2832 }
2833
2834 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
2835 {
2836         struct client_obd *cli = &obd->u.cli;
2837         struct obd_type   *type;
2838         void              *handler;
2839         int                rc;
2840         int                adding;
2841         int                added;
2842         int                req_count;
2843         ENTRY;
2844
2845         rc = ptlrpcd_addref();
2846         if (rc)
2847                 RETURN(rc);
2848
2849         rc = client_obd_setup(obd, lcfg);
2850         if (rc)
2851                 GOTO(out_ptlrpcd, rc);
2852
2853         handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
2854         if (IS_ERR(handler))
2855                 GOTO(out_client_setup, rc = PTR_ERR(handler));
2856         cli->cl_writeback_work = handler;
2857
2858         handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
2859         if (IS_ERR(handler))
2860                 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
2861         cli->cl_lru_work = handler;
2862
2863         rc = osc_quota_setup(obd);
2864         if (rc)
2865                 GOTO(out_ptlrpcd_work, rc);
2866
2867         cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
2868
2869 #ifdef CONFIG_PROC_FS
2870         obd->obd_vars = lprocfs_osc_obd_vars;
2871 #endif
2872         /* If this is true, both the client (osc) and the server (osp) are
2873          * on the same node. The osp layer, if loaded first, will have
2874          * registered the osc proc directory. In that case this obd_device
2875          * attaches its proc tree to type->typ_procsym instead of
2876          * obd->obd_type->typ_procroot. */
2877         type = class_search_type(LUSTRE_OSP_NAME);
2878         if (type && type->typ_procsym) {
2879                 obd->obd_proc_entry = lprocfs_register(obd->obd_name,
2880                                                        type->typ_procsym,
2881                                                        obd->obd_vars, obd);
2882                 if (IS_ERR(obd->obd_proc_entry)) {
2883                         rc = PTR_ERR(obd->obd_proc_entry);
2884                         CERROR("error %d setting up lprocfs for %s\n", rc,
2885                                obd->obd_name);
2886                         obd->obd_proc_entry = NULL;
2887                 }
2888         }
2889
2890         rc = lprocfs_obd_setup(obd, false);
2891         if (!rc) {
2892                 /* The basic OSC proc tree construction succeeded, so
2893                  * let's do the rest.
2894                  */
2895                 lproc_osc_attach_seqstat(obd);
2896                 sptlrpc_lprocfs_cliobd_attach(obd);
2897                 ptlrpc_lprocfs_register_obd(obd);
2898         }
2899
2900         /*
2901          * We try to control the total number of requests with an upper
2902          * limit, osc_reqpool_maxreqcount. A race here might cause a
2903          * slight over-limit allocation, but that is fine.
2904          */
2905         req_count = atomic_read(&osc_pool_req_count);
2906         if (req_count < osc_reqpool_maxreqcount) {
2907                 adding = cli->cl_max_rpcs_in_flight + 2;
2908                 if (req_count + adding > osc_reqpool_maxreqcount)
2909                         adding = osc_reqpool_maxreqcount - req_count;
2910
2911                 added = ptlrpc_add_rqs_to_pool(osc_rq_pool, adding);
2912                 atomic_add(added, &osc_pool_req_count);
2913         }
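        /*
         * Worked example of the top-up above (numbers are illustrative):
         * with cl_max_rpcs_in_flight = 8 this OSC asks for adding = 10
         * requests.  If osc_reqpool_maxreqcount is 40 and req_count is
         * already 35, adding is clamped to 5; ptlrpc_add_rqs_to_pool()
         * may add fewer still under memory pressure, and only the number
         * actually added is accounted in osc_pool_req_count.
         */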
2914
2915         INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
2916         ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
2917
2918         spin_lock(&osc_shrink_lock);
2919         list_add_tail(&cli->cl_shrink_list, &osc_shrink_list);
2920         spin_unlock(&osc_shrink_lock);
2921
2922         RETURN(0);
2923
2924 out_ptlrpcd_work:
2925         if (cli->cl_writeback_work != NULL) {
2926                 ptlrpcd_destroy_work(cli->cl_writeback_work);
2927                 cli->cl_writeback_work = NULL;
2928         }
2929         if (cli->cl_lru_work != NULL) {
2930                 ptlrpcd_destroy_work(cli->cl_lru_work);
2931                 cli->cl_lru_work = NULL;
2932         }
2933 out_client_setup:
2934         client_obd_cleanup(obd);
2935 out_ptlrpcd:
2936         ptlrpcd_decref();
2937         RETURN(rc);
2938 }
2939
2940 static int osc_precleanup(struct obd_device *obd)
2941 {
2942         struct client_obd *cli = &obd->u.cli;
2943         ENTRY;
2944
2945         /* LU-464
2946          * for echo client, export may be on zombie list, wait for
2947          * zombie thread to cull it, because cli.cl_import will be
2948          * cleared in client_disconnect_export():
2949          *   class_export_destroy() -> obd_cleanup() ->
2950          *   echo_device_free() -> echo_client_cleanup() ->
2951          *   obd_disconnect() -> osc_disconnect() ->
2952          *   client_disconnect_export()
2953          */
2954         obd_zombie_barrier();
2955         if (cli->cl_writeback_work) {
2956                 ptlrpcd_destroy_work(cli->cl_writeback_work);
2957                 cli->cl_writeback_work = NULL;
2958         }
2959
2960         if (cli->cl_lru_work) {
2961                 ptlrpcd_destroy_work(cli->cl_lru_work);
2962                 cli->cl_lru_work = NULL;
2963         }
2964
2965         obd_cleanup_client_import(obd);
2966         ptlrpc_lprocfs_unregister_obd(obd);
2967         lprocfs_obd_cleanup(obd);
2968         RETURN(0);
2969 }
2970
2971 int osc_cleanup(struct obd_device *obd)
2972 {
2973         struct client_obd *cli = &obd->u.cli;
2974         int rc;
2975
2976         ENTRY;
2977
2978         spin_lock(&osc_shrink_lock);
2979         list_del(&cli->cl_shrink_list);
2980         spin_unlock(&osc_shrink_lock);
2981
2982         /* lru cleanup */
2983         if (cli->cl_cache != NULL) {
2984                 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
2985                 spin_lock(&cli->cl_cache->ccc_lru_lock);
2986                 list_del_init(&cli->cl_lru_osc);
2987                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
2988                 cli->cl_lru_left = NULL;
2989                 cl_cache_decref(cli->cl_cache);
2990                 cli->cl_cache = NULL;
2991         }
2992
2993         /* free memory of osc quota cache */
2994         osc_quota_cleanup(obd);
2995
2996         rc = client_obd_cleanup(obd);
2997
2998         ptlrpcd_decref();
2999         RETURN(rc);
3000 }
3001
3002 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
3003 {
3004         int rc = class_process_proc_param(PARAM_OSC, obd->obd_vars, lcfg, obd);
3005         return rc > 0 ? 0 : rc;
3006 }
3007
3008 static int osc_process_config(struct obd_device *obd, size_t len, void *buf)
3009 {
3010         return osc_process_config_base(obd, buf);
3011 }
3012
3013 static struct obd_ops osc_obd_ops = {
3014         .o_owner                = THIS_MODULE,
3015         .o_setup                = osc_setup,
3016         .o_precleanup           = osc_precleanup,
3017         .o_cleanup              = osc_cleanup,
3018         .o_add_conn             = client_import_add_conn,
3019         .o_del_conn             = client_import_del_conn,
3020         .o_connect              = client_connect_import,
3021         .o_reconnect            = osc_reconnect,
3022         .o_disconnect           = osc_disconnect,
3023         .o_statfs               = osc_statfs,
3024         .o_statfs_async         = osc_statfs_async,
3025         .o_create               = osc_create,
3026         .o_destroy              = osc_destroy,
3027         .o_getattr              = osc_getattr,
3028         .o_setattr              = osc_setattr,
3029         .o_iocontrol            = osc_iocontrol,
3030         .o_set_info_async       = osc_set_info_async,
3031         .o_import_event         = osc_import_event,
3032         .o_process_config       = osc_process_config,
3033         .o_quotactl             = osc_quotactl,
3034 };
3035
3036 static struct shrinker *osc_cache_shrinker;
3037 struct list_head osc_shrink_list = LIST_HEAD_INIT(osc_shrink_list);
3038 DEFINE_SPINLOCK(osc_shrink_lock);
3039
3040 #ifndef HAVE_SHRINKER_COUNT
3041 static int osc_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask))
3042 {
3043         struct shrink_control scv = {
3044                 .nr_to_scan = shrink_param(sc, nr_to_scan),
3045                 .gfp_mask   = shrink_param(sc, gfp_mask)
3046         };
3047 #if !defined(HAVE_SHRINKER_WANT_SHRINK_PTR) && !defined(HAVE_SHRINK_CONTROL)
3048         struct shrinker *shrinker = NULL;
3049 #endif
3050
3051         (void)osc_cache_shrink_scan(shrinker, &scv);
3052
3053         return osc_cache_shrink_count(shrinker, &scv);
3054 }
3055 #endif
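/*
 * Background for the wrapper above: kernels with HAVE_SHRINKER_COUNT
 * split the shrinker interface into a ->count_objects/->scan_objects
 * pair, which osc_cache_shrink_count() and osc_cache_shrink_scan()
 * implement directly.  Older kernels expect one callback that both
 * scans and reports, so the compat wrapper runs the scan with the
 * caller's shrink_control and then returns the remaining object count.
 */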
3056
3057 static int __init osc_init(void)
3058 {
3059         bool enable_proc = true;
3060         struct obd_type *type;
3061         unsigned int reqpool_size;
3062         unsigned int reqsize;
3063         int rc;
3064         DEF_SHRINKER_VAR(osc_shvar, osc_cache_shrink,
3065                          osc_cache_shrink_count, osc_cache_shrink_scan);
3066         ENTRY;
3067
3068         /* Print the address of _any_ initialized kernel symbol from this
3069          * module, to allow debugging with a gdb that doesn't support data
3070          * symbols from modules. */
3071         CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3072
3073         rc = lu_kmem_init(osc_caches);
3074         if (rc)
3075                 RETURN(rc);
3076
3077         type = class_search_type(LUSTRE_OSP_NAME);
3078         if (type != NULL && type->typ_procsym != NULL)
3079                 enable_proc = false;
3080
3081         rc = class_register_type(&osc_obd_ops, NULL, enable_proc, NULL,
3082                                  LUSTRE_OSC_NAME, &osc_device_type);
3083         if (rc)
3084                 GOTO(out_kmem, rc);
3085
3086         osc_cache_shrinker = set_shrinker(DEFAULT_SEEKS, &osc_shvar);
3087
3088         /* This is obviously too much memory; we only guard against overflow here */
3089         if (osc_reqpool_mem_max >= 1 << 12 || osc_reqpool_mem_max == 0)
3090                 GOTO(out_type, rc = -EINVAL);
3091
3092         reqpool_size = osc_reqpool_mem_max << 20;
3093
3094         reqsize = 1;
3095         while (reqsize < OST_IO_MAXREQSIZE)
3096                 reqsize = reqsize << 1;
3097
3098         /*
3099          * We don't enlarge the request count in the OSC pool according to
3100          * cl_max_rpcs_in_flight. Allocation from the pool is only tried
3101          * after a normal allocation has failed, so a small OSC pool won't
3102          * cause much performance degradation in most cases.
3103          */
3104         osc_reqpool_maxreqcount = reqpool_size / reqsize;
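        /*
         * Worked example (the OST_IO_MAXREQSIZE value here is an assumed
         * figure, for illustration only): with the default
         * osc_reqpool_mem_max = 5, reqpool_size = 5 MiB.  If
         * OST_IO_MAXREQSIZE were 640 KiB, the loop above would round
         * reqsize up to the next power of two, 1 MiB, giving
         * osc_reqpool_maxreqcount = 5 pooled requests shared by all OSC
         * devices.
         */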
3105
3106         atomic_set(&osc_pool_req_count, 0);
3107         osc_rq_pool = ptlrpc_init_rq_pool(0, OST_IO_MAXREQSIZE,
3108                                           ptlrpc_add_rqs_to_pool);
3109
3110         if (osc_rq_pool != NULL)
3111                 GOTO(out, rc);
3112         rc = -ENOMEM;
3113 out_type:
3114         class_unregister_type(LUSTRE_OSC_NAME);
3115 out_kmem:
3116         lu_kmem_fini(osc_caches);
3117 out:
3118         RETURN(rc);
3119 }
3120
3121 static void __exit osc_exit(void)
3122 {
3123         remove_shrinker(osc_cache_shrinker);
3124         class_unregister_type(LUSTRE_OSC_NAME);
3125         lu_kmem_fini(osc_caches);
3126         ptlrpc_free_rq_pool(osc_rq_pool);
3127 }
3128
3129 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
3130 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3131 MODULE_VERSION(LUSTRE_VERSION_STRING);
3132 MODULE_LICENSE("GPL");
3133
3134 module_init(osc_init);
3135 module_exit(osc_exit);