Whamcloud - gitweb
LU-5710 all: third batch of corrected typos and grammar errors
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2014, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_OSC
38
39 #include <libcfs/libcfs.h>
40
41 #include <lustre_dlm.h>
42 #include <lustre_net.h>
43 #include <lustre/lustre_user.h>
44 #include <obd_cksum.h>
45 #include <lustre_ha.h>
46 #include <lprocfs_status.h>
47 #include <lustre_ioctl.h>
48 #include <lustre_debug.h>
49 #include <lustre_param.h>
50 #include <lustre_fid.h>
51 #include <obd_class.h>
52 #include "osc_internal.h"
53 #include "osc_cl_internal.h"
54
55 struct osc_brw_async_args {
56         struct obdo              *aa_oa;
57         int                       aa_requested_nob;
58         int                       aa_nio_count;
59         u32                       aa_page_count;
60         int                       aa_resends;
61         struct brw_page **aa_ppga;
62         struct client_obd        *aa_cli;
63         struct list_head          aa_oaps;
64         struct list_head          aa_exts;
65         struct cl_req            *aa_clerq;
66 };
67
68 #define osc_grant_args osc_brw_async_args
69
70 struct osc_setattr_args {
71         struct obdo             *sa_oa;
72         obd_enqueue_update_f     sa_upcall;
73         void                    *sa_cookie;
74 };
75
76 struct osc_fsync_args {
77         struct obdo             *fa_oa;
78         obd_enqueue_update_f     fa_upcall;
79         void                    *fa_cookie;
80 };
81
82 struct osc_enqueue_args {
83         struct obd_export       *oa_exp;
84         ldlm_type_t             oa_type;
85         ldlm_mode_t             oa_mode;
86         __u64                   *oa_flags;
87         osc_enqueue_upcall_f    oa_upcall;
88         void                    *oa_cookie;
89         struct ost_lvb          *oa_lvb;
90         struct lustre_handle    oa_lockh;
91         unsigned int            oa_agl:1;
92 };
93
94 static void osc_release_ppga(struct brw_page **ppga, size_t count);
95 static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
96                          void *data, int rc);
97
98 void osc_pack_req_body(struct ptlrpc_request *req, struct obdo *oa)
99 {
100         struct ost_body *body;
101
102         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
103         LASSERT(body);
104
105         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
106 }
107
108 static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
109                        struct obdo *oa)
110 {
111         struct ptlrpc_request   *req;
112         struct ost_body         *body;
113         int                      rc;
114
115         ENTRY;
116         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
117         if (req == NULL)
118                 RETURN(-ENOMEM);
119
120         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
121         if (rc) {
122                 ptlrpc_request_free(req);
123                 RETURN(rc);
124         }
125
126         osc_pack_req_body(req, oa);
127
128         ptlrpc_request_set_replen(req);
129
130         rc = ptlrpc_queue_wait(req);
131         if (rc)
132                 GOTO(out, rc);
133
134         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
135         if (body == NULL)
136                 GOTO(out, rc = -EPROTO);
137
138         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
139         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
140
141         oa->o_blksize = cli_brw_size(exp->exp_obd);
142         oa->o_valid |= OBD_MD_FLBLKSZ;
143
144         EXIT;
145 out:
146         ptlrpc_req_finished(req);
147
148         return rc;
149 }
150
151 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
152                        struct obdo *oa)
153 {
154         struct ptlrpc_request   *req;
155         struct ost_body         *body;
156         int                      rc;
157
158         ENTRY;
159         LASSERT(oa->o_valid & OBD_MD_FLGROUP);
160
161         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
162         if (req == NULL)
163                 RETURN(-ENOMEM);
164
165         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
166         if (rc) {
167                 ptlrpc_request_free(req);
168                 RETURN(rc);
169         }
170
171         osc_pack_req_body(req, oa);
172
173         ptlrpc_request_set_replen(req);
174
175         rc = ptlrpc_queue_wait(req);
176         if (rc)
177                 GOTO(out, rc);
178
179         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
180         if (body == NULL)
181                 GOTO(out, rc = -EPROTO);
182
183         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
184
185         EXIT;
186 out:
187         ptlrpc_req_finished(req);
188
189         RETURN(rc);
190 }
191
192 static int osc_setattr_interpret(const struct lu_env *env,
193                                  struct ptlrpc_request *req,
194                                  struct osc_setattr_args *sa, int rc)
195 {
196         struct ost_body *body;
197         ENTRY;
198
199         if (rc != 0)
200                 GOTO(out, rc);
201
202         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
203         if (body == NULL)
204                 GOTO(out, rc = -EPROTO);
205
206         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
207                              &body->oa);
208 out:
209         rc = sa->sa_upcall(sa->sa_cookie, rc);
210         RETURN(rc);
211 }
212
213 int osc_setattr_async(struct obd_export *exp, struct obdo *oa,
214                       obd_enqueue_update_f upcall, void *cookie,
215                       struct ptlrpc_request_set *rqset)
216 {
217         struct ptlrpc_request   *req;
218         struct osc_setattr_args *sa;
219         int                      rc;
220
221         ENTRY;
222
223         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
224         if (req == NULL)
225                 RETURN(-ENOMEM);
226
227         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
228         if (rc) {
229                 ptlrpc_request_free(req);
230                 RETURN(rc);
231         }
232
233         osc_pack_req_body(req, oa);
234
235         ptlrpc_request_set_replen(req);
236
237         /* do mds to ost setattr asynchronously */
238         if (!rqset) {
239                 /* Do not wait for response. */
240                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
241         } else {
242                 req->rq_interpret_reply =
243                         (ptlrpc_interpterer_t)osc_setattr_interpret;
244
245                 CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
246                 sa = ptlrpc_req_async_args(req);
247                 sa->sa_oa = oa;
248                 sa->sa_upcall = upcall;
249                 sa->sa_cookie = cookie;
250
251                 if (rqset == PTLRPCD_SET)
252                         ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
253                 else
254                         ptlrpc_set_add_req(rqset, req);
255         }
256
257         RETURN(0);
258 }
259
260 static int osc_create(const struct lu_env *env, struct obd_export *exp,
261                       struct obdo *oa)
262 {
263         struct ptlrpc_request *req;
264         struct ost_body       *body;
265         int                    rc;
266         ENTRY;
267
268         LASSERT(oa != NULL);
269         LASSERT(oa->o_valid & OBD_MD_FLGROUP);
270         LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));
271
272         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
273         if (req == NULL)
274                 GOTO(out, rc = -ENOMEM);
275
276         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
277         if (rc) {
278                 ptlrpc_request_free(req);
279                 GOTO(out, rc);
280         }
281
282         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
283         LASSERT(body);
284
285         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
286
287         ptlrpc_request_set_replen(req);
288
289         rc = ptlrpc_queue_wait(req);
290         if (rc)
291                 GOTO(out_req, rc);
292
293         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
294         if (body == NULL)
295                 GOTO(out_req, rc = -EPROTO);
296
297         CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
298         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
299
300         oa->o_blksize = cli_brw_size(exp->exp_obd);
301         oa->o_valid |= OBD_MD_FLBLKSZ;
302
303         CDEBUG(D_HA, "transno: "LPD64"\n",
304                lustre_msg_get_transno(req->rq_repmsg));
305 out_req:
306         ptlrpc_req_finished(req);
307 out:
308         RETURN(rc);
309 }
310
311 int osc_punch_base(struct obd_export *exp, struct obdo *oa,
312                    obd_enqueue_update_f upcall, void *cookie,
313                    struct ptlrpc_request_set *rqset)
314 {
315         struct ptlrpc_request   *req;
316         struct osc_setattr_args *sa;
317         struct ost_body         *body;
318         int                      rc;
319         ENTRY;
320
321         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
322         if (req == NULL)
323                 RETURN(-ENOMEM);
324
325         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
326         if (rc) {
327                 ptlrpc_request_free(req);
328                 RETURN(rc);
329         }
330         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
331         ptlrpc_at_set_req_timeout(req);
332
333         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
334         LASSERT(body);
335         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
336
337         ptlrpc_request_set_replen(req);
338
339         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
340         CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
341         sa = ptlrpc_req_async_args(req);
342         sa->sa_oa = oa;
343         sa->sa_upcall = upcall;
344         sa->sa_cookie = cookie;
345         if (rqset == PTLRPCD_SET)
346                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
347         else
348                 ptlrpc_set_add_req(rqset, req);
349
350         RETURN(0);
351 }
352
353 static int osc_sync_interpret(const struct lu_env *env,
354                               struct ptlrpc_request *req,
355                               void *arg, int rc)
356 {
357         struct osc_fsync_args *fa = arg;
358         struct ost_body *body;
359         ENTRY;
360
361         if (rc)
362                 GOTO(out, rc);
363
364         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
365         if (body == NULL) {
366                 CERROR ("can't unpack ost_body\n");
367                 GOTO(out, rc = -EPROTO);
368         }
369
370         *fa->fa_oa = body->oa;
371 out:
372         rc = fa->fa_upcall(fa->fa_cookie, rc);
373         RETURN(rc);
374 }
375
376 int osc_sync_base(struct obd_export *exp, struct obdo *oa,
377                   obd_enqueue_update_f upcall, void *cookie,
378                   struct ptlrpc_request_set *rqset)
379 {
380         struct ptlrpc_request *req;
381         struct ost_body       *body;
382         struct osc_fsync_args *fa;
383         int                    rc;
384         ENTRY;
385
386         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
387         if (req == NULL)
388                 RETURN(-ENOMEM);
389
390         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
391         if (rc) {
392                 ptlrpc_request_free(req);
393                 RETURN(rc);
394         }
395
396         /* overload the size and blocks fields in the oa with start/end */
397         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
398         LASSERT(body);
399         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
400
401         ptlrpc_request_set_replen(req);
402         req->rq_interpret_reply = osc_sync_interpret;
403
404         CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
405         fa = ptlrpc_req_async_args(req);
406         fa->fa_oa = oa;
407         fa->fa_upcall = upcall;
408         fa->fa_cookie = cookie;
409
410         if (rqset == PTLRPCD_SET)
411                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
412         else
413                 ptlrpc_set_add_req(rqset, req);
414
415         RETURN (0);
416 }
417
418 /* Find and cancel locally locks matched by @mode in the resource found by
419  * @objid. Found locks are added into @cancel list. Returns the amount of
420  * locks added to @cancels list. */
421 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
422                                    struct list_head *cancels,
423                                    ldlm_mode_t mode, __u64 lock_flags)
424 {
425         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
426         struct ldlm_res_id res_id;
427         struct ldlm_resource *res;
428         int count;
429         ENTRY;
430
431         /* Return, i.e. cancel nothing, only if ELC is supported (flag in
432          * export) but disabled through procfs (flag in NS).
433          *
434          * This distinguishes from a case when ELC is not supported originally,
435          * when we still want to cancel locks in advance and just cancel them
436          * locally, without sending any RPC. */
437         if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
438                 RETURN(0);
439
440         ostid_build_res_name(&oa->o_oi, &res_id);
441         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
442         if (IS_ERR(res))
443                 RETURN(0);
444
445         LDLM_RESOURCE_ADDREF(res);
446         count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
447                                            lock_flags, 0, NULL);
448         LDLM_RESOURCE_DELREF(res);
449         ldlm_resource_putref(res);
450         RETURN(count);
451 }
452
453 static int osc_destroy_interpret(const struct lu_env *env,
454                                  struct ptlrpc_request *req, void *data,
455                                  int rc)
456 {
457         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
458
459         atomic_dec(&cli->cl_destroy_in_flight);
460         wake_up(&cli->cl_destroy_waitq);
461         return 0;
462 }
463
464 static int osc_can_send_destroy(struct client_obd *cli)
465 {
466         if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
467             cli->cl_max_rpcs_in_flight) {
468                 /* The destroy request can be sent */
469                 return 1;
470         }
471         if (atomic_dec_return(&cli->cl_destroy_in_flight) <
472             cli->cl_max_rpcs_in_flight) {
473                 /*
474                  * The counter has been modified between the two atomic
475                  * operations.
476                  */
477                 wake_up(&cli->cl_destroy_waitq);
478         }
479         return 0;
480 }
481
482 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
483                        struct obdo *oa)
484 {
485         struct client_obd     *cli = &exp->exp_obd->u.cli;
486         struct ptlrpc_request *req;
487         struct ost_body       *body;
488         struct list_head       cancels = LIST_HEAD_INIT(cancels);
489         int rc, count;
490         ENTRY;
491
492         if (!oa) {
493                 CDEBUG(D_INFO, "oa NULL\n");
494                 RETURN(-EINVAL);
495         }
496
497         count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
498                                         LDLM_FL_DISCARD_DATA);
499
500         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
501         if (req == NULL) {
502                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
503                 RETURN(-ENOMEM);
504         }
505
506         rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
507                                0, &cancels, count);
508         if (rc) {
509                 ptlrpc_request_free(req);
510                 RETURN(rc);
511         }
512
513         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
514         ptlrpc_at_set_req_timeout(req);
515
516         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
517         LASSERT(body);
518         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
519
520         ptlrpc_request_set_replen(req);
521
522         req->rq_interpret_reply = osc_destroy_interpret;
523         if (!osc_can_send_destroy(cli)) {
524                 struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
525
526                 /*
527                  * Wait until the number of on-going destroy RPCs drops
528                  * under max_rpc_in_flight
529                  */
530                 l_wait_event_exclusive(cli->cl_destroy_waitq,
531                                        osc_can_send_destroy(cli), &lwi);
532         }
533
534         /* Do not wait for response */
535         ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
536         RETURN(0);
537 }
538
539 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
540                                 long writing_bytes)
541 {
542         u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;
543
544         LASSERT(!(oa->o_valid & bits));
545
546         oa->o_valid |= bits;
547         spin_lock(&cli->cl_loi_list_lock);
548         oa->o_dirty = cli->cl_dirty_pages << PAGE_CACHE_SHIFT;
549         if (unlikely(cli->cl_dirty_pages - cli->cl_dirty_transit >
550                      cli->cl_dirty_max_pages)) {
551                 CERROR("dirty %lu - %lu > dirty_max %lu\n",
552                        cli->cl_dirty_pages, cli->cl_dirty_transit,
553                        cli->cl_dirty_max_pages);
554                 oa->o_undirty = 0;
555         } else if (unlikely(atomic_long_read(&obd_dirty_pages) -
556                             atomic_long_read(&obd_dirty_transit_pages) >
557                             (obd_max_dirty_pages + 1))) {
558                 /* The atomic_read() allowing the atomic_inc() are
559                  * not covered by a lock thus they may safely race and trip
560                  * this CERROR() unless we add in a small fudge factor (+1). */
561                 CERROR("%s: dirty %ld - %ld > system dirty_max %lu\n",
562                        cli->cl_import->imp_obd->obd_name,
563                        atomic_long_read(&obd_dirty_pages),
564                        atomic_long_read(&obd_dirty_transit_pages),
565                        obd_max_dirty_pages);
566                 oa->o_undirty = 0;
567         } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
568                             0x7fffffff)) {
569                 CERROR("dirty %lu - dirty_max %lu too big???\n",
570                        cli->cl_dirty_pages, cli->cl_dirty_max_pages);
571                 oa->o_undirty = 0;
572         } else {
573                 unsigned long max_in_flight = (cli->cl_max_pages_per_rpc <<
574                                       PAGE_CACHE_SHIFT) *
575                                      (cli->cl_max_rpcs_in_flight + 1);
576                 oa->o_undirty = max(cli->cl_dirty_max_pages << PAGE_CACHE_SHIFT,
577                                     max_in_flight);
578         }
579         oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
580         oa->o_dropped = cli->cl_lost_grant;
581         cli->cl_lost_grant = 0;
582         spin_unlock(&cli->cl_loi_list_lock);
583         CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
584                oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
585
586 }
587
588 void osc_update_next_shrink(struct client_obd *cli)
589 {
590         cli->cl_next_shrink_grant =
591                 cfs_time_shift(cli->cl_grant_shrink_interval);
592         CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
593                cli->cl_next_shrink_grant);
594 }
595
596 static void __osc_update_grant(struct client_obd *cli, u64 grant)
597 {
598         spin_lock(&cli->cl_loi_list_lock);
599         cli->cl_avail_grant += grant;
600         spin_unlock(&cli->cl_loi_list_lock);
601 }
602
603 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
604 {
605         if (body->oa.o_valid & OBD_MD_FLGRANT) {
606                 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
607                 __osc_update_grant(cli, body->oa.o_grant);
608         }
609 }
610
611 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
612                               u32 keylen, void *key,
613                               u32 vallen, void *val,
614                               struct ptlrpc_request_set *set);
615
616 static int osc_shrink_grant_interpret(const struct lu_env *env,
617                                       struct ptlrpc_request *req,
618                                       void *aa, int rc)
619 {
620         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
621         struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
622         struct ost_body *body;
623
624         if (rc != 0) {
625                 __osc_update_grant(cli, oa->o_grant);
626                 GOTO(out, rc);
627         }
628
629         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
630         LASSERT(body);
631         osc_update_grant(cli, body);
632 out:
633         OBDO_FREE(oa);
634         return rc;
635 }
636
637 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
638 {
639         spin_lock(&cli->cl_loi_list_lock);
640         oa->o_grant = cli->cl_avail_grant / 4;
641         cli->cl_avail_grant -= oa->o_grant;
642         spin_unlock(&cli->cl_loi_list_lock);
643         if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
644                 oa->o_valid |= OBD_MD_FLFLAGS;
645                 oa->o_flags = 0;
646         }
647         oa->o_flags |= OBD_FL_SHRINK_GRANT;
648         osc_update_next_shrink(cli);
649 }
650
651 /* Shrink the current grant, either from some large amount to enough for a
652  * full set of in-flight RPCs, or if we have already shrunk to that limit
653  * then to enough for a single RPC.  This avoids keeping more grant than
654  * needed, and avoids shrinking the grant piecemeal. */
655 static int osc_shrink_grant(struct client_obd *cli)
656 {
657         __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
658                              (cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT);
659
660         spin_lock(&cli->cl_loi_list_lock);
661         if (cli->cl_avail_grant <= target_bytes)
662                 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
663         spin_unlock(&cli->cl_loi_list_lock);
664
665         return osc_shrink_grant_to_target(cli, target_bytes);
666 }
667
668 int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
669 {
670         int                     rc = 0;
671         struct ost_body        *body;
672         ENTRY;
673
674         spin_lock(&cli->cl_loi_list_lock);
675         /* Don't shrink if we are already above or below the desired limit
676          * We don't want to shrink below a single RPC, as that will negatively
677          * impact block allocation and long-term performance. */
678         if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT)
679                 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
680
681         if (target_bytes >= cli->cl_avail_grant) {
682                 spin_unlock(&cli->cl_loi_list_lock);
683                 RETURN(0);
684         }
685         spin_unlock(&cli->cl_loi_list_lock);
686
687         OBD_ALLOC_PTR(body);
688         if (!body)
689                 RETURN(-ENOMEM);
690
691         osc_announce_cached(cli, &body->oa, 0);
692
693         spin_lock(&cli->cl_loi_list_lock);
694         body->oa.o_grant = cli->cl_avail_grant - target_bytes;
695         cli->cl_avail_grant = target_bytes;
696         spin_unlock(&cli->cl_loi_list_lock);
697         if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
698                 body->oa.o_valid |= OBD_MD_FLFLAGS;
699                 body->oa.o_flags = 0;
700         }
701         body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
702         osc_update_next_shrink(cli);
703
704         rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
705                                 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
706                                 sizeof(*body), body, NULL);
707         if (rc != 0)
708                 __osc_update_grant(cli, body->oa.o_grant);
709         OBD_FREE_PTR(body);
710         RETURN(rc);
711 }
712
713 static int osc_should_shrink_grant(struct client_obd *client)
714 {
715         cfs_time_t time = cfs_time_current();
716         cfs_time_t next_shrink = client->cl_next_shrink_grant;
717
718         if ((client->cl_import->imp_connect_data.ocd_connect_flags &
719              OBD_CONNECT_GRANT_SHRINK) == 0)
720                 return 0;
721
722         if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
723                 /* Get the current RPC size directly, instead of going via:
724                  * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
725                  * Keep comment here so that it can be found by searching. */
726                 int brw_size = client->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
727
728                 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
729                     client->cl_avail_grant > brw_size)
730                         return 1;
731                 else
732                         osc_update_next_shrink(client);
733         }
734         return 0;
735 }
736
737 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
738 {
739         struct client_obd *client;
740
741         list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
742                 if (osc_should_shrink_grant(client))
743                         osc_shrink_grant(client);
744         }
745         return 0;
746 }
747
748 static int osc_add_shrink_grant(struct client_obd *client)
749 {
750         int rc;
751
752         rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
753                                        TIMEOUT_GRANT,
754                                        osc_grant_shrink_grant_cb, NULL,
755                                        &client->cl_grant_shrink_list);
756         if (rc) {
757                 CERROR("add grant client %s error %d\n",
758                         client->cl_import->imp_obd->obd_name, rc);
759                 return rc;
760         }
761         CDEBUG(D_CACHE, "add grant client %s \n",
762                client->cl_import->imp_obd->obd_name);
763         osc_update_next_shrink(client);
764         return 0;
765 }
766
767 static int osc_del_shrink_grant(struct client_obd *client)
768 {
769         return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
770                                          TIMEOUT_GRANT);
771 }
772
773 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
774 {
775         /*
776          * ocd_grant is the total grant amount we're expect to hold: if we've
777          * been evicted, it's the new avail_grant amount, cl_dirty_pages will
778          * drop to 0 as inflight RPCs fail out; otherwise, it's avail_grant +
779          * dirty.
780          *
781          * race is tolerable here: if we're evicted, but imp_state already
782          * left EVICTED state, then cl_dirty_pages must be 0 already.
783          */
784         spin_lock(&cli->cl_loi_list_lock);
785         if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
786                 cli->cl_avail_grant = ocd->ocd_grant;
787         else
788                 cli->cl_avail_grant = ocd->ocd_grant -
789                                       (cli->cl_dirty_pages << PAGE_CACHE_SHIFT);
790
791         if (cli->cl_avail_grant < 0) {
792                 CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
793                       cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant,
794                       ocd->ocd_grant, cli->cl_dirty_pages << PAGE_CACHE_SHIFT);
795                 /* workaround for servers which do not have the patch from
796                  * LU-2679 */
797                 cli->cl_avail_grant = ocd->ocd_grant;
798         }
799
800         /* determine the appropriate chunk size used by osc_extent. */
801         cli->cl_chunkbits = max_t(int, PAGE_CACHE_SHIFT, ocd->ocd_blocksize);
802         spin_unlock(&cli->cl_loi_list_lock);
803
804         CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
805                 "chunk bits: %d.\n", cli->cl_import->imp_obd->obd_name,
806                 cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits);
807
808         if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
809             list_empty(&cli->cl_grant_shrink_list))
810                 osc_add_shrink_grant(cli);
811 }
812
813 /* We assume that the reason this OSC got a short read is because it read
814  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
815  * via the LOV, and it _knows_ it's reading inside the file, it's just that
816  * this stripe never got written at or beyond this stripe offset yet. */
817 static void handle_short_read(int nob_read, size_t page_count,
818                               struct brw_page **pga)
819 {
820         char *ptr;
821         int i = 0;
822
823         /* skip bytes read OK */
824         while (nob_read > 0) {
825                 LASSERT (page_count > 0);
826
827                 if (pga[i]->count > nob_read) {
828                         /* EOF inside this page */
829                         ptr = kmap(pga[i]->pg) +
830                                 (pga[i]->off & ~PAGE_MASK);
831                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
832                         kunmap(pga[i]->pg);
833                         page_count--;
834                         i++;
835                         break;
836                 }
837
838                 nob_read -= pga[i]->count;
839                 page_count--;
840                 i++;
841         }
842
843         /* zero remaining pages */
844         while (page_count-- > 0) {
845                 ptr = kmap(pga[i]->pg) + (pga[i]->off & ~PAGE_MASK);
846                 memset(ptr, 0, pga[i]->count);
847                 kunmap(pga[i]->pg);
848                 i++;
849         }
850 }
851
852 static int check_write_rcs(struct ptlrpc_request *req,
853                            int requested_nob, int niocount,
854                            size_t page_count, struct brw_page **pga)
855 {
856         int     i;
857         __u32   *remote_rcs;
858
859         remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
860                                                   sizeof(*remote_rcs) *
861                                                   niocount);
862         if (remote_rcs == NULL) {
863                 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
864                 return(-EPROTO);
865         }
866
867         /* return error if any niobuf was in error */
868         for (i = 0; i < niocount; i++) {
869                 if ((int)remote_rcs[i] < 0)
870                         return(remote_rcs[i]);
871
872                 if (remote_rcs[i] != 0) {
873                         CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
874                                 i, remote_rcs[i], req);
875                         return(-EPROTO);
876                 }
877         }
878
879         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
880                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
881                        req->rq_bulk->bd_nob_transferred, requested_nob);
882                 return(-EPROTO);
883         }
884
885         return (0);
886 }
887
888 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
889 {
890         if (p1->flag != p2->flag) {
891                 unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
892                                   OBD_BRW_SYNC       | OBD_BRW_ASYNC   |
893                                   OBD_BRW_NOQUOTA    | OBD_BRW_SOFT_SYNC);
894
895                 /* warn if we try to combine flags that we don't know to be
896                  * safe to combine */
897                 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
898                         CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
899                               "report this at https://jira.hpdd.intel.com/\n",
900                               p1->flag, p2->flag);
901                 }
902                 return 0;
903         }
904
905         return (p1->off + p1->count == p2->off);
906 }
907
908 static u32 osc_checksum_bulk(int nob, size_t pg_count,
909                              struct brw_page **pga, int opc,
910                              cksum_type_t cksum_type)
911 {
912         u32                             cksum;
913         int                             i = 0;
914         struct cfs_crypto_hash_desc     *hdesc;
915         unsigned int                    bufsize;
916         int                             err;
917         unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);
918
919         LASSERT(pg_count > 0);
920
921         hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
922         if (IS_ERR(hdesc)) {
923                 CERROR("Unable to initialize checksum hash %s\n",
924                        cfs_crypto_hash_name(cfs_alg));
925                 return PTR_ERR(hdesc);
926         }
927
928         while (nob > 0 && pg_count > 0) {
929                 unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;
930
931                 /* corrupt the data before we compute the checksum, to
932                  * simulate an OST->client data error */
933                 if (i == 0 && opc == OST_READ &&
934                     OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
935                         unsigned char *ptr = kmap(pga[i]->pg);
936                         int off = pga[i]->off & ~PAGE_MASK;
937
938                         memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
939                         kunmap(pga[i]->pg);
940                 }
941                 cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
942                                             pga[i]->off & ~PAGE_MASK,
943                                             count);
944                 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
945                                (int)(pga[i]->off & ~PAGE_MASK));
946
947                 nob -= pga[i]->count;
948                 pg_count--;
949                 i++;
950         }
951
952         bufsize = sizeof(cksum);
953         err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);
954
955         /* For sending we only compute the wrong checksum instead
956          * of corrupting the data so it is still correct on a redo */
957         if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
958                 cksum++;
959
960         return cksum;
961 }
962
/*
 * Build a bulk read or write (BRW) request for the pages in pga.
 *
 * cmd selects the opcode: OST_WRITE when OBD_BRW_WRITE is set (the request
 * then comes from the import's request pool), OST_READ otherwise.  Pages
 * accepted by can_merge_pages() share one remote niobuf.  For writes a bulk
 * checksum is computed and recorded in both the wire obdo and oa when
 * cli->cl_checksum is set and sptlrpc_flavor_has_bulk() is false for the
 * request flavor; for reads only the checksum type is requested.  A non-zero
 * resend marks the request with OBD_FL_RECOV_RESEND.
 *
 * The async args of the new request are filled in (oa, pga, page and niobuf
 * counts, cli) with aa_resends set to 0.
 *
 * pga must hold at least one page and be ordered by strictly increasing
 * offset with no gap inside the array; both are asserted.
 *
 * Returns 0 with *reqp pointing at the new request, or a negative errno:
 * -ENOMEM when the request or the bulk descriptor cannot be allocated, the
 * error from ptlrpc_request_pack(), or -ENOMEM/-EINVAL from the OBD_FAIL
 * injection points.
 */
963 static int
964 osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
965                      u32 page_count, struct brw_page **pga,
966                      struct ptlrpc_request **reqp, int resend)
967 {
968         struct ptlrpc_request   *req;
969         struct ptlrpc_bulk_desc *desc;
970         struct ost_body         *body;
971         struct obd_ioobj        *ioobj;
972         struct niobuf_remote    *niobuf;
973         int niocount, i, requested_nob, opc, rc;
974         struct osc_brw_async_args *aa;
975         struct req_capsule      *pill;
976         struct brw_page *pg_prev;
977
978         ENTRY;
979         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
980                 RETURN(-ENOMEM); /* Recoverable */
981         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
982                 RETURN(-EINVAL); /* Fatal */
983
984         if ((cmd & OBD_BRW_WRITE) != 0) {
985                 opc = OST_WRITE;
986                 req = ptlrpc_request_alloc_pool(cli->cl_import,
987                                                 cli->cl_import->imp_rq_pool,
988                                                 &RQF_OST_BRW_WRITE);
989         } else {
990                 opc = OST_READ;
991                 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
992         }
993         if (req == NULL)
994                 RETURN(-ENOMEM);
995
996         for (niocount = i = 1; i < page_count; i++) { /* one niobuf per run of mergeable pages */
997                 if (!can_merge_pages(pga[i - 1], pga[i]))
998                         niocount++;
999         }
1000
1001         pill = &req->rq_pill;
1002         req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1003                              sizeof(*ioobj));
1004         req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1005                              niocount * sizeof(*niobuf));
1006
1007         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1008         if (rc) {
1009                 ptlrpc_request_free(req);
1010                 RETURN(rc);
1011         }
1012         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1013         ptlrpc_at_set_req_timeout(req);
1014         /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
1015          * retry logic */
1016         req->rq_no_retry_einprogress = 1;
1017
1018         desc = ptlrpc_prep_bulk_imp(req, page_count,
1019                 cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
1020                 opc == OST_WRITE ? BULK_GET_SOURCE : BULK_PUT_SINK,
1021                 OST_BULK_PORTAL);
1022
1023         if (desc == NULL)
1024                 GOTO(out, rc = -ENOMEM);
1025         /* NB request now owns desc and will free it when it gets freed */
1026
1027         body = req_capsule_client_get(pill, &RMF_OST_BODY);
1028         ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1029         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1030         LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1031
1032         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
1033
1034         obdo_to_ioobj(oa, ioobj);
1035         ioobj->ioo_bufcnt = niocount;
1036         /* The high bits of ioo_max_brw tell the server the _maximum_ number
1037          * of bulks that might be sent for this request.  The actual number is
1038          * decided when the RPC is finally sent in ptlrpc_register_bulk(). It
1039          * sends "max - 1" for old client compatibility sending "0", and also so
1040          * the actual maximum is a power-of-two number, not one less. LU-1431 */
1041         ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
1042         LASSERT(page_count > 0);
1043         pg_prev = pga[0];
1044         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1045                 struct brw_page *pg = pga[i];
1046                 int poff = pg->off & ~PAGE_MASK; /* offset of the data inside its page */
1047
1048                 LASSERT(pg->count > 0);
1049                 /* make sure there is no gap in the middle of page array */
1050                 LASSERTF(page_count == 1 ||
1051                          (ergo(i == 0, poff + pg->count == PAGE_CACHE_SIZE) &&
1052                           ergo(i > 0 && i < page_count - 1,
1053                                poff == 0 && pg->count == PAGE_CACHE_SIZE)   &&
1054                           ergo(i == page_count - 1, poff == 0)),
1055                          "i: %d/%d pg: %p off: "LPU64", count: %u\n",
1056                          i, page_count, pg, pg->off, pg->count);
1057                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1058                          "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1059                          " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1060                          i, page_count,
1061                          pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1062                          pg_prev->pg, page_private(pg_prev->pg),
1063                          pg_prev->pg->index, pg_prev->off);
1064                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1065                         (pg->flag & OBD_BRW_SRVLOCK));
1066
1067                 ptlrpc_prep_bulk_page_pin(desc, pg->pg, poff, pg->count);
1068                 requested_nob += pg->count;
1069
1070                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1071                         niobuf--; /* step back: this page extends the previous niobuf */
1072                         niobuf->rnb_len += pg->count;
1073                 } else {
1074                         niobuf->rnb_offset = pg->off;
1075                         niobuf->rnb_len    = pg->count;
1076                         niobuf->rnb_flags  = pg->flag;
1077                 }
1078                 pg_prev = pg;
1079         }
1080
1081         LASSERTF((void *)(niobuf - niocount) ==
1082                 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1083                 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1084                 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1085
1086         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1087         if (resend) {
1088                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1089                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1090                         body->oa.o_flags = 0;
1091                 }
1092                 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1093         }
1094
1095         if (osc_should_shrink_grant(cli))
1096                 osc_shrink_grant_local(cli, &body->oa);
1097
1098         /* size[REQ_REC_OFF] still sizeof (*body) */
1099         if (opc == OST_WRITE) {
1100                 if (cli->cl_checksum &&
1101                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1102                         /* store cl_cksum_type in a local variable since
1103                          * it can be changed via lprocfs */
1104                         cksum_type_t cksum_type = cli->cl_cksum_type;
1105
1106                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1107                                 oa->o_flags &= OBD_FL_LOCAL_MASK;
1108                                 body->oa.o_flags = 0;
1109                         }
1110                         body->oa.o_flags |= cksum_type_pack(cksum_type);
1111                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1112                         body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1113                                                              page_count, pga,
1114                                                              OST_WRITE,
1115                                                              cksum_type);
1116                         CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1117                                body->oa.o_cksum);
1118                         /* save this in 'oa', too, for later checking */
1119                         oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1120                         oa->o_flags |= cksum_type_pack(cksum_type);
1121                 } else {
1122                         /* clear out the checksum flag, in case this is a
1123                          * resend but cl_checksum is no longer set. b=11238 */
1124                         oa->o_valid &= ~OBD_MD_FLCKSUM;
1125                 }
1126                 oa->o_cksum = body->oa.o_cksum;
1127                 /* 1 RC per niobuf */
1128                 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1129                                      sizeof(__u32) * niocount);
1130         } else {
1131                 if (cli->cl_checksum &&
1132                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1133                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1134                                 body->oa.o_flags = 0;
1135                         body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1136                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1137                 }
1138         }
1139         ptlrpc_request_set_replen(req);
1140
1141         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1142         aa = ptlrpc_req_async_args(req);
1143         aa->aa_oa = oa;
1144         aa->aa_requested_nob = requested_nob;
1145         aa->aa_nio_count = niocount;
1146         aa->aa_page_count = page_count;
1147         aa->aa_resends = 0;
1148         aa->aa_ppga = pga;
1149         aa->aa_cli = cli;
1150         INIT_LIST_HEAD(&aa->aa_oaps);
1151
1152         *reqp = req;
1153         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE); /* back to the first niobuf for the trace below */
1154         CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
1155                 req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
1156                 niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
1157         RETURN(0);
1158
1159  out: /* only reached when the bulk descriptor could not be allocated */
1160         ptlrpc_req_finished(req);
1161         RETURN(rc);
1162 }
1163
1164 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1165                                 __u32 client_cksum, __u32 server_cksum, int nob,
1166                                 size_t page_count, struct brw_page **pga,
1167                                 cksum_type_t client_cksum_type)
1168 {
1169         __u32 new_cksum;
1170         char *msg;
1171         cksum_type_t cksum_type;
1172
1173         if (server_cksum == client_cksum) {
1174                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1175                 return 0;
1176         }
1177
1178         cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1179                                        oa->o_flags : 0);
1180         new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1181                                       cksum_type);
1182
1183         if (cksum_type != client_cksum_type)
1184                 msg = "the server did not use the checksum type specified in "
1185                       "the original request - likely a protocol problem";
1186         else if (new_cksum == server_cksum)
1187                 msg = "changed on the client after we checksummed it - "
1188                       "likely false positive due to mmap IO (bug 11742)";
1189         else if (new_cksum == client_cksum)
1190                 msg = "changed in transit before arrival at OST";
1191         else
1192                 msg = "changed in transit AND doesn't match the original - "
1193                       "likely false positive due to mmap IO (bug 11742)";
1194
1195         LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1196                            " object "DOSTID" extent ["LPU64"-"LPU64"]\n",
1197                            msg, libcfs_nid2str(peer->nid),
1198                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1199                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1200                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1201                            POSTID(&oa->o_oi), pga[0]->off,
1202                            pga[page_count-1]->off + pga[page_count-1]->count - 1);
1203         CERROR("original client csum %x (type %x), server csum %x (type %x), "
1204                "client csum now %x\n", client_cksum, client_cksum_type,
1205                server_cksum, cksum_type, new_cksum);
1206         return 1;
1207 }
1208
1209 /* Note rc enters this function as number of bytes transferred */
1210 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1211 {
1212         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1213         const lnet_process_id_t *peer =
1214                         &req->rq_import->imp_connection->c_peer;
1215         struct client_obd *cli = aa->aa_cli;
1216         struct ost_body *body;
1217         u32 client_cksum = 0;
1218         ENTRY;
1219
1220         if (rc < 0 && rc != -EDQUOT) {
1221                 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1222                 RETURN(rc);
1223         }
1224
1225         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1226         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1227         if (body == NULL) {
1228                 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1229                 RETURN(-EPROTO);
1230         }
1231
1232         /* set/clear over quota flag for a uid/gid */
1233         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1234             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1235                 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1236
1237                 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1238                        body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1239                        body->oa.o_flags);
1240                 osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
1241         }
1242
1243         osc_update_grant(cli, body);
1244
1245         if (rc < 0)
1246                 RETURN(rc);
1247
1248         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1249                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1250
1251         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1252                 if (rc > 0) {
1253                         CERROR("Unexpected +ve rc %d\n", rc);
1254                         RETURN(-EPROTO);
1255                 }
1256                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1257
1258                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1259                         RETURN(-EAGAIN);
1260
1261                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1262                     check_write_checksum(&body->oa, peer, client_cksum,
1263                                          body->oa.o_cksum, aa->aa_requested_nob,
1264                                          aa->aa_page_count, aa->aa_ppga,
1265                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1266                         RETURN(-EAGAIN);
1267
1268                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1269                                      aa->aa_page_count, aa->aa_ppga);
1270                 GOTO(out, rc);
1271         }
1272
1273         /* The rest of this function executes only for OST_READs */
1274
1275         /* if unwrap_bulk failed, return -EAGAIN to retry */
1276         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1277         if (rc < 0)
1278                 GOTO(out, rc = -EAGAIN);
1279
1280         if (rc > aa->aa_requested_nob) {
1281                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1282                        aa->aa_requested_nob);
1283                 RETURN(-EPROTO);
1284         }
1285
1286         if (rc != req->rq_bulk->bd_nob_transferred) {
1287                 CERROR ("Unexpected rc %d (%d transferred)\n",
1288                         rc, req->rq_bulk->bd_nob_transferred);
1289                 return (-EPROTO);
1290         }
1291
1292         if (rc < aa->aa_requested_nob)
1293                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1294
1295         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1296                 static int cksum_counter;
1297                 u32        server_cksum = body->oa.o_cksum;
1298                 char      *via = "";
1299                 char      *router = "";
1300                 cksum_type_t cksum_type;
1301
1302                 cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
1303                                                body->oa.o_flags : 0);
1304                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1305                                                  aa->aa_ppga, OST_READ,
1306                                                  cksum_type);
1307
1308                 if (peer->nid != req->rq_bulk->bd_sender) {
1309                         via = " via ";
1310                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1311                 }
1312
1313                 if (server_cksum != client_cksum) {
1314                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1315                                            "%s%s%s inode "DFID" object "DOSTID
1316                                            " extent ["LPU64"-"LPU64"]\n",
1317                                            req->rq_import->imp_obd->obd_name,
1318                                            libcfs_nid2str(peer->nid),
1319                                            via, router,
1320                                            body->oa.o_valid & OBD_MD_FLFID ?
1321                                                 body->oa.o_parent_seq : (__u64)0,
1322                                            body->oa.o_valid & OBD_MD_FLFID ?
1323                                                 body->oa.o_parent_oid : 0,
1324                                            body->oa.o_valid & OBD_MD_FLFID ?
1325                                                 body->oa.o_parent_ver : 0,
1326                                            POSTID(&body->oa.o_oi),
1327                                            aa->aa_ppga[0]->off,
1328                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1329                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1330                                                                         1);
1331                         CERROR("client %x, server %x, cksum_type %x\n",
1332                                client_cksum, server_cksum, cksum_type);
1333                         cksum_counter = 0;
1334                         aa->aa_oa->o_cksum = client_cksum;
1335                         rc = -EAGAIN;
1336                 } else {
1337                         cksum_counter++;
1338                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1339                         rc = 0;
1340                 }
1341         } else if (unlikely(client_cksum)) {
1342                 static int cksum_missed;
1343
1344                 cksum_missed++;
1345                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1346                         CERROR("Checksum %u requested from %s but not sent\n",
1347                                cksum_missed, libcfs_nid2str(peer->nid));
1348         } else {
1349                 rc = 0;
1350         }
1351 out:
1352         if (rc >= 0)
1353                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1354                                      aa->aa_oa, &body->oa);
1355
1356         RETURN(rc);
1357 }
1358
1359 static int osc_brw_redo_request(struct ptlrpc_request *request,
1360                                 struct osc_brw_async_args *aa, int rc)
1361 {
1362         struct ptlrpc_request *new_req;
1363         struct osc_brw_async_args *new_aa;
1364         struct osc_async_page *oap;
1365         ENTRY;
1366
1367         DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1368                   "redo for recoverable error %d", rc);
1369
1370         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1371                                 OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
1372                                   aa->aa_cli, aa->aa_oa, aa->aa_page_count,
1373                                   aa->aa_ppga, &new_req, 1);
1374         if (rc)
1375                 RETURN(rc);
1376
1377         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1378                 if (oap->oap_request != NULL) {
1379                         LASSERTF(request == oap->oap_request,
1380                                  "request %p != oap_request %p\n",
1381                                  request, oap->oap_request);
1382                         if (oap->oap_interrupted) {
1383                                 ptlrpc_req_finished(new_req);
1384                                 RETURN(-EINTR);
1385                         }
1386                 }
1387         }
1388         /* New request takes over pga and oaps from old request.
1389          * Note that copying a list_head doesn't work, need to move it... */
1390         aa->aa_resends++;
1391         new_req->rq_interpret_reply = request->rq_interpret_reply;
1392         new_req->rq_async_args = request->rq_async_args;
1393         new_req->rq_commit_cb = request->rq_commit_cb;
1394         /* cap resend delay to the current request timeout, this is similar to
1395          * what ptlrpc does (see after_reply()) */
1396         if (aa->aa_resends > new_req->rq_timeout)
1397                 new_req->rq_sent = cfs_time_current_sec() + new_req->rq_timeout;
1398         else
1399                 new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1400         new_req->rq_generation_set = 1;
1401         new_req->rq_import_generation = request->rq_import_generation;
1402
1403         new_aa = ptlrpc_req_async_args(new_req);
1404
1405         INIT_LIST_HEAD(&new_aa->aa_oaps);
1406         list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1407         INIT_LIST_HEAD(&new_aa->aa_exts);
1408         list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1409         new_aa->aa_resends = aa->aa_resends;
1410
1411         list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1412                 if (oap->oap_request) {
1413                         ptlrpc_req_finished(oap->oap_request);
1414                         oap->oap_request = ptlrpc_request_addref(new_req);
1415                 }
1416         }
1417
1418         /* XXX: This code will run into problem if we're going to support
1419          * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
1420          * and wait for all of them to be finished. We should inherit request
1421          * set from old request. */
1422         ptlrpcd_add_req(new_req, PDL_POLICY_SAME, -1);
1423
1424         DEBUG_REQ(D_INFO, new_req, "new request");
1425         RETURN(0);
1426 }
1427
1428 /*
1429  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1430  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1431  * fine for our small page arrays and doesn't require allocation.  its an
1432  * insertion sort that swaps elements that are strides apart, shrinking the
1433  * stride down until its '1' and the array is sorted.
1434  */
1435 static void sort_brw_pages(struct brw_page **array, int num)
1436 {
1437         int stride, i, j;
1438         struct brw_page *tmp;
1439
1440         if (num == 1)
1441                 return;
1442         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1443                 ;
1444
1445         do {
1446                 stride /= 3;
1447                 for (i = stride ; i < num ; i++) {
1448                         tmp = array[i];
1449                         j = i;
1450                         while (j >= stride && array[j - stride]->off > tmp->off) {
1451                                 array[j] = array[j - stride];
1452                                 j -= stride;
1453                         }
1454                         array[j] = tmp;
1455                 }
1456         } while (stride > 1);
1457 }
1458
1459 static void osc_release_ppga(struct brw_page **ppga, size_t count)
1460 {
1461         LASSERT(ppga != NULL);
1462         OBD_FREE(ppga, sizeof(*ppga) * count);
1463 }
1464
1465 static int brw_interpret(const struct lu_env *env,
1466                          struct ptlrpc_request *req, void *data, int rc)
1467 {
1468         struct osc_brw_async_args *aa = data;
1469         struct osc_extent *ext;
1470         struct osc_extent *tmp;
1471         struct client_obd *cli = aa->aa_cli;
1472         ENTRY;
1473
1474         rc = osc_brw_fini_request(req, rc);
1475         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1476         /* When server return -EINPROGRESS, client should always retry
1477          * regardless of the number of times the bulk was resent already. */
1478         if (osc_recoverable_error(rc)) {
1479                 if (req->rq_import_generation !=
1480                     req->rq_import->imp_generation) {
1481                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
1482                                ""DOSTID", rc = %d.\n",
1483                                req->rq_import->imp_obd->obd_name,
1484                                POSTID(&aa->aa_oa->o_oi), rc);
1485                 } else if (rc == -EINPROGRESS ||
1486                     client_should_resend(aa->aa_resends, aa->aa_cli)) {
1487                         rc = osc_brw_redo_request(req, aa, rc);
1488                 } else {
1489                         CERROR("%s: too many resent retries for object: "
1490                                ""LPU64":"LPU64", rc = %d.\n",
1491                                req->rq_import->imp_obd->obd_name,
1492                                POSTID(&aa->aa_oa->o_oi), rc);
1493                 }
1494
1495                 if (rc == 0)
1496                         RETURN(0);
1497                 else if (rc == -EAGAIN || rc == -EINPROGRESS)
1498                         rc = -EIO;
1499         }
1500
1501         if (rc == 0) {
1502                 struct obdo *oa = aa->aa_oa;
1503                 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
1504                 unsigned long valid = 0;
1505                 struct cl_object *obj;
1506                 struct osc_async_page *last;
1507
1508                 last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
1509                 obj = osc2cl(last->oap_obj);
1510
1511                 cl_object_attr_lock(obj);
1512                 if (oa->o_valid & OBD_MD_FLBLOCKS) {
1513                         attr->cat_blocks = oa->o_blocks;
1514                         valid |= CAT_BLOCKS;
1515                 }
1516                 if (oa->o_valid & OBD_MD_FLMTIME) {
1517                         attr->cat_mtime = oa->o_mtime;
1518                         valid |= CAT_MTIME;
1519                 }
1520                 if (oa->o_valid & OBD_MD_FLATIME) {
1521                         attr->cat_atime = oa->o_atime;
1522                         valid |= CAT_ATIME;
1523                 }
1524                 if (oa->o_valid & OBD_MD_FLCTIME) {
1525                         attr->cat_ctime = oa->o_ctime;
1526                         valid |= CAT_CTIME;
1527                 }
1528
1529                 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1530                         struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
1531                         loff_t last_off = last->oap_count + last->oap_obj_off +
1532                                 last->oap_page_off;
1533
1534                         /* Change file size if this is an out of quota or
1535                          * direct IO write and it extends the file size */
1536                         if (loi->loi_lvb.lvb_size < last_off) {
1537                                 attr->cat_size = last_off;
1538                                 valid |= CAT_SIZE;
1539                         }
1540                         /* Extend KMS if it's not a lockless write */
1541                         if (loi->loi_kms < last_off &&
1542                             oap2osc_page(last)->ops_srvlock == 0) {
1543                                 attr->cat_kms = last_off;
1544                                 valid |= CAT_KMS;
1545                         }
1546                 }
1547
1548                 if (valid != 0)
1549                         cl_object_attr_update(env, obj, attr, valid);
1550                 cl_object_attr_unlock(obj);
1551         }
1552         OBDO_FREE(aa->aa_oa);
1553
1554         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
1555                 osc_inc_unstable_pages(req);
1556
1557         list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
1558                 list_del_init(&ext->oe_link);
1559                 osc_extent_finish(env, ext, 1, rc);
1560         }
1561         LASSERT(list_empty(&aa->aa_exts));
1562         LASSERT(list_empty(&aa->aa_oaps));
1563
1564         cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
1565                           req->rq_bulk->bd_nob_transferred);
1566         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1567         ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
1568
1569         spin_lock(&cli->cl_loi_list_lock);
1570         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1571          * is called so we know whether to go to sync BRWs or wait for more
1572          * RPCs to complete */
1573         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
1574                 cli->cl_w_in_flight--;
1575         else
1576                 cli->cl_r_in_flight--;
1577         osc_wake_cache_waiters(cli);
1578         spin_unlock(&cli->cl_loi_list_lock);
1579
1580         osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
1581         RETURN(rc);
1582 }
1583
1584 static void brw_commit(struct ptlrpc_request *req)
1585 {
1586         /* If osc_inc_unstable_pages (via osc_extent_finish) races with
1587          * this called via the rq_commit_cb, I need to ensure
1588          * osc_dec_unstable_pages is still called. Otherwise unstable
1589          * pages may be leaked. */
1590         spin_lock(&req->rq_lock);
1591         if (likely(req->rq_unstable)) {
1592                 req->rq_unstable = 0;
1593                 spin_unlock(&req->rq_lock);
1594
1595                 osc_dec_unstable_pages(req);
1596         } else {
1597                 req->rq_committed = 1;
1598                 spin_unlock(&req->rq_lock);
1599         }
1600 }
1601
1602 /**
1603  * Build an RPC by the list of extent @ext_list. The caller must ensure
1604  * that the total pages in this list are NOT over max pages per RPC.
1605  * Extents in the list must be in OES_RPC state.
1606  */
1607 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
1608                   struct list_head *ext_list, int cmd, pdl_policy_t pol)
1609 {
1610         struct ptlrpc_request           *req = NULL;
1611         struct osc_extent               *ext;
1612         struct brw_page                 **pga = NULL;
1613         struct osc_brw_async_args       *aa = NULL;
1614         struct obdo                     *oa = NULL;
1615         struct osc_async_page           *oap;
1616         struct osc_async_page           *tmp;
1617         struct cl_req                   *clerq = NULL;
1618         enum cl_req_type                crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE :
1619                                                                       CRT_READ;
1620         struct cl_req_attr              *crattr = NULL;
1621         loff_t                          starting_offset = OBD_OBJECT_EOF;
1622         loff_t                          ending_offset = 0;
1623         int                             mpflag = 0;
1624         int                             mem_tight = 0;
1625         int                             page_count = 0;
1626         bool                            soft_sync = false;
1627         int                             i;
1628         int                             rc;
1629         struct list_head                rpc_list = LIST_HEAD_INIT(rpc_list);
1630         struct ost_body                 *body;
1631         ENTRY;
1632         LASSERT(!list_empty(ext_list));
1633
1634         /* add pages into rpc_list to build BRW rpc */
1635         list_for_each_entry(ext, ext_list, oe_link) {
1636                 LASSERT(ext->oe_state == OES_RPC);
1637                 mem_tight |= ext->oe_memalloc;
1638                 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
1639                         ++page_count;
1640                         list_add_tail(&oap->oap_rpc_item, &rpc_list);
1641                         if (starting_offset == OBD_OBJECT_EOF ||
1642                             starting_offset > oap->oap_obj_off)
1643                                 starting_offset = oap->oap_obj_off;
1644                         else
1645                                 LASSERT(oap->oap_page_off == 0);
1646                         if (ending_offset < oap->oap_obj_off + oap->oap_count)
1647                                 ending_offset = oap->oap_obj_off +
1648                                                 oap->oap_count;
1649                         else
1650                                 LASSERT(oap->oap_page_off + oap->oap_count ==
1651                                         PAGE_CACHE_SIZE);
1652                 }
1653         }
1654
1655         soft_sync = osc_over_unstable_soft_limit(cli);
1656         if (mem_tight)
1657                 mpflag = cfs_memory_pressure_get_and_set();
1658
1659         OBD_ALLOC(crattr, sizeof(*crattr));
1660         if (crattr == NULL)
1661                 GOTO(out, rc = -ENOMEM);
1662
1663         OBD_ALLOC(pga, sizeof(*pga) * page_count);
1664         if (pga == NULL)
1665                 GOTO(out, rc = -ENOMEM);
1666
1667         OBDO_ALLOC(oa);
1668         if (oa == NULL)
1669                 GOTO(out, rc = -ENOMEM);
1670
1671         i = 0;
1672         list_for_each_entry(oap, &rpc_list, oap_rpc_item) {
1673                 struct cl_page *page = oap2cl_page(oap);
1674                 if (clerq == NULL) {
1675                         clerq = cl_req_alloc(env, page, crt,
1676                                              1 /* only 1-object rpcs for now */);
1677                         if (IS_ERR(clerq))
1678                                 GOTO(out, rc = PTR_ERR(clerq));
1679                 }
1680                 if (mem_tight)
1681                         oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
1682                 if (soft_sync)
1683                         oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
1684                 pga[i] = &oap->oap_brw_page;
1685                 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
1686                 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
1687                        pga[i]->pg, page_index(oap->oap_page), oap,
1688                        pga[i]->flag);
1689                 i++;
1690                 cl_req_page_add(env, clerq, page);
1691         }
1692
1693         /* always get the data for the obdo for the rpc */
1694         LASSERT(clerq != NULL);
1695         crattr->cra_oa = oa;
1696         cl_req_attr_set(env, clerq, crattr, ~0ULL);
1697
1698         rc = cl_req_prep(env, clerq);
1699         if (rc != 0) {
1700                 CERROR("cl_req_prep failed: %d\n", rc);
1701                 GOTO(out, rc);
1702         }
1703
1704         sort_brw_pages(pga, page_count);
1705         rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req, 0);
1706         if (rc != 0) {
1707                 CERROR("prep_req failed: %d\n", rc);
1708                 GOTO(out, rc);
1709         }
1710
1711         req->rq_commit_cb = brw_commit;
1712         req->rq_interpret_reply = brw_interpret;
1713
1714         if (mem_tight != 0)
1715                 req->rq_memalloc = 1;
1716
1717         /* Need to update the timestamps after the request is built in case
1718          * we race with setattr (locally or in queue at OST).  If OST gets
1719          * later setattr before earlier BRW (as determined by the request xid),
1720          * the OST will not use BRW timestamps.  Sadly, there is no obvious
1721          * way to do this in a single call.  bug 10150 */
1722         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
1723         crattr->cra_oa = &body->oa;
1724         cl_req_attr_set(env, clerq, crattr,
1725                         OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
1726
1727         lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
1728
1729         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1730         aa = ptlrpc_req_async_args(req);
1731         INIT_LIST_HEAD(&aa->aa_oaps);
1732         list_splice_init(&rpc_list, &aa->aa_oaps);
1733         INIT_LIST_HEAD(&aa->aa_exts);
1734         list_splice_init(ext_list, &aa->aa_exts);
1735         aa->aa_clerq = clerq;
1736
1737         /* queued sync pages can be torn down while the pages
1738          * were between the pending list and the rpc */
1739         tmp = NULL;
1740         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1741                 /* only one oap gets a request reference */
1742                 if (tmp == NULL)
1743                         tmp = oap;
1744                 if (oap->oap_interrupted && !req->rq_intr) {
1745                         CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
1746                                         oap, req);
1747                         ptlrpc_mark_interrupted(req);
1748                 }
1749         }
1750         if (tmp != NULL)
1751                 tmp->oap_request = ptlrpc_request_addref(req);
1752
1753         spin_lock(&cli->cl_loi_list_lock);
1754         starting_offset >>= PAGE_CACHE_SHIFT;
1755         if (cmd == OBD_BRW_READ) {
1756                 cli->cl_r_in_flight++;
1757                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
1758                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
1759                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
1760                                       starting_offset + 1);
1761         } else {
1762                 cli->cl_w_in_flight++;
1763                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
1764                 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
1765                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
1766                                       starting_offset + 1);
1767         }
1768         spin_unlock(&cli->cl_loi_list_lock);
1769
1770         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %ur/%uw in flight",
1771                   page_count, aa, cli->cl_r_in_flight,
1772                   cli->cl_w_in_flight);
1773
1774         /* XXX: Maybe the caller can check the RPC bulk descriptor to
1775          * see which CPU/NUMA node the majority of pages were allocated
1776          * on, and try to assign the async RPC to the CPU core
1777          * (PDL_POLICY_PREFERRED) to reduce cross-CPU memory traffic.
1778          *
1779          * But on the other hand, we expect that multiple ptlrpcd
1780          * threads and the initial write sponsor can run in parallel,
1781          * especially when data checksum is enabled, which is CPU-bound
1782          * operation and single ptlrpcd thread cannot process in time.
1783          * So more ptlrpcd threads sharing BRW load
1784          * (with PDL_POLICY_ROUND) seems better.
1785          */
1786         ptlrpcd_add_req(req, pol, -1);
1787         rc = 0;
1788         EXIT;
1789
1790 out:
1791         if (mem_tight != 0)
1792                 cfs_memory_pressure_restore(mpflag);
1793
1794         if (crattr != NULL)
1795                 OBD_FREE(crattr, sizeof(*crattr));
1796
1797         if (rc != 0) {
1798                 LASSERT(req == NULL);
1799
1800                 if (oa)
1801                         OBDO_FREE(oa);
1802                 if (pga)
1803                         OBD_FREE(pga, sizeof(*pga) * page_count);
1804                 /* this should happen rarely and is pretty bad, it makes the
1805                  * pending list not follow the dirty order */
1806                 while (!list_empty(ext_list)) {
1807                         ext = list_entry(ext_list->next, struct osc_extent,
1808                                          oe_link);
1809                         list_del_init(&ext->oe_link);
1810                         osc_extent_finish(env, ext, 0, rc);
1811                 }
1812                 if (clerq && !IS_ERR(clerq))
1813                         cl_req_completion(env, clerq, rc);
1814         }
1815         RETURN(rc);
1816 }
1817
1818 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
1819                                         struct ldlm_enqueue_info *einfo)
1820 {
1821         void *data = einfo->ei_cbdata;
1822         int set = 0;
1823
1824         LASSERT(lock != NULL);
1825         LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
1826         LASSERT(lock->l_resource->lr_type == einfo->ei_type);
1827         LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
1828         LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
1829
1830         lock_res_and_lock(lock);
1831
1832         if (lock->l_ast_data == NULL)
1833                 lock->l_ast_data = data;
1834         if (lock->l_ast_data == data)
1835                 set = 1;
1836
1837         unlock_res_and_lock(lock);
1838
1839         return set;
1840 }
1841
1842 static int osc_set_data_with_check(struct lustre_handle *lockh,
1843                                    struct ldlm_enqueue_info *einfo)
1844 {
1845         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
1846         int set = 0;
1847
1848         if (lock != NULL) {
1849                 set = osc_set_lock_data_with_check(lock, einfo);
1850                 LDLM_LOCK_PUT(lock);
1851         } else
1852                 CERROR("lockh %p, data %p - client evicted?\n",
1853                        lockh, einfo->ei_cbdata);
1854         return set;
1855 }
1856
1857 static int osc_enqueue_fini(struct ptlrpc_request *req,
1858                             osc_enqueue_upcall_f upcall, void *cookie,
1859                             struct lustre_handle *lockh, ldlm_mode_t mode,
1860                             __u64 *flags, int agl, int errcode)
1861 {
1862         bool intent = *flags & LDLM_FL_HAS_INTENT;
1863         int rc;
1864         ENTRY;
1865
1866         /* The request was created before ldlm_cli_enqueue call. */
1867         if (intent && errcode == ELDLM_LOCK_ABORTED) {
1868                 struct ldlm_reply *rep;
1869
1870                 rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1871                 LASSERT(rep != NULL);
1872
1873                 rep->lock_policy_res1 =
1874                         ptlrpc_status_ntoh(rep->lock_policy_res1);
1875                 if (rep->lock_policy_res1)
1876                         errcode = rep->lock_policy_res1;
1877                 if (!agl)
1878                         *flags |= LDLM_FL_LVB_READY;
1879         } else if (errcode == ELDLM_OK) {
1880                 *flags |= LDLM_FL_LVB_READY;
1881         }
1882
1883         /* Call the update callback. */
1884         rc = (*upcall)(cookie, lockh, errcode);
1885
1886         /* release the reference taken in ldlm_cli_enqueue() */
1887         if (errcode == ELDLM_LOCK_MATCHED)
1888                 errcode = ELDLM_OK;
1889         if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
1890                 ldlm_lock_decref(lockh, mode);
1891
1892         RETURN(rc);
1893 }
1894
1895 static int osc_enqueue_interpret(const struct lu_env *env,
1896                                  struct ptlrpc_request *req,
1897                                  struct osc_enqueue_args *aa, int rc)
1898 {
1899         struct ldlm_lock *lock;
1900         struct lustre_handle *lockh = &aa->oa_lockh;
1901         ldlm_mode_t mode = aa->oa_mode;
1902         struct ost_lvb *lvb = aa->oa_lvb;
1903         __u32 lvb_len = sizeof(*lvb);
1904         __u64 flags = 0;
1905
1906         ENTRY;
1907
1908         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
1909          * be valid. */
1910         lock = ldlm_handle2lock(lockh);
1911         LASSERTF(lock != NULL,
1912                  "lockh "LPX64", req %p, aa %p - client evicted?\n",
1913                  lockh->cookie, req, aa);
1914
1915         /* Take an additional reference so that a blocking AST that
1916          * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
1917          * to arrive after an upcall has been executed by
1918          * osc_enqueue_fini(). */
1919         ldlm_lock_addref(lockh, mode);
1920
1921         /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
1922         OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
1923
1924         /* Let CP AST to grant the lock first. */
1925         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
1926
1927         if (aa->oa_agl) {
1928                 LASSERT(aa->oa_lvb == NULL);
1929                 LASSERT(aa->oa_flags == NULL);
1930                 aa->oa_flags = &flags;
1931         }
1932
1933         /* Complete obtaining the lock procedure. */
1934         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
1935                                    aa->oa_mode, aa->oa_flags, lvb, lvb_len,
1936                                    lockh, rc);
1937         /* Complete osc stuff. */
1938         rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
1939                               aa->oa_flags, aa->oa_agl, rc);
1940
1941         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
1942
1943         ldlm_lock_decref(lockh, mode);
1944         LDLM_LOCK_PUT(lock);
1945         RETURN(rc);
1946 }
1947
1948 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
1949
1950 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
1951  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
1952  * other synchronous requests, however keeping some locks and trying to obtain
1953  * others may take a considerable amount of time in a case of ost failure; and
1954  * when other sync requests do not get released lock from a client, the client
1955  * is evicted from the cluster -- such scenarious make the life difficult, so
1956  * release locks just after they are obtained. */
1957 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
1958                      __u64 *flags, ldlm_policy_data_t *policy,
1959                      struct ost_lvb *lvb, int kms_valid,
1960                      osc_enqueue_upcall_f upcall, void *cookie,
1961                      struct ldlm_enqueue_info *einfo,
1962                      struct ptlrpc_request_set *rqset, int async, int agl)
1963 {
1964         struct obd_device *obd = exp->exp_obd;
1965         struct lustre_handle lockh = { 0 };
1966         struct ptlrpc_request *req = NULL;
1967         int intent = *flags & LDLM_FL_HAS_INTENT;
1968         __u64 match_lvb = agl ? 0 : LDLM_FL_LVB_READY;
1969         ldlm_mode_t mode;
1970         int rc;
1971         ENTRY;
1972
1973         /* Filesystem lock extents are extended to page boundaries so that
1974          * dealing with the page cache is a little smoother.  */
1975         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
1976         policy->l_extent.end |= ~PAGE_MASK;
1977
1978         /*
1979          * kms is not valid when either object is completely fresh (so that no
1980          * locks are cached), or object was evicted. In the latter case cached
1981          * lock cannot be used, because it would prime inode state with
1982          * potentially stale LVB.
1983          */
1984         if (!kms_valid)
1985                 goto no_match;
1986
1987         /* Next, search for already existing extent locks that will cover us */
1988         /* If we're trying to read, we also search for an existing PW lock.  The
1989          * VFS and page cache already protect us locally, so lots of readers/
1990          * writers can share a single PW lock.
1991          *
1992          * There are problems with conversion deadlocks, so instead of
1993          * converting a read lock to a write lock, we'll just enqueue a new
1994          * one.
1995          *
1996          * At some point we should cancel the read lock instead of making them
1997          * send us a blocking callback, but there are problems with canceling
1998          * locks out from other users right now, too. */
1999         mode = einfo->ei_mode;
2000         if (einfo->ei_mode == LCK_PR)
2001                 mode |= LCK_PW;
2002         mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
2003                                einfo->ei_type, policy, mode, &lockh, 0);
2004         if (mode) {
2005                 struct ldlm_lock *matched;
2006
2007                 if (*flags & LDLM_FL_TEST_LOCK)
2008                         RETURN(ELDLM_OK);
2009
2010                 matched = ldlm_handle2lock(&lockh);
2011                 if (agl) {
2012                         /* AGL enqueues DLM locks speculatively. Therefore if
2013                          * it already exists a DLM lock, it wll just inform the
2014                          * caller to cancel the AGL process for this stripe. */
2015                         ldlm_lock_decref(&lockh, mode);
2016                         LDLM_LOCK_PUT(matched);
2017                         RETURN(-ECANCELED);
2018                 } else if (osc_set_lock_data_with_check(matched, einfo)) {
2019                         *flags |= LDLM_FL_LVB_READY;
2020
2021                         /* We already have a lock, and it's referenced. */
2022                         (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);
2023
2024                         ldlm_lock_decref(&lockh, mode);
2025                         LDLM_LOCK_PUT(matched);
2026                         RETURN(ELDLM_OK);
2027                 } else {
2028                         ldlm_lock_decref(&lockh, mode);
2029                         LDLM_LOCK_PUT(matched);
2030                 }
2031         }
2032
2033 no_match:
2034         if (*flags & LDLM_FL_TEST_LOCK)
2035                 RETURN(-ENOLCK);
2036
2037         if (intent) {
2038                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2039                                            &RQF_LDLM_ENQUEUE_LVB);
2040                 if (req == NULL)
2041                         RETURN(-ENOMEM);
2042
2043                 rc = ptlrpc_request_pack(req, LUSTRE_DLM_VERSION, LDLM_ENQUEUE);
2044                 if (rc < 0) {
2045                         ptlrpc_request_free(req);
2046                         RETURN(rc);
2047                 }
2048
2049                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2050                                      sizeof *lvb);
2051                 ptlrpc_request_set_replen(req);
2052         }
2053
2054         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2055         *flags &= ~LDLM_FL_BLOCK_GRANTED;
2056
2057         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2058                               sizeof(*lvb), LVB_T_OST, &lockh, async);
2059         if (async) {
2060                 if (!rc) {
2061                         struct osc_enqueue_args *aa;
2062                         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2063                         aa = ptlrpc_req_async_args(req);
2064                         aa->oa_exp    = exp;
2065                         aa->oa_mode   = einfo->ei_mode;
2066                         aa->oa_type   = einfo->ei_type;
2067                         lustre_handle_copy(&aa->oa_lockh, &lockh);
2068                         aa->oa_upcall = upcall;
2069                         aa->oa_cookie = cookie;
2070                         aa->oa_agl    = !!agl;
2071                         if (!agl) {
2072                                 aa->oa_flags  = flags;
2073                                 aa->oa_lvb    = lvb;
2074                         } else {
2075                                 /* AGL is essentially to enqueue an DLM lock
2076                                  * in advance, so we don't care about the
2077                                  * result of AGL enqueue. */
2078                                 aa->oa_lvb    = NULL;
2079                                 aa->oa_flags  = NULL;
2080                         }
2081
2082                         req->rq_interpret_reply =
2083                                 (ptlrpc_interpterer_t)osc_enqueue_interpret;
2084                         if (rqset == PTLRPCD_SET)
2085                                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
2086                         else
2087                                 ptlrpc_set_add_req(rqset, req);
2088                 } else if (intent) {
2089                         ptlrpc_req_finished(req);
2090                 }
2091                 RETURN(rc);
2092         }
2093
2094         rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
2095                               flags, agl, rc);
2096         if (intent)
2097                 ptlrpc_req_finished(req);
2098
2099         RETURN(rc);
2100 }
2101
2102 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2103                    __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2104                    __u64 *flags, void *data, struct lustre_handle *lockh,
2105                    int unref)
2106 {
2107         struct obd_device *obd = exp->exp_obd;
2108         __u64 lflags = *flags;
2109         ldlm_mode_t rc;
2110         ENTRY;
2111
2112         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2113                 RETURN(-EIO);
2114
2115         /* Filesystem lock extents are extended to page boundaries so that
2116          * dealing with the page cache is a little smoother */
2117         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2118         policy->l_extent.end |= ~PAGE_MASK;
2119
2120         /* Next, search for already existing extent locks that will cover us */
2121         /* If we're trying to read, we also search for an existing PW lock.  The
2122          * VFS and page cache already protect us locally, so lots of readers/
2123          * writers can share a single PW lock. */
2124         rc = mode;
2125         if (mode == LCK_PR)
2126                 rc |= LCK_PW;
2127         rc = ldlm_lock_match(obd->obd_namespace, lflags,
2128                              res_id, type, policy, rc, lockh, unref);
2129         if (rc) {
2130                 if (data != NULL) {
2131                         if (!osc_set_data_with_check(lockh, data)) {
2132                                 if (!(lflags & LDLM_FL_TEST_LOCK))
2133                                         ldlm_lock_decref(lockh, rc);
2134                                 RETURN(0);
2135                         }
2136                 }
2137                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
2138                         ldlm_lock_addref(lockh, LCK_PR);
2139                         ldlm_lock_decref(lockh, LCK_PW);
2140                 }
2141                 RETURN(rc);
2142         }
2143         RETURN(rc);
2144 }
2145
2146 static int osc_statfs_interpret(const struct lu_env *env,
2147                                 struct ptlrpc_request *req,
2148                                 struct osc_async_args *aa, int rc)
2149 {
2150         struct obd_statfs *msfs;
2151         ENTRY;
2152
2153         if (rc == -EBADR)
2154                 /* The request has in fact never been sent
2155                  * due to issues at a higher level (LOV).
2156                  * Exit immediately since the caller is
2157                  * aware of the problem and takes care
2158                  * of the clean up */
2159                  RETURN(rc);
2160
2161         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2162             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2163                 GOTO(out, rc = 0);
2164
2165         if (rc != 0)
2166                 GOTO(out, rc);
2167
2168         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2169         if (msfs == NULL) {
2170                 GOTO(out, rc = -EPROTO);
2171         }
2172
2173         *aa->aa_oi->oi_osfs = *msfs;
2174 out:
2175         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2176         RETURN(rc);
2177 }
2178
2179 static int osc_statfs_async(struct obd_export *exp,
2180                             struct obd_info *oinfo, __u64 max_age,
2181                             struct ptlrpc_request_set *rqset)
2182 {
2183         struct obd_device     *obd = class_exp2obd(exp);
2184         struct ptlrpc_request *req;
2185         struct osc_async_args *aa;
2186         int                    rc;
2187         ENTRY;
2188
2189         /* We could possibly pass max_age in the request (as an absolute
2190          * timestamp or a "seconds.usec ago") so the target can avoid doing
2191          * extra calls into the filesystem if that isn't necessary (e.g.
2192          * during mount that would help a bit).  Having relative timestamps
2193          * is not so great if request processing is slow, while absolute
2194          * timestamps are not ideal because they need time synchronization. */
2195         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2196         if (req == NULL)
2197                 RETURN(-ENOMEM);
2198
2199         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2200         if (rc) {
2201                 ptlrpc_request_free(req);
2202                 RETURN(rc);
2203         }
2204         ptlrpc_request_set_replen(req);
2205         req->rq_request_portal = OST_CREATE_PORTAL;
2206         ptlrpc_at_set_req_timeout(req);
2207
2208         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2209                 /* procfs requests not want stat in wait for avoid deadlock */
2210                 req->rq_no_resend = 1;
2211                 req->rq_no_delay = 1;
2212         }
2213
2214         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2215         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2216         aa = ptlrpc_req_async_args(req);
2217         aa->aa_oi = oinfo;
2218
2219         ptlrpc_set_add_req(rqset, req);
2220         RETURN(0);
2221 }
2222
2223 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2224                       struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2225 {
2226         struct obd_device     *obd = class_exp2obd(exp);
2227         struct obd_statfs     *msfs;
2228         struct ptlrpc_request *req;
2229         struct obd_import     *imp = NULL;
2230         int rc;
2231         ENTRY;
2232
2233         /*Since the request might also come from lprocfs, so we need
2234          *sync this with client_disconnect_export Bug15684*/
2235         down_read(&obd->u.cli.cl_sem);
2236         if (obd->u.cli.cl_import)
2237                 imp = class_import_get(obd->u.cli.cl_import);
2238         up_read(&obd->u.cli.cl_sem);
2239         if (!imp)
2240                 RETURN(-ENODEV);
2241
2242         /* We could possibly pass max_age in the request (as an absolute
2243          * timestamp or a "seconds.usec ago") so the target can avoid doing
2244          * extra calls into the filesystem if that isn't necessary (e.g.
2245          * during mount that would help a bit).  Having relative timestamps
2246          * is not so great if request processing is slow, while absolute
2247          * timestamps are not ideal because they need time synchronization. */
2248         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2249
2250         class_import_put(imp);
2251
2252         if (req == NULL)
2253                 RETURN(-ENOMEM);
2254
2255         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2256         if (rc) {
2257                 ptlrpc_request_free(req);
2258                 RETURN(rc);
2259         }
2260         ptlrpc_request_set_replen(req);
2261         req->rq_request_portal = OST_CREATE_PORTAL;
2262         ptlrpc_at_set_req_timeout(req);
2263
2264         if (flags & OBD_STATFS_NODELAY) {
2265                 /* procfs requests not want stat in wait for avoid deadlock */
2266                 req->rq_no_resend = 1;
2267                 req->rq_no_delay = 1;
2268         }
2269
2270         rc = ptlrpc_queue_wait(req);
2271         if (rc)
2272                 GOTO(out, rc);
2273
2274         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2275         if (msfs == NULL) {
2276                 GOTO(out, rc = -EPROTO);
2277         }
2278
2279         *osfs = *msfs;
2280
2281         EXIT;
2282  out:
2283         ptlrpc_req_finished(req);
2284         return rc;
2285 }
2286
2287 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2288                          void *karg, void *uarg)
2289 {
2290         struct obd_device *obd = exp->exp_obd;
2291         struct obd_ioctl_data *data = karg;
2292         int err = 0;
2293         ENTRY;
2294
2295         if (!try_module_get(THIS_MODULE)) {
2296                 CERROR("%s: cannot get module '%s'\n", obd->obd_name,
2297                        module_name(THIS_MODULE));
2298                 return -EINVAL;
2299         }
2300         switch (cmd) {
2301         case OBD_IOC_CLIENT_RECOVER:
2302                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2303                                             data->ioc_inlbuf1, 0);
2304                 if (err > 0)
2305                         err = 0;
2306                 GOTO(out, err);
2307         case IOC_OSC_SET_ACTIVE:
2308                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2309                                                data->ioc_offset);
2310                 GOTO(out, err);
2311         case OBD_IOC_PING_TARGET:
2312                 err = ptlrpc_obd_ping(obd);
2313                 GOTO(out, err);
2314         default:
2315                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2316                        cmd, current_comm());
2317                 GOTO(out, err = -ENOTTY);
2318         }
2319 out:
2320         module_put(THIS_MODULE);
2321         return err;
2322 }
2323
2324 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
2325                               u32 keylen, void *key,
2326                               u32 vallen, void *val,
2327                               struct ptlrpc_request_set *set)
2328 {
2329         struct ptlrpc_request *req;
2330         struct obd_device     *obd = exp->exp_obd;
2331         struct obd_import     *imp = class_exp2cliimp(exp);
2332         char                  *tmp;
2333         int                    rc;
2334         ENTRY;
2335
2336         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
2337
2338         if (KEY_IS(KEY_CHECKSUM)) {
2339                 if (vallen != sizeof(int))
2340                         RETURN(-EINVAL);
2341                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
2342                 RETURN(0);
2343         }
2344
2345         if (KEY_IS(KEY_SPTLRPC_CONF)) {
2346                 sptlrpc_conf_client_adapt(obd);
2347                 RETURN(0);
2348         }
2349
2350         if (KEY_IS(KEY_FLUSH_CTX)) {
2351                 sptlrpc_import_flush_my_ctx(imp);
2352                 RETURN(0);
2353         }
2354
2355         if (KEY_IS(KEY_CACHE_SET)) {
2356                 struct client_obd *cli = &obd->u.cli;
2357
2358                 LASSERT(cli->cl_cache == NULL); /* only once */
2359                 cli->cl_cache = (struct cl_client_cache *)val;
2360                 cl_cache_incref(cli->cl_cache);
2361                 cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
2362
2363                 /* add this osc into entity list */
2364                 LASSERT(list_empty(&cli->cl_lru_osc));
2365                 spin_lock(&cli->cl_cache->ccc_lru_lock);
2366                 list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
2367                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
2368
2369                 RETURN(0);
2370         }
2371
2372         if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
2373                 struct client_obd *cli = &obd->u.cli;
2374                 long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
2375                 long target = *(long *)val;
2376
2377                 nr = osc_lru_shrink(env, cli, min(nr, target), true);
2378                 *(long *)val -= nr;
2379                 RETURN(0);
2380         }
2381
2382         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
2383                 RETURN(-EINVAL);
2384
2385         /* We pass all other commands directly to OST. Since nobody calls osc
2386            methods directly and everybody is supposed to go through LOV, we
2387            assume lov checked invalid values for us.
2388            The only recognised values so far are evict_by_nid and mds_conn.
2389            Even if something bad goes through, we'd get a -EINVAL from OST
2390            anyway. */
2391
2392         req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
2393                                                 &RQF_OST_SET_GRANT_INFO :
2394                                                 &RQF_OBD_SET_INFO);
2395         if (req == NULL)
2396                 RETURN(-ENOMEM);
2397
2398         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2399                              RCL_CLIENT, keylen);
2400         if (!KEY_IS(KEY_GRANT_SHRINK))
2401                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
2402                                      RCL_CLIENT, vallen);
2403         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
2404         if (rc) {
2405                 ptlrpc_request_free(req);
2406                 RETURN(rc);
2407         }
2408
2409         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2410         memcpy(tmp, key, keylen);
2411         tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
2412                                                         &RMF_OST_BODY :
2413                                                         &RMF_SETINFO_VAL);
2414         memcpy(tmp, val, vallen);
2415
2416         if (KEY_IS(KEY_GRANT_SHRINK)) {
2417                 struct osc_grant_args *aa;
2418                 struct obdo *oa;
2419
2420                 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2421                 aa = ptlrpc_req_async_args(req);
2422                 OBDO_ALLOC(oa);
2423                 if (!oa) {
2424                         ptlrpc_req_finished(req);
2425                         RETURN(-ENOMEM);
2426                 }
2427                 *oa = ((struct ost_body *)val)->oa;
2428                 aa->aa_oa = oa;
2429                 req->rq_interpret_reply = osc_shrink_grant_interpret;
2430         }
2431
2432         ptlrpc_request_set_replen(req);
2433         if (!KEY_IS(KEY_GRANT_SHRINK)) {
2434                 LASSERT(set != NULL);
2435                 ptlrpc_set_add_req(set, req);
2436                 ptlrpc_check_set(NULL, set);
2437         } else
2438                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
2439
2440         RETURN(0);
2441 }
2442
2443 static int osc_reconnect(const struct lu_env *env,
2444                          struct obd_export *exp, struct obd_device *obd,
2445                          struct obd_uuid *cluuid,
2446                          struct obd_connect_data *data,
2447                          void *localdata)
2448 {
2449         struct client_obd *cli = &obd->u.cli;
2450
2451         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
2452                 long lost_grant;
2453
2454                 spin_lock(&cli->cl_loi_list_lock);
2455                 data->ocd_grant = (cli->cl_avail_grant +
2456                                   (cli->cl_dirty_pages << PAGE_CACHE_SHIFT)) ?:
2457                                   2 * cli_brw_size(obd);
2458                 lost_grant = cli->cl_lost_grant;
2459                 cli->cl_lost_grant = 0;
2460                 spin_unlock(&cli->cl_loi_list_lock);
2461
2462                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
2463                        " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
2464                        data->ocd_version, data->ocd_grant, lost_grant);
2465         }
2466
2467         RETURN(0);
2468 }
2469
2470 static int osc_disconnect(struct obd_export *exp)
2471 {
2472         struct obd_device *obd = class_exp2obd(exp);
2473         int rc;
2474
2475         rc = client_disconnect_export(exp);
2476         /**
2477          * Initially we put del_shrink_grant before disconnect_export, but it
2478          * causes the following problem if setup (connect) and cleanup
2479          * (disconnect) are tangled together.
2480          *      connect p1                     disconnect p2
2481          *   ptlrpc_connect_import
2482          *     ...............               class_manual_cleanup
2483          *                                     osc_disconnect
2484          *                                     del_shrink_grant
2485          *   ptlrpc_connect_interrupt
2486          *     init_grant_shrink
2487          *   add this client to shrink list
2488          *                                      cleanup_osc
2489          * Bang! pinger trigger the shrink.
2490          * So the osc should be disconnected from the shrink list, after we
2491          * are sure the import has been destroyed. BUG18662
2492          */
2493         if (obd->u.cli.cl_import == NULL)
2494                 osc_del_shrink_grant(&obd->u.cli);
2495         return rc;
2496 }
2497
2498 static int osc_import_event(struct obd_device *obd,
2499                             struct obd_import *imp,
2500                             enum obd_import_event event)
2501 {
2502         struct client_obd *cli;
2503         int rc = 0;
2504
2505         ENTRY;
2506         LASSERT(imp->imp_obd == obd);
2507
2508         switch (event) {
2509         case IMP_EVENT_DISCON: {
2510                 cli = &obd->u.cli;
2511                 spin_lock(&cli->cl_loi_list_lock);
2512                 cli->cl_avail_grant = 0;
2513                 cli->cl_lost_grant = 0;
2514                 spin_unlock(&cli->cl_loi_list_lock);
2515                 break;
2516         }
2517         case IMP_EVENT_INACTIVE: {
2518                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
2519                 break;
2520         }
2521         case IMP_EVENT_INVALIDATE: {
2522                 struct ldlm_namespace *ns = obd->obd_namespace;
2523                 struct lu_env         *env;
2524                 int                    refcheck;
2525
2526                 env = cl_env_get(&refcheck);
2527                 if (!IS_ERR(env)) {
2528                         /* Reset grants */
2529                         cli = &obd->u.cli;
2530                         /* all pages go to failing rpcs due to the invalid
2531                          * import */
2532                         osc_io_unplug(env, cli, NULL, PDL_POLICY_ROUND);
2533
2534                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
2535                         cl_env_put(env, &refcheck);
2536                 } else
2537                         rc = PTR_ERR(env);
2538                 break;
2539         }
2540         case IMP_EVENT_ACTIVE: {
2541                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
2542                 break;
2543         }
2544         case IMP_EVENT_OCD: {
2545                 struct obd_connect_data *ocd = &imp->imp_connect_data;
2546
2547                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
2548                         osc_init_grant(&obd->u.cli, ocd);
2549
2550                 /* See bug 7198 */
2551                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
2552                         imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
2553
2554                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
2555                 break;
2556         }
2557         case IMP_EVENT_DEACTIVATE: {
2558                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
2559                 break;
2560         }
2561         case IMP_EVENT_ACTIVATE: {
2562                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
2563                 break;
2564         }
2565         default:
2566                 CERROR("Unknown import event %d\n", event);
2567                 LBUG();
2568         }
2569         RETURN(rc);
2570 }
2571
2572 /**
2573  * Determine whether the lock can be canceled before replaying the lock
2574  * during recovery, see bug16774 for detailed information.
2575  *
2576  * \retval zero the lock can't be canceled
2577  * \retval other ok to cancel
2578  */
2579 static int osc_cancel_weight(struct ldlm_lock *lock)
2580 {
2581         /*
2582          * Cancel all unused and granted extent lock.
2583          */
2584         if (lock->l_resource->lr_type == LDLM_EXTENT &&
2585             lock->l_granted_mode == lock->l_req_mode &&
2586             osc_ldlm_weigh_ast(lock) == 0)
2587                 RETURN(1);
2588
2589         RETURN(0);
2590 }
2591
2592 static int brw_queue_work(const struct lu_env *env, void *data)
2593 {
2594         struct client_obd *cli = data;
2595
2596         CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
2597
2598         osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
2599         RETURN(0);
2600 }
2601
2602 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
2603 {
2604         struct client_obd *cli = &obd->u.cli;
2605         struct obd_type   *type;
2606         void              *handler;
2607         int                rc;
2608         ENTRY;
2609
2610         rc = ptlrpcd_addref();
2611         if (rc)
2612                 RETURN(rc);
2613
2614         rc = client_obd_setup(obd, lcfg);
2615         if (rc)
2616                 GOTO(out_ptlrpcd, rc);
2617
2618         handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
2619         if (IS_ERR(handler))
2620                 GOTO(out_client_setup, rc = PTR_ERR(handler));
2621         cli->cl_writeback_work = handler;
2622
2623         handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
2624         if (IS_ERR(handler))
2625                 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
2626         cli->cl_lru_work = handler;
2627
2628         rc = osc_quota_setup(obd);
2629         if (rc)
2630                 GOTO(out_ptlrpcd_work, rc);
2631
2632         cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
2633
2634 #ifdef CONFIG_PROC_FS
2635         obd->obd_vars = lprocfs_osc_obd_vars;
2636 #endif
2637         /* If this is true then both client (osc) and server (osp) are on the
2638          * same node. The osp layer if loaded first will register the osc proc
2639          * directory. In that case this obd_device will be attached its proc
2640          * tree to type->typ_procsym instead of obd->obd_type->typ_procroot. */
2641         type = class_search_type(LUSTRE_OSP_NAME);
2642         if (type && type->typ_procsym) {
2643                 obd->obd_proc_entry = lprocfs_register(obd->obd_name,
2644                                                        type->typ_procsym,
2645                                                        obd->obd_vars, obd);
2646                 if (IS_ERR(obd->obd_proc_entry)) {
2647                         rc = PTR_ERR(obd->obd_proc_entry);
2648                         CERROR("error %d setting up lprocfs for %s\n", rc,
2649                                obd->obd_name);
2650                         obd->obd_proc_entry = NULL;
2651                 }
2652         } else {
2653                 rc = lprocfs_obd_setup(obd);
2654         }
2655
2656         /* If the basic OSC proc tree construction succeeded then
2657          * lets do the rest. */
2658         if (rc == 0) {
2659                 lproc_osc_attach_seqstat(obd);
2660                 sptlrpc_lprocfs_cliobd_attach(obd);
2661                 ptlrpc_lprocfs_register_obd(obd);
2662         }
2663
2664         /* We need to allocate a few requests more, because
2665          * brw_interpret tries to create new requests before freeing
2666          * previous ones, Ideally we want to have 2x max_rpcs_in_flight
2667          * reserved, but I'm afraid that might be too much wasted RAM
2668          * in fact, so 2 is just my guess and still should work. */
2669         cli->cl_import->imp_rq_pool =
2670                 ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
2671                                     OST_MAXREQSIZE,
2672                                     ptlrpc_add_rqs_to_pool);
2673
2674         INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
2675         ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
2676         RETURN(0);
2677
2678 out_ptlrpcd_work:
2679         if (cli->cl_writeback_work != NULL) {
2680                 ptlrpcd_destroy_work(cli->cl_writeback_work);
2681                 cli->cl_writeback_work = NULL;
2682         }
2683         if (cli->cl_lru_work != NULL) {
2684                 ptlrpcd_destroy_work(cli->cl_lru_work);
2685                 cli->cl_lru_work = NULL;
2686         }
2687 out_client_setup:
2688         client_obd_cleanup(obd);
2689 out_ptlrpcd:
2690         ptlrpcd_decref();
2691         RETURN(rc);
2692 }
2693
2694 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
2695 {
2696         int rc = 0;
2697         ENTRY;
2698
2699         switch (stage) {
2700         case OBD_CLEANUP_EARLY: {
2701                 struct obd_import *imp;
2702                 imp = obd->u.cli.cl_import;
2703                 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
2704                 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
2705                 ptlrpc_deactivate_import(imp);
2706                 spin_lock(&imp->imp_lock);
2707                 imp->imp_pingable = 0;
2708                 spin_unlock(&imp->imp_lock);
2709                 break;
2710         }
2711         case OBD_CLEANUP_EXPORTS: {
2712                 struct client_obd *cli = &obd->u.cli;
2713                 /* LU-464
2714                  * for echo client, export may be on zombie list, wait for
2715                  * zombie thread to cull it, because cli.cl_import will be
2716                  * cleared in client_disconnect_export():
2717                  *   class_export_destroy() -> obd_cleanup() ->
2718                  *   echo_device_free() -> echo_client_cleanup() ->
2719                  *   obd_disconnect() -> osc_disconnect() ->
2720                  *   client_disconnect_export()
2721                  */
2722                 obd_zombie_barrier();
2723                 if (cli->cl_writeback_work) {
2724                         ptlrpcd_destroy_work(cli->cl_writeback_work);
2725                         cli->cl_writeback_work = NULL;
2726                 }
2727                 if (cli->cl_lru_work) {
2728                         ptlrpcd_destroy_work(cli->cl_lru_work);
2729                         cli->cl_lru_work = NULL;
2730                 }
2731                 obd_cleanup_client_import(obd);
2732                 ptlrpc_lprocfs_unregister_obd(obd);
2733                 lprocfs_obd_cleanup(obd);
2734                 break;
2735                 }
2736         }
2737         RETURN(rc);
2738 }
2739
2740 int osc_cleanup(struct obd_device *obd)
2741 {
2742         struct client_obd *cli = &obd->u.cli;
2743         int rc;
2744
2745         ENTRY;
2746
2747         /* lru cleanup */
2748         if (cli->cl_cache != NULL) {
2749                 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
2750                 spin_lock(&cli->cl_cache->ccc_lru_lock);
2751                 list_del_init(&cli->cl_lru_osc);
2752                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
2753                 cli->cl_lru_left = NULL;
2754                 cl_cache_decref(cli->cl_cache);
2755                 cli->cl_cache = NULL;
2756         }
2757
2758         /* free memory of osc quota cache */
2759         osc_quota_cleanup(obd);
2760
2761         rc = client_obd_cleanup(obd);
2762
2763         ptlrpcd_decref();
2764         RETURN(rc);
2765 }
2766
2767 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
2768 {
2769         int rc = class_process_proc_param(PARAM_OSC, obd->obd_vars, lcfg, obd);
2770         return rc > 0 ? 0: rc;
2771 }
2772
2773 static int osc_process_config(struct obd_device *obd, size_t len, void *buf)
2774 {
2775         return osc_process_config_base(obd, buf);
2776 }
2777
2778 static struct obd_ops osc_obd_ops = {
2779         .o_owner                = THIS_MODULE,
2780         .o_setup                = osc_setup,
2781         .o_precleanup           = osc_precleanup,
2782         .o_cleanup              = osc_cleanup,
2783         .o_add_conn             = client_import_add_conn,
2784         .o_del_conn             = client_import_del_conn,
2785         .o_connect              = client_connect_import,
2786         .o_reconnect            = osc_reconnect,
2787         .o_disconnect           = osc_disconnect,
2788         .o_statfs               = osc_statfs,
2789         .o_statfs_async         = osc_statfs_async,
2790         .o_create               = osc_create,
2791         .o_destroy              = osc_destroy,
2792         .o_getattr              = osc_getattr,
2793         .o_setattr              = osc_setattr,
2794         .o_iocontrol            = osc_iocontrol,
2795         .o_set_info_async       = osc_set_info_async,
2796         .o_import_event         = osc_import_event,
2797         .o_process_config       = osc_process_config,
2798         .o_quotactl             = osc_quotactl,
2799 };
2800
2801 static int __init osc_init(void)
2802 {
2803         bool enable_proc = true;
2804         struct obd_type *type;
2805         int rc;
2806         ENTRY;
2807
2808         /* print an address of _any_ initialized kernel symbol from this
2809          * module, to allow debugging with gdb that doesn't support data
2810          * symbols from modules.*/
2811         CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
2812
2813         rc = lu_kmem_init(osc_caches);
2814         if (rc)
2815                 RETURN(rc);
2816
2817         type = class_search_type(LUSTRE_OSP_NAME);
2818         if (type != NULL && type->typ_procsym != NULL)
2819                 enable_proc = false;
2820
2821         rc = class_register_type(&osc_obd_ops, NULL, enable_proc, NULL,
2822                                  LUSTRE_OSC_NAME, &osc_device_type);
2823         if (rc) {
2824                 lu_kmem_fini(osc_caches);
2825                 RETURN(rc);
2826         }
2827
2828         RETURN(rc);
2829 }
2830
2831 static void /*__exit*/ osc_exit(void)
2832 {
2833         class_unregister_type(LUSTRE_OSC_NAME);
2834         lu_kmem_fini(osc_caches);
2835 }
2836
2837 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
2838 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
2839 MODULE_VERSION(LUSTRE_VERSION_STRING);
2840 MODULE_LICENSE("GPL");
2841
2842 module_init(osc_init);
2843 module_exit(osc_exit);