Whamcloud - gitweb
3655730b377cd853dd157631d222e120bf1ee2a5
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2015, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_OSC
38
39 #include <libcfs/libcfs.h>
40
41 #include <lustre/lustre_user.h>
42
43 #include <lprocfs_status.h>
44 #include <lustre_debug.h>
45 #include <lustre_dlm.h>
46 #include <lustre_fid.h>
47 #include <lustre_ha.h>
48 #include <lustre_ioctl.h>
49 #include <lustre_net.h>
50 #include <lustre_obdo.h>
51 #include <lustre_param.h>
52 #include <obd.h>
53 #include <obd_cksum.h>
54 #include <obd_class.h>
55
56 #include "osc_cl_internal.h"
57 #include "osc_internal.h"
58
59 atomic_t osc_pool_req_count;
60 unsigned int osc_reqpool_maxreqcount;
61 struct ptlrpc_request_pool *osc_rq_pool;
62
63 /* max memory used for request pool, unit is MB */
64 static unsigned int osc_reqpool_mem_max = 5;
65 module_param(osc_reqpool_mem_max, uint, 0444);
66
67 struct osc_brw_async_args {
68         struct obdo              *aa_oa;
69         int                       aa_requested_nob;
70         int                       aa_nio_count;
71         u32                       aa_page_count;
72         int                       aa_resends;
73         struct brw_page **aa_ppga;
74         struct client_obd        *aa_cli;
75         struct list_head          aa_oaps;
76         struct list_head          aa_exts;
77 };
78
79 #define osc_grant_args osc_brw_async_args
80
81 struct osc_setattr_args {
82         struct obdo             *sa_oa;
83         obd_enqueue_update_f     sa_upcall;
84         void                    *sa_cookie;
85 };
86
87 struct osc_fsync_args {
88         struct osc_object       *fa_obj;
89         struct obdo             *fa_oa;
90         obd_enqueue_update_f    fa_upcall;
91         void                    *fa_cookie;
92 };
93
94 struct osc_enqueue_args {
95         struct obd_export       *oa_exp;
96         enum ldlm_type          oa_type;
97         enum ldlm_mode          oa_mode;
98         __u64                   *oa_flags;
99         osc_enqueue_upcall_f    oa_upcall;
100         void                    *oa_cookie;
101         struct ost_lvb          *oa_lvb;
102         struct lustre_handle    oa_lockh;
103         unsigned int            oa_agl:1;
104 };
105
106 static void osc_release_ppga(struct brw_page **ppga, size_t count);
107 static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
108                          void *data, int rc);
109
110 void osc_pack_req_body(struct ptlrpc_request *req, struct obdo *oa)
111 {
112         struct ost_body *body;
113
114         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
115         LASSERT(body);
116
117         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
118 }
119
120 static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
121                        struct obdo *oa)
122 {
123         struct ptlrpc_request   *req;
124         struct ost_body         *body;
125         int                      rc;
126
127         ENTRY;
128         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
129         if (req == NULL)
130                 RETURN(-ENOMEM);
131
132         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
133         if (rc) {
134                 ptlrpc_request_free(req);
135                 RETURN(rc);
136         }
137
138         osc_pack_req_body(req, oa);
139
140         ptlrpc_request_set_replen(req);
141
142         rc = ptlrpc_queue_wait(req);
143         if (rc)
144                 GOTO(out, rc);
145
146         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
147         if (body == NULL)
148                 GOTO(out, rc = -EPROTO);
149
150         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
151         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
152
153         oa->o_blksize = cli_brw_size(exp->exp_obd);
154         oa->o_valid |= OBD_MD_FLBLKSZ;
155
156         EXIT;
157 out:
158         ptlrpc_req_finished(req);
159
160         return rc;
161 }
162
163 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
164                        struct obdo *oa)
165 {
166         struct ptlrpc_request   *req;
167         struct ost_body         *body;
168         int                      rc;
169
170         ENTRY;
171         LASSERT(oa->o_valid & OBD_MD_FLGROUP);
172
173         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
174         if (req == NULL)
175                 RETURN(-ENOMEM);
176
177         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
178         if (rc) {
179                 ptlrpc_request_free(req);
180                 RETURN(rc);
181         }
182
183         osc_pack_req_body(req, oa);
184
185         ptlrpc_request_set_replen(req);
186
187         rc = ptlrpc_queue_wait(req);
188         if (rc)
189                 GOTO(out, rc);
190
191         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
192         if (body == NULL)
193                 GOTO(out, rc = -EPROTO);
194
195         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
196
197         EXIT;
198 out:
199         ptlrpc_req_finished(req);
200
201         RETURN(rc);
202 }
203
204 static int osc_setattr_interpret(const struct lu_env *env,
205                                  struct ptlrpc_request *req,
206                                  struct osc_setattr_args *sa, int rc)
207 {
208         struct ost_body *body;
209         ENTRY;
210
211         if (rc != 0)
212                 GOTO(out, rc);
213
214         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
215         if (body == NULL)
216                 GOTO(out, rc = -EPROTO);
217
218         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
219                              &body->oa);
220 out:
221         rc = sa->sa_upcall(sa->sa_cookie, rc);
222         RETURN(rc);
223 }
224
225 int osc_setattr_async(struct obd_export *exp, struct obdo *oa,
226                       obd_enqueue_update_f upcall, void *cookie,
227                       struct ptlrpc_request_set *rqset)
228 {
229         struct ptlrpc_request   *req;
230         struct osc_setattr_args *sa;
231         int                      rc;
232
233         ENTRY;
234
235         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
236         if (req == NULL)
237                 RETURN(-ENOMEM);
238
239         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
240         if (rc) {
241                 ptlrpc_request_free(req);
242                 RETURN(rc);
243         }
244
245         osc_pack_req_body(req, oa);
246
247         ptlrpc_request_set_replen(req);
248
249         /* do mds to ost setattr asynchronously */
250         if (!rqset) {
251                 /* Do not wait for response. */
252                 ptlrpcd_add_req(req);
253         } else {
254                 req->rq_interpret_reply =
255                         (ptlrpc_interpterer_t)osc_setattr_interpret;
256
257                 CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
258                 sa = ptlrpc_req_async_args(req);
259                 sa->sa_oa = oa;
260                 sa->sa_upcall = upcall;
261                 sa->sa_cookie = cookie;
262
263                 if (rqset == PTLRPCD_SET)
264                         ptlrpcd_add_req(req);
265                 else
266                         ptlrpc_set_add_req(rqset, req);
267         }
268
269         RETURN(0);
270 }
271
272 static int osc_create(const struct lu_env *env, struct obd_export *exp,
273                       struct obdo *oa)
274 {
275         struct ptlrpc_request *req;
276         struct ost_body       *body;
277         int                    rc;
278         ENTRY;
279
280         LASSERT(oa != NULL);
281         LASSERT(oa->o_valid & OBD_MD_FLGROUP);
282         LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));
283
284         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
285         if (req == NULL)
286                 GOTO(out, rc = -ENOMEM);
287
288         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
289         if (rc) {
290                 ptlrpc_request_free(req);
291                 GOTO(out, rc);
292         }
293
294         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
295         LASSERT(body);
296
297         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
298
299         ptlrpc_request_set_replen(req);
300
301         rc = ptlrpc_queue_wait(req);
302         if (rc)
303                 GOTO(out_req, rc);
304
305         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
306         if (body == NULL)
307                 GOTO(out_req, rc = -EPROTO);
308
309         CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
310         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
311
312         oa->o_blksize = cli_brw_size(exp->exp_obd);
313         oa->o_valid |= OBD_MD_FLBLKSZ;
314
315         CDEBUG(D_HA, "transno: "LPD64"\n",
316                lustre_msg_get_transno(req->rq_repmsg));
317 out_req:
318         ptlrpc_req_finished(req);
319 out:
320         RETURN(rc);
321 }
322
323 int osc_punch_base(struct obd_export *exp, struct obdo *oa,
324                    obd_enqueue_update_f upcall, void *cookie,
325                    struct ptlrpc_request_set *rqset)
326 {
327         struct ptlrpc_request   *req;
328         struct osc_setattr_args *sa;
329         struct ost_body         *body;
330         int                      rc;
331         ENTRY;
332
333         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
334         if (req == NULL)
335                 RETURN(-ENOMEM);
336
337         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
338         if (rc) {
339                 ptlrpc_request_free(req);
340                 RETURN(rc);
341         }
342         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
343         ptlrpc_at_set_req_timeout(req);
344
345         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
346         LASSERT(body);
347         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
348
349         ptlrpc_request_set_replen(req);
350
351         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
352         CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
353         sa = ptlrpc_req_async_args(req);
354         sa->sa_oa = oa;
355         sa->sa_upcall = upcall;
356         sa->sa_cookie = cookie;
357         if (rqset == PTLRPCD_SET)
358                 ptlrpcd_add_req(req);
359         else
360                 ptlrpc_set_add_req(rqset, req);
361
362         RETURN(0);
363 }
364
365 static int osc_sync_interpret(const struct lu_env *env,
366                               struct ptlrpc_request *req,
367                               void *arg, int rc)
368 {
369         struct osc_fsync_args   *fa = arg;
370         struct ost_body         *body;
371         struct cl_attr          *attr = &osc_env_info(env)->oti_attr;
372         unsigned long           valid = 0;
373         struct cl_object        *obj;
374         ENTRY;
375
376         if (rc != 0)
377                 GOTO(out, rc);
378
379         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
380         if (body == NULL) {
381                 CERROR("can't unpack ost_body\n");
382                 GOTO(out, rc = -EPROTO);
383         }
384
385         *fa->fa_oa = body->oa;
386         obj = osc2cl(fa->fa_obj);
387
388         /* Update osc object's blocks attribute */
389         cl_object_attr_lock(obj);
390         if (body->oa.o_valid & OBD_MD_FLBLOCKS) {
391                 attr->cat_blocks = body->oa.o_blocks;
392                 valid |= CAT_BLOCKS;
393         }
394
395         if (valid != 0)
396                 cl_object_attr_update(env, obj, attr, valid);
397         cl_object_attr_unlock(obj);
398
399 out:
400         rc = fa->fa_upcall(fa->fa_cookie, rc);
401         RETURN(rc);
402 }
403
404 int osc_sync_base(struct osc_object *obj, struct obdo *oa,
405                   obd_enqueue_update_f upcall, void *cookie,
406                   struct ptlrpc_request_set *rqset)
407 {
408         struct obd_export     *exp = osc_export(obj);
409         struct ptlrpc_request *req;
410         struct ost_body       *body;
411         struct osc_fsync_args *fa;
412         int                    rc;
413         ENTRY;
414
415         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
416         if (req == NULL)
417                 RETURN(-ENOMEM);
418
419         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
420         if (rc) {
421                 ptlrpc_request_free(req);
422                 RETURN(rc);
423         }
424
425         /* overload the size and blocks fields in the oa with start/end */
426         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
427         LASSERT(body);
428         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
429
430         ptlrpc_request_set_replen(req);
431         req->rq_interpret_reply = osc_sync_interpret;
432
433         CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
434         fa = ptlrpc_req_async_args(req);
435         fa->fa_obj = obj;
436         fa->fa_oa = oa;
437         fa->fa_upcall = upcall;
438         fa->fa_cookie = cookie;
439
440         if (rqset == PTLRPCD_SET)
441                 ptlrpcd_add_req(req);
442         else
443                 ptlrpc_set_add_req(rqset, req);
444
445         RETURN (0);
446 }
447
448 /* Find and cancel locally locks matched by @mode in the resource found by
449  * @objid. Found locks are added into @cancel list. Returns the amount of
450  * locks added to @cancels list. */
451 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
452                                    struct list_head *cancels,
453                                    enum ldlm_mode mode, __u64 lock_flags)
454 {
455         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
456         struct ldlm_res_id res_id;
457         struct ldlm_resource *res;
458         int count;
459         ENTRY;
460
461         /* Return, i.e. cancel nothing, only if ELC is supported (flag in
462          * export) but disabled through procfs (flag in NS).
463          *
464          * This distinguishes from a case when ELC is not supported originally,
465          * when we still want to cancel locks in advance and just cancel them
466          * locally, without sending any RPC. */
467         if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
468                 RETURN(0);
469
470         ostid_build_res_name(&oa->o_oi, &res_id);
471         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
472         if (IS_ERR(res))
473                 RETURN(0);
474
475         LDLM_RESOURCE_ADDREF(res);
476         count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
477                                            lock_flags, 0, NULL);
478         LDLM_RESOURCE_DELREF(res);
479         ldlm_resource_putref(res);
480         RETURN(count);
481 }
482
483 static int osc_destroy_interpret(const struct lu_env *env,
484                                  struct ptlrpc_request *req, void *data,
485                                  int rc)
486 {
487         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
488
489         atomic_dec(&cli->cl_destroy_in_flight);
490         wake_up(&cli->cl_destroy_waitq);
491         return 0;
492 }
493
494 static int osc_can_send_destroy(struct client_obd *cli)
495 {
496         if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
497             cli->cl_max_rpcs_in_flight) {
498                 /* The destroy request can be sent */
499                 return 1;
500         }
501         if (atomic_dec_return(&cli->cl_destroy_in_flight) <
502             cli->cl_max_rpcs_in_flight) {
503                 /*
504                  * The counter has been modified between the two atomic
505                  * operations.
506                  */
507                 wake_up(&cli->cl_destroy_waitq);
508         }
509         return 0;
510 }
511
512 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
513                        struct obdo *oa)
514 {
515         struct client_obd     *cli = &exp->exp_obd->u.cli;
516         struct ptlrpc_request *req;
517         struct ost_body       *body;
518         struct list_head       cancels = LIST_HEAD_INIT(cancels);
519         int rc, count;
520         ENTRY;
521
522         if (!oa) {
523                 CDEBUG(D_INFO, "oa NULL\n");
524                 RETURN(-EINVAL);
525         }
526
527         count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
528                                         LDLM_FL_DISCARD_DATA);
529
530         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
531         if (req == NULL) {
532                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
533                 RETURN(-ENOMEM);
534         }
535
536         rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
537                                0, &cancels, count);
538         if (rc) {
539                 ptlrpc_request_free(req);
540                 RETURN(rc);
541         }
542
543         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
544         ptlrpc_at_set_req_timeout(req);
545
546         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
547         LASSERT(body);
548         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
549
550         ptlrpc_request_set_replen(req);
551
552         req->rq_interpret_reply = osc_destroy_interpret;
553         if (!osc_can_send_destroy(cli)) {
554                 struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
555
556                 /*
557                  * Wait until the number of on-going destroy RPCs drops
558                  * under max_rpc_in_flight
559                  */
560                 l_wait_event_exclusive(cli->cl_destroy_waitq,
561                                        osc_can_send_destroy(cli), &lwi);
562         }
563
564         /* Do not wait for response */
565         ptlrpcd_add_req(req);
566         RETURN(0);
567 }
568
569 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
570                                 long writing_bytes)
571 {
572         u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;
573
574         LASSERT(!(oa->o_valid & bits));
575
576         oa->o_valid |= bits;
577         spin_lock(&cli->cl_loi_list_lock);
578         if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data, GRANT_PARAM))
579                 oa->o_dirty = cli->cl_dirty_grant;
580         else
581                 oa->o_dirty = cli->cl_dirty_pages << PAGE_CACHE_SHIFT;
582         if (unlikely(cli->cl_dirty_pages - cli->cl_dirty_transit >
583                      cli->cl_dirty_max_pages)) {
584                 CERROR("dirty %lu - %lu > dirty_max %lu\n",
585                        cli->cl_dirty_pages, cli->cl_dirty_transit,
586                        cli->cl_dirty_max_pages);
587                 oa->o_undirty = 0;
588         } else if (unlikely(atomic_long_read(&obd_dirty_pages) -
589                             atomic_long_read(&obd_dirty_transit_pages) >
590                             (long)(obd_max_dirty_pages + 1))) {
591                 /* The atomic_read() allowing the atomic_inc() are
592                  * not covered by a lock thus they may safely race and trip
593                  * this CERROR() unless we add in a small fudge factor (+1). */
594                 CERROR("%s: dirty %ld - %ld > system dirty_max %ld\n",
595                        cli_name(cli), atomic_long_read(&obd_dirty_pages),
596                        atomic_long_read(&obd_dirty_transit_pages),
597                        obd_max_dirty_pages);
598                 oa->o_undirty = 0;
599         } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
600                             0x7fffffff)) {
601                 CERROR("dirty %lu - dirty_max %lu too big???\n",
602                        cli->cl_dirty_pages, cli->cl_dirty_max_pages);
603                 oa->o_undirty = 0;
604         } else {
605                 unsigned long nrpages;
606
607                 nrpages = cli->cl_max_pages_per_rpc;
608                 nrpages *= cli->cl_max_rpcs_in_flight + 1;
609                 nrpages = max(nrpages, cli->cl_dirty_max_pages);
610                 oa->o_undirty = nrpages << PAGE_CACHE_SHIFT;
611                 if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data,
612                                  GRANT_PARAM)) {
613                         int nrextents;
614
615                         /* take extent tax into account when asking for more
616                          * grant space */
617                         nrextents = (nrpages + cli->cl_max_extent_pages - 1)  /
618                                      cli->cl_max_extent_pages;
619                         oa->o_undirty += nrextents * cli->cl_grant_extent_tax;
620                 }
621         }
622         oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
623         oa->o_dropped = cli->cl_lost_grant;
624         cli->cl_lost_grant = 0;
625         spin_unlock(&cli->cl_loi_list_lock);
626         CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
627                oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
628 }
629
630 void osc_update_next_shrink(struct client_obd *cli)
631 {
632         cli->cl_next_shrink_grant =
633                 cfs_time_shift(cli->cl_grant_shrink_interval);
634         CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
635                cli->cl_next_shrink_grant);
636 }
637
638 static void __osc_update_grant(struct client_obd *cli, u64 grant)
639 {
640         spin_lock(&cli->cl_loi_list_lock);
641         cli->cl_avail_grant += grant;
642         spin_unlock(&cli->cl_loi_list_lock);
643 }
644
645 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
646 {
647         if (body->oa.o_valid & OBD_MD_FLGRANT) {
648                 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
649                 __osc_update_grant(cli, body->oa.o_grant);
650         }
651 }
652
653 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
654                               u32 keylen, void *key,
655                               u32 vallen, void *val,
656                               struct ptlrpc_request_set *set);
657
658 static int osc_shrink_grant_interpret(const struct lu_env *env,
659                                       struct ptlrpc_request *req,
660                                       void *aa, int rc)
661 {
662         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
663         struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
664         struct ost_body *body;
665
666         if (rc != 0) {
667                 __osc_update_grant(cli, oa->o_grant);
668                 GOTO(out, rc);
669         }
670
671         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
672         LASSERT(body);
673         osc_update_grant(cli, body);
674 out:
675         OBDO_FREE(oa);
676         return rc;
677 }
678
679 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
680 {
681         spin_lock(&cli->cl_loi_list_lock);
682         oa->o_grant = cli->cl_avail_grant / 4;
683         cli->cl_avail_grant -= oa->o_grant;
684         spin_unlock(&cli->cl_loi_list_lock);
685         if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
686                 oa->o_valid |= OBD_MD_FLFLAGS;
687                 oa->o_flags = 0;
688         }
689         oa->o_flags |= OBD_FL_SHRINK_GRANT;
690         osc_update_next_shrink(cli);
691 }
692
693 /* Shrink the current grant, either from some large amount to enough for a
694  * full set of in-flight RPCs, or if we have already shrunk to that limit
695  * then to enough for a single RPC.  This avoids keeping more grant than
696  * needed, and avoids shrinking the grant piecemeal. */
697 static int osc_shrink_grant(struct client_obd *cli)
698 {
699         __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
700                              (cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT);
701
702         spin_lock(&cli->cl_loi_list_lock);
703         if (cli->cl_avail_grant <= target_bytes)
704                 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
705         spin_unlock(&cli->cl_loi_list_lock);
706
707         return osc_shrink_grant_to_target(cli, target_bytes);
708 }
709
710 int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
711 {
712         int                     rc = 0;
713         struct ost_body        *body;
714         ENTRY;
715
716         spin_lock(&cli->cl_loi_list_lock);
717         /* Don't shrink if we are already above or below the desired limit
718          * We don't want to shrink below a single RPC, as that will negatively
719          * impact block allocation and long-term performance. */
720         if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT)
721                 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
722
723         if (target_bytes >= cli->cl_avail_grant) {
724                 spin_unlock(&cli->cl_loi_list_lock);
725                 RETURN(0);
726         }
727         spin_unlock(&cli->cl_loi_list_lock);
728
729         OBD_ALLOC_PTR(body);
730         if (!body)
731                 RETURN(-ENOMEM);
732
733         osc_announce_cached(cli, &body->oa, 0);
734
735         spin_lock(&cli->cl_loi_list_lock);
736         body->oa.o_grant = cli->cl_avail_grant - target_bytes;
737         cli->cl_avail_grant = target_bytes;
738         spin_unlock(&cli->cl_loi_list_lock);
739         if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
740                 body->oa.o_valid |= OBD_MD_FLFLAGS;
741                 body->oa.o_flags = 0;
742         }
743         body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
744         osc_update_next_shrink(cli);
745
746         rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
747                                 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
748                                 sizeof(*body), body, NULL);
749         if (rc != 0)
750                 __osc_update_grant(cli, body->oa.o_grant);
751         OBD_FREE_PTR(body);
752         RETURN(rc);
753 }
754
755 static int osc_should_shrink_grant(struct client_obd *client)
756 {
757         cfs_time_t time = cfs_time_current();
758         cfs_time_t next_shrink = client->cl_next_shrink_grant;
759
760         if ((client->cl_import->imp_connect_data.ocd_connect_flags &
761              OBD_CONNECT_GRANT_SHRINK) == 0)
762                 return 0;
763
764         if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
765                 /* Get the current RPC size directly, instead of going via:
766                  * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
767                  * Keep comment here so that it can be found by searching. */
768                 int brw_size = client->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
769
770                 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
771                     client->cl_avail_grant > brw_size)
772                         return 1;
773                 else
774                         osc_update_next_shrink(client);
775         }
776         return 0;
777 }
778
779 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
780 {
781         struct client_obd *client;
782
783         list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
784                 if (osc_should_shrink_grant(client))
785                         osc_shrink_grant(client);
786         }
787         return 0;
788 }
789
790 static int osc_add_shrink_grant(struct client_obd *client)
791 {
792         int rc;
793
794         rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
795                                        TIMEOUT_GRANT,
796                                        osc_grant_shrink_grant_cb, NULL,
797                                        &client->cl_grant_shrink_list);
798         if (rc) {
799                 CERROR("add grant client %s error %d\n", cli_name(client), rc);
800                 return rc;
801         }
802         CDEBUG(D_CACHE, "add grant client %s\n", cli_name(client));
803         osc_update_next_shrink(client);
804         return 0;
805 }
806
807 static int osc_del_shrink_grant(struct client_obd *client)
808 {
809         return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
810                                          TIMEOUT_GRANT);
811 }
812
813 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
814 {
815         /*
816          * ocd_grant is the total grant amount we're expect to hold: if we've
817          * been evicted, it's the new avail_grant amount, cl_dirty_pages will
818          * drop to 0 as inflight RPCs fail out; otherwise, it's avail_grant +
819          * dirty.
820          *
821          * race is tolerable here: if we're evicted, but imp_state already
822          * left EVICTED state, then cl_dirty_pages must be 0 already.
823          */
824         spin_lock(&cli->cl_loi_list_lock);
825         cli->cl_avail_grant = ocd->ocd_grant;
826         if (cli->cl_import->imp_state != LUSTRE_IMP_EVICTED) {
827                 cli->cl_avail_grant -= cli->cl_reserved_grant;
828                 if (OCD_HAS_FLAG(ocd, GRANT_PARAM))
829                         cli->cl_avail_grant -= cli->cl_dirty_grant;
830                 else
831                         cli->cl_avail_grant -=
832                                         cli->cl_dirty_pages << PAGE_CACHE_SHIFT;
833         }
834
835         if (cli->cl_avail_grant < 0) {
836                 CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
837                       cli_name(cli), cli->cl_avail_grant,
838                       ocd->ocd_grant, cli->cl_dirty_pages << PAGE_CACHE_SHIFT);
839                 /* workaround for servers which do not have the patch from
840                  * LU-2679 */
841                 cli->cl_avail_grant = ocd->ocd_grant;
842         }
843
844         if (OCD_HAS_FLAG(ocd, GRANT_PARAM)) {
845                 u64 size;
846
847                 /* overhead for each extent insertion */
848                 cli->cl_grant_extent_tax = ocd->ocd_grant_tax_kb << 10;
849                 /* determine the appropriate chunk size used by osc_extent. */
850                 cli->cl_chunkbits = max_t(int, PAGE_CACHE_SHIFT,
851                                           ocd->ocd_grant_blkbits);
852                 /* determine maximum extent size, in #pages */
853                 size = (u64)ocd->ocd_grant_max_blks << ocd->ocd_grant_blkbits;
854                 cli->cl_max_extent_pages = size >> PAGE_CACHE_SHIFT;
855                 if (cli->cl_max_extent_pages == 0)
856                         cli->cl_max_extent_pages = 1;
857         } else {
858                 cli->cl_grant_extent_tax = 0;
859                 cli->cl_chunkbits = PAGE_CACHE_SHIFT;
860                 cli->cl_max_extent_pages = DT_MAX_BRW_PAGES;
861         }
862         spin_unlock(&cli->cl_loi_list_lock);
863
864         CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
865                 "chunk bits: %d cl_max_extent_pages: %d\n",
866                 cli_name(cli),
867                 cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits,
868                 cli->cl_max_extent_pages);
869
870         if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
871             list_empty(&cli->cl_grant_shrink_list))
872                 osc_add_shrink_grant(cli);
873 }
874
875 /* We assume that the reason this OSC got a short read is because it read
876  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
877  * via the LOV, and it _knows_ it's reading inside the file, it's just that
878  * this stripe never got written at or beyond this stripe offset yet. */
879 static void handle_short_read(int nob_read, size_t page_count,
880                               struct brw_page **pga)
881 {
882         char *ptr;
883         int i = 0;
884
885         /* skip bytes read OK */
886         while (nob_read > 0) {
887                 LASSERT (page_count > 0);
888
889                 if (pga[i]->count > nob_read) {
890                         /* EOF inside this page */
891                         ptr = kmap(pga[i]->pg) +
892                                 (pga[i]->off & ~PAGE_MASK);
893                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
894                         kunmap(pga[i]->pg);
895                         page_count--;
896                         i++;
897                         break;
898                 }
899
900                 nob_read -= pga[i]->count;
901                 page_count--;
902                 i++;
903         }
904
905         /* zero remaining pages */
906         while (page_count-- > 0) {
907                 ptr = kmap(pga[i]->pg) + (pga[i]->off & ~PAGE_MASK);
908                 memset(ptr, 0, pga[i]->count);
909                 kunmap(pga[i]->pg);
910                 i++;
911         }
912 }
913
914 static int check_write_rcs(struct ptlrpc_request *req,
915                            int requested_nob, int niocount,
916                            size_t page_count, struct brw_page **pga)
917 {
918         int     i;
919         __u32   *remote_rcs;
920
921         remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
922                                                   sizeof(*remote_rcs) *
923                                                   niocount);
924         if (remote_rcs == NULL) {
925                 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
926                 return(-EPROTO);
927         }
928
929         /* return error if any niobuf was in error */
930         for (i = 0; i < niocount; i++) {
931                 if ((int)remote_rcs[i] < 0)
932                         return(remote_rcs[i]);
933
934                 if (remote_rcs[i] != 0) {
935                         CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
936                                 i, remote_rcs[i], req);
937                         return(-EPROTO);
938                 }
939         }
940
941         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
942                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
943                        req->rq_bulk->bd_nob_transferred, requested_nob);
944                 return(-EPROTO);
945         }
946
947         return (0);
948 }
949
950 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
951 {
952         if (p1->flag != p2->flag) {
953                 unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
954                                   OBD_BRW_SYNC       | OBD_BRW_ASYNC   |
955                                   OBD_BRW_NOQUOTA    | OBD_BRW_SOFT_SYNC);
956
957                 /* warn if we try to combine flags that we don't know to be
958                  * safe to combine */
959                 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
960                         CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
961                               "report this at https://jira.hpdd.intel.com/\n",
962                               p1->flag, p2->flag);
963                 }
964                 return 0;
965         }
966
967         return (p1->off + p1->count == p2->off);
968 }
969
970 static u32 osc_checksum_bulk(int nob, size_t pg_count,
971                              struct brw_page **pga, int opc,
972                              cksum_type_t cksum_type)
973 {
974         u32                             cksum;
975         int                             i = 0;
976         struct cfs_crypto_hash_desc     *hdesc;
977         unsigned int                    bufsize;
978         int                             err;
979         unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);
980
981         LASSERT(pg_count > 0);
982
983         hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
984         if (IS_ERR(hdesc)) {
985                 CERROR("Unable to initialize checksum hash %s\n",
986                        cfs_crypto_hash_name(cfs_alg));
987                 return PTR_ERR(hdesc);
988         }
989
990         while (nob > 0 && pg_count > 0) {
991                 unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;
992
993                 /* corrupt the data before we compute the checksum, to
994                  * simulate an OST->client data error */
995                 if (i == 0 && opc == OST_READ &&
996                     OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
997                         unsigned char *ptr = kmap(pga[i]->pg);
998                         int off = pga[i]->off & ~PAGE_MASK;
999
1000                         memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
1001                         kunmap(pga[i]->pg);
1002                 }
1003                 cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
1004                                             pga[i]->off & ~PAGE_MASK,
1005                                             count);
1006                 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
1007                                (int)(pga[i]->off & ~PAGE_MASK));
1008
1009                 nob -= pga[i]->count;
1010                 pg_count--;
1011                 i++;
1012         }
1013
1014         bufsize = sizeof(cksum);
1015         err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);
1016
1017         /* For sending we only compute the wrong checksum instead
1018          * of corrupting the data so it is still correct on a redo */
1019         if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1020                 cksum++;
1021
1022         return cksum;
1023 }
1024
1025 static int
1026 osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
1027                      u32 page_count, struct brw_page **pga,
1028                      struct ptlrpc_request **reqp, int resend)
1029 {
1030         struct ptlrpc_request   *req;
1031         struct ptlrpc_bulk_desc *desc;
1032         struct ost_body         *body;
1033         struct obd_ioobj        *ioobj;
1034         struct niobuf_remote    *niobuf;
1035         int niocount, i, requested_nob, opc, rc;
1036         struct osc_brw_async_args *aa;
1037         struct req_capsule      *pill;
1038         struct brw_page *pg_prev;
1039
1040         ENTRY;
1041         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1042                 RETURN(-ENOMEM); /* Recoverable */
1043         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1044                 RETURN(-EINVAL); /* Fatal */
1045
1046         if ((cmd & OBD_BRW_WRITE) != 0) {
1047                 opc = OST_WRITE;
1048                 req = ptlrpc_request_alloc_pool(cli->cl_import,
1049                                                 osc_rq_pool,
1050                                                 &RQF_OST_BRW_WRITE);
1051         } else {
1052                 opc = OST_READ;
1053                 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
1054         }
1055         if (req == NULL)
1056                 RETURN(-ENOMEM);
1057
1058         for (niocount = i = 1; i < page_count; i++) {
1059                 if (!can_merge_pages(pga[i - 1], pga[i]))
1060                         niocount++;
1061         }
1062
1063         pill = &req->rq_pill;
1064         req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1065                              sizeof(*ioobj));
1066         req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1067                              niocount * sizeof(*niobuf));
1068
1069         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1070         if (rc) {
1071                 ptlrpc_request_free(req);
1072                 RETURN(rc);
1073         }
1074         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1075         ptlrpc_at_set_req_timeout(req);
1076         /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
1077          * retry logic */
1078         req->rq_no_retry_einprogress = 1;
1079
1080         desc = ptlrpc_prep_bulk_imp(req, page_count,
1081                 cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
1082                 (opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE :
1083                         PTLRPC_BULK_PUT_SINK) |
1084                         PTLRPC_BULK_BUF_KIOV,
1085                 OST_BULK_PORTAL,
1086                 &ptlrpc_bulk_kiov_pin_ops);
1087
1088         if (desc == NULL)
1089                 GOTO(out, rc = -ENOMEM);
1090         /* NB request now owns desc and will free it when it gets freed */
1091
1092         body = req_capsule_client_get(pill, &RMF_OST_BODY);
1093         ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1094         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1095         LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1096
1097         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
1098
1099         obdo_to_ioobj(oa, ioobj);
1100         ioobj->ioo_bufcnt = niocount;
1101         /* The high bits of ioo_max_brw tells server _maximum_ number of bulks
1102          * that might be send for this request.  The actual number is decided
1103          * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
1104          * "max - 1" for old client compatibility sending "0", and also so the
1105          * the actual maximum is a power-of-two number, not one less. LU-1431 */
1106         ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
1107         LASSERT(page_count > 0);
1108         pg_prev = pga[0];
1109         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1110                 struct brw_page *pg = pga[i];
1111                 int poff = pg->off & ~PAGE_MASK;
1112
1113                 LASSERT(pg->count > 0);
1114                 /* make sure there is no gap in the middle of page array */
1115                 LASSERTF(page_count == 1 ||
1116                          (ergo(i == 0, poff + pg->count == PAGE_CACHE_SIZE) &&
1117                           ergo(i > 0 && i < page_count - 1,
1118                                poff == 0 && pg->count == PAGE_CACHE_SIZE)   &&
1119                           ergo(i == page_count - 1, poff == 0)),
1120                          "i: %d/%d pg: %p off: "LPU64", count: %u\n",
1121                          i, page_count, pg, pg->off, pg->count);
1122                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1123                          "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1124                          " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1125                          i, page_count,
1126                          pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1127                          pg_prev->pg, page_private(pg_prev->pg),
1128                          pg_prev->pg->index, pg_prev->off);
1129                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1130                         (pg->flag & OBD_BRW_SRVLOCK));
1131
1132                 desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff, pg->count);
1133                 requested_nob += pg->count;
1134
1135                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1136                         niobuf--;
1137                         niobuf->rnb_len += pg->count;
1138                 } else {
1139                         niobuf->rnb_offset = pg->off;
1140                         niobuf->rnb_len    = pg->count;
1141                         niobuf->rnb_flags  = pg->flag;
1142                 }
1143                 pg_prev = pg;
1144         }
1145
1146         LASSERTF((void *)(niobuf - niocount) ==
1147                 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1148                 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1149                 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1150
1151         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1152         if (resend) {
1153                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1154                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1155                         body->oa.o_flags = 0;
1156                 }
1157                 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1158         }
1159
1160         if (osc_should_shrink_grant(cli))
1161                 osc_shrink_grant_local(cli, &body->oa);
1162
1163         /* size[REQ_REC_OFF] still sizeof (*body) */
1164         if (opc == OST_WRITE) {
1165                 if (cli->cl_checksum &&
1166                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1167                         /* store cl_cksum_type in a local variable since
1168                          * it can be changed via lprocfs */
1169                         cksum_type_t cksum_type = cli->cl_cksum_type;
1170
1171                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1172                                 oa->o_flags &= OBD_FL_LOCAL_MASK;
1173                                 body->oa.o_flags = 0;
1174                         }
1175                         body->oa.o_flags |= cksum_type_pack(cksum_type);
1176                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1177                         body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1178                                                              page_count, pga,
1179                                                              OST_WRITE,
1180                                                              cksum_type);
1181                         CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1182                                body->oa.o_cksum);
1183                         /* save this in 'oa', too, for later checking */
1184                         oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1185                         oa->o_flags |= cksum_type_pack(cksum_type);
1186                 } else {
1187                         /* clear out the checksum flag, in case this is a
1188                          * resend but cl_checksum is no longer set. b=11238 */
1189                         oa->o_valid &= ~OBD_MD_FLCKSUM;
1190                 }
1191                 oa->o_cksum = body->oa.o_cksum;
1192                 /* 1 RC per niobuf */
1193                 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1194                                      sizeof(__u32) * niocount);
1195         } else {
1196                 if (cli->cl_checksum &&
1197                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1198                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1199                                 body->oa.o_flags = 0;
1200                         body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1201                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1202                 }
1203         }
1204         ptlrpc_request_set_replen(req);
1205
1206         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1207         aa = ptlrpc_req_async_args(req);
1208         aa->aa_oa = oa;
1209         aa->aa_requested_nob = requested_nob;
1210         aa->aa_nio_count = niocount;
1211         aa->aa_page_count = page_count;
1212         aa->aa_resends = 0;
1213         aa->aa_ppga = pga;
1214         aa->aa_cli = cli;
1215         INIT_LIST_HEAD(&aa->aa_oaps);
1216
1217         *reqp = req;
1218         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1219         CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
1220                 req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
1221                 niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
1222         RETURN(0);
1223
1224  out:
1225         ptlrpc_req_finished(req);
1226         RETURN(rc);
1227 }
1228
1229 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1230                                 __u32 client_cksum, __u32 server_cksum, int nob,
1231                                 size_t page_count, struct brw_page **pga,
1232                                 cksum_type_t client_cksum_type)
1233 {
1234         __u32 new_cksum;
1235         char *msg;
1236         cksum_type_t cksum_type;
1237
1238         if (server_cksum == client_cksum) {
1239                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1240                 return 0;
1241         }
1242
1243         cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1244                                        oa->o_flags : 0);
1245         new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1246                                       cksum_type);
1247
1248         if (cksum_type != client_cksum_type)
1249                 msg = "the server did not use the checksum type specified in "
1250                       "the original request - likely a protocol problem";
1251         else if (new_cksum == server_cksum)
1252                 msg = "changed on the client after we checksummed it - "
1253                       "likely false positive due to mmap IO (bug 11742)";
1254         else if (new_cksum == client_cksum)
1255                 msg = "changed in transit before arrival at OST";
1256         else
1257                 msg = "changed in transit AND doesn't match the original - "
1258                       "likely false positive due to mmap IO (bug 11742)";
1259
1260         LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1261                            " object "DOSTID" extent ["LPU64"-"LPU64"]\n",
1262                            msg, libcfs_nid2str(peer->nid),
1263                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1264                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1265                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1266                            POSTID(&oa->o_oi), pga[0]->off,
1267                            pga[page_count-1]->off + pga[page_count-1]->count - 1);
1268         CERROR("original client csum %x (type %x), server csum %x (type %x), "
1269                "client csum now %x\n", client_cksum, client_cksum_type,
1270                server_cksum, cksum_type, new_cksum);
1271         return 1;
1272 }
1273
1274 /* Note rc enters this function as number of bytes transferred */
1275 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1276 {
1277         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1278         const lnet_process_id_t *peer =
1279                         &req->rq_import->imp_connection->c_peer;
1280         struct client_obd *cli = aa->aa_cli;
1281         struct ost_body *body;
1282         u32 client_cksum = 0;
1283         ENTRY;
1284
1285         if (rc < 0 && rc != -EDQUOT) {
1286                 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1287                 RETURN(rc);
1288         }
1289
1290         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1291         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1292         if (body == NULL) {
1293                 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1294                 RETURN(-EPROTO);
1295         }
1296
1297         /* set/clear over quota flag for a uid/gid */
1298         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1299             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1300                 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1301
1302                 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1303                        body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1304                        body->oa.o_flags);
1305                 osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
1306         }
1307
1308         osc_update_grant(cli, body);
1309
1310         if (rc < 0)
1311                 RETURN(rc);
1312
1313         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1314                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1315
1316         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1317                 if (rc > 0) {
1318                         CERROR("Unexpected +ve rc %d\n", rc);
1319                         RETURN(-EPROTO);
1320                 }
1321                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1322
1323                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1324                         RETURN(-EAGAIN);
1325
1326                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1327                     check_write_checksum(&body->oa, peer, client_cksum,
1328                                          body->oa.o_cksum, aa->aa_requested_nob,
1329                                          aa->aa_page_count, aa->aa_ppga,
1330                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1331                         RETURN(-EAGAIN);
1332
1333                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1334                                      aa->aa_page_count, aa->aa_ppga);
1335                 GOTO(out, rc);
1336         }
1337
1338         /* The rest of this function executes only for OST_READs */
1339
1340         /* if unwrap_bulk failed, return -EAGAIN to retry */
1341         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1342         if (rc < 0)
1343                 GOTO(out, rc = -EAGAIN);
1344
1345         if (rc > aa->aa_requested_nob) {
1346                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1347                        aa->aa_requested_nob);
1348                 RETURN(-EPROTO);
1349         }
1350
1351         if (rc != req->rq_bulk->bd_nob_transferred) {
1352                 CERROR ("Unexpected rc %d (%d transferred)\n",
1353                         rc, req->rq_bulk->bd_nob_transferred);
1354                 return (-EPROTO);
1355         }
1356
1357         if (rc < aa->aa_requested_nob)
1358                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1359
1360         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1361                 static int cksum_counter;
1362                 u32        server_cksum = body->oa.o_cksum;
1363                 char      *via = "";
1364                 char      *router = "";
1365                 cksum_type_t cksum_type;
1366
1367                 cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
1368                                                body->oa.o_flags : 0);
1369                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1370                                                  aa->aa_ppga, OST_READ,
1371                                                  cksum_type);
1372
1373                 if (peer->nid != req->rq_bulk->bd_sender) {
1374                         via = " via ";
1375                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1376                 }
1377
1378                 if (server_cksum != client_cksum) {
1379                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1380                                            "%s%s%s inode "DFID" object "DOSTID
1381                                            " extent ["LPU64"-"LPU64"]\n",
1382                                            req->rq_import->imp_obd->obd_name,
1383                                            libcfs_nid2str(peer->nid),
1384                                            via, router,
1385                                            body->oa.o_valid & OBD_MD_FLFID ?
1386                                                 body->oa.o_parent_seq : (__u64)0,
1387                                            body->oa.o_valid & OBD_MD_FLFID ?
1388                                                 body->oa.o_parent_oid : 0,
1389                                            body->oa.o_valid & OBD_MD_FLFID ?
1390                                                 body->oa.o_parent_ver : 0,
1391                                            POSTID(&body->oa.o_oi),
1392                                            aa->aa_ppga[0]->off,
1393                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1394                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1395                                                                         1);
1396                         CERROR("client %x, server %x, cksum_type %x\n",
1397                                client_cksum, server_cksum, cksum_type);
1398                         cksum_counter = 0;
1399                         aa->aa_oa->o_cksum = client_cksum;
1400                         rc = -EAGAIN;
1401                 } else {
1402                         cksum_counter++;
1403                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1404                         rc = 0;
1405                 }
1406         } else if (unlikely(client_cksum)) {
1407                 static int cksum_missed;
1408
1409                 cksum_missed++;
1410                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1411                         CERROR("Checksum %u requested from %s but not sent\n",
1412                                cksum_missed, libcfs_nid2str(peer->nid));
1413         } else {
1414                 rc = 0;
1415         }
1416 out:
1417         if (rc >= 0)
1418                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1419                                      aa->aa_oa, &body->oa);
1420
1421         RETURN(rc);
1422 }
1423
1424 static int osc_brw_redo_request(struct ptlrpc_request *request,
1425                                 struct osc_brw_async_args *aa, int rc)
1426 {
1427         struct ptlrpc_request *new_req;
1428         struct osc_brw_async_args *new_aa;
1429         struct osc_async_page *oap;
1430         ENTRY;
1431
1432         DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1433                   "redo for recoverable error %d", rc);
1434
1435         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1436                                 OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
1437                                   aa->aa_cli, aa->aa_oa, aa->aa_page_count,
1438                                   aa->aa_ppga, &new_req, 1);
1439         if (rc)
1440                 RETURN(rc);
1441
1442         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1443                 if (oap->oap_request != NULL) {
1444                         LASSERTF(request == oap->oap_request,
1445                                  "request %p != oap_request %p\n",
1446                                  request, oap->oap_request);
1447                         if (oap->oap_interrupted) {
1448                                 ptlrpc_req_finished(new_req);
1449                                 RETURN(-EINTR);
1450                         }
1451                 }
1452         }
1453         /* New request takes over pga and oaps from old request.
1454          * Note that copying a list_head doesn't work, need to move it... */
1455         aa->aa_resends++;
1456         new_req->rq_interpret_reply = request->rq_interpret_reply;
1457         new_req->rq_async_args = request->rq_async_args;
1458         new_req->rq_commit_cb = request->rq_commit_cb;
1459         /* cap resend delay to the current request timeout, this is similar to
1460          * what ptlrpc does (see after_reply()) */
1461         if (aa->aa_resends > new_req->rq_timeout)
1462                 new_req->rq_sent = cfs_time_current_sec() + new_req->rq_timeout;
1463         else
1464                 new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1465         new_req->rq_generation_set = 1;
1466         new_req->rq_import_generation = request->rq_import_generation;
1467
1468         new_aa = ptlrpc_req_async_args(new_req);
1469
1470         INIT_LIST_HEAD(&new_aa->aa_oaps);
1471         list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1472         INIT_LIST_HEAD(&new_aa->aa_exts);
1473         list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1474         new_aa->aa_resends = aa->aa_resends;
1475
1476         list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1477                 if (oap->oap_request) {
1478                         ptlrpc_req_finished(oap->oap_request);
1479                         oap->oap_request = ptlrpc_request_addref(new_req);
1480                 }
1481         }
1482
1483         /* XXX: This code will run into problem if we're going to support
1484          * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
1485          * and wait for all of them to be finished. We should inherit request
1486          * set from old request. */
1487         ptlrpcd_add_req(new_req);
1488
1489         DEBUG_REQ(D_INFO, new_req, "new request");
1490         RETURN(0);
1491 }
1492
1493 /*
1494  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1495  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1496  * fine for our small page arrays and doesn't require allocation.  its an
1497  * insertion sort that swaps elements that are strides apart, shrinking the
1498  * stride down until its '1' and the array is sorted.
1499  */
1500 static void sort_brw_pages(struct brw_page **array, int num)
1501 {
1502         int stride, i, j;
1503         struct brw_page *tmp;
1504
1505         if (num == 1)
1506                 return;
1507         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1508                 ;
1509
1510         do {
1511                 stride /= 3;
1512                 for (i = stride ; i < num ; i++) {
1513                         tmp = array[i];
1514                         j = i;
1515                         while (j >= stride && array[j - stride]->off > tmp->off) {
1516                                 array[j] = array[j - stride];
1517                                 j -= stride;
1518                         }
1519                         array[j] = tmp;
1520                 }
1521         } while (stride > 1);
1522 }
1523
1524 static void osc_release_ppga(struct brw_page **ppga, size_t count)
1525 {
1526         LASSERT(ppga != NULL);
1527         OBD_FREE(ppga, sizeof(*ppga) * count);
1528 }
1529
1530 static int brw_interpret(const struct lu_env *env,
1531                          struct ptlrpc_request *req, void *data, int rc)
1532 {
1533         struct osc_brw_async_args *aa = data;
1534         struct osc_extent *ext;
1535         struct osc_extent *tmp;
1536         struct client_obd *cli = aa->aa_cli;
1537         ENTRY;
1538
1539         rc = osc_brw_fini_request(req, rc);
1540         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1541         /* When server return -EINPROGRESS, client should always retry
1542          * regardless of the number of times the bulk was resent already. */
1543         if (osc_recoverable_error(rc)) {
1544                 if (req->rq_import_generation !=
1545                     req->rq_import->imp_generation) {
1546                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
1547                                ""DOSTID", rc = %d.\n",
1548                                req->rq_import->imp_obd->obd_name,
1549                                POSTID(&aa->aa_oa->o_oi), rc);
1550                 } else if (rc == -EINPROGRESS ||
1551                     client_should_resend(aa->aa_resends, aa->aa_cli)) {
1552                         rc = osc_brw_redo_request(req, aa, rc);
1553                 } else {
1554                         CERROR("%s: too many resent retries for object: "
1555                                ""LPU64":"LPU64", rc = %d.\n",
1556                                req->rq_import->imp_obd->obd_name,
1557                                POSTID(&aa->aa_oa->o_oi), rc);
1558                 }
1559
1560                 if (rc == 0)
1561                         RETURN(0);
1562                 else if (rc == -EAGAIN || rc == -EINPROGRESS)
1563                         rc = -EIO;
1564         }
1565
1566         if (rc == 0) {
1567                 struct obdo *oa = aa->aa_oa;
1568                 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
1569                 unsigned long valid = 0;
1570                 struct cl_object *obj;
1571                 struct osc_async_page *last;
1572
1573                 last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
1574                 obj = osc2cl(last->oap_obj);
1575
1576                 cl_object_attr_lock(obj);
1577                 if (oa->o_valid & OBD_MD_FLBLOCKS) {
1578                         attr->cat_blocks = oa->o_blocks;
1579                         valid |= CAT_BLOCKS;
1580                 }
1581                 if (oa->o_valid & OBD_MD_FLMTIME) {
1582                         attr->cat_mtime = oa->o_mtime;
1583                         valid |= CAT_MTIME;
1584                 }
1585                 if (oa->o_valid & OBD_MD_FLATIME) {
1586                         attr->cat_atime = oa->o_atime;
1587                         valid |= CAT_ATIME;
1588                 }
1589                 if (oa->o_valid & OBD_MD_FLCTIME) {
1590                         attr->cat_ctime = oa->o_ctime;
1591                         valid |= CAT_CTIME;
1592                 }
1593
1594                 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1595                         struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
1596                         loff_t last_off = last->oap_count + last->oap_obj_off +
1597                                 last->oap_page_off;
1598
1599                         /* Change file size if this is an out of quota or
1600                          * direct IO write and it extends the file size */
1601                         if (loi->loi_lvb.lvb_size < last_off) {
1602                                 attr->cat_size = last_off;
1603                                 valid |= CAT_SIZE;
1604                         }
1605                         /* Extend KMS if it's not a lockless write */
1606                         if (loi->loi_kms < last_off &&
1607                             oap2osc_page(last)->ops_srvlock == 0) {
1608                                 attr->cat_kms = last_off;
1609                                 valid |= CAT_KMS;
1610                         }
1611                 }
1612
1613                 if (valid != 0)
1614                         cl_object_attr_update(env, obj, attr, valid);
1615                 cl_object_attr_unlock(obj);
1616         }
1617         OBDO_FREE(aa->aa_oa);
1618
1619         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
1620                 osc_inc_unstable_pages(req);
1621
1622         list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
1623                 list_del_init(&ext->oe_link);
1624                 osc_extent_finish(env, ext, 1, rc);
1625         }
1626         LASSERT(list_empty(&aa->aa_exts));
1627         LASSERT(list_empty(&aa->aa_oaps));
1628
1629         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1630         ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
1631
1632         spin_lock(&cli->cl_loi_list_lock);
1633         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1634          * is called so we know whether to go to sync BRWs or wait for more
1635          * RPCs to complete */
1636         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
1637                 cli->cl_w_in_flight--;
1638         else
1639                 cli->cl_r_in_flight--;
1640         osc_wake_cache_waiters(cli);
1641         spin_unlock(&cli->cl_loi_list_lock);
1642
1643         osc_io_unplug(env, cli, NULL);
1644         RETURN(rc);
1645 }
1646
1647 static void brw_commit(struct ptlrpc_request *req)
1648 {
1649         /* If osc_inc_unstable_pages (via osc_extent_finish) races with
1650          * this called via the rq_commit_cb, I need to ensure
1651          * osc_dec_unstable_pages is still called. Otherwise unstable
1652          * pages may be leaked. */
1653         spin_lock(&req->rq_lock);
1654         if (likely(req->rq_unstable)) {
1655                 req->rq_unstable = 0;
1656                 spin_unlock(&req->rq_lock);
1657
1658                 osc_dec_unstable_pages(req);
1659         } else {
1660                 req->rq_committed = 1;
1661                 spin_unlock(&req->rq_lock);
1662         }
1663 }
1664
1665 /**
1666  * Build an RPC by the list of extent @ext_list. The caller must ensure
1667  * that the total pages in this list are NOT over max pages per RPC.
1668  * Extents in the list must be in OES_RPC state.
1669  */
1670 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
1671                   struct list_head *ext_list, int cmd)
1672 {
1673         struct ptlrpc_request           *req = NULL;
1674         struct osc_extent               *ext;
1675         struct brw_page                 **pga = NULL;
1676         struct osc_brw_async_args       *aa = NULL;
1677         struct obdo                     *oa = NULL;
1678         struct osc_async_page           *oap;
1679         struct osc_object               *obj = NULL;
1680         struct cl_req_attr              *crattr = NULL;
1681         loff_t                          starting_offset = OBD_OBJECT_EOF;
1682         loff_t                          ending_offset = 0;
1683         int                             mpflag = 0;
1684         int                             mem_tight = 0;
1685         int                             page_count = 0;
1686         bool                            soft_sync = false;
1687         bool                            interrupted = false;
1688         int                             i;
1689         int                             grant = 0;
1690         int                             rc;
1691         struct list_head                rpc_list = LIST_HEAD_INIT(rpc_list);
1692         struct ost_body                 *body;
1693         ENTRY;
1694         LASSERT(!list_empty(ext_list));
1695
1696         /* add pages into rpc_list to build BRW rpc */
1697         list_for_each_entry(ext, ext_list, oe_link) {
1698                 LASSERT(ext->oe_state == OES_RPC);
1699                 mem_tight |= ext->oe_memalloc;
1700                 grant += ext->oe_grants;
1701                 page_count += ext->oe_nr_pages;
1702                 if (obj == NULL)
1703                         obj = ext->oe_obj;
1704         }
1705
1706         soft_sync = osc_over_unstable_soft_limit(cli);
1707         if (mem_tight)
1708                 mpflag = cfs_memory_pressure_get_and_set();
1709
1710         OBD_ALLOC(pga, sizeof(*pga) * page_count);
1711         if (pga == NULL)
1712                 GOTO(out, rc = -ENOMEM);
1713
1714         OBDO_ALLOC(oa);
1715         if (oa == NULL)
1716                 GOTO(out, rc = -ENOMEM);
1717
1718         i = 0;
1719         list_for_each_entry(ext, ext_list, oe_link) {
1720                 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
1721                         if (mem_tight)
1722                                 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
1723                         if (soft_sync)
1724                                 oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
1725                         pga[i] = &oap->oap_brw_page;
1726                         pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
1727                         i++;
1728
1729                         list_add_tail(&oap->oap_rpc_item, &rpc_list);
1730                         if (starting_offset == OBD_OBJECT_EOF ||
1731                             starting_offset > oap->oap_obj_off)
1732                                 starting_offset = oap->oap_obj_off;
1733                         else
1734                                 LASSERT(oap->oap_page_off == 0);
1735                         if (ending_offset < oap->oap_obj_off + oap->oap_count)
1736                                 ending_offset = oap->oap_obj_off +
1737                                                 oap->oap_count;
1738                         else
1739                                 LASSERT(oap->oap_page_off + oap->oap_count ==
1740                                         PAGE_CACHE_SIZE);
1741                         if (oap->oap_interrupted)
1742                                 interrupted = true;
1743                 }
1744         }
1745
1746         /* first page in the list */
1747         oap = list_entry(rpc_list.next, typeof(*oap), oap_rpc_item);
1748
1749         crattr = &osc_env_info(env)->oti_req_attr;
1750         memset(crattr, 0, sizeof(*crattr));
1751         crattr->cra_type = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
1752         crattr->cra_flags = ~0ULL;
1753         crattr->cra_page = oap2cl_page(oap);
1754         crattr->cra_oa = oa;
1755         cl_req_attr_set(env, osc2cl(obj), crattr);
1756
1757         if (cmd == OBD_BRW_WRITE)
1758                 oa->o_grant_used = grant;
1759
1760         sort_brw_pages(pga, page_count);
1761         rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req, 0);
1762         if (rc != 0) {
1763                 CERROR("prep_req failed: %d\n", rc);
1764                 GOTO(out, rc);
1765         }
1766
1767         req->rq_commit_cb = brw_commit;
1768         req->rq_interpret_reply = brw_interpret;
1769         req->rq_memalloc = mem_tight != 0;
1770         oap->oap_request = ptlrpc_request_addref(req);
1771         if (interrupted && !req->rq_intr)
1772                 ptlrpc_mark_interrupted(req);
1773
1774         /* Need to update the timestamps after the request is built in case
1775          * we race with setattr (locally or in queue at OST).  If OST gets
1776          * later setattr before earlier BRW (as determined by the request xid),
1777          * the OST will not use BRW timestamps.  Sadly, there is no obvious
1778          * way to do this in a single call.  bug 10150 */
1779         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
1780         crattr->cra_oa = &body->oa;
1781         crattr->cra_flags = OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME;
1782         cl_req_attr_set(env, osc2cl(obj), crattr);
1783         lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
1784
1785         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1786         aa = ptlrpc_req_async_args(req);
1787         INIT_LIST_HEAD(&aa->aa_oaps);
1788         list_splice_init(&rpc_list, &aa->aa_oaps);
1789         INIT_LIST_HEAD(&aa->aa_exts);
1790         list_splice_init(ext_list, &aa->aa_exts);
1791
1792         spin_lock(&cli->cl_loi_list_lock);
1793         starting_offset >>= PAGE_CACHE_SHIFT;
1794         if (cmd == OBD_BRW_READ) {
1795                 cli->cl_r_in_flight++;
1796                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
1797                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
1798                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
1799                                       starting_offset + 1);
1800         } else {
1801                 cli->cl_w_in_flight++;
1802                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
1803                 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
1804                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
1805                                       starting_offset + 1);
1806         }
1807         spin_unlock(&cli->cl_loi_list_lock);
1808
1809         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %ur/%uw in flight",
1810                   page_count, aa, cli->cl_r_in_flight,
1811                   cli->cl_w_in_flight);
1812
1813         ptlrpcd_add_req(req);
1814         rc = 0;
1815         EXIT;
1816
1817 out:
1818         if (mem_tight != 0)
1819                 cfs_memory_pressure_restore(mpflag);
1820
1821         if (rc != 0) {
1822                 LASSERT(req == NULL);
1823
1824                 if (oa)
1825                         OBDO_FREE(oa);
1826                 if (pga)
1827                         OBD_FREE(pga, sizeof(*pga) * page_count);
1828                 /* this should happen rarely and is pretty bad, it makes the
1829                  * pending list not follow the dirty order */
1830                 while (!list_empty(ext_list)) {
1831                         ext = list_entry(ext_list->next, struct osc_extent,
1832                                          oe_link);
1833                         list_del_init(&ext->oe_link);
1834                         osc_extent_finish(env, ext, 0, rc);
1835                 }
1836         }
1837         RETURN(rc);
1838 }
1839
1840 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
1841                                         struct ldlm_enqueue_info *einfo)
1842 {
1843         void *data = einfo->ei_cbdata;
1844         int set = 0;
1845
1846         LASSERT(lock != NULL);
1847         LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
1848         LASSERT(lock->l_resource->lr_type == einfo->ei_type);
1849         LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
1850         LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
1851
1852         lock_res_and_lock(lock);
1853
1854         if (lock->l_ast_data == NULL)
1855                 lock->l_ast_data = data;
1856         if (lock->l_ast_data == data)
1857                 set = 1;
1858
1859         unlock_res_and_lock(lock);
1860
1861         return set;
1862 }
1863
1864 static int osc_set_data_with_check(struct lustre_handle *lockh,
1865                                    struct ldlm_enqueue_info *einfo)
1866 {
1867         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
1868         int set = 0;
1869
1870         if (lock != NULL) {
1871                 set = osc_set_lock_data_with_check(lock, einfo);
1872                 LDLM_LOCK_PUT(lock);
1873         } else
1874                 CERROR("lockh %p, data %p - client evicted?\n",
1875                        lockh, einfo->ei_cbdata);
1876         return set;
1877 }
1878
1879 static int osc_enqueue_fini(struct ptlrpc_request *req,
1880                             osc_enqueue_upcall_f upcall, void *cookie,
1881                             struct lustre_handle *lockh, enum ldlm_mode mode,
1882                             __u64 *flags, int agl, int errcode)
1883 {
1884         bool intent = *flags & LDLM_FL_HAS_INTENT;
1885         int rc;
1886         ENTRY;
1887
1888         /* The request was created before ldlm_cli_enqueue call. */
1889         if (intent && errcode == ELDLM_LOCK_ABORTED) {
1890                 struct ldlm_reply *rep;
1891
1892                 rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1893                 LASSERT(rep != NULL);
1894
1895                 rep->lock_policy_res1 =
1896                         ptlrpc_status_ntoh(rep->lock_policy_res1);
1897                 if (rep->lock_policy_res1)
1898                         errcode = rep->lock_policy_res1;
1899                 if (!agl)
1900                         *flags |= LDLM_FL_LVB_READY;
1901         } else if (errcode == ELDLM_OK) {
1902                 *flags |= LDLM_FL_LVB_READY;
1903         }
1904
1905         /* Call the update callback. */
1906         rc = (*upcall)(cookie, lockh, errcode);
1907
1908         /* release the reference taken in ldlm_cli_enqueue() */
1909         if (errcode == ELDLM_LOCK_MATCHED)
1910                 errcode = ELDLM_OK;
1911         if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
1912                 ldlm_lock_decref(lockh, mode);
1913
1914         RETURN(rc);
1915 }
1916
1917 static int osc_enqueue_interpret(const struct lu_env *env,
1918                                  struct ptlrpc_request *req,
1919                                  struct osc_enqueue_args *aa, int rc)
1920 {
1921         struct ldlm_lock *lock;
1922         struct lustre_handle *lockh = &aa->oa_lockh;
1923         enum ldlm_mode mode = aa->oa_mode;
1924         struct ost_lvb *lvb = aa->oa_lvb;
1925         __u32 lvb_len = sizeof(*lvb);
1926         __u64 flags = 0;
1927
1928         ENTRY;
1929
1930         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
1931          * be valid. */
1932         lock = ldlm_handle2lock(lockh);
1933         LASSERTF(lock != NULL,
1934                  "lockh "LPX64", req %p, aa %p - client evicted?\n",
1935                  lockh->cookie, req, aa);
1936
1937         /* Take an additional reference so that a blocking AST that
1938          * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
1939          * to arrive after an upcall has been executed by
1940          * osc_enqueue_fini(). */
1941         ldlm_lock_addref(lockh, mode);
1942
1943         /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
1944         OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
1945
1946         /* Let CP AST to grant the lock first. */
1947         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
1948
1949         if (aa->oa_agl) {
1950                 LASSERT(aa->oa_lvb == NULL);
1951                 LASSERT(aa->oa_flags == NULL);
1952                 aa->oa_flags = &flags;
1953         }
1954
1955         /* Complete obtaining the lock procedure. */
1956         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
1957                                    aa->oa_mode, aa->oa_flags, lvb, lvb_len,
1958                                    lockh, rc);
1959         /* Complete osc stuff. */
1960         rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
1961                               aa->oa_flags, aa->oa_agl, rc);
1962
1963         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
1964
1965         ldlm_lock_decref(lockh, mode);
1966         LDLM_LOCK_PUT(lock);
1967         RETURN(rc);
1968 }
1969
1970 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
1971
1972 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
1973  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
1974  * other synchronous requests, however keeping some locks and trying to obtain
1975  * others may take a considerable amount of time in a case of ost failure; and
1976  * when other sync requests do not get released lock from a client, the client
1977  * is evicted from the cluster -- such scenarious make the life difficult, so
1978  * release locks just after they are obtained. */
1979 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
1980                      __u64 *flags, union ldlm_policy_data *policy,
1981                      struct ost_lvb *lvb, int kms_valid,
1982                      osc_enqueue_upcall_f upcall, void *cookie,
1983                      struct ldlm_enqueue_info *einfo,
1984                      struct ptlrpc_request_set *rqset, int async, int agl)
1985 {
1986         struct obd_device *obd = exp->exp_obd;
1987         struct lustre_handle lockh = { 0 };
1988         struct ptlrpc_request *req = NULL;
1989         int intent = *flags & LDLM_FL_HAS_INTENT;
1990         __u64 match_flags = *flags;
1991         enum ldlm_mode mode;
1992         int rc;
1993         ENTRY;
1994
1995         /* Filesystem lock extents are extended to page boundaries so that
1996          * dealing with the page cache is a little smoother.  */
1997         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
1998         policy->l_extent.end |= ~PAGE_MASK;
1999
2000         /*
2001          * kms is not valid when either object is completely fresh (so that no
2002          * locks are cached), or object was evicted. In the latter case cached
2003          * lock cannot be used, because it would prime inode state with
2004          * potentially stale LVB.
2005          */
2006         if (!kms_valid)
2007                 goto no_match;
2008
2009         /* Next, search for already existing extent locks that will cover us */
2010         /* If we're trying to read, we also search for an existing PW lock.  The
2011          * VFS and page cache already protect us locally, so lots of readers/
2012          * writers can share a single PW lock.
2013          *
2014          * There are problems with conversion deadlocks, so instead of
2015          * converting a read lock to a write lock, we'll just enqueue a new
2016          * one.
2017          *
2018          * At some point we should cancel the read lock instead of making them
2019          * send us a blocking callback, but there are problems with canceling
2020          * locks out from other users right now, too. */
2021         mode = einfo->ei_mode;
2022         if (einfo->ei_mode == LCK_PR)
2023                 mode |= LCK_PW;
2024         if (agl == 0)
2025                 match_flags |= LDLM_FL_LVB_READY;
2026         if (intent != 0)
2027                 match_flags |= LDLM_FL_BLOCK_GRANTED;
2028         mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id,
2029                                einfo->ei_type, policy, mode, &lockh, 0);
2030         if (mode) {
2031                 struct ldlm_lock *matched;
2032
2033                 if (*flags & LDLM_FL_TEST_LOCK)
2034                         RETURN(ELDLM_OK);
2035
2036                 matched = ldlm_handle2lock(&lockh);
2037                 if (agl) {
2038                         /* AGL enqueues DLM locks speculatively. Therefore if
2039                          * it already exists a DLM lock, it wll just inform the
2040                          * caller to cancel the AGL process for this stripe. */
2041                         ldlm_lock_decref(&lockh, mode);
2042                         LDLM_LOCK_PUT(matched);
2043                         RETURN(-ECANCELED);
2044                 } else if (osc_set_lock_data_with_check(matched, einfo)) {
2045                         *flags |= LDLM_FL_LVB_READY;
2046
2047                         /* We already have a lock, and it's referenced. */
2048                         (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);
2049
2050                         ldlm_lock_decref(&lockh, mode);
2051                         LDLM_LOCK_PUT(matched);
2052                         RETURN(ELDLM_OK);
2053                 } else {
2054                         ldlm_lock_decref(&lockh, mode);
2055                         LDLM_LOCK_PUT(matched);
2056                 }
2057         }
2058
2059 no_match:
2060         if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK))
2061                 RETURN(-ENOLCK);
2062
2063         if (intent) {
2064                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2065                                            &RQF_LDLM_ENQUEUE_LVB);
2066                 if (req == NULL)
2067                         RETURN(-ENOMEM);
2068
2069                 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
2070                 if (rc) {
2071                         ptlrpc_request_free(req);
2072                         RETURN(rc);
2073                 }
2074
2075                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2076                                      sizeof *lvb);
2077                 ptlrpc_request_set_replen(req);
2078         }
2079
2080         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2081         *flags &= ~LDLM_FL_BLOCK_GRANTED;
2082
2083         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2084                               sizeof(*lvb), LVB_T_OST, &lockh, async);
2085         if (async) {
2086                 if (!rc) {
2087                         struct osc_enqueue_args *aa;
2088                         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2089                         aa = ptlrpc_req_async_args(req);
2090                         aa->oa_exp    = exp;
2091                         aa->oa_mode   = einfo->ei_mode;
2092                         aa->oa_type   = einfo->ei_type;
2093                         lustre_handle_copy(&aa->oa_lockh, &lockh);
2094                         aa->oa_upcall = upcall;
2095                         aa->oa_cookie = cookie;
2096                         aa->oa_agl    = !!agl;
2097                         if (!agl) {
2098                                 aa->oa_flags  = flags;
2099                                 aa->oa_lvb    = lvb;
2100                         } else {
2101                                 /* AGL is essentially to enqueue an DLM lock
2102                                  * in advance, so we don't care about the
2103                                  * result of AGL enqueue. */
2104                                 aa->oa_lvb    = NULL;
2105                                 aa->oa_flags  = NULL;
2106                         }
2107
2108                         req->rq_interpret_reply =
2109                                 (ptlrpc_interpterer_t)osc_enqueue_interpret;
2110                         if (rqset == PTLRPCD_SET)
2111                                 ptlrpcd_add_req(req);
2112                         else
2113                                 ptlrpc_set_add_req(rqset, req);
2114                 } else if (intent) {
2115                         ptlrpc_req_finished(req);
2116                 }
2117                 RETURN(rc);
2118         }
2119
2120         rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
2121                               flags, agl, rc);
2122         if (intent)
2123                 ptlrpc_req_finished(req);
2124
2125         RETURN(rc);
2126 }
2127
2128 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2129                    enum ldlm_type type, union ldlm_policy_data *policy,
2130                    enum ldlm_mode mode, __u64 *flags, void *data,
2131                    struct lustre_handle *lockh, int unref)
2132 {
2133         struct obd_device *obd = exp->exp_obd;
2134         __u64 lflags = *flags;
2135         enum ldlm_mode rc;
2136         ENTRY;
2137
2138         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2139                 RETURN(-EIO);
2140
2141         /* Filesystem lock extents are extended to page boundaries so that
2142          * dealing with the page cache is a little smoother */
2143         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2144         policy->l_extent.end |= ~PAGE_MASK;
2145
2146         /* Next, search for already existing extent locks that will cover us */
2147         /* If we're trying to read, we also search for an existing PW lock.  The
2148          * VFS and page cache already protect us locally, so lots of readers/
2149          * writers can share a single PW lock. */
2150         rc = mode;
2151         if (mode == LCK_PR)
2152                 rc |= LCK_PW;
2153         rc = ldlm_lock_match(obd->obd_namespace, lflags,
2154                              res_id, type, policy, rc, lockh, unref);
2155         if (rc) {
2156                 if (data != NULL) {
2157                         if (!osc_set_data_with_check(lockh, data)) {
2158                                 if (!(lflags & LDLM_FL_TEST_LOCK))
2159                                         ldlm_lock_decref(lockh, rc);
2160                                 RETURN(0);
2161                         }
2162                 }
2163                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
2164                         ldlm_lock_addref(lockh, LCK_PR);
2165                         ldlm_lock_decref(lockh, LCK_PW);
2166                 }
2167                 RETURN(rc);
2168         }
2169         RETURN(rc);
2170 }
2171
2172 static int osc_statfs_interpret(const struct lu_env *env,
2173                                 struct ptlrpc_request *req,
2174                                 struct osc_async_args *aa, int rc)
2175 {
2176         struct obd_statfs *msfs;
2177         ENTRY;
2178
2179         if (rc == -EBADR)
2180                 /* The request has in fact never been sent
2181                  * due to issues at a higher level (LOV).
2182                  * Exit immediately since the caller is
2183                  * aware of the problem and takes care
2184                  * of the clean up */
2185                  RETURN(rc);
2186
2187         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2188             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2189                 GOTO(out, rc = 0);
2190
2191         if (rc != 0)
2192                 GOTO(out, rc);
2193
2194         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2195         if (msfs == NULL) {
2196                 GOTO(out, rc = -EPROTO);
2197         }
2198
2199         *aa->aa_oi->oi_osfs = *msfs;
2200 out:
2201         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2202         RETURN(rc);
2203 }
2204
2205 static int osc_statfs_async(struct obd_export *exp,
2206                             struct obd_info *oinfo, __u64 max_age,
2207                             struct ptlrpc_request_set *rqset)
2208 {
2209         struct obd_device     *obd = class_exp2obd(exp);
2210         struct ptlrpc_request *req;
2211         struct osc_async_args *aa;
2212         int                    rc;
2213         ENTRY;
2214
2215         /* We could possibly pass max_age in the request (as an absolute
2216          * timestamp or a "seconds.usec ago") so the target can avoid doing
2217          * extra calls into the filesystem if that isn't necessary (e.g.
2218          * during mount that would help a bit).  Having relative timestamps
2219          * is not so great if request processing is slow, while absolute
2220          * timestamps are not ideal because they need time synchronization. */
2221         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2222         if (req == NULL)
2223                 RETURN(-ENOMEM);
2224
2225         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2226         if (rc) {
2227                 ptlrpc_request_free(req);
2228                 RETURN(rc);
2229         }
2230         ptlrpc_request_set_replen(req);
2231         req->rq_request_portal = OST_CREATE_PORTAL;
2232         ptlrpc_at_set_req_timeout(req);
2233
2234         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2235                 /* procfs requests not want stat in wait for avoid deadlock */
2236                 req->rq_no_resend = 1;
2237                 req->rq_no_delay = 1;
2238         }
2239
2240         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2241         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2242         aa = ptlrpc_req_async_args(req);
2243         aa->aa_oi = oinfo;
2244
2245         ptlrpc_set_add_req(rqset, req);
2246         RETURN(0);
2247 }
2248
2249 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2250                       struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2251 {
2252         struct obd_device     *obd = class_exp2obd(exp);
2253         struct obd_statfs     *msfs;
2254         struct ptlrpc_request *req;
2255         struct obd_import     *imp = NULL;
2256         int rc;
2257         ENTRY;
2258
2259         /*Since the request might also come from lprocfs, so we need
2260          *sync this with client_disconnect_export Bug15684*/
2261         down_read(&obd->u.cli.cl_sem);
2262         if (obd->u.cli.cl_import)
2263                 imp = class_import_get(obd->u.cli.cl_import);
2264         up_read(&obd->u.cli.cl_sem);
2265         if (!imp)
2266                 RETURN(-ENODEV);
2267
2268         /* We could possibly pass max_age in the request (as an absolute
2269          * timestamp or a "seconds.usec ago") so the target can avoid doing
2270          * extra calls into the filesystem if that isn't necessary (e.g.
2271          * during mount that would help a bit).  Having relative timestamps
2272          * is not so great if request processing is slow, while absolute
2273          * timestamps are not ideal because they need time synchronization. */
2274         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2275
2276         class_import_put(imp);
2277
2278         if (req == NULL)
2279                 RETURN(-ENOMEM);
2280
2281         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2282         if (rc) {
2283                 ptlrpc_request_free(req);
2284                 RETURN(rc);
2285         }
2286         ptlrpc_request_set_replen(req);
2287         req->rq_request_portal = OST_CREATE_PORTAL;
2288         ptlrpc_at_set_req_timeout(req);
2289
2290         if (flags & OBD_STATFS_NODELAY) {
2291                 /* procfs requests not want stat in wait for avoid deadlock */
2292                 req->rq_no_resend = 1;
2293                 req->rq_no_delay = 1;
2294         }
2295
2296         rc = ptlrpc_queue_wait(req);
2297         if (rc)
2298                 GOTO(out, rc);
2299
2300         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2301         if (msfs == NULL) {
2302                 GOTO(out, rc = -EPROTO);
2303         }
2304
2305         *osfs = *msfs;
2306
2307         EXIT;
2308  out:
2309         ptlrpc_req_finished(req);
2310         return rc;
2311 }
2312
2313 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2314                          void *karg, void __user *uarg)
2315 {
2316         struct obd_device *obd = exp->exp_obd;
2317         struct obd_ioctl_data *data = karg;
2318         int err = 0;
2319         ENTRY;
2320
2321         if (!try_module_get(THIS_MODULE)) {
2322                 CERROR("%s: cannot get module '%s'\n", obd->obd_name,
2323                        module_name(THIS_MODULE));
2324                 return -EINVAL;
2325         }
2326         switch (cmd) {
2327         case OBD_IOC_CLIENT_RECOVER:
2328                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2329                                             data->ioc_inlbuf1, 0);
2330                 if (err > 0)
2331                         err = 0;
2332                 GOTO(out, err);
2333         case IOC_OSC_SET_ACTIVE:
2334                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2335                                                data->ioc_offset);
2336                 GOTO(out, err);
2337         case OBD_IOC_PING_TARGET:
2338                 err = ptlrpc_obd_ping(obd);
2339                 GOTO(out, err);
2340         default:
2341                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2342                        cmd, current_comm());
2343                 GOTO(out, err = -ENOTTY);
2344         }
2345 out:
2346         module_put(THIS_MODULE);
2347         return err;
2348 }
2349
2350 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
2351                               u32 keylen, void *key,
2352                               u32 vallen, void *val,
2353                               struct ptlrpc_request_set *set)
2354 {
2355         struct ptlrpc_request *req;
2356         struct obd_device     *obd = exp->exp_obd;
2357         struct obd_import     *imp = class_exp2cliimp(exp);
2358         char                  *tmp;
2359         int                    rc;
2360         ENTRY;
2361
2362         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
2363
2364         if (KEY_IS(KEY_CHECKSUM)) {
2365                 if (vallen != sizeof(int))
2366                         RETURN(-EINVAL);
2367                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
2368                 RETURN(0);
2369         }
2370
2371         if (KEY_IS(KEY_SPTLRPC_CONF)) {
2372                 sptlrpc_conf_client_adapt(obd);
2373                 RETURN(0);
2374         }
2375
2376         if (KEY_IS(KEY_FLUSH_CTX)) {
2377                 sptlrpc_import_flush_my_ctx(imp);
2378                 RETURN(0);
2379         }
2380
2381         if (KEY_IS(KEY_CACHE_SET)) {
2382                 struct client_obd *cli = &obd->u.cli;
2383
2384                 LASSERT(cli->cl_cache == NULL); /* only once */
2385                 cli->cl_cache = (struct cl_client_cache *)val;
2386                 cl_cache_incref(cli->cl_cache);
2387                 cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
2388
2389                 /* add this osc into entity list */
2390                 LASSERT(list_empty(&cli->cl_lru_osc));
2391                 spin_lock(&cli->cl_cache->ccc_lru_lock);
2392                 list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
2393                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
2394
2395                 RETURN(0);
2396         }
2397
2398         if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
2399                 struct client_obd *cli = &obd->u.cli;
2400                 long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
2401                 long target = *(long *)val;
2402
2403                 nr = osc_lru_shrink(env, cli, min(nr, target), true);
2404                 *(long *)val -= nr;
2405                 RETURN(0);
2406         }
2407
2408         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
2409                 RETURN(-EINVAL);
2410
2411         /* We pass all other commands directly to OST. Since nobody calls osc
2412            methods directly and everybody is supposed to go through LOV, we
2413            assume lov checked invalid values for us.
2414            The only recognised values so far are evict_by_nid and mds_conn.
2415            Even if something bad goes through, we'd get a -EINVAL from OST
2416            anyway. */
2417
2418         req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
2419                                                 &RQF_OST_SET_GRANT_INFO :
2420                                                 &RQF_OBD_SET_INFO);
2421         if (req == NULL)
2422                 RETURN(-ENOMEM);
2423
2424         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2425                              RCL_CLIENT, keylen);
2426         if (!KEY_IS(KEY_GRANT_SHRINK))
2427                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
2428                                      RCL_CLIENT, vallen);
2429         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
2430         if (rc) {
2431                 ptlrpc_request_free(req);
2432                 RETURN(rc);
2433         }
2434
2435         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2436         memcpy(tmp, key, keylen);
2437         tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
2438                                                         &RMF_OST_BODY :
2439                                                         &RMF_SETINFO_VAL);
2440         memcpy(tmp, val, vallen);
2441
2442         if (KEY_IS(KEY_GRANT_SHRINK)) {
2443                 struct osc_grant_args *aa;
2444                 struct obdo *oa;
2445
2446                 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2447                 aa = ptlrpc_req_async_args(req);
2448                 OBDO_ALLOC(oa);
2449                 if (!oa) {
2450                         ptlrpc_req_finished(req);
2451                         RETURN(-ENOMEM);
2452                 }
2453                 *oa = ((struct ost_body *)val)->oa;
2454                 aa->aa_oa = oa;
2455                 req->rq_interpret_reply = osc_shrink_grant_interpret;
2456         }
2457
2458         ptlrpc_request_set_replen(req);
2459         if (!KEY_IS(KEY_GRANT_SHRINK)) {
2460                 LASSERT(set != NULL);
2461                 ptlrpc_set_add_req(set, req);
2462                 ptlrpc_check_set(NULL, set);
2463         } else {
2464                 ptlrpcd_add_req(req);
2465         }
2466
2467         RETURN(0);
2468 }
2469
2470 static int osc_reconnect(const struct lu_env *env,
2471                          struct obd_export *exp, struct obd_device *obd,
2472                          struct obd_uuid *cluuid,
2473                          struct obd_connect_data *data,
2474                          void *localdata)
2475 {
2476         struct client_obd *cli = &obd->u.cli;
2477
2478         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
2479                 long lost_grant;
2480                 long grant;
2481
2482                 spin_lock(&cli->cl_loi_list_lock);
2483                 grant = cli->cl_avail_grant + cli->cl_reserved_grant;
2484                 if (data->ocd_connect_flags & OBD_CONNECT_GRANT_PARAM)
2485                         grant += cli->cl_dirty_grant;
2486                 else
2487                         grant += cli->cl_dirty_pages << PAGE_CACHE_SHIFT;
2488                 data->ocd_grant = grant ? : 2 * cli_brw_size(obd);
2489                 lost_grant = cli->cl_lost_grant;
2490                 cli->cl_lost_grant = 0;
2491                 spin_unlock(&cli->cl_loi_list_lock);
2492
2493                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
2494                        " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
2495                        data->ocd_version, data->ocd_grant, lost_grant);
2496         }
2497
2498         RETURN(0);
2499 }
2500
2501 static int osc_disconnect(struct obd_export *exp)
2502 {
2503         struct obd_device *obd = class_exp2obd(exp);
2504         int rc;
2505
2506         rc = client_disconnect_export(exp);
2507         /**
2508          * Initially we put del_shrink_grant before disconnect_export, but it
2509          * causes the following problem if setup (connect) and cleanup
2510          * (disconnect) are tangled together.
2511          *      connect p1                     disconnect p2
2512          *   ptlrpc_connect_import
2513          *     ...............               class_manual_cleanup
2514          *                                     osc_disconnect
2515          *                                     del_shrink_grant
2516          *   ptlrpc_connect_interrupt
2517          *     init_grant_shrink
2518          *   add this client to shrink list
2519          *                                      cleanup_osc
2520          * Bang! pinger trigger the shrink.
2521          * So the osc should be disconnected from the shrink list, after we
2522          * are sure the import has been destroyed. BUG18662
2523          */
2524         if (obd->u.cli.cl_import == NULL)
2525                 osc_del_shrink_grant(&obd->u.cli);
2526         return rc;
2527 }
2528
2529 static int osc_ldlm_resource_invalidate(struct cfs_hash *hs,
2530         struct cfs_hash_bd *bd, struct hlist_node *hnode, void *arg)
2531 {
2532         struct lu_env *env = arg;
2533         struct ldlm_resource *res = cfs_hash_object(hs, hnode);
2534         struct ldlm_lock *lock;
2535         struct osc_object *osc = NULL;
2536         ENTRY;
2537
2538         lock_res(res);
2539         list_for_each_entry(lock, &res->lr_granted, l_res_link) {
2540                 if (lock->l_ast_data != NULL && osc == NULL) {
2541                         osc = lock->l_ast_data;
2542                         cl_object_get(osc2cl(osc));
2543                 }
2544
2545                 /* clear LDLM_FL_CLEANED flag to make sure it will be canceled
2546                  * by the 2nd round of ldlm_namespace_clean() call in
2547                  * osc_import_event(). */
2548                 ldlm_clear_cleaned(lock);
2549         }
2550         unlock_res(res);
2551
2552         if (osc != NULL) {
2553                 osc_object_invalidate(env, osc);
2554                 cl_object_put(env, osc2cl(osc));
2555         }
2556
2557         RETURN(0);
2558 }
2559
2560 static int osc_import_event(struct obd_device *obd,
2561                             struct obd_import *imp,
2562                             enum obd_import_event event)
2563 {
2564         struct client_obd *cli;
2565         int rc = 0;
2566
2567         ENTRY;
2568         LASSERT(imp->imp_obd == obd);
2569
2570         switch (event) {
2571         case IMP_EVENT_DISCON: {
2572                 cli = &obd->u.cli;
2573                 spin_lock(&cli->cl_loi_list_lock);
2574                 cli->cl_avail_grant = 0;
2575                 cli->cl_lost_grant = 0;
2576                 spin_unlock(&cli->cl_loi_list_lock);
2577                 break;
2578         }
2579         case IMP_EVENT_INACTIVE: {
2580                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
2581                 break;
2582         }
2583         case IMP_EVENT_INVALIDATE: {
2584                 struct ldlm_namespace *ns = obd->obd_namespace;
2585                 struct lu_env         *env;
2586                 __u16                  refcheck;
2587
2588                 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
2589
2590                 env = cl_env_get(&refcheck);
2591                 if (!IS_ERR(env)) {
2592                         osc_io_unplug(env, &obd->u.cli, NULL);
2593
2594                         cfs_hash_for_each_nolock(ns->ns_rs_hash,
2595                                                  osc_ldlm_resource_invalidate,
2596                                                  env, 0);
2597                         cl_env_put(env, &refcheck);
2598
2599                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
2600                 } else
2601                         rc = PTR_ERR(env);
2602                 break;
2603         }
2604         case IMP_EVENT_ACTIVE: {
2605                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
2606                 break;
2607         }
2608         case IMP_EVENT_OCD: {
2609                 struct obd_connect_data *ocd = &imp->imp_connect_data;
2610
2611                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
2612                         osc_init_grant(&obd->u.cli, ocd);
2613
2614                 /* See bug 7198 */
2615                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
2616                         imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
2617
2618                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
2619                 break;
2620         }
2621         case IMP_EVENT_DEACTIVATE: {
2622                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
2623                 break;
2624         }
2625         case IMP_EVENT_ACTIVATE: {
2626                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
2627                 break;
2628         }
2629         default:
2630                 CERROR("Unknown import event %d\n", event);
2631                 LBUG();
2632         }
2633         RETURN(rc);
2634 }
2635
2636 /**
2637  * Determine whether the lock can be canceled before replaying the lock
2638  * during recovery, see bug16774 for detailed information.
2639  *
2640  * \retval zero the lock can't be canceled
2641  * \retval other ok to cancel
2642  */
2643 static int osc_cancel_weight(struct ldlm_lock *lock)
2644 {
2645         /*
2646          * Cancel all unused and granted extent lock.
2647          */
2648         if (lock->l_resource->lr_type == LDLM_EXTENT &&
2649             lock->l_granted_mode == lock->l_req_mode &&
2650             osc_ldlm_weigh_ast(lock) == 0)
2651                 RETURN(1);
2652
2653         RETURN(0);
2654 }
2655
2656 static int brw_queue_work(const struct lu_env *env, void *data)
2657 {
2658         struct client_obd *cli = data;
2659
2660         CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
2661
2662         osc_io_unplug(env, cli, NULL);
2663         RETURN(0);
2664 }
2665
2666 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
2667 {
2668         struct client_obd *cli = &obd->u.cli;
2669         struct obd_type   *type;
2670         void              *handler;
2671         int                rc;
2672         int                adding;
2673         int                added;
2674         int                req_count;
2675         ENTRY;
2676
2677         rc = ptlrpcd_addref();
2678         if (rc)
2679                 RETURN(rc);
2680
2681         rc = client_obd_setup(obd, lcfg);
2682         if (rc)
2683                 GOTO(out_ptlrpcd, rc);
2684
2685         handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
2686         if (IS_ERR(handler))
2687                 GOTO(out_client_setup, rc = PTR_ERR(handler));
2688         cli->cl_writeback_work = handler;
2689
2690         handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
2691         if (IS_ERR(handler))
2692                 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
2693         cli->cl_lru_work = handler;
2694
2695         rc = osc_quota_setup(obd);
2696         if (rc)
2697                 GOTO(out_ptlrpcd_work, rc);
2698
2699         cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
2700
2701 #ifdef CONFIG_PROC_FS
2702         obd->obd_vars = lprocfs_osc_obd_vars;
2703 #endif
2704         /* If this is true then both client (osc) and server (osp) are on the
2705          * same node. The osp layer if loaded first will register the osc proc
2706          * directory. In that case this obd_device will be attached its proc
2707          * tree to type->typ_procsym instead of obd->obd_type->typ_procroot. */
2708         type = class_search_type(LUSTRE_OSP_NAME);
2709         if (type && type->typ_procsym) {
2710                 obd->obd_proc_entry = lprocfs_register(obd->obd_name,
2711                                                        type->typ_procsym,
2712                                                        obd->obd_vars, obd);
2713                 if (IS_ERR(obd->obd_proc_entry)) {
2714                         rc = PTR_ERR(obd->obd_proc_entry);
2715                         CERROR("error %d setting up lprocfs for %s\n", rc,
2716                                obd->obd_name);
2717                         obd->obd_proc_entry = NULL;
2718                 }
2719         } else {
2720                 rc = lprocfs_obd_setup(obd);
2721         }
2722
2723         /* If the basic OSC proc tree construction succeeded then
2724          * lets do the rest. */
2725         if (rc == 0) {
2726                 lproc_osc_attach_seqstat(obd);
2727                 sptlrpc_lprocfs_cliobd_attach(obd);
2728                 ptlrpc_lprocfs_register_obd(obd);
2729         }
2730
2731         /*
2732          * We try to control the total number of requests with a upper limit
2733          * osc_reqpool_maxreqcount. There might be some race which will cause
2734          * over-limit allocation, but it is fine.
2735          */
2736         req_count = atomic_read(&osc_pool_req_count);
2737         if (req_count < osc_reqpool_maxreqcount) {
2738                 adding = cli->cl_max_rpcs_in_flight + 2;
2739                 if (req_count + adding > osc_reqpool_maxreqcount)
2740                         adding = osc_reqpool_maxreqcount - req_count;
2741
2742                 added = ptlrpc_add_rqs_to_pool(osc_rq_pool, adding);
2743                 atomic_add(added, &osc_pool_req_count);
2744         }
2745
2746         INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
2747         ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
2748
2749         spin_lock(&osc_shrink_lock);
2750         list_add_tail(&cli->cl_shrink_list, &osc_shrink_list);
2751         spin_unlock(&osc_shrink_lock);
2752
2753         RETURN(0);
2754
2755 out_ptlrpcd_work:
2756         if (cli->cl_writeback_work != NULL) {
2757                 ptlrpcd_destroy_work(cli->cl_writeback_work);
2758                 cli->cl_writeback_work = NULL;
2759         }
2760         if (cli->cl_lru_work != NULL) {
2761                 ptlrpcd_destroy_work(cli->cl_lru_work);
2762                 cli->cl_lru_work = NULL;
2763         }
2764 out_client_setup:
2765         client_obd_cleanup(obd);
2766 out_ptlrpcd:
2767         ptlrpcd_decref();
2768         RETURN(rc);
2769 }
2770
2771 static int osc_precleanup(struct obd_device *obd)
2772 {
2773         struct client_obd *cli = &obd->u.cli;
2774         ENTRY;
2775
2776         /* LU-464
2777          * for echo client, export may be on zombie list, wait for
2778          * zombie thread to cull it, because cli.cl_import will be
2779          * cleared in client_disconnect_export():
2780          *   class_export_destroy() -> obd_cleanup() ->
2781          *   echo_device_free() -> echo_client_cleanup() ->
2782          *   obd_disconnect() -> osc_disconnect() ->
2783          *   client_disconnect_export()
2784          */
2785         obd_zombie_barrier();
2786         if (cli->cl_writeback_work) {
2787                 ptlrpcd_destroy_work(cli->cl_writeback_work);
2788                 cli->cl_writeback_work = NULL;
2789         }
2790
2791         if (cli->cl_lru_work) {
2792                 ptlrpcd_destroy_work(cli->cl_lru_work);
2793                 cli->cl_lru_work = NULL;
2794         }
2795
2796         obd_cleanup_client_import(obd);
2797         ptlrpc_lprocfs_unregister_obd(obd);
2798         lprocfs_obd_cleanup(obd);
2799         RETURN(0);
2800 }
2801
2802 int osc_cleanup(struct obd_device *obd)
2803 {
2804         struct client_obd *cli = &obd->u.cli;
2805         int rc;
2806
2807         ENTRY;
2808
2809         spin_lock(&osc_shrink_lock);
2810         list_del(&cli->cl_shrink_list);
2811         spin_unlock(&osc_shrink_lock);
2812
2813         /* lru cleanup */
2814         if (cli->cl_cache != NULL) {
2815                 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
2816                 spin_lock(&cli->cl_cache->ccc_lru_lock);
2817                 list_del_init(&cli->cl_lru_osc);
2818                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
2819                 cli->cl_lru_left = NULL;
2820                 cl_cache_decref(cli->cl_cache);
2821                 cli->cl_cache = NULL;
2822         }
2823
2824         /* free memory of osc quota cache */
2825         osc_quota_cleanup(obd);
2826
2827         rc = client_obd_cleanup(obd);
2828
2829         ptlrpcd_decref();
2830         RETURN(rc);
2831 }
2832
2833 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
2834 {
2835         int rc = class_process_proc_param(PARAM_OSC, obd->obd_vars, lcfg, obd);
2836         return rc > 0 ? 0: rc;
2837 }
2838
2839 static int osc_process_config(struct obd_device *obd, size_t len, void *buf)
2840 {
2841         return osc_process_config_base(obd, buf);
2842 }
2843
2844 static struct obd_ops osc_obd_ops = {
2845         .o_owner                = THIS_MODULE,
2846         .o_setup                = osc_setup,
2847         .o_precleanup           = osc_precleanup,
2848         .o_cleanup              = osc_cleanup,
2849         .o_add_conn             = client_import_add_conn,
2850         .o_del_conn             = client_import_del_conn,
2851         .o_connect              = client_connect_import,
2852         .o_reconnect            = osc_reconnect,
2853         .o_disconnect           = osc_disconnect,
2854         .o_statfs               = osc_statfs,
2855         .o_statfs_async         = osc_statfs_async,
2856         .o_create               = osc_create,
2857         .o_destroy              = osc_destroy,
2858         .o_getattr              = osc_getattr,
2859         .o_setattr              = osc_setattr,
2860         .o_iocontrol            = osc_iocontrol,
2861         .o_set_info_async       = osc_set_info_async,
2862         .o_import_event         = osc_import_event,
2863         .o_process_config       = osc_process_config,
2864         .o_quotactl             = osc_quotactl,
2865 };
2866
2867 static struct shrinker *osc_cache_shrinker;
2868 struct list_head osc_shrink_list = LIST_HEAD_INIT(osc_shrink_list);
2869 DEFINE_SPINLOCK(osc_shrink_lock);
2870
2871 #ifndef HAVE_SHRINKER_COUNT
2872 static int osc_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask))
2873 {
2874         struct shrink_control scv = {
2875                 .nr_to_scan = shrink_param(sc, nr_to_scan),
2876                 .gfp_mask   = shrink_param(sc, gfp_mask)
2877         };
2878 #if !defined(HAVE_SHRINKER_WANT_SHRINK_PTR) && !defined(HAVE_SHRINK_CONTROL)
2879         struct shrinker *shrinker = NULL;
2880 #endif
2881
2882         (void)osc_cache_shrink_scan(shrinker, &scv);
2883
2884         return osc_cache_shrink_count(shrinker, &scv);
2885 }
2886 #endif
2887
2888 static int __init osc_init(void)
2889 {
2890         bool enable_proc = true;
2891         struct obd_type *type;
2892         unsigned int reqpool_size;
2893         unsigned int reqsize;
2894         int rc;
2895         DEF_SHRINKER_VAR(osc_shvar, osc_cache_shrink,
2896                          osc_cache_shrink_count, osc_cache_shrink_scan);
2897         ENTRY;
2898
2899         /* print an address of _any_ initialized kernel symbol from this
2900          * module, to allow debugging with gdb that doesn't support data
2901          * symbols from modules.*/
2902         CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
2903
2904         rc = lu_kmem_init(osc_caches);
2905         if (rc)
2906                 RETURN(rc);
2907
2908         type = class_search_type(LUSTRE_OSP_NAME);
2909         if (type != NULL && type->typ_procsym != NULL)
2910                 enable_proc = false;
2911
2912         rc = class_register_type(&osc_obd_ops, NULL, enable_proc, NULL,
2913                                  LUSTRE_OSC_NAME, &osc_device_type);
2914         if (rc)
2915                 GOTO(out_kmem, rc);
2916
2917         osc_cache_shrinker = set_shrinker(DEFAULT_SEEKS, &osc_shvar);
2918
2919         /* This is obviously too much memory, only prevent overflow here */
2920         if (osc_reqpool_mem_max >= 1 << 12 || osc_reqpool_mem_max == 0)
2921                 GOTO(out_type, rc = -EINVAL);
2922
2923         reqpool_size = osc_reqpool_mem_max << 20;
2924
2925         reqsize = 1;
2926         while (reqsize < OST_IO_MAXREQSIZE)
2927                 reqsize = reqsize << 1;
2928
2929         /*
2930          * We don't enlarge the request count in OSC pool according to
2931          * cl_max_rpcs_in_flight. The allocation from the pool will only be
2932          * tried after normal allocation failed. So a small OSC pool won't
2933          * cause much performance degression in most of cases.
2934          */
2935         osc_reqpool_maxreqcount = reqpool_size / reqsize;
2936
2937         atomic_set(&osc_pool_req_count, 0);
2938         osc_rq_pool = ptlrpc_init_rq_pool(0, OST_IO_MAXREQSIZE,
2939                                           ptlrpc_add_rqs_to_pool);
2940
2941         if (osc_rq_pool != NULL)
2942                 GOTO(out, rc);
2943         rc = -ENOMEM;
2944 out_type:
2945         class_unregister_type(LUSTRE_OSC_NAME);
2946 out_kmem:
2947         lu_kmem_fini(osc_caches);
2948 out:
2949         RETURN(rc);
2950 }
2951
2952 static void __exit osc_exit(void)
2953 {
2954         remove_shrinker(osc_cache_shrinker);
2955         class_unregister_type(LUSTRE_OSC_NAME);
2956         lu_kmem_fini(osc_caches);
2957         ptlrpc_free_rq_pool(osc_rq_pool);
2958 }
2959
2960 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
2961 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
2962 MODULE_VERSION(LUSTRE_VERSION_STRING);
2963 MODULE_LICENSE("GPL");
2964
2965 module_init(osc_init);
2966 module_exit(osc_exit);