Whamcloud - gitweb
6c3b354877c5ce25fbdacd6316944cb91518d607
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2016, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  */
32
33 #define DEBUG_SUBSYSTEM S_OSC
34
35 #include <libcfs/libcfs.h>
36
37 #include <lustre/lustre_user.h>
38
39 #include <lprocfs_status.h>
40 #include <lustre_debug.h>
41 #include <lustre_dlm.h>
42 #include <lustre_fid.h>
43 #include <lustre_ha.h>
44 #include <uapi/linux/lustre_ioctl.h>
45 #include <lustre_net.h>
46 #include <lustre_obdo.h>
47 #include <lustre_param.h>
48 #include <obd.h>
49 #include <obd_cksum.h>
50 #include <obd_class.h>
51
52 #include "osc_cl_internal.h"
53 #include "osc_internal.h"
54
/* Shared OSC request-pool state: the pool itself, the current number of
 * requests it holds, and the configured upper bound on that count. */
atomic_t osc_pool_req_count;
unsigned int osc_reqpool_maxreqcount;
struct ptlrpc_request_pool *osc_rq_pool;

/* max memory used for request pool, unit is MB */
static unsigned int osc_reqpool_mem_max = 5;
module_param(osc_reqpool_mem_max, uint, 0444);
62
/* Per-RPC context carried in rq_async_args across an asynchronous bulk
 * read/write (BRW) request; consumed by brw_interpret() on reply. */
struct osc_brw_async_args {
        struct obdo              *aa_oa;        /* object attributes for this I/O */
        int                       aa_requested_nob; /* presumably total bytes requested — confirm in brw path */
        int                       aa_nio_count;
        u32                       aa_page_count;
        int                       aa_resends;   /* resend attempts so far */
        struct brw_page **aa_ppga;              /* page array, freed via osc_release_ppga() */
        struct client_obd        *aa_cli;       /* owning client */
        struct list_head          aa_oaps;
        struct list_head          aa_exts;
};

/* grant-shrink RPCs reuse the same rq_async_args layout */
#define osc_grant_args osc_brw_async_args
76
/* Async-args for setattr/punch RPCs: the obdo to refresh from the reply
 * plus the caller's completion upcall and its opaque cookie. */
struct osc_setattr_args {
        struct obdo             *sa_oa;
        obd_enqueue_update_f     sa_upcall;
        void                    *sa_cookie;
};
82
/* Async-args for OST_SYNC RPCs: target object, obdo to refresh from the
 * reply, and the caller's completion upcall/cookie. */
struct osc_fsync_args {
        struct osc_object       *fa_obj;
        struct obdo             *fa_oa;
        obd_enqueue_update_f    fa_upcall;
        void                    *fa_cookie;
};
89
/* Async-args for OST_LADVISE RPCs: obdo to refresh from the reply plus
 * the caller's completion upcall/cookie. */
struct osc_ladvise_args {
        struct obdo             *la_oa;
        obd_enqueue_update_f     la_upcall;
        void                    *la_cookie;
};
95
/* Async-args for DLM lock enqueue requests issued by this layer. */
struct osc_enqueue_args {
        struct obd_export       *oa_exp;        /* export the lock was requested on */
        enum ldlm_type          oa_type;
        enum ldlm_mode          oa_mode;
        __u64                   *oa_flags;
        osc_enqueue_upcall_f    oa_upcall;      /* completion callback */
        void                    *oa_cookie;     /* opaque argument for oa_upcall */
        struct ost_lvb          *oa_lvb;
        struct lustre_handle    oa_lockh;
        unsigned int            oa_agl:1;       /* asynchronous glimpse lock request */
};
107
/* forward declarations for helpers defined later in this file */
static void osc_release_ppga(struct brw_page **ppga, size_t count);
static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
                         void *data, int rc);
111
/**
 * Pack \a oa into the OST_BODY field of an already-packed request,
 * converting to the wire format negotiated for this import.
 */
void osc_pack_req_body(struct ptlrpc_request *req, struct obdo *oa)
{
        struct ost_body *body;

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
}
121
/**
 * Synchronously fetch attributes of an OST object.
 *
 * Sends an OST_GETATTR RPC and, on success, copies the server-returned
 * attributes back into \a oa, filling in the client BRW size as the
 * block size.
 *
 * \param[in]     env  execution environment (unused here)
 * \param[in]     exp  export for the target OST
 * \param[in,out] oa   identifies the object; updated from the reply
 *
 * \retval 0 on success, negative errno on failure
 */
static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        int                      rc;

        ENTRY;
        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        /* synchronous RPC: block until the reply arrives */
        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        /* report the client's preferred BRW size as the block size */
        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;

        EXIT;
out:
        ptlrpc_req_finished(req);

        return rc;
}
164
165 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
166                        struct obdo *oa)
167 {
168         struct ptlrpc_request   *req;
169         struct ost_body         *body;
170         int                      rc;
171
172         ENTRY;
173         LASSERT(oa->o_valid & OBD_MD_FLGROUP);
174
175         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
176         if (req == NULL)
177                 RETURN(-ENOMEM);
178
179         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
180         if (rc) {
181                 ptlrpc_request_free(req);
182                 RETURN(rc);
183         }
184
185         osc_pack_req_body(req, oa);
186
187         ptlrpc_request_set_replen(req);
188
189         rc = ptlrpc_queue_wait(req);
190         if (rc)
191                 GOTO(out, rc);
192
193         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
194         if (body == NULL)
195                 GOTO(out, rc = -EPROTO);
196
197         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
198
199         EXIT;
200 out:
201         ptlrpc_req_finished(req);
202
203         RETURN(rc);
204 }
205
/**
 * Reply interpreter for async setattr/punch RPCs.
 *
 * On success, refreshes sa->sa_oa from the server-returned ost_body,
 * then unconditionally invokes the caller's upcall with the final rc.
 */
static int osc_setattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_setattr_args *sa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
                             &body->oa);
out:
        /* the upcall always runs, even on error, so the caller can clean up */
        rc = sa->sa_upcall(sa->sa_cookie, rc);
        RETURN(rc);
}
226
/**
 * Send an OST_SETATTR RPC asynchronously.
 *
 * \param[in] exp     export for the target OST
 * \param[in] oa      attributes to set; must stay valid until the upcall runs
 * \param[in] upcall  completion callback (only used when \a rqset != NULL)
 * \param[in] cookie  opaque argument passed to \a upcall
 * \param[in] rqset   NULL for fire-and-forget, PTLRPCD_SET to hand the
 *                    request to ptlrpcd, or a caller-owned set
 *
 * \retval 0 on success (the RPC is queued), negative errno on failure
 */
int osc_setattr_async(struct obd_export *exp, struct obdo *oa,
                      obd_enqueue_update_f upcall, void *cookie,
                      struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct osc_setattr_args *sa;
        int                      rc;

        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        /* do mds to ost setattr asynchronously */
        if (!rqset) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req);
        } else {
                /* cast: interpreter takes osc_setattr_args* instead of void* */
                req->rq_interpret_reply =
                        (ptlrpc_interpterer_t)osc_setattr_interpret;

                CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
                sa = ptlrpc_req_async_args(req);
                sa->sa_oa = oa;
                sa->sa_upcall = upcall;
                sa->sa_cookie = cookie;

                if (rqset == PTLRPCD_SET)
                        ptlrpcd_add_req(req);
                else
                        ptlrpc_set_add_req(rqset, req);
        }

        RETURN(0);
}
273
/**
 * Reply interpreter for async OST_LADVISE RPCs.
 *
 * On success, copies the server-returned obdo back to the caller's
 * buffer, then unconditionally invokes the upcall with the final rc.
 */
static int osc_ladvise_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 void *arg, int rc)
{
        struct osc_ladvise_args *la = arg;
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        *la->la_oa = body->oa;
out:
        /* the upcall always runs so the caller can observe completion */
        rc = la->la_upcall(la->la_cookie, rc);
        RETURN(rc);
}
294
/**
 * Send an OST_LADVISE RPC carrying one or more lu_ladvise hints.
 *
 * If rqset is NULL, do not wait for response. Upcall and cookie could also
 * be NULL in this case
 *
 * \param[in] exp          export for the target OST
 * \param[in] oa           object the advice applies to
 * \param[in] ladvise_hdr  header followed by lah_count lu_ladvise entries
 * \param[in] upcall       completion callback (when \a rqset != NULL)
 * \param[in] cookie       opaque argument for \a upcall
 * \param[in] rqset        NULL, PTLRPCD_SET, or a caller-owned set
 *
 * \retval 0 on success, negative errno on failure
 */
int osc_ladvise_base(struct obd_export *exp, struct obdo *oa,
                     struct ladvise_hdr *ladvise_hdr,
                     obd_enqueue_update_f upcall, void *cookie,
                     struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        struct osc_ladvise_args *la;
        int                      rc;
        struct lu_ladvise       *req_ladvise;
        struct lu_ladvise       *ladvise = ladvise_hdr->lah_advise;
        int                      num_advise = ladvise_hdr->lah_count;
        struct ladvise_hdr      *req_ladvise_hdr;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_LADVISE);
        if (req == NULL)
                RETURN(-ENOMEM);

        /* NOTE(review): num_advise * sizeof(*ladvise) is not checked for
         * overflow here — presumably lah_count is validated by the caller;
         * verify. */
        req_capsule_set_size(&req->rq_pill, &RMF_OST_LADVISE, RCL_CLIENT,
                             num_advise * sizeof(*ladvise));
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_LADVISE);
        if (rc != 0) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL;
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
                             oa);

        req_ladvise_hdr = req_capsule_client_get(&req->rq_pill,
                                                 &RMF_OST_LADVISE_HDR);
        memcpy(req_ladvise_hdr, ladvise_hdr, sizeof(*ladvise_hdr));

        req_ladvise = req_capsule_client_get(&req->rq_pill, &RMF_OST_LADVISE);
        memcpy(req_ladvise, ladvise, sizeof(*ladvise) * num_advise);
        ptlrpc_request_set_replen(req);

        if (rqset == NULL) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req);
                RETURN(0);
        }

        req->rq_interpret_reply = osc_ladvise_interpret;
        CLASSERT(sizeof(*la) <= sizeof(req->rq_async_args));
        la = ptlrpc_req_async_args(req);
        la->la_oa = oa;
        la->la_upcall = upcall;
        la->la_cookie = cookie;

        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}
361
/**
 * Synchronously create an OST object (echo-client path only, per the
 * fid_seq_is_echo assertion below).
 *
 * \param[in]     env  execution environment (unused here)
 * \param[in]     exp  export for the target OST
 * \param[in,out] oa   creation attributes; updated from the reply
 *
 * \retval 0 on success, negative errno on failure
 */
static int osc_create(const struct lu_env *env, struct obd_export *exp,
                      struct obdo *oa)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        LASSERT(oa != NULL);
        LASSERT(oa->o_valid & OBD_MD_FLGROUP);
        /* only the echo client creates objects through this path */
        LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
        if (req == NULL)
                GOTO(out, rc = -ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
        if (rc) {
                ptlrpc_request_free(req);
                GOTO(out, rc);
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out_req, rc = -EPROTO);

        /* NOTE(review): o_flags is logged without checking OBD_MD_FLFLAGS
         * in o_valid — may print a stale value; debug-only. */
        CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;

        CDEBUG(D_HA, "transno: %lld\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        RETURN(rc);
}
412
/**
 * Send an asynchronous OST_PUNCH (truncate / hole-punch) RPC.
 *
 * The start/end of the punched range travel in \a oa (size/blocks
 * fields, as set up by the caller).
 *
 * \param[in] exp     export for the target OST
 * \param[in] oa      object id plus punch range; must stay valid until
 *                    the upcall runs
 * \param[in] upcall  completion callback
 * \param[in] cookie  opaque argument for \a upcall
 * \param[in] rqset   PTLRPCD_SET or a caller-owned set.
 *                    NOTE(review): unlike osc_setattr_async(), a NULL
 *                    rqset is not handled here — presumably callers never
 *                    pass NULL; verify.
 *
 * \retval 0 on success, negative errno on failure
 */
int osc_punch_base(struct obd_export *exp, struct obdo *oa,
                   obd_enqueue_update_f upcall, void *cookie,
                   struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct osc_setattr_args *sa;
        struct ost_body         *body;
        int                      rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
        CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
        sa = ptlrpc_req_async_args(req);
        sa->sa_oa = oa;
        sa->sa_upcall = upcall;
        sa->sa_cookie = cookie;
        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}
454
/**
 * Reply interpreter for OST_SYNC RPCs.
 *
 * Copies the server-returned obdo back to the caller, refreshes the
 * cached blocks attribute of the osc object from the reply, and always
 * invokes the caller's upcall with the final rc.
 */
static int osc_sync_interpret(const struct lu_env *env,
                              struct ptlrpc_request *req,
                              void *arg, int rc)
{
        struct osc_fsync_args   *fa = arg;
        struct ost_body         *body;
        struct cl_attr          *attr = &osc_env_info(env)->oti_attr;
        unsigned long           valid = 0;
        struct cl_object        *obj;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL) {
                CERROR("can't unpack ost_body\n");
                GOTO(out, rc = -EPROTO);
        }

        *fa->fa_oa = body->oa;
        obj = osc2cl(fa->fa_obj);

        /* Update osc object's blocks attribute */
        cl_object_attr_lock(obj);
        if (body->oa.o_valid & OBD_MD_FLBLOCKS) {
                attr->cat_blocks = body->oa.o_blocks;
                valid |= CAT_BLOCKS;
        }

        if (valid != 0)
                cl_object_attr_update(env, obj, attr, valid);
        cl_object_attr_unlock(obj);

out:
        /* the upcall always runs, even on error */
        rc = fa->fa_upcall(fa->fa_cookie, rc);
        RETURN(rc);
}
493
/**
 * Send an asynchronous OST_SYNC RPC for \a obj.
 *
 * \param[in] obj     osc object to sync
 * \param[in] oa      carries the sync range in the size/blocks fields
 *                    (see the overload comment below); refreshed from
 *                    the reply
 * \param[in] upcall  completion callback
 * \param[in] cookie  opaque argument for \a upcall
 * \param[in] rqset   PTLRPCD_SET or a caller-owned set
 *
 * \retval 0 on success, negative errno on failure
 */
int osc_sync_base(struct osc_object *obj, struct obdo *oa,
                  obd_enqueue_update_f upcall, void *cookie,
                  struct ptlrpc_request_set *rqset)
{
        struct obd_export     *exp = osc_export(obj);
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct osc_fsync_args *fa;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        /* overload the size and blocks fields in the oa with start/end */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = osc_sync_interpret;

        CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
        fa = ptlrpc_req_async_args(req);
        fa->fa_obj = obj;
        fa->fa_oa = oa;
        fa->fa_upcall = upcall;
        fa->fa_cookie = cookie;

        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN (0);
}
537
/* Find and cancel locally locks matched by @mode in the resource found by
 * @objid. Found locks are added into @cancel list. Returns the amount of
 * locks added to @cancels list. */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
                                   struct list_head *cancels,
                                   enum ldlm_mode mode, __u64 lock_flags)
{
        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        struct ldlm_res_id res_id;
        struct ldlm_resource *res;
        int count;
        ENTRY;

        /* Return, i.e. cancel nothing, only if ELC is supported (flag in
         * export) but disabled through procfs (flag in NS).
         *
         * This distinguishes from a case when ELC is not supported originally,
         * when we still want to cancel locks in advance and just cancel them
         * locally, without sending any RPC. */
        if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
                RETURN(0);

        /* no resource means no locks to cancel — not an error */
        ostid_build_res_name(&oa->o_oi, &res_id);
        res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
        if (IS_ERR(res))
                RETURN(0);

        LDLM_RESOURCE_ADDREF(res);
        count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
                                           lock_flags, 0, NULL);
        LDLM_RESOURCE_DELREF(res);
        ldlm_resource_putref(res);
        RETURN(count);
}
572
573 static int osc_destroy_interpret(const struct lu_env *env,
574                                  struct ptlrpc_request *req, void *data,
575                                  int rc)
576 {
577         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
578
579         atomic_dec(&cli->cl_destroy_in_flight);
580         wake_up(&cli->cl_destroy_waitq);
581         return 0;
582 }
583
584 static int osc_can_send_destroy(struct client_obd *cli)
585 {
586         if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
587             cli->cl_max_rpcs_in_flight) {
588                 /* The destroy request can be sent */
589                 return 1;
590         }
591         if (atomic_dec_return(&cli->cl_destroy_in_flight) <
592             cli->cl_max_rpcs_in_flight) {
593                 /*
594                  * The counter has been modified between the two atomic
595                  * operations.
596                  */
597                 wake_up(&cli->cl_destroy_waitq);
598         }
599         return 0;
600 }
601
/**
 * Destroy an OST object, cancelling cached PW locks on it first (early
 * lock cancellation piggy-backed on the destroy RPC).
 *
 * The RPC is sent fire-and-forget through ptlrpcd; the number of
 * concurrent destroys is throttled to cl_max_rpcs_in_flight via
 * osc_can_send_destroy()/osc_destroy_interpret().
 *
 * \param[in] env  execution environment (unused here)
 * \param[in] exp  export for the target OST
 * \param[in] oa   identifies the object to destroy
 *
 * \retval 0 on success (RPC queued), negative errno on failure
 */
static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct client_obd     *cli = &exp->exp_obd->u.cli;
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct list_head       cancels = LIST_HEAD_INIT(cancels);
        int rc, count;
        ENTRY;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        /* cancel local PW locks and discard their dirty data — the object
         * is going away anyway */
        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
        if (req == NULL) {
                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                RETURN(-ENOMEM);
        }

        rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
                               0, &cancels, count);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = osc_destroy_interpret;
        if (!osc_can_send_destroy(cli)) {
                struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);

                /*
                 * Wait until the number of on-going destroy RPCs drops
                 * under max_rpc_in_flight
                 */
                rc = l_wait_event_exclusive(cli->cl_destroy_waitq,
                                            osc_can_send_destroy(cli), &lwi);
                if (rc) {
                        ptlrpc_req_finished(req);
                        RETURN(rc);
                }
        }

        /* Do not wait for response */
        ptlrpcd_add_req(req);
        RETURN(0);
}
662
/**
 * Fill the dirty/undirty/grant accounting fields of \a oa so the server
 * can track this client's cache state and grant usage.  Called while
 * preparing write RPCs; all counters are sampled under cl_loi_list_lock.
 */
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;

        /* caller must not have filled these fields already */
        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        spin_lock(&cli->cl_loi_list_lock);
        if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data, GRANT_PARAM))
                oa->o_dirty = cli->cl_dirty_grant;
        else
                oa->o_dirty = cli->cl_dirty_pages << PAGE_SHIFT;
        if (unlikely(cli->cl_dirty_pages - cli->cl_dirty_transit >
                     cli->cl_dirty_max_pages)) {
                CERROR("dirty %lu - %lu > dirty_max %lu\n",
                       cli->cl_dirty_pages, cli->cl_dirty_transit,
                       cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else if (unlikely(atomic_long_read(&obd_dirty_pages) -
                            atomic_long_read(&obd_dirty_transit_pages) >
                            (long)(obd_max_dirty_pages + 1))) {
                /* The atomic_read() allowing the atomic_inc() are
                 * not covered by a lock thus they may safely race and trip
                 * this CERROR() unless we add in a small fudge factor (+1). */
                CERROR("%s: dirty %ld - %ld > system dirty_max %ld\n",
                       cli_name(cli), atomic_long_read(&obd_dirty_pages),
                       atomic_long_read(&obd_dirty_transit_pages),
                       obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
                            0x7fffffff)) {
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty_pages, cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else {
                unsigned long nrpages;

                /* ask for enough to keep a full pipeline of RPCs busy */
                nrpages = cli->cl_max_pages_per_rpc;
                nrpages *= cli->cl_max_rpcs_in_flight + 1;
                nrpages = max(nrpages, cli->cl_dirty_max_pages);
                oa->o_undirty = nrpages << PAGE_SHIFT;
                if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data,
                                 GRANT_PARAM)) {
                        int nrextents;

                        /* take extent tax into account when asking for more
                         * grant space */
                        nrextents = (nrpages + cli->cl_max_extent_pages - 1)  /
                                     cli->cl_max_extent_pages;
                        oa->o_undirty += nrextents * cli->cl_grant_extent_tax;
                }
        }
        oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
        oa->o_dropped = cli->cl_lost_grant;
        /* lost grant is reported once, then reset */
        cli->cl_lost_grant = 0;
        spin_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "dirty: %llu undirty: %u dropped %u grant: %llu\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
}
723
724 void osc_update_next_shrink(struct client_obd *cli)
725 {
726         cli->cl_next_shrink_grant =
727                 cfs_time_shift(cli->cl_grant_shrink_interval);
728         CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
729                cli->cl_next_shrink_grant);
730 }
731
/* Add \a grant bytes back to the client's available grant, under the
 * list lock that protects grant accounting. */
static void __osc_update_grant(struct client_obd *cli, u64 grant)
{
        spin_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant += grant;
        spin_unlock(&cli->cl_loi_list_lock);
}
738
739 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
740 {
741         if (body->oa.o_valid & OBD_MD_FLGRANT) {
742                 CDEBUG(D_CACHE, "got %llu extra grant\n", body->oa.o_grant);
743                 __osc_update_grant(cli, body->oa.o_grant);
744         }
745 }
746
/* forward declaration: defined later in this file, used by the grant
 * shrink path below */
static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
                              u32 keylen, void *key,
                              u32 vallen, void *val,
                              struct ptlrpc_request_set *set);
751
/**
 * Reply interpreter for grant-shrink RPCs.
 *
 * On failure, puts the tentatively-shrunk grant back; on success,
 * absorbs whatever grant the server returned.  Frees the obdo that the
 * request path allocated in either case.
 */
static int osc_shrink_grant_interpret(const struct lu_env *env,
                                      struct ptlrpc_request *req,
                                      void *aa, int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
        struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
        struct ost_body *body;

        if (rc != 0) {
                /* shrink failed — restore the grant we had set aside */
                __osc_update_grant(cli, oa->o_grant);
                GOTO(out, rc);
        }

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        osc_update_grant(cli, body);
out:
        OBDO_FREE(oa);
        return rc;
}
772
/**
 * Give back a quarter of the available grant: move it from
 * cl_avail_grant into oa->o_grant and flag the obdo with
 * OBD_FL_SHRINK_GRANT so the server reclaims it.
 */
static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
{
        spin_lock(&cli->cl_loi_list_lock);
        oa->o_grant = cli->cl_avail_grant / 4;
        cli->cl_avail_grant -= oa->o_grant;
        spin_unlock(&cli->cl_loi_list_lock);
        if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
                oa->o_valid |= OBD_MD_FLFLAGS;
                oa->o_flags = 0;
        }
        oa->o_flags |= OBD_FL_SHRINK_GRANT;
        /* re-arm the shrink timer so we do not shrink again immediately */
        osc_update_next_shrink(cli);
}
786
787 /* Shrink the current grant, either from some large amount to enough for a
788  * full set of in-flight RPCs, or if we have already shrunk to that limit
789  * then to enough for a single RPC.  This avoids keeping more grant than
790  * needed, and avoids shrinking the grant piecemeal. */
791 static int osc_shrink_grant(struct client_obd *cli)
792 {
793         __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
794                              (cli->cl_max_pages_per_rpc << PAGE_SHIFT);
795
796         spin_lock(&cli->cl_loi_list_lock);
797         if (cli->cl_avail_grant <= target_bytes)
798                 target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
799         spin_unlock(&cli->cl_loi_list_lock);
800
801         return osc_shrink_grant_to_target(cli, target_bytes);
802 }
803
/**
 * Shrink this client's grant down to \a target_bytes by sending a
 * KEY_GRANT_SHRINK set_info RPC carrying the surplus back to the server.
 *
 * \param[in] cli           client whose grant is shrunk
 * \param[in] target_bytes  desired grant level; clamped to at least one
 *                          full RPC worth of bytes
 *
 * \retval 0 on success (or nothing to shrink), negative errno on failure
 */
int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
{
        int                     rc = 0;
        struct ost_body        *body;
        ENTRY;

        spin_lock(&cli->cl_loi_list_lock);
        /* Don't shrink if we are already above or below the desired limit
         * We don't want to shrink below a single RPC, as that will negatively
         * impact block allocation and long-term performance. */
        if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_SHIFT)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;

        if (target_bytes >= cli->cl_avail_grant) {
                spin_unlock(&cli->cl_loi_list_lock);
                RETURN(0);
        }
        spin_unlock(&cli->cl_loi_list_lock);

        OBD_ALLOC_PTR(body);
        if (!body)
                RETURN(-ENOMEM);

        osc_announce_cached(cli, &body->oa, 0);

        /* re-sample under lock: avail grant may have changed since the
         * check above */
        spin_lock(&cli->cl_loi_list_lock);
        body->oa.o_grant = cli->cl_avail_grant - target_bytes;
        cli->cl_avail_grant = target_bytes;
        spin_unlock(&cli->cl_loi_list_lock);
        if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
                body->oa.o_valid |= OBD_MD_FLFLAGS;
                body->oa.o_flags = 0;
        }
        body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);

        rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
                                sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
                                sizeof(*body), body, NULL);
        if (rc != 0)
                /* send failed — put the grant back */
                __osc_update_grant(cli, body->oa.o_grant);
        OBD_FREE_PTR(body);
        RETURN(rc);
}
848
849 static int osc_should_shrink_grant(struct client_obd *client)
850 {
851         cfs_time_t time = cfs_time_current();
852         cfs_time_t next_shrink = client->cl_next_shrink_grant;
853
854         if ((client->cl_import->imp_connect_data.ocd_connect_flags &
855              OBD_CONNECT_GRANT_SHRINK) == 0)
856                 return 0;
857
858         if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
859                 /* Get the current RPC size directly, instead of going via:
860                  * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
861                  * Keep comment here so that it can be found by searching. */
862                 int brw_size = client->cl_max_pages_per_rpc << PAGE_SHIFT;
863
864                 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
865                     client->cl_avail_grant > brw_size)
866                         return 1;
867                 else
868                         osc_update_next_shrink(client);
869         }
870         return 0;
871 }
872
873 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
874 {
875         struct client_obd *client;
876
877         list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
878                 if (osc_should_shrink_grant(client))
879                         osc_shrink_grant(client);
880         }
881         return 0;
882 }
883
884 static int osc_add_shrink_grant(struct client_obd *client)
885 {
886         int rc;
887
888         rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
889                                        TIMEOUT_GRANT,
890                                        osc_grant_shrink_grant_cb, NULL,
891                                        &client->cl_grant_shrink_list);
892         if (rc) {
893                 CERROR("add grant client %s error %d\n", cli_name(client), rc);
894                 return rc;
895         }
896         CDEBUG(D_CACHE, "add grant client %s\n", cli_name(client));
897         osc_update_next_shrink(client);
898         return 0;
899 }
900
901 static int osc_del_shrink_grant(struct client_obd *client)
902 {
903         return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
904                                          TIMEOUT_GRANT);
905 }
906
/* Initialize this client's grant accounting from the connect reply @ocd.
 * Called at (re)connect time; derives cl_avail_grant from the server's
 * ocd_grant and, when the server supports GRANT_PARAM, the chunk size and
 * maximum extent size used by the osc_extent machinery. */
static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        /*
         * ocd_grant is the total grant amount we're expect to hold: if we've
         * been evicted, it's the new avail_grant amount, cl_dirty_pages will
         * drop to 0 as inflight RPCs fail out; otherwise, it's avail_grant +
         * dirty.
         *
         * race is tolerable here: if we're evicted, but imp_state already
         * left EVICTED state, then cl_dirty_pages must be 0 already.
         */
        spin_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant = ocd->ocd_grant;
        if (cli->cl_import->imp_state != LUSTRE_IMP_EVICTED) {
                /* not evicted: ocd_grant also covers what we already have
                 * reserved and dirty, so subtract those out */
                cli->cl_avail_grant -= cli->cl_reserved_grant;
                if (OCD_HAS_FLAG(ocd, GRANT_PARAM))
                        cli->cl_avail_grant -= cli->cl_dirty_grant;
                else
                        cli->cl_avail_grant -=
                                        cli->cl_dirty_pages << PAGE_SHIFT;
        }

        if (OCD_HAS_FLAG(ocd, GRANT_PARAM)) {
                u64 size;
                int chunk_mask;

                /* overhead for each extent insertion */
                cli->cl_grant_extent_tax = ocd->ocd_grant_tax_kb << 10;
                /* determine the appropriate chunk size used by osc_extent. */
                cli->cl_chunkbits = max_t(int, PAGE_SHIFT,
                                          ocd->ocd_grant_blkbits);
                /* max_pages_per_rpc must be chunk aligned */
                chunk_mask = ~((1 << (cli->cl_chunkbits - PAGE_SHIFT)) - 1);
                /* round max_pages_per_rpc up to the next chunk boundary */
                cli->cl_max_pages_per_rpc = (cli->cl_max_pages_per_rpc +
                                             ~chunk_mask) & chunk_mask;
                /* determine maximum extent size, in #pages */
                size = (u64)ocd->ocd_grant_max_blks << ocd->ocd_grant_blkbits;
                cli->cl_max_extent_pages = size >> PAGE_SHIFT;
                if (cli->cl_max_extent_pages == 0)
                        cli->cl_max_extent_pages = 1;
        } else {
                /* server without GRANT_PARAM: fall back to page-granular
                 * accounting and the legacy maximum extent size */
                cli->cl_grant_extent_tax = 0;
                cli->cl_chunkbits = PAGE_SHIFT;
                cli->cl_max_extent_pages = DT_MAX_BRW_PAGES;
        }
        spin_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
                "chunk bits: %d cl_max_extent_pages: %d\n",
                cli_name(cli),
                cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits,
                cli->cl_max_extent_pages);

        /* register for periodic grant shrinking if the server supports it
         * and we are not already on the timeout list */
        if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
            list_empty(&cli->cl_grant_shrink_list))
                osc_add_shrink_grant(cli);
}
964
965 /* We assume that the reason this OSC got a short read is because it read
966  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
967  * via the LOV, and it _knows_ it's reading inside the file, it's just that
968  * this stripe never got written at or beyond this stripe offset yet. */
969 static void handle_short_read(int nob_read, size_t page_count,
970                               struct brw_page **pga)
971 {
972         char *ptr;
973         int i = 0;
974
975         /* skip bytes read OK */
976         while (nob_read > 0) {
977                 LASSERT (page_count > 0);
978
979                 if (pga[i]->count > nob_read) {
980                         /* EOF inside this page */
981                         ptr = kmap(pga[i]->pg) +
982                                 (pga[i]->off & ~PAGE_MASK);
983                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
984                         kunmap(pga[i]->pg);
985                         page_count--;
986                         i++;
987                         break;
988                 }
989
990                 nob_read -= pga[i]->count;
991                 page_count--;
992                 i++;
993         }
994
995         /* zero remaining pages */
996         while (page_count-- > 0) {
997                 ptr = kmap(pga[i]->pg) + (pga[i]->off & ~PAGE_MASK);
998                 memset(ptr, 0, pga[i]->count);
999                 kunmap(pga[i]->pg);
1000                 i++;
1001         }
1002 }
1003
1004 static int check_write_rcs(struct ptlrpc_request *req,
1005                            int requested_nob, int niocount,
1006                            size_t page_count, struct brw_page **pga)
1007 {
1008         int     i;
1009         __u32   *remote_rcs;
1010
1011         remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1012                                                   sizeof(*remote_rcs) *
1013                                                   niocount);
1014         if (remote_rcs == NULL) {
1015                 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1016                 return(-EPROTO);
1017         }
1018
1019         /* return error if any niobuf was in error */
1020         for (i = 0; i < niocount; i++) {
1021                 if ((int)remote_rcs[i] < 0)
1022                         return(remote_rcs[i]);
1023
1024                 if (remote_rcs[i] != 0) {
1025                         CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1026                                 i, remote_rcs[i], req);
1027                         return(-EPROTO);
1028                 }
1029         }
1030
1031         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1032                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1033                        req->rq_bulk->bd_nob_transferred, requested_nob);
1034                 return(-EPROTO);
1035         }
1036
1037         return (0);
1038 }
1039
1040 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1041 {
1042         if (p1->flag != p2->flag) {
1043                 unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
1044                                   OBD_BRW_SYNC       | OBD_BRW_ASYNC   |
1045                                   OBD_BRW_NOQUOTA    | OBD_BRW_SOFT_SYNC);
1046
1047                 /* warn if we try to combine flags that we don't know to be
1048                  * safe to combine */
1049                 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1050                         CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1051                               "report this at https://jira.hpdd.intel.com/\n",
1052                               p1->flag, p2->flag);
1053                 }
1054                 return 0;
1055         }
1056
1057         return (p1->off + p1->count == p2->off);
1058 }
1059
/* Compute the bulk-data checksum over @nob bytes spread across @pg_count
 * pages in @pga, using the algorithm selected by @cksum_type.
 *
 * @opc is OST_READ or OST_WRITE and only controls the fault-injection
 * behavior (OBD_FAIL_OSC_CHECKSUM_RECEIVE / _SEND).
 *
 * NOTE(review): on hash-init failure this returns PTR_ERR(hdesc) — a
 * negative errno — through a u32 return type, so callers see it as a large
 * checksum value rather than an error; presumably tolerated upstream,
 * verify against callers. */
static u32 osc_checksum_bulk(int nob, size_t pg_count,
                             struct brw_page **pga, int opc,
                             cksum_type_t cksum_type)
{
        u32                             cksum;
        int                             i = 0;
        struct cfs_crypto_hash_desc     *hdesc;
        unsigned int                    bufsize;
        unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);

        LASSERT(pg_count > 0);

        hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
        if (IS_ERR(hdesc)) {
                CERROR("Unable to initialize checksum hash %s\n",
                       cfs_crypto_hash_name(cfs_alg));
                return PTR_ERR(hdesc);
        }

        while (nob > 0 && pg_count > 0) {
                /* clamp the last page's contribution to the remaining bytes */
                unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 && opc == OST_READ &&
                    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
                        unsigned char *ptr = kmap(pga[i]->pg);
                        int off = pga[i]->off & ~PAGE_MASK;

                        memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
                        kunmap(pga[i]->pg);
                }
                cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
                                            pga[i]->off & ~PAGE_MASK,
                                            count);
                LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
                               (int)(pga[i]->off & ~PAGE_MASK));

                /* nob is decremented by the full page count (not the clamped
                 * one), so it goes non-positive on the final page */
                nob -= pga[i]->count;
                pg_count--;
                i++;
        }

        bufsize = sizeof(cksum);
        cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);

        /* For sending we only compute the wrong checksum instead
         * of corrupting the data so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                cksum++;

        return cksum;
}
1113
/* Build an OST_READ or OST_WRITE ptlrpc request, including its bulk
 * descriptor and wire niobufs, covering @page_count pages in @pga.
 *
 * \param cmd        OBD_BRW_WRITE bit selects a write; otherwise a read
 * \param cli        client obd issuing the I/O
 * \param oa         obdo for the target object; write-checksum state is
 *                   mirrored back into it for later verification
 * \param page_count number of entries in @pga (must be > 0; sorted by
 *                   offset, gap-free except at the array ends)
 * \param pga        page array describing the I/O
 * \param reqp       on success *reqp is the prepared request (caller owns)
 * \param resend     non-zero marks the wire obdo with OBD_FL_RECOV_RESEND
 *
 * \retval 0 on success, negative errno on failure (request is freed).
 */
static int
osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
                     u32 page_count, struct brw_page **pga,
                     struct ptlrpc_request **reqp, int resend)
{
        struct ptlrpc_request   *req;
        struct ptlrpc_bulk_desc *desc;
        struct ost_body         *body;
        struct obd_ioobj        *ioobj;
        struct niobuf_remote    *niobuf;
        int niocount, i, requested_nob, opc, rc;
        struct osc_brw_async_args *aa;
        struct req_capsule      *pill;
        struct brw_page *pg_prev;

        ENTRY;
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
                RETURN(-ENOMEM); /* Recoverable */
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
                RETURN(-EINVAL); /* Fatal */

        /* writes allocate from the dedicated OSC request pool so that dirty
         * page writeback cannot deadlock on memory allocation */
        if ((cmd & OBD_BRW_WRITE) != 0) {
                opc = OST_WRITE;
                req = ptlrpc_request_alloc_pool(cli->cl_import,
                                                osc_rq_pool,
                                                &RQF_OST_BRW_WRITE);
        } else {
                opc = OST_READ;
                req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
        }
        if (req == NULL)
                RETURN(-ENOMEM);

        /* count remote niobufs: adjacent mergeable pages share one */
        for (niocount = i = 1; i < page_count; i++) {
                if (!can_merge_pages(pga[i - 1], pga[i]))
                        niocount++;
        }

        pill = &req->rq_pill;
        req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
                             sizeof(*ioobj));
        req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
                             niocount * sizeof(*niobuf));

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);
        /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
         * retry logic */
        req->rq_no_retry_einprogress = 1;

        /* writes are GET_SOURCE (server pulls), reads are PUT_SINK
         * (server pushes) */
        desc = ptlrpc_prep_bulk_imp(req, page_count,
                cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
                (opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE :
                        PTLRPC_BULK_PUT_SINK) |
                        PTLRPC_BULK_BUF_KIOV,
                OST_BULK_PORTAL,
                &ptlrpc_bulk_kiov_pin_ops);

        if (desc == NULL)
                GOTO(out, rc = -ENOMEM);
        /* NB request now owns desc and will free it when it gets freed */

        body = req_capsule_client_get(pill, &RMF_OST_BODY);
        ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;
        /* The high bits of ioo_max_brw tells server _maximum_ number of bulks
         * that might be send for this request.  The actual number is decided
         * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
         * "max - 1" for old client compatibility sending "0", and also so
         * the actual maximum is a power-of-two number, not one less. LU-1431 */
        ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
        LASSERT(page_count > 0);
        pg_prev = pga[0];
        /* add each page to the bulk descriptor and build the wire niobufs,
         * coalescing contiguous mergeable pages into a single niobuf */
        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                struct brw_page *pg = pga[i];
                int poff = pg->off & ~PAGE_MASK;

                LASSERT(pg->count > 0);
                /* make sure there is no gap in the middle of page array */
                LASSERTF(page_count == 1 ||
                         (ergo(i == 0, poff + pg->count == PAGE_SIZE) &&
                          ergo(i > 0 && i < page_count - 1,
                               poff == 0 && pg->count == PAGE_SIZE)   &&
                          ergo(i == page_count - 1, poff == 0)),
                         "i: %d/%d pg: %p off: %llu, count: %u\n",
                         i, page_count, pg, pg->off, pg->count);
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u pg %p [pri %lu ind %lu] off %llu"
                         " prev_pg %p [pri %lu ind %lu] off %llu\n",
                         i, page_count,
                         pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
                         pg_prev->pg, page_private(pg_prev->pg),
                         pg_prev->pg->index, pg_prev->off);
                /* SRVLOCK must be consistent across the whole request */
                LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
                        (pg->flag & OBD_BRW_SRVLOCK));

                desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff, pg->count);
                requested_nob += pg->count;

                if (i > 0 && can_merge_pages(pg_prev, pg)) {
                        /* extend the previous niobuf instead of starting
                         * a new one */
                        niobuf--;
                        niobuf->rnb_len += pg->count;
                } else {
                        niobuf->rnb_offset = pg->off;
                        niobuf->rnb_len    = pg->count;
                        niobuf->rnb_flags  = pg->flag;
                }
                pg_prev = pg;
        }

        /* sanity check: we filled exactly the niocount buffers we sized */
        LASSERTF((void *)(niobuf - niocount) ==
                req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
                "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
                &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));

        osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
        if (resend) {
                if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                        body->oa.o_valid |= OBD_MD_FLFLAGS;
                        body->oa.o_flags = 0;
                }
                body->oa.o_flags |= OBD_FL_RECOV_RESEND;
        }

        /* opportunistically piggy-back a grant shrink on this RPC */
        if (osc_should_shrink_grant(cli))
                osc_shrink_grant_local(cli, &body->oa);

        /* size[REQ_REC_OFF] still sizeof (*body) */
        if (opc == OST_WRITE) {
                if (cli->cl_checksum &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        /* store cl_cksum_type in a local variable since
                         * it can be changed via lprocfs */
                        cksum_type_t cksum_type = cli->cl_cksum_type;

                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                body->oa.o_flags = 0;

                        body->oa.o_flags |= cksum_type_pack(cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        body->oa.o_cksum = osc_checksum_bulk(requested_nob,
                                                             page_count, pga,
                                                             OST_WRITE,
                                                             cksum_type);
                        CDEBUG(D_PAGE, "checksum at write origin: %x\n",
                               body->oa.o_cksum);
                        /* save this in 'oa', too, for later checking */
                        oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        oa->o_flags |= cksum_type_pack(cksum_type);
                } else {
                        /* clear out the checksum flag, in case this is a
                         * resend but cl_checksum is no longer set. b=11238 */
                        oa->o_valid &= ~OBD_MD_FLCKSUM;
                }
                oa->o_cksum = body->oa.o_cksum;
                /* 1 RC per niobuf */
                req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
                                     sizeof(__u32) * niocount);
        } else {
                if (cli->cl_checksum &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                body->oa.o_flags = 0;
                        body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                }

                /* Client cksum has been already copied to wire obdo in previous
                 * lustre_set_wire_obdo(), and in the case a bulk-read is being
                 * resent due to cksum error, this will allow Server to
                 * check+dump pages on its side */
        }
        ptlrpc_request_set_replen(req);

        /* stash the context needed by the BRW completion handler */
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oa = oa;
        aa->aa_requested_nob = requested_nob;
        aa->aa_nio_count = niocount;
        aa->aa_page_count = page_count;
        aa->aa_resends = 0;
        aa->aa_ppga = pga;
        aa->aa_cli = cli;
        INIT_LIST_HEAD(&aa->aa_oaps);

        *reqp = req;
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
                req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
                niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
        RETURN(0);

 out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}
1321
1322 char dbgcksum_file_name[PATH_MAX];
1323
/* Dump the raw contents of every bulk page in @pga to a debug file so a
 * checksum mismatch can be analyzed offline.  The file name encodes the
 * FID, extent range and both checksums; O_EXCL ensures we only keep the
 * dump from the first error for a given range, not from resends/retries.
 * Best-effort: failures are logged and otherwise ignored. */
static void dump_all_bulk_pages(struct obdo *oa, __u32 page_count,
                                struct brw_page **pga, __u32 server_cksum,
                                __u32 client_cksum)
{
        struct file *filp;
        int rc, i;
        unsigned int len;
        char *buf;
        mm_segment_t oldfs;

        /* will only keep dump of pages on first error for the same range in
         * file/fid, not during the resends/retries. */
        snprintf(dbgcksum_file_name, sizeof(dbgcksum_file_name),
                 "%s-checksum_dump-osc-"DFID":[%llu-%llu]-%x-%x",
                 (strncmp(libcfs_debug_file_path_arr, "NONE", 4) != 0 ?
                  libcfs_debug_file_path_arr :
                  LIBCFS_DEBUG_FILE_PATH_DEFAULT),
                 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : 0ULL,
                 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
                 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
                 pga[0]->off,
                 pga[page_count-1]->off + pga[page_count-1]->count - 1,
                 client_cksum, server_cksum);
        filp = filp_open(dbgcksum_file_name,
                         O_CREAT | O_EXCL | O_WRONLY | O_LARGEFILE, 0600);
        if (IS_ERR(filp)) {
                rc = PTR_ERR(filp);
                /* -EEXIST just means an earlier error already dumped this
                 * range; only a real failure deserves a console error */
                if (rc == -EEXIST)
                        CDEBUG(D_INFO, "%s: can't open to dump pages with "
                               "checksum error: rc = %d\n", dbgcksum_file_name,
                               rc);
                else
                        CERROR("%s: can't open to dump pages with checksum "
                               "error: rc = %d\n", dbgcksum_file_name, rc);
                return;
        }

        /* vfs_write() expects a user-space buffer; temporarily widen the
         * address limit so kernel pointers are accepted */
        oldfs = get_fs();
        set_fs(KERNEL_DS);
        for (i = 0; i < page_count; i++) {
                len = pga[i]->count;
                buf = kmap(pga[i]->pg);
                /* loop in case of partial writes */
                while (len != 0) {
                        rc = vfs_write(filp, (__force const char __user *)buf,
                                       len, &filp->f_pos);
                        if (rc < 0) {
                                CERROR("%s: wanted to write %u but got %d "
                                       "error\n", dbgcksum_file_name, len, rc);
                                break;
                        }
                        len -= rc;
                        buf += rc;
                        CDEBUG(D_INFO, "%s: wrote %d bytes\n",
                               dbgcksum_file_name, rc);
                }
                kunmap(pga[i]->pg);
        }
        set_fs(oldfs);

        /* make sure the dump hits disk before we report it */
        rc = ll_vfs_fsync_range(filp, 0, LLONG_MAX, 1);
        if (rc)
                CERROR("%s: sync returns %d\n", dbgcksum_file_name, rc);
        filp_close(filp, NULL);
        return;
}
1389
/* Diagnose a BRW_WRITE checksum reported by the server.
 *
 * Returns 0 when @server_cksum matches @client_cksum.  Otherwise the pages
 * are re-checksummed with the server's checksum type to classify the
 * corruption (client-side change after checksumming, in-transit change, or
 * checksum-type mismatch), a console error is logged, the pages are
 * optionally dumped, and 1 is returned so the caller can resend. */
static int
check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
                                __u32 client_cksum, __u32 server_cksum,
                                struct osc_brw_async_args *aa)
{
        __u32 new_cksum;
        char *msg;
        cksum_type_t cksum_type;

        if (server_cksum == client_cksum) {
                CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
                return 0;
        }

        if (aa->aa_cli->cl_checksum_dump)
                dump_all_bulk_pages(oa, aa->aa_page_count, aa->aa_ppga,
                                    server_cksum, client_cksum);

        /* recompute with the checksum type the server actually used */
        cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
                                       oa->o_flags : 0);
        new_cksum = osc_checksum_bulk(aa->aa_requested_nob, aa->aa_page_count,
                                      aa->aa_ppga, OST_WRITE, cksum_type);

        /* classify the mismatch for the console message below */
        if (cksum_type != cksum_type_unpack(aa->aa_oa->o_flags))
                msg = "the server did not use the checksum type specified in "
                      "the original request - likely a protocol problem";
        else if (new_cksum == server_cksum)
                msg = "changed on the client after we checksummed it - "
                      "likely false positive due to mmap IO (bug 11742)";
        else if (new_cksum == client_cksum)
                msg = "changed in transit before arrival at OST";
        else
                msg = "changed in transit AND doesn't match the original - "
                      "likely false positive due to mmap IO (bug 11742)";

        LCONSOLE_ERROR_MSG(0x132, "%s: BAD WRITE CHECKSUM: %s: from %s inode "
                           DFID " object "DOSTID" extent [%llu-%llu], original "
                           "client csum %x (type %x), server csum %x (type %x),"
                           " client csum now %x\n",
                           aa->aa_cli->cl_import->imp_obd->obd_name,
                           msg, libcfs_nid2str(peer->nid),
                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
                           POSTID(&oa->o_oi), aa->aa_ppga[0]->off,
                           aa->aa_ppga[aa->aa_page_count - 1]->off +
                                aa->aa_ppga[aa->aa_page_count-1]->count - 1,
                           client_cksum, cksum_type_unpack(aa->aa_oa->o_flags),
                           server_cksum, cksum_type, new_cksum);
        return 1;
}
1441
1442 /* Note rc enters this function as number of bytes transferred */
1443 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1444 {
1445         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1446         const struct lnet_process_id *peer =
1447                         &req->rq_import->imp_connection->c_peer;
1448         struct client_obd *cli = aa->aa_cli;
1449         struct ost_body *body;
1450         u32 client_cksum = 0;
1451         ENTRY;
1452
1453         if (rc < 0 && rc != -EDQUOT) {
1454                 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1455                 RETURN(rc);
1456         }
1457
1458         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1459         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1460         if (body == NULL) {
1461                 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1462                 RETURN(-EPROTO);
1463         }
1464
1465         /* set/clear over quota flag for a uid/gid/projid */
1466         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1467             body->oa.o_valid & (OBD_MD_FLALLQUOTA)) {
1468                 unsigned qid[LL_MAXQUOTAS] = {
1469                                          body->oa.o_uid, body->oa.o_gid,
1470                                          body->oa.o_projid };
1471                 CDEBUG(D_QUOTA, "setdq for [%u %u %u] with valid %#llx, flags %x\n",
1472                        body->oa.o_uid, body->oa.o_gid, body->oa.o_projid,
1473                        body->oa.o_valid, body->oa.o_flags);
1474                        osc_quota_setdq(cli, qid, body->oa.o_valid,
1475                                        body->oa.o_flags);
1476         }
1477
1478         osc_update_grant(cli, body);
1479
1480         if (rc < 0)
1481                 RETURN(rc);
1482
1483         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1484                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1485
1486         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1487                 if (rc > 0) {
1488                         CERROR("Unexpected +ve rc %d\n", rc);
1489                         RETURN(-EPROTO);
1490                 }
1491                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1492
1493                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1494                         RETURN(-EAGAIN);
1495
1496                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1497                     check_write_checksum(&body->oa, peer, client_cksum,
1498                                          body->oa.o_cksum, aa))
1499                         RETURN(-EAGAIN);
1500
1501                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1502                                      aa->aa_page_count, aa->aa_ppga);
1503                 GOTO(out, rc);
1504         }
1505
1506         /* The rest of this function executes only for OST_READs */
1507
1508         /* if unwrap_bulk failed, return -EAGAIN to retry */
1509         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1510         if (rc < 0)
1511                 GOTO(out, rc = -EAGAIN);
1512
1513         if (rc > aa->aa_requested_nob) {
1514                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1515                        aa->aa_requested_nob);
1516                 RETURN(-EPROTO);
1517         }
1518
1519         if (rc != req->rq_bulk->bd_nob_transferred) {
1520                 CERROR ("Unexpected rc %d (%d transferred)\n",
1521                         rc, req->rq_bulk->bd_nob_transferred);
1522                 return (-EPROTO);
1523         }
1524
1525         if (rc < aa->aa_requested_nob)
1526                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1527
1528         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1529                 static int cksum_counter;
1530                 u32        server_cksum = body->oa.o_cksum;
1531                 char      *via = "";
1532                 char      *router = "";
1533                 cksum_type_t cksum_type;
1534
1535                 cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
1536                                                body->oa.o_flags : 0);
1537                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1538                                                  aa->aa_ppga, OST_READ,
1539                                                  cksum_type);
1540
1541                 if (peer->nid != req->rq_bulk->bd_sender) {
1542                         via = " via ";
1543                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1544                 }
1545
1546                 if (server_cksum != client_cksum) {
1547                         struct ost_body *clbody;
1548                         u32 page_count = aa->aa_page_count;
1549
1550                         clbody = req_capsule_client_get(&req->rq_pill,
1551                                                         &RMF_OST_BODY);
1552                         if (cli->cl_checksum_dump)
1553                                 dump_all_bulk_pages(&clbody->oa, page_count,
1554                                                     aa->aa_ppga, server_cksum,
1555                                                     client_cksum);
1556
1557                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1558                                            "%s%s%s inode "DFID" object "DOSTID
1559                                            " extent [%llu-%llu], client %x, "
1560                                            "server %x, cksum_type %x\n",
1561                                            req->rq_import->imp_obd->obd_name,
1562                                            libcfs_nid2str(peer->nid),
1563                                            via, router,
1564                                            clbody->oa.o_valid & OBD_MD_FLFID ?
1565                                                 clbody->oa.o_parent_seq : 0ULL,
1566                                            clbody->oa.o_valid & OBD_MD_FLFID ?
1567                                                 clbody->oa.o_parent_oid : 0,
1568                                            clbody->oa.o_valid & OBD_MD_FLFID ?
1569                                                 clbody->oa.o_parent_ver : 0,
1570                                            POSTID(&body->oa.o_oi),
1571                                            aa->aa_ppga[0]->off,
1572                                            aa->aa_ppga[page_count-1]->off +
1573                                            aa->aa_ppga[page_count-1]->count - 1,
1574                                            client_cksum, server_cksum,
1575                                            cksum_type);
1576                         cksum_counter = 0;
1577                         aa->aa_oa->o_cksum = client_cksum;
1578                         rc = -EAGAIN;
1579                 } else {
1580                         cksum_counter++;
1581                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1582                         rc = 0;
1583                 }
1584         } else if (unlikely(client_cksum)) {
1585                 static int cksum_missed;
1586
1587                 cksum_missed++;
1588                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1589                         CERROR("Checksum %u requested from %s but not sent\n",
1590                                cksum_missed, libcfs_nid2str(peer->nid));
1591         } else {
1592                 rc = 0;
1593         }
1594 out:
1595         if (rc >= 0)
1596                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1597                                      aa->aa_oa, &body->oa);
1598
1599         RETURN(rc);
1600 }
1601
/* Rebuild and resend a BRW RPC that failed with a recoverable error
 * (e.g. -EINPROGRESS or -EAGAIN).  A new request is built from the async
 * args of the failed one; the pga and the oap/extent lists are moved over
 * to the new request, which is then queued on ptlrpcd.
 *
 * \param[in] request  the failed BRW request
 * \param[in] aa       async args (pages, extents, obdo) of the failed RPC
 * \param[in] rc       the error that triggered the redo (used only to pick
 *                     the debug level of the trace message)
 *
 * \retval 0        the replacement request was queued
 * \retval -EINTR   an async page of the old request was interrupted
 * \retval negative errno if the replacement request could not be built
 */
static int osc_brw_redo_request(struct ptlrpc_request *request,
				struct osc_brw_async_args *aa, int rc)
{
	struct ptlrpc_request *new_req;
	struct osc_brw_async_args *new_aa;
	struct osc_async_page *oap;
	ENTRY;

	DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
		  "redo for recoverable error %d", rc);

	/* rebuild the request with the same command, pages and obdo;
	 * the final '1' argument marks it as a resend */
	rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
				OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
				  aa->aa_cli, aa->aa_oa, aa->aa_page_count,
				  aa->aa_ppga, &new_req, 1);
	if (rc)
		RETURN(rc);

	/* bail out (dropping the new request) if any page of the old RPC
	 * was interrupted by a signal */
	list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
		if (oap->oap_request != NULL) {
			LASSERTF(request == oap->oap_request,
				 "request %p != oap_request %p\n",
				 request, oap->oap_request);
			if (oap->oap_interrupted) {
				ptlrpc_req_finished(new_req);
				RETURN(-EINTR);
			}
		}
	}
	/* New request takes over pga and oaps from old request.
	 * Note that copying a list_head doesn't work, need to move it... */
	aa->aa_resends++;
	new_req->rq_interpret_reply = request->rq_interpret_reply;
	new_req->rq_async_args = request->rq_async_args;
	new_req->rq_commit_cb = request->rq_commit_cb;
	/* cap resend delay to the current request timeout, this is similar to
	 * what ptlrpc does (see after_reply()) */
	if (aa->aa_resends > new_req->rq_timeout)
		new_req->rq_sent = cfs_time_current_sec() + new_req->rq_timeout;
	else
		new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
	new_req->rq_generation_set = 1;
	new_req->rq_import_generation = request->rq_import_generation;

	new_aa = ptlrpc_req_async_args(new_req);

	INIT_LIST_HEAD(&new_aa->aa_oaps);
	list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
	INIT_LIST_HEAD(&new_aa->aa_exts);
	list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
	new_aa->aa_resends = aa->aa_resends;

	/* every oap drops its reference on the old request and takes one
	 * on the new request instead */
	list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
		if (oap->oap_request) {
			ptlrpc_req_finished(oap->oap_request);
			oap->oap_request = ptlrpc_request_addref(new_req);
		}
	}

	/* XXX: This code will run into problem if we're going to support
	 * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
	 * and wait for all of them to be finished. We should inherit request
	 * set from old request. */
	ptlrpcd_add_req(new_req);

	DEBUG_REQ(D_INFO, new_req, "new request");
	RETURN(0);
}
1670
1671 /*
1672  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1673  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1674  * fine for our small page arrays and doesn't require allocation.  its an
1675  * insertion sort that swaps elements that are strides apart, shrinking the
1676  * stride down until its '1' and the array is sorted.
1677  */
1678 static void sort_brw_pages(struct brw_page **array, int num)
1679 {
1680         int stride, i, j;
1681         struct brw_page *tmp;
1682
1683         if (num == 1)
1684                 return;
1685         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1686                 ;
1687
1688         do {
1689                 stride /= 3;
1690                 for (i = stride ; i < num ; i++) {
1691                         tmp = array[i];
1692                         j = i;
1693                         while (j >= stride && array[j - stride]->off > tmp->off) {
1694                                 array[j] = array[j - stride];
1695                                 j -= stride;
1696                         }
1697                         array[j] = tmp;
1698                 }
1699         } while (stride > 1);
1700 }
1701
1702 static void osc_release_ppga(struct brw_page **ppga, size_t count)
1703 {
1704         LASSERT(ppga != NULL);
1705         OBD_FREE(ppga, sizeof(*ppga) * count);
1706 }
1707
/* Interpret callback for a BRW RPC, run by ptlrpcd when the request
 * completes (or fails).  Resends on recoverable errors, propagates the
 * attributes returned by the OST into the cl_object, finishes all extents
 * carried by the RPC, releases the page array and updates the per-client
 * in-flight RPC accounting.
 *
 * \param[in] env   execution environment
 * \param[in] req   the completed BRW request
 * \param[in] data  osc_brw_async_args attached to the request
 * \param[in] rc    request status from ptlrpc
 *
 * \retval final status of the RPC (0 or negative errno)
 */
static int brw_interpret(const struct lu_env *env,
			 struct ptlrpc_request *req, void *data, int rc)
{
	struct osc_brw_async_args *aa = data;
	struct osc_extent *ext;
	struct osc_extent *tmp;
	struct client_obd *cli = aa->aa_cli;
	ENTRY;

	rc = osc_brw_fini_request(req, rc);
	CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
	/* When server return -EINPROGRESS, client should always retry
	 * regardless of the number of times the bulk was resent already. */
	if (osc_recoverable_error(rc)) {
		if (req->rq_import_generation !=
		    req->rq_import->imp_generation) {
			/* the import was evicted and reconnected since this
			 * RPC was sent; do not resend across the eviction */
			CDEBUG(D_HA, "%s: resend cross eviction for object: "
			       ""DOSTID", rc = %d.\n",
			       req->rq_import->imp_obd->obd_name,
			       POSTID(&aa->aa_oa->o_oi), rc);
		} else if (rc == -EINPROGRESS ||
		    client_should_resend(aa->aa_resends, aa->aa_cli)) {
			rc = osc_brw_redo_request(req, aa, rc);
		} else {
			CERROR("%s: too many resent retries for object: "
			       "%llu:%llu, rc = %d.\n",
			       req->rq_import->imp_obd->obd_name,
			       POSTID(&aa->aa_oa->o_oi), rc);
		}

		if (rc == 0)
			RETURN(0);
		else if (rc == -EAGAIN || rc == -EINPROGRESS)
			rc = -EIO;
	}

	if (rc == 0) {
		struct obdo *oa = aa->aa_oa;
		struct cl_attr *attr = &osc_env_info(env)->oti_attr;
		unsigned long valid = 0;
		struct cl_object *obj;
		struct osc_async_page *last;

		last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
		obj = osc2cl(last->oap_obj);

		/* copy the block count and timestamps the OST returned
		 * into the client-side object attributes */
		cl_object_attr_lock(obj);
		if (oa->o_valid & OBD_MD_FLBLOCKS) {
			attr->cat_blocks = oa->o_blocks;
			valid |= CAT_BLOCKS;
		}
		if (oa->o_valid & OBD_MD_FLMTIME) {
			attr->cat_mtime = oa->o_mtime;
			valid |= CAT_MTIME;
		}
		if (oa->o_valid & OBD_MD_FLATIME) {
			attr->cat_atime = oa->o_atime;
			valid |= CAT_ATIME;
		}
		if (oa->o_valid & OBD_MD_FLCTIME) {
			attr->cat_ctime = oa->o_ctime;
			valid |= CAT_CTIME;
		}

		if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
			struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
			loff_t last_off = last->oap_count + last->oap_obj_off +
				last->oap_page_off;

			/* Change file size if this is an out of quota or
			 * direct IO write and it extends the file size */
			if (loi->loi_lvb.lvb_size < last_off) {
				attr->cat_size = last_off;
				valid |= CAT_SIZE;
			}
			/* Extend KMS if it's not a lockless write */
			if (loi->loi_kms < last_off &&
			    oap2osc_page(last)->ops_srvlock == 0) {
				attr->cat_kms = last_off;
				valid |= CAT_KMS;
			}
		}

		if (valid != 0)
			cl_object_attr_update(env, obj, attr, valid);
		cl_object_attr_unlock(obj);
	}
	OBDO_FREE(aa->aa_oa);

	if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
		osc_inc_unstable_pages(req);

	/* finish every extent this RPC was carrying; this empties
	 * aa_oaps as a side effect (checked by the LASSERTs below) */
	list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
		list_del_init(&ext->oe_link);
		osc_extent_finish(env, ext, 1, rc);
	}
	LASSERT(list_empty(&aa->aa_exts));
	LASSERT(list_empty(&aa->aa_oaps));

	osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
	ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);

	spin_lock(&cli->cl_loi_list_lock);
	/* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
	 * is called so we know whether to go to sync BRWs or wait for more
	 * RPCs to complete */
	if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
		cli->cl_w_in_flight--;
	else
		cli->cl_r_in_flight--;
	osc_wake_cache_waiters(cli);
	spin_unlock(&cli->cl_loi_list_lock);

	osc_io_unplug(env, cli, NULL);
	RETURN(rc);
}
1824
1825 static void brw_commit(struct ptlrpc_request *req)
1826 {
1827         /* If osc_inc_unstable_pages (via osc_extent_finish) races with
1828          * this called via the rq_commit_cb, I need to ensure
1829          * osc_dec_unstable_pages is still called. Otherwise unstable
1830          * pages may be leaked. */
1831         spin_lock(&req->rq_lock);
1832         if (likely(req->rq_unstable)) {
1833                 req->rq_unstable = 0;
1834                 spin_unlock(&req->rq_lock);
1835
1836                 osc_dec_unstable_pages(req);
1837         } else {
1838                 req->rq_committed = 1;
1839                 spin_unlock(&req->rq_lock);
1840         }
1841 }
1842
/**
 * Build a BRW RPC from the list of extents \a ext_list and queue it on
 * ptlrpcd.  The caller must ensure that the total pages in this list are
 * NOT over max pages per RPC.  Extents in the list must be in OES_RPC
 * state.
 *
 * \param[in] env      execution environment
 * \param[in] cli      client obd the RPC is built for
 * \param[in] ext_list extents to transfer; on success they are moved into
 *                     the request's async args, on failure each is
 *                     finished with the error
 * \param[in] cmd      OBD_BRW_READ or OBD_BRW_WRITE
 *
 * \retval 0 on success, negative errno on failure
 */
int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
		  struct list_head *ext_list, int cmd)
{
	struct ptlrpc_request		*req = NULL;
	struct osc_extent		*ext;
	struct brw_page			**pga = NULL;
	struct osc_brw_async_args	*aa = NULL;
	struct obdo			*oa = NULL;
	struct osc_async_page		*oap;
	struct osc_object		*obj = NULL;
	struct cl_req_attr		*crattr = NULL;
	loff_t				starting_offset = OBD_OBJECT_EOF;
	loff_t				ending_offset = 0;
	int				mpflag = 0;
	int				mem_tight = 0;
	int				page_count = 0;
	bool				soft_sync = false;
	bool				interrupted = false;
	int				i;
	int				grant = 0;
	int				rc;
	struct list_head		rpc_list = LIST_HEAD_INIT(rpc_list);
	struct ost_body			*body;
	ENTRY;
	LASSERT(!list_empty(ext_list));

	/* add pages into rpc_list to build BRW rpc */
	list_for_each_entry(ext, ext_list, oe_link) {
		LASSERT(ext->oe_state == OES_RPC);
		mem_tight |= ext->oe_memalloc;
		grant += ext->oe_grants;
		page_count += ext->oe_nr_pages;
		/* all extents belong to the same object; remember it */
		if (obj == NULL)
			obj = ext->oe_obj;
	}

	soft_sync = osc_over_unstable_soft_limit(cli);
	if (mem_tight)
		mpflag = cfs_memory_pressure_get_and_set();

	OBD_ALLOC(pga, sizeof(*pga) * page_count);
	if (pga == NULL)
		GOTO(out, rc = -ENOMEM);

	OBDO_ALLOC(oa);
	if (oa == NULL)
		GOTO(out, rc = -ENOMEM);

	/* flatten all pages of all extents into pga[] and rpc_list, and
	 * compute the byte range [starting_offset, ending_offset) covered */
	i = 0;
	list_for_each_entry(ext, ext_list, oe_link) {
		list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
			if (mem_tight)
				oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
			if (soft_sync)
				oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
			pga[i] = &oap->oap_brw_page;
			pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
			i++;

			list_add_tail(&oap->oap_rpc_item, &rpc_list);
			if (starting_offset == OBD_OBJECT_EOF ||
			    starting_offset > oap->oap_obj_off)
				starting_offset = oap->oap_obj_off;
			else
				/* only the first page may start mid-page */
				LASSERT(oap->oap_page_off == 0);
			if (ending_offset < oap->oap_obj_off + oap->oap_count)
				ending_offset = oap->oap_obj_off +
						oap->oap_count;
			else
				/* only the last page may end mid-page */
				LASSERT(oap->oap_page_off + oap->oap_count ==
					PAGE_SIZE);
			if (oap->oap_interrupted)
				interrupted = true;
		}
	}

	/* first page in the list */
	oap = list_entry(rpc_list.next, typeof(*oap), oap_rpc_item);

	/* fill the obdo with all attributes from the cl layer */
	crattr = &osc_env_info(env)->oti_req_attr;
	memset(crattr, 0, sizeof(*crattr));
	crattr->cra_type = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
	crattr->cra_flags = ~0ULL;
	crattr->cra_page = oap2cl_page(oap);
	crattr->cra_oa = oa;
	cl_req_attr_set(env, osc2cl(obj), crattr);

	if (cmd == OBD_BRW_WRITE)
		oa->o_grant_used = grant;

	sort_brw_pages(pga, page_count);
	rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req, 0);
	if (rc != 0) {
		CERROR("prep_req failed: %d\n", rc);
		GOTO(out, rc);
	}

	req->rq_commit_cb = brw_commit;
	req->rq_interpret_reply = brw_interpret;
	req->rq_memalloc = mem_tight != 0;
	oap->oap_request = ptlrpc_request_addref(req);
	if (interrupted && !req->rq_intr)
		ptlrpc_mark_interrupted(req);

	/* Need to update the timestamps after the request is built in case
	 * we race with setattr (locally or in queue at OST).  If OST gets
	 * later setattr before earlier BRW (as determined by the request xid),
	 * the OST will not use BRW timestamps.  Sadly, there is no obvious
	 * way to do this in a single call.  bug 10150 */
	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
	crattr->cra_oa = &body->oa;
	crattr->cra_flags = OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME;
	cl_req_attr_set(env, osc2cl(obj), crattr);
	lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);

	/* hand the page and extent lists over to the request's async args */
	CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
	aa = ptlrpc_req_async_args(req);
	INIT_LIST_HEAD(&aa->aa_oaps);
	list_splice_init(&rpc_list, &aa->aa_oaps);
	INIT_LIST_HEAD(&aa->aa_exts);
	list_splice_init(ext_list, &aa->aa_exts);

	/* bump in-flight counters and record lprocfs statistics */
	spin_lock(&cli->cl_loi_list_lock);
	starting_offset >>= PAGE_SHIFT;
	if (cmd == OBD_BRW_READ) {
		cli->cl_r_in_flight++;
		lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
		lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
		lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
				      starting_offset + 1);
	} else {
		cli->cl_w_in_flight++;
		lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
		lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
		lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
				      starting_offset + 1);
	}
	spin_unlock(&cli->cl_loi_list_lock);

	DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %ur/%uw in flight",
		  page_count, aa, cli->cl_r_in_flight,
		  cli->cl_w_in_flight);
	OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_IO, cfs_fail_val);

	ptlrpcd_add_req(req);
	rc = 0;
	EXIT;

out:
	if (mem_tight != 0)
		cfs_memory_pressure_restore(mpflag);

	if (rc != 0) {
		LASSERT(req == NULL);

		if (oa)
			OBDO_FREE(oa);
		if (pga)
			OBD_FREE(pga, sizeof(*pga) * page_count);
		/* this should happen rarely and is pretty bad, it makes the
		 * pending list not follow the dirty order */
		while (!list_empty(ext_list)) {
			ext = list_entry(ext_list->next, struct osc_extent,
					 oe_link);
			list_del_init(&ext->oe_link);
			osc_extent_finish(env, ext, 0, rc);
		}
	}
	RETURN(rc);
}
2018
2019 static int osc_set_lock_data(struct ldlm_lock *lock, void *data)
2020 {
2021         int set = 0;
2022
2023         LASSERT(lock != NULL);
2024
2025         lock_res_and_lock(lock);
2026
2027         if (lock->l_ast_data == NULL)
2028                 lock->l_ast_data = data;
2029         if (lock->l_ast_data == data)
2030                 set = 1;
2031
2032         unlock_res_and_lock(lock);
2033
2034         return set;
2035 }
2036
2037 static int osc_enqueue_fini(struct ptlrpc_request *req,
2038                             osc_enqueue_upcall_f upcall, void *cookie,
2039                             struct lustre_handle *lockh, enum ldlm_mode mode,
2040                             __u64 *flags, int agl, int errcode)
2041 {
2042         bool intent = *flags & LDLM_FL_HAS_INTENT;
2043         int rc;
2044         ENTRY;
2045
2046         /* The request was created before ldlm_cli_enqueue call. */
2047         if (intent && errcode == ELDLM_LOCK_ABORTED) {
2048                 struct ldlm_reply *rep;
2049
2050                 rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
2051                 LASSERT(rep != NULL);
2052
2053                 rep->lock_policy_res1 =
2054                         ptlrpc_status_ntoh(rep->lock_policy_res1);
2055                 if (rep->lock_policy_res1)
2056                         errcode = rep->lock_policy_res1;
2057                 if (!agl)
2058                         *flags |= LDLM_FL_LVB_READY;
2059         } else if (errcode == ELDLM_OK) {
2060                 *flags |= LDLM_FL_LVB_READY;
2061         }
2062
2063         /* Call the update callback. */
2064         rc = (*upcall)(cookie, lockh, errcode);
2065
2066         /* release the reference taken in ldlm_cli_enqueue() */
2067         if (errcode == ELDLM_LOCK_MATCHED)
2068                 errcode = ELDLM_OK;
2069         if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
2070                 ldlm_lock_decref(lockh, mode);
2071
2072         RETURN(rc);
2073 }
2074
/* Interpret callback for an asynchronous DLM extent-lock enqueue (regular
 * or AGL/speculative).  Completes the enqueue via ldlm_cli_enqueue_fini()
 * and then runs the osc-level upcall through osc_enqueue_fini().
 *
 * \param[in] env  execution environment
 * \param[in] req  the LDLM enqueue request
 * \param[in] aa   enqueue args saved when the request was sent
 * \param[in] rc   request status
 *
 * \retval result of the osc upcall
 */
static int osc_enqueue_interpret(const struct lu_env *env,
				 struct ptlrpc_request *req,
				 struct osc_enqueue_args *aa, int rc)
{
	struct ldlm_lock *lock;
	struct lustre_handle *lockh = &aa->oa_lockh;
	enum ldlm_mode mode = aa->oa_mode;
	struct ost_lvb *lvb = aa->oa_lvb;
	__u32 lvb_len = sizeof(*lvb);
	__u64 flags = 0;

	ENTRY;

	/* ldlm_cli_enqueue is holding a reference on the lock, so it must
	 * be valid. */
	lock = ldlm_handle2lock(lockh);
	LASSERTF(lock != NULL,
		 "lockh %#llx, req %p, aa %p - client evicted?\n",
		 lockh->cookie, req, aa);

	/* Take an additional reference so that a blocking AST that
	 * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
	 * to arrive after an upcall has been executed by
	 * osc_enqueue_fini(). */
	ldlm_lock_addref(lockh, mode);

	/* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
	OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);

	/* Let CP AST to grant the lock first. */
	OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);

	if (aa->oa_agl) {
		/* AGL enqueues carry no flags/LVB of their own; lend a
		 * local flags word to ldlm_cli_enqueue_fini() */
		LASSERT(aa->oa_lvb == NULL);
		LASSERT(aa->oa_flags == NULL);
		aa->oa_flags = &flags;
	}

	/* Complete obtaining the lock procedure. */
	rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
				   aa->oa_mode, aa->oa_flags, lvb, lvb_len,
				   lockh, rc);
	/* Complete osc stuff. */
	rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
			      aa->oa_flags, aa->oa_agl, rc);

	OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);

	/* drop the extra reference taken above */
	ldlm_lock_decref(lockh, mode);
	LDLM_LOCK_PUT(lock);
	RETURN(rc);
}
2127
/* Sentinel request-set pointer; never dereferenced.  NOTE(review):
 * presumably passed as @rqset to request an enqueue via ptlrpcd rather
 * than a caller-owned set -- confirm against callers. */
struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2129
/* When enqueuing asynchronously, locks are not ordered: we can obtain a lock
 * from the 2nd OSC before a lock from the 1st one.  This does not deadlock
 * with other synchronous requests; however, holding some locks while trying
 * to obtain others may take a considerable amount of time in case of OST
 * failure, and when a client does not release a lock that other sync
 * requests are waiting for, the client is evicted from the cluster.  Such
 * scenarios make life difficult, so release locks just after they are
 * obtained. */
2137 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2138                      __u64 *flags, union ldlm_policy_data *policy,
2139                      struct ost_lvb *lvb, int kms_valid,
2140                      osc_enqueue_upcall_f upcall, void *cookie,
2141                      struct ldlm_enqueue_info *einfo,
2142                      struct ptlrpc_request_set *rqset, int async, int agl)
2143 {
2144         struct obd_device *obd = exp->exp_obd;
2145         struct lustre_handle lockh = { 0 };
2146         struct ptlrpc_request *req = NULL;
2147         int intent = *flags & LDLM_FL_HAS_INTENT;
2148         __u64 match_flags = *flags;
2149         enum ldlm_mode mode;
2150         int rc;
2151         ENTRY;
2152
2153         /* Filesystem lock extents are extended to page boundaries so that
2154          * dealing with the page cache is a little smoother.  */
2155         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2156         policy->l_extent.end |= ~PAGE_MASK;
2157
2158         /*
2159          * kms is not valid when either object is completely fresh (so that no
2160          * locks are cached), or object was evicted. In the latter case cached
2161          * lock cannot be used, because it would prime inode state with
2162          * potentially stale LVB.
2163          */
2164         if (!kms_valid)
2165                 goto no_match;
2166
2167         /* Next, search for already existing extent locks that will cover us */
2168         /* If we're trying to read, we also search for an existing PW lock.  The
2169          * VFS and page cache already protect us locally, so lots of readers/
2170          * writers can share a single PW lock.
2171          *
2172          * There are problems with conversion deadlocks, so instead of
2173          * converting a read lock to a write lock, we'll just enqueue a new
2174          * one.
2175          *
2176          * At some point we should cancel the read lock instead of making them
2177          * send us a blocking callback, but there are problems with canceling
2178          * locks out from other users right now, too. */
2179         mode = einfo->ei_mode;
2180         if (einfo->ei_mode == LCK_PR)
2181                 mode |= LCK_PW;
2182         if (agl == 0)
2183                 match_flags |= LDLM_FL_LVB_READY;
2184         if (intent != 0)
2185                 match_flags |= LDLM_FL_BLOCK_GRANTED;
2186         mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id,
2187                                einfo->ei_type, policy, mode, &lockh, 0);
2188         if (mode) {
2189                 struct ldlm_lock *matched;
2190
2191                 if (*flags & LDLM_FL_TEST_LOCK)
2192                         RETURN(ELDLM_OK);
2193
2194                 matched = ldlm_handle2lock(&lockh);
2195                 if (agl) {
2196                         /* AGL enqueues DLM locks speculatively. Therefore if
2197                          * it already exists a DLM lock, it wll just inform the
2198                          * caller to cancel the AGL process for this stripe. */
2199                         ldlm_lock_decref(&lockh, mode);
2200                         LDLM_LOCK_PUT(matched);
2201                         RETURN(-ECANCELED);
2202                 } else if (osc_set_lock_data(matched, einfo->ei_cbdata)) {
2203                         *flags |= LDLM_FL_LVB_READY;
2204
2205                         /* We already have a lock, and it's referenced. */
2206                         (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);
2207
2208                         ldlm_lock_decref(&lockh, mode);
2209                         LDLM_LOCK_PUT(matched);
2210                         RETURN(ELDLM_OK);
2211                 } else {
2212                         ldlm_lock_decref(&lockh, mode);
2213                         LDLM_LOCK_PUT(matched);
2214                 }
2215         }
2216
2217 no_match:
2218         if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK))
2219                 RETURN(-ENOLCK);
2220
2221         if (intent) {
2222                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2223                                            &RQF_LDLM_ENQUEUE_LVB);
2224                 if (req == NULL)
2225                         RETURN(-ENOMEM);
2226
2227                 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
2228                 if (rc) {
2229                         ptlrpc_request_free(req);
2230                         RETURN(rc);
2231                 }
2232
2233                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2234                                      sizeof *lvb);
2235                 ptlrpc_request_set_replen(req);
2236         }
2237
2238         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2239         *flags &= ~LDLM_FL_BLOCK_GRANTED;
2240
2241         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2242                               sizeof(*lvb), LVB_T_OST, &lockh, async);
2243         if (async) {
2244                 if (!rc) {
2245                         struct osc_enqueue_args *aa;
2246                         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2247                         aa = ptlrpc_req_async_args(req);
2248                         aa->oa_exp    = exp;
2249                         aa->oa_mode   = einfo->ei_mode;
2250                         aa->oa_type   = einfo->ei_type;
2251                         lustre_handle_copy(&aa->oa_lockh, &lockh);
2252                         aa->oa_upcall = upcall;
2253                         aa->oa_cookie = cookie;
2254                         aa->oa_agl    = !!agl;
2255                         if (!agl) {
2256                                 aa->oa_flags  = flags;
2257                                 aa->oa_lvb    = lvb;
2258                         } else {
2259                                 /* AGL is essentially to enqueue an DLM lock
2260                                  * in advance, so we don't care about the
2261                                  * result of AGL enqueue. */
2262                                 aa->oa_lvb    = NULL;
2263                                 aa->oa_flags  = NULL;
2264                         }
2265
2266                         req->rq_interpret_reply =
2267                                 (ptlrpc_interpterer_t)osc_enqueue_interpret;
2268                         if (rqset == PTLRPCD_SET)
2269                                 ptlrpcd_add_req(req);
2270                         else
2271                                 ptlrpc_set_add_req(rqset, req);
2272                 } else if (intent) {
2273                         ptlrpc_req_finished(req);
2274                 }
2275                 RETURN(rc);
2276         }
2277
2278         rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
2279                               flags, agl, rc);
2280         if (intent)
2281                 ptlrpc_req_finished(req);
2282
2283         RETURN(rc);
2284 }
2285
/**
 * Look up an already-granted DLM lock covering the extent in \a policy.
 *
 * The extent is first widened to page boundaries, since client locks are
 * managed at page-cache granularity.  A PR request also matches an existing
 * PW lock: the VFS and page cache serialize local readers/writers, so many
 * of them can share one PW lock.
 *
 * \param[in]     exp    export whose namespace is searched
 * \param[in]     res_id resource (object) the lock must cover
 * \param[in]     type   lock type
 * \param[in,out] policy extent to cover; rounded out to page boundaries
 * \param[in]     mode   minimum mode wanted (PR also accepts PW)
 * \param[in,out] flags  LDLM_FL_* match flags
 * \param[in]     data   if non-NULL, ast data to attach to the matched lock
 * \param[out]    lockh  handle of the matched lock
 * \param[in]     unref  passed through to ldlm_lock_match()
 *
 * \retval matched lock mode on success
 * \retval 0 if no lock matched, or \a data could not be attached
 */
int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
                   enum ldlm_type type, union ldlm_policy_data *policy,
                   enum ldlm_mode mode, __u64 *flags, void *data,
                   struct lustre_handle *lockh, int unref)
{
        struct obd_device *obd = exp->exp_obd;
        __u64 lflags = *flags;
        enum ldlm_mode rc;
        ENTRY;

        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
                RETURN(-EIO);

        /* Filesystem lock extents are extended to page boundaries so that
         * dealing with the page cache is a little smoother */
        policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
        policy->l_extent.end |= ~PAGE_MASK;

        /* Next, search for already existing extent locks that will cover us */
        /* If we're trying to read, we also search for an existing PW lock.  The
         * VFS and page cache already protect us locally, so lots of readers/
         * writers can share a single PW lock. */
        rc = mode;
        if (mode == LCK_PR)
                rc |= LCK_PW;
        rc = ldlm_lock_match(obd->obd_namespace, lflags,
                             res_id, type, policy, rc, lockh, unref);
        if (rc == 0 || lflags & LDLM_FL_TEST_LOCK)
                RETURN(rc);

        if (data != NULL) {
                struct ldlm_lock *lock = ldlm_handle2lock(lockh);

                LASSERT(lock != NULL);
                if (!osc_set_lock_data(lock, data)) {
                        /* Could not attach \a data via osc_set_lock_data();
                         * drop the reference taken by the match and report
                         * "no match" to the caller. */
                        ldlm_lock_decref(lockh, rc);
                        rc = 0;
                }
                LDLM_LOCK_PUT(lock);
        }
        RETURN(rc);
}
2328
/**
 * Reply interpreter for an asynchronous OST_STATFS request.
 *
 * Copies the server's obd_statfs reply into the caller's buffer and then
 * invokes the oi_cb_up() completion callback with the final status.  The
 * callback is skipped only for -EBADR, which means the request was never
 * actually sent.
 */
static int osc_statfs_interpret(const struct lu_env *env,
                                struct ptlrpc_request *req,
                                struct osc_async_args *aa, int rc)
{
        struct obd_statfs *msfs;
        ENTRY;

        if (rc == -EBADR)
                /* The request has in fact never been sent
                 * due to issues at a higher level (LOV).
                 * Exit immediately since the caller is
                 * aware of the problem and takes care
                 * of the clean up */
                 RETURN(rc);

        /* For "no delay" (e.g. procfs) requests a failure to reach the
         * server is not fatal: complete the callback with rc = 0 and
         * leave the caller's statfs buffer untouched. */
        if ((rc == -ENOTCONN || rc == -EAGAIN) &&
            (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
                GOTO(out, rc = 0);

        if (rc != 0)
                GOTO(out, rc);

        msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
        if (msfs == NULL) {
                GOTO(out, rc = -EPROTO);
        }

        *aa->aa_oi->oi_osfs = *msfs;
out:
        /* Always notify the caller (except for -EBADR above). */
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        RETURN(rc);
}
2361
/**
 * Send an OST_STATFS request without waiting for the reply.
 *
 * The reply is processed by osc_statfs_interpret(), which calls
 * oinfo->oi_cb_up() with the result.  \a max_age is not transmitted on
 * the wire (see the comment below).
 *
 * \retval 0 on successful submission, negative errno otherwise
 */
static int osc_statfs_async(struct obd_export *exp,
                            struct obd_info *oinfo, __u64 max_age,
                            struct ptlrpc_request_set *rqset)
{
        struct obd_device     *obd = class_exp2obd(exp);
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        int                    rc;
        ENTRY;

        /* We could possibly pass max_age in the request (as an absolute
         * timestamp or a "seconds.usec ago") so the target can avoid doing
         * extra calls into the filesystem if that isn't necessary (e.g.
         * during mount that would help a bit).  Having relative timestamps
         * is not so great if request processing is slow, while absolute
         * timestamps are not ideal because they need time synchronization. */
        req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        ptlrpc_request_set_replen(req);
        req->rq_request_portal = OST_CREATE_PORTAL;
        ptlrpc_at_set_req_timeout(req);

        if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
                /* procfs requests must not wait for resend/reconnect,
                 * otherwise they could deadlock on an unreachable server */
                req->rq_no_resend = 1;
                req->rq_no_delay = 1;
        }

        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
        CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oi = oinfo;

        ptlrpc_set_add_req(rqset, req);
        RETURN(0);
}
2405
/**
 * Synchronous OST_STATFS: send the request, wait for the reply, and copy
 * the server's statistics into \a osfs.
 *
 * \retval 0 on success, negative errno otherwise (-ENODEV if the import
 *         is already gone)
 */
static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
                      struct obd_statfs *osfs, __u64 max_age, __u32 flags)
{
        struct obd_device     *obd = class_exp2obd(exp);
        struct obd_statfs     *msfs;
        struct ptlrpc_request *req;
        struct obd_import     *imp = NULL;
        int rc;
        ENTRY;

        /*Since the request might also come from lprocfs, so we need
         *sync this with client_disconnect_export Bug15684*/
        down_read(&obd->u.cli.cl_sem);
        if (obd->u.cli.cl_import)
                imp = class_import_get(obd->u.cli.cl_import);
        up_read(&obd->u.cli.cl_sem);
        if (!imp)
                RETURN(-ENODEV);

        /* We could possibly pass max_age in the request (as an absolute
         * timestamp or a "seconds.usec ago") so the target can avoid doing
         * extra calls into the filesystem if that isn't necessary (e.g.
         * during mount that would help a bit).  Having relative timestamps
         * is not so great if request processing is slow, while absolute
         * timestamps are not ideal because they need time synchronization. */
        req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);

        /* The extra reference taken above only had to outlive the
         * allocation; the request holds its own import reference. */
        class_import_put(imp);

        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        ptlrpc_request_set_replen(req);
        req->rq_request_portal = OST_CREATE_PORTAL;
        ptlrpc_at_set_req_timeout(req);

        if (flags & OBD_STATFS_NODELAY) {
                /* procfs requests must not wait for resend/reconnect,
                 * otherwise they could deadlock on an unreachable server */
                req->rq_no_resend = 1;
                req->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
        if (msfs == NULL) {
                GOTO(out, rc = -EPROTO);
        }

        *osfs = *msfs;

        EXIT;
 out:
        ptlrpc_req_finished(req);
        return rc;
}
2469
2470 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2471                          void *karg, void __user *uarg)
2472 {
2473         struct obd_device *obd = exp->exp_obd;
2474         struct obd_ioctl_data *data = karg;
2475         int err = 0;
2476         ENTRY;
2477
2478         if (!try_module_get(THIS_MODULE)) {
2479                 CERROR("%s: cannot get module '%s'\n", obd->obd_name,
2480                        module_name(THIS_MODULE));
2481                 return -EINVAL;
2482         }
2483         switch (cmd) {
2484         case OBD_IOC_CLIENT_RECOVER:
2485                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2486                                             data->ioc_inlbuf1, 0);
2487                 if (err > 0)
2488                         err = 0;
2489                 GOTO(out, err);
2490         case IOC_OSC_SET_ACTIVE:
2491                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2492                                                data->ioc_offset);
2493                 GOTO(out, err);
2494         case OBD_IOC_PING_TARGET:
2495                 err = ptlrpc_obd_ping(obd);
2496                 GOTO(out, err);
2497         default:
2498                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2499                        cmd, current_comm());
2500                 GOTO(out, err = -ENOTTY);
2501         }
2502 out:
2503         module_put(THIS_MODULE);
2504         return err;
2505 }
2506
/**
 * Handle obd_set_info_async() for the OSC layer.
 *
 * Keys handled entirely on the client (no RPC): KEY_CHECKSUM,
 * KEY_SPTLRPC_CONF, KEY_FLUSH_CTX, KEY_CACHE_SET, KEY_CACHE_LRU_SHRINK.
 * Everything else is packed into an OST_SET_INFO request (or, for
 * KEY_GRANT_SHRINK, an RQF_OST_SET_GRANT_INFO request driven by ptlrpcd)
 * and sent to the OST.  Non-grant-shrink RPCs are added to \a set, which
 * must therefore be non-NULL for them.
 *
 * \retval 0 on success, negative errno otherwise
 */
static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
                              u32 keylen, void *key,
                              u32 vallen, void *val,
                              struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *req;
        struct obd_device     *obd = exp->exp_obd;
        struct obd_import     *imp = class_exp2cliimp(exp);
        char                  *tmp;
        int                    rc;
        ENTRY;

        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);

        if (KEY_IS(KEY_CHECKSUM)) {
                if (vallen != sizeof(int))
                        RETURN(-EINVAL);
                exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
                RETURN(0);
        }

        if (KEY_IS(KEY_SPTLRPC_CONF)) {
                sptlrpc_conf_client_adapt(obd);
                RETURN(0);
        }

        if (KEY_IS(KEY_FLUSH_CTX)) {
                sptlrpc_import_flush_my_ctx(imp);
                RETURN(0);
        }

        if (KEY_IS(KEY_CACHE_SET)) {
                struct client_obd *cli = &obd->u.cli;

                LASSERT(cli->cl_cache == NULL); /* only once */
                cli->cl_cache = (struct cl_client_cache *)val;
                cl_cache_incref(cli->cl_cache);
                cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;

                /* add this osc into entity list */
                LASSERT(list_empty(&cli->cl_lru_osc));
                spin_lock(&cli->cl_cache->ccc_lru_lock);
                list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
                spin_unlock(&cli->cl_cache->ccc_lru_lock);

                RETURN(0);
        }

        if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
                struct client_obd *cli = &obd->u.cli;
                /* shrink at most half of this client's LRU pages, and no
                 * more than the caller asked for; report back how many
                 * pages were actually freed by decrementing *val */
                long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
                long target = *(long *)val;

                nr = osc_lru_shrink(env, cli, min(nr, target), true);
                *(long *)val -= nr;
                RETURN(0);
        }

        if (!set && !KEY_IS(KEY_GRANT_SHRINK))
                RETURN(-EINVAL);

        /* We pass all other commands directly to OST. Since nobody calls osc
           methods directly and everybody is supposed to go through LOV, we
           assume lov checked invalid values for us.
           The only recognised values so far are evict_by_nid and mds_conn.
           Even if something bad goes through, we'd get a -EINVAL from OST
           anyway. */

        req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
                                                &RQF_OST_SET_GRANT_INFO :
                                                &RQF_OBD_SET_INFO);
        if (req == NULL)
                RETURN(-ENOMEM);

        req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
                             RCL_CLIENT, keylen);
        if (!KEY_IS(KEY_GRANT_SHRINK))
                req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
                                     RCL_CLIENT, vallen);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        /* for grant shrink the value travels in the OST_BODY field
         * instead of SETINFO_VAL */
        tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
        memcpy(tmp, key, keylen);
        tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
                                                        &RMF_OST_BODY :
                                                        &RMF_SETINFO_VAL);
        memcpy(tmp, val, vallen);

        if (KEY_IS(KEY_GRANT_SHRINK)) {
                struct osc_grant_args *aa;
                struct obdo *oa;

                /* keep a private copy of the obdo for the interpret
                 * callback; freed there via aa->aa_oa */
                CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
                aa = ptlrpc_req_async_args(req);
                OBDO_ALLOC(oa);
                if (!oa) {
                        ptlrpc_req_finished(req);
                        RETURN(-ENOMEM);
                }
                *oa = ((struct ost_body *)val)->oa;
                aa->aa_oa = oa;
                req->rq_interpret_reply = osc_shrink_grant_interpret;
        }

        ptlrpc_request_set_replen(req);
        if (!KEY_IS(KEY_GRANT_SHRINK)) {
                LASSERT(set != NULL);
                ptlrpc_set_add_req(set, req);
                ptlrpc_check_set(NULL, set);
        } else {
                ptlrpcd_add_req(req);
        }

        RETURN(0);
}
2626
2627 static int osc_reconnect(const struct lu_env *env,
2628                          struct obd_export *exp, struct obd_device *obd,
2629                          struct obd_uuid *cluuid,
2630                          struct obd_connect_data *data,
2631                          void *localdata)
2632 {
2633         struct client_obd *cli = &obd->u.cli;
2634
2635         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
2636                 long lost_grant;
2637                 long grant;
2638
2639                 spin_lock(&cli->cl_loi_list_lock);
2640                 grant = cli->cl_avail_grant + cli->cl_reserved_grant;
2641                 if (data->ocd_connect_flags & OBD_CONNECT_GRANT_PARAM)
2642                         grant += cli->cl_dirty_grant;
2643                 else
2644                         grant += cli->cl_dirty_pages << PAGE_SHIFT;
2645                 data->ocd_grant = grant ? : 2 * cli_brw_size(obd);
2646                 lost_grant = cli->cl_lost_grant;
2647                 cli->cl_lost_grant = 0;
2648                 spin_unlock(&cli->cl_loi_list_lock);
2649
2650                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: %#llx ocd_version: %d"
2651                        " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
2652                        data->ocd_version, data->ocd_grant, lost_grant);
2653         }
2654
2655         RETURN(0);
2656 }
2657
/**
 * Disconnect the OSC export and, once the import is fully destroyed,
 * remove this client from the grant shrink list (see the BUG18662
 * ordering explanation below).
 */
static int osc_disconnect(struct obd_export *exp)
{
        struct obd_device *obd = class_exp2obd(exp);
        int rc;

        rc = client_disconnect_export(exp);
        /**
         * Initially we put del_shrink_grant before disconnect_export, but it
         * causes the following problem if setup (connect) and cleanup
         * (disconnect) are tangled together.
         *      connect p1                     disconnect p2
         *   ptlrpc_connect_import
         *     ...............               class_manual_cleanup
         *                                     osc_disconnect
         *                                     del_shrink_grant
         *   ptlrpc_connect_interrupt
         *     init_grant_shrink
         *   add this client to shrink list
         *                                      cleanup_osc
         * Bang! pinger trigger the shrink.
         * So the osc should be disconnected from the shrink list, after we
         * are sure the import has been destroyed. BUG18662
         */
        if (obd->u.cli.cl_import == NULL)
                osc_del_shrink_grant(&obd->u.cli);
        return rc;
}
2685
/**
 * cfs_hash iterator callback used on import invalidation.
 *
 * For one LDLM resource: invalidate the osc_object attached to its granted
 * locks (if any) and clear LDLM_FL_CLEANED on every granted lock so that a
 * subsequent ldlm_namespace_cleanup() pass can cancel them.
 *
 * \param[in] arg  the lu_env passed through cfs_hash_for_each_nolock()
 *
 * \retval 0 always, to continue the hash iteration
 */
static int osc_ldlm_resource_invalidate(struct cfs_hash *hs,
        struct cfs_hash_bd *bd, struct hlist_node *hnode, void *arg)
{
        struct lu_env *env = arg;
        struct ldlm_resource *res = cfs_hash_object(hs, hnode);
        struct ldlm_lock *lock;
        struct osc_object *osc = NULL;
        ENTRY;

        /* all granted locks on one resource share the same ast_data, so
         * grabbing it (plus a cl_object reference) once is enough */
        lock_res(res);
        list_for_each_entry(lock, &res->lr_granted, l_res_link) {
                if (lock->l_ast_data != NULL && osc == NULL) {
                        osc = lock->l_ast_data;
                        cl_object_get(osc2cl(osc));
                }

                /* clear LDLM_FL_CLEANED flag to make sure it will be canceled
                 * by the 2nd round of ldlm_namespace_clean() call in
                 * osc_import_event(). */
                ldlm_clear_cleaned(lock);
        }
        unlock_res(res);

        /* invalidate outside the resource lock */
        if (osc != NULL) {
                osc_object_invalidate(env, osc);
                cl_object_put(env, osc2cl(osc));
        }

        RETURN(0);
}
2716
/**
 * React to state changes of the OST import (disconnect, invalidation,
 * connect data received, (de)activation) and forward most events to the
 * observer — typically the LOV — via obd_notify_observer().
 */
static int osc_import_event(struct obd_device *obd,
                            struct obd_import *imp,
                            enum obd_import_event event)
{
        struct client_obd *cli;
        int rc = 0;

        ENTRY;
        LASSERT(imp->imp_obd == obd);

        switch (event) {
        case IMP_EVENT_DISCON: {
                /* connection lost: all grant accounting is stale */
                cli = &obd->u.cli;
                spin_lock(&cli->cl_loi_list_lock);
                cli->cl_avail_grant = 0;
                cli->cl_lost_grant = 0;
                spin_unlock(&cli->cl_loi_list_lock);
                break;
        }
        case IMP_EVENT_INACTIVE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE);
                break;
        }
        case IMP_EVENT_INVALIDATE: {
                struct ldlm_namespace *ns = obd->obd_namespace;
                struct lu_env         *env;
                __u16                  refcheck;

                /* two cleanup rounds: osc_ldlm_resource_invalidate() clears
                 * LDLM_FL_CLEANED in between so the second round can cancel
                 * the locks the first one skipped */
                ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);

                env = cl_env_get(&refcheck);
                if (!IS_ERR(env)) {
                        osc_io_unplug(env, &obd->u.cli, NULL);

                        cfs_hash_for_each_nolock(ns->ns_rs_hash,
                                                 osc_ldlm_resource_invalidate,
                                                 env, 0);
                        cl_env_put(env, &refcheck);

                        ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
                } else
                        rc = PTR_ERR(env);
                break;
        }
        case IMP_EVENT_ACTIVE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE);
                break;
        }
        case IMP_EVENT_OCD: {
                struct obd_connect_data *ocd = &imp->imp_connect_data;

                if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
                        osc_init_grant(&obd->u.cli, ocd);

                /* See bug 7198 */
                if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
                        imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;

                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD);
                break;
        }
        case IMP_EVENT_DEACTIVATE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE);
                break;
        }
        case IMP_EVENT_ACTIVATE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE);
                break;
        }
        default:
                CERROR("Unknown import event %d\n", event);
                LBUG();
        }
        RETURN(rc);
}
2792
2793 /**
2794  * Determine whether the lock can be canceled before replaying the lock
2795  * during recovery, see bug16774 for detailed information.
2796  *
2797  * \retval zero the lock can't be canceled
2798  * \retval other ok to cancel
2799  */
2800 static int osc_cancel_weight(struct ldlm_lock *lock)
2801 {
2802         /*
2803          * Cancel all unused and granted extent lock.
2804          */
2805         if (lock->l_resource->lr_type == LDLM_EXTENT &&
2806             lock->l_granted_mode == lock->l_req_mode &&
2807             osc_ldlm_weigh_ast(lock) == 0)
2808                 RETURN(1);
2809
2810         RETURN(0);
2811 }
2812
/* ptlrpcd work callback installed in osc_setup(): flush pending writeback
 * for \a data (a struct client_obd) by kicking osc_io_unplug(). */
static int brw_queue_work(const struct lu_env *env, void *data)
{
        struct client_obd *cli = data;

        CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);

        osc_io_unplug(env, cli, NULL);
        RETURN(0);
}
2822
/**
 * Set up an OSC device: create the client import, allocate ptlrpcd work
 * items for writeback and LRU management, initialize quota and procfs,
 * top up the shared request pool, and register the device for grant
 * shrinking and lock-cancel weighing.
 *
 * \retval 0 on success, negative errno on failure (partially constructed
 *         state is unwound via the out_* labels)
 */
int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
{
        struct client_obd *cli = &obd->u.cli;
        struct obd_type   *type;
        void              *handler;
        int                rc;
        int                adding;
        int                added;
        int                req_count;
        ENTRY;

        rc = ptlrpcd_addref();
        if (rc)
                RETURN(rc);

        rc = client_obd_setup(obd, lcfg);
        if (rc)
                GOTO(out_ptlrpcd, rc);

        handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
        if (IS_ERR(handler))
                GOTO(out_client_setup, rc = PTR_ERR(handler));
        cli->cl_writeback_work = handler;

        handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
        if (IS_ERR(handler))
                GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
        cli->cl_lru_work = handler;

        rc = osc_quota_setup(obd);
        if (rc)
                GOTO(out_ptlrpcd_work, rc);

        cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;

#ifdef CONFIG_PROC_FS
        obd->obd_vars = lprocfs_osc_obd_vars;
#endif
        /* If this is true then both client (osc) and server (osp) are on the
         * same node. The osp layer if loaded first will register the osc proc
         * directory. In that case this obd_device will be attached its proc
         * tree to type->typ_procsym instead of obd->obd_type->typ_procroot. */
        type = class_search_type(LUSTRE_OSP_NAME);
        if (type && type->typ_procsym) {
                obd->obd_proc_entry = lprocfs_register(obd->obd_name,
                                                       type->typ_procsym,
                                                       obd->obd_vars, obd);
                if (IS_ERR(obd->obd_proc_entry)) {
                        rc = PTR_ERR(obd->obd_proc_entry);
                        CERROR("error %d setting up lprocfs for %s\n", rc,
                               obd->obd_name);
                        obd->obd_proc_entry = NULL;
                }
        } else {
                rc = lprocfs_obd_setup(obd);
        }

        /* If the basic OSC proc tree construction succeeded then
         * lets do the rest. */
        if (rc == 0) {
                lproc_osc_attach_seqstat(obd);
                sptlrpc_lprocfs_cliobd_attach(obd);
                ptlrpc_lprocfs_register_obd(obd);
        }
        /* NOTE: a procfs setup failure is not fatal — rc is dropped past
         * this point and setup continues without proc entries. */

        /*
         * We try to control the total number of requests with a upper limit
         * osc_reqpool_maxreqcount. There might be some race which will cause
         * over-limit allocation, but it is fine.
         */
        req_count = atomic_read(&osc_pool_req_count);
        if (req_count < osc_reqpool_maxreqcount) {
                adding = cli->cl_max_rpcs_in_flight + 2;
                if (req_count + adding > osc_reqpool_maxreqcount)
                        adding = osc_reqpool_maxreqcount - req_count;

                added = ptlrpc_add_rqs_to_pool(osc_rq_pool, adding);
                atomic_add(added, &osc_pool_req_count);
        }

        INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
        ns_register_cancel(obd->obd_namespace, osc_cancel_weight);

        /* make this client visible to the global cache shrinker */
        spin_lock(&osc_shrink_lock);
        list_add_tail(&cli->cl_shrink_list, &osc_shrink_list);
        spin_unlock(&osc_shrink_lock);

        RETURN(0);

out_ptlrpcd_work:
        if (cli->cl_writeback_work != NULL) {
                ptlrpcd_destroy_work(cli->cl_writeback_work);
                cli->cl_writeback_work = NULL;
        }
        if (cli->cl_lru_work != NULL) {
                ptlrpcd_destroy_work(cli->cl_lru_work);
                cli->cl_lru_work = NULL;
        }
out_client_setup:
        client_obd_cleanup(obd);
out_ptlrpcd:
        ptlrpcd_decref();
        RETURN(rc);
}
2927
/**
 * First stage of OSC teardown: destroy the ptlrpcd work items, release
 * the client import and unregister procfs entries.  Runs before
 * osc_cleanup().
 */
static int osc_precleanup(struct obd_device *obd)
{
        struct client_obd *cli = &obd->u.cli;
        ENTRY;

        /* LU-464
         * for echo client, export may be on zombie list, wait for
         * zombie thread to cull it, because cli.cl_import will be
         * cleared in client_disconnect_export():
         *   class_export_destroy() -> obd_cleanup() ->
         *   echo_device_free() -> echo_client_cleanup() ->
         *   obd_disconnect() -> osc_disconnect() ->
         *   client_disconnect_export()
         */
        obd_zombie_barrier();
        if (cli->cl_writeback_work) {
                ptlrpcd_destroy_work(cli->cl_writeback_work);
                cli->cl_writeback_work = NULL;
        }

        if (cli->cl_lru_work) {
                ptlrpcd_destroy_work(cli->cl_lru_work);
                cli->cl_lru_work = NULL;
        }

        obd_cleanup_client_import(obd);
        ptlrpc_lprocfs_unregister_obd(obd);
        lprocfs_obd_cleanup(obd);
        RETURN(0);
}
2958
/**
 * Final stage of OSC teardown: remove the device from the cache-shrink
 * and LRU lists, drop the shared client-cache reference, free the quota
 * cache, clean up the client OBD state, and release the ptlrpcd reference
 * taken in osc_setup().
 */
int osc_cleanup(struct obd_device *obd)
{
        struct client_obd *cli = &obd->u.cli;
        int rc;

        ENTRY;

        spin_lock(&osc_shrink_lock);
        list_del(&cli->cl_shrink_list);
        spin_unlock(&osc_shrink_lock);

        /* lru cleanup */
        if (cli->cl_cache != NULL) {
                LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
                spin_lock(&cli->cl_cache->ccc_lru_lock);
                list_del_init(&cli->cl_lru_osc);
                spin_unlock(&cli->cl_cache->ccc_lru_lock);
                cli->cl_lru_left = NULL;
                cl_cache_decref(cli->cl_cache);
                cli->cl_cache = NULL;
        }

        /* free memory of osc quota cache */
        osc_quota_cleanup(obd);

        rc = client_obd_cleanup(obd);

        ptlrpcd_decref();
        RETURN(rc);
}
2989
2990 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
2991 {
2992         int rc = class_process_proc_param(PARAM_OSC, obd->obd_vars, lcfg, obd);
2993         return rc > 0 ? 0: rc;
2994 }
2995
2996 static int osc_process_config(struct obd_device *obd, size_t len, void *buf)
2997 {
2998         return osc_process_config_base(obd, buf);
2999 }
3000
/* Method table connecting the generic OBD layer to this OSC client;
 * generic client_* helpers are used where no OSC-specific behavior
 * is required. */
static struct obd_ops osc_obd_ops = {
        .o_owner                = THIS_MODULE,
        .o_setup                = osc_setup,
        .o_precleanup           = osc_precleanup,
        .o_cleanup              = osc_cleanup,
        .o_add_conn             = client_import_add_conn,
        .o_del_conn             = client_import_del_conn,
        .o_connect              = client_connect_import,
        .o_reconnect            = osc_reconnect,
        .o_disconnect           = osc_disconnect,
        .o_statfs               = osc_statfs,
        .o_statfs_async         = osc_statfs_async,
        .o_create               = osc_create,
        .o_destroy              = osc_destroy,
        .o_getattr              = osc_getattr,
        .o_setattr              = osc_setattr,
        .o_iocontrol            = osc_iocontrol,
        .o_set_info_async       = osc_set_info_async,
        .o_import_event         = osc_import_event,
        .o_process_config       = osc_process_config,
        .o_quotactl             = osc_quotactl,
};
3023
/* Handle returned by set_shrinker() in osc_init(); torn down in osc_exit() */
static struct shrinker *osc_cache_shrinker;
/* client_obd's linked via cl_shrink_list (removed in osc_cleanup()) */
struct list_head osc_shrink_list = LIST_HEAD_INIT(osc_shrink_list);
/* Protects osc_shrink_list */
DEFINE_SPINLOCK(osc_shrink_lock);
3027
#ifndef HAVE_SHRINKER_COUNT
/*
 * Compat wrapper for kernels whose shrinker API exposes a single
 * ->shrink() callback instead of the split count/scan pair.  Repackage
 * the macro-provided arguments into a struct shrink_control, run one
 * scan pass, then return the remaining object count as the old API
 * expects.
 */
static int osc_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask))
{
        struct shrink_control scv = {
                .nr_to_scan = shrink_param(sc, nr_to_scan),
                .gfp_mask   = shrink_param(sc, gfp_mask)
        };
#if !defined(HAVE_SHRINKER_WANT_SHRINK_PTR) && !defined(HAVE_SHRINK_CONTROL)
        /* oldest API variant passes no shrinker pointer at all */
        struct shrinker *shrinker = NULL;
#endif

        /* number of objects freed by the scan is not part of the old
         * single-callback return contract, so it is discarded here */
        (void)osc_cache_shrink_scan(shrinker, &scv);

        return osc_cache_shrink_count(shrinker, &scv);
}
#endif
3044
3045 static int __init osc_init(void)
3046 {
3047         bool enable_proc = true;
3048         struct obd_type *type;
3049         unsigned int reqpool_size;
3050         unsigned int reqsize;
3051         int rc;
3052         DEF_SHRINKER_VAR(osc_shvar, osc_cache_shrink,
3053                          osc_cache_shrink_count, osc_cache_shrink_scan);
3054         ENTRY;
3055
3056         /* print an address of _any_ initialized kernel symbol from this
3057          * module, to allow debugging with gdb that doesn't support data
3058          * symbols from modules.*/
3059         CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3060
3061         rc = lu_kmem_init(osc_caches);
3062         if (rc)
3063                 RETURN(rc);
3064
3065         type = class_search_type(LUSTRE_OSP_NAME);
3066         if (type != NULL && type->typ_procsym != NULL)
3067                 enable_proc = false;
3068
3069         rc = class_register_type(&osc_obd_ops, NULL, enable_proc, NULL,
3070                                  LUSTRE_OSC_NAME, &osc_device_type);
3071         if (rc)
3072                 GOTO(out_kmem, rc);
3073
3074         osc_cache_shrinker = set_shrinker(DEFAULT_SEEKS, &osc_shvar);
3075
3076         /* This is obviously too much memory, only prevent overflow here */
3077         if (osc_reqpool_mem_max >= 1 << 12 || osc_reqpool_mem_max == 0)
3078                 GOTO(out_type, rc = -EINVAL);
3079
3080         reqpool_size = osc_reqpool_mem_max << 20;
3081
3082         reqsize = 1;
3083         while (reqsize < OST_IO_MAXREQSIZE)
3084                 reqsize = reqsize << 1;
3085
3086         /*
3087          * We don't enlarge the request count in OSC pool according to
3088          * cl_max_rpcs_in_flight. The allocation from the pool will only be
3089          * tried after normal allocation failed. So a small OSC pool won't
3090          * cause much performance degression in most of cases.
3091          */
3092         osc_reqpool_maxreqcount = reqpool_size / reqsize;
3093
3094         atomic_set(&osc_pool_req_count, 0);
3095         osc_rq_pool = ptlrpc_init_rq_pool(0, OST_IO_MAXREQSIZE,
3096                                           ptlrpc_add_rqs_to_pool);
3097
3098         if (osc_rq_pool != NULL)
3099                 GOTO(out, rc);
3100         rc = -ENOMEM;
3101 out_type:
3102         class_unregister_type(LUSTRE_OSC_NAME);
3103 out_kmem:
3104         lu_kmem_fini(osc_caches);
3105 out:
3106         RETURN(rc);
3107 }
3108
/*
 * Module unload: unregister the shrinker and obd type set up in
 * osc_init(), release the kmem caches, then free the shared request
 * pool.  The exact call order is deliberate; do not reorder.
 */
static void __exit osc_exit(void)
{
	remove_shrinker(osc_cache_shrinker);
	class_unregister_type(LUSTRE_OSC_NAME);
	lu_kmem_fini(osc_caches);
	ptlrpc_free_rq_pool(osc_rq_pool);
}
3116
/* Standard kernel module identity tags and entry/exit registration */
MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
MODULE_VERSION(LUSTRE_VERSION_STRING);
MODULE_LICENSE("GPL");

module_init(osc_init);
module_exit(osc_exit);