Whamcloud - gitweb
LU-7034 obd: Remove dead code in precleanup
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2014, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_OSC
38
39 #include <libcfs/libcfs.h>
40
41 #include <lustre_dlm.h>
42 #include <lustre_net.h>
43 #include <lustre/lustre_user.h>
44 #include <obd_cksum.h>
45 #include <lustre_ha.h>
46 #include <lprocfs_status.h>
47 #include <lustre_ioctl.h>
48 #include <lustre_debug.h>
49 #include <lustre_param.h>
50 #include <lustre_fid.h>
51 #include <obd_class.h>
52 #include <obd.h>
53 #include <lustre_net.h>
54 #include "osc_internal.h"
55 #include "osc_cl_internal.h"
56
/* Shared pre-allocated request pool for OSC RPCs; presumably used to
 * guarantee forward progress for I/O under memory pressure — TODO confirm
 * against the pool setup code, which is not visible in this chunk. */
atomic_t osc_pool_req_count;
unsigned int osc_reqpool_maxreqcount;
struct ptlrpc_request_pool *osc_rq_pool;

/* max memory used for request pool, unit is MB */
static unsigned int osc_reqpool_mem_max = 5;
module_param(osc_reqpool_mem_max, uint, 0444);
64
/* Per-request state for asynchronous BRW (bulk read/write) RPCs, stashed
 * in req->rq_async_args and consumed by brw_interpret().  Also reused for
 * grant-shrink RPCs via the osc_grant_args alias below (only aa_oa is
 * touched on that path — see osc_shrink_grant_interpret()). */
struct osc_brw_async_args {
        struct obdo              *aa_oa;        /* object attributes for the I/O */
        int                       aa_requested_nob; /* bytes requested — TODO confirm vs. check_write_rcs() */
        int                       aa_nio_count; /* remote niobuf count — TODO confirm */
        u32                       aa_page_count; /* number of entries in aa_ppga */
        int                       aa_resends;   /* resend attempts so far — TODO confirm */
        struct brw_page **aa_ppga;              /* array of page descriptors */
        struct client_obd        *aa_cli;       /* owning client */
        struct list_head          aa_oaps;      /* async pages of this RPC — TODO confirm */
        struct list_head          aa_exts;      /* extents covered by this RPC — TODO confirm */
};
76
77 #define osc_grant_args osc_brw_async_args
78
/* Completion context for asynchronous setattr/punch RPCs; stored in
 * req->rq_async_args and consumed by osc_setattr_interpret(). */
struct osc_setattr_args {
        struct obdo             *sa_oa;         /* updated from the reply on success */
        obd_enqueue_update_f     sa_upcall;     /* completion callback */
        void                    *sa_cookie;    /* opaque argument for sa_upcall */
};
84
/* Completion context for OST_SYNC RPCs; stored in req->rq_async_args and
 * consumed by osc_sync_interpret(). */
struct osc_fsync_args {
        struct osc_object       *fa_obj;        /* object being synced */
        struct obdo             *fa_oa;         /* overwritten with the reply's oa */
        obd_enqueue_update_f    fa_upcall;      /* completion callback */
        void                    *fa_cookie;    /* opaque argument for fa_upcall */
};
91
/* Context for asynchronous DLM lock enqueues.  The enqueue code itself is
 * not in this chunk; field semantics below are inferred from names —
 * verify against the enqueue/interpret implementations. */
struct osc_enqueue_args {
        struct obd_export       *oa_exp;        /* export the enqueue was sent on */
        enum ldlm_type          oa_type;        /* lock type — TODO confirm */
        enum ldlm_mode          oa_mode;        /* requested lock mode */
        __u64                   *oa_flags;      /* LDLM flags, in/out — TODO confirm */
        osc_enqueue_upcall_f    oa_upcall;      /* completion callback */
        void                    *oa_cookie;    /* opaque argument for oa_upcall */
        struct ost_lvb          *oa_lvb;        /* lock value block — TODO confirm */
        struct lustre_handle    oa_lockh;       /* handle of the lock — TODO confirm */
        unsigned int            oa_agl:1;       /* presumably async glimpse lock — TODO confirm */
};
103
104 static void osc_release_ppga(struct brw_page **ppga, size_t count);
105 static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
106                          void *data, int rc);
107
108 void osc_pack_req_body(struct ptlrpc_request *req, struct obdo *oa)
109 {
110         struct ost_body *body;
111
112         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
113         LASSERT(body);
114
115         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
116 }
117
/* Synchronously fetch the attributes of the OST object identified by @oa.
 * Sends an OST_GETATTR RPC, waits for the reply, and copies the returned
 * attributes back into @oa (plus a client-side blocksize).
 * Returns 0 on success or a negative errno. */
static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        int                      rc;

        ENTRY;
        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        /* Pack @oa into the request body in wire format. */
        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        /* Convert the reply back from wire format into @oa. */
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        /* Blocksize is a client-side preference, not a server attribute. */
        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;

        EXIT;
out:
        ptlrpc_req_finished(req);

        /* plain return: EXIT/GOTO above already logged the exit */
        return rc;
}
160
161 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
162                        struct obdo *oa)
163 {
164         struct ptlrpc_request   *req;
165         struct ost_body         *body;
166         int                      rc;
167
168         ENTRY;
169         LASSERT(oa->o_valid & OBD_MD_FLGROUP);
170
171         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
172         if (req == NULL)
173                 RETURN(-ENOMEM);
174
175         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
176         if (rc) {
177                 ptlrpc_request_free(req);
178                 RETURN(rc);
179         }
180
181         osc_pack_req_body(req, oa);
182
183         ptlrpc_request_set_replen(req);
184
185         rc = ptlrpc_queue_wait(req);
186         if (rc)
187                 GOTO(out, rc);
188
189         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
190         if (body == NULL)
191                 GOTO(out, rc = -EPROTO);
192
193         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
194
195         EXIT;
196 out:
197         ptlrpc_req_finished(req);
198
199         RETURN(rc);
200 }
201
/* Reply callback for asynchronous setattr and punch requests.  On success
 * it copies the server-returned attributes into sa->sa_oa; in all cases it
 * then invokes the caller's upcall with the final status. */
static int osc_setattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_setattr_args *sa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
                             &body->oa);
out:
        /* Always notify the caller, even on RPC or unpack failure. */
        rc = sa->sa_upcall(sa->sa_cookie, rc);
        RETURN(rc);
}
222
/* Issue an OST_SETATTR RPC asynchronously.
 * If @rqset is NULL the request is fire-and-forget: it is handed to
 * ptlrpcd with no interpret callback and @upcall is never invoked.
 * Otherwise osc_setattr_interpret() will call @upcall(@cookie, rc) on
 * completion, and the request is driven either by ptlrpcd (when
 * @rqset == PTLRPCD_SET) or by the caller via @rqset.
 * Returns 0 on successful submission or a negative errno. */
int osc_setattr_async(struct obd_export *exp, struct obdo *oa,
                      obd_enqueue_update_f upcall, void *cookie,
                      struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct osc_setattr_args *sa;
        int                      rc;

        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        /* do mds to ost setattr asynchronously */
        if (!rqset) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req);
        } else {
                req->rq_interpret_reply =
                        (ptlrpc_interpterer_t)osc_setattr_interpret;

                /* Completion context lives inline in the request. */
                CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
                sa = ptlrpc_req_async_args(req);
                sa->sa_oa = oa;
                sa->sa_upcall = upcall;
                sa->sa_cookie = cookie;

                if (rqset == PTLRPCD_SET)
                        ptlrpcd_add_req(req);
                else
                        ptlrpc_set_add_req(rqset, req);
        }

        RETURN(0);
}
269
/* Synchronously create an OST object described by @oa.
 * NOTE(review): the LASSERT below restricts this path to echo-sequence
 * objects; regular object creation is presumably handled elsewhere (not
 * visible in this chunk).  On success the server-assigned attributes are
 * copied back into @oa.  Returns 0 or a negative errno. */
static int osc_create(const struct lu_env *env, struct obd_export *exp,
                      struct obdo *oa)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        LASSERT(oa != NULL);
        LASSERT(oa->o_valid & OBD_MD_FLGROUP);
        LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
        if (req == NULL)
                GOTO(out, rc = -ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
        if (rc) {
                ptlrpc_request_free(req);
                GOTO(out, rc);
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out_req, rc = -EPROTO);

        CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        /* Blocksize is a client-side preference, not a server attribute. */
        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;

        CDEBUG(D_HA, "transno: "LPD64"\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        RETURN(rc);
}
320
/* Send an asynchronous OST_PUNCH (truncate / hole-punch) RPC.
 * The punched range presumably travels in @oa's size/blocks fields, as
 * for OST_SYNC — TODO confirm against the callers.  On completion
 * osc_setattr_interpret() invokes @upcall(@cookie, rc); the request is
 * driven by ptlrpcd when @rqset == PTLRPCD_SET, else added to @rqset.
 * Returns 0 on submission or a negative errno. */
int osc_punch_base(struct obd_export *exp, struct obdo *oa,
                   obd_enqueue_update_f upcall, void *cookie,
                   struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct osc_setattr_args *sa;
        struct ost_body         *body;
        int                      rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        /* Punch completion is handled like an async setattr. */
        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
        CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
        sa = ptlrpc_req_async_args(req);
        sa->sa_oa = oa;
        sa->sa_upcall = upcall;
        sa->sa_cookie = cookie;
        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}
362
/* Reply callback for OST_SYNC RPCs.  Copies the reply's oa back to the
 * caller, refreshes the osc object's cached blocks attribute under the
 * attribute lock, then invokes the caller's upcall with the final status. */
static int osc_sync_interpret(const struct lu_env *env,
                              struct ptlrpc_request *req,
                              void *arg, int rc)
{
        struct osc_fsync_args   *fa = arg;
        struct ost_body         *body;
        struct cl_attr          *attr = &osc_env_info(env)->oti_attr;
        unsigned long           valid = 0;
        struct cl_object        *obj;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL) {
                CERROR("can't unpack ost_body\n");
                GOTO(out, rc = -EPROTO);
        }

        /* Hand the server's attributes back to the waiter. */
        *fa->fa_oa = body->oa;
        obj = osc2cl(fa->fa_obj);

        /* Update osc object's blocks attribute */
        cl_object_attr_lock(obj);
        if (body->oa.o_valid & OBD_MD_FLBLOCKS) {
                attr->cat_blocks = body->oa.o_blocks;
                valid |= CAT_BLOCKS;
        }

        if (valid != 0)
                cl_object_attr_update(env, obj, attr, valid);
        cl_object_attr_unlock(obj);

out:
        /* Always notify the caller, even on error. */
        rc = fa->fa_upcall(fa->fa_cookie, rc);
        RETURN(rc);
}
401
402 int osc_sync_base(struct osc_object *obj, struct obdo *oa,
403                   obd_enqueue_update_f upcall, void *cookie,
404                   struct ptlrpc_request_set *rqset)
405 {
406         struct obd_export     *exp = osc_export(obj);
407         struct ptlrpc_request *req;
408         struct ost_body       *body;
409         struct osc_fsync_args *fa;
410         int                    rc;
411         ENTRY;
412
413         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
414         if (req == NULL)
415                 RETURN(-ENOMEM);
416
417         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
418         if (rc) {
419                 ptlrpc_request_free(req);
420                 RETURN(rc);
421         }
422
423         /* overload the size and blocks fields in the oa with start/end */
424         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
425         LASSERT(body);
426         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
427
428         ptlrpc_request_set_replen(req);
429         req->rq_interpret_reply = osc_sync_interpret;
430
431         CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
432         fa = ptlrpc_req_async_args(req);
433         fa->fa_obj = obj;
434         fa->fa_oa = oa;
435         fa->fa_upcall = upcall;
436         fa->fa_cookie = cookie;
437
438         if (rqset == PTLRPCD_SET)
439                 ptlrpcd_add_req(req);
440         else
441                 ptlrpc_set_add_req(rqset, req);
442
443         RETURN (0);
444 }
445
/* Find and cancel locally locks matched by @mode in the resource found by
 * @objid. Found locks are added into @cancel list. Returns the amount of
 * locks added to @cancels list. */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
                                   struct list_head *cancels,
                                   enum ldlm_mode mode, __u64 lock_flags)
{
        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        struct ldlm_res_id res_id;
        struct ldlm_resource *res;
        int count;
        ENTRY;

        /* Return, i.e. cancel nothing, only if ELC is supported (flag in
         * export) but disabled through procfs (flag in NS).
         *
         * This distinguishes from a case when ELC is not supported originally,
         * when we still want to cancel locks in advance and just cancel them
         * locally, without sending any RPC. */
        if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
                RETURN(0);

        /* Resource name is derived from the object id in @oa. */
        ostid_build_res_name(&oa->o_oi, &res_id);
        res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
        if (IS_ERR(res))
                RETURN(0);

        /* Hold a ref across the scan so the resource cannot vanish. */
        LDLM_RESOURCE_ADDREF(res);
        count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
                                           lock_flags, 0, NULL);
        LDLM_RESOURCE_DELREF(res);
        ldlm_resource_putref(res);
        RETURN(count);
}
480
481 static int osc_destroy_interpret(const struct lu_env *env,
482                                  struct ptlrpc_request *req, void *data,
483                                  int rc)
484 {
485         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
486
487         atomic_dec(&cli->cl_destroy_in_flight);
488         wake_up(&cli->cl_destroy_waitq);
489         return 0;
490 }
491
492 static int osc_can_send_destroy(struct client_obd *cli)
493 {
494         if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
495             cli->cl_max_rpcs_in_flight) {
496                 /* The destroy request can be sent */
497                 return 1;
498         }
499         if (atomic_dec_return(&cli->cl_destroy_in_flight) <
500             cli->cl_max_rpcs_in_flight) {
501                 /*
502                  * The counter has been modified between the two atomic
503                  * operations.
504                  */
505                 wake_up(&cli->cl_destroy_waitq);
506         }
507         return 0;
508 }
509
/* Destroy the OST object described by @oa.
 * Matching local PW locks are cancelled first (with LDLM_FL_DISCARD_DATA)
 * and piggy-backed on the OST_DESTROY request via early lock cancellation.
 * The RPC is sent asynchronously through ptlrpcd, but the sender throttles
 * itself so at most cl_max_rpcs_in_flight destroys are outstanding.
 * Returns 0 on submission or a negative errno. */
static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct client_obd     *cli = &exp->exp_obd->u.cli;
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct list_head       cancels = LIST_HEAD_INIT(cancels);
        int rc, count;
        ENTRY;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        /* Collect local locks to cancel; they are packed into the
         * destroy request below. */
        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
        if (req == NULL) {
                /* Drop the references taken by the resource scan. */
                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                RETURN(-ENOMEM);
        }

        rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
                               0, &cancels, count);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = osc_destroy_interpret;
        if (!osc_can_send_destroy(cli)) {
                struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);

                /*
                 * Wait until the number of on-going destroy RPCs drops
                 * under max_rpc_in_flight
                 */
                l_wait_event_exclusive(cli->cl_destroy_waitq,
                                       osc_can_send_destroy(cli), &lwi);
        }

        /* Do not wait for response */
        ptlrpcd_add_req(req);
        RETURN(0);
}
566
/* Fill the dirty/grant accounting fields of @oa that accompany every
 * BRW (and grant-shrink) request: how much the client has dirty, how
 * much more it may still dirty (o_undirty), its current grant and any
 * grant lost.  The caller must not have set the FLBLOCKS/FLGRANT bits
 * itself (LASSERT).  All counters are sampled under cl_loi_list_lock. */
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;

        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        spin_lock(&cli->cl_loi_list_lock);
        oa->o_dirty = cli->cl_dirty_pages << PAGE_CACHE_SHIFT;
        if (unlikely(cli->cl_dirty_pages - cli->cl_dirty_transit >
                     cli->cl_dirty_max_pages)) {
                /* Accounting went inconsistent: claim no headroom. */
                CERROR("dirty %lu - %lu > dirty_max %lu\n",
                       cli->cl_dirty_pages, cli->cl_dirty_transit,
                       cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else if (unlikely(atomic_long_read(&obd_dirty_pages) -
                            atomic_long_read(&obd_dirty_transit_pages) >
                            (long)(obd_max_dirty_pages + 1))) {
                /* The atomic_read() allowing the atomic_inc() are
                 * not covered by a lock thus they may safely race and trip
                 * this CERROR() unless we add in a small fudge factor (+1). */
                CERROR("%s: dirty %ld - %ld > system dirty_max %ld\n",
                       cli_name(cli), atomic_long_read(&obd_dirty_pages),
                       atomic_long_read(&obd_dirty_transit_pages),
                       obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
                            0x7fffffff)) {
                /* Implausibly large headroom: likely corruption, clamp. */
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty_pages, cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else {
                /* Headroom: at least a full window of max-size RPCs. */
                unsigned long max_in_flight = (cli->cl_max_pages_per_rpc <<
                                      PAGE_CACHE_SHIFT) *
                                     (cli->cl_max_rpcs_in_flight + 1);
                oa->o_undirty = max(cli->cl_dirty_max_pages << PAGE_CACHE_SHIFT,
                                    max_in_flight);
        }
        oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
        oa->o_dropped = cli->cl_lost_grant;
        cli->cl_lost_grant = 0;
        spin_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);

}
614
/* Re-arm the grant-shrink timer: schedule the next shrink attempt one
 * full interval from now. */
void osc_update_next_shrink(struct client_obd *cli)
{
        cli->cl_next_shrink_grant =
                cfs_time_shift(cli->cl_grant_shrink_interval);
        CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
               cli->cl_next_shrink_grant);
}
622
/* Add @grant bytes to the client's available grant, under the list lock. */
static void __osc_update_grant(struct client_obd *cli, u64 grant)
{
        spin_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant += grant;
        spin_unlock(&cli->cl_loi_list_lock);
}
629
630 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
631 {
632         if (body->oa.o_valid & OBD_MD_FLGRANT) {
633                 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
634                 __osc_update_grant(cli, body->oa.o_grant);
635         }
636 }
637
638 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
639                               u32 keylen, void *key,
640                               u32 vallen, void *val,
641                               struct ptlrpc_request_set *set);
642
/* Completion callback for grant-shrink set_info RPCs.
 * On failure the grant we tried to give back (stashed in oa->o_grant by
 * osc_shrink_grant_local()) is restored; on success any grant returned by
 * the server is applied.  The obdo was allocated for this RPC and is
 * freed here in both cases. */
static int osc_shrink_grant_interpret(const struct lu_env *env,
                                      struct ptlrpc_request *req,
                                      void *aa, int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
        struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
        struct ost_body *body;

        if (rc != 0) {
                /* Shrink failed: take the grant back locally. */
                __osc_update_grant(cli, oa->o_grant);
                GOTO(out, rc);
        }

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        osc_update_grant(cli, body);
out:
        OBDO_FREE(oa);
        return rc;
}
663
664 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
665 {
666         spin_lock(&cli->cl_loi_list_lock);
667         oa->o_grant = cli->cl_avail_grant / 4;
668         cli->cl_avail_grant -= oa->o_grant;
669         spin_unlock(&cli->cl_loi_list_lock);
670         if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
671                 oa->o_valid |= OBD_MD_FLFLAGS;
672                 oa->o_flags = 0;
673         }
674         oa->o_flags |= OBD_FL_SHRINK_GRANT;
675         osc_update_next_shrink(cli);
676 }
677
/* Shrink the current grant, either from some large amount to enough for a
 * full set of in-flight RPCs, or if we have already shrunk to that limit
 * then to enough for a single RPC.  This avoids keeping more grant than
 * needed, and avoids shrinking the grant piecemeal. */
static int osc_shrink_grant(struct client_obd *cli)
{
        /* Enough grant for a full window of max-size RPCs, plus one. */
        __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
                             (cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT);

        spin_lock(&cli->cl_loi_list_lock);
        /* Already at or below the window: shrink to a single RPC's worth. */
        if (cli->cl_avail_grant <= target_bytes)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
        spin_unlock(&cli->cl_loi_list_lock);

        return osc_shrink_grant_to_target(cli, target_bytes);
}
694
/* Give grant back to the server until only @target_bytes remain locally.
 * The excess is deducted from cl_avail_grant immediately and shipped via
 * a KEY_GRANT_SHRINK set_info RPC; if submission fails the grant is
 * restored.  Returns 0 on success or a negative errno. */
int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
{
        int                     rc = 0;
        struct ost_body        *body;
        ENTRY;

        spin_lock(&cli->cl_loi_list_lock);
        /* Don't shrink if we are already above or below the desired limit
         * We don't want to shrink below a single RPC, as that will negatively
         * impact block allocation and long-term performance. */
        if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;

        if (target_bytes >= cli->cl_avail_grant) {
                spin_unlock(&cli->cl_loi_list_lock);
                RETURN(0);
        }
        spin_unlock(&cli->cl_loi_list_lock);

        OBD_ALLOC_PTR(body);
        if (!body)
                RETURN(-ENOMEM);

        /* Announce current dirty/grant state along with the shrink. */
        osc_announce_cached(cli, &body->oa, 0);

        /* Re-take the lock: avail_grant may have moved since the check
         * above; deduct whatever excess remains now. */
        spin_lock(&cli->cl_loi_list_lock);
        body->oa.o_grant = cli->cl_avail_grant - target_bytes;
        cli->cl_avail_grant = target_bytes;
        spin_unlock(&cli->cl_loi_list_lock);
        if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
                body->oa.o_valid |= OBD_MD_FLFLAGS;
                body->oa.o_flags = 0;
        }
        body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);

        rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
                                sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
                                sizeof(*body), body, NULL);
        if (rc != 0)
                /* RPC never went out: take the grant back locally. */
                __osc_update_grant(cli, body->oa.o_grant);
        OBD_FREE_PTR(body);
        RETURN(rc);
}
739
740 static int osc_should_shrink_grant(struct client_obd *client)
741 {
742         cfs_time_t time = cfs_time_current();
743         cfs_time_t next_shrink = client->cl_next_shrink_grant;
744
745         if ((client->cl_import->imp_connect_data.ocd_connect_flags &
746              OBD_CONNECT_GRANT_SHRINK) == 0)
747                 return 0;
748
749         if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
750                 /* Get the current RPC size directly, instead of going via:
751                  * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
752                  * Keep comment here so that it can be found by searching. */
753                 int brw_size = client->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
754
755                 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
756                     client->cl_avail_grant > brw_size)
757                         return 1;
758                 else
759                         osc_update_next_shrink(client);
760         }
761         return 0;
762 }
763
764 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
765 {
766         struct client_obd *client;
767
768         list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
769                 if (osc_should_shrink_grant(client))
770                         osc_shrink_grant(client);
771         }
772         return 0;
773 }
774
/* Register @client on the shared grant-shrink timeout list so
 * osc_grant_shrink_grant_cb() will visit it periodically, and arm its
 * first shrink deadline.  Returns 0 or a negative errno. */
static int osc_add_shrink_grant(struct client_obd *client)
{
        int rc;

        rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
                                       TIMEOUT_GRANT,
                                       osc_grant_shrink_grant_cb, NULL,
                                       &client->cl_grant_shrink_list);
        if (rc) {
                CERROR("add grant client %s error %d\n", cli_name(client), rc);
                return rc;
        }
        CDEBUG(D_CACHE, "add grant client %s\n", cli_name(client));
        osc_update_next_shrink(client);
        return 0;
}
791
/* Unregister @client from the grant-shrink timeout list. */
static int osc_del_shrink_grant(struct client_obd *client)
{
        return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
                                         TIMEOUT_GRANT);
}
797
/* Initialize this client's grant state from the connect data @ocd after
 * (re)connecting, derive the extent chunk size from the server blocksize,
 * and enable periodic grant shrinking if the server supports it. */
static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        /*
         * ocd_grant is the total grant amount we're expect to hold: if we've
         * been evicted, it's the new avail_grant amount, cl_dirty_pages will
         * drop to 0 as inflight RPCs fail out; otherwise, it's avail_grant +
         * dirty.
         *
         * race is tolerable here: if we're evicted, but imp_state already
         * left EVICTED state, then cl_dirty_pages must be 0 already.
         */
        spin_lock(&cli->cl_loi_list_lock);
        if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
                cli->cl_avail_grant = ocd->ocd_grant;
        else
                cli->cl_avail_grant = ocd->ocd_grant -
                                      (cli->cl_dirty_pages << PAGE_CACHE_SHIFT);

        if (cli->cl_avail_grant < 0) {
                CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
                      cli_name(cli), cli->cl_avail_grant,
                      ocd->ocd_grant, cli->cl_dirty_pages << PAGE_CACHE_SHIFT);
                /* workaround for servers which do not have the patch from
                 * LU-2679 */
                cli->cl_avail_grant = ocd->ocd_grant;
        }

        /* determine the appropriate chunk size used by osc_extent. */
        cli->cl_chunkbits = max_t(int, PAGE_CACHE_SHIFT, ocd->ocd_blocksize);
        spin_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
               "chunk bits: %d.\n", cli_name(cli), cli->cl_avail_grant,
               cli->cl_lost_grant, cli->cl_chunkbits);

        /* list_empty() guards against registering twice on reconnect. */
        if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
            list_empty(&cli->cl_grant_shrink_list))
                osc_add_shrink_grant(cli);
}
837
838 /* We assume that the reason this OSC got a short read is because it read
839  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
840  * via the LOV, and it _knows_ it's reading inside the file, it's just that
841  * this stripe never got written at or beyond this stripe offset yet. */
842 static void handle_short_read(int nob_read, size_t page_count,
843                               struct brw_page **pga)
844 {
845         char *ptr;
846         int i = 0;
847
848         /* skip bytes read OK */
849         while (nob_read > 0) {
850                 LASSERT (page_count > 0);
851
852                 if (pga[i]->count > nob_read) {
853                         /* EOF inside this page */
854                         ptr = kmap(pga[i]->pg) +
855                                 (pga[i]->off & ~PAGE_MASK);
856                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
857                         kunmap(pga[i]->pg);
858                         page_count--;
859                         i++;
860                         break;
861                 }
862
863                 nob_read -= pga[i]->count;
864                 page_count--;
865                 i++;
866         }
867
868         /* zero remaining pages */
869         while (page_count-- > 0) {
870                 ptr = kmap(pga[i]->pg) + (pga[i]->off & ~PAGE_MASK);
871                 memset(ptr, 0, pga[i]->count);
872                 kunmap(pga[i]->pg);
873                 i++;
874         }
875 }
876
877 static int check_write_rcs(struct ptlrpc_request *req,
878                            int requested_nob, int niocount,
879                            size_t page_count, struct brw_page **pga)
880 {
881         int     i;
882         __u32   *remote_rcs;
883
884         remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
885                                                   sizeof(*remote_rcs) *
886                                                   niocount);
887         if (remote_rcs == NULL) {
888                 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
889                 return(-EPROTO);
890         }
891
892         /* return error if any niobuf was in error */
893         for (i = 0; i < niocount; i++) {
894                 if ((int)remote_rcs[i] < 0)
895                         return(remote_rcs[i]);
896
897                 if (remote_rcs[i] != 0) {
898                         CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
899                                 i, remote_rcs[i], req);
900                         return(-EPROTO);
901                 }
902         }
903
904         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
905                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
906                        req->rq_bulk->bd_nob_transferred, requested_nob);
907                 return(-EPROTO);
908         }
909
910         return (0);
911 }
912
913 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
914 {
915         if (p1->flag != p2->flag) {
916                 unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
917                                   OBD_BRW_SYNC       | OBD_BRW_ASYNC   |
918                                   OBD_BRW_NOQUOTA    | OBD_BRW_SOFT_SYNC);
919
920                 /* warn if we try to combine flags that we don't know to be
921                  * safe to combine */
922                 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
923                         CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
924                               "report this at https://jira.hpdd.intel.com/\n",
925                               p1->flag, p2->flag);
926                 }
927                 return 0;
928         }
929
930         return (p1->off + p1->count == p2->off);
931 }
932
/* Compute the bulk-data checksum over the first @nob bytes described by the
 * @pg_count entries of @pga, using the hash algorithm that @cksum_type maps
 * to via cksum_obd2cfs().
 *
 * @opc (OST_READ/OST_WRITE) is only used to select fault-injection
 * behavior: on read the first page's data is corrupted before hashing
 * (simulating an OST->client data error), on write only the returned
 * checksum is perturbed so the data stays correct for a resend.
 *
 * NOTE(review): on hash-init failure the negative PTR_ERR() value is
 * returned through the u32 return type; callers compare it as a checksum
 * value -- presumably acceptable since both sides would fail the same
 * way, but worth confirming. */
static u32 osc_checksum_bulk(int nob, size_t pg_count,
                             struct brw_page **pga, int opc,
                             cksum_type_t cksum_type)
{
        u32                             cksum;
        int                             i = 0;
        struct cfs_crypto_hash_desc     *hdesc;
        unsigned int                    bufsize;
        int                             err;
        unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);

        LASSERT(pg_count > 0);

        hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
        if (IS_ERR(hdesc)) {
                CERROR("Unable to initialize checksum hash %s\n",
                       cfs_crypto_hash_name(cfs_alg));
                return PTR_ERR(hdesc);
        }

        while (nob > 0 && pg_count > 0) {
                /* hash only up to the remaining @nob bytes of this page */
                unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 && opc == OST_READ &&
                    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
                        unsigned char *ptr = kmap(pga[i]->pg);
                        int off = pga[i]->off & ~PAGE_MASK;

                        memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
                        kunmap(pga[i]->pg);
                }
                cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
                                            pga[i]->off & ~PAGE_MASK,
                                            count);
                LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
                               (int)(pga[i]->off & ~PAGE_MASK));

                /* the last (partial) page may drive nob negative; the loop
                 * condition only needs nob <= 0 to terminate */
                nob -= pga[i]->count;
                pg_count--;
                i++;
        }

        bufsize = sizeof(cksum);
        /* NOTE(review): err is never checked -- TODO confirm hash_final
         * cannot fail once init succeeded */
        err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);

        /* For sending we only compute the wrong checksum instead
         * of corrupting the data so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                cksum++;

        return cksum;
}
987
/* Build (but do not send) a BRW read or write RPC for the sorted page
 * array @pga.
 *
 * @cmd is a mask containing OBD_BRW_WRITE or OBD_BRW_READ; @oa carries the
 * object attributes copied into the wire body; @resend marks the request
 * with OBD_FL_RECOV_RESEND.  On success the prepared request is returned
 * through @reqp with its async args (osc_brw_async_args) initialized; the
 * request owns the bulk descriptor.  Returns 0 or a negative errno. */
static int
osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
                     u32 page_count, struct brw_page **pga,
                     struct ptlrpc_request **reqp, int resend)
{
        struct ptlrpc_request   *req;
        struct ptlrpc_bulk_desc *desc;
        struct ost_body         *body;
        struct obd_ioobj        *ioobj;
        struct niobuf_remote    *niobuf;
        int niocount, i, requested_nob, opc, rc;
        struct osc_brw_async_args *aa;
        struct req_capsule      *pill;
        struct brw_page *pg_prev;

        ENTRY;
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
                RETURN(-ENOMEM); /* Recoverable */
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
                RETURN(-EINVAL); /* Fatal */

        /* writes come from the preallocated pool so that dirty pages can
         * always be flushed even under memory pressure */
        if ((cmd & OBD_BRW_WRITE) != 0) {
                opc = OST_WRITE;
                req = ptlrpc_request_alloc_pool(cli->cl_import,
                                                osc_rq_pool,
                                                &RQF_OST_BRW_WRITE);
        } else {
                opc = OST_READ;
                req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
        }
        if (req == NULL)
                RETURN(-ENOMEM);

        /* count remote niobufs: adjacent mergeable pages share one */
        for (niocount = i = 1; i < page_count; i++) {
                if (!can_merge_pages(pga[i - 1], pga[i]))
                        niocount++;
        }

        pill = &req->rq_pill;
        req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
                             sizeof(*ioobj));
        req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
                             niocount * sizeof(*niobuf));

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);
        /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
         * retry logic */
        req->rq_no_retry_einprogress = 1;

        desc = ptlrpc_prep_bulk_imp(req, page_count,
                cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
                (opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE :
                        PTLRPC_BULK_PUT_SINK) |
                        PTLRPC_BULK_BUF_KIOV,
                OST_BULK_PORTAL,
                &ptlrpc_bulk_kiov_pin_ops);

        if (desc == NULL)
                GOTO(out, rc = -ENOMEM);
        /* NB request now owns desc and will free it when it gets freed */

        body = req_capsule_client_get(pill, &RMF_OST_BODY);
        ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;
        /* The high bits of ioo_max_brw tells server _maximum_ number of bulks
         * that might be send for this request.  The actual number is decided
         * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
         * "max - 1" for old client compatibility sending "0", and also so the
         * the actual maximum is a power-of-two number, not one less. LU-1431 */
        ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
        LASSERT(page_count > 0);
        pg_prev = pga[0];
        /* fill the bulk descriptor and the niobuf array in one pass,
         * merging contiguous same-flag pages into a single niobuf */
        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                struct brw_page *pg = pga[i];
                int poff = pg->off & ~PAGE_MASK;

                LASSERT(pg->count > 0);
                /* make sure there is no gap in the middle of page array */
                LASSERTF(page_count == 1 ||
                         (ergo(i == 0, poff + pg->count == PAGE_CACHE_SIZE) &&
                          ergo(i > 0 && i < page_count - 1,
                               poff == 0 && pg->count == PAGE_CACHE_SIZE)   &&
                          ergo(i == page_count - 1, poff == 0)),
                         "i: %d/%d pg: %p off: "LPU64", count: %u\n",
                         i, page_count, pg, pg->off, pg->count);
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
                         " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
                         i, page_count,
                         pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
                         pg_prev->pg, page_private(pg_prev->pg),
                         pg_prev->pg->index, pg_prev->off);
                LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
                        (pg->flag & OBD_BRW_SRVLOCK));

                desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff, pg->count);
                requested_nob += pg->count;

                if (i > 0 && can_merge_pages(pg_prev, pg)) {
                        /* extend the previous niobuf instead of starting
                         * a new one; back up the cursor one slot */
                        niobuf--;
                        niobuf->rnb_len += pg->count;
                } else {
                        niobuf->rnb_offset = pg->off;
                        niobuf->rnb_len    = pg->count;
                        niobuf->rnb_flags  = pg->flag;
                }
                pg_prev = pg;
        }

        /* sanity check: exactly niocount niobufs were consumed */
        LASSERTF((void *)(niobuf - niocount) ==
                req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
                "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
                &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));

        osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
        if (resend) {
                if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                        body->oa.o_valid |= OBD_MD_FLFLAGS;
                        body->oa.o_flags = 0;
                }
                body->oa.o_flags |= OBD_FL_RECOV_RESEND;
        }

        if (osc_should_shrink_grant(cli))
                osc_shrink_grant_local(cli, &body->oa);

        /* size[REQ_REC_OFF] still sizeof (*body) */
        if (opc == OST_WRITE) {
                if (cli->cl_checksum &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        /* store cl_cksum_type in a local variable since
                         * it can be changed via lprocfs */
                        cksum_type_t cksum_type = cli->cl_cksum_type;

                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                                oa->o_flags &= OBD_FL_LOCAL_MASK;
                                body->oa.o_flags = 0;
                        }
                        body->oa.o_flags |= cksum_type_pack(cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        body->oa.o_cksum = osc_checksum_bulk(requested_nob,
                                                             page_count, pga,
                                                             OST_WRITE,
                                                             cksum_type);
                        CDEBUG(D_PAGE, "checksum at write origin: %x\n",
                               body->oa.o_cksum);
                        /* save this in 'oa', too, for later checking */
                        oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        oa->o_flags |= cksum_type_pack(cksum_type);
                } else {
                        /* clear out the checksum flag, in case this is a
                         * resend but cl_checksum is no longer set. b=11238 */
                        oa->o_valid &= ~OBD_MD_FLCKSUM;
                }
                oa->o_cksum = body->oa.o_cksum;
                /* 1 RC per niobuf */
                req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
                                     sizeof(__u32) * niocount);
        } else {
                /* read: only advertise the checksum type we want back */
                if (cli->cl_checksum &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                body->oa.o_flags = 0;
                        body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                }
        }
        ptlrpc_request_set_replen(req);

        /* stash everything the interpret/redo callbacks will need */
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oa = oa;
        aa->aa_requested_nob = requested_nob;
        aa->aa_nio_count = niocount;
        aa->aa_page_count = page_count;
        aa->aa_resends = 0;
        aa->aa_ppga = pga;
        aa->aa_cli = cli;
        INIT_LIST_HEAD(&aa->aa_oaps);

        *reqp = req;
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
                req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
                niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
        RETURN(0);

 out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}
1191
/* Diagnose a write checksum mismatch reported by the server.
 *
 * Re-computes the checksum over the client's pages (with the server's
 * checksum type) to distinguish four cases: server used a different
 * checksum type, data changed on the client after checksumming (mmap),
 * data changed in transit, or both.  Logs a console error describing the
 * case.
 *
 * \retval 0 when client and server checksums actually agree
 * \retval 1 when they differ (caller treats the write as failed/retryable)
 */
static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
                                __u32 client_cksum, __u32 server_cksum, int nob,
                                size_t page_count, struct brw_page **pga,
                                cksum_type_t client_cksum_type)
{
        __u32 new_cksum;
        char *msg;
        cksum_type_t cksum_type;

        if (server_cksum == client_cksum) {
                CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
                return 0;
        }

        /* redo the checksum with the type the server said it used */
        cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
                                       oa->o_flags : 0);
        new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
                                      cksum_type);

        if (cksum_type != client_cksum_type)
                msg = "the server did not use the checksum type specified in "
                      "the original request - likely a protocol problem";
        else if (new_cksum == server_cksum)
                msg = "changed on the client after we checksummed it - "
                      "likely false positive due to mmap IO (bug 11742)";
        else if (new_cksum == client_cksum)
                msg = "changed in transit before arrival at OST";
        else
                msg = "changed in transit AND doesn't match the original - "
                      "likely false positive due to mmap IO (bug 11742)";

        LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
                           " object "DOSTID" extent ["LPU64"-"LPU64"]\n",
                           msg, libcfs_nid2str(peer->nid),
                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
                           POSTID(&oa->o_oi), pga[0]->off,
                           pga[page_count-1]->off + pga[page_count-1]->count - 1);
        CERROR("original client csum %x (type %x), server csum %x (type %x), "
               "client csum now %x\n", client_cksum, client_cksum_type,
               server_cksum, cksum_type, new_cksum);
        return 1;
}
1236
/* Note rc enters this function as number of bytes transferred */
/* Finish processing a completed BRW RPC: unpack the reply body, update
 * quota flags and grant, verify write RCs / checksums for writes, and
 * handle short reads plus checksum verification for reads.
 *
 * \retval 0       on success
 * \retval -EAGAIN checksum mismatch or bulk unwrap failure (retryable)
 * \retval -EPROTO malformed reply
 * \retval <0      other request failure passed through
 */
static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
{
        struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
        const lnet_process_id_t *peer =
                        &req->rq_import->imp_connection->c_peer;
        struct client_obd *cli = aa->aa_cli;
        struct ost_body *body;
        u32 client_cksum = 0;
        ENTRY;

        /* -EDQUOT replies still carry a body we must process (quota flags,
         * grant), so only bail out early on other errors */
        if (rc < 0 && rc != -EDQUOT) {
                DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
                RETURN(rc);
        }

        LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL) {
                DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
                RETURN(-EPROTO);
        }

        /* set/clear over quota flag for a uid/gid */
        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
            body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
                unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };

                CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
                       body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
                       body->oa.o_flags);
                osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
        }

        osc_update_grant(cli, body);

        /* only the -EDQUOT case reaches here with rc < 0 */
        if (rc < 0)
                RETURN(rc);

        if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
                client_cksum = aa->aa_oa->o_cksum; /* save for later */

        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
                /* for writes rc must be 0; a positive byte count only makes
                 * sense for reads */
                if (rc > 0) {
                        CERROR("Unexpected +ve rc %d\n", rc);
                        RETURN(-EPROTO);
                }
                LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);

                if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
                        RETURN(-EAGAIN);

                if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
                    check_write_checksum(&body->oa, peer, client_cksum,
                                         body->oa.o_cksum, aa->aa_requested_nob,
                                         aa->aa_page_count, aa->aa_ppga,
                                         cksum_type_unpack(aa->aa_oa->o_flags)))
                        RETURN(-EAGAIN);

                rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
                                     aa->aa_page_count, aa->aa_ppga);
                GOTO(out, rc);
        }

        /* The rest of this function executes only for OST_READs */

        /* if unwrap_bulk failed, return -EAGAIN to retry */
        rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
        if (rc < 0)
                GOTO(out, rc = -EAGAIN);

        if (rc > aa->aa_requested_nob) {
                CERROR("Unexpected rc %d (%d requested)\n", rc,
                       aa->aa_requested_nob);
                RETURN(-EPROTO);
        }

        if (rc != req->rq_bulk->bd_nob_transferred) {
                CERROR ("Unexpected rc %d (%d transferred)\n",
                        rc, req->rq_bulk->bd_nob_transferred);
                return (-EPROTO);
        }

        /* short read: zero-fill the tail (reading past EOF of a stripe) */
        if (rc < aa->aa_requested_nob)
                handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);

        if (body->oa.o_valid & OBD_MD_FLCKSUM) {
                static int cksum_counter;
                u32        server_cksum = body->oa.o_cksum;
                char      *via = "";
                char      *router = "";
                cksum_type_t cksum_type;

                cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
                                               body->oa.o_flags : 0);
                client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
                                                 aa->aa_ppga, OST_READ,
                                                 cksum_type);

                /* note if the bulk arrived through an LNet router */
                if (peer->nid != req->rq_bulk->bd_sender) {
                        via = " via ";
                        router = libcfs_nid2str(req->rq_bulk->bd_sender);
                }

                if (server_cksum != client_cksum) {
                        LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
                                           "%s%s%s inode "DFID" object "DOSTID
                                           " extent ["LPU64"-"LPU64"]\n",
                                           req->rq_import->imp_obd->obd_name,
                                           libcfs_nid2str(peer->nid),
                                           via, router,
                                           body->oa.o_valid & OBD_MD_FLFID ?
                                                body->oa.o_parent_seq : (__u64)0,
                                           body->oa.o_valid & OBD_MD_FLFID ?
                                                body->oa.o_parent_oid : 0,
                                           body->oa.o_valid & OBD_MD_FLFID ?
                                                body->oa.o_parent_ver : 0,
                                           POSTID(&body->oa.o_oi),
                                           aa->aa_ppga[0]->off,
                                           aa->aa_ppga[aa->aa_page_count-1]->off +
                                           aa->aa_ppga[aa->aa_page_count-1]->count -
                                                                        1);
                        CERROR("client %x, server %x, cksum_type %x\n",
                               client_cksum, server_cksum, cksum_type);
                        cksum_counter = 0;
                        aa->aa_oa->o_cksum = client_cksum;
                        rc = -EAGAIN;
                } else {
                        cksum_counter++;
                        CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
                        rc = 0;
                }
        } else if (unlikely(client_cksum)) {
                static int cksum_missed;

                /* rate-limited: log only when cksum_missed is a power of 2 */
                cksum_missed++;
                if ((cksum_missed & (-cksum_missed)) == cksum_missed)
                        CERROR("Checksum %u requested from %s but not sent\n",
                               cksum_missed, libcfs_nid2str(peer->nid));
        } else {
                rc = 0;
        }
out:
        if (rc >= 0)
                lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
                                     aa->aa_oa, &body->oa);

        RETURN(rc);
}
1386
/* Rebuild and resubmit a BRW RPC after a recoverable error (@rc, e.g.
 * -EINPROGRESS or a checksum retry).
 *
 * Prepares a new request from the same async args, transfers the page
 * array, oap/extent lists, interpret callback, and resend count from the
 * old request to the new one, applies a resend delay capped at the request
 * timeout, and queues the new request on ptlrpcd.
 *
 * \retval 0      the new request was queued
 * \retval -EINTR a pending page was interrupted; give up the resend
 * \retval <0     osc_brw_prep_request failure
 */
static int osc_brw_redo_request(struct ptlrpc_request *request,
                                struct osc_brw_async_args *aa, int rc)
{
        struct ptlrpc_request *new_req;
        struct osc_brw_async_args *new_aa;
        struct osc_async_page *oap;
        ENTRY;

        DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
                  "redo for recoverable error %d", rc);

        rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
                                OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
                                  aa->aa_cli, aa->aa_oa, aa->aa_page_count,
                                  aa->aa_ppga, &new_req, 1);
        if (rc)
                RETURN(rc);

        /* abandon the resend if any of the pages was interrupted */
        list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request != NULL) {
                        LASSERTF(request == oap->oap_request,
                                 "request %p != oap_request %p\n",
                                 request, oap->oap_request);
                        if (oap->oap_interrupted) {
                                ptlrpc_req_finished(new_req);
                                RETURN(-EINTR);
                        }
                }
        }
        /* New request takes over pga and oaps from old request.
         * Note that copying a list_head doesn't work, need to move it... */
        aa->aa_resends++;
        new_req->rq_interpret_reply = request->rq_interpret_reply;
        new_req->rq_async_args = request->rq_async_args;
        new_req->rq_commit_cb = request->rq_commit_cb;
        /* cap resend delay to the current request timeout, this is similar to
         * what ptlrpc does (see after_reply()) */
        if (aa->aa_resends > new_req->rq_timeout)
                new_req->rq_sent = cfs_time_current_sec() + new_req->rq_timeout;
        else
                new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
        new_req->rq_generation_set = 1;
        new_req->rq_import_generation = request->rq_import_generation;

        new_aa = ptlrpc_req_async_args(new_req);

        /* splice, not copy: list_heads embedded in rq_async_args were
         * copied above and must be re-initialized before use */
        INIT_LIST_HEAD(&new_aa->aa_oaps);
        list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
        INIT_LIST_HEAD(&new_aa->aa_exts);
        list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
        new_aa->aa_resends = aa->aa_resends;

        /* repoint each page's request reference at the new request */
        list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request) {
                        ptlrpc_req_finished(oap->oap_request);
                        oap->oap_request = ptlrpc_request_addref(new_req);
                }
        }

        /* XXX: This code will run into problem if we're going to support
         * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
         * and wait for all of them to be finished. We should inherit request
         * set from old request. */
        ptlrpcd_add_req(new_req);

        DEBUG_REQ(D_INFO, new_req, "new request");
        RETURN(0);
}
1455
1456 /*
1457  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1458  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1459  * fine for our small page arrays and doesn't require allocation.  its an
1460  * insertion sort that swaps elements that are strides apart, shrinking the
1461  * stride down until its '1' and the array is sorted.
1462  */
1463 static void sort_brw_pages(struct brw_page **array, int num)
1464 {
1465         int stride, i, j;
1466         struct brw_page *tmp;
1467
1468         if (num == 1)
1469                 return;
1470         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1471                 ;
1472
1473         do {
1474                 stride /= 3;
1475                 for (i = stride ; i < num ; i++) {
1476                         tmp = array[i];
1477                         j = i;
1478                         while (j >= stride && array[j - stride]->off > tmp->off) {
1479                                 array[j] = array[j - stride];
1480                                 j -= stride;
1481                         }
1482                         array[j] = tmp;
1483                 }
1484         } while (stride > 1);
1485 }
1486
/* Free a brw_page pointer array of @count entries built for a BRW request.
 * Only the array itself is freed; the pages it points at are owned and
 * released elsewhere. */
static void osc_release_ppga(struct brw_page **ppga, size_t count)
{
        LASSERT(ppga != NULL);
        OBD_FREE(ppga, sizeof(*ppga) * count);
}
1492
/**
 * Interpret callback for a BRW RPC completion.
 *
 * Finalizes the bulk request, retries recoverable errors (resend or
 * redo), pushes server-returned attributes into the cl_object on
 * success, finishes all extents carried by this RPC, and updates the
 * client's in-flight RPC accounting.
 *
 * \param[in] env   execution environment
 * \param[in] req   the completed BRW request
 * \param[in] data  struct osc_brw_async_args stored in rq_async_args
 * \param[in] rc    result of the request so far
 *
 * \retval 0 on success or successful redo, negative errno on failure
 */
static int brw_interpret(const struct lu_env *env,
                         struct ptlrpc_request *req, void *data, int rc)
{
        struct osc_brw_async_args *aa = data;
        struct osc_extent *ext;
        struct osc_extent *tmp;
        struct client_obd *cli = aa->aa_cli;
        ENTRY;

        rc = osc_brw_fini_request(req, rc);
        CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
        /* When server return -EINPROGRESS, client should always retry
         * regardless of the number of times the bulk was resent already. */
        if (osc_recoverable_error(rc)) {
                /* An import generation mismatch means the client was
                 * evicted while this RPC was in flight: only log, do not
                 * redo the request against the new generation. */
                if (req->rq_import_generation !=
                    req->rq_import->imp_generation) {
                        CDEBUG(D_HA, "%s: resend cross eviction for object: "
                               ""DOSTID", rc = %d.\n",
                               req->rq_import->imp_obd->obd_name,
                               POSTID(&aa->aa_oa->o_oi), rc);
                } else if (rc == -EINPROGRESS ||
                    client_should_resend(aa->aa_resends, aa->aa_cli)) {
                        rc = osc_brw_redo_request(req, aa, rc);
                } else {
                        CERROR("%s: too many resent retries for object: "
                               ""LPU64":"LPU64", rc = %d.\n",
                               req->rq_import->imp_obd->obd_name,
                               POSTID(&aa->aa_oa->o_oi), rc);
                }

                /* rc == 0 means the redo was queued successfully; the new
                 * request owns the async args now, so return immediately. */
                if (rc == 0)
                        RETURN(0);
                else if (rc == -EAGAIN || rc == -EINPROGRESS)
                        rc = -EIO;
        }

        if (rc == 0) {
                struct obdo *oa = aa->aa_oa;
                struct cl_attr *attr = &osc_env_info(env)->oti_attr;
                unsigned long valid = 0;
                struct cl_object *obj;
                struct osc_async_page *last;

                /* pga is sorted by offset, so the last page gives the
                 * highest byte touched by this RPC. */
                last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
                obj = osc2cl(last->oap_obj);

                /* Mirror the attributes the server marked valid into the
                 * cached cl_attr, under the object attribute lock. */
                cl_object_attr_lock(obj);
                if (oa->o_valid & OBD_MD_FLBLOCKS) {
                        attr->cat_blocks = oa->o_blocks;
                        valid |= CAT_BLOCKS;
                }
                if (oa->o_valid & OBD_MD_FLMTIME) {
                        attr->cat_mtime = oa->o_mtime;
                        valid |= CAT_MTIME;
                }
                if (oa->o_valid & OBD_MD_FLATIME) {
                        attr->cat_atime = oa->o_atime;
                        valid |= CAT_ATIME;
                }
                if (oa->o_valid & OBD_MD_FLCTIME) {
                        attr->cat_ctime = oa->o_ctime;
                        valid |= CAT_CTIME;
                }

                if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
                        struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
                        loff_t last_off = last->oap_count + last->oap_obj_off +
                                last->oap_page_off;

                        /* Change file size if this is an out of quota or
                         * direct IO write and it extends the file size */
                        if (loi->loi_lvb.lvb_size < last_off) {
                                attr->cat_size = last_off;
                                valid |= CAT_SIZE;
                        }
                        /* Extend KMS if it's not a lockless write */
                        if (loi->loi_kms < last_off &&
                            oap2osc_page(last)->ops_srvlock == 0) {
                                attr->cat_kms = last_off;
                                valid |= CAT_KMS;
                        }
                }

                if (valid != 0)
                        cl_object_attr_update(env, obj, attr, valid);
                cl_object_attr_unlock(obj);
        }
        OBDO_FREE(aa->aa_oa);

        /* Successful writes pin pages as "unstable" until the server
         * commits the transaction (see brw_commit). */
        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
                osc_inc_unstable_pages(req);

        list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
                list_del_init(&ext->oe_link);
                osc_extent_finish(env, ext, 1, rc);
        }
        LASSERT(list_empty(&aa->aa_exts));
        LASSERT(list_empty(&aa->aa_oaps));

        osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
        ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);

        spin_lock(&cli->cl_loi_list_lock);
        /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
         * is called so we know whether to go to sync BRWs or wait for more
         * RPCs to complete */
        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
                cli->cl_w_in_flight--;
        else
                cli->cl_r_in_flight--;
        osc_wake_cache_waiters(cli);
        spin_unlock(&cli->cl_loi_list_lock);

        osc_io_unplug(env, cli, NULL);
        RETURN(rc);
}
1609
1610 static void brw_commit(struct ptlrpc_request *req)
1611 {
1612         /* If osc_inc_unstable_pages (via osc_extent_finish) races with
1613          * this called via the rq_commit_cb, I need to ensure
1614          * osc_dec_unstable_pages is still called. Otherwise unstable
1615          * pages may be leaked. */
1616         spin_lock(&req->rq_lock);
1617         if (likely(req->rq_unstable)) {
1618                 req->rq_unstable = 0;
1619                 spin_unlock(&req->rq_lock);
1620
1621                 osc_dec_unstable_pages(req);
1622         } else {
1623                 req->rq_committed = 1;
1624                 spin_unlock(&req->rq_lock);
1625         }
1626 }
1627
1628 /**
1629  * Build an RPC by the list of extent @ext_list. The caller must ensure
1630  * that the total pages in this list are NOT over max pages per RPC.
1631  * Extents in the list must be in OES_RPC state.
1632  */
1633 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
1634                   struct list_head *ext_list, int cmd)
1635 {
1636         struct ptlrpc_request           *req = NULL;
1637         struct osc_extent               *ext;
1638         struct brw_page                 **pga = NULL;
1639         struct osc_brw_async_args       *aa = NULL;
1640         struct obdo                     *oa = NULL;
1641         struct osc_async_page           *oap;
1642         struct osc_object               *obj = NULL;
1643         struct cl_req_attr              *crattr = NULL;
1644         loff_t                          starting_offset = OBD_OBJECT_EOF;
1645         loff_t                          ending_offset = 0;
1646         int                             mpflag = 0;
1647         int                             mem_tight = 0;
1648         int                             page_count = 0;
1649         bool                            soft_sync = false;
1650         bool                            interrupted = false;
1651         int                             i;
1652         int                             rc;
1653         struct list_head                rpc_list = LIST_HEAD_INIT(rpc_list);
1654         struct ost_body                 *body;
1655         ENTRY;
1656         LASSERT(!list_empty(ext_list));
1657
1658         /* add pages into rpc_list to build BRW rpc */
1659         list_for_each_entry(ext, ext_list, oe_link) {
1660                 LASSERT(ext->oe_state == OES_RPC);
1661                 mem_tight |= ext->oe_memalloc;
1662                 page_count += ext->oe_nr_pages;
1663                 if (obj == NULL)
1664                         obj = ext->oe_obj;
1665         }
1666
1667         soft_sync = osc_over_unstable_soft_limit(cli);
1668         if (mem_tight)
1669                 mpflag = cfs_memory_pressure_get_and_set();
1670
1671         OBD_ALLOC(pga, sizeof(*pga) * page_count);
1672         if (pga == NULL)
1673                 GOTO(out, rc = -ENOMEM);
1674
1675         OBDO_ALLOC(oa);
1676         if (oa == NULL)
1677                 GOTO(out, rc = -ENOMEM);
1678
1679         i = 0;
1680         list_for_each_entry(ext, ext_list, oe_link) {
1681                 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
1682                         if (mem_tight)
1683                                 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
1684                         if (soft_sync)
1685                                 oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
1686                         pga[i] = &oap->oap_brw_page;
1687                         pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
1688                         i++;
1689
1690                         list_add_tail(&oap->oap_rpc_item, &rpc_list);
1691                         if (starting_offset == OBD_OBJECT_EOF ||
1692                             starting_offset > oap->oap_obj_off)
1693                                 starting_offset = oap->oap_obj_off;
1694                         else
1695                                 LASSERT(oap->oap_page_off == 0);
1696                         if (ending_offset < oap->oap_obj_off + oap->oap_count)
1697                                 ending_offset = oap->oap_obj_off +
1698                                                 oap->oap_count;
1699                         else
1700                                 LASSERT(oap->oap_page_off + oap->oap_count ==
1701                                         PAGE_CACHE_SIZE);
1702                         if (oap->oap_interrupted)
1703                                 interrupted = true;
1704                 }
1705         }
1706
1707         /* first page in the list */
1708         oap = list_entry(rpc_list.next, typeof(*oap), oap_rpc_item);
1709
1710         crattr = &osc_env_info(env)->oti_req_attr;
1711         memset(crattr, 0, sizeof(*crattr));
1712         crattr->cra_type = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
1713         crattr->cra_flags = ~0ULL;
1714         crattr->cra_page = oap2cl_page(oap);
1715         crattr->cra_oa = oa;
1716         cl_req_attr_set(env, osc2cl(obj), crattr);
1717
1718         sort_brw_pages(pga, page_count);
1719         rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req, 0);
1720         if (rc != 0) {
1721                 CERROR("prep_req failed: %d\n", rc);
1722                 GOTO(out, rc);
1723         }
1724
1725         req->rq_commit_cb = brw_commit;
1726         req->rq_interpret_reply = brw_interpret;
1727         req->rq_memalloc = mem_tight != 0;
1728         oap->oap_request = ptlrpc_request_addref(req);
1729         if (interrupted && !req->rq_intr)
1730                 ptlrpc_mark_interrupted(req);
1731
1732         /* Need to update the timestamps after the request is built in case
1733          * we race with setattr (locally or in queue at OST).  If OST gets
1734          * later setattr before earlier BRW (as determined by the request xid),
1735          * the OST will not use BRW timestamps.  Sadly, there is no obvious
1736          * way to do this in a single call.  bug 10150 */
1737         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
1738         crattr->cra_oa = &body->oa;
1739         crattr->cra_flags = OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME;
1740         cl_req_attr_set(env, osc2cl(obj), crattr);
1741         lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
1742
1743         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1744         aa = ptlrpc_req_async_args(req);
1745         INIT_LIST_HEAD(&aa->aa_oaps);
1746         list_splice_init(&rpc_list, &aa->aa_oaps);
1747         INIT_LIST_HEAD(&aa->aa_exts);
1748         list_splice_init(ext_list, &aa->aa_exts);
1749
1750         spin_lock(&cli->cl_loi_list_lock);
1751         starting_offset >>= PAGE_CACHE_SHIFT;
1752         if (cmd == OBD_BRW_READ) {
1753                 cli->cl_r_in_flight++;
1754                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
1755                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
1756                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
1757                                       starting_offset + 1);
1758         } else {
1759                 cli->cl_w_in_flight++;
1760                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
1761                 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
1762                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
1763                                       starting_offset + 1);
1764         }
1765         spin_unlock(&cli->cl_loi_list_lock);
1766
1767         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %ur/%uw in flight",
1768                   page_count, aa, cli->cl_r_in_flight,
1769                   cli->cl_w_in_flight);
1770
1771         ptlrpcd_add_req(req);
1772         rc = 0;
1773         EXIT;
1774
1775 out:
1776         if (mem_tight != 0)
1777                 cfs_memory_pressure_restore(mpflag);
1778
1779         if (rc != 0) {
1780                 LASSERT(req == NULL);
1781
1782                 if (oa)
1783                         OBDO_FREE(oa);
1784                 if (pga)
1785                         OBD_FREE(pga, sizeof(*pga) * page_count);
1786                 /* this should happen rarely and is pretty bad, it makes the
1787                  * pending list not follow the dirty order */
1788                 while (!list_empty(ext_list)) {
1789                         ext = list_entry(ext_list->next, struct osc_extent,
1790                                          oe_link);
1791                         list_del_init(&ext->oe_link);
1792                         osc_extent_finish(env, ext, 0, rc);
1793                 }
1794         }
1795         RETURN(rc);
1796 }
1797
1798 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
1799                                         struct ldlm_enqueue_info *einfo)
1800 {
1801         void *data = einfo->ei_cbdata;
1802         int set = 0;
1803
1804         LASSERT(lock != NULL);
1805         LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
1806         LASSERT(lock->l_resource->lr_type == einfo->ei_type);
1807         LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
1808         LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
1809
1810         lock_res_and_lock(lock);
1811
1812         if (lock->l_ast_data == NULL)
1813                 lock->l_ast_data = data;
1814         if (lock->l_ast_data == data)
1815                 set = 1;
1816
1817         unlock_res_and_lock(lock);
1818
1819         return set;
1820 }
1821
1822 static int osc_set_data_with_check(struct lustre_handle *lockh,
1823                                    struct ldlm_enqueue_info *einfo)
1824 {
1825         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
1826         int set = 0;
1827
1828         if (lock != NULL) {
1829                 set = osc_set_lock_data_with_check(lock, einfo);
1830                 LDLM_LOCK_PUT(lock);
1831         } else
1832                 CERROR("lockh %p, data %p - client evicted?\n",
1833                        lockh, einfo->ei_cbdata);
1834         return set;
1835 }
1836
/**
 * Finish an OSC enqueue: decode an aborted-intent reply if present,
 * deliver the final result to the caller's upcall, and release the
 * enqueue reference on success.
 *
 * \param[in]     req     the enqueue request (created before
 *                        ldlm_cli_enqueue was called)
 * \param[in]     upcall  completion callback invoked with the final errcode
 * \param[in]     cookie  opaque argument passed through to \a upcall
 * \param[in]     lockh   handle of the enqueued lock
 * \param[in]     mode    mode the enqueue reference was taken with
 * \param[in,out] flags   LDLM flags; LDLM_FL_LVB_READY may be set here
 * \param[in]     agl     non-zero for asynchronous glimpse locks
 * \param[in]     errcode enqueue result from the server
 *
 * \retval return value of the upcall
 */
static int osc_enqueue_fini(struct ptlrpc_request *req,
                            osc_enqueue_upcall_f upcall, void *cookie,
                            struct lustre_handle *lockh, enum ldlm_mode mode,
                            __u64 *flags, int agl, int errcode)
{
        bool intent = *flags & LDLM_FL_HAS_INTENT;
        int rc;
        ENTRY;

        /* The request was created before ldlm_cli_enqueue call. */
        if (intent && errcode == ELDLM_LOCK_ABORTED) {
                struct ldlm_reply *rep;

                rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
                LASSERT(rep != NULL);

                /* the real result of an aborted intent enqueue is carried
                 * in lock_policy_res1 (in network byte order) */
                rep->lock_policy_res1 =
                        ptlrpc_status_ntoh(rep->lock_policy_res1);
                if (rep->lock_policy_res1)
                        errcode = rep->lock_policy_res1;
                if (!agl)
                        *flags |= LDLM_FL_LVB_READY;
        } else if (errcode == ELDLM_OK) {
                *flags |= LDLM_FL_LVB_READY;
        }

        /* Call the update callback. */
        rc = (*upcall)(cookie, lockh, errcode);

        /* release the reference taken in ldlm_cli_enqueue() */
        if (errcode == ELDLM_LOCK_MATCHED)
                errcode = ELDLM_OK;
        if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
                ldlm_lock_decref(lockh, mode);

        RETURN(rc);
}
1874
/**
 * Interpret callback for an asynchronous lock enqueue.
 *
 * Completes the LDLM side of the enqueue (ldlm_cli_enqueue_fini) and then
 * the OSC side (osc_enqueue_fini), with careful reference handling so
 * that a blocking AST posted for a failed lock cannot race ahead of the
 * upcall.
 *
 * \param[in] env  execution environment
 * \param[in] req  the completed enqueue request
 * \param[in] aa   enqueue arguments stored in rq_async_args
 * \param[in] rc   request result
 *
 * \retval result from osc_enqueue_fini()
 */
static int osc_enqueue_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_enqueue_args *aa, int rc)
{
        struct ldlm_lock *lock;
        struct lustre_handle *lockh = &aa->oa_lockh;
        enum ldlm_mode mode = aa->oa_mode;
        struct ost_lvb *lvb = aa->oa_lvb;
        __u32 lvb_len = sizeof(*lvb);
        __u64 flags = 0;

        ENTRY;

        /* ldlm_cli_enqueue is holding a reference on the lock, so it must
         * be valid. */
        lock = ldlm_handle2lock(lockh);
        LASSERTF(lock != NULL,
                 "lockh "LPX64", req %p, aa %p - client evicted?\n",
                 lockh->cookie, req, aa);

        /* Take an additional reference so that a blocking AST that
         * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
         * to arrive after an upcall has been executed by
         * osc_enqueue_fini(). */
        ldlm_lock_addref(lockh, mode);

        /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
        OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);

        /* Let CP AST to grant the lock first. */
        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);

        /* AGL enqueues carry no lvb/flags pointers (the result is not
         * interesting to the caller); borrow a local flags word so
         * ldlm_cli_enqueue_fini has somewhere to write. */
        if (aa->oa_agl) {
                LASSERT(aa->oa_lvb == NULL);
                LASSERT(aa->oa_flags == NULL);
                aa->oa_flags = &flags;
        }

        /* Complete obtaining the lock procedure. */
        rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
                                   aa->oa_mode, aa->oa_flags, lvb, lvb_len,
                                   lockh, rc);
        /* Complete osc stuff. */
        rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
                              aa->oa_flags, aa->oa_agl, rc);

        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);

        /* drop the extra reference taken above and the handle2lock ref */
        ldlm_lock_decref(lockh, mode);
        LDLM_LOCK_PUT(lock);
        RETURN(rc);
}
1927
/* Sentinel request-set pointer: callers pass this instead of a real set to
 * ask osc_enqueue_base() to hand the request to the ptlrpcd daemon.  Never
 * dereferenced, only compared against. */
struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
1929
/* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
 * other synchronous requests, however keeping some locks and trying to obtain
 * others may take a considerable amount of time in a case of ost failure; and
 * when other sync requests do not get released lock from a client, the client
 * is evicted from the cluster -- such scenarious make the life difficult, so
 * release locks just after they are obtained. */
/**
 * Enqueue an extent lock on an OST object, first trying to match an
 * already-cached lock.
 *
 * \param[in]     exp     export to enqueue on
 * \param[in]     res_id  resource (object) to lock
 * \param[in,out] flags   LDLM flags in/out
 * \param[in]     policy  extent to lock (rounded to page boundaries here)
 * \param[in,out] lvb     buffer for the lock value block
 * \param[in]     kms_valid non-zero when cached kms/LVB data is trustworthy
 * \param[in]     upcall  completion callback
 * \param[in]     cookie  opaque argument for \a upcall
 * \param[in]     einfo   enqueue type/mode/callbacks
 * \param[in]     rqset   request set for async mode (or PTLRPCD_SET)
 * \param[in]     async   non-zero for asynchronous enqueue
 * \param[in]     agl     non-zero for asynchronous glimpse locks
 *
 * \retval ELDLM_OK, -ECANCELED (AGL hit an existing lock), -ENOLCK,
 *         -ENOMEM, or another negative errno
 */
int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
                     __u64 *flags, union ldlm_policy_data *policy,
                     struct ost_lvb *lvb, int kms_valid,
                     osc_enqueue_upcall_f upcall, void *cookie,
                     struct ldlm_enqueue_info *einfo,
                     struct ptlrpc_request_set *rqset, int async, int agl)
{
        struct obd_device *obd = exp->exp_obd;
        struct lustre_handle lockh = { 0 };
        struct ptlrpc_request *req = NULL;
        int intent = *flags & LDLM_FL_HAS_INTENT;
        /* AGL must not match LVB-less locks, normal enqueues may */
        __u64 match_lvb = agl ? 0 : LDLM_FL_LVB_READY;
        enum ldlm_mode mode;
        int rc;
        ENTRY;

        /* Filesystem lock extents are extended to page boundaries so that
         * dealing with the page cache is a little smoother.  */
        policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
        policy->l_extent.end |= ~PAGE_MASK;

        /*
         * kms is not valid when either object is completely fresh (so that no
         * locks are cached), or object was evicted. In the latter case cached
         * lock cannot be used, because it would prime inode state with
         * potentially stale LVB.
         */
        if (!kms_valid)
                goto no_match;

        /* Next, search for already existing extent locks that will cover us */
        /* If we're trying to read, we also search for an existing PW lock.  The
         * VFS and page cache already protect us locally, so lots of readers/
         * writers can share a single PW lock.
         *
         * There are problems with conversion deadlocks, so instead of
         * converting a read lock to a write lock, we'll just enqueue a new
         * one.
         *
         * At some point we should cancel the read lock instead of making them
         * send us a blocking callback, but there are problems with canceling
         * locks out from other users right now, too. */
        mode = einfo->ei_mode;
        if (einfo->ei_mode == LCK_PR)
                mode |= LCK_PW;
        mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
                               einfo->ei_type, policy, mode, &lockh, 0);
        if (mode) {
                struct ldlm_lock *matched;

                if (*flags & LDLM_FL_TEST_LOCK)
                        RETURN(ELDLM_OK);

                matched = ldlm_handle2lock(&lockh);
                if (agl) {
                        /* AGL enqueues DLM locks speculatively. Therefore if
                         * it already exists a DLM lock, it wll just inform the
                         * caller to cancel the AGL process for this stripe. */
                        ldlm_lock_decref(&lockh, mode);
                        LDLM_LOCK_PUT(matched);
                        RETURN(-ECANCELED);
                } else if (osc_set_lock_data_with_check(matched, einfo)) {
                        *flags |= LDLM_FL_LVB_READY;

                        /* We already have a lock, and it's referenced. */
                        (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);

                        ldlm_lock_decref(&lockh, mode);
                        LDLM_LOCK_PUT(matched);
                        RETURN(ELDLM_OK);
                } else {
                        /* lock belongs to someone else's ast data; fall
                         * through and enqueue a fresh one */
                        ldlm_lock_decref(&lockh, mode);
                        LDLM_LOCK_PUT(matched);
                }
        }

no_match:
        if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK))
                RETURN(-ENOLCK);

        /* intent enqueues need a request with room for the LVB reply */
        if (intent) {
                req = ptlrpc_request_alloc(class_exp2cliimp(exp),
                                           &RQF_LDLM_ENQUEUE_LVB);
                if (req == NULL)
                        RETURN(-ENOMEM);

                rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
                if (rc) {
                        ptlrpc_request_free(req);
                        RETURN(rc);
                }

                req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
                                     sizeof *lvb);
                ptlrpc_request_set_replen(req);
        }

        /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
        *flags &= ~LDLM_FL_BLOCK_GRANTED;

        rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
                              sizeof(*lvb), LVB_T_OST, &lockh, async);
        if (async) {
                if (!rc) {
                        struct osc_enqueue_args *aa;
                        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
                        aa = ptlrpc_req_async_args(req);
                        aa->oa_exp    = exp;
                        aa->oa_mode   = einfo->ei_mode;
                        aa->oa_type   = einfo->ei_type;
                        lustre_handle_copy(&aa->oa_lockh, &lockh);
                        aa->oa_upcall = upcall;
                        aa->oa_cookie = cookie;
                        aa->oa_agl    = !!agl;
                        if (!agl) {
                                aa->oa_flags  = flags;
                                aa->oa_lvb    = lvb;
                        } else {
                                /* AGL is essentially to enqueue an DLM lock
                                 * in advance, so we don't care about the
                                 * result of AGL enqueue. */
                                aa->oa_lvb    = NULL;
                                aa->oa_flags  = NULL;
                        }

                        req->rq_interpret_reply =
                                (ptlrpc_interpterer_t)osc_enqueue_interpret;
                        if (rqset == PTLRPCD_SET)
                                ptlrpcd_add_req(req);
                        else
                                ptlrpc_set_add_req(rqset, req);
                } else if (intent) {
                        ptlrpc_req_finished(req);
                }
                RETURN(rc);
        }

        rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
                              flags, agl, rc);
        if (intent)
                ptlrpc_req_finished(req);

        RETURN(rc);
}
2081
2082 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2083                    enum ldlm_type type, union ldlm_policy_data *policy,
2084                    enum ldlm_mode mode, __u64 *flags, void *data,
2085                    struct lustre_handle *lockh, int unref)
2086 {
2087         struct obd_device *obd = exp->exp_obd;
2088         __u64 lflags = *flags;
2089         enum ldlm_mode rc;
2090         ENTRY;
2091
2092         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2093                 RETURN(-EIO);
2094
2095         /* Filesystem lock extents are extended to page boundaries so that
2096          * dealing with the page cache is a little smoother */
2097         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2098         policy->l_extent.end |= ~PAGE_MASK;
2099
2100         /* Next, search for already existing extent locks that will cover us */
2101         /* If we're trying to read, we also search for an existing PW lock.  The
2102          * VFS and page cache already protect us locally, so lots of readers/
2103          * writers can share a single PW lock. */
2104         rc = mode;
2105         if (mode == LCK_PR)
2106                 rc |= LCK_PW;
2107         rc = ldlm_lock_match(obd->obd_namespace, lflags,
2108                              res_id, type, policy, rc, lockh, unref);
2109         if (rc) {
2110                 if (data != NULL) {
2111                         if (!osc_set_data_with_check(lockh, data)) {
2112                                 if (!(lflags & LDLM_FL_TEST_LOCK))
2113                                         ldlm_lock_decref(lockh, rc);
2114                                 RETURN(0);
2115                         }
2116                 }
2117                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
2118                         ldlm_lock_addref(lockh, LCK_PR);
2119                         ldlm_lock_decref(lockh, LCK_PW);
2120                 }
2121                 RETURN(rc);
2122         }
2123         RETURN(rc);
2124 }
2125
2126 static int osc_statfs_interpret(const struct lu_env *env,
2127                                 struct ptlrpc_request *req,
2128                                 struct osc_async_args *aa, int rc)
2129 {
2130         struct obd_statfs *msfs;
2131         ENTRY;
2132
2133         if (rc == -EBADR)
2134                 /* The request has in fact never been sent
2135                  * due to issues at a higher level (LOV).
2136                  * Exit immediately since the caller is
2137                  * aware of the problem and takes care
2138                  * of the clean up */
2139                  RETURN(rc);
2140
2141         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2142             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2143                 GOTO(out, rc = 0);
2144
2145         if (rc != 0)
2146                 GOTO(out, rc);
2147
2148         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2149         if (msfs == NULL) {
2150                 GOTO(out, rc = -EPROTO);
2151         }
2152
2153         *aa->aa_oi->oi_osfs = *msfs;
2154 out:
2155         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2156         RETURN(rc);
2157 }
2158
2159 static int osc_statfs_async(struct obd_export *exp,
2160                             struct obd_info *oinfo, __u64 max_age,
2161                             struct ptlrpc_request_set *rqset)
2162 {
2163         struct obd_device     *obd = class_exp2obd(exp);
2164         struct ptlrpc_request *req;
2165         struct osc_async_args *aa;
2166         int                    rc;
2167         ENTRY;
2168
2169         /* We could possibly pass max_age in the request (as an absolute
2170          * timestamp or a "seconds.usec ago") so the target can avoid doing
2171          * extra calls into the filesystem if that isn't necessary (e.g.
2172          * during mount that would help a bit).  Having relative timestamps
2173          * is not so great if request processing is slow, while absolute
2174          * timestamps are not ideal because they need time synchronization. */
2175         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2176         if (req == NULL)
2177                 RETURN(-ENOMEM);
2178
2179         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2180         if (rc) {
2181                 ptlrpc_request_free(req);
2182                 RETURN(rc);
2183         }
2184         ptlrpc_request_set_replen(req);
2185         req->rq_request_portal = OST_CREATE_PORTAL;
2186         ptlrpc_at_set_req_timeout(req);
2187
2188         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2189                 /* procfs requests not want stat in wait for avoid deadlock */
2190                 req->rq_no_resend = 1;
2191                 req->rq_no_delay = 1;
2192         }
2193
2194         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2195         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2196         aa = ptlrpc_req_async_args(req);
2197         aa->aa_oi = oinfo;
2198
2199         ptlrpc_set_add_req(rqset, req);
2200         RETURN(0);
2201 }
2202
2203 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2204                       struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2205 {
2206         struct obd_device     *obd = class_exp2obd(exp);
2207         struct obd_statfs     *msfs;
2208         struct ptlrpc_request *req;
2209         struct obd_import     *imp = NULL;
2210         int rc;
2211         ENTRY;
2212
2213         /*Since the request might also come from lprocfs, so we need
2214          *sync this with client_disconnect_export Bug15684*/
2215         down_read(&obd->u.cli.cl_sem);
2216         if (obd->u.cli.cl_import)
2217                 imp = class_import_get(obd->u.cli.cl_import);
2218         up_read(&obd->u.cli.cl_sem);
2219         if (!imp)
2220                 RETURN(-ENODEV);
2221
2222         /* We could possibly pass max_age in the request (as an absolute
2223          * timestamp or a "seconds.usec ago") so the target can avoid doing
2224          * extra calls into the filesystem if that isn't necessary (e.g.
2225          * during mount that would help a bit).  Having relative timestamps
2226          * is not so great if request processing is slow, while absolute
2227          * timestamps are not ideal because they need time synchronization. */
2228         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2229
2230         class_import_put(imp);
2231
2232         if (req == NULL)
2233                 RETURN(-ENOMEM);
2234
2235         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2236         if (rc) {
2237                 ptlrpc_request_free(req);
2238                 RETURN(rc);
2239         }
2240         ptlrpc_request_set_replen(req);
2241         req->rq_request_portal = OST_CREATE_PORTAL;
2242         ptlrpc_at_set_req_timeout(req);
2243
2244         if (flags & OBD_STATFS_NODELAY) {
2245                 /* procfs requests not want stat in wait for avoid deadlock */
2246                 req->rq_no_resend = 1;
2247                 req->rq_no_delay = 1;
2248         }
2249
2250         rc = ptlrpc_queue_wait(req);
2251         if (rc)
2252                 GOTO(out, rc);
2253
2254         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2255         if (msfs == NULL) {
2256                 GOTO(out, rc = -EPROTO);
2257         }
2258
2259         *osfs = *msfs;
2260
2261         EXIT;
2262  out:
2263         ptlrpc_req_finished(req);
2264         return rc;
2265 }
2266
2267 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2268                          void *karg, void *uarg)
2269 {
2270         struct obd_device *obd = exp->exp_obd;
2271         struct obd_ioctl_data *data = karg;
2272         int err = 0;
2273         ENTRY;
2274
2275         if (!try_module_get(THIS_MODULE)) {
2276                 CERROR("%s: cannot get module '%s'\n", obd->obd_name,
2277                        module_name(THIS_MODULE));
2278                 return -EINVAL;
2279         }
2280         switch (cmd) {
2281         case OBD_IOC_CLIENT_RECOVER:
2282                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2283                                             data->ioc_inlbuf1, 0);
2284                 if (err > 0)
2285                         err = 0;
2286                 GOTO(out, err);
2287         case IOC_OSC_SET_ACTIVE:
2288                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2289                                                data->ioc_offset);
2290                 GOTO(out, err);
2291         case OBD_IOC_PING_TARGET:
2292                 err = ptlrpc_obd_ping(obd);
2293                 GOTO(out, err);
2294         default:
2295                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2296                        cmd, current_comm());
2297                 GOTO(out, err = -ENOTTY);
2298         }
2299 out:
2300         module_put(THIS_MODULE);
2301         return err;
2302 }
2303
/**
 * Handle set_info requests addressed to an OSC device.
 *
 * Keys handled locally without an RPC: KEY_CHECKSUM, KEY_SPTLRPC_CONF,
 * KEY_FLUSH_CTX, KEY_CACHE_SET and KEY_CACHE_LRU_SHRINK.  Every other key
 * is packed into an OST_SET_INFO request (OST_SET_GRANT_INFO format for
 * KEY_GRANT_SHRINK) and sent to the server, either via the caller-supplied
 * request \a set or, for grant shrinks, via ptlrpcd.
 *
 * \retval 0 on success, negative errno on failure
 */
static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
                              u32 keylen, void *key,
                              u32 vallen, void *val,
                              struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *req;
        struct obd_device     *obd = exp->exp_obd;
        struct obd_import     *imp = class_exp2cliimp(exp);
        char                  *tmp;
        int                    rc;
        ENTRY;

        /* fault-injection point: delay here to widen shutdown races */
        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);

        if (KEY_IS(KEY_CHECKSUM)) {
                /* toggle BRW data checksumming for this client */
                if (vallen != sizeof(int))
                        RETURN(-EINVAL);
                exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
                RETURN(0);
        }

        if (KEY_IS(KEY_SPTLRPC_CONF)) {
                sptlrpc_conf_client_adapt(obd);
                RETURN(0);
        }

        if (KEY_IS(KEY_FLUSH_CTX)) {
                sptlrpc_import_flush_my_ctx(imp);
                RETURN(0);
        }

        if (KEY_IS(KEY_CACHE_SET)) {
                /* attach this OSC to a shared client page cache */
                struct client_obd *cli = &obd->u.cli;

                LASSERT(cli->cl_cache == NULL); /* only once */
                cli->cl_cache = (struct cl_client_cache *)val;
                cl_cache_incref(cli->cl_cache);
                cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;

                /* add this osc into entity list */
                LASSERT(list_empty(&cli->cl_lru_osc));
                spin_lock(&cli->cl_cache->ccc_lru_lock);
                list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
                spin_unlock(&cli->cl_cache->ccc_lru_lock);

                RETURN(0);
        }

        if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
                struct client_obd *cli = &obd->u.cli;
                /* shrink at most half of the pages currently on the LRU */
                long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
                long target = *(long *)val;

                nr = osc_lru_shrink(env, cli, min(nr, target), true);
                /* report back how many pages were actually freed */
                *(long *)val -= nr;
                RETURN(0);
        }

        if (!set && !KEY_IS(KEY_GRANT_SHRINK))
                RETURN(-EINVAL);

        /* We pass all other commands directly to OST. Since nobody calls osc
           methods directly and everybody is supposed to go through LOV, we
           assume lov checked invalid values for us.
           The only recognised values so far are evict_by_nid and mds_conn.
           Even if something bad goes through, we'd get a -EINVAL from OST
           anyway. */

        req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
                                                &RQF_OST_SET_GRANT_INFO :
                                                &RQF_OBD_SET_INFO);
        if (req == NULL)
                RETURN(-ENOMEM);

        req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
                             RCL_CLIENT, keylen);
        if (!KEY_IS(KEY_GRANT_SHRINK))
                req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
                                     RCL_CLIENT, vallen);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
        memcpy(tmp, key, keylen);
        /* grant shrink carries its payload in an ost_body field instead of
         * the generic setinfo value buffer */
        tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
                                                        &RMF_OST_BODY :
                                                        &RMF_SETINFO_VAL);
        memcpy(tmp, val, vallen);

        if (KEY_IS(KEY_GRANT_SHRINK)) {
                struct osc_grant_args *aa;
                struct obdo *oa;

                CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
                aa = ptlrpc_req_async_args(req);
                OBDO_ALLOC(oa);
                if (!oa) {
                        ptlrpc_req_finished(req);
                        RETURN(-ENOMEM);
                }
                *oa = ((struct ost_body *)val)->oa;
                aa->aa_oa = oa;
                /* the interpret callback consumes aa_oa on reply */
                req->rq_interpret_reply = osc_shrink_grant_interpret;
        }

        ptlrpc_request_set_replen(req);
        if (!KEY_IS(KEY_GRANT_SHRINK)) {
                LASSERT(set != NULL);
                ptlrpc_set_add_req(set, req);
                ptlrpc_check_set(NULL, set);
        } else {
                /* grant shrinks are fire-and-forget through ptlrpcd */
                ptlrpcd_add_req(req);
        }

        RETURN(0);
}
2423
2424 static int osc_reconnect(const struct lu_env *env,
2425                          struct obd_export *exp, struct obd_device *obd,
2426                          struct obd_uuid *cluuid,
2427                          struct obd_connect_data *data,
2428                          void *localdata)
2429 {
2430         struct client_obd *cli = &obd->u.cli;
2431
2432         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
2433                 long lost_grant;
2434
2435                 spin_lock(&cli->cl_loi_list_lock);
2436                 data->ocd_grant = (cli->cl_avail_grant +
2437                                   (cli->cl_dirty_pages << PAGE_CACHE_SHIFT)) ?:
2438                                   2 * cli_brw_size(obd);
2439                 lost_grant = cli->cl_lost_grant;
2440                 cli->cl_lost_grant = 0;
2441                 spin_unlock(&cli->cl_loi_list_lock);
2442
2443                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
2444                        " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
2445                        data->ocd_version, data->ocd_grant, lost_grant);
2446         }
2447
2448         RETURN(0);
2449 }
2450
2451 static int osc_disconnect(struct obd_export *exp)
2452 {
2453         struct obd_device *obd = class_exp2obd(exp);
2454         int rc;
2455
2456         rc = client_disconnect_export(exp);
2457         /**
2458          * Initially we put del_shrink_grant before disconnect_export, but it
2459          * causes the following problem if setup (connect) and cleanup
2460          * (disconnect) are tangled together.
2461          *      connect p1                     disconnect p2
2462          *   ptlrpc_connect_import
2463          *     ...............               class_manual_cleanup
2464          *                                     osc_disconnect
2465          *                                     del_shrink_grant
2466          *   ptlrpc_connect_interrupt
2467          *     init_grant_shrink
2468          *   add this client to shrink list
2469          *                                      cleanup_osc
2470          * Bang! pinger trigger the shrink.
2471          * So the osc should be disconnected from the shrink list, after we
2472          * are sure the import has been destroyed. BUG18662
2473          */
2474         if (obd->u.cli.cl_import == NULL)
2475                 osc_del_shrink_grant(&obd->u.cli);
2476         return rc;
2477 }
2478
/**
 * cfs_hash iterator callback: invalidate the osc_object backing an LDLM
 * resource and clear LDLM_FL_CLEANED on every granted lock so that a
 * second namespace-cleanup pass can cancel them.  Called from
 * osc_import_event() on IMP_EVENT_INVALIDATE; \a arg is that caller's
 * lu_env.
 *
 * \retval 0 always (continue iteration)
 */
static int osc_ldlm_resource_invalidate(struct cfs_hash *hs,
        struct cfs_hash_bd *bd, struct hlist_node *hnode, void *arg)
{
        struct lu_env *env = arg;
        struct ldlm_resource *res = cfs_hash_object(hs, hnode);
        struct ldlm_lock *lock;
        struct osc_object *osc = NULL;
        ENTRY;

        lock_res(res);
        list_for_each_entry(lock, &res->lr_granted, l_res_link) {
                /* take a reference on the first lock's ast_data object;
                 * it is invalidated after the resource lock is dropped */
                if (lock->l_ast_data != NULL && osc == NULL) {
                        osc = lock->l_ast_data;
                        cl_object_get(osc2cl(osc));
                }

                /* clear LDLM_FL_CLEANED flag to make sure it will be canceled
                 * by the 2nd round of ldlm_namespace_clean() call in
                 * osc_import_event(). */
                ldlm_clear_cleaned(lock);
        }
        unlock_res(res);

        if (osc != NULL) {
                osc_object_invalidate(env, osc);
                cl_object_put(env, osc2cl(osc));
        }

        RETURN(0);
}
2509
2510 static int osc_import_event(struct obd_device *obd,
2511                             struct obd_import *imp,
2512                             enum obd_import_event event)
2513 {
2514         struct client_obd *cli;
2515         int rc = 0;
2516
2517         ENTRY;
2518         LASSERT(imp->imp_obd == obd);
2519
2520         switch (event) {
2521         case IMP_EVENT_DISCON: {
2522                 cli = &obd->u.cli;
2523                 spin_lock(&cli->cl_loi_list_lock);
2524                 cli->cl_avail_grant = 0;
2525                 cli->cl_lost_grant = 0;
2526                 spin_unlock(&cli->cl_loi_list_lock);
2527                 break;
2528         }
2529         case IMP_EVENT_INACTIVE: {
2530                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
2531                 break;
2532         }
2533         case IMP_EVENT_INVALIDATE: {
2534                 struct ldlm_namespace *ns = obd->obd_namespace;
2535                 struct lu_env         *env;
2536                 int                    refcheck;
2537
2538                 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
2539
2540                 env = cl_env_get(&refcheck);
2541                 if (!IS_ERR(env)) {
2542                         osc_io_unplug(env, &obd->u.cli, NULL);
2543
2544                         cfs_hash_for_each_nolock(ns->ns_rs_hash,
2545                                                  osc_ldlm_resource_invalidate,
2546                                                  env, 0);
2547                         cl_env_put(env, &refcheck);
2548
2549                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
2550                 } else
2551                         rc = PTR_ERR(env);
2552                 break;
2553         }
2554         case IMP_EVENT_ACTIVE: {
2555                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
2556                 break;
2557         }
2558         case IMP_EVENT_OCD: {
2559                 struct obd_connect_data *ocd = &imp->imp_connect_data;
2560
2561                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
2562                         osc_init_grant(&obd->u.cli, ocd);
2563
2564                 /* See bug 7198 */
2565                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
2566                         imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
2567
2568                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
2569                 break;
2570         }
2571         case IMP_EVENT_DEACTIVATE: {
2572                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
2573                 break;
2574         }
2575         case IMP_EVENT_ACTIVATE: {
2576                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
2577                 break;
2578         }
2579         default:
2580                 CERROR("Unknown import event %d\n", event);
2581                 LBUG();
2582         }
2583         RETURN(rc);
2584 }
2585
2586 /**
2587  * Determine whether the lock can be canceled before replaying the lock
2588  * during recovery, see bug16774 for detailed information.
2589  *
2590  * \retval zero the lock can't be canceled
2591  * \retval other ok to cancel
2592  */
2593 static int osc_cancel_weight(struct ldlm_lock *lock)
2594 {
2595         /*
2596          * Cancel all unused and granted extent lock.
2597          */
2598         if (lock->l_resource->lr_type == LDLM_EXTENT &&
2599             lock->l_granted_mode == lock->l_req_mode &&
2600             osc_ldlm_weigh_ast(lock) == 0)
2601                 RETURN(1);
2602
2603         RETURN(0);
2604 }
2605
2606 static int brw_queue_work(const struct lu_env *env, void *data)
2607 {
2608         struct client_obd *cli = data;
2609
2610         CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
2611
2612         osc_io_unplug(env, cli, NULL);
2613         RETURN(0);
2614 }
2615
/**
 * Set up an OSC device: generic client setup, asynchronous writeback and
 * LRU work items, quota state, procfs entries, and a contribution to the
 * shared OST request pool.
 *
 * Error unwinding releases resources in reverse acquisition order via the
 * out_* labels.  A procfs setup failure is deliberately non-fatal.
 *
 * \retval 0 on success, negative errno on failure
 */
int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
{
        struct client_obd *cli = &obd->u.cli;
        struct obd_type   *type;
        void              *handler;
        int                rc;
        int                adding;
        int                added;
        int                req_count;
        ENTRY;

        rc = ptlrpcd_addref();
        if (rc)
                RETURN(rc);

        rc = client_obd_setup(obd, lcfg);
        if (rc)
                GOTO(out_ptlrpcd, rc);

        /* asynchronous writeback flush, run from a ptlrpcd thread */
        handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
        if (IS_ERR(handler))
                GOTO(out_client_setup, rc = PTR_ERR(handler));
        cli->cl_writeback_work = handler;

        /* LRU page shrinking, likewise run asynchronously */
        handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
        if (IS_ERR(handler))
                GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
        cli->cl_lru_work = handler;

        rc = osc_quota_setup(obd);
        if (rc)
                GOTO(out_ptlrpcd_work, rc);

        cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;

#ifdef CONFIG_PROC_FS
        obd->obd_vars = lprocfs_osc_obd_vars;
#endif
        /* If this is true then both client (osc) and server (osp) are on the
         * same node. The osp layer if loaded first will register the osc proc
         * directory. In that case this obd_device will be attached its proc
         * tree to type->typ_procsym instead of obd->obd_type->typ_procroot. */
        type = class_search_type(LUSTRE_OSP_NAME);
        if (type && type->typ_procsym) {
                obd->obd_proc_entry = lprocfs_register(obd->obd_name,
                                                       type->typ_procsym,
                                                       obd->obd_vars, obd);
                if (IS_ERR(obd->obd_proc_entry)) {
                        rc = PTR_ERR(obd->obd_proc_entry);
                        CERROR("error %d setting up lprocfs for %s\n", rc,
                               obd->obd_name);
                        obd->obd_proc_entry = NULL;
                }
        } else {
                rc = lprocfs_obd_setup(obd);
        }

        /* If the basic OSC proc tree construction succeeded then
         * lets do the rest.  NB: failure here is not fatal; the device
         * works without statistics, and rc is not propagated. */
        if (rc == 0) {
                lproc_osc_attach_seqstat(obd);
                sptlrpc_lprocfs_cliobd_attach(obd);
                ptlrpc_lprocfs_register_obd(obd);
        }

        /*
         * We try to control the total number of requests with a upper limit
         * osc_reqpool_maxreqcount. There might be some race which will cause
         * over-limit allocation, but it is fine.
         */
        req_count = atomic_read(&osc_pool_req_count);
        if (req_count < osc_reqpool_maxreqcount) {
                adding = cli->cl_max_rpcs_in_flight + 2;
                if (req_count + adding > osc_reqpool_maxreqcount)
                        adding = osc_reqpool_maxreqcount - req_count;

                added = ptlrpc_add_rqs_to_pool(osc_rq_pool, adding);
                atomic_add(added, &osc_pool_req_count);
        }

        INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
        ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
        RETURN(0);

out_ptlrpcd_work:
        if (cli->cl_writeback_work != NULL) {
                ptlrpcd_destroy_work(cli->cl_writeback_work);
                cli->cl_writeback_work = NULL;
        }
        if (cli->cl_lru_work != NULL) {
                ptlrpcd_destroy_work(cli->cl_lru_work);
                cli->cl_lru_work = NULL;
        }
out_client_setup:
        client_obd_cleanup(obd);
out_ptlrpcd:
        ptlrpcd_decref();
        RETURN(rc);
}
2715
2716 static int osc_precleanup(struct obd_device *obd)
2717 {
2718         struct client_obd *cli = &obd->u.cli;
2719         ENTRY;
2720
2721         /* LU-464
2722          * for echo client, export may be on zombie list, wait for
2723          * zombie thread to cull it, because cli.cl_import will be
2724          * cleared in client_disconnect_export():
2725          *   class_export_destroy() -> obd_cleanup() ->
2726          *   echo_device_free() -> echo_client_cleanup() ->
2727          *   obd_disconnect() -> osc_disconnect() ->
2728          *   client_disconnect_export()
2729          */
2730         obd_zombie_barrier();
2731         if (cli->cl_writeback_work) {
2732                 ptlrpcd_destroy_work(cli->cl_writeback_work);
2733                 cli->cl_writeback_work = NULL;
2734         }
2735
2736         if (cli->cl_lru_work) {
2737                 ptlrpcd_destroy_work(cli->cl_lru_work);
2738                 cli->cl_lru_work = NULL;
2739         }
2740
2741         obd_cleanup_client_import(obd);
2742         ptlrpc_lprocfs_unregister_obd(obd);
2743         lprocfs_obd_cleanup(obd);
2744         RETURN(0);
2745 }
2746
2747 int osc_cleanup(struct obd_device *obd)
2748 {
2749         struct client_obd *cli = &obd->u.cli;
2750         int rc;
2751
2752         ENTRY;
2753
2754         /* lru cleanup */
2755         if (cli->cl_cache != NULL) {
2756                 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
2757                 spin_lock(&cli->cl_cache->ccc_lru_lock);
2758                 list_del_init(&cli->cl_lru_osc);
2759                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
2760                 cli->cl_lru_left = NULL;
2761                 cl_cache_decref(cli->cl_cache);
2762                 cli->cl_cache = NULL;
2763         }
2764
2765         /* free memory of osc quota cache */
2766         osc_quota_cleanup(obd);
2767
2768         rc = client_obd_cleanup(obd);
2769
2770         ptlrpcd_decref();
2771         RETURN(rc);
2772 }
2773
2774 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
2775 {
2776         int rc = class_process_proc_param(PARAM_OSC, obd->obd_vars, lcfg, obd);
2777         return rc > 0 ? 0: rc;
2778 }
2779
2780 static int osc_process_config(struct obd_device *obd, size_t len, void *buf)
2781 {
2782         return osc_process_config_base(obd, buf);
2783 }
2784
/* Method table exported by the OSC to the generic OBD device layer;
 * registered with class_register_type() in osc_init(). */
static struct obd_ops osc_obd_ops = {
        .o_owner                = THIS_MODULE,
        .o_setup                = osc_setup,
        .o_precleanup           = osc_precleanup,
        .o_cleanup              = osc_cleanup,
        .o_add_conn             = client_import_add_conn,
        .o_del_conn             = client_import_del_conn,
        .o_connect              = client_connect_import,
        .o_reconnect            = osc_reconnect,
        .o_disconnect           = osc_disconnect,
        .o_statfs               = osc_statfs,
        .o_statfs_async         = osc_statfs_async,
        .o_create               = osc_create,
        .o_destroy              = osc_destroy,
        .o_getattr              = osc_getattr,
        .o_setattr              = osc_setattr,
        .o_iocontrol            = osc_iocontrol,
        .o_set_info_async       = osc_set_info_async,
        .o_import_event         = osc_import_event,
        .o_process_config       = osc_process_config,
        .o_quotactl             = osc_quotactl,
};
2807
/**
 * Module initialization: set up slab caches, register the OSC obd type,
 * and create the shared OST I/O request pool sized from the
 * osc_reqpool_mem_max module parameter.
 *
 * \retval 0 on success, negative errno on failure
 */
static int __init osc_init(void)
{
        bool enable_proc = true;
        struct obd_type *type;
        unsigned int reqpool_size;
        unsigned int reqsize;
        int rc;

        ENTRY;

        /* print an address of _any_ initialized kernel symbol from this
         * module, to allow debugging with gdb that doesn't support data
         * symbols from modules.*/
        CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);

        rc = lu_kmem_init(osc_caches);
        if (rc)
                RETURN(rc);

        /* if the OSP module already owns the proc directory, do not
         * create our own (see the matching logic in osc_setup()) */
        type = class_search_type(LUSTRE_OSP_NAME);
        if (type != NULL && type->typ_procsym != NULL)
                enable_proc = false;

        rc = class_register_type(&osc_obd_ops, NULL, enable_proc, NULL,
                                 LUSTRE_OSC_NAME, &osc_device_type);
        if (rc)
                GOTO(out_kmem, rc);

        /* This is obviously too much memory, only prevent overflow here */
        if (osc_reqpool_mem_max >= 1 << 12 || osc_reqpool_mem_max == 0)
                GOTO(out_type, rc = -EINVAL);

        /* osc_reqpool_mem_max is in MiB (shifted to bytes here) */
        reqpool_size = osc_reqpool_mem_max << 20;

        /* round the per-request size up to the next power of two */
        reqsize = 1;
        while (reqsize < OST_IO_MAXREQSIZE)
                reqsize = reqsize << 1;

        /*
         * We don't enlarge the request count in OSC pool according to
         * cl_max_rpcs_in_flight. The allocation from the pool will only be
         * tried after normal allocation failed. So a small OSC pool won't
         * cause much performance degression in most of cases.
         */
        osc_reqpool_maxreqcount = reqpool_size / reqsize;

        atomic_set(&osc_pool_req_count, 0);
        osc_rq_pool = ptlrpc_init_rq_pool(0, OST_IO_MAXREQSIZE,
                                          ptlrpc_add_rqs_to_pool);

        if (osc_rq_pool != NULL)
                GOTO(out, rc);
        rc = -ENOMEM;
out_type:
        class_unregister_type(LUSTRE_OSC_NAME);
out_kmem:
        lu_kmem_fini(osc_caches);
out:
        RETURN(rc);
}
2868
/* Module cleanup: unregister the OSC type, release the slab caches and
 * free the shared request pool.  NOTE(review): __exit is commented out
 * upstream — presumably so the symbol stays callable from init error
 * paths; confirm before restoring the annotation. */
static void /*__exit*/ osc_exit(void)
{
        class_unregister_type(LUSTRE_OSC_NAME);
        lu_kmem_fini(osc_caches);
        ptlrpc_free_rq_pool(osc_rq_pool);
}
2875
/* Module metadata and init/exit registration. */
MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
MODULE_VERSION(LUSTRE_VERSION_STRING);
MODULE_LICENSE("GPL");

module_init(osc_init);
module_exit(osc_exit);