Whamcloud - gitweb
LU-3285 osc: common client setup/cleanup
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2016, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  */
32
33 #define DEBUG_SUBSYSTEM S_OSC
34
35 #include <libcfs/libcfs.h>
36
37 #include <lprocfs_status.h>
38 #include <lustre_debug.h>
39 #include <lustre_dlm.h>
40 #include <lustre_fid.h>
41 #include <lustre_ha.h>
42 #include <uapi/linux/lustre/lustre_ioctl.h>
43 #include <lustre_net.h>
44 #include <lustre_obdo.h>
45 #include <uapi/linux/lustre/lustre_param.h>
46 #include <obd.h>
47 #include <obd_cksum.h>
48 #include <obd_class.h>
49 #include <lustre_osc.h>
50
51 #include "osc_internal.h"
52
53 atomic_t osc_pool_req_count;
54 unsigned int osc_reqpool_maxreqcount;
55 struct ptlrpc_request_pool *osc_rq_pool;
56
57 /* max memory used for request pool, unit is MB */
58 static unsigned int osc_reqpool_mem_max = 5;
59 module_param(osc_reqpool_mem_max, uint, 0444);
60
61 struct osc_brw_async_args {
62         struct obdo              *aa_oa;
63         int                       aa_requested_nob;
64         int                       aa_nio_count;
65         u32                       aa_page_count;
66         int                       aa_resends;
67         struct brw_page **aa_ppga;
68         struct client_obd        *aa_cli;
69         struct list_head          aa_oaps;
70         struct list_head          aa_exts;
71 };
72
73 #define osc_grant_args osc_brw_async_args
74
75 struct osc_setattr_args {
76         struct obdo             *sa_oa;
77         obd_enqueue_update_f     sa_upcall;
78         void                    *sa_cookie;
79 };
80
81 struct osc_fsync_args {
82         struct osc_object       *fa_obj;
83         struct obdo             *fa_oa;
84         obd_enqueue_update_f    fa_upcall;
85         void                    *fa_cookie;
86 };
87
88 struct osc_ladvise_args {
89         struct obdo             *la_oa;
90         obd_enqueue_update_f     la_upcall;
91         void                    *la_cookie;
92 };
93
94 struct osc_enqueue_args {
95         struct obd_export       *oa_exp;
96         enum ldlm_type          oa_type;
97         enum ldlm_mode          oa_mode;
98         __u64                   *oa_flags;
99         osc_enqueue_upcall_f    oa_upcall;
100         void                    *oa_cookie;
101         struct ost_lvb          *oa_lvb;
102         struct lustre_handle    oa_lockh;
103         bool                    oa_speculative;
104 };
105
106 static void osc_release_ppga(struct brw_page **ppga, size_t count);
107 static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
108                          void *data, int rc);
109
110 void osc_pack_req_body(struct ptlrpc_request *req, struct obdo *oa)
111 {
112         struct ost_body *body;
113
114         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
115         LASSERT(body);
116
117         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
118 }
119
120 static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
121                        struct obdo *oa)
122 {
123         struct ptlrpc_request   *req;
124         struct ost_body         *body;
125         int                      rc;
126
127         ENTRY;
128         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
129         if (req == NULL)
130                 RETURN(-ENOMEM);
131
132         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
133         if (rc) {
134                 ptlrpc_request_free(req);
135                 RETURN(rc);
136         }
137
138         osc_pack_req_body(req, oa);
139
140         ptlrpc_request_set_replen(req);
141
142         rc = ptlrpc_queue_wait(req);
143         if (rc)
144                 GOTO(out, rc);
145
146         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
147         if (body == NULL)
148                 GOTO(out, rc = -EPROTO);
149
150         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
151         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
152
153         oa->o_blksize = cli_brw_size(exp->exp_obd);
154         oa->o_valid |= OBD_MD_FLBLKSZ;
155
156         EXIT;
157 out:
158         ptlrpc_req_finished(req);
159
160         return rc;
161 }
162
163 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
164                        struct obdo *oa)
165 {
166         struct ptlrpc_request   *req;
167         struct ost_body         *body;
168         int                      rc;
169
170         ENTRY;
171         LASSERT(oa->o_valid & OBD_MD_FLGROUP);
172
173         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
174         if (req == NULL)
175                 RETURN(-ENOMEM);
176
177         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
178         if (rc) {
179                 ptlrpc_request_free(req);
180                 RETURN(rc);
181         }
182
183         osc_pack_req_body(req, oa);
184
185         ptlrpc_request_set_replen(req);
186
187         rc = ptlrpc_queue_wait(req);
188         if (rc)
189                 GOTO(out, rc);
190
191         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
192         if (body == NULL)
193                 GOTO(out, rc = -EPROTO);
194
195         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
196
197         EXIT;
198 out:
199         ptlrpc_req_finished(req);
200
201         RETURN(rc);
202 }
203
204 static int osc_setattr_interpret(const struct lu_env *env,
205                                  struct ptlrpc_request *req,
206                                  struct osc_setattr_args *sa, int rc)
207 {
208         struct ost_body *body;
209         ENTRY;
210
211         if (rc != 0)
212                 GOTO(out, rc);
213
214         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
215         if (body == NULL)
216                 GOTO(out, rc = -EPROTO);
217
218         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
219                              &body->oa);
220 out:
221         rc = sa->sa_upcall(sa->sa_cookie, rc);
222         RETURN(rc);
223 }
224
225 int osc_setattr_async(struct obd_export *exp, struct obdo *oa,
226                       obd_enqueue_update_f upcall, void *cookie,
227                       struct ptlrpc_request_set *rqset)
228 {
229         struct ptlrpc_request   *req;
230         struct osc_setattr_args *sa;
231         int                      rc;
232
233         ENTRY;
234
235         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
236         if (req == NULL)
237                 RETURN(-ENOMEM);
238
239         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
240         if (rc) {
241                 ptlrpc_request_free(req);
242                 RETURN(rc);
243         }
244
245         osc_pack_req_body(req, oa);
246
247         ptlrpc_request_set_replen(req);
248
249         /* do mds to ost setattr asynchronously */
250         if (!rqset) {
251                 /* Do not wait for response. */
252                 ptlrpcd_add_req(req);
253         } else {
254                 req->rq_interpret_reply =
255                         (ptlrpc_interpterer_t)osc_setattr_interpret;
256
257                 CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
258                 sa = ptlrpc_req_async_args(req);
259                 sa->sa_oa = oa;
260                 sa->sa_upcall = upcall;
261                 sa->sa_cookie = cookie;
262
263                 if (rqset == PTLRPCD_SET)
264                         ptlrpcd_add_req(req);
265                 else
266                         ptlrpc_set_add_req(rqset, req);
267         }
268
269         RETURN(0);
270 }
271
272 static int osc_ladvise_interpret(const struct lu_env *env,
273                                  struct ptlrpc_request *req,
274                                  void *arg, int rc)
275 {
276         struct osc_ladvise_args *la = arg;
277         struct ost_body *body;
278         ENTRY;
279
280         if (rc != 0)
281                 GOTO(out, rc);
282
283         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
284         if (body == NULL)
285                 GOTO(out, rc = -EPROTO);
286
287         *la->la_oa = body->oa;
288 out:
289         rc = la->la_upcall(la->la_cookie, rc);
290         RETURN(rc);
291 }
292
293 /**
294  * If rqset is NULL, do not wait for response. Upcall and cookie could also
295  * be NULL in this case
296  */
297 int osc_ladvise_base(struct obd_export *exp, struct obdo *oa,
298                      struct ladvise_hdr *ladvise_hdr,
299                      obd_enqueue_update_f upcall, void *cookie,
300                      struct ptlrpc_request_set *rqset)
301 {
302         struct ptlrpc_request   *req;
303         struct ost_body         *body;
304         struct osc_ladvise_args *la;
305         int                      rc;
306         struct lu_ladvise       *req_ladvise;
307         struct lu_ladvise       *ladvise = ladvise_hdr->lah_advise;
308         int                      num_advise = ladvise_hdr->lah_count;
309         struct ladvise_hdr      *req_ladvise_hdr;
310         ENTRY;
311
312         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_LADVISE);
313         if (req == NULL)
314                 RETURN(-ENOMEM);
315
316         req_capsule_set_size(&req->rq_pill, &RMF_OST_LADVISE, RCL_CLIENT,
317                              num_advise * sizeof(*ladvise));
318         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_LADVISE);
319         if (rc != 0) {
320                 ptlrpc_request_free(req);
321                 RETURN(rc);
322         }
323         req->rq_request_portal = OST_IO_PORTAL;
324         ptlrpc_at_set_req_timeout(req);
325
326         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
327         LASSERT(body);
328         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
329                              oa);
330
331         req_ladvise_hdr = req_capsule_client_get(&req->rq_pill,
332                                                  &RMF_OST_LADVISE_HDR);
333         memcpy(req_ladvise_hdr, ladvise_hdr, sizeof(*ladvise_hdr));
334
335         req_ladvise = req_capsule_client_get(&req->rq_pill, &RMF_OST_LADVISE);
336         memcpy(req_ladvise, ladvise, sizeof(*ladvise) * num_advise);
337         ptlrpc_request_set_replen(req);
338
339         if (rqset == NULL) {
340                 /* Do not wait for response. */
341                 ptlrpcd_add_req(req);
342                 RETURN(0);
343         }
344
345         req->rq_interpret_reply = osc_ladvise_interpret;
346         CLASSERT(sizeof(*la) <= sizeof(req->rq_async_args));
347         la = ptlrpc_req_async_args(req);
348         la->la_oa = oa;
349         la->la_upcall = upcall;
350         la->la_cookie = cookie;
351
352         if (rqset == PTLRPCD_SET)
353                 ptlrpcd_add_req(req);
354         else
355                 ptlrpc_set_add_req(rqset, req);
356
357         RETURN(0);
358 }
359
360 static int osc_create(const struct lu_env *env, struct obd_export *exp,
361                       struct obdo *oa)
362 {
363         struct ptlrpc_request *req;
364         struct ost_body       *body;
365         int                    rc;
366         ENTRY;
367
368         LASSERT(oa != NULL);
369         LASSERT(oa->o_valid & OBD_MD_FLGROUP);
370         LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));
371
372         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
373         if (req == NULL)
374                 GOTO(out, rc = -ENOMEM);
375
376         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
377         if (rc) {
378                 ptlrpc_request_free(req);
379                 GOTO(out, rc);
380         }
381
382         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
383         LASSERT(body);
384
385         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
386
387         ptlrpc_request_set_replen(req);
388
389         rc = ptlrpc_queue_wait(req);
390         if (rc)
391                 GOTO(out_req, rc);
392
393         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
394         if (body == NULL)
395                 GOTO(out_req, rc = -EPROTO);
396
397         CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
398         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
399
400         oa->o_blksize = cli_brw_size(exp->exp_obd);
401         oa->o_valid |= OBD_MD_FLBLKSZ;
402
403         CDEBUG(D_HA, "transno: %lld\n",
404                lustre_msg_get_transno(req->rq_repmsg));
405 out_req:
406         ptlrpc_req_finished(req);
407 out:
408         RETURN(rc);
409 }
410
411 int osc_punch_base(struct obd_export *exp, struct obdo *oa,
412                    obd_enqueue_update_f upcall, void *cookie,
413                    struct ptlrpc_request_set *rqset)
414 {
415         struct ptlrpc_request   *req;
416         struct osc_setattr_args *sa;
417         struct ost_body         *body;
418         int                      rc;
419         ENTRY;
420
421         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
422         if (req == NULL)
423                 RETURN(-ENOMEM);
424
425         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
426         if (rc) {
427                 ptlrpc_request_free(req);
428                 RETURN(rc);
429         }
430         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
431         ptlrpc_at_set_req_timeout(req);
432
433         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
434         LASSERT(body);
435         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
436
437         ptlrpc_request_set_replen(req);
438
439         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
440         CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
441         sa = ptlrpc_req_async_args(req);
442         sa->sa_oa = oa;
443         sa->sa_upcall = upcall;
444         sa->sa_cookie = cookie;
445         if (rqset == PTLRPCD_SET)
446                 ptlrpcd_add_req(req);
447         else
448                 ptlrpc_set_add_req(rqset, req);
449
450         RETURN(0);
451 }
452
453 static int osc_sync_interpret(const struct lu_env *env,
454                               struct ptlrpc_request *req,
455                               void *arg, int rc)
456 {
457         struct osc_fsync_args   *fa = arg;
458         struct ost_body         *body;
459         struct cl_attr          *attr = &osc_env_info(env)->oti_attr;
460         unsigned long           valid = 0;
461         struct cl_object        *obj;
462         ENTRY;
463
464         if (rc != 0)
465                 GOTO(out, rc);
466
467         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
468         if (body == NULL) {
469                 CERROR("can't unpack ost_body\n");
470                 GOTO(out, rc = -EPROTO);
471         }
472
473         *fa->fa_oa = body->oa;
474         obj = osc2cl(fa->fa_obj);
475
476         /* Update osc object's blocks attribute */
477         cl_object_attr_lock(obj);
478         if (body->oa.o_valid & OBD_MD_FLBLOCKS) {
479                 attr->cat_blocks = body->oa.o_blocks;
480                 valid |= CAT_BLOCKS;
481         }
482
483         if (valid != 0)
484                 cl_object_attr_update(env, obj, attr, valid);
485         cl_object_attr_unlock(obj);
486
487 out:
488         rc = fa->fa_upcall(fa->fa_cookie, rc);
489         RETURN(rc);
490 }
491
492 int osc_sync_base(struct osc_object *obj, struct obdo *oa,
493                   obd_enqueue_update_f upcall, void *cookie,
494                   struct ptlrpc_request_set *rqset)
495 {
496         struct obd_export     *exp = osc_export(obj);
497         struct ptlrpc_request *req;
498         struct ost_body       *body;
499         struct osc_fsync_args *fa;
500         int                    rc;
501         ENTRY;
502
503         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
504         if (req == NULL)
505                 RETURN(-ENOMEM);
506
507         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
508         if (rc) {
509                 ptlrpc_request_free(req);
510                 RETURN(rc);
511         }
512
513         /* overload the size and blocks fields in the oa with start/end */
514         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
515         LASSERT(body);
516         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
517
518         ptlrpc_request_set_replen(req);
519         req->rq_interpret_reply = osc_sync_interpret;
520
521         CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
522         fa = ptlrpc_req_async_args(req);
523         fa->fa_obj = obj;
524         fa->fa_oa = oa;
525         fa->fa_upcall = upcall;
526         fa->fa_cookie = cookie;
527
528         if (rqset == PTLRPCD_SET)
529                 ptlrpcd_add_req(req);
530         else
531                 ptlrpc_set_add_req(rqset, req);
532
533         RETURN (0);
534 }
535
536 /* Find and cancel locally locks matched by @mode in the resource found by
537  * @objid. Found locks are added into @cancel list. Returns the amount of
538  * locks added to @cancels list. */
539 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
540                                    struct list_head *cancels,
541                                    enum ldlm_mode mode, __u64 lock_flags)
542 {
543         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
544         struct ldlm_res_id res_id;
545         struct ldlm_resource *res;
546         int count;
547         ENTRY;
548
549         /* Return, i.e. cancel nothing, only if ELC is supported (flag in
550          * export) but disabled through procfs (flag in NS).
551          *
552          * This distinguishes from a case when ELC is not supported originally,
553          * when we still want to cancel locks in advance and just cancel them
554          * locally, without sending any RPC. */
555         if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
556                 RETURN(0);
557
558         ostid_build_res_name(&oa->o_oi, &res_id);
559         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
560         if (IS_ERR(res))
561                 RETURN(0);
562
563         LDLM_RESOURCE_ADDREF(res);
564         count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
565                                            lock_flags, 0, NULL);
566         LDLM_RESOURCE_DELREF(res);
567         ldlm_resource_putref(res);
568         RETURN(count);
569 }
570
571 static int osc_destroy_interpret(const struct lu_env *env,
572                                  struct ptlrpc_request *req, void *data,
573                                  int rc)
574 {
575         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
576
577         atomic_dec(&cli->cl_destroy_in_flight);
578         wake_up(&cli->cl_destroy_waitq);
579         return 0;
580 }
581
582 static int osc_can_send_destroy(struct client_obd *cli)
583 {
584         if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
585             cli->cl_max_rpcs_in_flight) {
586                 /* The destroy request can be sent */
587                 return 1;
588         }
589         if (atomic_dec_return(&cli->cl_destroy_in_flight) <
590             cli->cl_max_rpcs_in_flight) {
591                 /*
592                  * The counter has been modified between the two atomic
593                  * operations.
594                  */
595                 wake_up(&cli->cl_destroy_waitq);
596         }
597         return 0;
598 }
599
600 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
601                        struct obdo *oa)
602 {
603         struct client_obd     *cli = &exp->exp_obd->u.cli;
604         struct ptlrpc_request *req;
605         struct ost_body       *body;
606         struct list_head       cancels = LIST_HEAD_INIT(cancels);
607         int rc, count;
608         ENTRY;
609
610         if (!oa) {
611                 CDEBUG(D_INFO, "oa NULL\n");
612                 RETURN(-EINVAL);
613         }
614
615         count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
616                                         LDLM_FL_DISCARD_DATA);
617
618         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
619         if (req == NULL) {
620                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
621                 RETURN(-ENOMEM);
622         }
623
624         rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
625                                0, &cancels, count);
626         if (rc) {
627                 ptlrpc_request_free(req);
628                 RETURN(rc);
629         }
630
631         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
632         ptlrpc_at_set_req_timeout(req);
633
634         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
635         LASSERT(body);
636         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
637
638         ptlrpc_request_set_replen(req);
639
640         req->rq_interpret_reply = osc_destroy_interpret;
641         if (!osc_can_send_destroy(cli)) {
642                 struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
643
644                 /*
645                  * Wait until the number of on-going destroy RPCs drops
646                  * under max_rpc_in_flight
647                  */
648                 rc = l_wait_event_exclusive(cli->cl_destroy_waitq,
649                                             osc_can_send_destroy(cli), &lwi);
650                 if (rc) {
651                         ptlrpc_req_finished(req);
652                         RETURN(rc);
653                 }
654         }
655
656         /* Do not wait for response */
657         ptlrpcd_add_req(req);
658         RETURN(0);
659 }
660
661 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
662                                 long writing_bytes)
663 {
664         u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;
665
666         LASSERT(!(oa->o_valid & bits));
667
668         oa->o_valid |= bits;
669         spin_lock(&cli->cl_loi_list_lock);
670         if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data, GRANT_PARAM))
671                 oa->o_dirty = cli->cl_dirty_grant;
672         else
673                 oa->o_dirty = cli->cl_dirty_pages << PAGE_SHIFT;
674         if (unlikely(cli->cl_dirty_pages - cli->cl_dirty_transit >
675                      cli->cl_dirty_max_pages)) {
676                 CERROR("dirty %lu - %lu > dirty_max %lu\n",
677                        cli->cl_dirty_pages, cli->cl_dirty_transit,
678                        cli->cl_dirty_max_pages);
679                 oa->o_undirty = 0;
680         } else if (unlikely(atomic_long_read(&obd_dirty_pages) -
681                             atomic_long_read(&obd_dirty_transit_pages) >
682                             (long)(obd_max_dirty_pages + 1))) {
683                 /* The atomic_read() allowing the atomic_inc() are
684                  * not covered by a lock thus they may safely race and trip
685                  * this CERROR() unless we add in a small fudge factor (+1). */
686                 CERROR("%s: dirty %ld - %ld > system dirty_max %ld\n",
687                        cli_name(cli), atomic_long_read(&obd_dirty_pages),
688                        atomic_long_read(&obd_dirty_transit_pages),
689                        obd_max_dirty_pages);
690                 oa->o_undirty = 0;
691         } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
692                             0x7fffffff)) {
693                 CERROR("dirty %lu - dirty_max %lu too big???\n",
694                        cli->cl_dirty_pages, cli->cl_dirty_max_pages);
695                 oa->o_undirty = 0;
696         } else {
697                 unsigned long nrpages;
698
699                 nrpages = cli->cl_max_pages_per_rpc;
700                 nrpages *= cli->cl_max_rpcs_in_flight + 1;
701                 nrpages = max(nrpages, cli->cl_dirty_max_pages);
702                 oa->o_undirty = nrpages << PAGE_SHIFT;
703                 if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data,
704                                  GRANT_PARAM)) {
705                         int nrextents;
706
707                         /* take extent tax into account when asking for more
708                          * grant space */
709                         nrextents = (nrpages + cli->cl_max_extent_pages - 1)  /
710                                      cli->cl_max_extent_pages;
711                         oa->o_undirty += nrextents * cli->cl_grant_extent_tax;
712                 }
713         }
714         oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
715         oa->o_dropped = cli->cl_lost_grant;
716         cli->cl_lost_grant = 0;
717         spin_unlock(&cli->cl_loi_list_lock);
718         CDEBUG(D_CACHE, "dirty: %llu undirty: %u dropped %u grant: %llu\n",
719                oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
720 }
721
722 void osc_update_next_shrink(struct client_obd *cli)
723 {
724         cli->cl_next_shrink_grant =
725                 cfs_time_shift(cli->cl_grant_shrink_interval);
726         CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
727                cli->cl_next_shrink_grant);
728 }
729
730 static void __osc_update_grant(struct client_obd *cli, u64 grant)
731 {
732         spin_lock(&cli->cl_loi_list_lock);
733         cli->cl_avail_grant += grant;
734         spin_unlock(&cli->cl_loi_list_lock);
735 }
736
737 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
738 {
739         if (body->oa.o_valid & OBD_MD_FLGRANT) {
740                 CDEBUG(D_CACHE, "got %llu extra grant\n", body->oa.o_grant);
741                 __osc_update_grant(cli, body->oa.o_grant);
742         }
743 }
744
745 static int osc_shrink_grant_interpret(const struct lu_env *env,
746                                       struct ptlrpc_request *req,
747                                       void *aa, int rc)
748 {
749         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
750         struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
751         struct ost_body *body;
752
753         if (rc != 0) {
754                 __osc_update_grant(cli, oa->o_grant);
755                 GOTO(out, rc);
756         }
757
758         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
759         LASSERT(body);
760         osc_update_grant(cli, body);
761 out:
762         OBDO_FREE(oa);
763         return rc;
764 }
765
766 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
767 {
768         spin_lock(&cli->cl_loi_list_lock);
769         oa->o_grant = cli->cl_avail_grant / 4;
770         cli->cl_avail_grant -= oa->o_grant;
771         spin_unlock(&cli->cl_loi_list_lock);
772         if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
773                 oa->o_valid |= OBD_MD_FLFLAGS;
774                 oa->o_flags = 0;
775         }
776         oa->o_flags |= OBD_FL_SHRINK_GRANT;
777         osc_update_next_shrink(cli);
778 }
779
780 /* Shrink the current grant, either from some large amount to enough for a
781  * full set of in-flight RPCs, or if we have already shrunk to that limit
782  * then to enough for a single RPC.  This avoids keeping more grant than
783  * needed, and avoids shrinking the grant piecemeal. */
784 static int osc_shrink_grant(struct client_obd *cli)
785 {
786         __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
787                              (cli->cl_max_pages_per_rpc << PAGE_SHIFT);
788
789         spin_lock(&cli->cl_loi_list_lock);
790         if (cli->cl_avail_grant <= target_bytes)
791                 target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
792         spin_unlock(&cli->cl_loi_list_lock);
793
794         return osc_shrink_grant_to_target(cli, target_bytes);
795 }
796
797 int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
798 {
799         int                     rc = 0;
800         struct ost_body        *body;
801         ENTRY;
802
803         spin_lock(&cli->cl_loi_list_lock);
804         /* Don't shrink if we are already above or below the desired limit
805          * We don't want to shrink below a single RPC, as that will negatively
806          * impact block allocation and long-term performance. */
807         if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_SHIFT)
808                 target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
809
810         if (target_bytes >= cli->cl_avail_grant) {
811                 spin_unlock(&cli->cl_loi_list_lock);
812                 RETURN(0);
813         }
814         spin_unlock(&cli->cl_loi_list_lock);
815
816         OBD_ALLOC_PTR(body);
817         if (!body)
818                 RETURN(-ENOMEM);
819
820         osc_announce_cached(cli, &body->oa, 0);
821
822         spin_lock(&cli->cl_loi_list_lock);
823         body->oa.o_grant = cli->cl_avail_grant - target_bytes;
824         cli->cl_avail_grant = target_bytes;
825         spin_unlock(&cli->cl_loi_list_lock);
826         if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
827                 body->oa.o_valid |= OBD_MD_FLFLAGS;
828                 body->oa.o_flags = 0;
829         }
830         body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
831         osc_update_next_shrink(cli);
832
833         rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
834                                 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
835                                 sizeof(*body), body, NULL);
836         if (rc != 0)
837                 __osc_update_grant(cli, body->oa.o_grant);
838         OBD_FREE_PTR(body);
839         RETURN(rc);
840 }
841
842 static int osc_should_shrink_grant(struct client_obd *client)
843 {
844         cfs_time_t time = cfs_time_current();
845         cfs_time_t next_shrink = client->cl_next_shrink_grant;
846
847         if ((client->cl_import->imp_connect_data.ocd_connect_flags &
848              OBD_CONNECT_GRANT_SHRINK) == 0)
849                 return 0;
850
851         if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
852                 /* Get the current RPC size directly, instead of going via:
853                  * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
854                  * Keep comment here so that it can be found by searching. */
855                 int brw_size = client->cl_max_pages_per_rpc << PAGE_SHIFT;
856
857                 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
858                     client->cl_avail_grant > brw_size)
859                         return 1;
860                 else
861                         osc_update_next_shrink(client);
862         }
863         return 0;
864 }
865
866 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
867 {
868         struct client_obd *client;
869
870         list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
871                 if (osc_should_shrink_grant(client))
872                         osc_shrink_grant(client);
873         }
874         return 0;
875 }
876
877 static int osc_add_shrink_grant(struct client_obd *client)
878 {
879         int rc;
880
881         rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
882                                        TIMEOUT_GRANT,
883                                        osc_grant_shrink_grant_cb, NULL,
884                                        &client->cl_grant_shrink_list);
885         if (rc) {
886                 CERROR("add grant client %s error %d\n", cli_name(client), rc);
887                 return rc;
888         }
889         CDEBUG(D_CACHE, "add grant client %s\n", cli_name(client));
890         osc_update_next_shrink(client);
891         return 0;
892 }
893
894 static int osc_del_shrink_grant(struct client_obd *client)
895 {
896         return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
897                                          TIMEOUT_GRANT);
898 }
899
900 void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
901 {
902         /*
903          * ocd_grant is the total grant amount we're expect to hold: if we've
904          * been evicted, it's the new avail_grant amount, cl_dirty_pages will
905          * drop to 0 as inflight RPCs fail out; otherwise, it's avail_grant +
906          * dirty.
907          *
908          * race is tolerable here: if we're evicted, but imp_state already
909          * left EVICTED state, then cl_dirty_pages must be 0 already.
910          */
911         spin_lock(&cli->cl_loi_list_lock);
912         cli->cl_avail_grant = ocd->ocd_grant;
913         if (cli->cl_import->imp_state != LUSTRE_IMP_EVICTED) {
914                 cli->cl_avail_grant -= cli->cl_reserved_grant;
915                 if (OCD_HAS_FLAG(ocd, GRANT_PARAM))
916                         cli->cl_avail_grant -= cli->cl_dirty_grant;
917                 else
918                         cli->cl_avail_grant -=
919                                         cli->cl_dirty_pages << PAGE_SHIFT;
920         }
921
922         if (OCD_HAS_FLAG(ocd, GRANT_PARAM)) {
923                 u64 size;
924                 int chunk_mask;
925
926                 /* overhead for each extent insertion */
927                 cli->cl_grant_extent_tax = ocd->ocd_grant_tax_kb << 10;
928                 /* determine the appropriate chunk size used by osc_extent. */
929                 cli->cl_chunkbits = max_t(int, PAGE_SHIFT,
930                                           ocd->ocd_grant_blkbits);
931                 /* max_pages_per_rpc must be chunk aligned */
932                 chunk_mask = ~((1 << (cli->cl_chunkbits - PAGE_SHIFT)) - 1);
933                 cli->cl_max_pages_per_rpc = (cli->cl_max_pages_per_rpc +
934                                              ~chunk_mask) & chunk_mask;
935                 /* determine maximum extent size, in #pages */
936                 size = (u64)ocd->ocd_grant_max_blks << ocd->ocd_grant_blkbits;
937                 cli->cl_max_extent_pages = size >> PAGE_SHIFT;
938                 if (cli->cl_max_extent_pages == 0)
939                         cli->cl_max_extent_pages = 1;
940         } else {
941                 cli->cl_grant_extent_tax = 0;
942                 cli->cl_chunkbits = PAGE_SHIFT;
943                 cli->cl_max_extent_pages = DT_MAX_BRW_PAGES;
944         }
945         spin_unlock(&cli->cl_loi_list_lock);
946
947         CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
948                 "chunk bits: %d cl_max_extent_pages: %d\n",
949                 cli_name(cli),
950                 cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits,
951                 cli->cl_max_extent_pages);
952
953         if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
954             list_empty(&cli->cl_grant_shrink_list))
955                 osc_add_shrink_grant(cli);
956 }
957 EXPORT_SYMBOL(osc_init_grant);
958
959 /* We assume that the reason this OSC got a short read is because it read
960  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
961  * via the LOV, and it _knows_ it's reading inside the file, it's just that
962  * this stripe never got written at or beyond this stripe offset yet. */
963 static void handle_short_read(int nob_read, size_t page_count,
964                               struct brw_page **pga)
965 {
966         char *ptr;
967         int i = 0;
968
969         /* skip bytes read OK */
970         while (nob_read > 0) {
971                 LASSERT (page_count > 0);
972
973                 if (pga[i]->count > nob_read) {
974                         /* EOF inside this page */
975                         ptr = kmap(pga[i]->pg) +
976                                 (pga[i]->off & ~PAGE_MASK);
977                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
978                         kunmap(pga[i]->pg);
979                         page_count--;
980                         i++;
981                         break;
982                 }
983
984                 nob_read -= pga[i]->count;
985                 page_count--;
986                 i++;
987         }
988
989         /* zero remaining pages */
990         while (page_count-- > 0) {
991                 ptr = kmap(pga[i]->pg) + (pga[i]->off & ~PAGE_MASK);
992                 memset(ptr, 0, pga[i]->count);
993                 kunmap(pga[i]->pg);
994                 i++;
995         }
996 }
997
998 static int check_write_rcs(struct ptlrpc_request *req,
999                            int requested_nob, int niocount,
1000                            size_t page_count, struct brw_page **pga)
1001 {
1002         int     i;
1003         __u32   *remote_rcs;
1004
1005         remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1006                                                   sizeof(*remote_rcs) *
1007                                                   niocount);
1008         if (remote_rcs == NULL) {
1009                 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1010                 return(-EPROTO);
1011         }
1012
1013         /* return error if any niobuf was in error */
1014         for (i = 0; i < niocount; i++) {
1015                 if ((int)remote_rcs[i] < 0)
1016                         return(remote_rcs[i]);
1017
1018                 if (remote_rcs[i] != 0) {
1019                         CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1020                                 i, remote_rcs[i], req);
1021                         return(-EPROTO);
1022                 }
1023         }
1024
1025         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1026                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1027                        req->rq_bulk->bd_nob_transferred, requested_nob);
1028                 return(-EPROTO);
1029         }
1030
1031         return (0);
1032 }
1033
1034 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1035 {
1036         if (p1->flag != p2->flag) {
1037                 unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
1038                                   OBD_BRW_SYNC       | OBD_BRW_ASYNC   |
1039                                   OBD_BRW_NOQUOTA    | OBD_BRW_SOFT_SYNC);
1040
1041                 /* warn if we try to combine flags that we don't know to be
1042                  * safe to combine */
1043                 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1044                         CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1045                               "report this at https://jira.hpdd.intel.com/\n",
1046                               p1->flag, p2->flag);
1047                 }
1048                 return 0;
1049         }
1050
1051         return (p1->off + p1->count == p2->off);
1052 }
1053
1054 static u32 osc_checksum_bulk(int nob, size_t pg_count,
1055                              struct brw_page **pga, int opc,
1056                              enum cksum_types cksum_type)
1057 {
1058         u32                             cksum;
1059         int                             i = 0;
1060         struct cfs_crypto_hash_desc     *hdesc;
1061         unsigned int                    bufsize;
1062         unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);
1063
1064         LASSERT(pg_count > 0);
1065
1066         hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1067         if (IS_ERR(hdesc)) {
1068                 CERROR("Unable to initialize checksum hash %s\n",
1069                        cfs_crypto_hash_name(cfs_alg));
1070                 return PTR_ERR(hdesc);
1071         }
1072
1073         while (nob > 0 && pg_count > 0) {
1074                 unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;
1075
1076                 /* corrupt the data before we compute the checksum, to
1077                  * simulate an OST->client data error */
1078                 if (i == 0 && opc == OST_READ &&
1079                     OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
1080                         unsigned char *ptr = kmap(pga[i]->pg);
1081                         int off = pga[i]->off & ~PAGE_MASK;
1082
1083                         memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
1084                         kunmap(pga[i]->pg);
1085                 }
1086                 cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
1087                                             pga[i]->off & ~PAGE_MASK,
1088                                             count);
1089                 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
1090                                (int)(pga[i]->off & ~PAGE_MASK));
1091
1092                 nob -= pga[i]->count;
1093                 pg_count--;
1094                 i++;
1095         }
1096
1097         bufsize = sizeof(cksum);
1098         cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);
1099
1100         /* For sending we only compute the wrong checksum instead
1101          * of corrupting the data so it is still correct on a redo */
1102         if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1103                 cksum++;
1104
1105         return cksum;
1106 }
1107
/*
 * Build, but do not send, a bulk read or write (BRW) request for the pages
 * in pga.
 *
 * The request is allocated (writes are drawn from osc_rq_pool), its ioobj
 * and niobuf buffers are sized and packed, a bulk descriptor with one kiov
 * fragment per page is attached, and the wire obdo is filled in: cached
 * grant announcement, OBD_FL_RECOV_RESEND when resending, and the checksum
 * type (plus the checksum itself for writes) when checksums are enabled and
 * the security flavor does not already protect the bulk.  Finally the state
 * that osc_brw_fini_request() reads back is stored in the request's
 * async-args area.
 *
 * cmd         OBD_BRW_WRITE set selects OST_WRITE, otherwise OST_READ
 * cli         client the request is issued through
 * oa          object attributes copied to the wire obdo; for writes its
 *             checksum flags/value are updated for the later reply check
 * page_count  number of entries in pga; asserted to be > 0
 * pga         pages to transfer; asserted to be strictly ascending by
 *             offset with no gap in the middle of the array
 * reqp        receives the prepared request on success
 * resend      non-zero marks the request as a recovery resend
 *
 * Returns 0 on success.  On failure a negative errno is returned, *reqp is
 * left untouched and the request, if one was allocated, is released.
 */
1108 static int
1109 osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
1110                      u32 page_count, struct brw_page **pga,
1111                      struct ptlrpc_request **reqp, int resend)
1112 {
1113         struct ptlrpc_request   *req;
1114         struct ptlrpc_bulk_desc *desc;
1115         struct ost_body         *body;
1116         struct obd_ioobj        *ioobj;
1117         struct niobuf_remote    *niobuf;
1118         int niocount, i, requested_nob, opc, rc;
1119         struct osc_brw_async_args *aa;
1120         struct req_capsule      *pill;
1121         struct brw_page *pg_prev;
1122
1123         ENTRY;
             /* fault-injection hooks simulating a recoverable and a fatal
              * preparation failure */
1124         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1125                 RETURN(-ENOMEM); /* Recoverable */
1126         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1127                 RETURN(-EINVAL); /* Fatal */
1128
             /* write requests come from osc_rq_pool, reads are allocated
              * without a pool */
1129         if ((cmd & OBD_BRW_WRITE) != 0) {
1130                 opc = OST_WRITE;
1131                 req = ptlrpc_request_alloc_pool(cli->cl_import,
1132                                                 osc_rq_pool,
1133                                                 &RQF_OST_BRW_WRITE);
1134         } else {
1135                 opc = OST_READ;
1136                 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
1137         }
1138         if (req == NULL)
1139                 RETURN(-ENOMEM);
1140
             /* count the remote niobufs: one to start with, plus one for
              * every adjacent pair of pages that cannot be merged */
1141         for (niocount = i = 1; i < page_count; i++) {
1142                 if (!can_merge_pages(pga[i - 1], pga[i]))
1143                         niocount++;
1144         }
1145
1146         pill = &req->rq_pill;
1147         req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1148                              sizeof(*ioobj));
1149         req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1150                              niocount * sizeof(*niobuf));
1151
1152         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1153         if (rc) {
1154                 ptlrpc_request_free(req);
1155                 RETURN(rc);
1156         }
1157         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1158         ptlrpc_at_set_req_timeout(req);
1159         /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
1160          * retry logic */
1161         req->rq_no_retry_einprogress = 1;
1162
1163         desc = ptlrpc_prep_bulk_imp(req, page_count,
1164                 cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
1165                 (opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE :
1166                         PTLRPC_BULK_PUT_SINK) |
1167                         PTLRPC_BULK_BUF_KIOV,
1168                 OST_BULK_PORTAL,
1169                 &ptlrpc_bulk_kiov_pin_ops);
1170
1171         if (desc == NULL)
1172                 GOTO(out, rc = -ENOMEM);
1173         /* NB request now owns desc and will free it when it gets freed */
1174
1175         body = req_capsule_client_get(pill, &RMF_OST_BODY);
1176         ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1177         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1178         LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1179
1180         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
1181
1182         obdo_to_ioobj(oa, ioobj);
1183         ioobj->ioo_bufcnt = niocount;
1184         /* The high bits of ioo_max_brw tells server _maximum_ number of bulks
1185          * that might be send for this request.  The actual number is decided
1186          * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
1187          * "max - 1" for old client compatibility sending "0", and also so the
1188          * the actual maximum is a power-of-two number, not one less. LU-1431 */
1189         ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
1190         LASSERT(page_count > 0);
1191         pg_prev = pga[0];
             /* add one bulk fragment per page; a page that can be merged
              * with its predecessor extends the previous niobuf (niobuf is
              * stepped back) instead of starting a new one */
1192         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1193                 struct brw_page *pg = pga[i];
1194                 int poff = pg->off & ~PAGE_MASK;
1195
1196                 LASSERT(pg->count > 0);
1197                 /* make sure there is no gap in the middle of page array */
1198                 LASSERTF(page_count == 1 ||
1199                          (ergo(i == 0, poff + pg->count == PAGE_SIZE) &&
1200                           ergo(i > 0 && i < page_count - 1,
1201                                poff == 0 && pg->count == PAGE_SIZE)   &&
1202                           ergo(i == page_count - 1, poff == 0)),
1203                          "i: %d/%d pg: %p off: %llu, count: %u\n",
1204                          i, page_count, pg, pg->off, pg->count);
1205                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1206                          "i %d p_c %u pg %p [pri %lu ind %lu] off %llu"
1207                          " prev_pg %p [pri %lu ind %lu] off %llu\n",
1208                          i, page_count,
1209                          pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1210                          pg_prev->pg, page_private(pg_prev->pg),
1211                          pg_prev->pg->index, pg_prev->off);
1212                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1213                         (pg->flag & OBD_BRW_SRVLOCK));
1214
1215                 desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff, pg->count);
1216                 requested_nob += pg->count;
1217
1218                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1219                         niobuf--;
1220                         niobuf->rnb_len += pg->count;
1221                 } else {
1222                         niobuf->rnb_offset = pg->off;
1223                         niobuf->rnb_len    = pg->count;
1224                         niobuf->rnb_flags  = pg->flag;
1225                 }
1226                 pg_prev = pg;
1227         }
1228
             /* niobuf must now point exactly niocount entries past the start
              * of the niobuf buffer, i.e. the count computed above matches
              * what the loop filled in */
1229         LASSERTF((void *)(niobuf - niocount) ==
1230                 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1231                 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1232                 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1233
1234         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1235         if (resend) {
1236                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1237                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1238                         body->oa.o_flags = 0;
1239                 }
1240                 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1241         }
1242
1243         if (osc_should_shrink_grant(cli))
1244                 osc_shrink_grant_local(cli, &body->oa);
1245
1246         /* size[REQ_REC_OFF] still sizeof (*body) */
1247         if (opc == OST_WRITE) {
1248                 if (cli->cl_checksum &&
1249                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1250                         /* store cl_cksum_type in a local variable since
1251                          * it can be changed via lprocfs */
1252                         enum cksum_types cksum_type = cli->cl_cksum_type;
1253
1254                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1255                                 body->oa.o_flags = 0;
1256
1257                         body->oa.o_flags |= cksum_type_pack(cksum_type);
1258                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1259                         body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1260                                                              page_count, pga,
1261                                                              OST_WRITE,
1262                                                              cksum_type);
1263                         CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1264                                body->oa.o_cksum);
1265                         /* save this in 'oa', too, for later checking */
1266                         oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1267                         oa->o_flags |= cksum_type_pack(cksum_type);
1268                 } else {
1269                         /* clear out the checksum flag, in case this is a
1270                          * resend but cl_checksum is no longer set. b=11238 */
1271                         oa->o_valid &= ~OBD_MD_FLCKSUM;
1272                 }
1273                 oa->o_cksum = body->oa.o_cksum;
1274                 /* 1 RC per niobuf */
1275                 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1276                                      sizeof(__u32) * niocount);
1277         } else {
1278                 if (cli->cl_checksum &&
1279                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1280                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1281                                 body->oa.o_flags = 0;
1282                         body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1283                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1284                 }
1285
1286                 /* Client cksum has been already copied to wire obdo in previous
1287                  * lustre_set_wire_obdo(), and in the case a bulk-read is being
1288                  * resent due to cksum error, this will allow Server to
1289                  * check+dump pages on its side */
1290         }
1291         ptlrpc_request_set_replen(req);
1292
             /* record what osc_brw_fini_request() needs when the reply
              * arrives; the args live inside the request itself */
1293         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1294         aa = ptlrpc_req_async_args(req);
1295         aa->aa_oa = oa;
1296         aa->aa_requested_nob = requested_nob;
1297         aa->aa_nio_count = niocount;
1298         aa->aa_page_count = page_count;
1299         aa->aa_resends = 0;
1300         aa->aa_ppga = pga;
1301         aa->aa_cli = cli;
1302         INIT_LIST_HEAD(&aa->aa_oaps);
1303
1304         *reqp = req;
1305         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1306         CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
1307                 req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
1308                 niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
1309         RETURN(0);
1310
1311  out:
1312         ptlrpc_req_finished(req);
1313         RETURN(rc);
1314 }
1315
/*
 * Name of the checksum dump file; filled in by dump_all_bulk_pages() and
 * reused in its log messages.
 * NOTE(review): one shared buffer with no locking visible in this file, so
 * concurrent dumps would overwrite each other's name - confirm that callers
 * are serialized.
 */
1316 char dbgcksum_file_name[PATH_MAX];
1317
1318 static void dump_all_bulk_pages(struct obdo *oa, __u32 page_count,
1319                                 struct brw_page **pga, __u32 server_cksum,
1320                                 __u32 client_cksum)
1321 {
1322         struct file *filp;
1323         int rc, i;
1324         unsigned int len;
1325         char *buf;
1326         mm_segment_t oldfs;
1327
1328         /* will only keep dump of pages on first error for the same range in
1329          * file/fid, not during the resends/retries. */
1330         snprintf(dbgcksum_file_name, sizeof(dbgcksum_file_name),
1331                  "%s-checksum_dump-osc-"DFID":[%llu-%llu]-%x-%x",
1332                  (strncmp(libcfs_debug_file_path_arr, "NONE", 4) != 0 ?
1333                   libcfs_debug_file_path_arr :
1334                   LIBCFS_DEBUG_FILE_PATH_DEFAULT),
1335                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : 0ULL,
1336                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1337                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1338                  pga[0]->off,
1339                  pga[page_count-1]->off + pga[page_count-1]->count - 1,
1340                  client_cksum, server_cksum);
1341         filp = filp_open(dbgcksum_file_name,
1342                          O_CREAT | O_EXCL | O_WRONLY | O_LARGEFILE, 0600);
1343         if (IS_ERR(filp)) {
1344                 rc = PTR_ERR(filp);
1345                 if (rc == -EEXIST)
1346                         CDEBUG(D_INFO, "%s: can't open to dump pages with "
1347                                "checksum error: rc = %d\n", dbgcksum_file_name,
1348                                rc);
1349                 else
1350                         CERROR("%s: can't open to dump pages with checksum "
1351                                "error: rc = %d\n", dbgcksum_file_name, rc);
1352                 return;
1353         }
1354
1355         oldfs = get_fs();
1356         set_fs(KERNEL_DS);
1357         for (i = 0; i < page_count; i++) {
1358                 len = pga[i]->count;
1359                 buf = kmap(pga[i]->pg);
1360                 while (len != 0) {
1361                         rc = vfs_write(filp, (__force const char __user *)buf,
1362                                        len, &filp->f_pos);
1363                         if (rc < 0) {
1364                                 CERROR("%s: wanted to write %u but got %d "
1365                                        "error\n", dbgcksum_file_name, len, rc);
1366                                 break;
1367                         }
1368                         len -= rc;
1369                         buf += rc;
1370                         CDEBUG(D_INFO, "%s: wrote %d bytes\n",
1371                                dbgcksum_file_name, rc);
1372                 }
1373                 kunmap(pga[i]->pg);
1374         }
1375         set_fs(oldfs);
1376
1377         rc = ll_vfs_fsync_range(filp, 0, LLONG_MAX, 1);
1378         if (rc)
1379                 CERROR("%s: sync returns %d\n", dbgcksum_file_name, rc);
1380         filp_close(filp, NULL);
1381         return;
1382 }
1383
1384 static int
1385 check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1386                                 __u32 client_cksum, __u32 server_cksum,
1387                                 struct osc_brw_async_args *aa)
1388 {
1389         __u32 new_cksum;
1390         char *msg;
1391         enum cksum_types cksum_type;
1392
1393         if (server_cksum == client_cksum) {
1394                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1395                 return 0;
1396         }
1397
1398         if (aa->aa_cli->cl_checksum_dump)
1399                 dump_all_bulk_pages(oa, aa->aa_page_count, aa->aa_ppga,
1400                                     server_cksum, client_cksum);
1401
1402         cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1403                                        oa->o_flags : 0);
1404         new_cksum = osc_checksum_bulk(aa->aa_requested_nob, aa->aa_page_count,
1405                                       aa->aa_ppga, OST_WRITE, cksum_type);
1406
1407         if (cksum_type != cksum_type_unpack(aa->aa_oa->o_flags))
1408                 msg = "the server did not use the checksum type specified in "
1409                       "the original request - likely a protocol problem";
1410         else if (new_cksum == server_cksum)
1411                 msg = "changed on the client after we checksummed it - "
1412                       "likely false positive due to mmap IO (bug 11742)";
1413         else if (new_cksum == client_cksum)
1414                 msg = "changed in transit before arrival at OST";
1415         else
1416                 msg = "changed in transit AND doesn't match the original - "
1417                       "likely false positive due to mmap IO (bug 11742)";
1418
1419         LCONSOLE_ERROR_MSG(0x132, "%s: BAD WRITE CHECKSUM: %s: from %s inode "
1420                            DFID " object "DOSTID" extent [%llu-%llu], original "
1421                            "client csum %x (type %x), server csum %x (type %x),"
1422                            " client csum now %x\n",
1423                            aa->aa_cli->cl_import->imp_obd->obd_name,
1424                            msg, libcfs_nid2str(peer->nid),
1425                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1426                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1427                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1428                            POSTID(&oa->o_oi), aa->aa_ppga[0]->off,
1429                            aa->aa_ppga[aa->aa_page_count - 1]->off +
1430                                 aa->aa_ppga[aa->aa_page_count-1]->count - 1,
1431                            client_cksum, cksum_type_unpack(aa->aa_oa->o_flags),
1432                            server_cksum, cksum_type, new_cksum);
1433         return 1;
1434 }
1435
1436 /* Note rc enters this function as number of bytes transferred */
1437 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1438 {
1439         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1440         const struct lnet_process_id *peer =
1441                         &req->rq_import->imp_connection->c_peer;
1442         struct client_obd *cli = aa->aa_cli;
1443         struct ost_body *body;
1444         u32 client_cksum = 0;
1445         ENTRY;
1446
1447         if (rc < 0 && rc != -EDQUOT) {
1448                 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1449                 RETURN(rc);
1450         }
1451
1452         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1453         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1454         if (body == NULL) {
1455                 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1456                 RETURN(-EPROTO);
1457         }
1458
1459         /* set/clear over quota flag for a uid/gid/projid */
1460         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1461             body->oa.o_valid & (OBD_MD_FLALLQUOTA)) {
1462                 unsigned qid[LL_MAXQUOTAS] = {
1463                                          body->oa.o_uid, body->oa.o_gid,
1464                                          body->oa.o_projid };
1465                 CDEBUG(D_QUOTA, "setdq for [%u %u %u] with valid %#llx, flags %x\n",
1466                        body->oa.o_uid, body->oa.o_gid, body->oa.o_projid,
1467                        body->oa.o_valid, body->oa.o_flags);
1468                        osc_quota_setdq(cli, qid, body->oa.o_valid,
1469                                        body->oa.o_flags);
1470         }
1471
1472         osc_update_grant(cli, body);
1473
1474         if (rc < 0)
1475                 RETURN(rc);
1476
1477         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1478                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1479
1480         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1481                 if (rc > 0) {
1482                         CERROR("Unexpected +ve rc %d\n", rc);
1483                         RETURN(-EPROTO);
1484                 }
1485                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1486
1487                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1488                         RETURN(-EAGAIN);
1489
1490                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1491                     check_write_checksum(&body->oa, peer, client_cksum,
1492                                          body->oa.o_cksum, aa))
1493                         RETURN(-EAGAIN);
1494
1495                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1496                                      aa->aa_page_count, aa->aa_ppga);
1497                 GOTO(out, rc);
1498         }
1499
1500         /* The rest of this function executes only for OST_READs */
1501
1502         /* if unwrap_bulk failed, return -EAGAIN to retry */
1503         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1504         if (rc < 0)
1505                 GOTO(out, rc = -EAGAIN);
1506
1507         if (rc > aa->aa_requested_nob) {
1508                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1509                        aa->aa_requested_nob);
1510                 RETURN(-EPROTO);
1511         }
1512
1513         if (rc != req->rq_bulk->bd_nob_transferred) {
1514                 CERROR ("Unexpected rc %d (%d transferred)\n",
1515                         rc, req->rq_bulk->bd_nob_transferred);
1516                 return (-EPROTO);
1517         }
1518
1519         if (rc < aa->aa_requested_nob)
1520                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1521
1522         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1523                 static int cksum_counter;
1524                 u32        server_cksum = body->oa.o_cksum;
1525                 char      *via = "";
1526                 char      *router = "";
1527                 enum cksum_types cksum_type;
1528
1529                 cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
1530                                                body->oa.o_flags : 0);
1531                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1532                                                  aa->aa_ppga, OST_READ,
1533                                                  cksum_type);
1534
1535                 if (peer->nid != req->rq_bulk->bd_sender) {
1536                         via = " via ";
1537                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1538                 }
1539
1540                 if (server_cksum != client_cksum) {
1541                         struct ost_body *clbody;
1542                         u32 page_count = aa->aa_page_count;
1543
1544                         clbody = req_capsule_client_get(&req->rq_pill,
1545                                                         &RMF_OST_BODY);
1546                         if (cli->cl_checksum_dump)
1547                                 dump_all_bulk_pages(&clbody->oa, page_count,
1548                                                     aa->aa_ppga, server_cksum,
1549                                                     client_cksum);
1550
1551                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1552                                            "%s%s%s inode "DFID" object "DOSTID
1553                                            " extent [%llu-%llu], client %x, "
1554                                            "server %x, cksum_type %x\n",
1555                                            req->rq_import->imp_obd->obd_name,
1556                                            libcfs_nid2str(peer->nid),
1557                                            via, router,
1558                                            clbody->oa.o_valid & OBD_MD_FLFID ?
1559                                                 clbody->oa.o_parent_seq : 0ULL,
1560                                            clbody->oa.o_valid & OBD_MD_FLFID ?
1561                                                 clbody->oa.o_parent_oid : 0,
1562                                            clbody->oa.o_valid & OBD_MD_FLFID ?
1563                                                 clbody->oa.o_parent_ver : 0,
1564                                            POSTID(&body->oa.o_oi),
1565                                            aa->aa_ppga[0]->off,
1566                                            aa->aa_ppga[page_count-1]->off +
1567                                            aa->aa_ppga[page_count-1]->count - 1,
1568                                            client_cksum, server_cksum,
1569                                            cksum_type);
1570                         cksum_counter = 0;
1571                         aa->aa_oa->o_cksum = client_cksum;
1572                         rc = -EAGAIN;
1573                 } else {
1574                         cksum_counter++;
1575                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1576                         rc = 0;
1577                 }
1578         } else if (unlikely(client_cksum)) {
1579                 static int cksum_missed;
1580
1581                 cksum_missed++;
1582                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1583                         CERROR("Checksum %u requested from %s but not sent\n",
1584                                cksum_missed, libcfs_nid2str(peer->nid));
1585         } else {
1586                 rc = 0;
1587         }
1588 out:
1589         if (rc >= 0)
1590                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1591                                      aa->aa_oa, &body->oa);
1592
1593         RETURN(rc);
1594 }
1595
/*
 * Build a replacement BRW request for @request after the recoverable error
 * @rc and queue it with ptlrpcd_add_req().
 *
 * The replacement is prepared from the client, obdo and page array recorded
 * in @aa.  It inherits the interpret callback, async args, commit callback
 * and import generation of @request; the oap and extent lists are moved from
 * @aa into the new request's async args, and every oap that referenced
 * @request is switched over to the new request.
 *
 * Returns 0 once the new request has been queued, the error from
 * osc_brw_prep_request() if the request could not be built, or -EINTR if a
 * page attached to @request has oap_interrupted set (the freshly built
 * request is released in that case).
 */
1596 static int osc_brw_redo_request(struct ptlrpc_request *request,
1597                                 struct osc_brw_async_args *aa, int rc)
1598 {
1599         struct ptlrpc_request *new_req;
1600         struct osc_brw_async_args *new_aa;
1601         struct osc_async_page *oap;
1602         ENTRY;
1603
             /* -EINPROGRESS is logged at D_RPCTRACE, anything else at D_ERROR */
1604         DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1605                   "redo for recoverable error %d", rc);
1606
             /* Same direction (read/write) as the original request.  The final
              * argument is 1 here but 0 in osc_build_rpc(); NOTE(review):
              * presumably it marks a resend -- confirm in
              * osc_brw_prep_request(). */
1607         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1608                                 OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
1609                                   aa->aa_cli, aa->aa_oa, aa->aa_page_count,
1610                                   aa->aa_ppga, &new_req, 1);
1611         if (rc)
1612                 RETURN(rc);
1613
             /* Any oap holding a request must hold the one being redone; if
              * such a page was interrupted, drop the new request and give up. */
1614         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1615                 if (oap->oap_request != NULL) {
1616                         LASSERTF(request == oap->oap_request,
1617                                  "request %p != oap_request %p\n",
1618                                  request, oap->oap_request);
1619                         if (oap->oap_interrupted) {
1620                                 ptlrpc_req_finished(new_req);
1621                                 RETURN(-EINTR);
1622                         }
1623                 }
1624         }
1625         /* New request takes over pga and oaps from old request.
1626          * Note that copying a list_head doesn't work, need to move it... */
1627         aa->aa_resends++;
1628         new_req->rq_interpret_reply = request->rq_interpret_reply;
1629         new_req->rq_async_args = request->rq_async_args;
1630         new_req->rq_commit_cb = request->rq_commit_cb;
1631         /* cap resend delay to the current request timeout, this is similar to
1632          * what ptlrpc does (see after_reply()) */
1633         if (aa->aa_resends > new_req->rq_timeout)
1634                 new_req->rq_sent = ktime_get_real_seconds() + new_req->rq_timeout;
1635         else
1636                 new_req->rq_sent = ktime_get_real_seconds() + aa->aa_resends;
             /* carry the original import generation over to the new request */
1637         new_req->rq_generation_set = 1;
1638         new_req->rq_import_generation = request->rq_import_generation;
1639
1640         new_aa = ptlrpc_req_async_args(new_req);
1641
             /* rq_async_args was copied as a whole above, so the list heads in
              * new_aa are stale copies: re-initialize them, then move the
              * entries over (leaving the old lists empty). */
1642         INIT_LIST_HEAD(&new_aa->aa_oaps);
1643         list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1644         INIT_LIST_HEAD(&new_aa->aa_exts);
1645         list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1646         new_aa->aa_resends = aa->aa_resends;
1647
             /* swap each oap's reference from the old request to the new one */
1648         list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1649                 if (oap->oap_request) {
1650                         ptlrpc_req_finished(oap->oap_request);
1651                         oap->oap_request = ptlrpc_request_addref(new_req);
1652                 }
1653         }
1654
1655         /* XXX: This code will run into problem if we're going to support
1656          * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
1657          * and wait for all of them to be finished. We should inherit request
1658          * set from old request. */
1659         ptlrpcd_add_req(new_req);
1660
1661         DEBUG_REQ(D_INFO, new_req, "new request");
1662         RETURN(0);
1663 }
1664
1665 /*
1666  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1667  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1668  * fine for our small page arrays and doesn't require allocation.  its an
1669  * insertion sort that swaps elements that are strides apart, shrinking the
1670  * stride down until its '1' and the array is sorted.
1671  */
1672 static void sort_brw_pages(struct brw_page **array, int num)
1673 {
1674         int stride, i, j;
1675         struct brw_page *tmp;
1676
1677         if (num == 1)
1678                 return;
1679         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1680                 ;
1681
1682         do {
1683                 stride /= 3;
1684                 for (i = stride ; i < num ; i++) {
1685                         tmp = array[i];
1686                         j = i;
1687                         while (j >= stride && array[j - stride]->off > tmp->off) {
1688                                 array[j] = array[j - stride];
1689                                 j -= stride;
1690                         }
1691                         array[j] = tmp;
1692                 }
1693         } while (stride > 1);
1694 }
1695
1696 static void osc_release_ppga(struct brw_page **ppga, size_t count)
1697 {
1698         LASSERT(ppga != NULL);
1699         OBD_FREE(ppga, sizeof(*ppga) * count);
1700 }
1701
/*
 * Reply-interpret callback installed on BRW requests by osc_build_rpc().
 *
 * The request is first completed through osc_brw_fini_request().  On a
 * recoverable error the I/O may be resubmitted via osc_brw_redo_request();
 * when that succeeds this function returns 0 right away and the state in
 * @data now belongs to the new request.  Otherwise it updates the cached
 * object attributes (on success only), finishes every extent attached to the
 * request, frees the obdo and the page array, decrements the in-flight RPC
 * counter and unplugs further I/O.
 *
 * @data is the request's struct osc_brw_async_args.
 * Returns 0 on success or when a resend was queued, a negative errno
 * otherwise; -EAGAIN and -EINPROGRESS are reported to the caller as -EIO.
 */
1702 static int brw_interpret(const struct lu_env *env,
1703                          struct ptlrpc_request *req, void *data, int rc)
1704 {
1705         struct osc_brw_async_args *aa = data;
1706         struct osc_extent *ext;
1707         struct osc_extent *tmp;
1708         struct client_obd *cli = aa->aa_cli;
1709         ENTRY;
1710
1711         rc = osc_brw_fini_request(req, rc);
1712         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1713         /* When server return -EINPROGRESS, client should always retry
1714          * regardless of the number of times the bulk was resent already. */
1715         if (osc_recoverable_error(rc)) {
                     /* import generation moved on since the request was sent:
                      * only log it, no resend is attempted */
1716                 if (req->rq_import_generation !=
1717                     req->rq_import->imp_generation) {
1718                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
1719                                ""DOSTID", rc = %d.\n",
1720                                req->rq_import->imp_obd->obd_name,
1721                                POSTID(&aa->aa_oa->o_oi), rc);
1722                 } else if (rc == -EINPROGRESS ||
1723                     client_should_resend(aa->aa_resends, aa->aa_cli)) {
1724                         rc = osc_brw_redo_request(req, aa, rc);
1725                 } else {
1726                         CERROR("%s: too many resent retries for object: "
1727                                "%llu:%llu, rc = %d.\n",
1728                                req->rq_import->imp_obd->obd_name,
1729                                POSTID(&aa->aa_oa->o_oi), rc);
1730                 }
1731
                     /* rc == 0 means a new request was queued and now owns the
                      * pages and extents; otherwise the retry codes are turned
                      * into -EIO before falling through to the teardown */
1732                 if (rc == 0)
1733                         RETURN(0);
1734                 else if (rc == -EAGAIN || rc == -EINPROGRESS)
1735                         rc = -EIO;
1736         }
1737
             /* success: copy the attributes marked valid in the obdo into the
              * cl_object, holding the object attribute lock */
1738         if (rc == 0) {
1739                 struct obdo *oa = aa->aa_oa;
1740                 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
1741                 unsigned long valid = 0;
1742                 struct cl_object *obj;
1743                 struct osc_async_page *last;
1744
                     /* last entry of the page array, which osc_build_rpc()
                      * sorted by offset */
1745                 last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
1746                 obj = osc2cl(last->oap_obj);
1747
1748                 cl_object_attr_lock(obj);
1749                 if (oa->o_valid & OBD_MD_FLBLOCKS) {
1750                         attr->cat_blocks = oa->o_blocks;
1751                         valid |= CAT_BLOCKS;
1752                 }
1753                 if (oa->o_valid & OBD_MD_FLMTIME) {
1754                         attr->cat_mtime = oa->o_mtime;
1755                         valid |= CAT_MTIME;
1756                 }
1757                 if (oa->o_valid & OBD_MD_FLATIME) {
1758                         attr->cat_atime = oa->o_atime;
1759                         valid |= CAT_ATIME;
1760                 }
1761                 if (oa->o_valid & OBD_MD_FLCTIME) {
1762                         attr->cat_ctime = oa->o_ctime;
1763                         valid |= CAT_CTIME;
1764                 }
1765
1766                 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1767                         struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
                             /* byte offset just past the end of the last page's
                              * data */
1768                         loff_t last_off = last->oap_count + last->oap_obj_off +
1769                                 last->oap_page_off;
1770
1771                         /* Change file size if this is an out of quota or
1772                          * direct IO write and it extends the file size */
1773                         if (loi->loi_lvb.lvb_size < last_off) {
1774                                 attr->cat_size = last_off;
1775                                 valid |= CAT_SIZE;
1776                         }
1777                         /* Extend KMS if it's not a lockless write */
1778                         if (loi->loi_kms < last_off &&
1779                             oap2osc_page(last)->ops_srvlock == 0) {
1780                                 attr->cat_kms = last_off;
1781                                 valid |= CAT_KMS;
1782                         }
1783                 }
1784
1785                 if (valid != 0)
1786                         cl_object_attr_update(env, obj, attr, valid);
1787                 cl_object_attr_unlock(obj);
1788         }
1789         OBDO_FREE(aa->aa_oa);
1790
1791         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
1792                 osc_inc_unstable_pages(req);
1793
             /* Finish every extent of this RPC with the final rc.  The third
              * argument is 1 here and 0 on the osc_build_rpc() error path;
              * NOTE(review): presumably "the RPC was sent" -- confirm in
              * osc_extent_finish().  Both lists are asserted empty
              * afterwards. */
1794         list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
1795                 list_del_init(&ext->oe_link);
1796                 osc_extent_finish(env, ext, 1, rc);
1797         }
1798         LASSERT(list_empty(&aa->aa_exts));
1799         LASSERT(list_empty(&aa->aa_oaps));
1800
1801         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1802         ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
1803
1804         spin_lock(&cli->cl_loi_list_lock);
1805         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1806          * is called so we know whether to go to sync BRWs or wait for more
1807          * RPCs to complete */
1808         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
1809                 cli->cl_w_in_flight--;
1810         else
1811                 cli->cl_r_in_flight--;
1812         osc_wake_cache_waiters(cli);
1813         spin_unlock(&cli->cl_loi_list_lock);
1814
1815         osc_io_unplug(env, cli, NULL);
1816         RETURN(rc);
1817 }
1818
1819 static void brw_commit(struct ptlrpc_request *req)
1820 {
1821         /* If osc_inc_unstable_pages (via osc_extent_finish) races with
1822          * this called via the rq_commit_cb, I need to ensure
1823          * osc_dec_unstable_pages is still called. Otherwise unstable
1824          * pages may be leaked. */
1825         spin_lock(&req->rq_lock);
1826         if (likely(req->rq_unstable)) {
1827                 req->rq_unstable = 0;
1828                 spin_unlock(&req->rq_lock);
1829
1830                 osc_dec_unstable_pages(req);
1831         } else {
1832                 req->rq_committed = 1;
1833                 spin_unlock(&req->rq_lock);
1834         }
1835 }
1836
1837 /**
1838  * Build an RPC by the list of extent @ext_list. The caller must ensure
1839  * that the total pages in this list are NOT over max pages per RPC.
1840  * Extents in the list must be in OES_RPC state.
      *
      * @cmd selects the direction (OBD_BRW_READ or OBD_BRW_WRITE).
      *
      * On success the pages and the extents of @ext_list are moved into the
      * request's async args (leaving @ext_list empty), the request is handed
      * to ptlrpcd_add_req() and 0 is returned; brw_interpret() and brw_commit()
      * are installed as its callbacks.
      *
      * On failure (-ENOMEM, or the error from osc_brw_prep_request()) the obdo
      * and the page array are freed and every extent still on @ext_list is
      * completed with that error through osc_extent_finish().
1841  */
1842 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
1843                   struct list_head *ext_list, int cmd)
1844 {
1845         struct ptlrpc_request           *req = NULL;
1846         struct osc_extent               *ext;
1847         struct brw_page                 **pga = NULL;
1848         struct osc_brw_async_args       *aa = NULL;
1849         struct obdo                     *oa = NULL;
1850         struct osc_async_page           *oap;
1851         struct osc_object               *obj = NULL;
1852         struct cl_req_attr              *crattr = NULL;
1853         loff_t                          starting_offset = OBD_OBJECT_EOF;
1854         loff_t                          ending_offset = 0;
1855         int                             mpflag = 0;
1856         int                             mem_tight = 0;
1857         int                             page_count = 0;
1858         bool                            soft_sync = false;
1859         bool                            interrupted = false;
1860         int                             i;
1861         int                             grant = 0;
1862         int                             rc;
1863         struct list_head                rpc_list = LIST_HEAD_INIT(rpc_list);
1864         struct ost_body                 *body;
1865         ENTRY;
1866         LASSERT(!list_empty(ext_list));
1867
1868         /* add pages into rpc_list to build BRW rpc */
             /* first pass: total the grants and pages, note whether any extent
              * has oe_memalloc set, and take the object from the first extent */
1869         list_for_each_entry(ext, ext_list, oe_link) {
1870                 LASSERT(ext->oe_state == OES_RPC);
1871                 mem_tight |= ext->oe_memalloc;
1872                 grant += ext->oe_grants;
1873                 page_count += ext->oe_nr_pages;
1874                 if (obj == NULL)
1875                         obj = ext->oe_obj;
1876         }
1877
1878         soft_sync = osc_over_unstable_soft_limit(cli);
             /* saved flag is restored at the out: label below */
1879         if (mem_tight)
1880                 mpflag = cfs_memory_pressure_get_and_set();
1881
1882         OBD_ALLOC(pga, sizeof(*pga) * page_count);
1883         if (pga == NULL)
1884                 GOTO(out, rc = -ENOMEM);
1885
1886         OBDO_ALLOC(oa);
1887         if (oa == NULL)
1888                 GOTO(out, rc = -ENOMEM);
1889
             /* second pass: fill pga[], chain every page onto the local
              * rpc_list and track the covered offset range */
1890         i = 0;
1891         list_for_each_entry(ext, ext_list, oe_link) {
1892                 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
1893                         if (mem_tight)
1894                                 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
1895                         if (soft_sync)
1896                                 oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
1897                         pga[i] = &oap->oap_brw_page;
1898                         pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
1899                         i++;
1900
1901                         list_add_tail(&oap->oap_rpc_item, &rpc_list);
                             /* a page that does not lower the start must begin
                              * at offset 0 within its page */
1902                         if (starting_offset == OBD_OBJECT_EOF ||
1903                             starting_offset > oap->oap_obj_off)
1904                                 starting_offset = oap->oap_obj_off;
1905                         else
1906                                 LASSERT(oap->oap_page_off == 0);
                             /* a page that does not extend the end must be
                              * filled up to the page boundary */
1907                         if (ending_offset < oap->oap_obj_off + oap->oap_count)
1908                                 ending_offset = oap->oap_obj_off +
1909                                                 oap->oap_count;
1910                         else
1911                                 LASSERT(oap->oap_page_off + oap->oap_count ==
1912                                         PAGE_SIZE);
1913                         if (oap->oap_interrupted)
1914                                 interrupted = true;
1915                 }
1916         }
1917
1918         /* first page in the list */
1919         oap = list_entry(rpc_list.next, typeof(*oap), oap_rpc_item);
1920
             /* have cl_req_attr_set() fill @oa, with all cra_flags bits set */
1921         crattr = &osc_env_info(env)->oti_req_attr;
1922         memset(crattr, 0, sizeof(*crattr));
1923         crattr->cra_type = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
1924         crattr->cra_flags = ~0ULL;
1925         crattr->cra_page = oap2cl_page(oap);
1926         crattr->cra_oa = oa;
1927         cl_req_attr_set(env, osc2cl(obj), crattr);
1928
             /* NOTE(review): exact comparison here, while cra_type above tests
              * (cmd & OBD_BRW_WRITE) -- confirm that callers never pass extra
              * bits in @cmd */
1929         if (cmd == OBD_BRW_WRITE)
1930                 oa->o_grant_used = grant;
1931
1932         sort_brw_pages(pga, page_count);
1933         rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req, 0);
1934         if (rc != 0) {
1935                 CERROR("prep_req failed: %d\n", rc);
1936                 GOTO(out, rc);
1937         }
1938
1939         req->rq_commit_cb = brw_commit;
1940         req->rq_interpret_reply = brw_interpret;
1941         req->rq_memalloc = mem_tight != 0;
             /* only the first page of the RPC holds a reference on the request */
1942         oap->oap_request = ptlrpc_request_addref(req);
1943         if (interrupted && !req->rq_intr)
1944                 ptlrpc_mark_interrupted(req);
1945
1946         /* Need to update the timestamps after the request is built in case
1947          * we race with setattr (locally or in queue at OST).  If OST gets
1948          * later setattr before earlier BRW (as determined by the request xid),
1949          * the OST will not use BRW timestamps.  Sadly, there is no obvious
1950          * way to do this in a single call.  bug 10150 */
1951         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
1952         crattr->cra_oa = &body->oa;
1953         crattr->cra_flags = OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME;
1954         cl_req_attr_set(env, osc2cl(obj), crattr);
1955         lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
1956
             /* hand the page list and the caller's extent list over to the
              * request's async args; both source lists end up empty */
1957         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1958         aa = ptlrpc_req_async_args(req);
1959         INIT_LIST_HEAD(&aa->aa_oaps);
1960         list_splice_init(&rpc_list, &aa->aa_oaps);
1961         INIT_LIST_HEAD(&aa->aa_exts);
1962         list_splice_init(ext_list, &aa->aa_exts);
1963
             /* bump the in-flight counter and tally the histograms under
              * cl_loi_list_lock; starting_offset is converted from bytes to a
              * page index first */
1964         spin_lock(&cli->cl_loi_list_lock);
1965         starting_offset >>= PAGE_SHIFT;
1966         if (cmd == OBD_BRW_READ) {
1967                 cli->cl_r_in_flight++;
1968                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
1969                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
1970                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
1971                                       starting_offset + 1);
1972         } else {
1973                 cli->cl_w_in_flight++;
1974                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
1975                 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
1976                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
1977                                       starting_offset + 1);
1978         }
1979         spin_unlock(&cli->cl_loi_list_lock);
1980
1981         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %ur/%uw in flight",
1982                   page_count, aa, cli->cl_r_in_flight,
1983                   cli->cl_w_in_flight);
1984         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_IO, cfs_fail_val);
1985
1986         ptlrpcd_add_req(req);
1987         rc = 0;
1988         EXIT;
1989
             /* reached on success as well as on failure */
1990 out:
1991         if (mem_tight != 0)
1992                 cfs_memory_pressure_restore(mpflag);
1993
             /* every failure path gets here before a request exists */
1994         if (rc != 0) {
1995                 LASSERT(req == NULL);
1996
1997                 if (oa)
1998                         OBDO_FREE(oa);
1999                 if (pga)
2000                         OBD_FREE(pga, sizeof(*pga) * page_count);
2001                 /* this should happen rarely and is pretty bad, it makes the
2002                  * pending list not follow the dirty order */
2003                 while (!list_empty(ext_list)) {
2004                         ext = list_entry(ext_list->next, struct osc_extent,
2005                                          oe_link);
2006                         list_del_init(&ext->oe_link);
2007                         osc_extent_finish(env, ext, 0, rc);
2008                 }
2009         }
2010         RETURN(rc);
2011 }
2012
2013 static int osc_set_lock_data(struct ldlm_lock *lock, void *data)
2014 {
2015         int set = 0;
2016
2017         LASSERT(lock != NULL);
2018
2019         lock_res_and_lock(lock);
2020
2021         if (lock->l_ast_data == NULL)
2022                 lock->l_ast_data = data;
2023         if (lock->l_ast_data == data)
2024                 set = 1;
2025
2026         unlock_res_and_lock(lock);
2027
2028         return set;
2029 }
2030
2031 static int osc_enqueue_fini(struct ptlrpc_request *req,
2032                             osc_enqueue_upcall_f upcall, void *cookie,
2033                             struct lustre_handle *lockh, enum ldlm_mode mode,
2034                             __u64 *flags, bool speculative, int errcode)
2035 {
2036         bool intent = *flags & LDLM_FL_HAS_INTENT;
2037         int rc;
2038         ENTRY;
2039
2040         /* The request was created before ldlm_cli_enqueue call. */
2041         if (intent && errcode == ELDLM_LOCK_ABORTED) {
2042                 struct ldlm_reply *rep;
2043
2044                 rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
2045                 LASSERT(rep != NULL);
2046
2047                 rep->lock_policy_res1 =
2048                         ptlrpc_status_ntoh(rep->lock_policy_res1);
2049                 if (rep->lock_policy_res1)
2050                         errcode = rep->lock_policy_res1;
2051                 if (!speculative)
2052                         *flags |= LDLM_FL_LVB_READY;
2053         } else if (errcode == ELDLM_OK) {
2054                 *flags |= LDLM_FL_LVB_READY;
2055         }
2056
2057         /* Call the update callback. */
2058         rc = (*upcall)(cookie, lockh, errcode);
2059
2060         /* release the reference taken in ldlm_cli_enqueue() */
2061         if (errcode == ELDLM_LOCK_MATCHED)
2062                 errcode = ELDLM_OK;
2063         if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
2064                 ldlm_lock_decref(lockh, mode);
2065
2066         RETURN(rc);
2067 }
2068
/*
 * Completion handler for an enqueue request whose arguments were stored in
 * @aa (osc_enqueue_base() fills these in on its async path).
 *
 * It looks the lock up from the saved handle, pins it with an extra
 * reference for the duration of the call, completes the DLM side with
 * ldlm_cli_enqueue_fini() and the OSC side with osc_enqueue_fini(), then
 * drops the extra reference again.
 *
 * @rc is the request status on entry.  Returns the value produced by
 * osc_enqueue_fini(), i.e. the result of the caller's upcall.
 */
2069 static int osc_enqueue_interpret(const struct lu_env *env,
2070                                  struct ptlrpc_request *req,
2071                                  struct osc_enqueue_args *aa, int rc)
2072 {
2073         struct ldlm_lock *lock;
2074         struct lustre_handle *lockh = &aa->oa_lockh;
2075         enum ldlm_mode mode = aa->oa_mode;
2076         struct ost_lvb *lvb = aa->oa_lvb;
2077         __u32 lvb_len = sizeof(*lvb);
2078         __u64 flags = 0;
2079
2080         ENTRY;
2081
2082         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2083          * be valid. */
2084         lock = ldlm_handle2lock(lockh);
2085         LASSERTF(lock != NULL,
2086                  "lockh %#llx, req %p, aa %p - client evicted?\n",
2087                  lockh->cookie, req, aa);
2088
2089         /* Take an additional reference so that a blocking AST that
2090          * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2091          * to arrive after an upcall has been executed by
2092          * osc_enqueue_fini(). */
2093         ldlm_lock_addref(lockh, mode);
2094
2095         /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
2096         OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
2097
2098         /* Let CP AST to grant the lock first. */
2099         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2100
             /* a speculative enqueue saved neither an lvb nor a flags pointer;
              * point oa_flags at the local so the fini calls below have
              * somewhere to store flags */
2101         if (aa->oa_speculative) {
2102                 LASSERT(aa->oa_lvb == NULL);
2103                 LASSERT(aa->oa_flags == NULL);
2104                 aa->oa_flags = &flags;
2105         }
2106
2107         /* Complete obtaining the lock procedure. */
2108         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
2109                                    aa->oa_mode, aa->oa_flags, lvb, lvb_len,
2110                                    lockh, rc);
2111         /* Complete osc stuff. */
2112         rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
2113                               aa->oa_flags, aa->oa_speculative, rc);
2114
2115         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2116
             /* drop the extra mode reference taken above, then the lock
              * reference obtained from ldlm_handle2lock() */
2117         ldlm_lock_decref(lockh, mode);
2118         LDLM_LOCK_PUT(lock);
2119         RETURN(rc);
2120 }
2121
/* Sentinel request-set pointer: the constant 1 cast to a pointer, so it is
 * non-NULL but does not refer to a real set and must never be dereferenced.
 * NOTE(review): presumably passed as a request set to ask for the RPC to be
 * handed to ptlrpcd -- confirm against the callers. */
2122 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2123
2124 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
2125  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2126  * other synchronous requests, however keeping some locks and trying to obtain
2127  * others may take a considerable amount of time in a case of ost failure; and
2128  * when other sync requests do not get released lock from a client, the client
2129  * is evicted from the cluster -- such scenarios make life difficult, so
2130  * release locks just after they are obtained. */
2131 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2132                      __u64 *flags, union ldlm_policy_data *policy,
2133                      struct ost_lvb *lvb, int kms_valid,
2134                      osc_enqueue_upcall_f upcall, void *cookie,
2135                      struct ldlm_enqueue_info *einfo,
2136                      struct ptlrpc_request_set *rqset, int async,
2137                      bool speculative)
2138 {
2139         struct obd_device *obd = exp->exp_obd;
2140         struct lustre_handle lockh = { 0 };
2141         struct ptlrpc_request *req = NULL;
2142         int intent = *flags & LDLM_FL_HAS_INTENT;
2143         __u64 match_flags = *flags;
2144         enum ldlm_mode mode;
2145         int rc;
2146         ENTRY;
2147
2148         /* Filesystem lock extents are extended to page boundaries so that
2149          * dealing with the page cache is a little smoother.  */
2150         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2151         policy->l_extent.end |= ~PAGE_MASK;
2152
2153         /*
2154          * kms is not valid when either object is completely fresh (so that no
2155          * locks are cached), or object was evicted. In the latter case cached
2156          * lock cannot be used, because it would prime inode state with
2157          * potentially stale LVB.
2158          */
2159         if (!kms_valid)
2160                 goto no_match;
2161
2162         /* Next, search for already existing extent locks that will cover us */
2163         /* If we're trying to read, we also search for an existing PW lock.  The
2164          * VFS and page cache already protect us locally, so lots of readers/
2165          * writers can share a single PW lock.
2166          *
2167          * There are problems with conversion deadlocks, so instead of
2168          * converting a read lock to a write lock, we'll just enqueue a new
2169          * one.
2170          *
2171          * At some point we should cancel the read lock instead of making them
2172          * send us a blocking callback, but there are problems with canceling
2173          * locks out from other users right now, too. */
2174         mode = einfo->ei_mode;
2175         if (einfo->ei_mode == LCK_PR)
2176                 mode |= LCK_PW;
2177         /* Normal lock requests must wait for the LVB to be ready before
2178          * matching a lock; speculative lock requests do not need to,
2179          * because they will not actually use the lock. */
2180         if (!speculative)
2181                 match_flags |= LDLM_FL_LVB_READY;
2182         if (intent != 0)
2183                 match_flags |= LDLM_FL_BLOCK_GRANTED;
2184         mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id,
2185                                einfo->ei_type, policy, mode, &lockh, 0);
2186         if (mode) {
2187                 struct ldlm_lock *matched;
2188
2189                 if (*flags & LDLM_FL_TEST_LOCK)
2190                         RETURN(ELDLM_OK);
2191
2192                 matched = ldlm_handle2lock(&lockh);
2193                 if (speculative) {
2194                         /* This DLM lock request is speculative, and does not
2195                          * have an associated IO request. Therefore if there
2196                          * is already a DLM lock, it will just inform the
2197                          * caller to cancel the request for this stripe.*/
2198                         lock_res_and_lock(matched);
2199                         if (ldlm_extent_equal(&policy->l_extent,
2200                             &matched->l_policy_data.l_extent))
2201                                 rc = -EEXIST;
2202                         else
2203                                 rc = -ECANCELED;
2204                         unlock_res_and_lock(matched);
2205
2206                         ldlm_lock_decref(&lockh, mode);
2207                         LDLM_LOCK_PUT(matched);
2208                         RETURN(rc);
2209                 } else if (osc_set_lock_data(matched, einfo->ei_cbdata)) {
2210                         *flags |= LDLM_FL_LVB_READY;
2211
2212                         /* We already have a lock, and it's referenced. */
2213                         (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);
2214
2215                         ldlm_lock_decref(&lockh, mode);
2216                         LDLM_LOCK_PUT(matched);
2217                         RETURN(ELDLM_OK);
2218                 } else {
2219                         ldlm_lock_decref(&lockh, mode);
2220                         LDLM_LOCK_PUT(matched);
2221                 }
2222         }
2223
2224 no_match:
2225         if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK))
2226                 RETURN(-ENOLCK);
2227
2228         if (intent) {
2229                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2230                                            &RQF_LDLM_ENQUEUE_LVB);
2231                 if (req == NULL)
2232                         RETURN(-ENOMEM);
2233
2234                 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
2235                 if (rc) {
2236                         ptlrpc_request_free(req);
2237                         RETURN(rc);
2238                 }
2239
2240                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2241                                      sizeof *lvb);
2242                 ptlrpc_request_set_replen(req);
2243         }
2244
2245         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2246         *flags &= ~LDLM_FL_BLOCK_GRANTED;
2247
2248         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2249                               sizeof(*lvb), LVB_T_OST, &lockh, async);
2250         if (async) {
2251                 if (!rc) {
2252                         struct osc_enqueue_args *aa;
2253                         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2254                         aa = ptlrpc_req_async_args(req);
2255                         aa->oa_exp         = exp;
2256                         aa->oa_mode        = einfo->ei_mode;
2257                         aa->oa_type        = einfo->ei_type;
2258                         lustre_handle_copy(&aa->oa_lockh, &lockh);
2259                         aa->oa_upcall      = upcall;
2260                         aa->oa_cookie      = cookie;
2261                         aa->oa_speculative = speculative;
2262                         if (!speculative) {
2263                                 aa->oa_flags  = flags;
2264                                 aa->oa_lvb    = lvb;
2265                         } else {
2266                                 /* speculative locks are essentially to enqueue
2267                                  * a DLM lock  in advance, so we don't care
2268                                  * about the result of the enqueue. */
2269                                 aa->oa_lvb    = NULL;
2270                                 aa->oa_flags  = NULL;
2271                         }
2272
2273                         req->rq_interpret_reply =
2274                                 (ptlrpc_interpterer_t)osc_enqueue_interpret;
2275                         if (rqset == PTLRPCD_SET)
2276                                 ptlrpcd_add_req(req);
2277                         else
2278                                 ptlrpc_set_add_req(rqset, req);
2279                 } else if (intent) {
2280                         ptlrpc_req_finished(req);
2281                 }
2282                 RETURN(rc);
2283         }
2284
2285         rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
2286                               flags, speculative, rc);
2287         if (intent)
2288                 ptlrpc_req_finished(req);
2289
2290         RETURN(rc);
2291 }
2292
2293 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2294                    enum ldlm_type type, union ldlm_policy_data *policy,
2295                    enum ldlm_mode mode, __u64 *flags, void *data,
2296                    struct lustre_handle *lockh, int unref)
2297 {
2298         struct obd_device *obd = exp->exp_obd;
2299         __u64 lflags = *flags;
2300         enum ldlm_mode rc;
2301         ENTRY;
2302
2303         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2304                 RETURN(-EIO);
2305
2306         /* Filesystem lock extents are extended to page boundaries so that
2307          * dealing with the page cache is a little smoother */
2308         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2309         policy->l_extent.end |= ~PAGE_MASK;
2310
2311         /* Next, search for already existing extent locks that will cover us */
2312         /* If we're trying to read, we also search for an existing PW lock.  The
2313          * VFS and page cache already protect us locally, so lots of readers/
2314          * writers can share a single PW lock. */
2315         rc = mode;
2316         if (mode == LCK_PR)
2317                 rc |= LCK_PW;
2318         rc = ldlm_lock_match(obd->obd_namespace, lflags,
2319                              res_id, type, policy, rc, lockh, unref);
2320         if (rc == 0 || lflags & LDLM_FL_TEST_LOCK)
2321                 RETURN(rc);
2322
2323         if (data != NULL) {
2324                 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2325
2326                 LASSERT(lock != NULL);
2327                 if (!osc_set_lock_data(lock, data)) {
2328                         ldlm_lock_decref(lockh, rc);
2329                         rc = 0;
2330                 }
2331                 LDLM_LOCK_PUT(lock);
2332         }
2333         RETURN(rc);
2334 }
2335
2336 static int osc_statfs_interpret(const struct lu_env *env,
2337                                 struct ptlrpc_request *req,
2338                                 struct osc_async_args *aa, int rc)
2339 {
2340         struct obd_statfs *msfs;
2341         ENTRY;
2342
2343         if (rc == -EBADR)
2344                 /* The request has in fact never been sent
2345                  * due to issues at a higher level (LOV).
2346                  * Exit immediately since the caller is
2347                  * aware of the problem and takes care
2348                  * of the clean up */
2349                  RETURN(rc);
2350
2351         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2352             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2353                 GOTO(out, rc = 0);
2354
2355         if (rc != 0)
2356                 GOTO(out, rc);
2357
2358         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2359         if (msfs == NULL) {
2360                 GOTO(out, rc = -EPROTO);
2361         }
2362
2363         *aa->aa_oi->oi_osfs = *msfs;
2364 out:
2365         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2366         RETURN(rc);
2367 }
2368
2369 static int osc_statfs_async(struct obd_export *exp,
2370                             struct obd_info *oinfo, __u64 max_age,
2371                             struct ptlrpc_request_set *rqset)
2372 {
2373         struct obd_device     *obd = class_exp2obd(exp);
2374         struct ptlrpc_request *req;
2375         struct osc_async_args *aa;
2376         int                    rc;
2377         ENTRY;
2378
2379         /* We could possibly pass max_age in the request (as an absolute
2380          * timestamp or a "seconds.usec ago") so the target can avoid doing
2381          * extra calls into the filesystem if that isn't necessary (e.g.
2382          * during mount that would help a bit).  Having relative timestamps
2383          * is not so great if request processing is slow, while absolute
2384          * timestamps are not ideal because they need time synchronization. */
2385         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2386         if (req == NULL)
2387                 RETURN(-ENOMEM);
2388
2389         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2390         if (rc) {
2391                 ptlrpc_request_free(req);
2392                 RETURN(rc);
2393         }
2394         ptlrpc_request_set_replen(req);
2395         req->rq_request_portal = OST_CREATE_PORTAL;
2396         ptlrpc_at_set_req_timeout(req);
2397
2398         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2399                 /* procfs requests not want stat in wait for avoid deadlock */
2400                 req->rq_no_resend = 1;
2401                 req->rq_no_delay = 1;
2402         }
2403
2404         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2405         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2406         aa = ptlrpc_req_async_args(req);
2407         aa->aa_oi = oinfo;
2408
2409         ptlrpc_set_add_req(rqset, req);
2410         RETURN(0);
2411 }
2412
2413 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2414                       struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2415 {
2416         struct obd_device     *obd = class_exp2obd(exp);
2417         struct obd_statfs     *msfs;
2418         struct ptlrpc_request *req;
2419         struct obd_import     *imp = NULL;
2420         int rc;
2421         ENTRY;
2422
2423         /*Since the request might also come from lprocfs, so we need
2424          *sync this with client_disconnect_export Bug15684*/
2425         down_read(&obd->u.cli.cl_sem);
2426         if (obd->u.cli.cl_import)
2427                 imp = class_import_get(obd->u.cli.cl_import);
2428         up_read(&obd->u.cli.cl_sem);
2429         if (!imp)
2430                 RETURN(-ENODEV);
2431
2432         /* We could possibly pass max_age in the request (as an absolute
2433          * timestamp or a "seconds.usec ago") so the target can avoid doing
2434          * extra calls into the filesystem if that isn't necessary (e.g.
2435          * during mount that would help a bit).  Having relative timestamps
2436          * is not so great if request processing is slow, while absolute
2437          * timestamps are not ideal because they need time synchronization. */
2438         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2439
2440         class_import_put(imp);
2441
2442         if (req == NULL)
2443                 RETURN(-ENOMEM);
2444
2445         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2446         if (rc) {
2447                 ptlrpc_request_free(req);
2448                 RETURN(rc);
2449         }
2450         ptlrpc_request_set_replen(req);
2451         req->rq_request_portal = OST_CREATE_PORTAL;
2452         ptlrpc_at_set_req_timeout(req);
2453
2454         if (flags & OBD_STATFS_NODELAY) {
2455                 /* procfs requests not want stat in wait for avoid deadlock */
2456                 req->rq_no_resend = 1;
2457                 req->rq_no_delay = 1;
2458         }
2459
2460         rc = ptlrpc_queue_wait(req);
2461         if (rc)
2462                 GOTO(out, rc);
2463
2464         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2465         if (msfs == NULL) {
2466                 GOTO(out, rc = -EPROTO);
2467         }
2468
2469         *osfs = *msfs;
2470
2471         EXIT;
2472  out:
2473         ptlrpc_req_finished(req);
2474         return rc;
2475 }
2476
2477 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2478                          void *karg, void __user *uarg)
2479 {
2480         struct obd_device *obd = exp->exp_obd;
2481         struct obd_ioctl_data *data = karg;
2482         int err = 0;
2483         ENTRY;
2484
2485         if (!try_module_get(THIS_MODULE)) {
2486                 CERROR("%s: cannot get module '%s'\n", obd->obd_name,
2487                        module_name(THIS_MODULE));
2488                 return -EINVAL;
2489         }
2490         switch (cmd) {
2491         case OBD_IOC_CLIENT_RECOVER:
2492                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2493                                             data->ioc_inlbuf1, 0);
2494                 if (err > 0)
2495                         err = 0;
2496                 GOTO(out, err);
2497         case IOC_OSC_SET_ACTIVE:
2498                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2499                                                data->ioc_offset);
2500                 GOTO(out, err);
2501         case OBD_IOC_PING_TARGET:
2502                 err = ptlrpc_obd_ping(obd);
2503                 GOTO(out, err);
2504         default:
2505                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2506                        cmd, current_comm());
2507                 GOTO(out, err = -ENOTTY);
2508         }
2509 out:
2510         module_put(THIS_MODULE);
2511         return err;
2512 }
2513
2514 int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
2515                        u32 keylen, void *key, u32 vallen, void *val,
2516                        struct ptlrpc_request_set *set)
2517 {
2518         struct ptlrpc_request *req;
2519         struct obd_device     *obd = exp->exp_obd;
2520         struct obd_import     *imp = class_exp2cliimp(exp);
2521         char                  *tmp;
2522         int                    rc;
2523         ENTRY;
2524
2525         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
2526
2527         if (KEY_IS(KEY_CHECKSUM)) {
2528                 if (vallen != sizeof(int))
2529                         RETURN(-EINVAL);
2530                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
2531                 RETURN(0);
2532         }
2533
2534         if (KEY_IS(KEY_SPTLRPC_CONF)) {
2535                 sptlrpc_conf_client_adapt(obd);
2536                 RETURN(0);
2537         }
2538
2539         if (KEY_IS(KEY_FLUSH_CTX)) {
2540                 sptlrpc_import_flush_my_ctx(imp);
2541                 RETURN(0);
2542         }
2543
2544         if (KEY_IS(KEY_CACHE_SET)) {
2545                 struct client_obd *cli = &obd->u.cli;
2546
2547                 LASSERT(cli->cl_cache == NULL); /* only once */
2548                 cli->cl_cache = (struct cl_client_cache *)val;
2549                 cl_cache_incref(cli->cl_cache);
2550                 cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
2551
2552                 /* add this osc into entity list */
2553                 LASSERT(list_empty(&cli->cl_lru_osc));
2554                 spin_lock(&cli->cl_cache->ccc_lru_lock);
2555                 list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
2556                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
2557
2558                 RETURN(0);
2559         }
2560
2561         if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
2562                 struct client_obd *cli = &obd->u.cli;
2563                 long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
2564                 long target = *(long *)val;
2565
2566                 nr = osc_lru_shrink(env, cli, min(nr, target), true);
2567                 *(long *)val -= nr;
2568                 RETURN(0);
2569         }
2570
2571         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
2572                 RETURN(-EINVAL);
2573
2574         /* We pass all other commands directly to OST. Since nobody calls osc
2575            methods directly and everybody is supposed to go through LOV, we
2576            assume lov checked invalid values for us.
2577            The only recognised values so far are evict_by_nid and mds_conn.
2578            Even if something bad goes through, we'd get a -EINVAL from OST
2579            anyway. */
2580
2581         req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
2582                                                 &RQF_OST_SET_GRANT_INFO :
2583                                                 &RQF_OBD_SET_INFO);
2584         if (req == NULL)
2585                 RETURN(-ENOMEM);
2586
2587         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2588                              RCL_CLIENT, keylen);
2589         if (!KEY_IS(KEY_GRANT_SHRINK))
2590                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
2591                                      RCL_CLIENT, vallen);
2592         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
2593         if (rc) {
2594                 ptlrpc_request_free(req);
2595                 RETURN(rc);
2596         }
2597
2598         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2599         memcpy(tmp, key, keylen);
2600         tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
2601                                                         &RMF_OST_BODY :
2602                                                         &RMF_SETINFO_VAL);
2603         memcpy(tmp, val, vallen);
2604
2605         if (KEY_IS(KEY_GRANT_SHRINK)) {
2606                 struct osc_grant_args *aa;
2607                 struct obdo *oa;
2608
2609                 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2610                 aa = ptlrpc_req_async_args(req);
2611                 OBDO_ALLOC(oa);
2612                 if (!oa) {
2613                         ptlrpc_req_finished(req);
2614                         RETURN(-ENOMEM);
2615                 }
2616                 *oa = ((struct ost_body *)val)->oa;
2617                 aa->aa_oa = oa;
2618                 req->rq_interpret_reply = osc_shrink_grant_interpret;
2619         }
2620
2621         ptlrpc_request_set_replen(req);
2622         if (!KEY_IS(KEY_GRANT_SHRINK)) {
2623                 LASSERT(set != NULL);
2624                 ptlrpc_set_add_req(set, req);
2625                 ptlrpc_check_set(NULL, set);
2626         } else {
2627                 ptlrpcd_add_req(req);
2628         }
2629
2630         RETURN(0);
2631 }
2632 EXPORT_SYMBOL(osc_set_info_async);
2633
2634 static int osc_reconnect(const struct lu_env *env,
2635                          struct obd_export *exp, struct obd_device *obd,
2636                          struct obd_uuid *cluuid,
2637                          struct obd_connect_data *data,
2638                          void *localdata)
2639 {
2640         struct client_obd *cli = &obd->u.cli;
2641
2642         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
2643                 long lost_grant;
2644                 long grant;
2645
2646                 spin_lock(&cli->cl_loi_list_lock);
2647                 grant = cli->cl_avail_grant + cli->cl_reserved_grant;
2648                 if (data->ocd_connect_flags & OBD_CONNECT_GRANT_PARAM)
2649                         grant += cli->cl_dirty_grant;
2650                 else
2651                         grant += cli->cl_dirty_pages << PAGE_SHIFT;
2652                 data->ocd_grant = grant ? : 2 * cli_brw_size(obd);
2653                 lost_grant = cli->cl_lost_grant;
2654                 cli->cl_lost_grant = 0;
2655                 spin_unlock(&cli->cl_loi_list_lock);
2656
2657                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: %#llx ocd_version: %d"
2658                        " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
2659                        data->ocd_version, data->ocd_grant, lost_grant);
2660         }
2661
2662         RETURN(0);
2663 }
2664
2665 static int osc_disconnect(struct obd_export *exp)
2666 {
2667         struct obd_device *obd = class_exp2obd(exp);
2668         int rc;
2669
2670         rc = client_disconnect_export(exp);
2671         /**
2672          * Initially we put del_shrink_grant before disconnect_export, but it
2673          * causes the following problem if setup (connect) and cleanup
2674          * (disconnect) are tangled together.
2675          *      connect p1                     disconnect p2
2676          *   ptlrpc_connect_import
2677          *     ...............               class_manual_cleanup
2678          *                                     osc_disconnect
2679          *                                     del_shrink_grant
2680          *   ptlrpc_connect_interrupt
2681          *     init_grant_shrink
2682          *   add this client to shrink list
2683          *                                      cleanup_osc
2684          * Bang! pinger trigger the shrink.
2685          * So the osc should be disconnected from the shrink list, after we
2686          * are sure the import has been destroyed. BUG18662
2687          */
2688         if (obd->u.cli.cl_import == NULL)
2689                 osc_del_shrink_grant(&obd->u.cli);
2690         return rc;
2691 }
2692
2693 int osc_ldlm_resource_invalidate(struct cfs_hash *hs, struct cfs_hash_bd *bd,
2694                                  struct hlist_node *hnode, void *arg)
2695 {
2696         struct lu_env *env = arg;
2697         struct ldlm_resource *res = cfs_hash_object(hs, hnode);
2698         struct ldlm_lock *lock;
2699         struct osc_object *osc = NULL;
2700         ENTRY;
2701
2702         lock_res(res);
2703         list_for_each_entry(lock, &res->lr_granted, l_res_link) {
2704                 if (lock->l_ast_data != NULL && osc == NULL) {
2705                         osc = lock->l_ast_data;
2706                         cl_object_get(osc2cl(osc));
2707                 }
2708
2709                 /* clear LDLM_FL_CLEANED flag to make sure it will be canceled
2710                  * by the 2nd round of ldlm_namespace_clean() call in
2711                  * osc_import_event(). */
2712                 ldlm_clear_cleaned(lock);
2713         }
2714         unlock_res(res);
2715
2716         if (osc != NULL) {
2717                 osc_object_invalidate(env, osc);
2718                 cl_object_put(env, osc2cl(osc));
2719         }
2720
2721         RETURN(0);
2722 }
2723 EXPORT_SYMBOL(osc_ldlm_resource_invalidate);
2724
2725 static int osc_import_event(struct obd_device *obd,
2726                             struct obd_import *imp,
2727                             enum obd_import_event event)
2728 {
2729         struct client_obd *cli;
2730         int rc = 0;
2731
2732         ENTRY;
2733         LASSERT(imp->imp_obd == obd);
2734
2735         switch (event) {
2736         case IMP_EVENT_DISCON: {
2737                 cli = &obd->u.cli;
2738                 spin_lock(&cli->cl_loi_list_lock);
2739                 cli->cl_avail_grant = 0;
2740                 cli->cl_lost_grant = 0;
2741                 spin_unlock(&cli->cl_loi_list_lock);
2742                 break;
2743         }
2744         case IMP_EVENT_INACTIVE: {
2745                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE);
2746                 break;
2747         }
2748         case IMP_EVENT_INVALIDATE: {
2749                 struct ldlm_namespace *ns = obd->obd_namespace;
2750                 struct lu_env         *env;
2751                 __u16                  refcheck;
2752
2753                 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
2754
2755                 env = cl_env_get(&refcheck);
2756                 if (!IS_ERR(env)) {
2757                         osc_io_unplug(env, &obd->u.cli, NULL);
2758
2759                         cfs_hash_for_each_nolock(ns->ns_rs_hash,
2760                                                  osc_ldlm_resource_invalidate,
2761                                                  env, 0);
2762                         cl_env_put(env, &refcheck);
2763
2764                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
2765                 } else
2766                         rc = PTR_ERR(env);
2767                 break;
2768         }
2769         case IMP_EVENT_ACTIVE: {
2770                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE);
2771                 break;
2772         }
2773         case IMP_EVENT_OCD: {
2774                 struct obd_connect_data *ocd = &imp->imp_connect_data;
2775
2776                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
2777                         osc_init_grant(&obd->u.cli, ocd);
2778
2779                 /* See bug 7198 */
2780                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
2781                         imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
2782
2783                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD);
2784                 break;
2785         }
2786         case IMP_EVENT_DEACTIVATE: {
2787                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE);
2788                 break;
2789         }
2790         case IMP_EVENT_ACTIVATE: {
2791                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE);
2792                 break;
2793         }
2794         default:
2795                 CERROR("Unknown import event %d\n", event);
2796                 LBUG();
2797         }
2798         RETURN(rc);
2799 }
2800
2801 /**
2802  * Determine whether the lock can be canceled before replaying the lock
2803  * during recovery, see bug16774 for detailed information.
2804  *
2805  * \retval zero the lock can't be canceled
2806  * \retval other ok to cancel
2807  */
2808 static int osc_cancel_weight(struct ldlm_lock *lock)
2809 {
2810         /*
2811          * Cancel all unused and granted extent lock.
2812          */
2813         if (lock->l_resource->lr_type == LDLM_EXTENT &&
2814             lock->l_granted_mode == lock->l_req_mode &&
2815             osc_ldlm_weigh_ast(lock) == 0)
2816                 RETURN(1);
2817
2818         RETURN(0);
2819 }
2820
2821 static int brw_queue_work(const struct lu_env *env, void *data)
2822 {
2823         struct client_obd *cli = data;
2824
2825         CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
2826
2827         osc_io_unplug(env, cli, NULL);
2828         RETURN(0);
2829 }
2830
2831 int osc_setup_common(struct obd_device *obd, struct lustre_cfg *lcfg)
2832 {
2833         struct client_obd *cli = &obd->u.cli;
2834         void *handler;
2835         int rc;
2836
2837         ENTRY;
2838
2839         rc = ptlrpcd_addref();
2840         if (rc)
2841                 RETURN(rc);
2842
2843         rc = client_obd_setup(obd, lcfg);
2844         if (rc)
2845                 GOTO(out_ptlrpcd, rc);
2846
2847
2848         handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
2849         if (IS_ERR(handler))
2850                 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
2851         cli->cl_writeback_work = handler;
2852
2853         handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
2854         if (IS_ERR(handler))
2855                 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
2856         cli->cl_lru_work = handler;
2857
2858         rc = osc_quota_setup(obd);
2859         if (rc)
2860                 GOTO(out_ptlrpcd_work, rc);
2861
2862         cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
2863
2864         INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
2865         RETURN(rc);
2866
2867 out_ptlrpcd_work:
2868         if (cli->cl_writeback_work != NULL) {
2869                 ptlrpcd_destroy_work(cli->cl_writeback_work);
2870                 cli->cl_writeback_work = NULL;
2871         }
2872         if (cli->cl_lru_work != NULL) {
2873                 ptlrpcd_destroy_work(cli->cl_lru_work);
2874                 cli->cl_lru_work = NULL;
2875         }
2876         client_obd_cleanup(obd);
2877 out_ptlrpcd:
2878         ptlrpcd_decref();
2879         RETURN(rc);
2880 }
2881 EXPORT_SYMBOL(osc_setup_common);
2882
2883 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
2884 {
2885         struct client_obd *cli = &obd->u.cli;
2886         struct obd_type   *type;
2887         int                adding;
2888         int                added;
2889         int                req_count;
2890         int                rc;
2891
2892         ENTRY;
2893
2894         rc = osc_setup_common(obd, lcfg);
2895         if (rc < 0)
2896                 RETURN(rc);
2897
2898 #ifdef CONFIG_PROC_FS
2899         obd->obd_vars = lprocfs_osc_obd_vars;
2900 #endif
2901         /* If this is true then both client (osc) and server (osp) are on the
2902          * same node. The osp layer if loaded first will register the osc proc
2903          * directory. In that case this obd_device will be attached its proc
2904          * tree to type->typ_procsym instead of obd->obd_type->typ_procroot.
2905          */
2906         type = class_search_type(LUSTRE_OSP_NAME);
2907         if (type && type->typ_procsym) {
2908                 obd->obd_proc_entry = lprocfs_register(obd->obd_name,
2909                                                        type->typ_procsym,
2910                                                        obd->obd_vars, obd);
2911                 if (IS_ERR(obd->obd_proc_entry)) {
2912                         rc = PTR_ERR(obd->obd_proc_entry);
2913                         CERROR("error %d setting up lprocfs for %s\n", rc,
2914                                obd->obd_name);
2915                         obd->obd_proc_entry = NULL;
2916                 }
2917         }
2918
2919         rc = lprocfs_obd_setup(obd, false);
2920         if (!rc) {
2921                 /* If the basic OSC proc tree construction succeeded then
2922                  * lets do the rest.
2923                  */
2924                 lproc_osc_attach_seqstat(obd);
2925                 sptlrpc_lprocfs_cliobd_attach(obd);
2926                 ptlrpc_lprocfs_register_obd(obd);
2927         }
2928
2929         /*
2930          * We try to control the total number of requests with a upper limit
2931          * osc_reqpool_maxreqcount. There might be some race which will cause
2932          * over-limit allocation, but it is fine.
2933          */
2934         req_count = atomic_read(&osc_pool_req_count);
2935         if (req_count < osc_reqpool_maxreqcount) {
2936                 adding = cli->cl_max_rpcs_in_flight + 2;
2937                 if (req_count + adding > osc_reqpool_maxreqcount)
2938                         adding = osc_reqpool_maxreqcount - req_count;
2939
2940                 added = ptlrpc_add_rqs_to_pool(osc_rq_pool, adding);
2941                 atomic_add(added, &osc_pool_req_count);
2942         }
2943
2944         INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
2945         ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
2946
2947         spin_lock(&osc_shrink_lock);
2948         list_add_tail(&cli->cl_shrink_list, &osc_shrink_list);
2949         spin_unlock(&osc_shrink_lock);
2950
2951         RETURN(0);
2952 }
2953
2954 int osc_precleanup_common(struct obd_device *obd)
2955 {
2956         struct client_obd *cli = &obd->u.cli;
2957         ENTRY;
2958
2959         /* LU-464
2960          * for echo client, export may be on zombie list, wait for
2961          * zombie thread to cull it, because cli.cl_import will be
2962          * cleared in client_disconnect_export():
2963          *   class_export_destroy() -> obd_cleanup() ->
2964          *   echo_device_free() -> echo_client_cleanup() ->
2965          *   obd_disconnect() -> osc_disconnect() ->
2966          *   client_disconnect_export()
2967          */
2968         obd_zombie_barrier();
2969         if (cli->cl_writeback_work) {
2970                 ptlrpcd_destroy_work(cli->cl_writeback_work);
2971                 cli->cl_writeback_work = NULL;
2972         }
2973
2974         if (cli->cl_lru_work) {
2975                 ptlrpcd_destroy_work(cli->cl_lru_work);
2976                 cli->cl_lru_work = NULL;
2977         }
2978
2979         obd_cleanup_client_import(obd);
2980         RETURN(0);
2981 }
2982 EXPORT_SYMBOL(osc_precleanup_common);
2983
2984 static int osc_precleanup(struct obd_device *obd)
2985 {
2986         ENTRY;
2987
2988         osc_precleanup_common(obd);
2989
2990         ptlrpc_lprocfs_unregister_obd(obd);
2991         lprocfs_obd_cleanup(obd);
2992         RETURN(0);
2993 }
2994
2995 int osc_cleanup_common(struct obd_device *obd)
2996 {
2997         struct client_obd *cli = &obd->u.cli;
2998         int rc;
2999
3000         ENTRY;
3001
3002         spin_lock(&osc_shrink_lock);
3003         list_del(&cli->cl_shrink_list);
3004         spin_unlock(&osc_shrink_lock);
3005
3006         /* lru cleanup */
3007         if (cli->cl_cache != NULL) {
3008                 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
3009                 spin_lock(&cli->cl_cache->ccc_lru_lock);
3010                 list_del_init(&cli->cl_lru_osc);
3011                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3012                 cli->cl_lru_left = NULL;
3013                 cl_cache_decref(cli->cl_cache);
3014                 cli->cl_cache = NULL;
3015         }
3016
3017         /* free memory of osc quota cache */
3018         osc_quota_cleanup(obd);
3019
3020         rc = client_obd_cleanup(obd);
3021
3022         ptlrpcd_decref();
3023         RETURN(rc);
3024 }
3025 EXPORT_SYMBOL(osc_cleanup_common);
3026
3027 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
3028 {
3029         int rc = class_process_proc_param(PARAM_OSC, obd->obd_vars, lcfg, obd);
3030         return rc > 0 ? 0: rc;
3031 }
3032
3033 static int osc_process_config(struct obd_device *obd, size_t len, void *buf)
3034 {
3035         return osc_process_config_base(obd, buf);
3036 }
3037
3038 static struct obd_ops osc_obd_ops = {
3039         .o_owner                = THIS_MODULE,
3040         .o_setup                = osc_setup,
3041         .o_precleanup           = osc_precleanup,
3042         .o_cleanup              = osc_cleanup_common,
3043         .o_add_conn             = client_import_add_conn,
3044         .o_del_conn             = client_import_del_conn,
3045         .o_connect              = client_connect_import,
3046         .o_reconnect            = osc_reconnect,
3047         .o_disconnect           = osc_disconnect,
3048         .o_statfs               = osc_statfs,
3049         .o_statfs_async         = osc_statfs_async,
3050         .o_create               = osc_create,
3051         .o_destroy              = osc_destroy,
3052         .o_getattr              = osc_getattr,
3053         .o_setattr              = osc_setattr,
3054         .o_iocontrol            = osc_iocontrol,
3055         .o_set_info_async       = osc_set_info_async,
3056         .o_import_event         = osc_import_event,
3057         .o_process_config       = osc_process_config,
3058         .o_quotactl             = osc_quotactl,
3059 };
3060
/* guards osc_shrink_list (taken around list_del() in osc_cleanup_common()) */
DEFINE_SPINLOCK(osc_shrink_lock);
/* global list that client_obd::cl_shrink_list entries are linked on */
LIST_HEAD(osc_shrink_list);
/* handle returned by set_shrinker() in osc_init(), released in osc_exit() */
static struct shrinker *osc_cache_shrinker;
3064
3065 #ifndef HAVE_SHRINKER_COUNT
3066 static int osc_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask))
3067 {
3068         struct shrink_control scv = {
3069                 .nr_to_scan = shrink_param(sc, nr_to_scan),
3070                 .gfp_mask   = shrink_param(sc, gfp_mask)
3071         };
3072 #if !defined(HAVE_SHRINKER_WANT_SHRINK_PTR) && !defined(HAVE_SHRINK_CONTROL)
3073         struct shrinker *shrinker = NULL;
3074 #endif
3075
3076         (void)osc_cache_shrink_scan(shrinker, &scv);
3077
3078         return osc_cache_shrink_count(shrinker, &scv);
3079 }
3080 #endif
3081
3082 static int __init osc_init(void)
3083 {
3084         bool enable_proc = true;
3085         struct obd_type *type;
3086         unsigned int reqpool_size;
3087         unsigned int reqsize;
3088         int rc;
3089         DEF_SHRINKER_VAR(osc_shvar, osc_cache_shrink,
3090                          osc_cache_shrink_count, osc_cache_shrink_scan);
3091         ENTRY;
3092
3093         /* print an address of _any_ initialized kernel symbol from this
3094          * module, to allow debugging with gdb that doesn't support data
3095          * symbols from modules.*/
3096         CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3097
3098         rc = lu_kmem_init(osc_caches);
3099         if (rc)
3100                 RETURN(rc);
3101
3102         type = class_search_type(LUSTRE_OSP_NAME);
3103         if (type != NULL && type->typ_procsym != NULL)
3104                 enable_proc = false;
3105
3106         rc = class_register_type(&osc_obd_ops, NULL, enable_proc, NULL,
3107                                  LUSTRE_OSC_NAME, &osc_device_type);
3108         if (rc)
3109                 GOTO(out_kmem, rc);
3110
3111         osc_cache_shrinker = set_shrinker(DEFAULT_SEEKS, &osc_shvar);
3112
3113         /* This is obviously too much memory, only prevent overflow here */
3114         if (osc_reqpool_mem_max >= 1 << 12 || osc_reqpool_mem_max == 0)
3115                 GOTO(out_type, rc = -EINVAL);
3116
3117         reqpool_size = osc_reqpool_mem_max << 20;
3118
3119         reqsize = 1;
3120         while (reqsize < OST_IO_MAXREQSIZE)
3121                 reqsize = reqsize << 1;
3122
3123         /*
3124          * We don't enlarge the request count in OSC pool according to
3125          * cl_max_rpcs_in_flight. The allocation from the pool will only be
3126          * tried after normal allocation failed. So a small OSC pool won't
3127          * cause much performance degression in most of cases.
3128          */
3129         osc_reqpool_maxreqcount = reqpool_size / reqsize;
3130
3131         atomic_set(&osc_pool_req_count, 0);
3132         osc_rq_pool = ptlrpc_init_rq_pool(0, OST_IO_MAXREQSIZE,
3133                                           ptlrpc_add_rqs_to_pool);
3134
3135         if (osc_rq_pool != NULL)
3136                 GOTO(out, rc);
3137         rc = -ENOMEM;
3138 out_type:
3139         class_unregister_type(LUSTRE_OSC_NAME);
3140 out_kmem:
3141         lu_kmem_fini(osc_caches);
3142 out:
3143         RETURN(rc);
3144 }
3145
3146 static void __exit osc_exit(void)
3147 {
3148         remove_shrinker(osc_cache_shrinker);
3149         class_unregister_type(LUSTRE_OSC_NAME);
3150         lu_kmem_fini(osc_caches);
3151         ptlrpc_free_rq_pool(osc_rq_pool);
3152 }
3153
3154 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
3155 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3156 MODULE_VERSION(LUSTRE_VERSION_STRING);
3157 MODULE_LICENSE("GPL");
3158
3159 module_init(osc_init);
3160 module_exit(osc_exit);