Whamcloud - gitweb
LU-2049 grant: add support for OBD_CONNECT_GRANT_PARAM
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2015, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_OSC
38
39 #include <libcfs/libcfs.h>
40
41 #include <lustre_dlm.h>
42 #include <lustre_net.h>
43 #include <lustre/lustre_user.h>
44 #include <obd_cksum.h>
45 #include <lustre_ha.h>
46 #include <lprocfs_status.h>
47 #include <lustre_ioctl.h>
48 #include <lustre_debug.h>
49 #include <lustre_param.h>
50 #include <lustre_fid.h>
51 #include <obd_class.h>
52 #include <obd.h>
53 #include <lustre_net.h>
54 #include "osc_internal.h"
55 #include "osc_cl_internal.h"
56
/* Shared OSC request-pool state: the pool bounds memory consumed by BRW
 * request buffers so writeback can still make progress under memory
 * pressure. */
atomic_t osc_pool_req_count;              /* requests currently drawn from the pool */
unsigned int osc_reqpool_maxreqcount;     /* upper bound on pooled requests */
struct ptlrpc_request_pool *osc_rq_pool;  /* the pool itself, set up at module init */

/* max memory used for request pool, unit is MB */
static unsigned int osc_reqpool_mem_max = 5;
module_param(osc_reqpool_mem_max, uint, 0444);
64
/* Per-RPC context for an asynchronous bulk read/write (BRW); stored in
 * rq_async_args and consumed by brw_interpret() on completion. */
struct osc_brw_async_args {
        struct obdo              *aa_oa;            /* attributes sent with / returned by the BRW */
        int                       aa_requested_nob; /* bytes requested (detects short reads) */
        int                       aa_nio_count;     /* number of niobuf entries */
        u32                       aa_page_count;    /* pages in aa_ppga */
        int                       aa_resends;       /* resend attempts so far */
        struct brw_page **aa_ppga;                  /* page array for the bulk transfer */
        struct client_obd        *aa_cli;           /* owning client */
        struct list_head          aa_oaps;          /* osc_async_pages covered by this RPC */
        struct list_head          aa_exts;          /* osc_extents covered by this RPC */
};

/* Grant-shrink RPCs reuse the BRW async-args layout (only aa_oa is used;
 * see osc_shrink_grant_interpret()). */
#define osc_grant_args osc_brw_async_args
78
/* Async setattr/punch completion context (see osc_setattr_interpret()). */
struct osc_setattr_args {
        struct obdo             *sa_oa;      /* attributes; refreshed from the reply */
        obd_enqueue_update_f     sa_upcall;  /* caller completion callback */
        void                    *sa_cookie;  /* opaque argument for sa_upcall */
};
84
/* Async OST_SYNC completion context (see osc_sync_interpret()). */
struct osc_fsync_args {
        struct osc_object       *fa_obj;     /* object being synced */
        struct obdo             *fa_oa;      /* attributes; overwritten from the reply */
        obd_enqueue_update_f    fa_upcall;   /* caller completion callback */
        void                    *fa_cookie;  /* opaque argument for fa_upcall */
};
91
/* Async DLM lock enqueue context, unpacked by the enqueue interpreter. */
struct osc_enqueue_args {
        struct obd_export       *oa_exp;     /* export the lock was requested on */
        enum ldlm_type          oa_type;     /* lock type */
        enum ldlm_mode          oa_mode;     /* requested lock mode */
        __u64                   *oa_flags;   /* in/out LDLM flags */
        osc_enqueue_upcall_f    oa_upcall;   /* caller completion callback */
        void                    *oa_cookie;  /* opaque argument for oa_upcall */
        struct ost_lvb          *oa_lvb;     /* size/blocks/times returned by the server */
        struct lustre_handle    oa_lockh;    /* granted lock handle */
        unsigned int            oa_agl:1;    /* asynchronous glimpse lock request */
};
103
/* Forward declarations for helpers defined later in this file. */
static void osc_release_ppga(struct brw_page **ppga, size_t count);
static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
                         void *data, int rc);
107
108 void osc_pack_req_body(struct ptlrpc_request *req, struct obdo *oa)
109 {
110         struct ost_body *body;
111
112         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
113         LASSERT(body);
114
115         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
116 }
117
/* Synchronously fetch object attributes from the OST.
 * @oa carries the object id in and the returned attributes out;
 * o_blksize is filled in locally from the negotiated BRW size.
 * Returns 0 on success or a negative errno. */
static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        int                      rc;

        ENTRY;
        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        /* Translate the reply from wire format back into @oa. */
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        /* The preferred I/O size is a client-side property, not something
         * the server reports; derive it from the import's BRW size. */
        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;

        EXIT;
out:
        ptlrpc_req_finished(req);

        return rc;
}
160
/* Synchronously push the attributes in @oa to the OST and read back the
 * server's view of them.  The group bit must already be valid since it
 * selects the object sequence on the server.
 * Returns 0 on success or a negative errno. */
static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        int                      rc;

        ENTRY;
        LASSERT(oa->o_valid & OBD_MD_FLGROUP);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        /* Reflect the attributes the server actually applied back into @oa. */
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        EXIT;
out:
        ptlrpc_req_finished(req);

        RETURN(rc);
}
201
/* Completion handler shared by async setattr and punch RPCs: on success
 * copy the reply attributes into sa_oa, then hand the final status to
 * the caller's upcall.  Returns whatever the upcall returns. */
static int osc_setattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_setattr_args *sa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
                             &body->oa);
out:
        /* The upcall runs even on error so the caller can clean up. */
        rc = sa->sa_upcall(sa->sa_cookie, rc);
        RETURN(rc);
}
222
/* Send an OST_SETATTR request without blocking the caller.
 * If @rqset is NULL the request is fired and forgotten via ptlrpcd and
 * @upcall is never invoked; otherwise osc_setattr_interpret() runs
 * @upcall(@cookie, rc) on completion.  PTLRPCD_SET routes the request
 * through ptlrpcd instead of a caller-owned set.
 * Returns 0 on successful submission or a negative errno. */
int osc_setattr_async(struct obd_export *exp, struct obdo *oa,
                      obd_enqueue_update_f upcall, void *cookie,
                      struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct osc_setattr_args *sa;
        int                      rc;

        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        /* do mds to ost setattr asynchronously */
        if (!rqset) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req);
        } else {
                req->rq_interpret_reply =
                        (ptlrpc_interpterer_t)osc_setattr_interpret;

                CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
                sa = ptlrpc_req_async_args(req);
                sa->sa_oa = oa;
                sa->sa_upcall = upcall;
                sa->sa_cookie = cookie;

                if (rqset == PTLRPCD_SET)
                        ptlrpcd_add_req(req);
                else
                        ptlrpc_set_add_req(rqset, req);
        }

        RETURN(0);
}
269
/* Synchronously create an object on the OST.  Only echo-client objects
 * may be created this way (enforced by the FID-sequence assertion);
 * regular OST objects are precreated by the MDS.
 * On success @oa is refreshed from the reply and o_blksize is filled in
 * from the negotiated BRW size.  Returns 0 or a negative errno. */
static int osc_create(const struct lu_env *env, struct obd_export *exp,
                      struct obdo *oa)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        LASSERT(oa != NULL);
        LASSERT(oa->o_valid & OBD_MD_FLGROUP);
        LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
        if (req == NULL)
                GOTO(out, rc = -ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
        if (rc) {
                ptlrpc_request_free(req);
                GOTO(out, rc);
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out_req, rc = -EPROTO);

        CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        /* BRW size is a client-side property; fill it in locally. */
        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;

        CDEBUG(D_HA, "transno: "LPD64"\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        RETURN(rc);
}
320
/* Send an asynchronous OST_PUNCH (truncate / hole-punch) request.
 * The extent to punch is carried in @oa; completion is reported to the
 * caller via osc_setattr_interpret() -> @upcall(@cookie, rc).
 * PTLRPCD_SET routes the request through ptlrpcd instead of a
 * caller-owned set.  Returns 0 on successful submission or a negative
 * errno. */
int osc_punch_base(struct obd_export *exp, struct obdo *oa,
                   obd_enqueue_update_f upcall, void *cookie,
                   struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct osc_setattr_args *sa;
        struct ost_body         *body;
        int                      rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
        CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
        sa = ptlrpc_req_async_args(req);
        sa->sa_oa = oa;
        sa->sa_upcall = upcall;
        sa->sa_cookie = cookie;
        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}
362
/* Completion handler for OST_SYNC: copy the reply attributes into
 * fa_oa, refresh the osc object's cached blocks attribute, and invoke
 * the caller's upcall with the final status. */
static int osc_sync_interpret(const struct lu_env *env,
                              struct ptlrpc_request *req,
                              void *arg, int rc)
{
        struct osc_fsync_args   *fa = arg;
        struct ost_body         *body;
        struct cl_attr          *attr = &osc_env_info(env)->oti_attr;
        unsigned long           valid = 0;
        struct cl_object        *obj;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL) {
                CERROR("can't unpack ost_body\n");
                GOTO(out, rc = -EPROTO);
        }

        *fa->fa_oa = body->oa;
        obj = osc2cl(fa->fa_obj);

        /* Update osc object's blocks attribute */
        cl_object_attr_lock(obj);
        if (body->oa.o_valid & OBD_MD_FLBLOCKS) {
                attr->cat_blocks = body->oa.o_blocks;
                valid |= CAT_BLOCKS;
        }

        if (valid != 0)
                cl_object_attr_update(env, obj, attr, valid);
        cl_object_attr_unlock(obj);

out:
        /* The upcall runs even on error so the caller can clean up. */
        rc = fa->fa_upcall(fa->fa_cookie, rc);
        RETURN(rc);
}
401
/* Flush an object's cached data/metadata to stable storage on the OST.
 * The byte range is carried in @oa's size/blocks fields (see the
 * comment below).  Completion is reported via osc_sync_interpret() ->
 * @upcall(@cookie, rc).  Returns 0 on successful submission or a
 * negative errno. */
int osc_sync_base(struct osc_object *obj, struct obdo *oa,
                  obd_enqueue_update_f upcall, void *cookie,
                  struct ptlrpc_request_set *rqset)
{
        struct obd_export     *exp = osc_export(obj);
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct osc_fsync_args *fa;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        /* overload the size and blocks fields in the oa with start/end */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = osc_sync_interpret;

        CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
        fa = ptlrpc_req_async_args(req);
        fa->fa_obj = obj;
        fa->fa_oa = oa;
        fa->fa_upcall = upcall;
        fa->fa_cookie = cookie;

        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN (0);
}
445
/* Find and cancel locally locks matched by @mode in the resource found by
 * @objid. Found locks are added into @cancel list. Returns the amount of
 * locks added to @cancels list. */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
                                   struct list_head *cancels,
                                   enum ldlm_mode mode, __u64 lock_flags)
{
        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        struct ldlm_res_id res_id;
        struct ldlm_resource *res;
        int count;
        ENTRY;

        /* Return, i.e. cancel nothing, only if ELC is supported (flag in
         * export) but disabled through procfs (flag in NS).
         *
         * This distinguishes from a case when ELC is not supported originally,
         * when we still want to cancel locks in advance and just cancel them
         * locally, without sending any RPC. */
        if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
                RETURN(0);

        ostid_build_res_name(&oa->o_oi, &res_id);
        res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
        /* No resource means no locks to cancel, which is not an error. */
        if (IS_ERR(res))
                RETURN(0);

        LDLM_RESOURCE_ADDREF(res);
        count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
                                           lock_flags, 0, NULL);
        LDLM_RESOURCE_DELREF(res);
        ldlm_resource_putref(res);
        RETURN(count);
}
480
481 static int osc_destroy_interpret(const struct lu_env *env,
482                                  struct ptlrpc_request *req, void *data,
483                                  int rc)
484 {
485         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
486
487         atomic_dec(&cli->cl_destroy_in_flight);
488         wake_up(&cli->cl_destroy_waitq);
489         return 0;
490 }
491
/* Try to claim a slot for one more in-flight OST_DESTROY RPC, bounded
 * by cl_max_rpcs_in_flight.  Returns 1 if the slot was claimed (the
 * caller may send and osc_destroy_interpret() releases it), 0 if the
 * limit is reached (the optimistic increment is rolled back). */
static int osc_can_send_destroy(struct client_obd *cli)
{
        if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
            cli->cl_max_rpcs_in_flight) {
                /* The destroy request can be sent */
                return 1;
        }
        /* Roll the increment back; if the counter dropped below the limit
         * in the meantime, another waiter may now proceed, so wake it. */
        if (atomic_dec_return(&cli->cl_destroy_in_flight) <
            cli->cl_max_rpcs_in_flight) {
                /*
                 * The counter has been modified between the two atomic
                 * operations.
                 */
                wake_up(&cli->cl_destroy_waitq);
        }
        return 0;
}
509
/* Destroy an object on the OST.  Locally granted PW locks on the object
 * are cancelled first (discarding cached data) and piggy-backed on the
 * destroy request as early lock cancels.  Concurrent destroy RPCs are
 * throttled to cl_max_rpcs_in_flight; the request itself is handed to
 * ptlrpcd without waiting for the reply.
 * Returns 0 on successful submission or a negative errno. */
static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct client_obd     *cli = &exp->exp_obd->u.cli;
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct list_head       cancels = LIST_HEAD_INIT(cancels);
        int rc, count;
        ENTRY;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
        if (req == NULL) {
                /* Drop the references taken by the failed ELC preparation. */
                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                RETURN(-ENOMEM);
        }

        rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
                               0, &cancels, count);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = osc_destroy_interpret;
        if (!osc_can_send_destroy(cli)) {
                struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);

                /*
                 * Wait until the number of on-going destroy RPCs drops
                 * under max_rpc_in_flight
                 */
                l_wait_event_exclusive(cli->cl_destroy_waitq,
                                       osc_can_send_destroy(cli), &lwi);
        }

        /* Do not wait for response */
        ptlrpcd_add_req(req);
        RETURN(0);
}
566
/* Fill @oa's grant-related fields (o_dirty, o_undirty, o_grant,
 * o_dropped) with this client's cache accounting so the server can keep
 * its per-client grant in sync.  @oa must not already carry the
 * blocks/grant valid bits. */
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;

        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        spin_lock(&cli->cl_loi_list_lock);
        /* With GRANT_PARAM the server accounts in bytes incl. overhead;
         * otherwise report whole dirty pages converted to bytes. */
        if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data, GRANT_PARAM))
                oa->o_dirty = cli->cl_dirty_grant;
        else
                oa->o_dirty = cli->cl_dirty_pages << PAGE_CACHE_SHIFT;
        if (unlikely(cli->cl_dirty_pages - cli->cl_dirty_transit >
                     cli->cl_dirty_max_pages)) {
                CERROR("dirty %lu - %lu > dirty_max %lu\n",
                       cli->cl_dirty_pages, cli->cl_dirty_transit,
                       cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else if (unlikely(atomic_long_read(&obd_dirty_pages) -
                            atomic_long_read(&obd_dirty_transit_pages) >
                            (long)(obd_max_dirty_pages + 1))) {
                /* The atomic_read() allowing the atomic_inc() are
                 * not covered by a lock thus they may safely race and trip
                 * this CERROR() unless we add in a small fudge factor (+1). */
                CERROR("%s: dirty %ld - %ld > system dirty_max %ld\n",
                       cli_name(cli), atomic_long_read(&obd_dirty_pages),
                       atomic_long_read(&obd_dirty_transit_pages),
                       obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
                            0x7fffffff)) {
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty_pages, cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else {
                unsigned long nrpages;

                /* Ask for enough grant to keep max_rpcs_in_flight+1 RPCs
                 * in flight, but no less than the dirty cache limit. */
                nrpages = cli->cl_max_pages_per_rpc;
                nrpages *= cli->cl_max_rpcs_in_flight + 1;
                nrpages = max(nrpages, cli->cl_dirty_max_pages);
                /* NOTE(review): o_undirty looks narrower than the shifted
                 * value for very large caches — confirm wire-field width. */
                oa->o_undirty = nrpages << PAGE_CACHE_SHIFT;
                if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data,
                                 GRANT_PARAM)) {
                        int nrextents;

                        /* take extent tax into account when asking for more
                         * grant space */
                        nrextents = (nrpages + cli->cl_max_extent_pages - 1)  /
                                     cli->cl_max_extent_pages;
                        oa->o_undirty += nrextents * cli->cl_grant_extent_tax;
                }
        }
        oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
        oa->o_dropped = cli->cl_lost_grant;
        /* Lost grant is reported once and then reset. */
        cli->cl_lost_grant = 0;
        spin_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
}
627
/* Arm the next grant-shrink check one cl_grant_shrink_interval from now. */
void osc_update_next_shrink(struct client_obd *cli)
{
        cli->cl_next_shrink_grant =
                cfs_time_shift(cli->cl_grant_shrink_interval);
        CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
               cli->cl_next_shrink_grant);
}
635
/* Add @grant bytes back to the client's available grant.
 * cl_avail_grant is protected by cl_loi_list_lock. */
static void __osc_update_grant(struct client_obd *cli, u64 grant)
{
        spin_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant += grant;
        spin_unlock(&cli->cl_loi_list_lock);
}
642
643 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
644 {
645         if (body->oa.o_valid & OBD_MD_FLGRANT) {
646                 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
647                 __osc_update_grant(cli, body->oa.o_grant);
648         }
649 }
650
/* Defined later in this file; declared here for the grant-shrink path. */
static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
                              u32 keylen, void *key,
                              u32 vallen, void *val,
                              struct ptlrpc_request_set *set);
655
/* Completion handler for a grant-shrink set_info RPC: on failure the
 * grant we tried to hand back is restored locally; on success the
 * server's reply may itself return extra grant.  The obdo allocated by
 * the sender is freed in either case. */
static int osc_shrink_grant_interpret(const struct lu_env *env,
                                      struct ptlrpc_request *req,
                                      void *aa, int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
        struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
        struct ost_body *body;

        if (rc != 0) {
                /* The server never saw the shrink; take the grant back. */
                __osc_update_grant(cli, oa->o_grant);
                GOTO(out, rc);
        }

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        osc_update_grant(cli, body);
out:
        OBDO_FREE(oa);
        return rc;
}
676
677 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
678 {
679         spin_lock(&cli->cl_loi_list_lock);
680         oa->o_grant = cli->cl_avail_grant / 4;
681         cli->cl_avail_grant -= oa->o_grant;
682         spin_unlock(&cli->cl_loi_list_lock);
683         if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
684                 oa->o_valid |= OBD_MD_FLFLAGS;
685                 oa->o_flags = 0;
686         }
687         oa->o_flags |= OBD_FL_SHRINK_GRANT;
688         osc_update_next_shrink(cli);
689 }
690
/* Shrink the current grant, either from some large amount to enough for a
 * full set of in-flight RPCs, or if we have already shrunk to that limit
 * then to enough for a single RPC.  This avoids keeping more grant than
 * needed, and avoids shrinking the grant piecemeal. */
static int osc_shrink_grant(struct client_obd *cli)
{
        __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
                             (cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT);

        spin_lock(&cli->cl_loi_list_lock);
        /* Already at or below the in-flight target: shrink further, down
         * to a single RPC's worth. */
        if (cli->cl_avail_grant <= target_bytes)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
        spin_unlock(&cli->cl_loi_list_lock);

        return osc_shrink_grant_to_target(cli, target_bytes);
}
707
/* Hand grant back to the server until we hold only @target_bytes.
 * The target is clamped up to at least one full RPC's worth, and
 * nothing is done if we already hold no more than the target.  The
 * excess is reported through a KEY_GRANT_SHRINK set_info RPC; if that
 * fails the grant is restored locally.  Returns 0 or a negative errno. */
int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
{
        int                     rc = 0;
        struct ost_body        *body;
        ENTRY;

        spin_lock(&cli->cl_loi_list_lock);
        /* Don't shrink if we are already above or below the desired limit
         * We don't want to shrink below a single RPC, as that will negatively
         * impact block allocation and long-term performance. */
        if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;

        if (target_bytes >= cli->cl_avail_grant) {
                spin_unlock(&cli->cl_loi_list_lock);
                RETURN(0);
        }
        spin_unlock(&cli->cl_loi_list_lock);

        OBD_ALLOC_PTR(body);
        if (!body)
                RETURN(-ENOMEM);

        /* Populate the grant accounting fields of the freshly zeroed body. */
        osc_announce_cached(cli, &body->oa, 0);

        spin_lock(&cli->cl_loi_list_lock);
        body->oa.o_grant = cli->cl_avail_grant - target_bytes;
        cli->cl_avail_grant = target_bytes;
        spin_unlock(&cli->cl_loi_list_lock);
        if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
                body->oa.o_valid |= OBD_MD_FLFLAGS;
                body->oa.o_flags = 0;
        }
        body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);

        rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
                                sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
                                sizeof(*body), body, NULL);
        if (rc != 0)
                /* The RPC was never sent; take the grant back. */
                __osc_update_grant(cli, body->oa.o_grant);
        OBD_FREE_PTR(body);
        RETURN(rc);
}
752
/* Decide whether it is time to shrink this client's grant.  Requires
 * grant-shrink support on the connection, an (almost) expired shrink
 * deadline, a fully connected import, and more available grant than a
 * single RPC needs.  Otherwise the deadline is advanced.  Returns 1 to
 * shrink now, 0 to skip. */
static int osc_should_shrink_grant(struct client_obd *client)
{
        cfs_time_t time = cfs_time_current();
        cfs_time_t next_shrink = client->cl_next_shrink_grant;

        if ((client->cl_import->imp_connect_data.ocd_connect_flags &
             OBD_CONNECT_GRANT_SHRINK) == 0)
                return 0;

        /* Allow a small (5 tick) slop so we don't miss the deadline. */
        if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
                /* Get the current RPC size directly, instead of going via:
                 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
                 * Keep comment here so that it can be found by searching. */
                int brw_size = client->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;

                if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
                    client->cl_avail_grant > brw_size)
                        return 1;
                else
                        osc_update_next_shrink(client);
        }
        return 0;
}
776
777 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
778 {
779         struct client_obd *client;
780
781         list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
782                 if (osc_should_shrink_grant(client))
783                         osc_shrink_grant(client);
784         }
785         return 0;
786 }
787
788 static int osc_add_shrink_grant(struct client_obd *client)
789 {
790         int rc;
791
792         rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
793                                        TIMEOUT_GRANT,
794                                        osc_grant_shrink_grant_cb, NULL,
795                                        &client->cl_grant_shrink_list);
796         if (rc) {
797                 CERROR("add grant client %s error %d\n", cli_name(client), rc);
798                 return rc;
799         }
800         CDEBUG(D_CACHE, "add grant client %s\n", cli_name(client));
801         osc_update_next_shrink(client);
802         return 0;
803 }
804
/* Unregister @client from the periodic grant-shrink timer. */
static int osc_del_shrink_grant(struct client_obd *client)
{
        return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
                                         TIMEOUT_GRANT);
}
810
/* Initialize this client's grant accounting from the server's connect
 * reply @ocd: compute the starting cl_avail_grant and, when the server
 * supports GRANT_PARAM, adopt its extent tax, block size and maximum
 * extent size.  Also registers for periodic grant shrinking if the
 * connection supports it. */
static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        /*
         * ocd_grant is the total grant amount we're expect to hold: if we've
         * been evicted, it's the new avail_grant amount, cl_dirty_pages will
         * drop to 0 as inflight RPCs fail out; otherwise, it's avail_grant +
         * dirty.
         *
         * race is tolerable here: if we're evicted, but imp_state already
         * left EVICTED state, then cl_dirty_pages must be 0 already.
         */
        spin_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant = ocd->ocd_grant;
        if (cli->cl_import->imp_state != LUSTRE_IMP_EVICTED) {
                cli->cl_avail_grant -= cli->cl_reserved_grant;
                if (OCD_HAS_FLAG(ocd, GRANT_PARAM))
                        cli->cl_avail_grant -= cli->cl_dirty_grant;
                else
                        cli->cl_avail_grant -=
                                        cli->cl_dirty_pages << PAGE_CACHE_SHIFT;
        }

        if (cli->cl_avail_grant < 0) {
                CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
                      cli_name(cli), cli->cl_avail_grant,
                      ocd->ocd_grant, cli->cl_dirty_pages << PAGE_CACHE_SHIFT);
                /* workaround for servers which do not have the patch from
                 * LU-2679 */
                cli->cl_avail_grant = ocd->ocd_grant;
        }

        if (OCD_HAS_FLAG(ocd, GRANT_PARAM)) {
                u64 size;

                /* overhead for each extent insertion */
                cli->cl_grant_extent_tax = ocd->ocd_grant_tax_kb << 10;
                /* determine the appropriate chunk size used by osc_extent. */
                cli->cl_chunkbits = max_t(int, PAGE_CACHE_SHIFT,
                                          ocd->ocd_grant_blkbits);
                /* determine maximum extent size, in #pages */
                size = (u64)ocd->ocd_grant_max_blks << ocd->ocd_grant_blkbits;
                cli->cl_max_extent_pages = size >> PAGE_CACHE_SHIFT;
                if (cli->cl_max_extent_pages == 0)
                        cli->cl_max_extent_pages = 1;
        } else {
                /* Pre-GRANT_PARAM servers: no extent tax, page-sized chunks. */
                cli->cl_grant_extent_tax = 0;
                cli->cl_chunkbits = PAGE_CACHE_SHIFT;
                cli->cl_max_extent_pages = DT_MAX_BRW_PAGES;
        }
        spin_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
                "chunk bits: %d cl_max_extent_pages: %d\n",
                cli_name(cli),
                cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits,
                cli->cl_max_extent_pages);

        if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
            list_empty(&cli->cl_grant_shrink_list))
                osc_add_shrink_grant(cli);
}
872
873 /* We assume that the reason this OSC got a short read is because it read
874  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
875  * via the LOV, and it _knows_ it's reading inside the file, it's just that
876  * this stripe never got written at or beyond this stripe offset yet. */
877 static void handle_short_read(int nob_read, size_t page_count,
878                               struct brw_page **pga)
879 {
880         char *ptr;
881         int i = 0;
882
883         /* skip bytes read OK */
884         while (nob_read > 0) {
885                 LASSERT (page_count > 0);
886
887                 if (pga[i]->count > nob_read) {
888                         /* EOF inside this page */
889                         ptr = kmap(pga[i]->pg) +
890                                 (pga[i]->off & ~PAGE_MASK);
891                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
892                         kunmap(pga[i]->pg);
893                         page_count--;
894                         i++;
895                         break;
896                 }
897
898                 nob_read -= pga[i]->count;
899                 page_count--;
900                 i++;
901         }
902
903         /* zero remaining pages */
904         while (page_count-- > 0) {
905                 ptr = kmap(pga[i]->pg) + (pga[i]->off & ~PAGE_MASK);
906                 memset(ptr, 0, pga[i]->count);
907                 kunmap(pga[i]->pg);
908                 i++;
909         }
910 }
911
912 static int check_write_rcs(struct ptlrpc_request *req,
913                            int requested_nob, int niocount,
914                            size_t page_count, struct brw_page **pga)
915 {
916         int     i;
917         __u32   *remote_rcs;
918
919         remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
920                                                   sizeof(*remote_rcs) *
921                                                   niocount);
922         if (remote_rcs == NULL) {
923                 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
924                 return(-EPROTO);
925         }
926
927         /* return error if any niobuf was in error */
928         for (i = 0; i < niocount; i++) {
929                 if ((int)remote_rcs[i] < 0)
930                         return(remote_rcs[i]);
931
932                 if (remote_rcs[i] != 0) {
933                         CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
934                                 i, remote_rcs[i], req);
935                         return(-EPROTO);
936                 }
937         }
938
939         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
940                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
941                        req->rq_bulk->bd_nob_transferred, requested_nob);
942                 return(-EPROTO);
943         }
944
945         return (0);
946 }
947
948 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
949 {
950         if (p1->flag != p2->flag) {
951                 unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
952                                   OBD_BRW_SYNC       | OBD_BRW_ASYNC   |
953                                   OBD_BRW_NOQUOTA    | OBD_BRW_SOFT_SYNC);
954
955                 /* warn if we try to combine flags that we don't know to be
956                  * safe to combine */
957                 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
958                         CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
959                               "report this at https://jira.hpdd.intel.com/\n",
960                               p1->flag, p2->flag);
961                 }
962                 return 0;
963         }
964
965         return (p1->off + p1->count == p2->off);
966 }
967
/*
 * Compute a checksum of type @cksum_type over the first @nob bytes held in
 * the @pg_count pages of @pga for a bulk with opcode @opc (OST_READ or
 * OST_WRITE).
 *
 * Two fault-injection hooks deliberately break the checksum for testing:
 * OBD_FAIL_OSC_CHECKSUM_RECEIVE corrupts the first read page *before*
 * hashing (simulating an OST->client data error), and
 * OBD_FAIL_OSC_CHECKSUM_SEND bumps the final write checksum while leaving
 * the data intact so a redo still carries correct data.
 *
 * NOTE(review): on hash-init failure PTR_ERR() is funnelled through the u32
 * return type, and the status of cfs_crypto_hash_final() (err) is ignored —
 * presumably final cannot fail on an initialized descriptor; confirm.
 */
static u32 osc_checksum_bulk(int nob, size_t pg_count,
			     struct brw_page **pga, int opc,
			     cksum_type_t cksum_type)
{
	u32				cksum;
	int				i = 0;
	struct cfs_crypto_hash_desc	*hdesc;
	unsigned int			bufsize;
	int				err;
	unsigned char			cfs_alg = cksum_obd2cfs(cksum_type);

	LASSERT(pg_count > 0);

	hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
	if (IS_ERR(hdesc)) {
		CERROR("Unable to initialize checksum hash %s\n",
		       cfs_crypto_hash_name(cfs_alg));
		return PTR_ERR(hdesc);
	}

	while (nob > 0 && pg_count > 0) {
		/* last page may only be partially covered by @nob */
		unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;

		/* corrupt the data before we compute the checksum, to
		 * simulate an OST->client data error */
		if (i == 0 && opc == OST_READ &&
		    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
			unsigned char *ptr = kmap(pga[i]->pg);
			int off = pga[i]->off & ~PAGE_MASK;

			memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
			kunmap(pga[i]->pg);
		}
		cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
					    pga[i]->off & ~PAGE_MASK,
					    count);
		LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
			       (int)(pga[i]->off & ~PAGE_MASK));

		nob -= pga[i]->count;
		pg_count--;
		i++;
	}

	bufsize = sizeof(cksum);
	err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);

	/* For sending we only compute the wrong checksum instead
	 * of corrupting the data so it is still correct on a redo */
	if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
		cksum++;

	return cksum;
}
1022
/*
 * Build (but do not send) a bulk read/write RPC covering @page_count pages.
 *
 * \param cmd		OBD_BRW_WRITE or OBD_BRW_READ (plus modifier flags)
 * \param cli		client the RPC will be sent through
 * \param oa		obdo describing the object; checksum state is saved
 *			back into it for later verification
 * \param page_count	number of entries in @pga; must be > 0 and the pages
 *			must be sorted by ascending offset with no interior
 *			gaps (asserted below)
 * \param pga		page descriptors; byte-contiguous neighbours are
 *			merged into a single remote niobuf
 * \param reqp		on success the prepared request is returned here
 * \param resend	non-zero for a recovery resend; tags the obdo with
 *			OBD_FL_RECOV_RESEND
 *
 * \retval 0 on success with *reqp set, negative errno on failure
 */
static int
osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
		     u32 page_count, struct brw_page **pga,
		     struct ptlrpc_request **reqp, int resend)
{
	struct ptlrpc_request	*req;
	struct ptlrpc_bulk_desc *desc;
	struct ost_body		*body;
	struct obd_ioobj	*ioobj;
	struct niobuf_remote	*niobuf;
	int niocount, i, requested_nob, opc, rc;
	struct osc_brw_async_args *aa;
	struct req_capsule	*pill;
	struct brw_page *pg_prev;

	ENTRY;
	if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
		RETURN(-ENOMEM); /* Recoverable */
	if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
		RETURN(-EINVAL); /* Fatal */

	/* writes allocate from a dedicated pool so they can always make
	 * forward progress under memory pressure */
	if ((cmd & OBD_BRW_WRITE) != 0) {
		opc = OST_WRITE;
		req = ptlrpc_request_alloc_pool(cli->cl_import,
						osc_rq_pool,
						&RQF_OST_BRW_WRITE);
	} else {
		opc = OST_READ;
		req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
	}
	if (req == NULL)
		RETURN(-ENOMEM);

	/* count remote niobufs: one per run of mergeable pages */
	for (niocount = i = 1; i < page_count; i++) {
		if (!can_merge_pages(pga[i - 1], pga[i]))
			niocount++;
	}

	pill = &req->rq_pill;
	req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
			     sizeof(*ioobj));
	req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
			     niocount * sizeof(*niobuf));

	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
	if (rc) {
		ptlrpc_request_free(req);
		RETURN(rc);
	}
	req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
	ptlrpc_at_set_req_timeout(req);
	/* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
	 * retry logic */
	req->rq_no_retry_einprogress = 1;

	desc = ptlrpc_prep_bulk_imp(req, page_count,
		cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
		(opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE :
			PTLRPC_BULK_PUT_SINK) |
			PTLRPC_BULK_BUF_KIOV,
		OST_BULK_PORTAL,
		&ptlrpc_bulk_kiov_pin_ops);

	if (desc == NULL)
		GOTO(out, rc = -ENOMEM);
	/* NB request now owns desc and will free it when it gets freed */

	body = req_capsule_client_get(pill, &RMF_OST_BODY);
	ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
	niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
	LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);

	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

	obdo_to_ioobj(oa, ioobj);
	ioobj->ioo_bufcnt = niocount;
	/* The high bits of ioo_max_brw tells server _maximum_ number of bulks
	 * that might be send for this request.  The actual number is decided
	 * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
	 * "max - 1" for old client compatibility sending "0", and also so the
	 * the actual maximum is a power-of-two number, not one less. LU-1431 */
	ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
	LASSERT(page_count > 0);
	pg_prev = pga[0];
	/* attach each page to the bulk and fill/merge remote niobufs */
	for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
		struct brw_page *pg = pga[i];
		int poff = pg->off & ~PAGE_MASK;

		LASSERT(pg->count > 0);
		/* make sure there is no gap in the middle of page array */
		LASSERTF(page_count == 1 ||
			 (ergo(i == 0, poff + pg->count == PAGE_CACHE_SIZE) &&
			  ergo(i > 0 && i < page_count - 1,
			       poff == 0 && pg->count == PAGE_CACHE_SIZE)   &&
			  ergo(i == page_count - 1, poff == 0)),
			 "i: %d/%d pg: %p off: "LPU64", count: %u\n",
			 i, page_count, pg, pg->off, pg->count);
		LASSERTF(i == 0 || pg->off > pg_prev->off,
			 "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
			 " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
			 i, page_count,
			 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
			 pg_prev->pg, page_private(pg_prev->pg),
			 pg_prev->pg->index, pg_prev->off);
		LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
			(pg->flag & OBD_BRW_SRVLOCK));

		desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff, pg->count);
		requested_nob += pg->count;

		if (i > 0 && can_merge_pages(pg_prev, pg)) {
			/* extend the previous niobuf instead of starting a
			 * new one */
			niobuf--;
			niobuf->rnb_len += pg->count;
		} else {
			niobuf->rnb_offset = pg->off;
			niobuf->rnb_len    = pg->count;
			niobuf->rnb_flags  = pg->flag;
		}
		pg_prev = pg;
	}

	LASSERTF((void *)(niobuf - niocount) ==
		req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
		"want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
		&RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));

	osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
	if (resend) {
		if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
			body->oa.o_valid |= OBD_MD_FLFLAGS;
			body->oa.o_flags = 0;
		}
		body->oa.o_flags |= OBD_FL_RECOV_RESEND;
	}

	if (osc_should_shrink_grant(cli))
		osc_shrink_grant_local(cli, &body->oa);

	/* size[REQ_REC_OFF] still sizeof (*body) */
	if (opc == OST_WRITE) {
		if (cli->cl_checksum &&
		    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
			/* store cl_cksum_type in a local variable since
			 * it can be changed via lprocfs */
			cksum_type_t cksum_type = cli->cl_cksum_type;

			if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
				oa->o_flags &= OBD_FL_LOCAL_MASK;
				body->oa.o_flags = 0;
			}
			body->oa.o_flags |= cksum_type_pack(cksum_type);
			body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
			body->oa.o_cksum = osc_checksum_bulk(requested_nob,
							     page_count, pga,
							     OST_WRITE,
							     cksum_type);
			CDEBUG(D_PAGE, "checksum at write origin: %x\n",
			       body->oa.o_cksum);
			/* save this in 'oa', too, for later checking */
			oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
			oa->o_flags |= cksum_type_pack(cksum_type);
		} else {
			/* clear out the checksum flag, in case this is a
			 * resend but cl_checksum is no longer set. b=11238 */
			oa->o_valid &= ~OBD_MD_FLCKSUM;
		}
		oa->o_cksum = body->oa.o_cksum;
		/* 1 RC per niobuf */
		req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
				     sizeof(__u32) * niocount);
	} else {
		if (cli->cl_checksum &&
		    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
			if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
				body->oa.o_flags = 0;
			body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
			body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
		}
	}
	ptlrpc_request_set_replen(req);

	/* stash async state for brw_interpret()/osc_brw_fini_request() */
	CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
	aa = ptlrpc_req_async_args(req);
	aa->aa_oa = oa;
	aa->aa_requested_nob = requested_nob;
	aa->aa_nio_count = niocount;
	aa->aa_page_count = page_count;
	aa->aa_resends = 0;
	aa->aa_ppga = pga;
	aa->aa_cli = cli;
	INIT_LIST_HEAD(&aa->aa_oaps);

	*reqp = req;
	niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
	CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
		req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
		niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
	RETURN(0);

 out:
	ptlrpc_req_finished(req);
	RETURN(rc);
}
1226
/*
 * Diagnose a write checksum mismatch reported by the server.
 *
 * Recomputes the checksum over the still-resident local pages using the
 * type the *server* used, then compares it against both the original
 * client value and the server value to classify where the corruption
 * most likely happened, logging a console error with the verdict.
 *
 * \retval 0 if server and client checksums actually match (no error)
 * \retval 1 if they differ (caller should treat the write as suspect)
 */
static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
				__u32 client_cksum, __u32 server_cksum, int nob,
				size_t page_count, struct brw_page **pga,
				cksum_type_t client_cksum_type)
{
	__u32 new_cksum;
	char *msg;
	cksum_type_t cksum_type;

	if (server_cksum == client_cksum) {
		CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
		return 0;
	}

	/* redo the checksum with the algorithm the server reported using */
	cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
				       oa->o_flags : 0);
	new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
				      cksum_type);

	if (cksum_type != client_cksum_type)
		msg = "the server did not use the checksum type specified in "
		      "the original request - likely a protocol problem";
	else if (new_cksum == server_cksum)
		msg = "changed on the client after we checksummed it - "
		      "likely false positive due to mmap IO (bug 11742)";
	else if (new_cksum == client_cksum)
		msg = "changed in transit before arrival at OST";
	else
		msg = "changed in transit AND doesn't match the original - "
		      "likely false positive due to mmap IO (bug 11742)";

	LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
			   " object "DOSTID" extent ["LPU64"-"LPU64"]\n",
			   msg, libcfs_nid2str(peer->nid),
			   oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
			   oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
			   oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
			   POSTID(&oa->o_oi), pga[0]->off,
			   pga[page_count-1]->off + pga[page_count-1]->count - 1);
	CERROR("original client csum %x (type %x), server csum %x (type %x), "
	       "client csum now %x\n", client_cksum, client_cksum_type,
	       server_cksum, cksum_type, new_cksum);
	return 1;
}
1271
1272 /* Note rc enters this function as number of bytes transferred */
1273 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1274 {
1275         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1276         const lnet_process_id_t *peer =
1277                         &req->rq_import->imp_connection->c_peer;
1278         struct client_obd *cli = aa->aa_cli;
1279         struct ost_body *body;
1280         u32 client_cksum = 0;
1281         ENTRY;
1282
1283         if (rc < 0 && rc != -EDQUOT) {
1284                 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1285                 RETURN(rc);
1286         }
1287
1288         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1289         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1290         if (body == NULL) {
1291                 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1292                 RETURN(-EPROTO);
1293         }
1294
1295         /* set/clear over quota flag for a uid/gid */
1296         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1297             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1298                 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1299
1300                 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1301                        body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1302                        body->oa.o_flags);
1303                 osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
1304         }
1305
1306         osc_update_grant(cli, body);
1307
1308         if (rc < 0)
1309                 RETURN(rc);
1310
1311         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1312                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1313
1314         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1315                 if (rc > 0) {
1316                         CERROR("Unexpected +ve rc %d\n", rc);
1317                         RETURN(-EPROTO);
1318                 }
1319                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1320
1321                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1322                         RETURN(-EAGAIN);
1323
1324                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1325                     check_write_checksum(&body->oa, peer, client_cksum,
1326                                          body->oa.o_cksum, aa->aa_requested_nob,
1327                                          aa->aa_page_count, aa->aa_ppga,
1328                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1329                         RETURN(-EAGAIN);
1330
1331                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1332                                      aa->aa_page_count, aa->aa_ppga);
1333                 GOTO(out, rc);
1334         }
1335
1336         /* The rest of this function executes only for OST_READs */
1337
1338         /* if unwrap_bulk failed, return -EAGAIN to retry */
1339         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1340         if (rc < 0)
1341                 GOTO(out, rc = -EAGAIN);
1342
1343         if (rc > aa->aa_requested_nob) {
1344                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1345                        aa->aa_requested_nob);
1346                 RETURN(-EPROTO);
1347         }
1348
1349         if (rc != req->rq_bulk->bd_nob_transferred) {
1350                 CERROR ("Unexpected rc %d (%d transferred)\n",
1351                         rc, req->rq_bulk->bd_nob_transferred);
1352                 return (-EPROTO);
1353         }
1354
1355         if (rc < aa->aa_requested_nob)
1356                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1357
1358         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1359                 static int cksum_counter;
1360                 u32        server_cksum = body->oa.o_cksum;
1361                 char      *via = "";
1362                 char      *router = "";
1363                 cksum_type_t cksum_type;
1364
1365                 cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
1366                                                body->oa.o_flags : 0);
1367                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1368                                                  aa->aa_ppga, OST_READ,
1369                                                  cksum_type);
1370
1371                 if (peer->nid != req->rq_bulk->bd_sender) {
1372                         via = " via ";
1373                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1374                 }
1375
1376                 if (server_cksum != client_cksum) {
1377                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1378                                            "%s%s%s inode "DFID" object "DOSTID
1379                                            " extent ["LPU64"-"LPU64"]\n",
1380                                            req->rq_import->imp_obd->obd_name,
1381                                            libcfs_nid2str(peer->nid),
1382                                            via, router,
1383                                            body->oa.o_valid & OBD_MD_FLFID ?
1384                                                 body->oa.o_parent_seq : (__u64)0,
1385                                            body->oa.o_valid & OBD_MD_FLFID ?
1386                                                 body->oa.o_parent_oid : 0,
1387                                            body->oa.o_valid & OBD_MD_FLFID ?
1388                                                 body->oa.o_parent_ver : 0,
1389                                            POSTID(&body->oa.o_oi),
1390                                            aa->aa_ppga[0]->off,
1391                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1392                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1393                                                                         1);
1394                         CERROR("client %x, server %x, cksum_type %x\n",
1395                                client_cksum, server_cksum, cksum_type);
1396                         cksum_counter = 0;
1397                         aa->aa_oa->o_cksum = client_cksum;
1398                         rc = -EAGAIN;
1399                 } else {
1400                         cksum_counter++;
1401                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1402                         rc = 0;
1403                 }
1404         } else if (unlikely(client_cksum)) {
1405                 static int cksum_missed;
1406
1407                 cksum_missed++;
1408                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1409                         CERROR("Checksum %u requested from %s but not sent\n",
1410                                cksum_missed, libcfs_nid2str(peer->nid));
1411         } else {
1412                 rc = 0;
1413         }
1414 out:
1415         if (rc >= 0)
1416                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1417                                      aa->aa_oa, &body->oa);
1418
1419         RETURN(rc);
1420 }
1421
/*
 * Rebuild and resubmit a BRW request after a recoverable error.
 *
 * A brand-new request is prepared from the async args of the failed one;
 * the pga and oap/extent lists are transferred to the new request, and the
 * resend counter is bumped to drive the resend-delay backoff below.
 *
 * \param request  the failed BRW request (not freed here)
 * \param aa       its async args, holding pages/oaps/extents to move over
 * \param rc       the error that triggered the redo (for logging)
 * \retval 0 on successful resubmission, negative errno otherwise
 */
static int osc_brw_redo_request(struct ptlrpc_request *request,
				struct osc_brw_async_args *aa, int rc)
{
	struct ptlrpc_request *new_req;
	struct osc_brw_async_args *new_aa;
	struct osc_async_page *oap;
	ENTRY;

	DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
		  "redo for recoverable error %d", rc);

	rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
				OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
				  aa->aa_cli, aa->aa_oa, aa->aa_page_count,
				  aa->aa_ppga, &new_req, 1);
	if (rc)
		RETURN(rc);

	/* abort the redo if any page's IO was interrupted */
	list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
		if (oap->oap_request != NULL) {
			LASSERTF(request == oap->oap_request,
				 "request %p != oap_request %p\n",
				 request, oap->oap_request);
			if (oap->oap_interrupted) {
				ptlrpc_req_finished(new_req);
				RETURN(-EINTR);
			}
		}
	}
	/* New request takes over pga and oaps from old request.
	 * Note that copying a list_head doesn't work, need to move it... */
	aa->aa_resends++;
	new_req->rq_interpret_reply = request->rq_interpret_reply;
	new_req->rq_async_args = request->rq_async_args;
	new_req->rq_commit_cb = request->rq_commit_cb;
	/* cap resend delay to the current request timeout, this is similar to
	 * what ptlrpc does (see after_reply()) */
	if (aa->aa_resends > new_req->rq_timeout)
		new_req->rq_sent = cfs_time_current_sec() + new_req->rq_timeout;
	else
		new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
	new_req->rq_generation_set = 1;
	new_req->rq_import_generation = request->rq_import_generation;

	new_aa = ptlrpc_req_async_args(new_req);

	/* splice (not copy) the oap and extent lists onto the new args */
	INIT_LIST_HEAD(&new_aa->aa_oaps);
	list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
	INIT_LIST_HEAD(&new_aa->aa_exts);
	list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
	new_aa->aa_resends = aa->aa_resends;

	/* repoint each oap at the new request, dropping the old reference */
	list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
		if (oap->oap_request) {
			ptlrpc_req_finished(oap->oap_request);
			oap->oap_request = ptlrpc_request_addref(new_req);
		}
	}

	/* XXX: This code will run into problem if we're going to support
	 * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
	 * and wait for all of them to be finished. We should inherit request
	 * set from old request. */
	ptlrpcd_add_req(new_req);

	DEBUG_REQ(D_INFO, new_req, "new request");
	RETURN(0);
}
1490
1491 /*
1492  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1493  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1494  * fine for our small page arrays and doesn't require allocation.  its an
1495  * insertion sort that swaps elements that are strides apart, shrinking the
1496  * stride down until its '1' and the array is sorted.
1497  */
1498 static void sort_brw_pages(struct brw_page **array, int num)
1499 {
1500         int stride, i, j;
1501         struct brw_page *tmp;
1502
1503         if (num == 1)
1504                 return;
1505         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1506                 ;
1507
1508         do {
1509                 stride /= 3;
1510                 for (i = stride ; i < num ; i++) {
1511                         tmp = array[i];
1512                         j = i;
1513                         while (j >= stride && array[j - stride]->off > tmp->off) {
1514                                 array[j] = array[j - stride];
1515                                 j -= stride;
1516                         }
1517                         array[j] = tmp;
1518                 }
1519         } while (stride > 1);
1520 }
1521
/* Free the brw_page pointer array built for a BRW RPC.  Only the array
 * itself is released here; the pages it points at are owned and freed
 * elsewhere (via the osc_async_page / extent machinery). */
static void osc_release_ppga(struct brw_page **ppga, size_t count)
{
        LASSERT(ppga != NULL);
        OBD_FREE(ppga, sizeof(*ppga) * count);
}
1527
/* Reply-interpret callback for a BRW (bulk read/write) RPC.
 *
 * Runs in ptlrpcd context once a reply (or failure) is available for the
 * request built by osc_build_rpc():
 * - retries recoverable errors (-EINPROGRESS unconditionally, other errors
 *   up to the resend limit) by queueing a redo request;
 * - on success, folds the attributes returned by the OST (blocks, mtime,
 *   atime, ctime) and, for writes, size/KMS extension into the cl_object;
 * - finishes every extent carried by the RPC, frees the page array and the
 *   obdo, updates in-flight RPC accounting and re-plugs the IO engine.
 *
 * \param env   execution environment
 * \param req   the completed BRW request
 * \param data  struct osc_brw_async_args stashed in rq_async_args
 * \param rc    RPC-level result, refined by osc_brw_fini_request()
 *
 * \retval 0 on success or when a resend has been queued; negative errno
 *         otherwise (in which case the extents are finished with the error).
 */
static int brw_interpret(const struct lu_env *env,
                         struct ptlrpc_request *req, void *data, int rc)
{
        struct osc_brw_async_args *aa = data;
        struct osc_extent *ext;
        struct osc_extent *tmp;
        struct client_obd *cli = aa->aa_cli;
        ENTRY;

        rc = osc_brw_fini_request(req, rc);
        CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
        /* When server return -EINPROGRESS, client should always retry
         * regardless of the number of times the bulk was resent already. */
        if (osc_recoverable_error(rc)) {
                if (req->rq_import_generation !=
                    req->rq_import->imp_generation) {
                        /* The import was reconnected since this RPC was sent;
                         * resending across an eviction is not attempted. */
                        CDEBUG(D_HA, "%s: resend cross eviction for object: "
                               ""DOSTID", rc = %d.\n",
                               req->rq_import->imp_obd->obd_name,
                               POSTID(&aa->aa_oa->o_oi), rc);
                } else if (rc == -EINPROGRESS ||
                    client_should_resend(aa->aa_resends, aa->aa_cli)) {
                        rc = osc_brw_redo_request(req, aa, rc);
                } else {
                        CERROR("%s: too many resent retries for object: "
                               ""LPU64":"LPU64", rc = %d.\n",
                               req->rq_import->imp_obd->obd_name,
                               POSTID(&aa->aa_oa->o_oi), rc);
                }

                /* rc == 0 here means the redo request was queued: this RPC
                 * is done and the new one will be interpreted later. */
                if (rc == 0)
                        RETURN(0);
                else if (rc == -EAGAIN || rc == -EINPROGRESS)
                        rc = -EIO;
        }

        if (rc == 0) {
                struct obdo *oa = aa->aa_oa;
                struct cl_attr *attr = &osc_env_info(env)->oti_attr;
                unsigned long valid = 0;
                struct cl_object *obj;
                struct osc_async_page *last;

                /* The page array is sorted by offset, so the last entry
                 * determines how far this RPC reached into the object. */
                last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
                obj = osc2cl(last->oap_obj);

                /* Fold OST-returned attributes into the client-side object
                 * under the attribute lock. */
                cl_object_attr_lock(obj);
                if (oa->o_valid & OBD_MD_FLBLOCKS) {
                        attr->cat_blocks = oa->o_blocks;
                        valid |= CAT_BLOCKS;
                }
                if (oa->o_valid & OBD_MD_FLMTIME) {
                        attr->cat_mtime = oa->o_mtime;
                        valid |= CAT_MTIME;
                }
                if (oa->o_valid & OBD_MD_FLATIME) {
                        attr->cat_atime = oa->o_atime;
                        valid |= CAT_ATIME;
                }
                if (oa->o_valid & OBD_MD_FLCTIME) {
                        attr->cat_ctime = oa->o_ctime;
                        valid |= CAT_CTIME;
                }

                if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
                        struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
                        loff_t last_off = last->oap_count + last->oap_obj_off +
                                last->oap_page_off;

                        /* Change file size if this is an out of quota or
                         * direct IO write and it extends the file size */
                        if (loi->loi_lvb.lvb_size < last_off) {
                                attr->cat_size = last_off;
                                valid |= CAT_SIZE;
                        }
                        /* Extend KMS if it's not a lockless write */
                        if (loi->loi_kms < last_off &&
                            oap2osc_page(last)->ops_srvlock == 0) {
                                attr->cat_kms = last_off;
                                valid |= CAT_KMS;
                        }
                }

                if (valid != 0)
                        cl_object_attr_update(env, obj, attr, valid);
                cl_object_attr_unlock(obj);
        }
        OBDO_FREE(aa->aa_oa);

        /* A successful write leaves "unstable" pages pinned until the OST
         * commits the transaction (see brw_commit()). */
        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
                osc_inc_unstable_pages(req);

        list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
                list_del_init(&ext->oe_link);
                osc_extent_finish(env, ext, 1, rc);
        }
        LASSERT(list_empty(&aa->aa_exts));
        LASSERT(list_empty(&aa->aa_oaps));

        osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
        ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);

        spin_lock(&cli->cl_loi_list_lock);
        /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
         * is called so we know whether to go to sync BRWs or wait for more
         * RPCs to complete */
        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
                cli->cl_w_in_flight--;
        else
                cli->cl_r_in_flight--;
        osc_wake_cache_waiters(cli);
        spin_unlock(&cli->cl_loi_list_lock);

        /* An RPC slot just freed up: try to launch the next queued IO. */
        osc_io_unplug(env, cli, NULL);
        RETURN(rc);
}
1644
/* rq_commit_cb for BRW write requests: called when the server transaction
 * covering this request has committed to disk, at which point the pages it
 * wrote are no longer "unstable" and may be released.
 *
 * rq_unstable is consumed under rq_lock so that exactly one of this
 * callback and osc_extent_finish()'s accounting path performs the
 * unstable-page decrement. */
static void brw_commit(struct ptlrpc_request *req)
{
        /* If osc_inc_unstable_pages (via osc_extent_finish) races with
         * this called via the rq_commit_cb, I need to ensure
         * osc_dec_unstable_pages is still called. Otherwise unstable
         * pages may be leaked. */
        spin_lock(&req->rq_lock);
        if (likely(req->rq_unstable)) {
                req->rq_unstable = 0;
                spin_unlock(&req->rq_lock);

                osc_dec_unstable_pages(req);
        } else {
                /* Commit arrived before the increment: record it so the
                 * other side of the race knows to do the decrement. */
                req->rq_committed = 1;
                spin_unlock(&req->rq_lock);
        }
}
1662
1663 /**
1664  * Build an RPC by the list of extent @ext_list. The caller must ensure
1665  * that the total pages in this list are NOT over max pages per RPC.
1666  * Extents in the list must be in OES_RPC state.
1667  */
1668 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
1669                   struct list_head *ext_list, int cmd)
1670 {
1671         struct ptlrpc_request           *req = NULL;
1672         struct osc_extent               *ext;
1673         struct brw_page                 **pga = NULL;
1674         struct osc_brw_async_args       *aa = NULL;
1675         struct obdo                     *oa = NULL;
1676         struct osc_async_page           *oap;
1677         struct osc_object               *obj = NULL;
1678         struct cl_req_attr              *crattr = NULL;
1679         loff_t                          starting_offset = OBD_OBJECT_EOF;
1680         loff_t                          ending_offset = 0;
1681         int                             mpflag = 0;
1682         int                             mem_tight = 0;
1683         int                             page_count = 0;
1684         bool                            soft_sync = false;
1685         bool                            interrupted = false;
1686         int                             i;
1687         int                             grant = 0;
1688         int                             rc;
1689         struct list_head                rpc_list = LIST_HEAD_INIT(rpc_list);
1690         struct ost_body                 *body;
1691         ENTRY;
1692         LASSERT(!list_empty(ext_list));
1693
1694         /* add pages into rpc_list to build BRW rpc */
1695         list_for_each_entry(ext, ext_list, oe_link) {
1696                 LASSERT(ext->oe_state == OES_RPC);
1697                 mem_tight |= ext->oe_memalloc;
1698                 grant += ext->oe_grants;
1699                 page_count += ext->oe_nr_pages;
1700                 if (obj == NULL)
1701                         obj = ext->oe_obj;
1702         }
1703
1704         soft_sync = osc_over_unstable_soft_limit(cli);
1705         if (mem_tight)
1706                 mpflag = cfs_memory_pressure_get_and_set();
1707
1708         OBD_ALLOC(pga, sizeof(*pga) * page_count);
1709         if (pga == NULL)
1710                 GOTO(out, rc = -ENOMEM);
1711
1712         OBDO_ALLOC(oa);
1713         if (oa == NULL)
1714                 GOTO(out, rc = -ENOMEM);
1715
1716         i = 0;
1717         list_for_each_entry(ext, ext_list, oe_link) {
1718                 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
1719                         if (mem_tight)
1720                                 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
1721                         if (soft_sync)
1722                                 oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
1723                         pga[i] = &oap->oap_brw_page;
1724                         pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
1725                         i++;
1726
1727                         list_add_tail(&oap->oap_rpc_item, &rpc_list);
1728                         if (starting_offset == OBD_OBJECT_EOF ||
1729                             starting_offset > oap->oap_obj_off)
1730                                 starting_offset = oap->oap_obj_off;
1731                         else
1732                                 LASSERT(oap->oap_page_off == 0);
1733                         if (ending_offset < oap->oap_obj_off + oap->oap_count)
1734                                 ending_offset = oap->oap_obj_off +
1735                                                 oap->oap_count;
1736                         else
1737                                 LASSERT(oap->oap_page_off + oap->oap_count ==
1738                                         PAGE_CACHE_SIZE);
1739                         if (oap->oap_interrupted)
1740                                 interrupted = true;
1741                 }
1742         }
1743
1744         /* first page in the list */
1745         oap = list_entry(rpc_list.next, typeof(*oap), oap_rpc_item);
1746
1747         crattr = &osc_env_info(env)->oti_req_attr;
1748         memset(crattr, 0, sizeof(*crattr));
1749         crattr->cra_type = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
1750         crattr->cra_flags = ~0ULL;
1751         crattr->cra_page = oap2cl_page(oap);
1752         crattr->cra_oa = oa;
1753         cl_req_attr_set(env, osc2cl(obj), crattr);
1754
1755         if (cmd == OBD_BRW_WRITE)
1756                 oa->o_grant_used = grant;
1757
1758         sort_brw_pages(pga, page_count);
1759         rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req, 0);
1760         if (rc != 0) {
1761                 CERROR("prep_req failed: %d\n", rc);
1762                 GOTO(out, rc);
1763         }
1764
1765         req->rq_commit_cb = brw_commit;
1766         req->rq_interpret_reply = brw_interpret;
1767         req->rq_memalloc = mem_tight != 0;
1768         oap->oap_request = ptlrpc_request_addref(req);
1769         if (interrupted && !req->rq_intr)
1770                 ptlrpc_mark_interrupted(req);
1771
1772         /* Need to update the timestamps after the request is built in case
1773          * we race with setattr (locally or in queue at OST).  If OST gets
1774          * later setattr before earlier BRW (as determined by the request xid),
1775          * the OST will not use BRW timestamps.  Sadly, there is no obvious
1776          * way to do this in a single call.  bug 10150 */
1777         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
1778         crattr->cra_oa = &body->oa;
1779         crattr->cra_flags = OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME;
1780         cl_req_attr_set(env, osc2cl(obj), crattr);
1781         lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
1782
1783         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1784         aa = ptlrpc_req_async_args(req);
1785         INIT_LIST_HEAD(&aa->aa_oaps);
1786         list_splice_init(&rpc_list, &aa->aa_oaps);
1787         INIT_LIST_HEAD(&aa->aa_exts);
1788         list_splice_init(ext_list, &aa->aa_exts);
1789
1790         spin_lock(&cli->cl_loi_list_lock);
1791         starting_offset >>= PAGE_CACHE_SHIFT;
1792         if (cmd == OBD_BRW_READ) {
1793                 cli->cl_r_in_flight++;
1794                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
1795                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
1796                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
1797                                       starting_offset + 1);
1798         } else {
1799                 cli->cl_w_in_flight++;
1800                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
1801                 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
1802                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
1803                                       starting_offset + 1);
1804         }
1805         spin_unlock(&cli->cl_loi_list_lock);
1806
1807         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %ur/%uw in flight",
1808                   page_count, aa, cli->cl_r_in_flight,
1809                   cli->cl_w_in_flight);
1810
1811         ptlrpcd_add_req(req);
1812         rc = 0;
1813         EXIT;
1814
1815 out:
1816         if (mem_tight != 0)
1817                 cfs_memory_pressure_restore(mpflag);
1818
1819         if (rc != 0) {
1820                 LASSERT(req == NULL);
1821
1822                 if (oa)
1823                         OBDO_FREE(oa);
1824                 if (pga)
1825                         OBD_FREE(pga, sizeof(*pga) * page_count);
1826                 /* this should happen rarely and is pretty bad, it makes the
1827                  * pending list not follow the dirty order */
1828                 while (!list_empty(ext_list)) {
1829                         ext = list_entry(ext_list->next, struct osc_extent,
1830                                          oe_link);
1831                         list_del_init(&ext->oe_link);
1832                         osc_extent_finish(env, ext, 0, rc);
1833                 }
1834         }
1835         RETURN(rc);
1836 }
1837
1838 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
1839                                         struct ldlm_enqueue_info *einfo)
1840 {
1841         void *data = einfo->ei_cbdata;
1842         int set = 0;
1843
1844         LASSERT(lock != NULL);
1845         LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
1846         LASSERT(lock->l_resource->lr_type == einfo->ei_type);
1847         LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
1848         LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
1849
1850         lock_res_and_lock(lock);
1851
1852         if (lock->l_ast_data == NULL)
1853                 lock->l_ast_data = data;
1854         if (lock->l_ast_data == data)
1855                 set = 1;
1856
1857         unlock_res_and_lock(lock);
1858
1859         return set;
1860 }
1861
1862 static int osc_set_data_with_check(struct lustre_handle *lockh,
1863                                    struct ldlm_enqueue_info *einfo)
1864 {
1865         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
1866         int set = 0;
1867
1868         if (lock != NULL) {
1869                 set = osc_set_lock_data_with_check(lock, einfo);
1870                 LDLM_LOCK_PUT(lock);
1871         } else
1872                 CERROR("lockh %p, data %p - client evicted?\n",
1873                        lockh, einfo->ei_cbdata);
1874         return set;
1875 }
1876
/* Common completion for an OSC lock enqueue (sync and async paths).
 *
 * Translates the enqueue result for intent locks (the real status travels
 * inside the DLM reply's lock_policy_res1, in network byte order), marks
 * the LVB ready where appropriate, invokes the caller's upcall with the
 * final errcode, and releases the enqueue reference on success.
 *
 * \param req     the enqueue request (reply already swabbed)
 * \param upcall  caller's completion callback
 * \param cookie  opaque argument for \a upcall
 * \param lockh   handle of the (possibly) granted lock
 * \param mode    mode the enqueue reference was taken with
 * \param flags   in/out LDLM flags; LDLM_FL_LVB_READY may be added
 * \param agl     non-zero for speculative (AGL) enqueues, which must not
 *                mark the LVB ready
 * \param errcode result from ldlm_cli_enqueue{,_fini}()
 *
 * \retval the upcall's return value.
 */
static int osc_enqueue_fini(struct ptlrpc_request *req,
                            osc_enqueue_upcall_f upcall, void *cookie,
                            struct lustre_handle *lockh, enum ldlm_mode mode,
                            __u64 *flags, int agl, int errcode)
{
        bool intent = *flags & LDLM_FL_HAS_INTENT;
        int rc;
        ENTRY;

        /* The request was created before ldlm_cli_enqueue call. */
        if (intent && errcode == ELDLM_LOCK_ABORTED) {
                struct ldlm_reply *rep;

                rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
                LASSERT(rep != NULL);

                /* server status is carried in host order only after this
                 * conversion */
                rep->lock_policy_res1 =
                        ptlrpc_status_ntoh(rep->lock_policy_res1);
                if (rep->lock_policy_res1)
                        errcode = rep->lock_policy_res1;
                if (!agl)
                        *flags |= LDLM_FL_LVB_READY;
        } else if (errcode == ELDLM_OK) {
                *flags |= LDLM_FL_LVB_READY;
        }

        /* Call the update callback. */
        rc = (*upcall)(cookie, lockh, errcode);

        /* release the reference taken in ldlm_cli_enqueue() */
        if (errcode == ELDLM_LOCK_MATCHED)
                errcode = ELDLM_OK;
        if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
                ldlm_lock_decref(lockh, mode);

        RETURN(rc);
}
1914
/* Reply-interpret callback for an asynchronous lock enqueue.
 *
 * Completes the DLM side via ldlm_cli_enqueue_fini() and then the OSC side
 * via osc_enqueue_fini(), holding an extra lock reference across the
 * upcall so a blocking AST for a failed lock cannot race ahead of it.
 *
 * \param env  execution environment
 * \param req  the completed enqueue request
 * \param aa   enqueue arguments stashed by osc_enqueue_base()
 * \param rc   RPC-level result
 *
 * \retval result of osc_enqueue_fini() (i.e. the upcall's return value).
 */
static int osc_enqueue_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_enqueue_args *aa, int rc)
{
        struct ldlm_lock *lock;
        struct lustre_handle *lockh = &aa->oa_lockh;
        enum ldlm_mode mode = aa->oa_mode;
        struct ost_lvb *lvb = aa->oa_lvb;
        __u32 lvb_len = sizeof(*lvb);
        __u64 flags = 0;

        ENTRY;

        /* ldlm_cli_enqueue is holding a reference on the lock, so it must
         * be valid. */
        lock = ldlm_handle2lock(lockh);
        LASSERTF(lock != NULL,
                 "lockh "LPX64", req %p, aa %p - client evicted?\n",
                 lockh->cookie, req, aa);

        /* Take an additional reference so that a blocking AST that
         * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
         * to arrive after an upcall has been executed by
         * osc_enqueue_fini(). */
        ldlm_lock_addref(lockh, mode);

        /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
        OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);

        /* Let CP AST to grant the lock first. */
        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);

        if (aa->oa_agl) {
                /* AGL enqueues pass no lvb/flags of their own; borrow a
                 * local flags word for the fini calls below. */
                LASSERT(aa->oa_lvb == NULL);
                LASSERT(aa->oa_flags == NULL);
                aa->oa_flags = &flags;
        }

        /* Complete obtaining the lock procedure. */
        rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
                                   aa->oa_mode, aa->oa_flags, lvb, lvb_len,
                                   lockh, rc);
        /* Complete osc stuff. */
        rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
                              aa->oa_flags, aa->oa_agl, rc);

        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);

        /* Drop the reference taken above and the one from ldlm_handle2lock */
        ldlm_lock_decref(lockh, mode);
        LDLM_LOCK_PUT(lock);
        RETURN(rc);
}
1967
/* Sentinel value: callers pass PTLRPCD_SET as the rqset argument of
 * osc_enqueue_base() to mean "send via a ptlrpcd daemon" instead of
 * adding the request to a real ptlrpc_request_set.  Never dereferenced. */
struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
1969
/* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
 * other synchronous requests, however keeping some locks and trying to obtain
 * others may take a considerable amount of time in a case of ost failure; and
 * when other sync requests do not get released lock from a client, the client
 * is evicted from the cluster -- such scenarious make the life difficult, so
 * release locks just after they are obtained. */
/*
 * Enqueue an extent lock on an OST object, first trying to match an
 * already-cached lock (extended to page boundaries).  On a match the
 * upcall is invoked directly with ELDLM_LOCK_MATCHED; otherwise a new
 * enqueue is issued, either synchronously or (if \a async) completed via
 * osc_enqueue_interpret() on \a rqset / ptlrpcd.
 *
 * \param exp        export to the OST
 * \param res_id     resource (object) to lock
 * \param flags      in/out LDLM flags (LDLM_FL_HAS_INTENT selects the
 *                   intent path, LDLM_FL_TEST_LOCK probes only, ...)
 * \param policy     extent to lock; rounded out to page boundaries here
 * \param lvb        buffer for the returned LVB (size/times)
 * \param kms_valid  non-zero when cached locks may be trusted; zero for a
 *                   fresh or evicted object, where cached LVBs are stale
 * \param upcall     completion callback
 * \param cookie     opaque argument for \a upcall
 * \param einfo      lock type/mode and AST callbacks
 * \param rqset      request set for async mode, or PTLRPCD_SET
 * \param async      non-zero to complete via interpret callback
 * \param agl        non-zero for speculative (AGL) enqueues
 */
int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
                     __u64 *flags, union ldlm_policy_data *policy,
                     struct ost_lvb *lvb, int kms_valid,
                     osc_enqueue_upcall_f upcall, void *cookie,
                     struct ldlm_enqueue_info *einfo,
                     struct ptlrpc_request_set *rqset, int async, int agl)
{
        struct obd_device *obd = exp->exp_obd;
        struct lustre_handle lockh = { 0 };
        struct ptlrpc_request *req = NULL;
        int intent = *flags & LDLM_FL_HAS_INTENT;
        __u64 match_lvb = agl ? 0 : LDLM_FL_LVB_READY;
        enum ldlm_mode mode;
        int rc;
        ENTRY;

        /* Filesystem lock extents are extended to page boundaries so that
         * dealing with the page cache is a little smoother.  */
        policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
        policy->l_extent.end |= ~PAGE_MASK;

        /*
         * kms is not valid when either object is completely fresh (so that no
         * locks are cached), or object was evicted. In the latter case cached
         * lock cannot be used, because it would prime inode state with
         * potentially stale LVB.
         */
        if (!kms_valid)
                goto no_match;

        /* Next, search for already existing extent locks that will cover us */
        /* If we're trying to read, we also search for an existing PW lock.  The
         * VFS and page cache already protect us locally, so lots of readers/
         * writers can share a single PW lock.
         *
         * There are problems with conversion deadlocks, so instead of
         * converting a read lock to a write lock, we'll just enqueue a new
         * one.
         *
         * At some point we should cancel the read lock instead of making them
         * send us a blocking callback, but there are problems with canceling
         * locks out from other users right now, too. */
        mode = einfo->ei_mode;
        if (einfo->ei_mode == LCK_PR)
                mode |= LCK_PW;
        mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
                               einfo->ei_type, policy, mode, &lockh, 0);
        if (mode) {
                struct ldlm_lock *matched;

                if (*flags & LDLM_FL_TEST_LOCK)
                        RETURN(ELDLM_OK);

                matched = ldlm_handle2lock(&lockh);
                if (agl) {
                        /* AGL enqueues DLM locks speculatively. Therefore if
                         * it already exists a DLM lock, it wll just inform the
                         * caller to cancel the AGL process for this stripe. */
                        ldlm_lock_decref(&lockh, mode);
                        LDLM_LOCK_PUT(matched);
                        RETURN(-ECANCELED);
                } else if (osc_set_lock_data_with_check(matched, einfo)) {
                        *flags |= LDLM_FL_LVB_READY;

                        /* We already have a lock, and it's referenced. */
                        (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);

                        ldlm_lock_decref(&lockh, mode);
                        LDLM_LOCK_PUT(matched);
                        RETURN(ELDLM_OK);
                } else {
                        /* lock belongs to different ast data; fall through
                         * and enqueue a new one */
                        ldlm_lock_decref(&lockh, mode);
                        LDLM_LOCK_PUT(matched);
                }
        }

no_match:
        if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK))
                RETURN(-ENOLCK);

        if (intent) {
                /* intent enqueue carries the LVB back in the reply */
                req = ptlrpc_request_alloc(class_exp2cliimp(exp),
                                           &RQF_LDLM_ENQUEUE_LVB);
                if (req == NULL)
                        RETURN(-ENOMEM);

                rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
                if (rc) {
                        ptlrpc_request_free(req);
                        RETURN(rc);
                }

                req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
                                     sizeof *lvb);
                ptlrpc_request_set_replen(req);
        }

        /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
        *flags &= ~LDLM_FL_BLOCK_GRANTED;

        rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
                              sizeof(*lvb), LVB_T_OST, &lockh, async);
        if (async) {
                if (!rc) {
                        /* stash everything the interpret callback needs */
                        struct osc_enqueue_args *aa;
                        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
                        aa = ptlrpc_req_async_args(req);
                        aa->oa_exp    = exp;
                        aa->oa_mode   = einfo->ei_mode;
                        aa->oa_type   = einfo->ei_type;
                        lustre_handle_copy(&aa->oa_lockh, &lockh);
                        aa->oa_upcall = upcall;
                        aa->oa_cookie = cookie;
                        aa->oa_agl    = !!agl;
                        if (!agl) {
                                aa->oa_flags  = flags;
                                aa->oa_lvb    = lvb;
                        } else {
                                /* AGL is essentially to enqueue an DLM lock
                                 * in advance, so we don't care about the
                                 * result of AGL enqueue. */
                                aa->oa_lvb    = NULL;
                                aa->oa_flags  = NULL;
                        }

                        req->rq_interpret_reply =
                                (ptlrpc_interpterer_t)osc_enqueue_interpret;
                        if (rqset == PTLRPCD_SET)
                                ptlrpcd_add_req(req);
                        else
                                ptlrpc_set_add_req(rqset, req);
                } else if (intent) {
                        ptlrpc_req_finished(req);
                }
                RETURN(rc);
        }

        rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
                              flags, agl, rc);
        if (intent)
                ptlrpc_req_finished(req);

        RETURN(rc);
}
2121
/* Match an existing cached DLM extent lock covering \a policy without
 * enqueueing a new one.
 *
 * The requested extent is rounded out to page boundaries; PR requests also
 * accept PW locks (a writer lock protects readers too).  On a match with
 * \a data supplied, the lock must accept our ast data or the match is
 * dropped.  When a PW lock satisfied a PR request, the reference mode is
 * swapped to PR so the caller's eventual decref matches.
 *
 * \retval matched mode (non-zero) on success, 0 if no usable lock.
 */
int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
                   enum ldlm_type type, union ldlm_policy_data *policy,
                   enum ldlm_mode mode, __u64 *flags, void *data,
                   struct lustre_handle *lockh, int unref)
{
        struct obd_device *obd = exp->exp_obd;
        __u64 lflags = *flags;
        enum ldlm_mode rc;
        ENTRY;

        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
                RETURN(-EIO);

        /* Filesystem lock extents are extended to page boundaries so that
         * dealing with the page cache is a little smoother */
        policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
        policy->l_extent.end |= ~PAGE_MASK;

        /* Next, search for already existing extent locks that will cover us */
        /* If we're trying to read, we also search for an existing PW lock.  The
         * VFS and page cache already protect us locally, so lots of readers/
         * writers can share a single PW lock. */
        rc = mode;
        if (mode == LCK_PR)
                rc |= LCK_PW;
        rc = ldlm_lock_match(obd->obd_namespace, lflags,
                             res_id, type, policy, rc, lockh, unref);
        if (rc) {
                if (data != NULL) {
                        if (!osc_set_data_with_check(lockh, data)) {
                                /* lock is owned by other ast data; give up
                                 * the match (and its reference, unless this
                                 * was only a test) */
                                if (!(lflags & LDLM_FL_TEST_LOCK))
                                        ldlm_lock_decref(lockh, rc);
                                RETURN(0);
                        }
                }
                if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
                        /* a PW lock satisfied a PR request: convert the
                         * reference so the caller's decref(PR) balances */
                        ldlm_lock_addref(lockh, LCK_PR);
                        ldlm_lock_decref(lockh, LCK_PW);
                }
                RETURN(rc);
        }
        RETURN(rc);
}
2165
/* Reply-interpret callback for an async OST_STATFS request.
 *
 * Copies the server's obd_statfs into the caller's buffer on success,
 * masks -ENOTCONN/-EAGAIN when the caller asked for OBD_STATFS_NODELAY
 * (best-effort, e.g. procfs), and always reports completion through the
 * obd_info callback except for -EBADR (request never sent).
 *
 * \retval the oi_cb_up callback's return value, or -EBADR.
 */
static int osc_statfs_interpret(const struct lu_env *env,
                                struct ptlrpc_request *req,
                                struct osc_async_args *aa, int rc)
{
        struct obd_statfs *msfs;
        ENTRY;

        if (rc == -EBADR)
                /* The request has in fact never been sent
                 * due to issues at a higher level (LOV).
                 * Exit immediately since the caller is
                 * aware of the problem and takes care
                 * of the clean up */
                 RETURN(rc);

        if ((rc == -ENOTCONN || rc == -EAGAIN) &&
            (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
                GOTO(out, rc = 0);

        if (rc != 0)
                GOTO(out, rc);

        msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
        if (msfs == NULL) {
                /* reply did not contain a statfs buffer */
                GOTO(out, rc = -EPROTO);
        }

        *aa->aa_oi->oi_osfs = *msfs;
out:
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        RETURN(rc);
}
2198
/* Issue an asynchronous OST_STATFS request on \a rqset; completion is
 * delivered through osc_statfs_interpret() -> oinfo->oi_cb_up.
 *
 * \param exp      export to query
 * \param oinfo    carries the result buffer, flags and completion callback
 * \param max_age  freshness hint; currently unused on the wire (see below)
 * \param rqset    request set the RPC is added to (caller drives it)
 *
 * \retval 0 on successful submission, negative errno otherwise.
 */
static int osc_statfs_async(struct obd_export *exp,
                            struct obd_info *oinfo, __u64 max_age,
                            struct ptlrpc_request_set *rqset)
{
        struct obd_device     *obd = class_exp2obd(exp);
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        int                    rc;
        ENTRY;

        /* We could possibly pass max_age in the request (as an absolute
         * timestamp or a "seconds.usec ago") so the target can avoid doing
         * extra calls into the filesystem if that isn't necessary (e.g.
         * during mount that would help a bit).  Having relative timestamps
         * is not so great if request processing is slow, while absolute
         * timestamps are not ideal because they need time synchronization. */
        req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        ptlrpc_request_set_replen(req);
        req->rq_request_portal = OST_CREATE_PORTAL;
        ptlrpc_at_set_req_timeout(req);

        if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
                /* procfs requests not want stat in wait for avoid deadlock */
                req->rq_no_resend = 1;
                req->rq_no_delay = 1;
        }

        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
        CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oi = oinfo;

        ptlrpc_set_add_req(rqset, req);
        RETURN(0);
}
2242
/**
 * Synchronously fetch filesystem statistics from the OST.
 *
 * \param[in]  env     execution environment (unused here)
 * \param[in]  exp     export to the target OST
 * \param[out] osfs    filled with the server's obd_statfs reply on success
 * \param[in]  max_age currently unused; see the comment below
 * \param[in]  flags   OBD_STATFS_* flags; OBD_STATFS_NODELAY makes the
 *                     request fail fast instead of waiting for recovery
 *
 * \retval 0 on success, negative errno on failure
 */
static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
                      struct obd_statfs *osfs, __u64 max_age, __u32 flags)
{
        struct obd_device     *obd = class_exp2obd(exp);
        struct obd_statfs     *msfs;
        struct ptlrpc_request *req;
        struct obd_import     *imp = NULL;
        int rc;
        ENTRY;

        /*Since the request might also come from lprocfs, so we need
         *sync this with client_disconnect_export Bug15684
         *(take a reference on the import under cl_sem so the import
         *cannot be destroyed underneath us)*/
        down_read(&obd->u.cli.cl_sem);
        if (obd->u.cli.cl_import)
                imp = class_import_get(obd->u.cli.cl_import);
        up_read(&obd->u.cli.cl_sem);
        if (!imp)
                RETURN(-ENODEV);

        /* We could possibly pass max_age in the request (as an absolute
         * timestamp or a "seconds.usec ago") so the target can avoid doing
         * extra calls into the filesystem if that isn't necessary (e.g.
         * during mount that would help a bit).  Having relative timestamps
         * is not so great if request processing is slow, while absolute
         * timestamps are not ideal because they need time synchronization. */
        req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);

        /* the import reference was only needed for the allocation above */
        class_import_put(imp);

        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        ptlrpc_request_set_replen(req);
        req->rq_request_portal = OST_CREATE_PORTAL;
        ptlrpc_at_set_req_timeout(req);

        if (flags & OBD_STATFS_NODELAY) {
                /* procfs requests must not wait for recovery (resend/delay)
                 * to avoid deadlocking against the import state machine */
                req->rq_no_resend = 1;
                req->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
        if (msfs == NULL) {
                GOTO(out, rc = -EPROTO);
        }

        *osfs = *msfs;

        EXIT;
 out:
        ptlrpc_req_finished(req);
        return rc;
}
2306
/**
 * Handle OSC-specific ioctls (client recovery, import activation, ping).
 *
 * \param[in] cmd   ioctl command number
 * \param[in] exp   export the ioctl was issued against
 * \param[in] len   length of \a karg (unused here)
 * \param[in] karg  kernel-space argument, a struct obd_ioctl_data
 * \param[in] uarg  original user-space pointer (unused here)
 *
 * \retval 0 on success, negative errno on failure (-ENOTTY for
 *         unrecognized commands)
 */
static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                         void *karg, void __user *uarg)
{
        struct obd_device *obd = exp->exp_obd;
        struct obd_ioctl_data *data = karg;
        int err = 0;
        ENTRY;

        /* pin the module so it cannot be unloaded while the ioctl runs */
        if (!try_module_get(THIS_MODULE)) {
                CERROR("%s: cannot get module '%s'\n", obd->obd_name,
                       module_name(THIS_MODULE));
                return -EINVAL;
        }
        switch (cmd) {
        case OBD_IOC_CLIENT_RECOVER:
                err = ptlrpc_recover_import(obd->u.cli.cl_import,
                                            data->ioc_inlbuf1, 0);
                /* a positive return only reports progress, not an error */
                if (err > 0)
                        err = 0;
                GOTO(out, err);
        case IOC_OSC_SET_ACTIVE:
                err = ptlrpc_set_import_active(obd->u.cli.cl_import,
                                               data->ioc_offset);
                GOTO(out, err);
        case OBD_IOC_PING_TARGET:
                err = ptlrpc_obd_ping(obd);
                GOTO(out, err);
        default:
                CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
                       cmd, current_comm());
                GOTO(out, err = -ENOTTY);
        }
out:
        module_put(THIS_MODULE);
        return err;
}
2343
/**
 * Handle obd_set_info_async() for the OSC layer.
 *
 * Several keys are handled purely on the client (KEY_CHECKSUM,
 * KEY_SPTLRPC_CONF, KEY_FLUSH_CTX, KEY_CACHE_SET, KEY_CACHE_LRU_SHRINK);
 * everything else is forwarded to the OST as an OST_SET_INFO RPC.
 * KEY_GRANT_SHRINK is special: it is sent via the ptlrpcd daemon with a
 * grant-shrink interpret callback instead of the caller's request set.
 *
 * \param[in] env     execution environment
 * \param[in] exp     export to the target OST
 * \param[in] keylen  length of \a key
 * \param[in] key     KEY_* identifier
 * \param[in] vallen  length of \a val
 * \param[in] val     value to set (key-specific layout)
 * \param[in] set     request set for forwarded RPCs; may be NULL only
 *                    for KEY_GRANT_SHRINK
 *
 * \retval 0 on success, negative errno on failure
 */
static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
                              u32 keylen, void *key,
                              u32 vallen, void *val,
                              struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *req;
        struct obd_device     *obd = exp->exp_obd;
        struct obd_import     *imp = class_exp2cliimp(exp);
        char                  *tmp;
        int                    rc;
        ENTRY;

        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);

        /* toggle data checksumming for this client */
        if (KEY_IS(KEY_CHECKSUM)) {
                if (vallen != sizeof(int))
                        RETURN(-EINVAL);
                exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
                RETURN(0);
        }

        if (KEY_IS(KEY_SPTLRPC_CONF)) {
                sptlrpc_conf_client_adapt(obd);
                RETURN(0);
        }

        if (KEY_IS(KEY_FLUSH_CTX)) {
                sptlrpc_import_flush_my_ctx(imp);
                RETURN(0);
        }

        /* attach this OSC to a shared client page cache (LRU) */
        if (KEY_IS(KEY_CACHE_SET)) {
                struct client_obd *cli = &obd->u.cli;

                LASSERT(cli->cl_cache == NULL); /* only once */
                cli->cl_cache = (struct cl_client_cache *)val;
                cl_cache_incref(cli->cl_cache);
                cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;

                /* add this osc into entity list */
                LASSERT(list_empty(&cli->cl_lru_osc));
                spin_lock(&cli->cl_cache->ccc_lru_lock);
                list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
                spin_unlock(&cli->cl_cache->ccc_lru_lock);

                RETURN(0);
        }

        /* shrink this OSC's share of the LRU; *val is updated in place
         * with the number of pages still to be reclaimed by the caller */
        if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
                struct client_obd *cli = &obd->u.cli;
                long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
                long target = *(long *)val;

                nr = osc_lru_shrink(env, cli, min(nr, target), true);
                *(long *)val -= nr;
                RETURN(0);
        }

        if (!set && !KEY_IS(KEY_GRANT_SHRINK))
                RETURN(-EINVAL);

        /* We pass all other commands directly to OST. Since nobody calls osc
           methods directly and everybody is supposed to go through LOV, we
           assume lov checked invalid values for us.
           The only recognised values so far are evict_by_nid and mds_conn.
           Even if something bad goes through, we'd get a -EINVAL from OST
           anyway. */

        req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
                                                &RQF_OST_SET_GRANT_INFO :
                                                &RQF_OBD_SET_INFO);
        if (req == NULL)
                RETURN(-ENOMEM);

        req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
                             RCL_CLIENT, keylen);
        /* grant shrink carries an ost_body, whose size is fixed by the
         * request format, so only the generic format needs an explicit
         * value-buffer size */
        if (!KEY_IS(KEY_GRANT_SHRINK))
                req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
                                     RCL_CLIENT, vallen);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
        memcpy(tmp, key, keylen);
        tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
                                                        &RMF_OST_BODY :
                                                        &RMF_SETINFO_VAL);
        memcpy(tmp, val, vallen);

        if (KEY_IS(KEY_GRANT_SHRINK)) {
                struct osc_grant_args *aa;
                struct obdo *oa;

                CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
                aa = ptlrpc_req_async_args(req);
                /* keep a private copy of the obdo; the interpret callback
                 * runs after *val may have gone away */
                OBDO_ALLOC(oa);
                if (!oa) {
                        ptlrpc_req_finished(req);
                        RETURN(-ENOMEM);
                }
                *oa = ((struct ost_body *)val)->oa;
                aa->aa_oa = oa;
                req->rq_interpret_reply = osc_shrink_grant_interpret;
        }

        ptlrpc_request_set_replen(req);
        if (!KEY_IS(KEY_GRANT_SHRINK)) {
                LASSERT(set != NULL);
                ptlrpc_set_add_req(set, req);
                ptlrpc_check_set(NULL, set);
        } else {
                ptlrpcd_add_req(req);
        }

        RETURN(0);
}
2463
2464 static int osc_reconnect(const struct lu_env *env,
2465                          struct obd_export *exp, struct obd_device *obd,
2466                          struct obd_uuid *cluuid,
2467                          struct obd_connect_data *data,
2468                          void *localdata)
2469 {
2470         struct client_obd *cli = &obd->u.cli;
2471
2472         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
2473                 long lost_grant;
2474                 long grant;
2475
2476                 spin_lock(&cli->cl_loi_list_lock);
2477                 grant = cli->cl_avail_grant + cli->cl_reserved_grant;
2478                 if (data->ocd_connect_flags & OBD_CONNECT_GRANT_PARAM)
2479                         grant += cli->cl_dirty_grant;
2480                 else
2481                         grant += cli->cl_dirty_pages << PAGE_CACHE_SHIFT;
2482                 data->ocd_grant = grant ? : 2 * cli_brw_size(obd);
2483                 lost_grant = cli->cl_lost_grant;
2484                 cli->cl_lost_grant = 0;
2485                 spin_unlock(&cli->cl_loi_list_lock);
2486
2487                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
2488                        " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
2489                        data->ocd_version, data->ocd_grant, lost_grant);
2490         }
2491
2492         RETURN(0);
2493 }
2494
/**
 * Disconnect the OSC from its OST and, once the import is gone, drop
 * the client from the grant-shrink list.
 *
 * \retval 0 on success, negative errno from client_disconnect_export()
 */
static int osc_disconnect(struct obd_export *exp)
{
        struct obd_device *obd = class_exp2obd(exp);
        int rc;

        rc = client_disconnect_export(exp);
        /**
         * Initially we put del_shrink_grant before disconnect_export, but it
         * causes the following problem if setup (connect) and cleanup
         * (disconnect) are tangled together.
         *      connect p1                     disconnect p2
         *   ptlrpc_connect_import
         *     ...............               class_manual_cleanup
         *                                     osc_disconnect
         *                                     del_shrink_grant
         *   ptlrpc_connect_interrupt
         *     init_grant_shrink
         *   add this client to shrink list
         *                                      cleanup_osc
         * Bang! pinger trigger the shrink.
         * So the osc should be disconnected from the shrink list, after we
         * are sure the import has been destroyed. BUG18662
         */
        if (obd->u.cli.cl_import == NULL)
                osc_del_shrink_grant(&obd->u.cli);
        return rc;
}
2522
/**
 * cfs_hash iterator callback: invalidate the osc_object attached to one
 * LDLM resource's granted locks and reset the locks' CLEANED flag so a
 * later namespace-cleanup pass can cancel them.
 *
 * \param[in] hs     the namespace resource hash being iterated
 * \param[in] bd     hash bucket descriptor (unused here)
 * \param[in] hnode  hash node for the current ldlm_resource
 * \param[in] arg    the struct lu_env passed to the iteration
 *
 * \retval 0 always (continue iteration)
 */
static int osc_ldlm_resource_invalidate(struct cfs_hash *hs,
        struct cfs_hash_bd *bd, struct hlist_node *hnode, void *arg)
{
        struct lu_env *env = arg;
        struct ldlm_resource *res = cfs_hash_object(hs, hnode);
        struct ldlm_lock *lock;
        struct osc_object *osc = NULL;
        ENTRY;

        lock_res(res);
        list_for_each_entry(lock, &res->lr_granted, l_res_link) {
                /* all granted locks on a resource share the same object;
                 * grab a reference to the first one found */
                if (lock->l_ast_data != NULL && osc == NULL) {
                        osc = lock->l_ast_data;
                        cl_object_get(osc2cl(osc));
                }

                /* clear LDLM_FL_CLEANED flag to make sure it will be canceled
                 * by the 2nd round of ldlm_namespace_clean() call in
                 * osc_import_event(). */
                ldlm_clear_cleaned(lock);
        }
        unlock_res(res);

        /* invalidate outside the resource lock to avoid deadlock */
        if (osc != NULL) {
                osc_object_invalidate(env, osc);
                cl_object_put(env, osc2cl(osc));
        }

        RETURN(0);
}
2553
/**
 * React to import state-machine events (disconnect, invalidate,
 * activate, connect-data updates, ...) for this OSC device.
 *
 * \param[in] obd    the OSC obd_device
 * \param[in] imp    the import the event occurred on
 * \param[in] event  which IMP_EVENT_* happened
 *
 * \retval 0 on success, negative errno on failure
 */
static int osc_import_event(struct obd_device *obd,
                            struct obd_import *imp,
                            enum obd_import_event event)
{
        struct client_obd *cli;
        int rc = 0;

        ENTRY;
        LASSERT(imp->imp_obd == obd);

        switch (event) {
        case IMP_EVENT_DISCON: {
                /* the server forgets our grant on disconnect; drop our
                 * local accounting so it is renegotiated on reconnect */
                cli = &obd->u.cli;
                spin_lock(&cli->cl_loi_list_lock);
                cli->cl_avail_grant = 0;
                cli->cl_lost_grant = 0;
                spin_unlock(&cli->cl_loi_list_lock);
                break;
        }
        case IMP_EVENT_INACTIVE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
                break;
        }
        case IMP_EVENT_INVALIDATE: {
                /* flush cached pages and cancel all local locks; the
                 * second namespace cleanup catches locks revived by the
                 * resource iteration (see osc_ldlm_resource_invalidate) */
                struct ldlm_namespace *ns = obd->obd_namespace;
                struct lu_env         *env;
                __u16                  refcheck;

                ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);

                env = cl_env_get(&refcheck);
                if (!IS_ERR(env)) {
                        osc_io_unplug(env, &obd->u.cli, NULL);

                        cfs_hash_for_each_nolock(ns->ns_rs_hash,
                                                 osc_ldlm_resource_invalidate,
                                                 env, 0);
                        cl_env_put(env, &refcheck);

                        ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
                } else
                        rc = PTR_ERR(env);
                break;
        }
        case IMP_EVENT_ACTIVE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
                break;
        }
        case IMP_EVENT_OCD: {
                /* connect data (re)negotiated; refresh grant state */
                struct obd_connect_data *ocd = &imp->imp_connect_data;

                if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
                        osc_init_grant(&obd->u.cli, ocd);

                /* See bug 7198 */
                if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
                        imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;

                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
                break;
        }
        case IMP_EVENT_DEACTIVATE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
                break;
        }
        case IMP_EVENT_ACTIVATE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
                break;
        }
        default:
                CERROR("Unknown import event %d\n", event);
                LBUG();
        }
        RETURN(rc);
}
2629
2630 /**
2631  * Determine whether the lock can be canceled before replaying the lock
2632  * during recovery, see bug16774 for detailed information.
2633  *
2634  * \retval zero the lock can't be canceled
2635  * \retval other ok to cancel
2636  */
2637 static int osc_cancel_weight(struct ldlm_lock *lock)
2638 {
2639         /*
2640          * Cancel all unused and granted extent lock.
2641          */
2642         if (lock->l_resource->lr_type == LDLM_EXTENT &&
2643             lock->l_granted_mode == lock->l_req_mode &&
2644             osc_ldlm_weigh_ast(lock) == 0)
2645                 RETURN(1);
2646
2647         RETURN(0);
2648 }
2649
2650 static int brw_queue_work(const struct lu_env *env, void *data)
2651 {
2652         struct client_obd *cli = data;
2653
2654         CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
2655
2656         osc_io_unplug(env, cli, NULL);
2657         RETURN(0);
2658 }
2659
/**
 * Set up an OSC obd_device: client import, ptlrpcd work items, quota,
 * procfs entries, the shared request pool and the shrink lists.
 *
 * Resources are released in reverse order via goto labels on failure.
 * Note that a procfs registration failure is deliberately non-fatal.
 *
 * \param[in] obd   the obd_device being set up
 * \param[in] lcfg  configuration record
 *
 * \retval 0 on success, negative errno on failure
 */
int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
{
        struct client_obd *cli = &obd->u.cli;
        struct obd_type   *type;
        void              *handler;
        int                rc;
        int                adding;
        int                added;
        int                req_count;
        ENTRY;

        rc = ptlrpcd_addref();
        if (rc)
                RETURN(rc);

        rc = client_obd_setup(obd, lcfg);
        if (rc)
                GOTO(out_ptlrpcd, rc);

        handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
        if (IS_ERR(handler))
                GOTO(out_client_setup, rc = PTR_ERR(handler));
        cli->cl_writeback_work = handler;

        handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
        if (IS_ERR(handler))
                GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
        cli->cl_lru_work = handler;

        rc = osc_quota_setup(obd);
        if (rc)
                GOTO(out_ptlrpcd_work, rc);

        cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;

#ifdef CONFIG_PROC_FS
        obd->obd_vars = lprocfs_osc_obd_vars;
#endif
        /* If this is true then both client (osc) and server (osp) are on the
         * same node. The osp layer if loaded first will register the osc proc
         * directory. In that case this obd_device will be attached its proc
         * tree to type->typ_procsym instead of obd->obd_type->typ_procroot. */
        type = class_search_type(LUSTRE_OSP_NAME);
        if (type && type->typ_procsym) {
                obd->obd_proc_entry = lprocfs_register(obd->obd_name,
                                                       type->typ_procsym,
                                                       obd->obd_vars, obd);
                if (IS_ERR(obd->obd_proc_entry)) {
                        rc = PTR_ERR(obd->obd_proc_entry);
                        CERROR("error %d setting up lprocfs for %s\n", rc,
                               obd->obd_name);
                        obd->obd_proc_entry = NULL;
                }
        } else {
                rc = lprocfs_obd_setup(obd);
        }

        /* If the basic OSC proc tree construction succeeded then
         * lets do the rest. */
        if (rc == 0) {
                lproc_osc_attach_seqstat(obd);
                sptlrpc_lprocfs_cliobd_attach(obd);
                ptlrpc_lprocfs_register_obd(obd);
        }

        /*
         * We try to control the total number of requests with a upper limit
         * osc_reqpool_maxreqcount. There might be some race which will cause
         * over-limit allocation, but it is fine.
         */
        req_count = atomic_read(&osc_pool_req_count);
        if (req_count < osc_reqpool_maxreqcount) {
                adding = cli->cl_max_rpcs_in_flight + 2;
                if (req_count + adding > osc_reqpool_maxreqcount)
                        adding = osc_reqpool_maxreqcount - req_count;

                added = ptlrpc_add_rqs_to_pool(osc_rq_pool, adding);
                atomic_add(added, &osc_pool_req_count);
        }

        INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
        ns_register_cancel(obd->obd_namespace, osc_cancel_weight);

        /* make this client visible to the cache shrinker */
        spin_lock(&osc_shrink_lock);
        list_add_tail(&cli->cl_shrink_list, &osc_shrink_list);
        spin_unlock(&osc_shrink_lock);

        RETURN(0);

out_ptlrpcd_work:
        if (cli->cl_writeback_work != NULL) {
                ptlrpcd_destroy_work(cli->cl_writeback_work);
                cli->cl_writeback_work = NULL;
        }
        if (cli->cl_lru_work != NULL) {
                ptlrpcd_destroy_work(cli->cl_lru_work);
                cli->cl_lru_work = NULL;
        }
out_client_setup:
        client_obd_cleanup(obd);
out_ptlrpcd:
        ptlrpcd_decref();
        RETURN(rc);
}
2764
/**
 * First stage of OSC teardown: destroy ptlrpcd work items, clean up
 * the client import and unregister procfs entries.
 *
 * \retval 0 always
 */
static int osc_precleanup(struct obd_device *obd)
{
        struct client_obd *cli = &obd->u.cli;
        ENTRY;

        /* LU-464
         * for echo client, export may be on zombie list, wait for
         * zombie thread to cull it, because cli.cl_import will be
         * cleared in client_disconnect_export():
         *   class_export_destroy() -> obd_cleanup() ->
         *   echo_device_free() -> echo_client_cleanup() ->
         *   obd_disconnect() -> osc_disconnect() ->
         *   client_disconnect_export()
         */
        obd_zombie_barrier();
        if (cli->cl_writeback_work) {
                ptlrpcd_destroy_work(cli->cl_writeback_work);
                cli->cl_writeback_work = NULL;
        }

        if (cli->cl_lru_work) {
                ptlrpcd_destroy_work(cli->cl_lru_work);
                cli->cl_lru_work = NULL;
        }

        obd_cleanup_client_import(obd);
        ptlrpc_lprocfs_unregister_obd(obd);
        lprocfs_obd_cleanup(obd);
        RETURN(0);
}
2795
/**
 * Final stage of OSC teardown: detach from the shrink list and shared
 * page cache, release quota state and the client obd.
 *
 * \retval 0 on success, negative errno from client_obd_cleanup()
 */
int osc_cleanup(struct obd_device *obd)
{
        struct client_obd *cli = &obd->u.cli;
        int rc;

        ENTRY;

        /* stop the cache shrinker from seeing this client */
        spin_lock(&osc_shrink_lock);
        list_del(&cli->cl_shrink_list);
        spin_unlock(&osc_shrink_lock);

        /* lru cleanup */
        if (cli->cl_cache != NULL) {
                LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
                spin_lock(&cli->cl_cache->ccc_lru_lock);
                list_del_init(&cli->cl_lru_osc);
                spin_unlock(&cli->cl_cache->ccc_lru_lock);
                cli->cl_lru_left = NULL;
                cl_cache_decref(cli->cl_cache);
                cli->cl_cache = NULL;
        }

        /* free memory of osc quota cache */
        osc_quota_cleanup(obd);

        rc = client_obd_cleanup(obd);

        ptlrpcd_decref();
        RETURN(rc);
}
2826
2827 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
2828 {
2829         int rc = class_process_proc_param(PARAM_OSC, obd->obd_vars, lcfg, obd);
2830         return rc > 0 ? 0: rc;
2831 }
2832
2833 static int osc_process_config(struct obd_device *obd, size_t len, void *buf)
2834 {
2835         return osc_process_config_base(obd, buf);
2836 }
2837
/* obd method table for the OSC device type; connection management is
 * delegated to the generic client_* helpers. */
static struct obd_ops osc_obd_ops = {
        .o_owner                = THIS_MODULE,
        .o_setup                = osc_setup,
        .o_precleanup           = osc_precleanup,
        .o_cleanup              = osc_cleanup,
        .o_add_conn             = client_import_add_conn,
        .o_del_conn             = client_import_del_conn,
        .o_connect              = client_connect_import,
        .o_reconnect            = osc_reconnect,
        .o_disconnect           = osc_disconnect,
        .o_statfs               = osc_statfs,
        .o_statfs_async         = osc_statfs_async,
        .o_create               = osc_create,
        .o_destroy              = osc_destroy,
        .o_getattr              = osc_getattr,
        .o_setattr              = osc_setattr,
        .o_iocontrol            = osc_iocontrol,
        .o_set_info_async       = osc_set_info_async,
        .o_import_event         = osc_import_event,
        .o_process_config       = osc_process_config,
        .o_quotactl             = osc_quotactl,
};
2860
/* memory shrinker registered in osc_init() for the OSC page caches */
static struct shrinker *osc_cache_shrinker;
/* all client_obds eligible for shrinking, linked via cl_shrink_list and
 * protected by osc_shrink_lock */
struct list_head osc_shrink_list = LIST_HEAD_INIT(osc_shrink_list);
DEFINE_SPINLOCK(osc_shrink_lock);
2864
#ifndef HAVE_SHRINKER_COUNT
/* Compatibility shim for kernels with the legacy single-callback
 * shrinker API: perform one scan pass, then report the remaining
 * shrinkable count. */
static int osc_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask))
{
        struct shrink_control scv = {
                .nr_to_scan = shrink_param(sc, nr_to_scan),
                .gfp_mask   = shrink_param(sc, gfp_mask)
        };
#if !defined(HAVE_SHRINKER_WANT_SHRINK_PTR) && !defined(HAVE_SHRINK_CONTROL)
        struct shrinker *shrinker = NULL;
#endif

        (void)osc_cache_shrink_scan(shrinker, &scv);

        return osc_cache_shrink_count(shrinker, &scv);
}
#endif
2881
2882 static int __init osc_init(void)
2883 {
2884         bool enable_proc = true;
2885         struct obd_type *type;
2886         unsigned int reqpool_size;
2887         unsigned int reqsize;
2888         int rc;
2889         DEF_SHRINKER_VAR(osc_shvar, osc_cache_shrink,
2890                          osc_cache_shrink_count, osc_cache_shrink_scan);
2891         ENTRY;
2892
2893         /* print an address of _any_ initialized kernel symbol from this
2894          * module, to allow debugging with gdb that doesn't support data
2895          * symbols from modules.*/
2896         CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
2897
2898         rc = lu_kmem_init(osc_caches);
2899         if (rc)
2900                 RETURN(rc);
2901
2902         type = class_search_type(LUSTRE_OSP_NAME);
2903         if (type != NULL && type->typ_procsym != NULL)
2904                 enable_proc = false;
2905
2906         rc = class_register_type(&osc_obd_ops, NULL, enable_proc, NULL,
2907                                  LUSTRE_OSC_NAME, &osc_device_type);
2908         if (rc)
2909                 GOTO(out_kmem, rc);
2910
2911         osc_cache_shrinker = set_shrinker(DEFAULT_SEEKS, &osc_shvar);
2912
2913         /* This is obviously too much memory, only prevent overflow here */
2914         if (osc_reqpool_mem_max >= 1 << 12 || osc_reqpool_mem_max == 0)
2915                 GOTO(out_type, rc = -EINVAL);
2916
2917         reqpool_size = osc_reqpool_mem_max << 20;
2918
2919         reqsize = 1;
2920         while (reqsize < OST_IO_MAXREQSIZE)
2921                 reqsize = reqsize << 1;
2922
2923         /*
2924          * We don't enlarge the request count in OSC pool according to
2925          * cl_max_rpcs_in_flight. The allocation from the pool will only be
2926          * tried after normal allocation failed. So a small OSC pool won't
2927          * cause much performance degression in most of cases.
2928          */
2929         osc_reqpool_maxreqcount = reqpool_size / reqsize;
2930
2931         atomic_set(&osc_pool_req_count, 0);
2932         osc_rq_pool = ptlrpc_init_rq_pool(0, OST_IO_MAXREQSIZE,
2933                                           ptlrpc_add_rqs_to_pool);
2934
2935         if (osc_rq_pool != NULL)
2936                 GOTO(out, rc);
2937         rc = -ENOMEM;
2938 out_type:
2939         class_unregister_type(LUSTRE_OSC_NAME);
2940 out_kmem:
2941         lu_kmem_fini(osc_caches);
2942 out:
2943         RETURN(rc);
2944 }
2945
/* Module unload: tear everything down in the reverse order of osc_init(). */
static void __exit osc_exit(void)
{
        remove_shrinker(osc_cache_shrinker);
        class_unregister_type(LUSTRE_OSC_NAME);
        lu_kmem_fini(osc_caches);
        ptlrpc_free_rq_pool(osc_rq_pool);
}
2953
/* standard kernel module metadata and entry points */
MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
MODULE_VERSION(LUSTRE_VERSION_STRING);
MODULE_LICENSE("GPL");

module_init(osc_init);
module_exit(osc_exit);