LU-3285 mdc: add IO methods to the MDC
[fs/lustre-release.git] lustre/osc/osc_request.c
/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.gnu.org/licenses/gpl-2.0.html
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, 2016, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 */

#define DEBUG_SUBSYSTEM S_OSC

#include <libcfs/libcfs.h>

#include <lprocfs_status.h>
#include <lustre_debug.h>
#include <lustre_dlm.h>
#include <lustre_fid.h>
#include <lustre_ha.h>
#include <uapi/linux/lustre/lustre_ioctl.h>
#include <lustre_net.h>
#include <lustre_obdo.h>
#include <uapi/linux/lustre/lustre_param.h>
#include <obd.h>
#include <obd_cksum.h>
#include <obd_class.h>
#include <lustre_osc.h>

#include "osc_internal.h"

atomic_t osc_pool_req_count;
unsigned int osc_reqpool_maxreqcount;
struct ptlrpc_request_pool *osc_rq_pool;

/* max memory used for request pool, unit is MB */
static unsigned int osc_reqpool_mem_max = 5;
module_param(osc_reqpool_mem_max, uint, 0444);

struct osc_brw_async_args {
        struct obdo              *aa_oa;
        int                       aa_requested_nob;
        int                       aa_nio_count;
        u32                       aa_page_count;
        int                       aa_resends;
        struct brw_page **aa_ppga;
        struct client_obd        *aa_cli;
        struct list_head          aa_oaps;
        struct list_head          aa_exts;
};

#define osc_grant_args osc_brw_async_args

struct osc_setattr_args {
        struct obdo             *sa_oa;
        obd_enqueue_update_f     sa_upcall;
        void                    *sa_cookie;
};

struct osc_fsync_args {
        struct osc_object       *fa_obj;
        struct obdo             *fa_oa;
        obd_enqueue_update_f    fa_upcall;
        void                    *fa_cookie;
};

struct osc_ladvise_args {
        struct obdo             *la_oa;
        obd_enqueue_update_f     la_upcall;
        void                    *la_cookie;
};

struct osc_enqueue_args {
        struct obd_export       *oa_exp;
        enum ldlm_type          oa_type;
        enum ldlm_mode          oa_mode;
        __u64                   *oa_flags;
        osc_enqueue_upcall_f    oa_upcall;
        void                    *oa_cookie;
        struct ost_lvb          *oa_lvb;
        struct lustre_handle    oa_lockh;
        bool                    oa_speculative;
};

static void osc_release_ppga(struct brw_page **ppga, size_t count);
static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
                         void *data, int rc);

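/**
 * Pack the obdo \a oa into the OST body of \a req, converting it to the
 * wire format negotiated in the import's connect data.
 */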
void osc_pack_req_body(struct ptlrpc_request *req, struct obdo *oa)
{
        struct ost_body *body;

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
}

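/**
 * Synchronous getattr: send an OST_GETATTR RPC and wait for the reply,
 * copying the returned attributes back into \a oa.  The block size is
 * filled in locally from the client's BRW size.
 */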
static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        int                      rc;

        ENTRY;
        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;

        EXIT;
out:
        ptlrpc_req_finished(req);

        return rc;
}

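/**
 * Synchronous setattr: pack \a oa into an OST_SETATTR request, wait for
 * the reply and copy the server's view of the attributes back into \a oa.
 */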
static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        int                      rc;

        ENTRY;
        LASSERT(oa->o_valid & OBD_MD_FLGROUP);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        EXIT;
out:
        ptlrpc_req_finished(req);

        RETURN(rc);
}

static int osc_setattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_setattr_args *sa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
                             &body->oa);
out:
        rc = sa->sa_upcall(sa->sa_cookie, rc);
        RETURN(rc);
}

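/**
 * Asynchronous setattr.  If \a rqset is NULL the request is handed to
 * ptlrpcd and no reply is awaited; otherwise \a upcall is invoked with
 * \a cookie once the reply has been interpreted.
 */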
int osc_setattr_async(struct obd_export *exp, struct obdo *oa,
                      obd_enqueue_update_f upcall, void *cookie,
                      struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct osc_setattr_args *sa;
        int                      rc;

        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        /* do mds to ost setattr asynchronously */
        if (!rqset) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req);
        } else {
                req->rq_interpret_reply =
                        (ptlrpc_interpterer_t)osc_setattr_interpret;

                CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
                sa = ptlrpc_req_async_args(req);
                sa->sa_oa = oa;
                sa->sa_upcall = upcall;
                sa->sa_cookie = cookie;

                if (rqset == PTLRPCD_SET)
                        ptlrpcd_add_req(req);
                else
                        ptlrpc_set_add_req(rqset, req);
        }

        RETURN(0);
}

static int osc_ladvise_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 void *arg, int rc)
{
        struct osc_ladvise_args *la = arg;
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        *la->la_oa = body->oa;
out:
        rc = la->la_upcall(la->la_cookie, rc);
        RETURN(rc);
}

/**
 * If rqset is NULL, do not wait for a response.  The upcall and cookie may
 * also be NULL in this case.
 */
int osc_ladvise_base(struct obd_export *exp, struct obdo *oa,
                     struct ladvise_hdr *ladvise_hdr,
                     obd_enqueue_update_f upcall, void *cookie,
                     struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        struct osc_ladvise_args *la;
        int                      rc;
        struct lu_ladvise       *req_ladvise;
        struct lu_ladvise       *ladvise = ladvise_hdr->lah_advise;
        int                      num_advise = ladvise_hdr->lah_count;
        struct ladvise_hdr      *req_ladvise_hdr;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_LADVISE);
        if (req == NULL)
                RETURN(-ENOMEM);

        req_capsule_set_size(&req->rq_pill, &RMF_OST_LADVISE, RCL_CLIENT,
                             num_advise * sizeof(*ladvise));
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_LADVISE);
        if (rc != 0) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL;
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
                             oa);

        req_ladvise_hdr = req_capsule_client_get(&req->rq_pill,
                                                 &RMF_OST_LADVISE_HDR);
        memcpy(req_ladvise_hdr, ladvise_hdr, sizeof(*ladvise_hdr));

        req_ladvise = req_capsule_client_get(&req->rq_pill, &RMF_OST_LADVISE);
        memcpy(req_ladvise, ladvise, sizeof(*ladvise) * num_advise);
        ptlrpc_request_set_replen(req);

        if (rqset == NULL) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req);
                RETURN(0);
        }

        req->rq_interpret_reply = osc_ladvise_interpret;
        CLASSERT(sizeof(*la) <= sizeof(req->rq_async_args));
        la = ptlrpc_req_async_args(req);
        la->la_oa = oa;
        la->la_upcall = upcall;
        la->la_cookie = cookie;

        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}

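/**
 * Create an OST object described by \a oa and wait for the reply.  The FID
 * sequence is asserted to be an echo sequence, so only echo-client objects
 * take this path.
 */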
static int osc_create(const struct lu_env *env, struct obd_export *exp,
                      struct obdo *oa)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        LASSERT(oa != NULL);
        LASSERT(oa->o_valid & OBD_MD_FLGROUP);
        LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
        if (req == NULL)
                GOTO(out, rc = -ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
        if (rc) {
                ptlrpc_request_free(req);
                GOTO(out, rc);
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out_req, rc = -EPROTO);

        CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;

        CDEBUG(D_HA, "transno: %lld\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        RETURN(rc);
}

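/**
 * Send an OST_PUNCH request asynchronously via ptlrpcd.  The attributes in
 * \a oa (including the punch range packed there by the caller) travel in
 * the request body, and \a upcall is invoked with \a cookie once the reply
 * has been interpreted.
 */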
int osc_punch_send(struct obd_export *exp, struct obdo *oa,
                   obd_enqueue_update_f upcall, void *cookie)
{
        struct ptlrpc_request *req;
        struct osc_setattr_args *sa;
        struct obd_import *imp = class_exp2cliimp(exp);
        struct ost_body *body;
        int rc;

        ENTRY;

        req = ptlrpc_request_alloc(imp, &RQF_OST_PUNCH);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
        if (rc < 0) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_set_io_portal(req);

        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);

        lustre_set_wire_obdo(&imp->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
        CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
        sa = ptlrpc_req_async_args(req);
        sa->sa_oa = oa;
        sa->sa_upcall = upcall;
        sa->sa_cookie = cookie;

        ptlrpcd_add_req(req);

        RETURN(0);
}
EXPORT_SYMBOL(osc_punch_send);

static int osc_sync_interpret(const struct lu_env *env,
                              struct ptlrpc_request *req,
                              void *arg, int rc)
{
        struct osc_fsync_args   *fa = arg;
        struct ost_body         *body;
        struct cl_attr          *attr = &osc_env_info(env)->oti_attr;
        unsigned long           valid = 0;
        struct cl_object        *obj;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL) {
                CERROR("can't unpack ost_body\n");
                GOTO(out, rc = -EPROTO);
        }

        *fa->fa_oa = body->oa;
        obj = osc2cl(fa->fa_obj);

        /* Update osc object's blocks attribute */
        cl_object_attr_lock(obj);
        if (body->oa.o_valid & OBD_MD_FLBLOCKS) {
                attr->cat_blocks = body->oa.o_blocks;
                valid |= CAT_BLOCKS;
        }

        if (valid != 0)
                cl_object_attr_update(env, obj, attr, valid);
        cl_object_attr_unlock(obj);

out:
        rc = fa->fa_upcall(fa->fa_cookie, rc);
        RETURN(rc);
}

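/**
 * Send an OST_SYNC request for \a obj.  The sync range travels in the
 * size/blocks fields of \a oa (see the comment in the body); on reply the
 * object's blocks attribute is refreshed and \a upcall is invoked.
 */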
int osc_sync_base(struct osc_object *obj, struct obdo *oa,
                  obd_enqueue_update_f upcall, void *cookie,
                  struct ptlrpc_request_set *rqset)
{
        struct obd_export     *exp = osc_export(obj);
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct osc_fsync_args *fa;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        /* overload the size and blocks fields in the oa with start/end */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = osc_sync_interpret;

        CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
        fa = ptlrpc_req_async_args(req);
        fa->fa_obj = obj;
        fa->fa_oa = oa;
        fa->fa_upcall = upcall;
        fa->fa_cookie = cookie;

        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}

/* Find and cancel locally the locks matching @mode in the resource found
 * by @oa. Found locks are added to the @cancels list. Returns the number
 * of locks added to the list. */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
                                   struct list_head *cancels,
                                   enum ldlm_mode mode, __u64 lock_flags)
{
        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        struct ldlm_res_id res_id;
        struct ldlm_resource *res;
        int count;
        ENTRY;

        /* Return, i.e. cancel nothing, only if ELC is supported (flag in
         * export) but disabled through procfs (flag in NS).
         *
         * This distinguishes it from the case where ELC is not supported
         * originally, when we still want to cancel locks in advance and
         * just cancel them locally, without sending any RPC. */
        if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
                RETURN(0);

        ostid_build_res_name(&oa->o_oi, &res_id);
        res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
        if (IS_ERR(res))
                RETURN(0);

        LDLM_RESOURCE_ADDREF(res);
        count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
                                           lock_flags, 0, NULL);
        LDLM_RESOURCE_DELREF(res);
        ldlm_resource_putref(res);
        RETURN(count);
}

static int osc_destroy_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req, void *data,
                                 int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;

        atomic_dec(&cli->cl_destroy_in_flight);
        wake_up(&cli->cl_destroy_waitq);
        return 0;
}

static int osc_can_send_destroy(struct client_obd *cli)
{
        if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
            cli->cl_max_rpcs_in_flight) {
                /* The destroy request can be sent */
                return 1;
        }
        if (atomic_dec_return(&cli->cl_destroy_in_flight) <
            cli->cl_max_rpcs_in_flight) {
                /*
                 * The counter has been modified between the two atomic
                 * operations.
                 */
                wake_up(&cli->cl_destroy_waitq);
        }
        return 0;
}

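/**
 * Destroy the OST object described by \a oa.  Matching local PW locks are
 * cancelled in advance (ELC), the number of destroy RPCs in flight is
 * throttled to cl_max_rpcs_in_flight, and the reply is not waited for.
 */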
static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct client_obd     *cli = &exp->exp_obd->u.cli;
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct list_head       cancels = LIST_HEAD_INIT(cancels);
        int rc, count;
        ENTRY;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
        if (req == NULL) {
                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                RETURN(-ENOMEM);
        }

        rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
                               0, &cancels, count);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = osc_destroy_interpret;
        if (!osc_can_send_destroy(cli)) {
                struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);

                /*
                 * Wait until the number of on-going destroy RPCs drops
                 * below cl_max_rpcs_in_flight.
                 */
                rc = l_wait_event_exclusive(cli->cl_destroy_waitq,
                                            osc_can_send_destroy(cli), &lwi);
                if (rc) {
                        ptlrpc_req_finished(req);
                        RETURN(rc);
                }
        }

        /* Do not wait for response */
        ptlrpcd_add_req(req);
        RETURN(0);
}

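/**
 * Fill in the dirty/undirty/grant accounting fields of \a oa so that every
 * RPC reports the client's current cache and grant state back to the OST.
 */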
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;

        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        spin_lock(&cli->cl_loi_list_lock);
        if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data, GRANT_PARAM))
                oa->o_dirty = cli->cl_dirty_grant;
        else
                oa->o_dirty = cli->cl_dirty_pages << PAGE_SHIFT;
        if (unlikely(cli->cl_dirty_pages - cli->cl_dirty_transit >
                     cli->cl_dirty_max_pages)) {
                CERROR("dirty %lu - %lu > dirty_max %lu\n",
                       cli->cl_dirty_pages, cli->cl_dirty_transit,
                       cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else if (unlikely(atomic_long_read(&obd_dirty_pages) -
                            atomic_long_read(&obd_dirty_transit_pages) >
                            (long)(obd_max_dirty_pages + 1))) {
                /* The atomic_read() and atomic_inc() are not covered by a
                 * lock, so they may race and trip this CERROR() unless we
                 * add in a small fudge factor (+1). */
                CERROR("%s: dirty %ld - %ld > system dirty_max %ld\n",
                       cli_name(cli), atomic_long_read(&obd_dirty_pages),
                       atomic_long_read(&obd_dirty_transit_pages),
                       obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
                            0x7fffffff)) {
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty_pages, cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else {
                unsigned long nrpages;

                nrpages = cli->cl_max_pages_per_rpc;
                nrpages *= cli->cl_max_rpcs_in_flight + 1;
                nrpages = max(nrpages, cli->cl_dirty_max_pages);
                oa->o_undirty = nrpages << PAGE_SHIFT;
                if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data,
                                 GRANT_PARAM)) {
                        int nrextents;

                        /* take extent tax into account when asking for more
                         * grant space */
                        nrextents = (nrpages + cli->cl_max_extent_pages - 1) /
                                     cli->cl_max_extent_pages;
                        oa->o_undirty += nrextents * cli->cl_grant_extent_tax;
                }
        }
        oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
        oa->o_dropped = cli->cl_lost_grant;
        cli->cl_lost_grant = 0;
        spin_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "dirty: %llu undirty: %u dropped %u grant: %llu\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
}

void osc_update_next_shrink(struct client_obd *cli)
{
        cli->cl_next_shrink_grant =
                cfs_time_shift(cli->cl_grant_shrink_interval);
        CDEBUG(D_CACHE, "next time %ld to shrink grant\n",
               cli->cl_next_shrink_grant);
}

static void __osc_update_grant(struct client_obd *cli, u64 grant)
{
        spin_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant += grant;
        spin_unlock(&cli->cl_loi_list_lock);
}

static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
{
        if (body->oa.o_valid & OBD_MD_FLGRANT) {
                CDEBUG(D_CACHE, "got %llu extra grant\n", body->oa.o_grant);
                __osc_update_grant(cli, body->oa.o_grant);
        }
}

static int osc_shrink_grant_interpret(const struct lu_env *env,
                                      struct ptlrpc_request *req,
                                      void *aa, int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
        struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
        struct ost_body *body;

        if (rc != 0) {
                __osc_update_grant(cli, oa->o_grant);
                GOTO(out, rc);
        }

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        osc_update_grant(cli, body);
out:
        OBDO_FREE(oa);
        return rc;
}

static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
{
        spin_lock(&cli->cl_loi_list_lock);
        oa->o_grant = cli->cl_avail_grant / 4;
        cli->cl_avail_grant -= oa->o_grant;
        spin_unlock(&cli->cl_loi_list_lock);
        if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
                oa->o_valid |= OBD_MD_FLFLAGS;
                oa->o_flags = 0;
        }
        oa->o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);
}

/* Shrink the current grant, either from some large amount to enough for a
 * full set of in-flight RPCs, or if we have already shrunk to that limit
 * then to enough for a single RPC.  This avoids keeping more grant than
 * needed, and avoids shrinking the grant piecemeal. */
static int osc_shrink_grant(struct client_obd *cli)
{
        __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
                             (cli->cl_max_pages_per_rpc << PAGE_SHIFT);

        spin_lock(&cli->cl_loi_list_lock);
        if (cli->cl_avail_grant <= target_bytes)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
        spin_unlock(&cli->cl_loi_list_lock);

        return osc_shrink_grant_to_target(cli, target_bytes);
}

int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
{
        int                     rc = 0;
        struct ost_body        *body;
        ENTRY;

        spin_lock(&cli->cl_loi_list_lock);
        /* Don't shrink if we are already above or below the desired limit.
         * We don't want to shrink below a single RPC, as that will
         * negatively impact block allocation and long-term performance. */
        if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_SHIFT)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;

        if (target_bytes >= cli->cl_avail_grant) {
                spin_unlock(&cli->cl_loi_list_lock);
                RETURN(0);
        }
        spin_unlock(&cli->cl_loi_list_lock);

        OBD_ALLOC_PTR(body);
        if (!body)
                RETURN(-ENOMEM);

        osc_announce_cached(cli, &body->oa, 0);

        spin_lock(&cli->cl_loi_list_lock);
        body->oa.o_grant = cli->cl_avail_grant - target_bytes;
        cli->cl_avail_grant = target_bytes;
        spin_unlock(&cli->cl_loi_list_lock);
        if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
                body->oa.o_valid |= OBD_MD_FLFLAGS;
                body->oa.o_flags = 0;
        }
        body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);

        rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
                                sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
                                sizeof(*body), body, NULL);
        if (rc != 0)
                __osc_update_grant(cli, body->oa.o_grant);
        OBD_FREE_PTR(body);
        RETURN(rc);
}

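/**
 * Decide whether it is time to shrink this client's grant: the import must
 * support GRANT_SHRINK, the shrink interval must have elapsed, the import
 * must be FULL, and more grant must be held than a single full-sized BRW
 * RPC needs.
 */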
static int osc_should_shrink_grant(struct client_obd *client)
{
        cfs_time_t time = cfs_time_current();
        cfs_time_t next_shrink = client->cl_next_shrink_grant;

        if ((client->cl_import->imp_connect_data.ocd_connect_flags &
             OBD_CONNECT_GRANT_SHRINK) == 0)
                return 0;

        if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
                /* Get the current RPC size directly, instead of going via:
                 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
                 * Keep comment here so that it can be found by searching. */
                int brw_size = client->cl_max_pages_per_rpc << PAGE_SHIFT;

                if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
                    client->cl_avail_grant > brw_size)
                        return 1;
                else
                        osc_update_next_shrink(client);
        }
        return 0;
}

static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
{
        struct client_obd *client;

        list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
                if (osc_should_shrink_grant(client))
                        osc_shrink_grant(client);
        }
        return 0;
}

static int osc_add_shrink_grant(struct client_obd *client)
{
        int rc;

        rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
                                       TIMEOUT_GRANT,
                                       osc_grant_shrink_grant_cb, NULL,
                                       &client->cl_grant_shrink_list);
        if (rc) {
                CERROR("add grant client %s error %d\n", cli_name(client), rc);
                return rc;
        }
        CDEBUG(D_CACHE, "add grant client %s\n", cli_name(client));
        osc_update_next_shrink(client);
        return 0;
}

static int osc_del_shrink_grant(struct client_obd *client)
{
        return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
                                         TIMEOUT_GRANT);
}

void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        /*
         * ocd_grant is the total grant amount we expect to hold: if we have
         * been evicted, it's the new avail_grant amount, and cl_dirty_pages
         * will drop to 0 as in-flight RPCs fail out; otherwise, it's
         * avail_grant + dirty.
         *
         * A race is tolerable here: if we're evicted, but imp_state already
         * left EVICTED state, then cl_dirty_pages must be 0 already.
         */
        spin_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant = ocd->ocd_grant;
        if (cli->cl_import->imp_state != LUSTRE_IMP_EVICTED) {
                cli->cl_avail_grant -= cli->cl_reserved_grant;
                if (OCD_HAS_FLAG(ocd, GRANT_PARAM))
                        cli->cl_avail_grant -= cli->cl_dirty_grant;
                else
                        cli->cl_avail_grant -=
                                        cli->cl_dirty_pages << PAGE_SHIFT;
        }

        if (OCD_HAS_FLAG(ocd, GRANT_PARAM)) {
                u64 size;
                int chunk_mask;

                /* overhead for each extent insertion */
                cli->cl_grant_extent_tax = ocd->ocd_grant_tax_kb << 10;
                /* determine the appropriate chunk size used by osc_extent. */
                cli->cl_chunkbits = max_t(int, PAGE_SHIFT,
                                          ocd->ocd_grant_blkbits);
                /* max_pages_per_rpc must be chunk aligned */
                chunk_mask = ~((1 << (cli->cl_chunkbits - PAGE_SHIFT)) - 1);
                cli->cl_max_pages_per_rpc = (cli->cl_max_pages_per_rpc +
                                             ~chunk_mask) & chunk_mask;
                /* determine maximum extent size, in #pages */
                size = (u64)ocd->ocd_grant_max_blks << ocd->ocd_grant_blkbits;
                cli->cl_max_extent_pages = size >> PAGE_SHIFT;
                if (cli->cl_max_extent_pages == 0)
                        cli->cl_max_extent_pages = 1;
        } else {
                cli->cl_grant_extent_tax = 0;
                cli->cl_chunkbits = PAGE_SHIFT;
                cli->cl_max_extent_pages = DT_MAX_BRW_PAGES;
        }
        spin_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
                "chunk bits: %d cl_max_extent_pages: %d\n",
                cli_name(cli),
                cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits,
                cli->cl_max_extent_pages);

        if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
            list_empty(&cli->cl_grant_shrink_list))
                osc_add_shrink_grant(cli);
}
EXPORT_SYMBOL(osc_init_grant);

/* We assume that the reason this OSC got a short read is that it read
 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
 * via the LOV, and it _knows_ it's reading inside the file, it's just that
 * this stripe never got written at or beyond this stripe offset yet. */
static void handle_short_read(int nob_read, size_t page_count,
                              struct brw_page **pga)
{
        char *ptr;
        int i = 0;

        /* skip bytes read OK */
        while (nob_read > 0) {
                LASSERT(page_count > 0);

                if (pga[i]->count > nob_read) {
                        /* EOF inside this page */
                        ptr = kmap(pga[i]->pg) +
                                (pga[i]->off & ~PAGE_MASK);
                        memset(ptr + nob_read, 0, pga[i]->count - nob_read);
                        kunmap(pga[i]->pg);
                        page_count--;
                        i++;
                        break;
                }

                nob_read -= pga[i]->count;
                page_count--;
                i++;
        }

        /* zero remaining pages */
        while (page_count-- > 0) {
                ptr = kmap(pga[i]->pg) + (pga[i]->off & ~PAGE_MASK);
                memset(ptr, 0, pga[i]->count);
                kunmap(pga[i]->pg);
                i++;
        }
}

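/**
 * Verify the per-niobuf return codes in a BRW_WRITE reply and check that
 * the number of bytes the bulk actually transferred matches what was
 * requested.  Returns 0 on success or a negative errno.
 */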
static int check_write_rcs(struct ptlrpc_request *req,
                           int requested_nob, int niocount,
                           size_t page_count, struct brw_page **pga)
{
        int     i;
        __u32   *remote_rcs;

        remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
                                                  sizeof(*remote_rcs) *
                                                  niocount);
        if (remote_rcs == NULL) {
                CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
                return -EPROTO;
        }

        /* return error if any niobuf was in error */
        for (i = 0; i < niocount; i++) {
                if ((int)remote_rcs[i] < 0)
                        return remote_rcs[i];

                if (remote_rcs[i] != 0) {
                        CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
                                i, remote_rcs[i], req);
                        return -EPROTO;
                }
        }

        if (req->rq_bulk->bd_nob_transferred != requested_nob) {
                CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
                       req->rq_bulk->bd_nob_transferred, requested_nob);
                return -EPROTO;
        }

        return 0;
}

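/**
 * Two brw_pages can be merged into a single niobuf if they are
 * file-adjacent and carry compatible flags; flags that only affect local
 * handling are masked out before comparing.
 */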
static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
{
        if (p1->flag != p2->flag) {
                unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
                                  OBD_BRW_SYNC       | OBD_BRW_ASYNC   |
                                  OBD_BRW_NOQUOTA    | OBD_BRW_SOFT_SYNC);

                /* warn if we try to combine flags that we don't know to be
                 * safe to combine */
                if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
                        CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
                              "report this at https://jira.hpdd.intel.com/\n",
                              p1->flag, p2->flag);
                }
                return 0;
        }

        return (p1->off + p1->count == p2->off);
}

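/**
 * Compute the checksum of \a nob bytes spread across \a pga using the hash
 * algorithm mapped from \a cksum_type.  Under OBD_FAIL fault injection the
 * data (on read) or the checksum itself (on write) is deliberately
 * corrupted to exercise the error paths.
 */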
static u32 osc_checksum_bulk(int nob, size_t pg_count,
                             struct brw_page **pga, int opc,
                             enum cksum_types cksum_type)
{
        u32                             cksum;
        int                             i = 0;
        struct cfs_crypto_hash_desc     *hdesc;
        unsigned int                    bufsize;
        unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);

        LASSERT(pg_count > 0);

        hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
        if (IS_ERR(hdesc)) {
                CERROR("Unable to initialize checksum hash %s\n",
                       cfs_crypto_hash_name(cfs_alg));
                return PTR_ERR(hdesc);
        }

        while (nob > 0 && pg_count > 0) {
                unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 && opc == OST_READ &&
                    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
                        unsigned char *ptr = kmap(pga[i]->pg);
                        int off = pga[i]->off & ~PAGE_MASK;

                        memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
                        kunmap(pga[i]->pg);
                }
                cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
                                            pga[i]->off & ~PAGE_MASK,
                                            count);
                LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
                               (int)(pga[i]->off & ~PAGE_MASK));

                nob -= pga[i]->count;
                pg_count--;
                i++;
        }

        bufsize = sizeof(cksum);
        cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);

        /* For a send we only compute a wrong checksum instead of corrupting
         * the data, so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                cksum++;

        return cksum;
}

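/**
 * Build a bulk read/write RPC for \a page_count pages: merge file-adjacent
 * pages into niobufs, attach the bulk descriptor, announce the client's
 * cached/dirty state, and compute the bulk checksum when enabled.  On
 * success the prepared request is returned in \a reqp.
 */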
static int
osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
                     u32 page_count, struct brw_page **pga,
                     struct ptlrpc_request **reqp, int resend)
{
        struct ptlrpc_request   *req;
        struct ptlrpc_bulk_desc *desc;
        struct ost_body         *body;
        struct obd_ioobj        *ioobj;
        struct niobuf_remote    *niobuf;
        int niocount, i, requested_nob, opc, rc;
        struct osc_brw_async_args *aa;
        struct req_capsule      *pill;
        struct brw_page *pg_prev;

        ENTRY;
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
                RETURN(-ENOMEM); /* Recoverable */
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
                RETURN(-EINVAL); /* Fatal */

        if ((cmd & OBD_BRW_WRITE) != 0) {
                opc = OST_WRITE;
                req = ptlrpc_request_alloc_pool(cli->cl_import,
                                                osc_rq_pool,
                                                &RQF_OST_BRW_WRITE);
        } else {
                opc = OST_READ;
                req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
        }
        if (req == NULL)
                RETURN(-ENOMEM);

        for (niocount = i = 1; i < page_count; i++) {
                if (!can_merge_pages(pga[i - 1], pga[i]))
                        niocount++;
        }

        pill = &req->rq_pill;
        req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
                             sizeof(*ioobj));
        req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
                             niocount * sizeof(*niobuf));

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        osc_set_io_portal(req);

        ptlrpc_at_set_req_timeout(req);
        /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
         * retry logic */
        req->rq_no_retry_einprogress = 1;

        desc = ptlrpc_prep_bulk_imp(req, page_count,
                cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
                (opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE :
                        PTLRPC_BULK_PUT_SINK) |
                        PTLRPC_BULK_BUF_KIOV,
                OST_BULK_PORTAL,
                &ptlrpc_bulk_kiov_pin_ops);

        if (desc == NULL)
                GOTO(out, rc = -ENOMEM);
        /* NB request now owns desc and will free it when it gets freed */

        body = req_capsule_client_get(pill, &RMF_OST_BODY);
        ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;
        /* The high bits of ioo_max_brw tell the server the _maximum_ number
         * of bulks that might be sent for this request.  The actual number
         * is decided when the RPC is finally sent in ptlrpc_register_bulk().
         * It sends "max - 1" for old client compatibility sending "0", and
         * also so the actual maximum is a power-of-two number, not one
         * less. LU-1431 */
        ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
        LASSERT(page_count > 0);
        pg_prev = pga[0];
        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                struct brw_page *pg = pga[i];
                int poff = pg->off & ~PAGE_MASK;

                LASSERT(pg->count > 0);
                /* make sure there is no gap in the middle of page array */
                LASSERTF(page_count == 1 ||
                         (ergo(i == 0, poff + pg->count == PAGE_SIZE) &&
                          ergo(i > 0 && i < page_count - 1,
                               poff == 0 && pg->count == PAGE_SIZE)   &&
                          ergo(i == page_count - 1, poff == 0)),
                         "i: %d/%d pg: %p off: %llu, count: %u\n",
                         i, page_count, pg, pg->off, pg->count);
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u pg %p [pri %lu ind %lu] off %llu"
                         " prev_pg %p [pri %lu ind %lu] off %llu\n",
                         i, page_count,
                         pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
                         pg_prev->pg, page_private(pg_prev->pg),
                         pg_prev->pg->index, pg_prev->off);
                LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
                        (pg->flag & OBD_BRW_SRVLOCK));

                desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff, pg->count);
                requested_nob += pg->count;

                if (i > 0 && can_merge_pages(pg_prev, pg)) {
                        niobuf--;
                        niobuf->rnb_len += pg->count;
                } else {
                        niobuf->rnb_offset = pg->off;
                        niobuf->rnb_len    = pg->count;
                        niobuf->rnb_flags  = pg->flag;
                }
                pg_prev = pg;
        }

        LASSERTF((void *)(niobuf - niocount) ==
                req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
                "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
                &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));

        osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
        if (resend) {
                if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                        body->oa.o_valid |= OBD_MD_FLFLAGS;
                        body->oa.o_flags = 0;
                }
                body->oa.o_flags |= OBD_FL_RECOV_RESEND;
        }

        if (osc_should_shrink_grant(cli))
                osc_shrink_grant_local(cli, &body->oa);

        /* size[REQ_REC_OFF] still sizeof (*body) */
        if (opc == OST_WRITE) {
                if (cli->cl_checksum &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        /* store cl_cksum_type in a local variable since
                         * it can be changed via lprocfs */
                        enum cksum_types cksum_type = cli->cl_cksum_type;

                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                body->oa.o_flags = 0;

                        body->oa.o_flags |= cksum_type_pack(cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        body->oa.o_cksum = osc_checksum_bulk(requested_nob,
                                                             page_count, pga,
                                                             OST_WRITE,
                                                             cksum_type);
                        CDEBUG(D_PAGE, "checksum at write origin: %x\n",
                               body->oa.o_cksum);
                        /* save this in 'oa', too, for later checking */
                        oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        oa->o_flags |= cksum_type_pack(cksum_type);
                } else {
                        /* clear out the checksum flag, in case this is a
                         * resend but cl_checksum is no longer set. b=11238 */
                        oa->o_valid &= ~OBD_MD_FLCKSUM;
                }
                oa->o_cksum = body->oa.o_cksum;
                /* 1 RC per niobuf */
                req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
                                     sizeof(__u32) * niocount);
        } else {
                if (cli->cl_checksum &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                body->oa.o_flags = 0;
                        body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                }

                /* The client cksum has already been copied to the wire obdo
                 * by the earlier lustre_set_wire_obdo(); if a bulk read is
                 * resent due to a cksum error, this lets the server
                 * check+dump the pages on its side */
        }
        ptlrpc_request_set_replen(req);

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oa = oa;
        aa->aa_requested_nob = requested_nob;
        aa->aa_nio_count = niocount;
        aa->aa_page_count = page_count;
        aa->aa_resends = 0;
        aa->aa_ppga = pga;
        aa->aa_cli = cli;
        INIT_LIST_HEAD(&aa->aa_oaps);

        *reqp = req;
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
                req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
                niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
        RETURN(0);

 out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}

char dbgcksum_file_name[PATH_MAX];

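/**
 * Dump the raw pages of a bulk with a checksum error to a file under the
 * debug path so the corruption can be inspected offline.  O_EXCL ensures
 * that only the first error for a given file range is kept.
 */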
static void dump_all_bulk_pages(struct obdo *oa, __u32 page_count,
                                struct brw_page **pga, __u32 server_cksum,
                                __u32 client_cksum)
{
        struct file *filp;
        int rc, i;
        unsigned int len;
        char *buf;
        mm_segment_t oldfs;

        /* we only keep a dump of the pages on the first error for the same
         * range in the file/fid, not during the resends/retries. */
        snprintf(dbgcksum_file_name, sizeof(dbgcksum_file_name),
                 "%s-checksum_dump-osc-"DFID":[%llu-%llu]-%x-%x",
                 (strncmp(libcfs_debug_file_path_arr, "NONE", 4) != 0 ?
                  libcfs_debug_file_path_arr :
                  LIBCFS_DEBUG_FILE_PATH_DEFAULT),
                 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : 0ULL,
                 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
                 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
                 pga[0]->off,
                 pga[page_count-1]->off + pga[page_count-1]->count - 1,
                 client_cksum, server_cksum);
        filp = filp_open(dbgcksum_file_name,
                         O_CREAT | O_EXCL | O_WRONLY | O_LARGEFILE, 0600);
        if (IS_ERR(filp)) {
                rc = PTR_ERR(filp);
                if (rc == -EEXIST)
                        CDEBUG(D_INFO, "%s: can't open to dump pages with "
                               "checksum error: rc = %d\n", dbgcksum_file_name,
                               rc);
                else
                        CERROR("%s: can't open to dump pages with checksum "
                               "error: rc = %d\n", dbgcksum_file_name, rc);
                return;
        }

        oldfs = get_fs();
        set_fs(KERNEL_DS);
        for (i = 0; i < page_count; i++) {
                len = pga[i]->count;
                buf = kmap(pga[i]->pg);
                while (len != 0) {
                        rc = vfs_write(filp, (__force const char __user *)buf,
                                       len, &filp->f_pos);
                        if (rc < 0) {
                                CERROR("%s: wanted to write %u but got %d "
                                       "error\n", dbgcksum_file_name, len, rc);
                                break;
                        }
                        len -= rc;
                        buf += rc;
                        CDEBUG(D_INFO, "%s: wrote %d bytes\n",
                               dbgcksum_file_name, rc);
                }
                kunmap(pga[i]->pg);
        }
        set_fs(oldfs);

        rc = ll_vfs_fsync_range(filp, 0, LLONG_MAX, 1);
        if (rc)
                CERROR("%s: sync returns %d\n", dbgcksum_file_name, rc);
        filp_close(filp, NULL);
        return;
}

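/**
 * The server reported a write checksum mismatch: recompute the checksum
 * over the local pages to guess where the corruption happened and log a
 * console error.  Returns 1 if the checksums really disagree, 0 if they
 * match after all.
 */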
static int
check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
                                __u32 client_cksum, __u32 server_cksum,
                                struct osc_brw_async_args *aa)
{
        __u32 new_cksum;
        char *msg;
        enum cksum_types cksum_type;

        if (server_cksum == client_cksum) {
                CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
                return 0;
        }

        if (aa->aa_cli->cl_checksum_dump)
                dump_all_bulk_pages(oa, aa->aa_page_count, aa->aa_ppga,
                                    server_cksum, client_cksum);

        cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
                                       oa->o_flags : 0);
        new_cksum = osc_checksum_bulk(aa->aa_requested_nob, aa->aa_page_count,
                                      aa->aa_ppga, OST_WRITE, cksum_type);

        if (cksum_type != cksum_type_unpack(aa->aa_oa->o_flags))
                msg = "the server did not use the checksum type specified in "
                      "the original request - likely a protocol problem";
        else if (new_cksum == server_cksum)
                msg = "changed on the client after we checksummed it - "
                      "likely false positive due to mmap IO (bug 11742)";
        else if (new_cksum == client_cksum)
                msg = "changed in transit before arrival at OST";
        else
                msg = "changed in transit AND doesn't match the original - "
                      "likely false positive due to mmap IO (bug 11742)";

        LCONSOLE_ERROR_MSG(0x132, "%s: BAD WRITE CHECKSUM: %s: from %s inode "
                           DFID " object "DOSTID" extent [%llu-%llu], original "
                           "client csum %x (type %x), server csum %x (type %x),"
                           " client csum now %x\n",
                           aa->aa_cli->cl_import->imp_obd->obd_name,
                           msg, libcfs_nid2str(peer->nid),
                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
                           POSTID(&oa->o_oi), aa->aa_ppga[0]->off,
                           aa->aa_ppga[aa->aa_page_count - 1]->off +
                                aa->aa_ppga[aa->aa_page_count-1]->count - 1,
                           client_cksum, cksum_type_unpack(aa->aa_oa->o_flags),
                           server_cksum, cksum_type, new_cksum);
        return 1;
}

/* Note: rc enters this function as the number of bytes transferred */
1440 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1441 {
1442         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1443         const struct lnet_process_id *peer =
1444                         &req->rq_import->imp_connection->c_peer;
1445         struct client_obd *cli = aa->aa_cli;
1446         struct ost_body *body;
1447         u32 client_cksum = 0;
1448         ENTRY;
1449
1450         if (rc < 0 && rc != -EDQUOT) {
1451                 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1452                 RETURN(rc);
1453         }
1454
1455         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1456         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1457         if (body == NULL) {
1458                 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1459                 RETURN(-EPROTO);
1460         }
1461
1462         /* set/clear over quota flag for a uid/gid/projid */
        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
            body->oa.o_valid & OBD_MD_FLALLQUOTA) {
                unsigned qid[LL_MAXQUOTAS] = { body->oa.o_uid,
                                               body->oa.o_gid,
                                               body->oa.o_projid };
                CDEBUG(D_QUOTA, "setdq for [%u %u %u] with valid %#llx, flags %x\n",
                       body->oa.o_uid, body->oa.o_gid, body->oa.o_projid,
                       body->oa.o_valid, body->oa.o_flags);
                osc_quota_setdq(cli, qid, body->oa.o_valid,
                                body->oa.o_flags);
        }
1474
1475         osc_update_grant(cli, body);
1476
1477         if (rc < 0)
1478                 RETURN(rc);
1479
1480         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1481                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1482
1483         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1484                 if (rc > 0) {
1485                         CERROR("Unexpected +ve rc %d\n", rc);
1486                         RETURN(-EPROTO);
1487                 }
1488                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1489
1490                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1491                         RETURN(-EAGAIN);
1492
1493                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1494                     check_write_checksum(&body->oa, peer, client_cksum,
1495                                          body->oa.o_cksum, aa))
1496                         RETURN(-EAGAIN);
1497
                rc = check_write_rcs(req, aa->aa_requested_nob,
                                     aa->aa_nio_count, aa->aa_page_count,
                                     aa->aa_ppga);
1500                 GOTO(out, rc);
1501         }
1502
1503         /* The rest of this function executes only for OST_READs */
1504
1505         /* if unwrap_bulk failed, return -EAGAIN to retry */
1506         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1507         if (rc < 0)
1508                 GOTO(out, rc = -EAGAIN);
1509
1510         if (rc > aa->aa_requested_nob) {
1511                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1512                        aa->aa_requested_nob);
1513                 RETURN(-EPROTO);
1514         }
1515
        if (rc != req->rq_bulk->bd_nob_transferred) {
                CERROR("Unexpected rc %d (%d transferred)\n",
                       rc, req->rq_bulk->bd_nob_transferred);
                RETURN(-EPROTO);
        }
1521
1522         if (rc < aa->aa_requested_nob)
1523                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1524
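        /* The reply carries a checksum: recompute one over the pages that
         * were actually received and compare it against the server's value
         * below. */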
1525         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1526                 static int cksum_counter;
1527                 u32        server_cksum = body->oa.o_cksum;
1528                 char      *via = "";
1529                 char      *router = "";
1530                 enum cksum_types cksum_type;
1531
                cksum_type = cksum_type_unpack(body->oa.o_valid &
                                               OBD_MD_FLFLAGS ?
                                               body->oa.o_flags : 0);
1534                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1535                                                  aa->aa_ppga, OST_READ,
1536                                                  cksum_type);
1537
1538                 if (peer->nid != req->rq_bulk->bd_sender) {
1539                         via = " via ";
1540                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1541                 }
1542
1543                 if (server_cksum != client_cksum) {
1544                         struct ost_body *clbody;
1545                         u32 page_count = aa->aa_page_count;
1546
1547                         clbody = req_capsule_client_get(&req->rq_pill,
1548                                                         &RMF_OST_BODY);
1549                         if (cli->cl_checksum_dump)
1550                                 dump_all_bulk_pages(&clbody->oa, page_count,
1551                                                     aa->aa_ppga, server_cksum,
1552                                                     client_cksum);
1553
1554                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1555                                            "%s%s%s inode "DFID" object "DOSTID
1556                                            " extent [%llu-%llu], client %x, "
1557                                            "server %x, cksum_type %x\n",
1558                                            req->rq_import->imp_obd->obd_name,
1559                                            libcfs_nid2str(peer->nid),
1560                                            via, router,
1561                                            clbody->oa.o_valid & OBD_MD_FLFID ?
1562                                                 clbody->oa.o_parent_seq : 0ULL,
1563                                            clbody->oa.o_valid & OBD_MD_FLFID ?
1564                                                 clbody->oa.o_parent_oid : 0,
1565                                            clbody->oa.o_valid & OBD_MD_FLFID ?
1566                                                 clbody->oa.o_parent_ver : 0,
1567                                            POSTID(&body->oa.o_oi),
1568                                            aa->aa_ppga[0]->off,
1569                                            aa->aa_ppga[page_count-1]->off +
1570                                            aa->aa_ppga[page_count-1]->count - 1,
1571                                            client_cksum, server_cksum,
1572                                            cksum_type);
1573                         cksum_counter = 0;
1574                         aa->aa_oa->o_cksum = client_cksum;
1575                         rc = -EAGAIN;
1576                 } else {
1577                         cksum_counter++;
1578                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1579                         rc = 0;
1580                 }
1581         } else if (unlikely(client_cksum)) {
1582                 static int cksum_missed;
1583
1584                 cksum_missed++;
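                /* (x & -x) == x only when x is a power of two, so this
                 * logs the 1st, 2nd, 4th, 8th, ... miss instead of
                 * flooding the console. */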
1585                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1586                         CERROR("Checksum %u requested from %s but not sent\n",
1587                                cksum_missed, libcfs_nid2str(peer->nid));
1588         } else {
1589                 rc = 0;
1590         }
1591 out:
1592         if (rc >= 0)
1593                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1594                                      aa->aa_oa, &body->oa);
1595
1596         RETURN(rc);
1597 }
1598
1599 static int osc_brw_redo_request(struct ptlrpc_request *request,
1600                                 struct osc_brw_async_args *aa, int rc)
1601 {
1602         struct ptlrpc_request *new_req;
1603         struct osc_brw_async_args *new_aa;
1604         struct osc_async_page *oap;
1605         ENTRY;
1606
1607         DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1608                   "redo for recoverable error %d", rc);
1609
        rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
                                  OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
                                  aa->aa_cli, aa->aa_oa, aa->aa_page_count,
                                  aa->aa_ppga, &new_req, 1);
1614         if (rc)
1615                 RETURN(rc);
1616
1617         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1618                 if (oap->oap_request != NULL) {
1619                         LASSERTF(request == oap->oap_request,
1620                                  "request %p != oap_request %p\n",
1621                                  request, oap->oap_request);
1622                         if (oap->oap_interrupted) {
1623                                 ptlrpc_req_finished(new_req);
1624                                 RETURN(-EINTR);
1625                         }
1626                 }
1627         }
        /* The new request takes over pga and oaps from the old request.
         * Note that copying a list_head doesn't work; it has to be moved. */
1630         aa->aa_resends++;
1631         new_req->rq_interpret_reply = request->rq_interpret_reply;
1632         new_req->rq_async_args = request->rq_async_args;
1633         new_req->rq_commit_cb = request->rq_commit_cb;
1634         /* cap resend delay to the current request timeout, this is similar to
1635          * what ptlrpc does (see after_reply()) */
1636         if (aa->aa_resends > new_req->rq_timeout)
1637                 new_req->rq_sent = ktime_get_real_seconds() + new_req->rq_timeout;
1638         else
1639                 new_req->rq_sent = ktime_get_real_seconds() + aa->aa_resends;
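        /* e.g. the 3rd resend of a request with a 30s timeout is delayed
         * 3 seconds, while the 31st and later resends wait the full 30s. */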
1640         new_req->rq_generation_set = 1;
1641         new_req->rq_import_generation = request->rq_import_generation;
1642
1643         new_aa = ptlrpc_req_async_args(new_req);
1644
1645         INIT_LIST_HEAD(&new_aa->aa_oaps);
1646         list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1647         INIT_LIST_HEAD(&new_aa->aa_exts);
1648         list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1649         new_aa->aa_resends = aa->aa_resends;
1650
1651         list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1652                 if (oap->oap_request) {
1653                         ptlrpc_req_finished(oap->oap_request);
1654                         oap->oap_request = ptlrpc_request_addref(new_req);
1655                 }
1656         }
1657
        /* XXX: This code will run into problems if we ever support adding
         * a series of BRW RPCs into a self-defined ptlrpc_request_set and
         * waiting for all of them to finish. We should inherit the request
         * set from the old request. */
1662         ptlrpcd_add_req(new_req);
1663
1664         DEBUG_REQ(D_INFO, new_req, "new request");
1665         RETURN(0);
1666 }
1667
/*
 * Ugh, we want disk allocation on the target to happen in offset order.  We'll
 * follow Sedgewick's advice and stick to the dead simple shellsort -- it'll do
 * fine for our small page arrays and doesn't require allocation.  It's an
 * insertion sort that swaps elements that are strides apart, shrinking the
 * stride down until it's 1 and the array is sorted.
 */
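/* For example, with num = 100 the stride loop below stops at 121, and the
 * passes then run with strides 40, 13, 4 and finally 1, the last pass being
 * a plain insertion sort over the nearly-sorted array. */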
1675 static void sort_brw_pages(struct brw_page **array, int num)
1676 {
1677         int stride, i, j;
1678         struct brw_page *tmp;
1679
1680         if (num == 1)
1681                 return;
        for (stride = 1; stride < num; stride = (stride * 3) + 1)
1683                 ;
1684
1685         do {
1686                 stride /= 3;
                for (i = stride; i < num; i++) {
1688                         tmp = array[i];
1689                         j = i;
1690                         while (j >= stride && array[j - stride]->off > tmp->off) {
1691                                 array[j] = array[j - stride];
1692                                 j -= stride;
1693                         }
1694                         array[j] = tmp;
1695                 }
1696         } while (stride > 1);
1697 }
1698
1699 static void osc_release_ppga(struct brw_page **ppga, size_t count)
1700 {
1701         LASSERT(ppga != NULL);
1702         OBD_FREE(ppga, sizeof(*ppga) * count);
1703 }
1704
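/* Interpret callback for a BRW RPC: finish the bulk transfer, redo the
 * request on recoverable errors, propagate size and time attributes to the
 * cl_object on success, then release the extents, the pages and the
 * in-flight RPC counters. */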
1705 static int brw_interpret(const struct lu_env *env,
1706                          struct ptlrpc_request *req, void *data, int rc)
1707 {
1708         struct osc_brw_async_args *aa = data;
1709         struct osc_extent *ext;
1710         struct osc_extent *tmp;
1711         struct client_obd *cli = aa->aa_cli;
1712         ENTRY;
1713
1714         rc = osc_brw_fini_request(req, rc);
1715         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
        /* When the server returns -EINPROGRESS, the client should always
         * retry regardless of how many times the bulk was already resent. */
1718         if (osc_recoverable_error(rc)) {
1719                 if (req->rq_import_generation !=
1720                     req->rq_import->imp_generation) {
1721                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
1722                                ""DOSTID", rc = %d.\n",
1723                                req->rq_import->imp_obd->obd_name,
1724                                POSTID(&aa->aa_oa->o_oi), rc);
1725                 } else if (rc == -EINPROGRESS ||
1726                     client_should_resend(aa->aa_resends, aa->aa_cli)) {
1727                         rc = osc_brw_redo_request(req, aa, rc);
1728                 } else {
                        CERROR("%s: too many resend retries for object "
                               DOSTID", rc = %d.\n",
1731                                req->rq_import->imp_obd->obd_name,
1732                                POSTID(&aa->aa_oa->o_oi), rc);
1733                 }
1734
1735                 if (rc == 0)
1736                         RETURN(0);
1737                 else if (rc == -EAGAIN || rc == -EINPROGRESS)
1738                         rc = -EIO;
1739         }
1740
1741         if (rc == 0) {
1742                 struct obdo *oa = aa->aa_oa;
1743                 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
1744                 unsigned long valid = 0;
1745                 struct cl_object *obj;
1746                 struct osc_async_page *last;
1747
1748                 last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
1749                 obj = osc2cl(last->oap_obj);
1750
1751                 cl_object_attr_lock(obj);
1752                 if (oa->o_valid & OBD_MD_FLBLOCKS) {
1753                         attr->cat_blocks = oa->o_blocks;
1754                         valid |= CAT_BLOCKS;
1755                 }
1756                 if (oa->o_valid & OBD_MD_FLMTIME) {
1757                         attr->cat_mtime = oa->o_mtime;
1758                         valid |= CAT_MTIME;
1759                 }
1760                 if (oa->o_valid & OBD_MD_FLATIME) {
1761                         attr->cat_atime = oa->o_atime;
1762                         valid |= CAT_ATIME;
1763                 }
1764                 if (oa->o_valid & OBD_MD_FLCTIME) {
1765                         attr->cat_ctime = oa->o_ctime;
1766                         valid |= CAT_CTIME;
1767                 }
1768
1769                 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1770                         struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
1771                         loff_t last_off = last->oap_count + last->oap_obj_off +
1772                                 last->oap_page_off;
1773
                        /* Change the file size if this is an out-of-quota
                         * or direct I/O write and it extends the file size */
1776                         if (loi->loi_lvb.lvb_size < last_off) {
1777                                 attr->cat_size = last_off;
1778                                 valid |= CAT_SIZE;
1779                         }
1780                         /* Extend KMS if it's not a lockless write */
1781                         if (loi->loi_kms < last_off &&
1782                             oap2osc_page(last)->ops_srvlock == 0) {
1783                                 attr->cat_kms = last_off;
1784                                 valid |= CAT_KMS;
1785                         }
1786                 }
1787
1788                 if (valid != 0)
1789                         cl_object_attr_update(env, obj, attr, valid);
1790                 cl_object_attr_unlock(obj);
1791         }
1792         OBDO_FREE(aa->aa_oa);
1793
1794         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
1795                 osc_inc_unstable_pages(req);
1796
1797         list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
1798                 list_del_init(&ext->oe_link);
1799                 osc_extent_finish(env, ext, 1, rc);
1800         }
1801         LASSERT(list_empty(&aa->aa_exts));
1802         LASSERT(list_empty(&aa->aa_oaps));
1803
1804         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1805         ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
1806
1807         spin_lock(&cli->cl_loi_list_lock);
1808         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1809          * is called so we know whether to go to sync BRWs or wait for more
1810          * RPCs to complete */
1811         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
1812                 cli->cl_w_in_flight--;
1813         else
1814                 cli->cl_r_in_flight--;
1815         osc_wake_cache_waiters(cli);
1816         spin_unlock(&cli->cl_loi_list_lock);
1817
1818         osc_io_unplug(env, cli, NULL);
1819         RETURN(rc);
1820 }
1821
1822 static void brw_commit(struct ptlrpc_request *req)
1823 {
        /* If osc_inc_unstable_pages (via osc_extent_finish) races with
         * this function being called via the rq_commit_cb, we need to
         * ensure osc_dec_unstable_pages is still called.  Otherwise
         * unstable pages may be leaked. */
1828         spin_lock(&req->rq_lock);
1829         if (likely(req->rq_unstable)) {
1830                 req->rq_unstable = 0;
1831                 spin_unlock(&req->rq_lock);
1832
1833                 osc_dec_unstable_pages(req);
1834         } else {
1835                 req->rq_committed = 1;
1836                 spin_unlock(&req->rq_lock);
1837         }
1838 }
1839
1840 /**
1841  * Build an RPC by the list of extent @ext_list. The caller must ensure
1842  * that the total pages in this list are NOT over max pages per RPC.
1843  * Extents in the list must be in OES_RPC state.
1844  */
1845 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
1846                   struct list_head *ext_list, int cmd)
1847 {
1848         struct ptlrpc_request           *req = NULL;
1849         struct osc_extent               *ext;
1850         struct brw_page                 **pga = NULL;
1851         struct osc_brw_async_args       *aa = NULL;
1852         struct obdo                     *oa = NULL;
1853         struct osc_async_page           *oap;
1854         struct osc_object               *obj = NULL;
1855         struct cl_req_attr              *crattr = NULL;
1856         loff_t                          starting_offset = OBD_OBJECT_EOF;
1857         loff_t                          ending_offset = 0;
1858         int                             mpflag = 0;
1859         int                             mem_tight = 0;
1860         int                             page_count = 0;
1861         bool                            soft_sync = false;
1862         bool                            interrupted = false;
1863         int                             i;
1864         int                             grant = 0;
1865         int                             rc;
1866         struct list_head                rpc_list = LIST_HEAD_INIT(rpc_list);
1867         struct ost_body                 *body;
1868         ENTRY;
1869         LASSERT(!list_empty(ext_list));
1870
1871         /* add pages into rpc_list to build BRW rpc */
1872         list_for_each_entry(ext, ext_list, oe_link) {
1873                 LASSERT(ext->oe_state == OES_RPC);
1874                 mem_tight |= ext->oe_memalloc;
1875                 grant += ext->oe_grants;
1876                 page_count += ext->oe_nr_pages;
1877                 if (obj == NULL)
1878                         obj = ext->oe_obj;
1879         }
1880
1881         soft_sync = osc_over_unstable_soft_limit(cli);
1882         if (mem_tight)
1883                 mpflag = cfs_memory_pressure_get_and_set();
1884
1885         OBD_ALLOC(pga, sizeof(*pga) * page_count);
1886         if (pga == NULL)
1887                 GOTO(out, rc = -ENOMEM);
1888
1889         OBDO_ALLOC(oa);
1890         if (oa == NULL)
1891                 GOTO(out, rc = -ENOMEM);
1892
1893         i = 0;
1894         list_for_each_entry(ext, ext_list, oe_link) {
1895                 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
1896                         if (mem_tight)
1897                                 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
1898                         if (soft_sync)
1899                                 oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
1900                         pga[i] = &oap->oap_brw_page;
1901                         pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
1902                         i++;
1903
1904                         list_add_tail(&oap->oap_rpc_item, &rpc_list);
1905                         if (starting_offset == OBD_OBJECT_EOF ||
1906                             starting_offset > oap->oap_obj_off)
1907                                 starting_offset = oap->oap_obj_off;
1908                         else
1909                                 LASSERT(oap->oap_page_off == 0);
1910                         if (ending_offset < oap->oap_obj_off + oap->oap_count)
1911                                 ending_offset = oap->oap_obj_off +
1912                                                 oap->oap_count;
1913                         else
1914                                 LASSERT(oap->oap_page_off + oap->oap_count ==
1915                                         PAGE_SIZE);
1916                         if (oap->oap_interrupted)
1917                                 interrupted = true;
1918                 }
1919         }
1920
1921         /* first page in the list */
1922         oap = list_entry(rpc_list.next, typeof(*oap), oap_rpc_item);
1923
1924         crattr = &osc_env_info(env)->oti_req_attr;
1925         memset(crattr, 0, sizeof(*crattr));
1926         crattr->cra_type = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
1927         crattr->cra_flags = ~0ULL;
1928         crattr->cra_page = oap2cl_page(oap);
1929         crattr->cra_oa = oa;
1930         cl_req_attr_set(env, osc2cl(obj), crattr);
1931
1932         if (cmd == OBD_BRW_WRITE)
1933                 oa->o_grant_used = grant;
1934
1935         sort_brw_pages(pga, page_count);
1936         rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req, 0);
1937         if (rc != 0) {
1938                 CERROR("prep_req failed: %d\n", rc);
1939                 GOTO(out, rc);
1940         }
1941
1942         req->rq_commit_cb = brw_commit;
1943         req->rq_interpret_reply = brw_interpret;
1944         req->rq_memalloc = mem_tight != 0;
1945         oap->oap_request = ptlrpc_request_addref(req);
1946         if (interrupted && !req->rq_intr)
1947                 ptlrpc_mark_interrupted(req);
1948
1949         /* Need to update the timestamps after the request is built in case
1950          * we race with setattr (locally or in queue at OST).  If OST gets
1951          * later setattr before earlier BRW (as determined by the request xid),
1952          * the OST will not use BRW timestamps.  Sadly, there is no obvious
1953          * way to do this in a single call.  bug 10150 */
1954         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
1955         crattr->cra_oa = &body->oa;
1956         crattr->cra_flags = OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME;
1957         cl_req_attr_set(env, osc2cl(obj), crattr);
1958         lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
1959
1960         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1961         aa = ptlrpc_req_async_args(req);
1962         INIT_LIST_HEAD(&aa->aa_oaps);
1963         list_splice_init(&rpc_list, &aa->aa_oaps);
1964         INIT_LIST_HEAD(&aa->aa_exts);
1965         list_splice_init(ext_list, &aa->aa_exts);
1966
1967         spin_lock(&cli->cl_loi_list_lock);
1968         starting_offset >>= PAGE_SHIFT;
1969         if (cmd == OBD_BRW_READ) {
1970                 cli->cl_r_in_flight++;
1971                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
1972                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
1973                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
1974                                       starting_offset + 1);
1975         } else {
1976                 cli->cl_w_in_flight++;
1977                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
1978                 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
1979                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
1980                                       starting_offset + 1);
1981         }
1982         spin_unlock(&cli->cl_loi_list_lock);
1983
1984         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %ur/%uw in flight",
1985                   page_count, aa, cli->cl_r_in_flight,
1986                   cli->cl_w_in_flight);
1987         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_IO, cfs_fail_val);
1988
1989         ptlrpcd_add_req(req);
1990         rc = 0;
1991         EXIT;
1992
1993 out:
1994         if (mem_tight != 0)
1995                 cfs_memory_pressure_restore(mpflag);
1996
1997         if (rc != 0) {
1998                 LASSERT(req == NULL);
1999
2000                 if (oa)
2001                         OBDO_FREE(oa);
2002                 if (pga)
2003                         OBD_FREE(pga, sizeof(*pga) * page_count);
                /* This should happen rarely and is pretty bad; it makes
                 * the pending list not follow the dirty order */
2006                 while (!list_empty(ext_list)) {
2007                         ext = list_entry(ext_list->next, struct osc_extent,
2008                                          oe_link);
2009                         list_del_init(&ext->oe_link);
2010                         osc_extent_finish(env, ext, 0, rc);
2011                 }
2012         }
2013         RETURN(rc);
2014 }
2015
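/* Attach @data to a lock's l_ast_data if it is not already set.  Returns 1
 * if l_ast_data now points at @data (whether set here or previously), and
 * 0 if the lock already belongs to different data. */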
2016 static int osc_set_lock_data(struct ldlm_lock *lock, void *data)
2017 {
2018         int set = 0;
2019
2020         LASSERT(lock != NULL);
2021
2022         lock_res_and_lock(lock);
2023
2024         if (lock->l_ast_data == NULL)
2025                 lock->l_ast_data = data;
2026         if (lock->l_ast_data == data)
2027                 set = 1;
2028
2029         unlock_res_and_lock(lock);
2030
2031         return set;
2032 }
2033
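/* Complete an enqueue at the osc level: decode the intent reply when the
 * lock request was aborted, mark the LVB ready where appropriate, run the
 * caller's upcall, and drop the enqueue reference on a granted lock. */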
2034 static int osc_enqueue_fini(struct ptlrpc_request *req,
2035                             osc_enqueue_upcall_f upcall, void *cookie,
2036                             struct lustre_handle *lockh, enum ldlm_mode mode,
2037                             __u64 *flags, bool speculative, int errcode)
2038 {
2039         bool intent = *flags & LDLM_FL_HAS_INTENT;
2040         int rc;
2041         ENTRY;
2042
2043         /* The request was created before ldlm_cli_enqueue call. */
2044         if (intent && errcode == ELDLM_LOCK_ABORTED) {
2045                 struct ldlm_reply *rep;
2046
2047                 rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
2048                 LASSERT(rep != NULL);
2049
2050                 rep->lock_policy_res1 =
2051                         ptlrpc_status_ntoh(rep->lock_policy_res1);
2052                 if (rep->lock_policy_res1)
2053                         errcode = rep->lock_policy_res1;
2054                 if (!speculative)
2055                         *flags |= LDLM_FL_LVB_READY;
2056         } else if (errcode == ELDLM_OK) {
2057                 *flags |= LDLM_FL_LVB_READY;
2058         }
2059
2060         /* Call the update callback. */
2061         rc = (*upcall)(cookie, lockh, errcode);
2062
2063         /* release the reference taken in ldlm_cli_enqueue() */
2064         if (errcode == ELDLM_LOCK_MATCHED)
2065                 errcode = ELDLM_OK;
2066         if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
2067                 ldlm_lock_decref(lockh, mode);
2068
2069         RETURN(rc);
2070 }
2071
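/* Interpret callback for an asynchronous enqueue: finish the DLM-level
 * enqueue first, then run the osc-level upcall via osc_enqueue_fini(),
 * holding an extra lock reference across the upcall. */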
2072 static int osc_enqueue_interpret(const struct lu_env *env,
2073                                  struct ptlrpc_request *req,
2074                                  struct osc_enqueue_args *aa, int rc)
2075 {
2076         struct ldlm_lock *lock;
2077         struct lustre_handle *lockh = &aa->oa_lockh;
2078         enum ldlm_mode mode = aa->oa_mode;
2079         struct ost_lvb *lvb = aa->oa_lvb;
2080         __u32 lvb_len = sizeof(*lvb);
2081         __u64 flags = 0;
2082
2083         ENTRY;
2084
2085         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2086          * be valid. */
2087         lock = ldlm_handle2lock(lockh);
2088         LASSERTF(lock != NULL,
2089                  "lockh %#llx, req %p, aa %p - client evicted?\n",
2090                  lockh->cookie, req, aa);
2091
2092         /* Take an additional reference so that a blocking AST that
2093          * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2094          * to arrive after an upcall has been executed by
2095          * osc_enqueue_fini(). */
2096         ldlm_lock_addref(lockh, mode);
2097
2098         /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
2099         OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
2100
        /* Let the CP AST grant the lock first. */
2102         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2103
2104         if (aa->oa_speculative) {
2105                 LASSERT(aa->oa_lvb == NULL);
2106                 LASSERT(aa->oa_flags == NULL);
2107                 aa->oa_flags = &flags;
2108         }
2109
2110         /* Complete obtaining the lock procedure. */
2111         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
2112                                    aa->oa_mode, aa->oa_flags, lvb, lvb_len,
2113                                    lockh, rc);
2114         /* Complete osc stuff. */
2115         rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
2116                               aa->oa_flags, aa->oa_speculative, rc);
2117
2118         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2119
2120         ldlm_lock_decref(lockh, mode);
2121         LDLM_LOCK_PUT(lock);
2122         RETURN(rc);
2123 }
2124
2125 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2126
/* When enqueuing asynchronously, locks are not ordered, so we can obtain a lock
 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
 * other synchronous requests; however, keeping some locks while waiting to
 * obtain others may take a considerable amount of time in case of OST failure,
 * and when a client does not release locks that other sync requests need, that
 * client is evicted from the cluster -- such scenarios make life difficult, so
 * release locks just after they are obtained. */
2134 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2135                      __u64 *flags, union ldlm_policy_data *policy,
2136                      struct ost_lvb *lvb, int kms_valid,
2137                      osc_enqueue_upcall_f upcall, void *cookie,
2138                      struct ldlm_enqueue_info *einfo,
2139                      struct ptlrpc_request_set *rqset, int async,
2140                      bool speculative)
2141 {
2142         struct obd_device *obd = exp->exp_obd;
2143         struct lustre_handle lockh = { 0 };
2144         struct ptlrpc_request *req = NULL;
2145         int intent = *flags & LDLM_FL_HAS_INTENT;
2146         __u64 match_flags = *flags;
2147         enum ldlm_mode mode;
2148         int rc;
2149         ENTRY;
2150
2151         /* Filesystem lock extents are extended to page boundaries so that
2152          * dealing with the page cache is a little smoother.  */
2153         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2154         policy->l_extent.end |= ~PAGE_MASK;
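        /* e.g. with 4KiB pages, an extent [5000, 12000] is widened to the
         * page-aligned range [4096, 12287]. */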
2155
2156         /*
2157          * kms is not valid when either object is completely fresh (so that no
2158          * locks are cached), or object was evicted. In the latter case cached
2159          * lock cannot be used, because it would prime inode state with
2160          * potentially stale LVB.
2161          */
2162         if (!kms_valid)
2163                 goto no_match;
2164
2165         /* Next, search for already existing extent locks that will cover us */
2166         /* If we're trying to read, we also search for an existing PW lock.  The
2167          * VFS and page cache already protect us locally, so lots of readers/
2168          * writers can share a single PW lock.
2169          *
2170          * There are problems with conversion deadlocks, so instead of
2171          * converting a read lock to a write lock, we'll just enqueue a new
2172          * one.
2173          *
2174          * At some point we should cancel the read lock instead of making them
2175          * send us a blocking callback, but there are problems with canceling
2176          * locks out from other users right now, too. */
2177         mode = einfo->ei_mode;
2178         if (einfo->ei_mode == LCK_PR)
2179                 mode |= LCK_PW;
2180         /* Normal lock requests must wait for the LVB to be ready before
2181          * matching a lock; speculative lock requests do not need to,
2182          * because they will not actually use the lock. */
2183         if (!speculative)
2184                 match_flags |= LDLM_FL_LVB_READY;
2185         if (intent != 0)
2186                 match_flags |= LDLM_FL_BLOCK_GRANTED;
2187         mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id,
2188                                einfo->ei_type, policy, mode, &lockh, 0);
2189         if (mode) {
2190                 struct ldlm_lock *matched;
2191
2192                 if (*flags & LDLM_FL_TEST_LOCK)
2193                         RETURN(ELDLM_OK);
2194
2195                 matched = ldlm_handle2lock(&lockh);
2196                 if (speculative) {
                        /* This DLM lock request is speculative, and does not
                         * have an associated IO request. Therefore, if there
                         * is already a DLM lock, it will just inform the
                         * caller to cancel the request for this stripe. */
2201                         lock_res_and_lock(matched);
2202                         if (ldlm_extent_equal(&policy->l_extent,
2203                             &matched->l_policy_data.l_extent))
2204                                 rc = -EEXIST;
2205                         else
2206                                 rc = -ECANCELED;
2207                         unlock_res_and_lock(matched);
2208
2209                         ldlm_lock_decref(&lockh, mode);
2210                         LDLM_LOCK_PUT(matched);
2211                         RETURN(rc);
2212                 } else if (osc_set_lock_data(matched, einfo->ei_cbdata)) {
2213                         *flags |= LDLM_FL_LVB_READY;
2214
2215                         /* We already have a lock, and it's referenced. */
2216                         (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);
2217
2218                         ldlm_lock_decref(&lockh, mode);
2219                         LDLM_LOCK_PUT(matched);
2220                         RETURN(ELDLM_OK);
2221                 } else {
2222                         ldlm_lock_decref(&lockh, mode);
2223                         LDLM_LOCK_PUT(matched);
2224                 }
2225         }
2226
2227 no_match:
2228         if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK))
2229                 RETURN(-ENOLCK);
2230
2231         if (intent) {
2232                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2233                                            &RQF_LDLM_ENQUEUE_LVB);
2234                 if (req == NULL)
2235                         RETURN(-ENOMEM);
2236
2237                 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
2238                 if (rc) {
2239                         ptlrpc_request_free(req);
2240                         RETURN(rc);
2241                 }
2242
2243                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
                                     sizeof(*lvb));
2245                 ptlrpc_request_set_replen(req);
2246         }
2247
2248         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2249         *flags &= ~LDLM_FL_BLOCK_GRANTED;
2250
2251         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2252                               sizeof(*lvb), LVB_T_OST, &lockh, async);
2253         if (async) {
2254                 if (!rc) {
2255                         struct osc_enqueue_args *aa;
2256                         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2257                         aa = ptlrpc_req_async_args(req);
2258                         aa->oa_exp         = exp;
2259                         aa->oa_mode        = einfo->ei_mode;
2260                         aa->oa_type        = einfo->ei_type;
2261                         lustre_handle_copy(&aa->oa_lockh, &lockh);
2262                         aa->oa_upcall      = upcall;
2263                         aa->oa_cookie      = cookie;
2264                         aa->oa_speculative = speculative;
2265                         if (!speculative) {
2266                                 aa->oa_flags  = flags;
2267                                 aa->oa_lvb    = lvb;
2268                         } else {
                                /* Speculative locks essentially enqueue a
                                 * DLM lock in advance, so we don't care
                                 * about the result of the enqueue. */
2272                                 aa->oa_lvb    = NULL;
2273                                 aa->oa_flags  = NULL;
2274                         }
2275
2276                         req->rq_interpret_reply =
2277                                 (ptlrpc_interpterer_t)osc_enqueue_interpret;
2278                         if (rqset == PTLRPCD_SET)
2279                                 ptlrpcd_add_req(req);
2280                         else
2281                                 ptlrpc_set_add_req(rqset, req);
2282                 } else if (intent) {
2283                         ptlrpc_req_finished(req);
2284                 }
2285                 RETURN(rc);
2286         }
2287
2288         rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
2289                               flags, speculative, rc);
2290         if (intent)
2291                 ptlrpc_req_finished(req);
2292
2293         RETURN(rc);
2294 }
2295
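/* Search for an already granted extent lock of mode @mode covering @policy;
 * a PR request will also match a cached PW lock.  On a match, optionally
 * bind @data to the lock via osc_set_lock_data(). */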
2296 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2297                    enum ldlm_type type, union ldlm_policy_data *policy,
2298                    enum ldlm_mode mode, __u64 *flags, void *data,
2299                    struct lustre_handle *lockh, int unref)
2300 {
2301         struct obd_device *obd = exp->exp_obd;
2302         __u64 lflags = *flags;
2303         enum ldlm_mode rc;
2304         ENTRY;
2305
2306         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2307                 RETURN(-EIO);
2308
2309         /* Filesystem lock extents are extended to page boundaries so that
2310          * dealing with the page cache is a little smoother */
2311         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2312         policy->l_extent.end |= ~PAGE_MASK;
2313
2314         /* Next, search for already existing extent locks that will cover us */
2315         /* If we're trying to read, we also search for an existing PW lock.  The
2316          * VFS and page cache already protect us locally, so lots of readers/
2317          * writers can share a single PW lock. */
2318         rc = mode;
2319         if (mode == LCK_PR)
2320                 rc |= LCK_PW;
2321         rc = ldlm_lock_match(obd->obd_namespace, lflags,
2322                              res_id, type, policy, rc, lockh, unref);
2323         if (rc == 0 || lflags & LDLM_FL_TEST_LOCK)
2324                 RETURN(rc);
2325
2326         if (data != NULL) {
2327                 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2328
2329                 LASSERT(lock != NULL);
2330                 if (!osc_set_lock_data(lock, data)) {
2331                         ldlm_lock_decref(lockh, rc);
2332                         rc = 0;
2333                 }
2334                 LDLM_LOCK_PUT(lock);
2335         }
2336         RETURN(rc);
2337 }
2338
2339 static int osc_statfs_interpret(const struct lu_env *env,
2340                                 struct ptlrpc_request *req,
2341                                 struct osc_async_args *aa, int rc)
2342 {
2343         struct obd_statfs *msfs;
2344         ENTRY;
2345
        if (rc == -EBADR)
                /* The request has in fact never been sent due to issues
                 * at a higher level (LOV).  Exit immediately since the
                 * caller is aware of the problem and takes care of the
                 * cleanup. */
                RETURN(rc);
2353
2354         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2355             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2356                 GOTO(out, rc = 0);
2357
2358         if (rc != 0)
2359                 GOTO(out, rc);
2360
2361         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
        if (msfs == NULL)
                GOTO(out, rc = -EPROTO);
2365
2366         *aa->aa_oi->oi_osfs = *msfs;
2367 out:
2368         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2369         RETURN(rc);
2370 }
2371
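/* Asynchronous OST_STATFS: pack the request, mark it no-resend/no-delay
 * when the caller passes OBD_STATFS_NODELAY, and queue it on @rqset with
 * osc_statfs_interpret() as the completion callback. */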
2372 static int osc_statfs_async(struct obd_export *exp,
2373                             struct obd_info *oinfo, __u64 max_age,
2374                             struct ptlrpc_request_set *rqset)
2375 {
2376         struct obd_device     *obd = class_exp2obd(exp);
2377         struct ptlrpc_request *req;
2378         struct osc_async_args *aa;
2379         int                    rc;
2380         ENTRY;
2381
2382         /* We could possibly pass max_age in the request (as an absolute
2383          * timestamp or a "seconds.usec ago") so the target can avoid doing
2384          * extra calls into the filesystem if that isn't necessary (e.g.
2385          * during mount that would help a bit).  Having relative timestamps
2386          * is not so great if request processing is slow, while absolute
2387          * timestamps are not ideal because they need time synchronization. */
2388         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2389         if (req == NULL)
2390                 RETURN(-ENOMEM);
2391
2392         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2393         if (rc) {
2394                 ptlrpc_request_free(req);
2395                 RETURN(rc);
2396         }
2397         ptlrpc_request_set_replen(req);
2398         req->rq_request_portal = OST_CREATE_PORTAL;
2399         ptlrpc_at_set_req_timeout(req);
2400
2401         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
                /* procfs requests must not be delayed or resent, to
                 * avoid a deadlock */
2403                 req->rq_no_resend = 1;
2404                 req->rq_no_delay = 1;
2405         }
2406
2407         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2409         aa = ptlrpc_req_async_args(req);
2410         aa->aa_oi = oinfo;
2411
2412         ptlrpc_set_add_req(rqset, req);
2413         RETURN(0);
2414 }
2415
2416 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2417                       struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2418 {
2419         struct obd_device     *obd = class_exp2obd(exp);
2420         struct obd_statfs     *msfs;
2421         struct ptlrpc_request *req;
2422         struct obd_import     *imp = NULL;
2423         int rc;
2424         ENTRY;
2425
        /* Since the request might also come from lprocfs, we need to sync
         * this with client_disconnect_export().  Bug 15684 */
2428         down_read(&obd->u.cli.cl_sem);
2429         if (obd->u.cli.cl_import)
2430                 imp = class_import_get(obd->u.cli.cl_import);
2431         up_read(&obd->u.cli.cl_sem);
2432         if (!imp)
2433                 RETURN(-ENODEV);
2434
2435         /* We could possibly pass max_age in the request (as an absolute
2436          * timestamp or a "seconds.usec ago") so the target can avoid doing
2437          * extra calls into the filesystem if that isn't necessary (e.g.
2438          * during mount that would help a bit).  Having relative timestamps
2439          * is not so great if request processing is slow, while absolute
2440          * timestamps are not ideal because they need time synchronization. */
2441         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2442
2443         class_import_put(imp);
2444
2445         if (req == NULL)
2446                 RETURN(-ENOMEM);
2447
2448         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2449         if (rc) {
2450                 ptlrpc_request_free(req);
2451                 RETURN(rc);
2452         }
2453         ptlrpc_request_set_replen(req);
2454         req->rq_request_portal = OST_CREATE_PORTAL;
2455         ptlrpc_at_set_req_timeout(req);
2456
2457         if (flags & OBD_STATFS_NODELAY) {
                /* procfs requests must not be delayed or resent, to
                 * avoid a deadlock */
2459                 req->rq_no_resend = 1;
2460                 req->rq_no_delay = 1;
2461         }
2462
2463         rc = ptlrpc_queue_wait(req);
2464         if (rc)
2465                 GOTO(out, rc);
2466
2467         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
        if (msfs == NULL)
                GOTO(out, rc = -EPROTO);
2471
2472         *osfs = *msfs;
2473
2474         EXIT;
out:
2476         ptlrpc_req_finished(req);
2477         return rc;
2478 }
2479
2480 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2481                          void *karg, void __user *uarg)
2482 {
2483         struct obd_device *obd = exp->exp_obd;
2484         struct obd_ioctl_data *data = karg;
2485         int err = 0;
2486         ENTRY;
2487
2488         if (!try_module_get(THIS_MODULE)) {
2489                 CERROR("%s: cannot get module '%s'\n", obd->obd_name,
2490                        module_name(THIS_MODULE));
2491                 return -EINVAL;
2492         }
2493         switch (cmd) {
2494         case OBD_IOC_CLIENT_RECOVER:
2495                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2496                                             data->ioc_inlbuf1, 0);
2497                 if (err > 0)
2498                         err = 0;
2499                 GOTO(out, err);
2500         case IOC_OSC_SET_ACTIVE:
2501                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2502                                                data->ioc_offset);
2503                 GOTO(out, err);
2504         case OBD_IOC_PING_TARGET:
2505                 err = ptlrpc_obd_ping(obd);
2506                 GOTO(out, err);
2507         default:
2508                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2509                        cmd, current_comm());
2510                 GOTO(out, err = -ENOTTY);
2511         }
2512 out:
2513         module_put(THIS_MODULE);
2514         return err;
2515 }
2516
2517 int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
2518                        u32 keylen, void *key, u32 vallen, void *val,
2519                        struct ptlrpc_request_set *set)
2520 {
2521         struct ptlrpc_request *req;
2522         struct obd_device     *obd = exp->exp_obd;
2523         struct obd_import     *imp = class_exp2cliimp(exp);
2524         char                  *tmp;
2525         int                    rc;
2526         ENTRY;
2527
2528         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
2529
2530         if (KEY_IS(KEY_CHECKSUM)) {
2531                 if (vallen != sizeof(int))
2532                         RETURN(-EINVAL);
2533                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
2534                 RETURN(0);
2535         }
2536
2537         if (KEY_IS(KEY_SPTLRPC_CONF)) {
2538                 sptlrpc_conf_client_adapt(obd);
2539                 RETURN(0);
2540         }
2541
2542         if (KEY_IS(KEY_FLUSH_CTX)) {
2543                 sptlrpc_import_flush_my_ctx(imp);
2544                 RETURN(0);
2545         }
2546
2547         if (KEY_IS(KEY_CACHE_SET)) {
2548                 struct client_obd *cli = &obd->u.cli;
2549
2550                 LASSERT(cli->cl_cache == NULL); /* only once */
2551                 cli->cl_cache = (struct cl_client_cache *)val;
2552                 cl_cache_incref(cli->cl_cache);
2553                 cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
2554
2555                 /* add this osc into entity list */
2556                 LASSERT(list_empty(&cli->cl_lru_osc));
2557                 spin_lock(&cli->cl_cache->ccc_lru_lock);
2558                 list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
2559                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
2560
2561                 RETURN(0);
2562         }
2563
2564         if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
2565                 struct client_obd *cli = &obd->u.cli;
2566                 long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
2567                 long target = *(long *)val;
2568
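                /* Shrink at most half of this OSC's LRU pages, bounded by
                 * what the caller still needs, and report the number freed
                 * back through *val. */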
2569                 nr = osc_lru_shrink(env, cli, min(nr, target), true);
2570                 *(long *)val -= nr;
2571                 RETURN(0);
2572         }
2573
2574         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
2575                 RETURN(-EINVAL);
2576
        /* We pass all other commands directly to OST.  Since nobody calls
         * osc methods directly and everybody is supposed to go through LOV,
         * we assume lov checked invalid values for us.
         * The only recognised values so far are evict_by_nid and mds_conn.
         * Even if something bad goes through, we'd get a -EINVAL from OST
         * anyway. */
2583
2584         req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
2585                                                 &RQF_OST_SET_GRANT_INFO :
2586                                                 &RQF_OBD_SET_INFO);
2587         if (req == NULL)
2588                 RETURN(-ENOMEM);
2589
2590         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2591                              RCL_CLIENT, keylen);
2592         if (!KEY_IS(KEY_GRANT_SHRINK))
2593                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
2594                                      RCL_CLIENT, vallen);
2595         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
2596         if (rc) {
2597                 ptlrpc_request_free(req);
2598                 RETURN(rc);
2599         }
2600
2601         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2602         memcpy(tmp, key, keylen);
2603         tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
2604                                                         &RMF_OST_BODY :
2605                                                         &RMF_SETINFO_VAL);
2606         memcpy(tmp, val, vallen);
2607
2608         if (KEY_IS(KEY_GRANT_SHRINK)) {
2609                 struct osc_grant_args *aa;
2610                 struct obdo *oa;
2611
2612                 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2613                 aa = ptlrpc_req_async_args(req);
2614                 OBDO_ALLOC(oa);
2615                 if (!oa) {
2616                         ptlrpc_req_finished(req);
2617                         RETURN(-ENOMEM);
2618                 }
2619                 *oa = ((struct ost_body *)val)->oa;
2620                 aa->aa_oa = oa;
2621                 req->rq_interpret_reply = osc_shrink_grant_interpret;
2622         }
2623
2624         ptlrpc_request_set_replen(req);
2625         if (!KEY_IS(KEY_GRANT_SHRINK)) {
2626                 LASSERT(set != NULL);
2627                 ptlrpc_set_add_req(set, req);
2628                 ptlrpc_check_set(NULL, set);
2629         } else {
2630                 ptlrpcd_add_req(req);
2631         }
2632
2633         RETURN(0);
2634 }
2635 EXPORT_SYMBOL(osc_set_info_async);
2636
2637 static int osc_reconnect(const struct lu_env *env,
2638                          struct obd_export *exp, struct obd_device *obd,
2639                          struct obd_uuid *cluuid,
2640                          struct obd_connect_data *data,
2641                          void *localdata)
2642 {
2643         struct client_obd *cli = &obd->u.cli;
2644
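        /* Recompute the grant to request on reconnect: the grant still
         * available plus the reserved grant and whatever the dirty cache
         * consumes; fall back to two BRW-sized chunks if we hold none. */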
2645         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
2646                 long lost_grant;
2647                 long grant;
2648
2649                 spin_lock(&cli->cl_loi_list_lock);
2650                 grant = cli->cl_avail_grant + cli->cl_reserved_grant;
2651                 if (data->ocd_connect_flags & OBD_CONNECT_GRANT_PARAM)
2652                         grant += cli->cl_dirty_grant;
2653                 else
2654                         grant += cli->cl_dirty_pages << PAGE_SHIFT;
2655                 data->ocd_grant = grant ? : 2 * cli_brw_size(obd);
2656                 lost_grant = cli->cl_lost_grant;
2657                 cli->cl_lost_grant = 0;
2658                 spin_unlock(&cli->cl_loi_list_lock);
2659
2660                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: %#llx ocd_version: %d ocd_grant: %d, lost: %ld.\n",
2661                        data->ocd_connect_flags, data->ocd_version,
2662                        data->ocd_grant, lost_grant);
2663         }
2664
2665         RETURN(0);
2666 }
2667
2668 static int osc_disconnect(struct obd_export *exp)
2669 {
2670         struct obd_device *obd = class_exp2obd(exp);
2671         int rc;
2672
2673         rc = client_disconnect_export(exp);
2674         /**
2675          * Initially we called del_shrink_grant before disconnect_export,
2676          * but that causes the following race when setup (connect) and
2677          * cleanup (disconnect) are tangled together:
2678          *      connect p1                     disconnect p2
2679          *   ptlrpc_connect_import
2680          *     ...............               class_manual_cleanup
2681          *                                     osc_disconnect
2682          *                                     del_shrink_grant
2683          *   ptlrpc_connect_interrupt
2684          *     init_grant_shrink
2685          *   add this client to shrink list
2686          *                                      cleanup_osc
2687          * Bang! The pinger triggers the shrink.
2688          * So the osc should be removed from the shrink list only after we
2689          * are sure the import has been destroyed. BUG18662
2690          */
2691         if (obd->u.cli.cl_import == NULL)
2692                 osc_del_shrink_grant(&obd->u.cli);
2693         return rc;
2694 }
2695
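/**
 * cfs_hash iterator used when an import is invalidated: grab a
 * reference on the first osc_object found among the resource's granted
 * locks, clear LDLM_FL_CLEANED on every granted lock so the second
 * ldlm_namespace_cleanup() pass in osc_import_event() cancels them,
 * then invalidate the object.
 */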
2696 int osc_ldlm_resource_invalidate(struct cfs_hash *hs, struct cfs_hash_bd *bd,
2697                                  struct hlist_node *hnode, void *arg)
2698 {
2699         struct lu_env *env = arg;
2700         struct ldlm_resource *res = cfs_hash_object(hs, hnode);
2701         struct ldlm_lock *lock;
2702         struct osc_object *osc = NULL;
2703         ENTRY;
2704
2705         lock_res(res);
2706         list_for_each_entry(lock, &res->lr_granted, l_res_link) {
2707                 if (lock->l_ast_data != NULL && osc == NULL) {
2708                         osc = lock->l_ast_data;
2709                         cl_object_get(osc2cl(osc));
2710                 }
2711
2712                 /* clear LDLM_FL_CLEANED flag to make sure it will be canceled
2713                  * by the 2nd round of ldlm_namespace_clean() call in
2714                  * osc_import_event(). */
2715                 ldlm_clear_cleaned(lock);
2716         }
2717         unlock_res(res);
2718
2719         if (osc != NULL) {
2720                 osc_object_invalidate(env, osc);
2721                 cl_object_put(env, osc2cl(osc));
2722         }
2723
2724         RETURN(0);
2725 }
2726 EXPORT_SYMBOL(osc_ldlm_resource_invalidate);
2727
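/**
 * Dispatch import events: discard grant on disconnect, flush cached
 * pages and invalidate objects on invalidation, refresh grant and the
 * request portal when connect data arrives, and forward activation
 * state changes to the observer (normally the LOV).
 */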
2728 static int osc_import_event(struct obd_device *obd,
2729                             struct obd_import *imp,
2730                             enum obd_import_event event)
2731 {
2732         struct client_obd *cli;
2733         int rc = 0;
2734
2735         ENTRY;
2736         LASSERT(imp->imp_obd == obd);
2737
2738         switch (event) {
2739         case IMP_EVENT_DISCON: {
2740                 cli = &obd->u.cli;
2741                 spin_lock(&cli->cl_loi_list_lock);
2742                 cli->cl_avail_grant = 0;
2743                 cli->cl_lost_grant = 0;
2744                 spin_unlock(&cli->cl_loi_list_lock);
2745                 break;
2746         }
2747         case IMP_EVENT_INACTIVE: {
2748                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE);
2749                 break;
2750         }
2751         case IMP_EVENT_INVALIDATE: {
2752                 struct ldlm_namespace *ns = obd->obd_namespace;
2753                 struct lu_env         *env;
2754                 __u16                  refcheck;
2755
2756                 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
2757
2758                 env = cl_env_get(&refcheck);
2759                 if (!IS_ERR(env)) {
2760                         osc_io_unplug(env, &obd->u.cli, NULL);
2761
2762                         cfs_hash_for_each_nolock(ns->ns_rs_hash,
2763                                                  osc_ldlm_resource_invalidate,
2764                                                  env, 0);
2765                         cl_env_put(env, &refcheck);
2766
2767                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
2768                 } else
2769                         rc = PTR_ERR(env);
2770                 break;
2771         }
2772         case IMP_EVENT_ACTIVE: {
2773                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE);
2774                 break;
2775         }
2776         case IMP_EVENT_OCD: {
2777                 struct obd_connect_data *ocd = &imp->imp_connect_data;
2778
2779                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
2780                         osc_init_grant(&obd->u.cli, ocd);
2781
2782                 /* See bug 7198 */
2783                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
2784                         imp->imp_client->cli_request_portal = OST_REQUEST_PORTAL;
2785
2786                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD);
2787                 break;
2788         }
2789         case IMP_EVENT_DEACTIVATE: {
2790                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE);
2791                 break;
2792         }
2793         case IMP_EVENT_ACTIVATE: {
2794                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE);
2795                 break;
2796         }
2797         default:
2798                 CERROR("Unknown import event %d\n", event);
2799                 LBUG();
2800         }
2801         RETURN(rc);
2802 }
2803
2804 /**
2805  * Determine whether the lock can be canceled before replaying the lock
2806  * during recovery, see bug16774 for detailed information.
2807  *
2808  * \retval zero the lock can't be canceled
2809  * \retval other ok to cancel
2810  */
2811 static int osc_cancel_weight(struct ldlm_lock *lock)
2812 {
2813         /*
2814          * Cancel all unused, granted extent locks.
2815          */
2816         if (lock->l_resource->lr_type == LDLM_EXTENT &&
2817             lock->l_granted_mode == lock->l_req_mode &&
2818             osc_ldlm_weigh_ast(lock) == 0)
2819                 RETURN(1);
2820
2821         RETURN(0);
2822 }
2823
2824 static int brw_queue_work(const struct lu_env *env, void *data)
2825 {
2826         struct client_obd *cli = data;
2827
2828         CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
2829
2830         osc_io_unplug(env, cli, NULL);
2831         RETURN(0);
2832 }
2833
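/**
 * Common device setup, exported so other clients (e.g. the MDC) can
 * share it: take a ptlrpcd reference, initialize the client obd, and
 * allocate the writeback and LRU work items.
 */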
2834 int osc_setup_common(struct obd_device *obd, struct lustre_cfg *lcfg)
2835 {
2836         struct client_obd *cli = &obd->u.cli;
2837         void *handler;
2838         int rc;
2839
2840         ENTRY;
2841
2842         rc = ptlrpcd_addref();
2843         if (rc)
2844                 RETURN(rc);
2845
2846         rc = client_obd_setup(obd, lcfg);
2847         if (rc)
2848                 GOTO(out_ptlrpcd, rc);
2849
2851         handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
2852         if (IS_ERR(handler))
2853                 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
2854         cli->cl_writeback_work = handler;
2855
2856         handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
2857         if (IS_ERR(handler))
2858                 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
2859         cli->cl_lru_work = handler;
2860
2861         rc = osc_quota_setup(obd);
2862         if (rc)
2863                 GOTO(out_ptlrpcd_work, rc);
2864
2865         cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
2866
2867         INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
2868         RETURN(rc);
2869
2870 out_ptlrpcd_work:
2871         if (cli->cl_writeback_work != NULL) {
2872                 ptlrpcd_destroy_work(cli->cl_writeback_work);
2873                 cli->cl_writeback_work = NULL;
2874         }
2875         if (cli->cl_lru_work != NULL) {
2876                 ptlrpcd_destroy_work(cli->cl_lru_work);
2877                 cli->cl_lru_work = NULL;
2878         }
2879         client_obd_cleanup(obd);
2880 out_ptlrpcd:
2881         ptlrpcd_decref();
2882         RETURN(rc);
2883 }
2884 EXPORT_SYMBOL(osc_setup_common);
2885
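/**
 * Full OSC setup: run the common setup, register proc entries, grow
 * the shared request pool toward osc_reqpool_maxreqcount, install
 * osc_cancel_weight() as the lock-cancel heuristic, and join the
 * global osc_shrink_list.
 */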
2886 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
2887 {
2888         struct client_obd *cli = &obd->u.cli;
2889         struct obd_type   *type;
2890         int                adding;
2891         int                added;
2892         int                req_count;
2893         int                rc;
2894
2895         ENTRY;
2896
2897         rc = osc_setup_common(obd, lcfg);
2898         if (rc < 0)
2899                 RETURN(rc);
2900
2901 #ifdef CONFIG_PROC_FS
2902         obd->obd_vars = lprocfs_osc_obd_vars;
2903 #endif
2904         /* If both the client (osc) and the server (osp) are on the same
2905          * node, the osp layer, when loaded first, registers the osc proc
2906          * directory. In that case this obd_device attaches its proc tree
2907          * to type->typ_procsym instead of obd->obd_type->typ_procroot.
2908          */
2909         type = class_search_type(LUSTRE_OSP_NAME);
2910         if (type && type->typ_procsym) {
2911                 obd->obd_proc_entry = lprocfs_register(obd->obd_name,
2912                                                        type->typ_procsym,
2913                                                        obd->obd_vars, obd);
2914                 if (IS_ERR(obd->obd_proc_entry)) {
2915                         rc = PTR_ERR(obd->obd_proc_entry);
2916                         CERROR("error %d setting up lprocfs for %s\n", rc,
2917                                obd->obd_name);
2918                         obd->obd_proc_entry = NULL;
2919                 }
2920         }
2921
2922         rc = lprocfs_obd_setup(obd, false);
2923         if (!rc) {
2924                 /* If the basic OSC proc tree construction succeeded,
2925                  * set up the remaining proc entries.
2926                  */
2927                 lproc_osc_attach_seqstat(obd);
2928                 sptlrpc_lprocfs_cliobd_attach(obd);
2929                 ptlrpc_lprocfs_register_obd(obd);
2930         }
2931
2932         /*
2933          * We cap the total number of pooled requests with an upper limit,
2934          * osc_reqpool_maxreqcount. A race may push the allocation slightly
2935          * over the limit, but that is harmless.
2936          */
2937         req_count = atomic_read(&osc_pool_req_count);
2938         if (req_count < osc_reqpool_maxreqcount) {
2939                 adding = cli->cl_max_rpcs_in_flight + 2;
2940                 if (req_count + adding > osc_reqpool_maxreqcount)
2941                         adding = osc_reqpool_maxreqcount - req_count;
2942
2943                 added = ptlrpc_add_rqs_to_pool(osc_rq_pool, adding);
2944                 atomic_add(added, &osc_pool_req_count);
2945         }
2946
2947         INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
2948         ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
2949
2950         spin_lock(&osc_shrink_lock);
2951         list_add_tail(&cli->cl_shrink_list, &osc_shrink_list);
2952         spin_unlock(&osc_shrink_lock);
2953
2954         RETURN(0);
2955 }
2956
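/**
 * Common pre-cleanup: destroy the writeback and LRU work items and
 * tear down the client import; see the LU-464 note below for why the
 * zombie barrier must come first.
 */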
2957 int osc_precleanup_common(struct obd_device *obd)
2958 {
2959         struct client_obd *cli = &obd->u.cli;
2960         ENTRY;
2961
2962         /* LU-464
2963          * for echo client, export may be on zombie list, wait for
2964          * zombie thread to cull it, because cli.cl_import will be
2965          * cleared in client_disconnect_export():
2966          *   class_export_destroy() -> obd_cleanup() ->
2967          *   echo_device_free() -> echo_client_cleanup() ->
2968          *   obd_disconnect() -> osc_disconnect() ->
2969          *   client_disconnect_export()
2970          */
2971         obd_zombie_barrier();
2972         if (cli->cl_writeback_work) {
2973                 ptlrpcd_destroy_work(cli->cl_writeback_work);
2974                 cli->cl_writeback_work = NULL;
2975         }
2976
2977         if (cli->cl_lru_work) {
2978                 ptlrpcd_destroy_work(cli->cl_lru_work);
2979                 cli->cl_lru_work = NULL;
2980         }
2981
2982         obd_cleanup_client_import(obd);
2983         RETURN(0);
2984 }
2985 EXPORT_SYMBOL(osc_precleanup_common);
2986
2987 static int osc_precleanup(struct obd_device *obd)
2988 {
2989         ENTRY;
2990
2991         osc_precleanup_common(obd);
2992
2993         ptlrpc_lprocfs_unregister_obd(obd);
2994         lprocfs_obd_cleanup(obd);
2995         RETURN(0);
2996 }
2997
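/**
 * Common final cleanup: remove the client from the global
 * osc_shrink_list, drop the LRU cache reference, free the quota
 * cache, clean up the client obd, and release the ptlrpcd reference
 * taken in osc_setup_common().
 */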
2998 int osc_cleanup_common(struct obd_device *obd)
2999 {
3000         struct client_obd *cli = &obd->u.cli;
3001         int rc;
3002
3003         ENTRY;
3004
3005         spin_lock(&osc_shrink_lock);
3006         list_del(&cli->cl_shrink_list);
3007         spin_unlock(&osc_shrink_lock);
3008
3009         /* lru cleanup */
3010         if (cli->cl_cache != NULL) {
3011                 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
3012                 spin_lock(&cli->cl_cache->ccc_lru_lock);
3013                 list_del_init(&cli->cl_lru_osc);
3014                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3015                 cli->cl_lru_left = NULL;
3016                 cl_cache_decref(cli->cl_cache);
3017                 cli->cl_cache = NULL;
3018         }
3019
3020         /* free memory of osc quota cache */
3021         osc_quota_cleanup(obd);
3022
3023         rc = client_obd_cleanup(obd);
3024
3025         ptlrpcd_decref();
3026         RETURN(rc);
3027 }
3028 EXPORT_SYMBOL(osc_cleanup_common);
3029
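/* Apply a configuration-log parameter to the OSC proc variables; a
 * positive return from class_process_proc_param() means the parameter
 * was handled, so it is folded to zero here. */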
3030 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
3031 {
3032         int rc = class_process_proc_param(PARAM_OSC, obd->obd_vars, lcfg, obd);
3033         return rc > 0 ? 0 : rc;
3034 }
3035
3036 static int osc_process_config(struct obd_device *obd, size_t len, void *buf)
3037 {
3038         return osc_process_config_base(obd, buf);
3039 }
3040
3041 static struct obd_ops osc_obd_ops = {
3042         .o_owner                = THIS_MODULE,
3043         .o_setup                = osc_setup,
3044         .o_precleanup           = osc_precleanup,
3045         .o_cleanup              = osc_cleanup_common,
3046         .o_add_conn             = client_import_add_conn,
3047         .o_del_conn             = client_import_del_conn,
3048         .o_connect              = client_connect_import,
3049         .o_reconnect            = osc_reconnect,
3050         .o_disconnect           = osc_disconnect,
3051         .o_statfs               = osc_statfs,
3052         .o_statfs_async         = osc_statfs_async,
3053         .o_create               = osc_create,
3054         .o_destroy              = osc_destroy,
3055         .o_getattr              = osc_getattr,
3056         .o_setattr              = osc_setattr,
3057         .o_iocontrol            = osc_iocontrol,
3058         .o_set_info_async       = osc_set_info_async,
3059         .o_import_event         = osc_import_event,
3060         .o_process_config       = osc_process_config,
3061         .o_quotactl             = osc_quotactl,
3062 };
3063
3064 static struct shrinker *osc_cache_shrinker;
3065 struct list_head osc_shrink_list = LIST_HEAD_INIT(osc_shrink_list);
3066 DEFINE_SPINLOCK(osc_shrink_lock);
3067
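/* Compatibility for kernels without the split count/scan shrinker API:
 * emulate the old single ->shrink() callback by running a scan pass
 * and then returning the remaining object count. */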
3068 #ifndef HAVE_SHRINKER_COUNT
3069 static int osc_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask))
3070 {
3071         struct shrink_control scv = {
3072                 .nr_to_scan = shrink_param(sc, nr_to_scan),
3073                 .gfp_mask   = shrink_param(sc, gfp_mask)
3074         };
3075 #if !defined(HAVE_SHRINKER_WANT_SHRINK_PTR) && !defined(HAVE_SHRINK_CONTROL)
3076         struct shrinker *shrinker = NULL;
3077 #endif
3078
3079         (void)osc_cache_shrink_scan(shrinker, &scv);
3080
3081         return osc_cache_shrink_count(shrinker, &scv);
3082 }
3083 #endif
3084
3085 static int __init osc_init(void)
3086 {
3087         bool enable_proc = true;
3088         struct obd_type *type;
3089         unsigned int reqpool_size;
3090         unsigned int reqsize;
3091         int rc;
3092         DEF_SHRINKER_VAR(osc_shvar, osc_cache_shrink,
3093                          osc_cache_shrink_count, osc_cache_shrink_scan);
3094         ENTRY;
3095
3096         /* Print the address of _any_ initialized kernel symbol from this
3097          * module to allow debugging with a gdb that doesn't support data
3098          * symbols from modules. */
3099         CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3100
3101         rc = lu_kmem_init(osc_caches);
3102         if (rc)
3103                 RETURN(rc);
3104
3105         type = class_search_type(LUSTRE_OSP_NAME);
3106         if (type != NULL && type->typ_procsym != NULL)
3107                 enable_proc = false;
3108
3109         rc = class_register_type(&osc_obd_ops, NULL, enable_proc, NULL,
3110                                  LUSTRE_OSC_NAME, &osc_device_type);
3111         if (rc)
3112                 GOTO(out_kmem, rc);
3113
3114         osc_cache_shrinker = set_shrinker(DEFAULT_SEEKS, &osc_shvar);
3115
3116         /* This is obviously too much memory; only prevent overflow here. */
3117         if (osc_reqpool_mem_max >= 1 << 12 || osc_reqpool_mem_max == 0)
3118                 GOTO(out_type, rc = -EINVAL);
3119
3120         reqpool_size = osc_reqpool_mem_max << 20;
3121
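        /* Round the pool request size up to the smallest power of two
         * that can hold OST_IO_MAXREQSIZE. */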
3122         reqsize = 1;
3123         while (reqsize < OST_IO_MAXREQSIZE)
3124                 reqsize = reqsize << 1;
3125
3126         /*
3127          * We don't enlarge the request count in the OSC pool according to
3128          * cl_max_rpcs_in_flight. Allocation from the pool is only tried
3129          * after a normal allocation fails, so a small OSC pool won't cause
3130          * much performance degradation in most cases.
3131          */
3132         osc_reqpool_maxreqcount = reqpool_size / reqsize;
3133
3134         atomic_set(&osc_pool_req_count, 0);
3135         osc_rq_pool = ptlrpc_init_rq_pool(0, OST_IO_MAXREQSIZE,
3136                                           ptlrpc_add_rqs_to_pool);
3137
3138         if (osc_rq_pool != NULL)
3139                 GOTO(out, rc);
3140         rc = -ENOMEM;
3141 out_type:
3142         class_unregister_type(LUSTRE_OSC_NAME);
3143 out_kmem:
3144         lu_kmem_fini(osc_caches);
3145 out:
3146         RETURN(rc);
3147 }
3148
3149 static void __exit osc_exit(void)
3150 {
3151         remove_shrinker(osc_cache_shrinker);
3152         class_unregister_type(LUSTRE_OSC_NAME);
3153         lu_kmem_fini(osc_caches);
3154         ptlrpc_free_rq_pool(osc_rq_pool);
3155 }
3156
3157 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
3158 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3159 MODULE_VERSION(LUSTRE_VERSION_STRING);
3160 MODULE_LICENSE("GPL");
3161
3162 module_init(osc_init);
3163 module_exit(osc_exit);