Whamcloud - gitweb
LU-5683 clio: add CIT_DATA_VERSION
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2014, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_OSC
38
39 #include <libcfs/libcfs.h>
40
41 #include <lustre_dlm.h>
42 #include <lustre_net.h>
43 #include <lustre/lustre_user.h>
44 #include <obd_cksum.h>
45 #include <lustre_ha.h>
46 #include <lprocfs_status.h>
47 #include <lustre_ioctl.h>
48 #include <lustre_debug.h>
49 #include <lustre_param.h>
50 #include <lustre_fid.h>
51 #include <obd_class.h>
52 #include "osc_internal.h"
53 #include "osc_cl_internal.h"
54
/* Per-RPC state carried across an asynchronous bulk BRW (read/write)
 * request; stored in rq_async_args and consumed by brw_interpret(). */
struct osc_brw_async_args {
        struct obdo              *aa_oa;        /* attributes sent / returned */
        int                       aa_requested_nob; /* total bytes requested */
        int                       aa_nio_count; /* number of niobuf entries */
        u32                       aa_page_count; /* pages in aa_ppga */
        int                       aa_resends;   /* resend attempts so far */
        struct brw_page **aa_ppga;              /* array of page descriptors */
        struct client_obd        *aa_cli;       /* owning client */
        struct list_head          aa_oaps;      /* async pages of this RPC */
        struct list_head          aa_exts;      /* extents covered by this RPC */
        struct obd_capa  *aa_ocapa;             /* capability, may be NULL */
        struct cl_req            *aa_clerq;     /* cl-layer request */
};

/* Grant-shrink RPCs reuse the BRW async-args layout; only aa_oa is read
 * (see osc_shrink_grant_interpret()). */
#define osc_grant_args osc_brw_async_args
70
/* Callback state for asynchronous setattr/punch RPCs; unpacked by
 * osc_setattr_interpret(). */
struct osc_setattr_args {
        struct obdo             *sa_oa;         /* obdo refreshed from reply */
        obd_enqueue_update_f     sa_upcall;     /* completion upcall */
        void                    *sa_cookie;     /* opaque cookie for sa_upcall */
};
76
/* Callback state for asynchronous OST_SYNC RPCs; unpacked by
 * osc_sync_interpret(). */
struct osc_fsync_args {
        struct obd_info *fa_oi;                 /* oi_oa refreshed from reply */
        obd_enqueue_update_f     fa_upcall;     /* completion upcall */
        void                    *fa_cookie;     /* opaque cookie for fa_upcall */
};
82
/* State preserved across an asynchronous LDLM lock enqueue. */
struct osc_enqueue_args {
        struct obd_export       *oa_exp;        /* export the enqueue went to */
        ldlm_type_t             oa_type;        /* LDLM lock type */
        ldlm_mode_t             oa_mode;        /* requested lock mode */
        __u64                   *oa_flags;      /* enqueue flags, may be NULL */
        osc_enqueue_upcall_f    oa_upcall;      /* completion upcall */
        void                    *oa_cookie;     /* opaque cookie for oa_upcall */
        struct ost_lvb          *oa_lvb;        /* lock value block from server */
        struct lustre_handle    oa_lockh;       /* handle of the lock */
        unsigned int            oa_agl:1;       /* AGL (async glimpse lock) request */
};
94
/* Forward declarations: defined later in this file. */
static void osc_release_ppga(struct brw_page **ppga, size_t count);
static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
                         void *data, int rc);
98
99 static inline void osc_pack_capa(struct ptlrpc_request *req,
100                                  struct ost_body *body, void *capa)
101 {
102         struct obd_capa *oc = (struct obd_capa *)capa;
103         struct lustre_capa *c;
104
105         if (!capa)
106                 return;
107
108         c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
109         LASSERT(c);
110         capa_cpy(c, oc);
111         body->oa.o_valid |= OBD_MD_FLOSSCAPA;
112         DEBUG_CAPA(D_SEC, c, "pack");
113 }
114
/* Fill the client-side OST_BODY of @req from @oinfo: convert the in-memory
 * obdo to wire format (honouring the peer's connect data) and append the
 * capability, if any. */
void osc_pack_req_body(struct ptlrpc_request *req, struct obd_info *oinfo)
{
        struct ost_body *body;

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
                             oinfo->oi_oa);
        osc_pack_capa(req, body, oinfo->oi_capa);
}
126
127 void osc_set_capa_size(struct ptlrpc_request *req,
128                        const struct req_msg_field *field,
129                        struct obd_capa *oc)
130 {
131         if (oc == NULL)
132                 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
133         else
134                 /* it is already calculated as sizeof struct obd_capa */
135                 ;
136 }
137
/* Synchronously fetch the attributes of an OST object.
 *
 * Builds an OST_GETATTR RPC from @oinfo, waits for the reply and copies
 * the returned attributes back into oinfo->oi_oa.  Returns 0 on success
 * or a negative errno. */
static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
                       struct obd_info *oinfo)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        /* shrink the capa field to zero if we are not sending one */
        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
                             &body->oa);

        /* report the client's preferred BRW size as the blocksize */
        oinfo->oi_oa->o_blksize = cli_brw_size(exp->exp_obd);
        oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;

        EXIT;
 out:
        ptlrpc_req_finished(req);
        return rc;
}
181
182 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
183                        struct obd_info *oinfo)
184 {
185         struct ptlrpc_request *req;
186         struct ost_body       *body;
187         int                    rc;
188         ENTRY;
189
190         LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);
191
192         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
193         if (req == NULL)
194                 RETURN(-ENOMEM);
195
196         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
197         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
198         if (rc) {
199                 ptlrpc_request_free(req);
200                 RETURN(rc);
201         }
202
203         osc_pack_req_body(req, oinfo);
204
205         ptlrpc_request_set_replen(req);
206
207         rc = ptlrpc_queue_wait(req);
208         if (rc)
209                 GOTO(out, rc);
210
211         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
212         if (body == NULL)
213                 GOTO(out, rc = -EPROTO);
214
215         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
216                              &body->oa);
217
218         EXIT;
219 out:
220         ptlrpc_req_finished(req);
221         RETURN(rc);
222 }
223
/* Reply callback shared by asynchronous setattr and punch: on success,
 * unpack the returned attributes into sa->sa_oa, then invoke the caller's
 * upcall with the final result (the upcall runs on error paths too, so the
 * waiter always learns the outcome). */
static int osc_setattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_setattr_args *sa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
                             &body->oa);
out:
        rc = sa->sa_upcall(sa->sa_cookie, rc);
        RETURN(rc);
}
244
/* Asynchronously set attributes on an OST object.
 *
 * With @rqset == NULL the request is fired through ptlrpcd without waiting
 * and no upcall is made; otherwise osc_setattr_interpret() will run
 * @upcall(@cookie, rc) on completion and the request is either handed to
 * ptlrpcd (@rqset == PTLRPCD_SET) or added to @rqset.  Returns 0, or a
 * negative errno from request setup. */
int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
                      obd_enqueue_update_f upcall, void *cookie,
                      struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct osc_setattr_args *sa;
        int                      rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        /* do mds to ost setattr asynchronously */
        if (!rqset) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
        } else {
                req->rq_interpret_reply =
                        (ptlrpc_interpterer_t)osc_setattr_interpret;

                /* async args live inside the request; make sure they fit */
                CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
                sa = ptlrpc_req_async_args(req);
                sa->sa_oa = oinfo->oi_oa;
                sa->sa_upcall = upcall;
                sa->sa_cookie = cookie;

                if (rqset == PTLRPCD_SET)
                        ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
                else
                        ptlrpc_set_add_req(rqset, req);
        }

        RETURN(0);
}
291
/* Synchronously create an OST object.  Only reachable via the echo client:
 * the object's sequence is asserted to be an echo sequence.  On success the
 * server-assigned attributes are copied back into @oa.  Returns 0 or a
 * negative errno. */
static int osc_create(const struct lu_env *env, struct obd_export *exp,
                      struct obdo *oa)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        LASSERT(oa != NULL);
        LASSERT(oa->o_valid & OBD_MD_FLGROUP);
        LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
        if (req == NULL)
                GOTO(out, rc = -ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
        if (rc) {
                ptlrpc_request_free(req);
                GOTO(out, rc);
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out_req, rc = -EPROTO);

        CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        /* report the client's preferred BRW size as the blocksize */
        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;

        CDEBUG(D_HA, "transno: "LPD64"\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        RETURN(rc);
}
342
/* Send an asynchronous OST_PUNCH (truncate/punch) request for the object
 * described by oinfo->oi_oa.  The reply is handled by
 * osc_setattr_interpret(), which invokes @upcall(@cookie, rc).  Returns 0
 * or a negative errno from request setup. */
int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
                   obd_enqueue_update_f upcall, void *cookie,
                   struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct osc_setattr_args *sa;
        struct ost_body         *body;
        int                      rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
                             oinfo->oi_oa);
        osc_pack_capa(req, body, oinfo->oi_capa);

        ptlrpc_request_set_replen(req);

        /* reuse the setattr reply handler: unpacks the obdo, runs upcall */
        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
        CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
        sa = ptlrpc_req_async_args(req);
        sa->sa_oa     = oinfo->oi_oa;
        sa->sa_upcall = upcall;
        sa->sa_cookie = cookie;
        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}
387
/* Reply callback for OST_SYNC: copy the returned attributes back into the
 * caller's obd_info and propagate the result through the registered upcall
 * (which runs on error paths too, so the waiter always learns the result). */
static int osc_sync_interpret(const struct lu_env *env,
                              struct ptlrpc_request *req,
                              void *arg, int rc)
{
        struct osc_fsync_args *fa = arg;
        struct ost_body *body;
        ENTRY;

        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL) {
                CERROR ("can't unpack ost_body\n");
                GOTO(out, rc = -EPROTO);
        }

        *fa->fa_oi->oi_oa = body->oa;
out:
        rc = fa->fa_upcall(fa->fa_cookie, rc);
        RETURN(rc);
}
410
/* Send an asynchronous OST_SYNC request.  The sync range is carried in the
 * size/blocks fields of oinfo->oi_oa (see comment below); the reply is
 * handled by osc_sync_interpret(), which calls @upcall(@cookie, rc).
 * Returns 0 or a negative errno from request setup. */
int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo,
                  obd_enqueue_update_f upcall, void *cookie,
                  struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct osc_fsync_args *fa;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        /* overload the size and blocks fields in the oa with start/end */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
                             oinfo->oi_oa);
        osc_pack_capa(req, body, oinfo->oi_capa);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = osc_sync_interpret;

        /* async args live inside the request; make sure they fit */
        CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
        fa = ptlrpc_req_async_args(req);
        fa->fa_oi = oinfo;
        fa->fa_upcall = upcall;
        fa->fa_cookie = cookie;

        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN (0);
}
455
/* Find and cancel locally locks matched by @mode in the resource found by
 * @objid. Found locks are added into @cancel list. Returns the amount of
 * locks added to @cancels list.
 * NOTE: if the request that consumes @cancels fails, the caller must drop
 * the collected locks with ldlm_lock_list_put() (see osc_destroy()). */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
                                   struct list_head *cancels,
                                   ldlm_mode_t mode, __u64 lock_flags)
{
        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        struct ldlm_res_id res_id;
        struct ldlm_resource *res;
        int count;
        ENTRY;

        /* Return, i.e. cancel nothing, only if ELC is supported (flag in
         * export) but disabled through procfs (flag in NS).
         *
         * This distinguishes from a case when ELC is not supported originally,
         * when we still want to cancel locks in advance and just cancel them
         * locally, without sending any RPC. */
        if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
                RETURN(0);

        ostid_build_res_name(&oa->o_oi, &res_id);
        res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
        if (IS_ERR(res))
                RETURN(0);

        LDLM_RESOURCE_ADDREF(res);
        count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
                                           lock_flags, 0, NULL);
        LDLM_RESOURCE_DELREF(res);
        ldlm_resource_putref(res);
        RETURN(count);
}
490
491 static int osc_destroy_interpret(const struct lu_env *env,
492                                  struct ptlrpc_request *req, void *data,
493                                  int rc)
494 {
495         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
496
497         atomic_dec(&cli->cl_destroy_in_flight);
498         wake_up(&cli->cl_destroy_waitq);
499         return 0;
500 }
501
/* Try to take a slot for one more in-flight OST_DESTROY RPC.
 *
 * Optimistically increments cl_destroy_in_flight; returns 1 if the result
 * stayed within cl_max_rpcs_in_flight (slot taken).  Otherwise the
 * increment is rolled back and 0 is returned; if the rollback observes
 * that the counter was concurrently decremented, a waiter is woken so it
 * can retry. */
static int osc_can_send_destroy(struct client_obd *cli)
{
        if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
            cli->cl_max_rpcs_in_flight) {
                /* The destroy request can be sent */
                return 1;
        }
        if (atomic_dec_return(&cli->cl_destroy_in_flight) <
            cli->cl_max_rpcs_in_flight) {
                /*
                 * The counter has been modified between the two atomic
                 * operations.
                 */
                wake_up(&cli->cl_destroy_waitq);
        }
        return 0;
}
519
/* Destroy an OST object: cancel matching local PW locks up front (early
 * lock cancellation), send OST_DESTROY without waiting for the reply, and
 * throttle so no more than cl_max_rpcs_in_flight destroys are outstanding
 * at once (see osc_can_send_destroy()). */
static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct client_obd     *cli = &exp->exp_obd->u.cli;
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct list_head       cancels = LIST_HEAD_INIT(cancels);
        int rc, count;
        ENTRY;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        /* the data is being discarded, so covering locks can be dropped
         * locally without flushing */
        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
        if (req == NULL) {
                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                RETURN(-ENOMEM);
        }

        osc_set_capa_size(req, &RMF_CAPA1, NULL);
        rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
                               0, &cancels, count);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = osc_destroy_interpret;
        if (!osc_can_send_destroy(cli)) {
                struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);

                /*
                 * Wait until the number of on-going destroy RPCs drops
                 * under max_rpc_in_flight
                 */
                l_wait_event_exclusive(cli->cl_destroy_waitq,
                                       osc_can_send_destroy(cli), &lwi);
        }

        /* Do not wait for response */
        ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
        RETURN(0);
}
577
/* Fill the dirty/grant accounting fields of @oa (o_dirty, o_undirty,
 * o_grant, o_dropped) so the server learns how much cache and grant this
 * client holds.  Takes cl_loi_list_lock internally; @writing_bytes is
 * currently unused. */
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;

        /* caller must not have filled these fields already */
        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        spin_lock(&cli->cl_loi_list_lock);
        oa->o_dirty = cli->cl_dirty_pages << PAGE_CACHE_SHIFT;
        if (unlikely(cli->cl_dirty_pages - cli->cl_dirty_transit >
                     cli->cl_dirty_max_pages)) {
                CERROR("dirty %lu - %lu > dirty_max %lu\n",
                       cli->cl_dirty_pages, cli->cl_dirty_transit,
                       cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else if (unlikely(atomic_long_read(&obd_dirty_pages) -
                            atomic_long_read(&obd_dirty_transit_pages) >
                            (obd_max_dirty_pages + 1))) {
                /* The atomic_read() allowing the atomic_inc() are
                 * not covered by a lock thus they may safely race and trip
                 * this CERROR() unless we add in a small fudge factor (+1). */
                CERROR("%s: dirty %ld - %ld > system dirty_max %lu\n",
                       cli->cl_import->imp_obd->obd_name,
                       atomic_long_read(&obd_dirty_pages),
                       atomic_long_read(&obd_dirty_transit_pages),
                       obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
                            0x7fffffff)) {
                /* counters look corrupted: don't advertise any undirty */
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty_pages, cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else {
                unsigned long max_in_flight = (cli->cl_max_pages_per_rpc <<
                                      PAGE_CACHE_SHIFT) *
                                     (cli->cl_max_rpcs_in_flight + 1);
                /* advertise the dirty limit or one full window of in-flight
                 * RPCs, whichever is larger */
                oa->o_undirty = max(cli->cl_dirty_max_pages << PAGE_CACHE_SHIFT,
                                    max_in_flight);
        }
        oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
        oa->o_dropped = cli->cl_lost_grant;
        cli->cl_lost_grant = 0;
        spin_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);

}
626
/* Re-arm the grant-shrink deadline one full interval from now. */
void osc_update_next_shrink(struct client_obd *cli)
{
        cli->cl_next_shrink_grant =
                cfs_time_shift(cli->cl_grant_shrink_interval);
        CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
               cli->cl_next_shrink_grant);
}
634
/* Add @grant bytes to the locally available grant, under cl_loi_list_lock. */
static void __osc_update_grant(struct client_obd *cli, u64 grant)
{
        spin_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant += grant;
        spin_unlock(&cli->cl_loi_list_lock);
}
641
642 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
643 {
644         if (body->oa.o_valid & OBD_MD_FLGRANT) {
645                 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
646                 __osc_update_grant(cli, body->oa.o_grant);
647         }
648 }
649
/* Defined later in this file; needed by the grant-shrink helpers below. */
static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
                              u32 keylen, void *key,
                              u32 vallen, void *val,
                              struct ptlrpc_request_set *set);
654
/* Reply callback for a grant-shrink set_info RPC: on failure, give the
 * grant we tried to return back to the local pool; on success, absorb
 * whatever the server granted back.  Frees the obdo allocated by the
 * sender in either case. */
static int osc_shrink_grant_interpret(const struct lu_env *env,
                                      struct ptlrpc_request *req,
                                      void *aa, int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
        struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
        struct ost_body *body;

        if (rc != 0) {
                /* shrink failed: restore the grant deducted locally */
                __osc_update_grant(cli, oa->o_grant);
                GOTO(out, rc);
        }

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        osc_update_grant(cli, body);
out:
        OBDO_FREE(oa);
        return rc;
}
675
/* Deduct a quarter of the available grant from the local pool and stage it
 * in @oa, flagged OBD_FL_SHRINK_GRANT, for return to the server. */
static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
{
        spin_lock(&cli->cl_loi_list_lock);
        oa->o_grant = cli->cl_avail_grant / 4;
        cli->cl_avail_grant -= oa->o_grant;
        spin_unlock(&cli->cl_loi_list_lock);
        if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
                oa->o_valid |= OBD_MD_FLFLAGS;
                oa->o_flags = 0;
        }
        oa->o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);
}
689
/* Shrink the current grant, either from some large amount to enough for a
 * full set of in-flight RPCs, or if we have already shrunk to that limit
 * then to enough for a single RPC.  This avoids keeping more grant than
 * needed, and avoids shrinking the grant piecemeal. */
static int osc_shrink_grant(struct client_obd *cli)
{
        __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
                             (cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT);

        spin_lock(&cli->cl_loi_list_lock);
        /* already at or below the RPC-window target: fall back to the
         * single-RPC target */
        if (cli->cl_avail_grant <= target_bytes)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
        spin_unlock(&cli->cl_loi_list_lock);

        return osc_shrink_grant_to_target(cli, target_bytes);
}
706
/* Return grant above @target_bytes to the server via a KEY_GRANT_SHRINK
 * set_info RPC.  The target is clamped up to at least one full RPC worth
 * of grant; nothing is sent if we already hold no more than the target.
 * Returns 0 or a negative errno. */
int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
{
        int                     rc = 0;
        struct ost_body        *body;
        ENTRY;

        spin_lock(&cli->cl_loi_list_lock);
        /* Don't shrink if we are already above or below the desired limit
         * We don't want to shrink below a single RPC, as that will negatively
         * impact block allocation and long-term performance. */
        if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;

        if (target_bytes >= cli->cl_avail_grant) {
                spin_unlock(&cli->cl_loi_list_lock);
                RETURN(0);
        }
        spin_unlock(&cli->cl_loi_list_lock);

        OBD_ALLOC_PTR(body);
        if (!body)
                RETURN(-ENOMEM);

        osc_announce_cached(cli, &body->oa, 0);

        /* NOTE(review): the lock was dropped above, so cl_avail_grant may
         * have changed since the check; confirm that o_grant cannot
         * underflow here under concurrent grant consumption. */
        spin_lock(&cli->cl_loi_list_lock);
        body->oa.o_grant = cli->cl_avail_grant - target_bytes;
        cli->cl_avail_grant = target_bytes;
        spin_unlock(&cli->cl_loi_list_lock);
        if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
                body->oa.o_valid |= OBD_MD_FLFLAGS;
                body->oa.o_flags = 0;
        }
        body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);

        rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
                                sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
                                sizeof(*body), body, NULL);
        /* on failure, give the deducted grant back to the local pool */
        if (rc != 0)
                __osc_update_grant(cli, body->oa.o_grant);
        OBD_FREE_PTR(body);
        RETURN(rc);
}
751
/* Decide whether it is time to shrink this client's grant: the server must
 * support OBD_CONNECT_GRANT_SHRINK, the shrink deadline must be (nearly)
 * reached, the import must be fully connected, and the client must hold
 * more grant than a single full BRW RPC needs. */
static int osc_should_shrink_grant(struct client_obd *client)
{
        cfs_time_t time = cfs_time_current();
        cfs_time_t next_shrink = client->cl_next_shrink_grant;

        if ((client->cl_import->imp_connect_data.ocd_connect_flags &
             OBD_CONNECT_GRANT_SHRINK) == 0)
                return 0;

        /* a few ticks of slack so a near-due deadline is not missed */
        if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
                /* Get the current RPC size directly, instead of going via:
                 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
                 * Keep comment here so that it can be found by searching. */
                int brw_size = client->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;

                if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
                    client->cl_avail_grant > brw_size)
                        return 1;
                else
                        osc_update_next_shrink(client);
        }
        return 0;
}
775
776 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
777 {
778         struct client_obd *client;
779
780         list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
781                 if (osc_should_shrink_grant(client))
782                         osc_shrink_grant(client);
783         }
784         return 0;
785 }
786
/* Register @client on the shared TIMEOUT_GRANT list so that
 * osc_grant_shrink_grant_cb() is invoked periodically, and arm the first
 * shrink deadline.  Returns 0 or the registration error. */
static int osc_add_shrink_grant(struct client_obd *client)
{
        int rc;

        rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
                                       TIMEOUT_GRANT,
                                       osc_grant_shrink_grant_cb, NULL,
                                       &client->cl_grant_shrink_list);
        if (rc) {
                CERROR("add grant client %s error %d\n",
                        client->cl_import->imp_obd->obd_name, rc);
                return rc;
        }
        CDEBUG(D_CACHE, "add grant client %s \n",
               client->cl_import->imp_obd->obd_name);
        osc_update_next_shrink(client);
        return 0;
}
805
806 static int osc_del_shrink_grant(struct client_obd *client)
807 {
808         return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
809                                          TIMEOUT_GRANT);
810 }
811
/* Initialize the client's available grant from the connect data @ocd at
 * (re)connect time, derive the osc_extent chunk size, and register for
 * periodic grant shrinking when the server supports it. */
static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        /*
         * ocd_grant is the total grant amount we're expect to hold: if we've
         * been evicted, it's the new avail_grant amount, cl_dirty_pages will
         * drop to 0 as inflight RPCs fail out; otherwise, it's avail_grant +
         * dirty.
         *
         * race is tolerable here: if we're evicted, but imp_state already
         * left EVICTED state, then cl_dirty_pages must be 0 already.
         */
        spin_lock(&cli->cl_loi_list_lock);
        if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
                cli->cl_avail_grant = ocd->ocd_grant;
        else
                cli->cl_avail_grant = ocd->ocd_grant -
                                      (cli->cl_dirty_pages << PAGE_CACHE_SHIFT);

        if (cli->cl_avail_grant < 0) {
                CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
                      cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant,
                      ocd->ocd_grant, cli->cl_dirty_pages << PAGE_CACHE_SHIFT);
                /* workaround for servers which do not have the patch from
                 * LU-2679 */
                cli->cl_avail_grant = ocd->ocd_grant;
        }

        /* determine the appropriate chunk size used by osc_extent. */
        cli->cl_chunkbits = max_t(int, PAGE_CACHE_SHIFT, ocd->ocd_blocksize);
        spin_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
                "chunk bits: %d.\n", cli->cl_import->imp_obd->obd_name,
                cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits);

        if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
            list_empty(&cli->cl_grant_shrink_list))
                osc_add_shrink_grant(cli);
}
851
852 /* We assume that the reason this OSC got a short read is because it read
853  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
854  * via the LOV, and it _knows_ it's reading inside the file, it's just that
855  * this stripe never got written at or beyond this stripe offset yet. */
856 static void handle_short_read(int nob_read, size_t page_count,
857                               struct brw_page **pga)
858 {
859         char *ptr;
860         int i = 0;
861
862         /* skip bytes read OK */
863         while (nob_read > 0) {
864                 LASSERT (page_count > 0);
865
866                 if (pga[i]->count > nob_read) {
867                         /* EOF inside this page */
868                         ptr = kmap(pga[i]->pg) +
869                                 (pga[i]->off & ~PAGE_MASK);
870                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
871                         kunmap(pga[i]->pg);
872                         page_count--;
873                         i++;
874                         break;
875                 }
876
877                 nob_read -= pga[i]->count;
878                 page_count--;
879                 i++;
880         }
881
882         /* zero remaining pages */
883         while (page_count-- > 0) {
884                 ptr = kmap(pga[i]->pg) + (pga[i]->off & ~PAGE_MASK);
885                 memset(ptr, 0, pga[i]->count);
886                 kunmap(pga[i]->pg);
887                 i++;
888         }
889 }
890
891 static int check_write_rcs(struct ptlrpc_request *req,
892                            int requested_nob, int niocount,
893                            size_t page_count, struct brw_page **pga)
894 {
895         int     i;
896         __u32   *remote_rcs;
897
898         remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
899                                                   sizeof(*remote_rcs) *
900                                                   niocount);
901         if (remote_rcs == NULL) {
902                 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
903                 return(-EPROTO);
904         }
905
906         /* return error if any niobuf was in error */
907         for (i = 0; i < niocount; i++) {
908                 if ((int)remote_rcs[i] < 0)
909                         return(remote_rcs[i]);
910
911                 if (remote_rcs[i] != 0) {
912                         CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
913                                 i, remote_rcs[i], req);
914                         return(-EPROTO);
915                 }
916         }
917
918         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
919                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
920                        req->rq_bulk->bd_nob_transferred, requested_nob);
921                 return(-EPROTO);
922         }
923
924         return (0);
925 }
926
927 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
928 {
929         if (p1->flag != p2->flag) {
930                 unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
931                                   OBD_BRW_SYNC       | OBD_BRW_ASYNC   |
932                                   OBD_BRW_NOQUOTA    | OBD_BRW_SOFT_SYNC);
933
934                 /* warn if we try to combine flags that we don't know to be
935                  * safe to combine */
936                 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
937                         CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
938                               "report this at https://jira.hpdd.intel.com/\n",
939                               p1->flag, p2->flag);
940                 }
941                 return 0;
942         }
943
944         return (p1->off + p1->count == p2->off);
945 }
946
/* Compute the bulk-data checksum over the first @nob bytes covered by the
 * @pga page array, using the algorithm selected by @cksum_type.
 *
 * Contains two fault-injection hooks: OBD_FAIL_OSC_CHECKSUM_RECEIVE
 * corrupts the first read page before hashing (simulating an OST->client
 * data error), and OBD_FAIL_OSC_CHECKSUM_SEND perturbs the final checksum
 * for writes (so the on-disk data stays correct on a redo).
 *
 * NOTE(review): on hash-init failure this returns PTR_ERR() through a u32
 * return type, so callers see a large "checksum" rather than an error —
 * presumably tolerated because it merely forces a checksum mismatch;
 * confirm against callers. */
static u32 osc_checksum_bulk(int nob, size_t pg_count,
                             struct brw_page **pga, int opc,
                             cksum_type_t cksum_type)
{
        u32                             cksum;
        int                             i = 0;
        struct cfs_crypto_hash_desc     *hdesc;
        unsigned int                    bufsize;
        int                             err;
        unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);

        LASSERT(pg_count > 0);

        hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
        if (IS_ERR(hdesc)) {
                CERROR("Unable to initialize checksum hash %s\n",
                       cfs_crypto_hash_name(cfs_alg));
                return PTR_ERR(hdesc);
        }

        while (nob > 0 && pg_count > 0) {
                /* the last page may be only partially covered by nob */
                unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 && opc == OST_READ &&
                    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
                        unsigned char *ptr = kmap(pga[i]->pg);
                        int off = pga[i]->off & ~PAGE_MASK;

                        memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
                        kunmap(pga[i]->pg);
                }
                cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
                                            pga[i]->off & ~PAGE_MASK,
                                            count);
                LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
                               (int)(pga[i]->off & ~PAGE_MASK));

                nob -= pga[i]->count;
                pg_count--;
                i++;
        }

        bufsize = sizeof(cksum);
        err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);

        /* For sending we only compute the wrong checksum instead
         * of corrupting the data so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                cksum++;

        return cksum;
}
1001
/* Build (but do not send) a bulk read/write RPC for @page_count pages.
 *
 * Allocates the ptlrpc request and bulk descriptor, packs the ost_body,
 * ioobj and niobuf_remote buffers (merging contiguous same-flag pages into
 * single niobufs), optionally computes/requests bulk checksums, and stores
 * the osc_brw_async_args in the request for the reply handler.
 *
 * \param cmd     OBD_BRW_WRITE or OBD_BRW_READ (tested as a bitmask)
 * \param oa      obdo describing the object; also receives checksum state
 * \param pga     sorted array of brw pages, no gaps in the middle
 * \param reqp    on success, *reqp is the prepared request (caller owns it)
 * \param reserve non-zero to take a reference on @ocapa into the aa
 * \param resend  non-zero marks the RPC with OBD_FL_RECOV_RESEND
 *
 * \retval 0 on success, negative errno on failure. */
static int
osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
                     u32 page_count, struct brw_page **pga,
                     struct ptlrpc_request **reqp, struct obd_capa *ocapa,
                     int reserve, int resend)
{
        struct ptlrpc_request   *req;
        struct ptlrpc_bulk_desc *desc;
        struct ost_body         *body;
        struct obd_ioobj        *ioobj;
        struct niobuf_remote    *niobuf;
        int niocount, i, requested_nob, opc, rc;
        struct osc_brw_async_args *aa;
        struct req_capsule      *pill;
        struct brw_page *pg_prev;

        ENTRY;
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
                RETURN(-ENOMEM); /* Recoverable */
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
                RETURN(-EINVAL); /* Fatal */

        /* writes draw from the import's request pool so they can proceed
         * under memory pressure */
        if ((cmd & OBD_BRW_WRITE) != 0) {
                opc = OST_WRITE;
                req = ptlrpc_request_alloc_pool(cli->cl_import,
                                                cli->cl_import->imp_rq_pool,
                                                &RQF_OST_BRW_WRITE);
        } else {
                opc = OST_READ;
                req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
        }
        if (req == NULL)
                RETURN(-ENOMEM);

        /* count distinct niobufs: adjacent mergeable pages share one */
        for (niocount = i = 1; i < page_count; i++) {
                if (!can_merge_pages(pga[i - 1], pga[i]))
                        niocount++;
        }

        pill = &req->rq_pill;
        req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
                             sizeof(*ioobj));
        req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
                             niocount * sizeof(*niobuf));
        osc_set_capa_size(req, &RMF_CAPA1, ocapa);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);
        /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
         * retry logic */
        req->rq_no_retry_einprogress = 1;

        desc = ptlrpc_prep_bulk_imp(req, page_count,
                cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
                opc == OST_WRITE ? BULK_GET_SOURCE : BULK_PUT_SINK,
                OST_BULK_PORTAL);

        if (desc == NULL)
                GOTO(out, rc = -ENOMEM);
        /* NB request now owns desc and will free it when it gets freed */

        body = req_capsule_client_get(pill, &RMF_OST_BODY);
        ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;
        /* The high bits of ioo_max_brw tells server _maximum_ number of bulks
         * that might be send for this request.  The actual number is decided
         * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
         * "max - 1" for old client compatibility sending "0", and also so the
         * the actual maximum is a power-of-two number, not one less. LU-1431 */
        ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
        osc_pack_capa(req, body, ocapa);
        LASSERT(page_count > 0);
        pg_prev = pga[0];
        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                struct brw_page *pg = pga[i];
                int poff = pg->off & ~PAGE_MASK;

                LASSERT(pg->count > 0);
                /* make sure there is no gap in the middle of page array */
                LASSERTF(page_count == 1 ||
                         (ergo(i == 0, poff + pg->count == PAGE_CACHE_SIZE) &&
                          ergo(i > 0 && i < page_count - 1,
                               poff == 0 && pg->count == PAGE_CACHE_SIZE)   &&
                          ergo(i == page_count - 1, poff == 0)),
                         "i: %d/%d pg: %p off: "LPU64", count: %u\n",
                         i, page_count, pg, pg->off, pg->count);
                /* pages must arrive in strictly increasing offset order */
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
                         " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
                         i, page_count,
                         pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
                         pg_prev->pg, page_private(pg_prev->pg),
                         pg_prev->pg->index, pg_prev->off);
                LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
                        (pg->flag & OBD_BRW_SRVLOCK));

                ptlrpc_prep_bulk_page_pin(desc, pg->pg, poff, pg->count);
                requested_nob += pg->count;

                /* extend the previous niobuf instead of starting a new one
                 * when this page is contiguous with it */
                if (i > 0 && can_merge_pages(pg_prev, pg)) {
                        niobuf--;
                        niobuf->rnb_len += pg->count;
                } else {
                        niobuf->rnb_offset = pg->off;
                        niobuf->rnb_len    = pg->count;
                        niobuf->rnb_flags  = pg->flag;
                }
                pg_prev = pg;
        }

        LASSERTF((void *)(niobuf - niocount) ==
                req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
                "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
                &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));

        osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
        if (resend) {
                if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                        body->oa.o_valid |= OBD_MD_FLFLAGS;
                        body->oa.o_flags = 0;
                }
                body->oa.o_flags |= OBD_FL_RECOV_RESEND;
        }

        if (osc_should_shrink_grant(cli))
                osc_shrink_grant_local(cli, &body->oa);

        /* size[REQ_REC_OFF] still sizeof (*body) */
        if (opc == OST_WRITE) {
                if (cli->cl_checksum &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        /* store cl_cksum_type in a local variable since
                         * it can be changed via lprocfs */
                        cksum_type_t cksum_type = cli->cl_cksum_type;

                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                                oa->o_flags &= OBD_FL_LOCAL_MASK;
                                body->oa.o_flags = 0;
                        }
                        body->oa.o_flags |= cksum_type_pack(cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        body->oa.o_cksum = osc_checksum_bulk(requested_nob,
                                                             page_count, pga,
                                                             OST_WRITE,
                                                             cksum_type);
                        CDEBUG(D_PAGE, "checksum at write origin: %x\n",
                               body->oa.o_cksum);
                        /* save this in 'oa', too, for later checking */
                        oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        oa->o_flags |= cksum_type_pack(cksum_type);
                } else {
                        /* clear out the checksum flag, in case this is a
                         * resend but cl_checksum is no longer set. b=11238 */
                        oa->o_valid &= ~OBD_MD_FLCKSUM;
                }
                oa->o_cksum = body->oa.o_cksum;
                /* 1 RC per niobuf */
                req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
                                     sizeof(__u32) * niocount);
        } else {
                /* for reads, ask the server to checksum what it sends */
                if (cli->cl_checksum &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                body->oa.o_flags = 0;
                        body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                }
        }
        ptlrpc_request_set_replen(req);

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oa = oa;
        aa->aa_requested_nob = requested_nob;
        aa->aa_nio_count = niocount;
        aa->aa_page_count = page_count;
        aa->aa_resends = 0;
        aa->aa_ppga = pga;
        aa->aa_cli = cli;
        INIT_LIST_HEAD(&aa->aa_oaps);
        if (ocapa && reserve)
                aa->aa_ocapa = capa_get(ocapa);

        *reqp = req;
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
                req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
                niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
        RETURN(0);

 out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}
1207
/* Diagnose a write checksum mismatch reported by the server.
 *
 * Recomputes the checksum over the (still-pinned) write pages with the
 * server's checksum type and compares against all parties to decide where
 * the corruption happened, then logs a console error.
 *
 * \retval 0 if client and server checksums actually agree
 * \retval 1 on a genuine mismatch (caller resends the write). */
static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
                                __u32 client_cksum, __u32 server_cksum, int nob,
                                size_t page_count, struct brw_page **pga,
                                cksum_type_t client_cksum_type)
{
        __u32 new_cksum;
        char *msg;
        cksum_type_t cksum_type;

        if (server_cksum == client_cksum) {
                CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
                return 0;
        }

        /* recompute with the checksum type the server replied with, which
         * may differ from what we originally requested */
        cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
                                       oa->o_flags : 0);
        new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
                                      cksum_type);

        if (cksum_type != client_cksum_type)
                msg = "the server did not use the checksum type specified in "
                      "the original request - likely a protocol problem";
        else if (new_cksum == server_cksum)
                msg = "changed on the client after we checksummed it - "
                      "likely false positive due to mmap IO (bug 11742)";
        else if (new_cksum == client_cksum)
                msg = "changed in transit before arrival at OST";
        else
                msg = "changed in transit AND doesn't match the original - "
                      "likely false positive due to mmap IO (bug 11742)";

        LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
                           " object "DOSTID" extent ["LPU64"-"LPU64"]\n",
                           msg, libcfs_nid2str(peer->nid),
                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
                           POSTID(&oa->o_oi), pga[0]->off,
                           pga[page_count-1]->off + pga[page_count-1]->count - 1);
        CERROR("original client csum %x (type %x), server csum %x (type %x), "
               "client csum now %x\n", client_cksum, client_cksum_type,
               server_cksum, cksum_type, new_cksum);
        return 1;
}
1252
/* Process the reply of a bulk read/write RPC.
 *
 * Note rc enters this function as number of bytes transferred.
 * Updates quota and grant state from the reply, verifies write rcs or
 * read checksums, zero-fills short reads, and on success copies the
 * reply obdo back into aa->aa_oa.
 *
 * \retval transferred byte count semantics are consumed here; returns 0
 *         on success, -EAGAIN to request a resend (checksum/bulk-unwrap
 *         failure), or another negative errno. */
static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
{
        struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
        const lnet_process_id_t *peer =
                        &req->rq_import->imp_connection->c_peer;
        struct client_obd *cli = aa->aa_cli;
        struct ost_body *body;
        u32 client_cksum = 0;
        ENTRY;

        /* -EDQUOT replies still carry a body we must process below */
        if (rc < 0 && rc != -EDQUOT) {
                DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
                RETURN(rc);
        }

        LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL) {
                DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
                RETURN(-EPROTO);
        }

        /* set/clear over quota flag for a uid/gid */
        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
            body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
                unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };

                CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
                       body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
                       body->oa.o_flags);
                osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
        }

        osc_update_grant(cli, body);

        if (rc < 0)
                RETURN(rc);

        if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
                client_cksum = aa->aa_oa->o_cksum; /* save for later */

        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
                /* writes transfer no payload back, so rc must be <= 0 here */
                if (rc > 0) {
                        CERROR("Unexpected +ve rc %d\n", rc);
                        RETURN(-EPROTO);
                }
                LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);

                if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
                        RETURN(-EAGAIN);

                if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
                    check_write_checksum(&body->oa, peer, client_cksum,
                                         body->oa.o_cksum, aa->aa_requested_nob,
                                         aa->aa_page_count, aa->aa_ppga,
                                         cksum_type_unpack(aa->aa_oa->o_flags)))
                        RETURN(-EAGAIN);

                rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
                                     aa->aa_page_count, aa->aa_ppga);
                GOTO(out, rc);
        }

        /* The rest of this function executes only for OST_READs */

        /* if unwrap_bulk failed, return -EAGAIN to retry */
        rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
        if (rc < 0)
                GOTO(out, rc = -EAGAIN);

        if (rc > aa->aa_requested_nob) {
                CERROR("Unexpected rc %d (%d requested)\n", rc,
                       aa->aa_requested_nob);
                RETURN(-EPROTO);
        }

        if (rc != req->rq_bulk->bd_nob_transferred) {
                CERROR ("Unexpected rc %d (%d transferred)\n",
                        rc, req->rq_bulk->bd_nob_transferred);
                return (-EPROTO);
        }

        /* short read: zero-fill the pages past what the server returned */
        if (rc < aa->aa_requested_nob)
                handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);

        if (body->oa.o_valid & OBD_MD_FLCKSUM) {
                static int cksum_counter;
                u32        server_cksum = body->oa.o_cksum;
                char      *via = "";
                char      *router = "";
                cksum_type_t cksum_type;

                cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
                                               body->oa.o_flags : 0);
                client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
                                                 aa->aa_ppga, OST_READ,
                                                 cksum_type);

                /* note when the bulk arrived via an LNet router */
                if (peer->nid != req->rq_bulk->bd_sender) {
                        via = " via ";
                        router = libcfs_nid2str(req->rq_bulk->bd_sender);
                }

                if (server_cksum != client_cksum) {
                        LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
                                           "%s%s%s inode "DFID" object "DOSTID
                                           " extent ["LPU64"-"LPU64"]\n",
                                           req->rq_import->imp_obd->obd_name,
                                           libcfs_nid2str(peer->nid),
                                           via, router,
                                           body->oa.o_valid & OBD_MD_FLFID ?
                                                body->oa.o_parent_seq : (__u64)0,
                                           body->oa.o_valid & OBD_MD_FLFID ?
                                                body->oa.o_parent_oid : 0,
                                           body->oa.o_valid & OBD_MD_FLFID ?
                                                body->oa.o_parent_ver : 0,
                                           POSTID(&body->oa.o_oi),
                                           aa->aa_ppga[0]->off,
                                           aa->aa_ppga[aa->aa_page_count-1]->off +
                                           aa->aa_ppga[aa->aa_page_count-1]->count -
                                                                        1);
                        CERROR("client %x, server %x, cksum_type %x\n",
                               client_cksum, server_cksum, cksum_type);
                        cksum_counter = 0;
                        aa->aa_oa->o_cksum = client_cksum;
                        rc = -EAGAIN;
                } else {
                        cksum_counter++;
                        CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
                        rc = 0;
                }
        } else if (unlikely(client_cksum)) {
                static int cksum_missed;

                /* rate-limit: log only when cksum_missed is a power of two */
                cksum_missed++;
                if ((cksum_missed & (-cksum_missed)) == cksum_missed)
                        CERROR("Checksum %u requested from %s but not sent\n",
                               cksum_missed, libcfs_nid2str(peer->nid));
        } else {
                rc = 0;
        }
out:
        if (rc >= 0)
                lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
                                     aa->aa_oa, &body->oa);

        RETURN(rc);
}
1402
/* Rebuild and requeue a BRW RPC after a recoverable error (@rc).
 *
 * Prepares a fresh request for the same pages, moves the oap and extent
 * lists (and capa reference) from the old async args to the new ones,
 * applies a resend backoff capped at the request timeout, and hands the
 * new request to ptlrpcd.
 *
 * \retval 0 on success, -EINTR if any oap was interrupted, or the
 *         osc_brw_prep_request() error. */
static int osc_brw_redo_request(struct ptlrpc_request *request,
                                struct osc_brw_async_args *aa, int rc)
{
        struct ptlrpc_request *new_req;
        struct osc_brw_async_args *new_aa;
        struct osc_async_page *oap;
        ENTRY;

        DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
                  "redo for recoverable error %d", rc);

        rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
                                        OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
                                  aa->aa_cli, aa->aa_oa,
                                  aa->aa_page_count, aa->aa_ppga,
                                  &new_req, aa->aa_ocapa, 0, 1);
        if (rc)
                RETURN(rc);

        /* abort the resend if any page's originating I/O was interrupted */
        list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request != NULL) {
                        LASSERTF(request == oap->oap_request,
                                 "request %p != oap_request %p\n",
                                 request, oap->oap_request);
                        if (oap->oap_interrupted) {
                                ptlrpc_req_finished(new_req);
                                RETURN(-EINTR);
                        }
                }
        }
        /* New request takes over pga and oaps from old request.
         * Note that copying a list_head doesn't work, need to move it... */
        aa->aa_resends++;
        new_req->rq_interpret_reply = request->rq_interpret_reply;
        new_req->rq_async_args = request->rq_async_args;
        new_req->rq_commit_cb = request->rq_commit_cb;
        /* cap resend delay to the current request timeout, this is similar to
         * what ptlrpc does (see after_reply()) */
        if (aa->aa_resends > new_req->rq_timeout)
                new_req->rq_sent = cfs_time_current_sec() + new_req->rq_timeout;
        else
                new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
        new_req->rq_generation_set = 1;
        new_req->rq_import_generation = request->rq_import_generation;

        new_aa = ptlrpc_req_async_args(new_req);

        INIT_LIST_HEAD(&new_aa->aa_oaps);
        list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
        INIT_LIST_HEAD(&new_aa->aa_exts);
        list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
        new_aa->aa_resends = aa->aa_resends;

        /* repoint each oap's request reference at the new RPC */
        list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request) {
                        ptlrpc_req_finished(oap->oap_request);
                        oap->oap_request = ptlrpc_request_addref(new_req);
                }
        }

        /* the capa reference moves to the new request */
        new_aa->aa_ocapa = aa->aa_ocapa;
        aa->aa_ocapa = NULL;

        /* XXX: This code will run into problem if we're going to support
         * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
         * and wait for all of them to be finished. We should inherit request
         * set from old request. */
        ptlrpcd_add_req(new_req, PDL_POLICY_SAME, -1);

        DEBUG_REQ(D_INFO, new_req, "new request");
        RETURN(0);
}
1475
1476 /*
1477  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1478  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1479  * fine for our small page arrays and doesn't require allocation.  its an
1480  * insertion sort that swaps elements that are strides apart, shrinking the
1481  * stride down until its '1' and the array is sorted.
1482  */
1483 static void sort_brw_pages(struct brw_page **array, int num)
1484 {
1485         int stride, i, j;
1486         struct brw_page *tmp;
1487
1488         if (num == 1)
1489                 return;
1490         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1491                 ;
1492
1493         do {
1494                 stride /= 3;
1495                 for (i = stride ; i < num ; i++) {
1496                         tmp = array[i];
1497                         j = i;
1498                         while (j >= stride && array[j - stride]->off > tmp->off) {
1499                                 array[j] = array[j - stride];
1500                                 j -= stride;
1501                         }
1502                         array[j] = tmp;
1503                 }
1504         } while (stride > 1);
1505 }
1506
/* Free a brw_page pointer array of @count entries previously allocated by
 * the caller (frees only the array, not the pages it points to). */
static void osc_release_ppga(struct brw_page **ppga, size_t count)
{
        LASSERT(ppga != NULL);
        OBD_FREE(ppga, sizeof(*ppga) * count);
}
1512
/*
 * Completion callback (rq_interpret_reply) for a BRW RPC.
 *
 * Handles recoverable errors by resending the request, updates the cached
 * object attributes from the reply on success, finishes the extents this
 * RPC covered, adjusts the in-flight RPC counters and kicks further I/O.
 *
 * \param env  execution environment
 * \param req  the completed BRW request
 * \param data struct osc_brw_async_args stored in the request
 * \param rc   transport-level result, refined by osc_brw_fini_request()
 *
 * \retval the final result of the RPC (0 or negative errno)
 */
static int brw_interpret(const struct lu_env *env,
                         struct ptlrpc_request *req, void *data, int rc)
{
        struct osc_brw_async_args *aa = data;
        struct osc_extent *ext;
        struct osc_extent *tmp;
        struct client_obd *cli = aa->aa_cli;
        ENTRY;

        rc = osc_brw_fini_request(req, rc);
        CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
        /* When server return -EINPROGRESS, client should always retry
         * regardless of the number of times the bulk was resent already. */
        if (osc_recoverable_error(rc)) {
                if (req->rq_import_generation !=
                    req->rq_import->imp_generation) {
                        /* The import was reconnected (e.g. after eviction)
                         * since this request was sent: do not resend across
                         * generations, just report the failure. */
                        CDEBUG(D_HA, "%s: resend cross eviction for object: "
                               ""DOSTID", rc = %d.\n",
                               req->rq_import->imp_obd->obd_name,
                               POSTID(&aa->aa_oa->o_oi), rc);
                } else if (rc == -EINPROGRESS ||
                    client_should_resend(aa->aa_resends, aa->aa_cli)) {
                        /* -EINPROGRESS is always retried; otherwise honor
                         * the configured resend limit. */
                        rc = osc_brw_redo_request(req, aa, rc);
                } else {
                        CERROR("%s: too many resent retries for object: "
                               ""LPU64":"LPU64", rc = %d.\n",
                               req->rq_import->imp_obd->obd_name,
                               POSTID(&aa->aa_oa->o_oi), rc);
                }

                /* rc == 0 means the resend was queued successfully; this
                 * interpret callback will run again for the new request. */
                if (rc == 0)
                        RETURN(0);
                else if (rc == -EAGAIN || rc == -EINPROGRESS)
                        rc = -EIO;
        }

        if (aa->aa_ocapa) {
                capa_put(aa->aa_ocapa);
                aa->aa_ocapa = NULL;
        }

        if (rc == 0) {
                struct obdo *oa = aa->aa_oa;
                struct cl_attr *attr = &osc_env_info(env)->oti_attr;
                unsigned long valid = 0;
                struct cl_object *obj;
                struct osc_async_page *last;

                /* pga was sorted by offset, so the last page bounds the I/O */
                last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
                obj = osc2cl(last->oap_obj);

                /* Propagate attributes returned by the OST into the cached
                 * cl_object attributes, under the attribute lock. */
                cl_object_attr_lock(obj);
                if (oa->o_valid & OBD_MD_FLBLOCKS) {
                        attr->cat_blocks = oa->o_blocks;
                        valid |= CAT_BLOCKS;
                }
                if (oa->o_valid & OBD_MD_FLMTIME) {
                        attr->cat_mtime = oa->o_mtime;
                        valid |= CAT_MTIME;
                }
                if (oa->o_valid & OBD_MD_FLATIME) {
                        attr->cat_atime = oa->o_atime;
                        valid |= CAT_ATIME;
                }
                if (oa->o_valid & OBD_MD_FLCTIME) {
                        attr->cat_ctime = oa->o_ctime;
                        valid |= CAT_CTIME;
                }

                if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
                        struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
                        loff_t last_off = last->oap_count + last->oap_obj_off +
                                last->oap_page_off;

                        /* Change file size if this is an out of quota or
                         * direct IO write and it extends the file size */
                        if (loi->loi_lvb.lvb_size < last_off) {
                                attr->cat_size = last_off;
                                valid |= CAT_SIZE;
                        }
                        /* Extend KMS if it's not a lockless write */
                        if (loi->loi_kms < last_off &&
                            oap2osc_page(last)->ops_srvlock == 0) {
                                attr->cat_kms = last_off;
                                valid |= CAT_KMS;
                        }
                }

                if (valid != 0)
                        cl_object_attr_update(env, obj, attr, valid);
                cl_object_attr_unlock(obj);
        }
        OBDO_FREE(aa->aa_oa);

        /* Successful writes pin pages as "unstable" until the server
         * commits them; brw_commit() releases them. */
        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
                osc_inc_unstable_pages(req);

        list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
                list_del_init(&ext->oe_link);
                osc_extent_finish(env, ext, 1, rc);
        }
        LASSERT(list_empty(&aa->aa_exts));
        LASSERT(list_empty(&aa->aa_oaps));

        cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
                          req->rq_bulk->bd_nob_transferred);
        osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
        ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);

        spin_lock(&cli->cl_loi_list_lock);
        /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
         * is called so we know whether to go to sync BRWs or wait for more
         * RPCs to complete */
        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
                cli->cl_w_in_flight--;
        else
                cli->cl_r_in_flight--;
        osc_wake_cache_waiters(cli);
        spin_unlock(&cli->cl_loi_list_lock);

        osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
        RETURN(rc);
}
1636
1637 static void brw_commit(struct ptlrpc_request *req)
1638 {
1639         /* If osc_inc_unstable_pages (via osc_extent_finish) races with
1640          * this called via the rq_commit_cb, I need to ensure
1641          * osc_dec_unstable_pages is still called. Otherwise unstable
1642          * pages may be leaked. */
1643         spin_lock(&req->rq_lock);
1644         if (likely(req->rq_unstable)) {
1645                 req->rq_unstable = 0;
1646                 spin_unlock(&req->rq_lock);
1647
1648                 osc_dec_unstable_pages(req);
1649         } else {
1650                 req->rq_committed = 1;
1651                 spin_unlock(&req->rq_lock);
1652         }
1653 }
1654
/**
 * Build an RPC by the list of extent @ext_list. The caller must ensure
 * that the total pages in this list are NOT over max pages per RPC.
 * Extents in the list must be in OES_RPC state.
 *
 * \param env      execution environment
 * \param cli      client OBD the pages belong to
 * \param ext_list extents to send; on success they are spliced into the
 *                 request's async args, on failure they are finished with
 *                 an error
 * \param cmd      OBD_BRW_READ or OBD_BRW_WRITE (possibly with flags)
 * \param pol      ptlrpcd selection policy for queueing the request
 *
 * \retval 0 on success, negative errno on failure
 */
int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
                  struct list_head *ext_list, int cmd, pdl_policy_t pol)
{
        struct ptlrpc_request           *req = NULL;
        struct osc_extent               *ext;
        struct brw_page                 **pga = NULL;
        struct osc_brw_async_args       *aa = NULL;
        struct obdo                     *oa = NULL;
        struct osc_async_page           *oap;
        struct osc_async_page           *tmp;
        struct cl_req                   *clerq = NULL;
        enum cl_req_type                crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE :
                                                                      CRT_READ;
        struct cl_req_attr              *crattr = NULL;
        loff_t                          starting_offset = OBD_OBJECT_EOF;
        loff_t                          ending_offset = 0;
        int                             mpflag = 0;
        int                             mem_tight = 0;
        int                             page_count = 0;
        bool                            soft_sync = false;
        int                             i;
        int                             rc;
        struct list_head                rpc_list = LIST_HEAD_INIT(rpc_list);
        struct ost_body                 *body;
        ENTRY;
        LASSERT(!list_empty(ext_list));

        /* add pages into rpc_list to build BRW rpc */
        list_for_each_entry(ext, ext_list, oe_link) {
                LASSERT(ext->oe_state == OES_RPC);
                mem_tight |= ext->oe_memalloc;
                list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
                        ++page_count;
                        list_add_tail(&oap->oap_rpc_item, &rpc_list);
                        /* track the overall [starting_offset, ending_offset)
                         * range; interior pages must be full pages */
                        if (starting_offset == OBD_OBJECT_EOF ||
                            starting_offset > oap->oap_obj_off)
                                starting_offset = oap->oap_obj_off;
                        else
                                LASSERT(oap->oap_page_off == 0);
                        if (ending_offset < oap->oap_obj_off + oap->oap_count)
                                ending_offset = oap->oap_obj_off +
                                                oap->oap_count;
                        else
                                LASSERT(oap->oap_page_off + oap->oap_count ==
                                        PAGE_CACHE_SIZE);
                }
        }

        soft_sync = osc_over_unstable_soft_limit(cli);
        if (mem_tight)
                mpflag = cfs_memory_pressure_get_and_set();

        OBD_ALLOC(crattr, sizeof(*crattr));
        if (crattr == NULL)
                GOTO(out, rc = -ENOMEM);

        OBD_ALLOC(pga, sizeof(*pga) * page_count);
        if (pga == NULL)
                GOTO(out, rc = -ENOMEM);

        OBDO_ALLOC(oa);
        if (oa == NULL)
                GOTO(out, rc = -ENOMEM);

        /* fill pga[] from the collected pages and attach them to a cl_req */
        i = 0;
        list_for_each_entry(oap, &rpc_list, oap_rpc_item) {
                struct cl_page *page = oap2cl_page(oap);
                if (clerq == NULL) {
                        clerq = cl_req_alloc(env, page, crt,
                                             1 /* only 1-object rpcs for now */);
                        if (IS_ERR(clerq))
                                GOTO(out, rc = PTR_ERR(clerq));
                }
                if (mem_tight)
                        oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
                if (soft_sync)
                        oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
                pga[i] = &oap->oap_brw_page;
                pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
                CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
                       pga[i]->pg, page_index(oap->oap_page), oap,
                       pga[i]->flag);
                i++;
                cl_req_page_add(env, clerq, page);
        }

        /* always get the data for the obdo for the rpc */
        LASSERT(clerq != NULL);
        crattr->cra_oa = oa;
        cl_req_attr_set(env, clerq, crattr, ~0ULL);

        rc = cl_req_prep(env, clerq);
        if (rc != 0) {
                CERROR("cl_req_prep failed: %d\n", rc);
                GOTO(out, rc);
        }

        sort_brw_pages(pga, page_count);
        rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req,
                                  crattr->cra_capa, 1, 0);
        if (rc != 0) {
                CERROR("prep_req failed: %d\n", rc);
                GOTO(out, rc);
        }

        req->rq_commit_cb = brw_commit;
        req->rq_interpret_reply = brw_interpret;

        if (mem_tight != 0)
                req->rq_memalloc = 1;

        /* Need to update the timestamps after the request is built in case
         * we race with setattr (locally or in queue at OST).  If OST gets
         * later setattr before earlier BRW (as determined by the request xid),
         * the OST will not use BRW timestamps.  Sadly, there is no obvious
         * way to do this in a single call.  bug 10150 */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        crattr->cra_oa = &body->oa;
        cl_req_attr_set(env, clerq, crattr,
                        OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);

        lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);

        /* stash the pages/extents in the request's async args so
         * brw_interpret() can finish them on completion */
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        INIT_LIST_HEAD(&aa->aa_oaps);
        list_splice_init(&rpc_list, &aa->aa_oaps);
        INIT_LIST_HEAD(&aa->aa_exts);
        list_splice_init(ext_list, &aa->aa_exts);
        aa->aa_clerq = clerq;

        /* queued sync pages can be torn down while the pages
         * were between the pending list and the rpc */
        tmp = NULL;
        list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
                /* only one oap gets a request reference */
                if (tmp == NULL)
                        tmp = oap;
                if (oap->oap_interrupted && !req->rq_intr) {
                        CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
                                        oap, req);
                        ptlrpc_mark_interrupted(req);
                }
        }
        if (tmp != NULL)
                tmp->oap_request = ptlrpc_request_addref(req);

        /* account the new in-flight RPC and update the lprocfs stats */
        spin_lock(&cli->cl_loi_list_lock);
        starting_offset >>= PAGE_CACHE_SHIFT;
        if (cmd == OBD_BRW_READ) {
                cli->cl_r_in_flight++;
                lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
                lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
                lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
                                      starting_offset + 1);
        } else {
                cli->cl_w_in_flight++;
                lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
                lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
                lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
                                      starting_offset + 1);
        }
        spin_unlock(&cli->cl_loi_list_lock);

        DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %ur/%uw in flight",
                  page_count, aa, cli->cl_r_in_flight,
                  cli->cl_w_in_flight);

        /* XXX: Maybe the caller can check the RPC bulk descriptor to
         * see which CPU/NUMA node the majority of pages were allocated
         * on, and try to assign the async RPC to the CPU core
         * (PDL_POLICY_PREFERRED) to reduce cross-CPU memory traffic.
         *
         * But on the other hand, we expect that multiple ptlrpcd
         * threads and the initial write sponsor can run in parallel,
         * especially when data checksum is enabled, which is CPU-bound
         * operation and single ptlrpcd thread cannot process in time.
         * So more ptlrpcd threads sharing BRW load
         * (with PDL_POLICY_ROUND) seems better.
         */
        ptlrpcd_add_req(req, pol, -1);
        rc = 0;
        EXIT;

out:
        if (mem_tight != 0)
                cfs_memory_pressure_restore(mpflag);

        if (crattr != NULL) {
                capa_put(crattr->cra_capa);
                OBD_FREE(crattr, sizeof(*crattr));
        }

        if (rc != 0) {
                LASSERT(req == NULL);

                if (oa)
                        OBDO_FREE(oa);
                if (pga)
                        OBD_FREE(pga, sizeof(*pga) * page_count);
                /* this should happen rarely and is pretty bad, it makes the
                 * pending list not follow the dirty order */
                while (!list_empty(ext_list)) {
                        ext = list_entry(ext_list->next, struct osc_extent,
                                         oe_link);
                        list_del_init(&ext->oe_link);
                        osc_extent_finish(env, ext, 0, rc);
                }
                if (clerq && !IS_ERR(clerq))
                        cl_req_completion(env, clerq, rc);
        }
        RETURN(rc);
}
1873
1874 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
1875                                         struct ldlm_enqueue_info *einfo)
1876 {
1877         void *data = einfo->ei_cbdata;
1878         int set = 0;
1879
1880         LASSERT(lock != NULL);
1881         LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
1882         LASSERT(lock->l_resource->lr_type == einfo->ei_type);
1883         LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
1884         LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
1885
1886         lock_res_and_lock(lock);
1887
1888         if (lock->l_ast_data == NULL)
1889                 lock->l_ast_data = data;
1890         if (lock->l_ast_data == data)
1891                 set = 1;
1892
1893         unlock_res_and_lock(lock);
1894
1895         return set;
1896 }
1897
1898 static int osc_set_data_with_check(struct lustre_handle *lockh,
1899                                    struct ldlm_enqueue_info *einfo)
1900 {
1901         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
1902         int set = 0;
1903
1904         if (lock != NULL) {
1905                 set = osc_set_lock_data_with_check(lock, einfo);
1906                 LDLM_LOCK_PUT(lock);
1907         } else
1908                 CERROR("lockh %p, data %p - client evicted?\n",
1909                        lockh, einfo->ei_cbdata);
1910         return set;
1911 }
1912
/*
 * Finish an OSC lock enqueue: translate the server's reply status for
 * intent enqueues, mark the LVB ready when appropriate, invoke the
 * caller's upcall and drop the reference taken by ldlm_cli_enqueue().
 *
 * \param lockh   handle of the (possibly) granted lock
 * \param mode    mode used for the enqueue (for the decref)
 * \param flags   in/out enqueue flags; LDLM_FL_LVB_READY may be added
 * \param agl     non-zero for speculative (AGL) enqueues, which do not
 *                mark the LVB ready on an aborted intent
 * \param errcode enqueue result (ELDLM_* or negative errno)
 *
 * \retval the upcall's return value
 */
static int osc_enqueue_fini(struct ptlrpc_request *req,
                            osc_enqueue_upcall_f upcall, void *cookie,
                            struct lustre_handle *lockh, ldlm_mode_t mode,
                            __u64 *flags, int agl, int errcode)
{
        bool intent = *flags & LDLM_FL_HAS_INTENT;
        int rc;
        ENTRY;

        /* The request was created before ldlm_cli_enqueue call. */
        if (intent && errcode == ELDLM_LOCK_ABORTED) {
                struct ldlm_reply *rep;

                rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
                LASSERT(rep != NULL);

                /* the intent result overrides the generic abort status */
                rep->lock_policy_res1 =
                        ptlrpc_status_ntoh(rep->lock_policy_res1);
                if (rep->lock_policy_res1)
                        errcode = rep->lock_policy_res1;
                if (!agl)
                        *flags |= LDLM_FL_LVB_READY;
        } else if (errcode == ELDLM_OK) {
                *flags |= LDLM_FL_LVB_READY;
        }

        /* Call the update callback. */
        rc = (*upcall)(cookie, lockh, errcode);

        /* release the reference taken in ldlm_cli_enqueue() */
        if (errcode == ELDLM_LOCK_MATCHED)
                errcode = ELDLM_OK;
        if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
                ldlm_lock_decref(lockh, mode);

        RETURN(rc);
}
1950
/*
 * Interpret callback for asynchronous lock enqueues: complete the DLM
 * side via ldlm_cli_enqueue_fini() and then the OSC side via
 * osc_enqueue_fini(), holding an extra lock reference across the upcall
 * so a blocking AST cannot race in before it runs.
 */
static int osc_enqueue_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_enqueue_args *aa, int rc)
{
        struct ldlm_lock *lock;
        struct lustre_handle *lockh = &aa->oa_lockh;
        ldlm_mode_t mode = aa->oa_mode;
        struct ost_lvb *lvb = aa->oa_lvb;
        __u32 lvb_len = sizeof(*lvb);
        __u64 flags = 0;

        ENTRY;

        /* ldlm_cli_enqueue is holding a reference on the lock, so it must
         * be valid. */
        lock = ldlm_handle2lock(lockh);
        LASSERTF(lock != NULL,
                 "lockh "LPX64", req %p, aa %p - client evicted?\n",
                 lockh->cookie, req, aa);

        /* Take an additional reference so that a blocking AST that
         * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
         * to arrive after an upcall has been executed by
         * osc_enqueue_fini(). */
        ldlm_lock_addref(lockh, mode);

        /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
        OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);

        /* Let CP AST to grant the lock first. */
        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);

        if (aa->oa_agl) {
                /* AGL enqueues carry no caller-provided LVB/flags; use a
                 * local flags word for the fini calls below. */
                LASSERT(aa->oa_lvb == NULL);
                LASSERT(aa->oa_flags == NULL);
                aa->oa_flags = &flags;
        }

        /* Complete obtaining the lock procedure. */
        rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
                                   aa->oa_mode, aa->oa_flags, lvb, lvb_len,
                                   lockh, rc);
        /* Complete osc stuff. */
        rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
                              aa->oa_flags, aa->oa_agl, rc);

        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);

        /* drop the extra reference taken above */
        ldlm_lock_decref(lockh, mode);
        LDLM_LOCK_PUT(lock);
        RETURN(rc);
}
2003
2004 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2005
/* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
 * other synchronous requests, however keeping some locks and trying to obtain
 * others may take a considerable amount of time in a case of ost failure; and
 * when other sync requests do not get released lock from a client, the client
 * is evicted from the cluster -- such scenarios make the life difficult, so
 * release locks just after they are obtained. */
int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
                     __u64 *flags, ldlm_policy_data_t *policy,
                     struct ost_lvb *lvb, int kms_valid,
                     osc_enqueue_upcall_f upcall, void *cookie,
                     struct ldlm_enqueue_info *einfo,
                     struct ptlrpc_request_set *rqset, int async, int agl)
{
        struct obd_device *obd = exp->exp_obd;
        struct lustre_handle lockh = { 0 };
        struct ptlrpc_request *req = NULL;
        int intent = *flags & LDLM_FL_HAS_INTENT;
        /* speculative (AGL) enqueues may match locks without a ready LVB */
        __u64 match_lvb = agl ? 0 : LDLM_FL_LVB_READY;
        ldlm_mode_t mode;
        int rc;
        ENTRY;

        /* Filesystem lock extents are extended to page boundaries so that
         * dealing with the page cache is a little smoother.  */
        policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
        policy->l_extent.end |= ~PAGE_MASK;

        /*
         * kms is not valid when either object is completely fresh (so that no
         * locks are cached), or object was evicted. In the latter case cached
         * lock cannot be used, because it would prime inode state with
         * potentially stale LVB.
         */
        if (!kms_valid)
                goto no_match;

        /* Next, search for already existing extent locks that will cover us */
        /* If we're trying to read, we also search for an existing PW lock.  The
         * VFS and page cache already protect us locally, so lots of readers/
         * writers can share a single PW lock.
         *
         * There are problems with conversion deadlocks, so instead of
         * converting a read lock to a write lock, we'll just enqueue a new
         * one.
         *
         * At some point we should cancel the read lock instead of making them
         * send us a blocking callback, but there are problems with canceling
         * locks out from other users right now, too. */
        mode = einfo->ei_mode;
        if (einfo->ei_mode == LCK_PR)
                mode |= LCK_PW;
        mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
                               einfo->ei_type, policy, mode, &lockh, 0);
        if (mode) {
                struct ldlm_lock *matched;

                if (*flags & LDLM_FL_TEST_LOCK)
                        RETURN(ELDLM_OK);

                matched = ldlm_handle2lock(&lockh);
                if (agl) {
                        /* AGL enqueues DLM locks speculatively. Therefore if
                         * it already exists a DLM lock, it will just inform
                         * the caller to cancel the AGL process for this
                         * stripe. */
                        ldlm_lock_decref(&lockh, mode);
                        LDLM_LOCK_PUT(matched);
                        RETURN(-ECANCELED);
                } else if (osc_set_lock_data_with_check(matched, einfo)) {
                        *flags |= LDLM_FL_LVB_READY;

                        /* We already have a lock, and it's referenced. */
                        (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);

                        ldlm_lock_decref(&lockh, mode);
                        LDLM_LOCK_PUT(matched);
                        RETURN(ELDLM_OK);
                } else {
                        /* lock carries foreign ast data; enqueue a new one */
                        ldlm_lock_decref(&lockh, mode);
                        LDLM_LOCK_PUT(matched);
                }
        }

no_match:
        if (*flags & LDLM_FL_TEST_LOCK)
                RETURN(-ENOLCK);

        if (intent) {
                req = ptlrpc_request_alloc(class_exp2cliimp(exp),
                                           &RQF_LDLM_ENQUEUE_LVB);
                if (req == NULL)
                        RETURN(-ENOMEM);

                rc = ptlrpc_request_pack(req, LUSTRE_DLM_VERSION, LDLM_ENQUEUE);
                if (rc < 0) {
                        ptlrpc_request_free(req);
                        RETURN(rc);
                }

                req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
                                     sizeof *lvb);
                ptlrpc_request_set_replen(req);
        }

        /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
        *flags &= ~LDLM_FL_BLOCK_GRANTED;

        rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
                              sizeof(*lvb), LVB_T_OST, &lockh, async);
        if (async) {
                if (!rc) {
                        /* stash the enqueue context for
                         * osc_enqueue_interpret() */
                        struct osc_enqueue_args *aa;
                        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
                        aa = ptlrpc_req_async_args(req);
                        aa->oa_exp    = exp;
                        aa->oa_mode   = einfo->ei_mode;
                        aa->oa_type   = einfo->ei_type;
                        lustre_handle_copy(&aa->oa_lockh, &lockh);
                        aa->oa_upcall = upcall;
                        aa->oa_cookie = cookie;
                        aa->oa_agl    = !!agl;
                        if (!agl) {
                                aa->oa_flags  = flags;
                                aa->oa_lvb    = lvb;
                        } else {
                                /* AGL is essentially to enqueue a DLM lock
                                 * in advance, so we don't care about the
                                 * result of AGL enqueue. */
                                aa->oa_lvb    = NULL;
                                aa->oa_flags  = NULL;
                        }

                        req->rq_interpret_reply =
                                (ptlrpc_interpterer_t)osc_enqueue_interpret;
                        if (rqset == PTLRPCD_SET)
                                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
                        else
                                ptlrpc_set_add_req(rqset, req);
                } else if (intent) {
                        ptlrpc_req_finished(req);
                }
                RETURN(rc);
        }

        rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
                              flags, agl, rc);
        if (intent)
                ptlrpc_req_finished(req);

        RETURN(rc);
}
2157
/*
 * Look for a cached DLM lock covering @policy on @res_id.
 *
 * \param data  if non-NULL, the lock must carry (or accept) this ast data,
 *              otherwise the match is dropped and 0 is returned
 * \param unref passed through to ldlm_lock_match()
 *
 * \retval matched lock mode (a reference is held on it unless
 *         LDLM_FL_TEST_LOCK was set), 0 if no usable lock was found,
 *         -EIO under the OBD_FAIL_OSC_MATCH fault injection
 */
int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
                   __u32 type, ldlm_policy_data_t *policy, __u32 mode,
                   __u64 *flags, void *data, struct lustre_handle *lockh,
                   int unref)
{
        struct obd_device *obd = exp->exp_obd;
        __u64 lflags = *flags;
        ldlm_mode_t rc;
        ENTRY;

        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
                RETURN(-EIO);

        /* Filesystem lock extents are extended to page boundaries so that
         * dealing with the page cache is a little smoother */
        policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
        policy->l_extent.end |= ~PAGE_MASK;

        /* Next, search for already existing extent locks that will cover us */
        /* If we're trying to read, we also search for an existing PW lock.  The
         * VFS and page cache already protect us locally, so lots of readers/
         * writers can share a single PW lock. */
        rc = mode;
        if (mode == LCK_PR)
                rc |= LCK_PW;
        rc = ldlm_lock_match(obd->obd_namespace, lflags,
                             res_id, type, policy, rc, lockh, unref);
        if (rc) {
                if (data != NULL) {
                        if (!osc_set_data_with_check(lockh, data)) {
                                /* lock carries different ast data: unusable */
                                if (!(lflags & LDLM_FL_TEST_LOCK))
                                        ldlm_lock_decref(lockh, rc);
                                RETURN(0);
                        }
                }
                /* requested PR but matched PW: downgrade our reference so the
                 * caller holds the mode it asked for */
                if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
                        ldlm_lock_addref(lockh, LCK_PR);
                        ldlm_lock_decref(lockh, LCK_PW);
                }
                RETURN(rc);
        }
        RETURN(rc);
}
2201
2202 static int osc_statfs_interpret(const struct lu_env *env,
2203                                 struct ptlrpc_request *req,
2204                                 struct osc_async_args *aa, int rc)
2205 {
2206         struct obd_statfs *msfs;
2207         ENTRY;
2208
2209         if (rc == -EBADR)
2210                 /* The request has in fact never been sent
2211                  * due to issues at a higher level (LOV).
2212                  * Exit immediately since the caller is
2213                  * aware of the problem and takes care
2214                  * of the clean up */
2215                  RETURN(rc);
2216
2217         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2218             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2219                 GOTO(out, rc = 0);
2220
2221         if (rc != 0)
2222                 GOTO(out, rc);
2223
2224         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2225         if (msfs == NULL) {
2226                 GOTO(out, rc = -EPROTO);
2227         }
2228
2229         *aa->aa_oi->oi_osfs = *msfs;
2230 out:
2231         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2232         RETURN(rc);
2233 }
2234
2235 static int osc_statfs_async(struct obd_export *exp,
2236                             struct obd_info *oinfo, __u64 max_age,
2237                             struct ptlrpc_request_set *rqset)
2238 {
2239         struct obd_device     *obd = class_exp2obd(exp);
2240         struct ptlrpc_request *req;
2241         struct osc_async_args *aa;
2242         int                    rc;
2243         ENTRY;
2244
2245         /* We could possibly pass max_age in the request (as an absolute
2246          * timestamp or a "seconds.usec ago") so the target can avoid doing
2247          * extra calls into the filesystem if that isn't necessary (e.g.
2248          * during mount that would help a bit).  Having relative timestamps
2249          * is not so great if request processing is slow, while absolute
2250          * timestamps are not ideal because they need time synchronization. */
2251         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2252         if (req == NULL)
2253                 RETURN(-ENOMEM);
2254
2255         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2256         if (rc) {
2257                 ptlrpc_request_free(req);
2258                 RETURN(rc);
2259         }
2260         ptlrpc_request_set_replen(req);
2261         req->rq_request_portal = OST_CREATE_PORTAL;
2262         ptlrpc_at_set_req_timeout(req);
2263
2264         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2265                 /* procfs requests not want stat in wait for avoid deadlock */
2266                 req->rq_no_resend = 1;
2267                 req->rq_no_delay = 1;
2268         }
2269
2270         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2271         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2272         aa = ptlrpc_req_async_args(req);
2273         aa->aa_oi = oinfo;
2274
2275         ptlrpc_set_add_req(rqset, req);
2276         RETURN(0);
2277 }
2278
2279 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2280                       struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2281 {
2282         struct obd_device     *obd = class_exp2obd(exp);
2283         struct obd_statfs     *msfs;
2284         struct ptlrpc_request *req;
2285         struct obd_import     *imp = NULL;
2286         int rc;
2287         ENTRY;
2288
2289         /*Since the request might also come from lprocfs, so we need
2290          *sync this with client_disconnect_export Bug15684*/
2291         down_read(&obd->u.cli.cl_sem);
2292         if (obd->u.cli.cl_import)
2293                 imp = class_import_get(obd->u.cli.cl_import);
2294         up_read(&obd->u.cli.cl_sem);
2295         if (!imp)
2296                 RETURN(-ENODEV);
2297
2298         /* We could possibly pass max_age in the request (as an absolute
2299          * timestamp or a "seconds.usec ago") so the target can avoid doing
2300          * extra calls into the filesystem if that isn't necessary (e.g.
2301          * during mount that would help a bit).  Having relative timestamps
2302          * is not so great if request processing is slow, while absolute
2303          * timestamps are not ideal because they need time synchronization. */
2304         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2305
2306         class_import_put(imp);
2307
2308         if (req == NULL)
2309                 RETURN(-ENOMEM);
2310
2311         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2312         if (rc) {
2313                 ptlrpc_request_free(req);
2314                 RETURN(rc);
2315         }
2316         ptlrpc_request_set_replen(req);
2317         req->rq_request_portal = OST_CREATE_PORTAL;
2318         ptlrpc_at_set_req_timeout(req);
2319
2320         if (flags & OBD_STATFS_NODELAY) {
2321                 /* procfs requests not want stat in wait for avoid deadlock */
2322                 req->rq_no_resend = 1;
2323                 req->rq_no_delay = 1;
2324         }
2325
2326         rc = ptlrpc_queue_wait(req);
2327         if (rc)
2328                 GOTO(out, rc);
2329
2330         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2331         if (msfs == NULL) {
2332                 GOTO(out, rc = -EPROTO);
2333         }
2334
2335         *osfs = *msfs;
2336
2337         EXIT;
2338  out:
2339         ptlrpc_req_finished(req);
2340         return rc;
2341 }
2342
2343 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2344                          void *karg, void *uarg)
2345 {
2346         struct obd_device *obd = exp->exp_obd;
2347         struct obd_ioctl_data *data = karg;
2348         int err = 0;
2349         ENTRY;
2350
2351         if (!try_module_get(THIS_MODULE)) {
2352                 CERROR("%s: cannot get module '%s'\n", obd->obd_name,
2353                        module_name(THIS_MODULE));
2354                 return -EINVAL;
2355         }
2356         switch (cmd) {
2357         case OBD_IOC_CLIENT_RECOVER:
2358                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2359                                             data->ioc_inlbuf1, 0);
2360                 if (err > 0)
2361                         err = 0;
2362                 GOTO(out, err);
2363         case IOC_OSC_SET_ACTIVE:
2364                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2365                                                data->ioc_offset);
2366                 GOTO(out, err);
2367         case OBD_IOC_PING_TARGET:
2368                 err = ptlrpc_obd_ping(obd);
2369                 GOTO(out, err);
2370         default:
2371                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2372                        cmd, current_comm());
2373                 GOTO(out, err = -ENOTTY);
2374         }
2375 out:
2376         module_put(THIS_MODULE);
2377         return err;
2378 }
2379
/* Set a named parameter on this OSC, either locally (checksum, cache
 * attachment, LRU shrink, sptlrpc) or by packing and sending an
 * OST_SET_INFO RPC for everything else.  Grant-shrink requests go via
 * ptlrpcd; all other RPCs are added to the caller-supplied @set. */
static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
                              u32 keylen, void *key,
                              u32 vallen, void *val,
                              struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *req;
        struct obd_device     *obd = exp->exp_obd;
        struct obd_import     *imp = class_exp2cliimp(exp);
        char                  *tmp;
        int                    rc;
        ENTRY;

        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);

        /* KEY_CHECKSUM: toggle BRW checksumming; purely local, no RPC. */
        if (KEY_IS(KEY_CHECKSUM)) {
                if (vallen != sizeof(int))
                        RETURN(-EINVAL);
                exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
                RETURN(0);
        }

        if (KEY_IS(KEY_SPTLRPC_CONF)) {
                sptlrpc_conf_client_adapt(obd);
                RETURN(0);
        }

        if (KEY_IS(KEY_FLUSH_CTX)) {
                sptlrpc_import_flush_my_ctx(imp);
                RETURN(0);
        }

        /* KEY_CACHE_SET: attach this OSC to the shared client cache and
         * link it onto the cache's LRU list; may only happen once. */
        if (KEY_IS(KEY_CACHE_SET)) {
                struct client_obd *cli = &obd->u.cli;

                LASSERT(cli->cl_cache == NULL); /* only once */
                cli->cl_cache = (struct cl_client_cache *)val;
                cl_cache_incref(cli->cl_cache);
                cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;

                /* add this osc into entity list */
                LASSERT(list_empty(&cli->cl_lru_osc));
                spin_lock(&cli->cl_cache->ccc_lru_lock);
                list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
                spin_unlock(&cli->cl_cache->ccc_lru_lock);

                RETURN(0);
        }

        /* KEY_CACHE_LRU_SHRINK: try to free up to min(half the LRU,
         * *(long *)val) pages; *val is decremented by the number freed. */
        if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
                struct client_obd *cli = &obd->u.cli;
                long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
                long target = *(long *)val;

                nr = osc_lru_shrink(env, cli, min(nr, target), true);
                *(long *)val -= nr;
                RETURN(0);
        }

        if (!set && !KEY_IS(KEY_GRANT_SHRINK))
                RETURN(-EINVAL);

        /* We pass all other commands directly to OST. Since nobody calls osc
           methods directly and everybody is supposed to go through LOV, we
           assume lov checked invalid values for us.
           The only recognised values so far are evict_by_nid and mds_conn.
           Even if something bad goes through, we'd get a -EINVAL from OST
           anyway. */

        req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
                                                &RQF_OST_SET_GRANT_INFO :
                                                &RQF_OBD_SET_INFO);
        if (req == NULL)
                RETURN(-ENOMEM);

        req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
                             RCL_CLIENT, keylen);
        /* Grant-shrink requests carry an ost_body rather than a plain
         * value buffer, so the VAL size is only set for other keys. */
        if (!KEY_IS(KEY_GRANT_SHRINK))
                req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
                                     RCL_CLIENT, vallen);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
        memcpy(tmp, key, keylen);
        tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
                                                        &RMF_OST_BODY :
                                                        &RMF_SETINFO_VAL);
        memcpy(tmp, val, vallen);

        /* For grant shrink keep a private copy of the obdo in the async
         * args so the interpret callback can use it after the caller's
         * buffer is gone (presumably freed by osc_shrink_grant_interpret
         * -- confirm in its definition). */
        if (KEY_IS(KEY_GRANT_SHRINK)) {
                struct osc_grant_args *aa;
                struct obdo *oa;

                CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
                aa = ptlrpc_req_async_args(req);
                OBDO_ALLOC(oa);
                if (!oa) {
                        ptlrpc_req_finished(req);
                        RETURN(-ENOMEM);
                }
                *oa = ((struct ost_body *)val)->oa;
                aa->aa_oa = oa;
                req->rq_interpret_reply = osc_shrink_grant_interpret;
        }

        ptlrpc_request_set_replen(req);
        if (!KEY_IS(KEY_GRANT_SHRINK)) {
                LASSERT(set != NULL);
                ptlrpc_set_add_req(set, req);
                ptlrpc_check_set(NULL, set);
        } else
                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);

        RETURN(0);
}
2498
2499 static int osc_reconnect(const struct lu_env *env,
2500                          struct obd_export *exp, struct obd_device *obd,
2501                          struct obd_uuid *cluuid,
2502                          struct obd_connect_data *data,
2503                          void *localdata)
2504 {
2505         struct client_obd *cli = &obd->u.cli;
2506
2507         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
2508                 long lost_grant;
2509
2510                 spin_lock(&cli->cl_loi_list_lock);
2511                 data->ocd_grant = (cli->cl_avail_grant +
2512                                   (cli->cl_dirty_pages << PAGE_CACHE_SHIFT)) ?:
2513                                   2 * cli_brw_size(obd);
2514                 lost_grant = cli->cl_lost_grant;
2515                 cli->cl_lost_grant = 0;
2516                 spin_unlock(&cli->cl_loi_list_lock);
2517
2518                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
2519                        " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
2520                        data->ocd_version, data->ocd_grant, lost_grant);
2521         }
2522
2523         RETURN(0);
2524 }
2525
/* Disconnect the export; the grant-shrink list removal deliberately
 * happens only after the import is gone (see the race note below). */
static int osc_disconnect(struct obd_export *exp)
{
        struct obd_device *obd = class_exp2obd(exp);
        int rc;

        rc = client_disconnect_export(exp);
        /**
         * Initially we put del_shrink_grant before disconnect_export, but it
         * causes the following problem if setup (connect) and cleanup
         * (disconnect) are tangled together.
         *      connect p1                     disconnect p2
         *   ptlrpc_connect_import
         *     ...............               class_manual_cleanup
         *                                     osc_disconnect
         *                                     del_shrink_grant
         *   ptlrpc_connect_interrupt
         *     init_grant_shrink
         *   add this client to shrink list
         *                                      cleanup_osc
         * Bang! pinger trigger the shrink.
         * So the osc should be disconnected from the shrink list, after we
         * are sure the import has been destroyed. BUG18662
         */
        if (obd->u.cli.cl_import == NULL)
                osc_del_shrink_grant(&obd->u.cli);
        return rc;
}
2553
2554 static int osc_import_event(struct obd_device *obd,
2555                             struct obd_import *imp,
2556                             enum obd_import_event event)
2557 {
2558         struct client_obd *cli;
2559         int rc = 0;
2560
2561         ENTRY;
2562         LASSERT(imp->imp_obd == obd);
2563
2564         switch (event) {
2565         case IMP_EVENT_DISCON: {
2566                 cli = &obd->u.cli;
2567                 spin_lock(&cli->cl_loi_list_lock);
2568                 cli->cl_avail_grant = 0;
2569                 cli->cl_lost_grant = 0;
2570                 spin_unlock(&cli->cl_loi_list_lock);
2571                 break;
2572         }
2573         case IMP_EVENT_INACTIVE: {
2574                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
2575                 break;
2576         }
2577         case IMP_EVENT_INVALIDATE: {
2578                 struct ldlm_namespace *ns = obd->obd_namespace;
2579                 struct lu_env         *env;
2580                 int                    refcheck;
2581
2582                 env = cl_env_get(&refcheck);
2583                 if (!IS_ERR(env)) {
2584                         /* Reset grants */
2585                         cli = &obd->u.cli;
2586                         /* all pages go to failing rpcs due to the invalid
2587                          * import */
2588                         osc_io_unplug(env, cli, NULL, PDL_POLICY_ROUND);
2589
2590                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
2591                         cl_env_put(env, &refcheck);
2592                 } else
2593                         rc = PTR_ERR(env);
2594                 break;
2595         }
2596         case IMP_EVENT_ACTIVE: {
2597                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
2598                 break;
2599         }
2600         case IMP_EVENT_OCD: {
2601                 struct obd_connect_data *ocd = &imp->imp_connect_data;
2602
2603                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
2604                         osc_init_grant(&obd->u.cli, ocd);
2605
2606                 /* See bug 7198 */
2607                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
2608                         imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
2609
2610                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
2611                 break;
2612         }
2613         case IMP_EVENT_DEACTIVATE: {
2614                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
2615                 break;
2616         }
2617         case IMP_EVENT_ACTIVATE: {
2618                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
2619                 break;
2620         }
2621         default:
2622                 CERROR("Unknown import event %d\n", event);
2623                 LBUG();
2624         }
2625         RETURN(rc);
2626 }
2627
2628 /**
2629  * Determine whether the lock can be canceled before replaying the lock
2630  * during recovery, see bug16774 for detailed information.
2631  *
2632  * \retval zero the lock can't be canceled
2633  * \retval other ok to cancel
2634  */
2635 static int osc_cancel_weight(struct ldlm_lock *lock)
2636 {
2637         /*
2638          * Cancel all unused and granted extent lock.
2639          */
2640         if (lock->l_resource->lr_type == LDLM_EXTENT &&
2641             lock->l_granted_mode == lock->l_req_mode &&
2642             osc_ldlm_weigh_ast(lock) == 0)
2643                 RETURN(1);
2644
2645         RETURN(0);
2646 }
2647
/* ptlrpcd work callback: unplug (kick off) any pending cached I/O for
 * the client obd passed in @data. */
static int brw_queue_work(const struct lu_env *env, void *data)
{
        struct client_obd *cli = data;

        CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);

        osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
        RETURN(0);
}
2657
/* Set up an OSC obd device: ptlrpcd reference, generic client setup,
 * writeback/LRU work items, quota, procfs, and the request pool.
 * On failure, unwinds in reverse order via the goto-cleanup labels. */
int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
{
        struct client_obd *cli = &obd->u.cli;
        struct obd_type   *type;
        void              *handler;
        int                rc;
        ENTRY;

        rc = ptlrpcd_addref();
        if (rc)
                RETURN(rc);

        rc = client_obd_setup(obd, lcfg);
        if (rc)
                GOTO(out_ptlrpcd, rc);

        /* Work item that runs brw_queue_work (writeback) via ptlrpcd. */
        handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
        if (IS_ERR(handler))
                GOTO(out_client_setup, rc = PTR_ERR(handler));
        cli->cl_writeback_work = handler;

        /* Work item for asynchronous LRU processing. */
        handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
        if (IS_ERR(handler))
                GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
        cli->cl_lru_work = handler;

        rc = osc_quota_setup(obd);
        if (rc)
                GOTO(out_ptlrpcd_work, rc);

        cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;

#ifdef CONFIG_PROC_FS
        obd->obd_vars = lprocfs_osc_obd_vars;
#endif
        /* If this is true then both client (osc) and server (osp) are on the
         * same node. The osp layer if loaded first will register the osc proc
         * directory. In that case this obd_device will be attached its proc
         * tree to type->typ_procsym instead of obd->obd_type->typ_procroot. */
        type = class_search_type(LUSTRE_OSP_NAME);
        if (type && type->typ_procsym) {
                obd->obd_proc_entry = lprocfs_register(obd->obd_name,
                                                       type->typ_procsym,
                                                       obd->obd_vars, obd);
                if (IS_ERR(obd->obd_proc_entry)) {
                        rc = PTR_ERR(obd->obd_proc_entry);
                        CERROR("error %d setting up lprocfs for %s\n", rc,
                               obd->obd_name);
                        obd->obd_proc_entry = NULL;
                }
        } else {
                rc = lprocfs_obd_setup(obd);
        }

        /* If the basic OSC proc tree construction succeeded then
         * lets do the rest.  Note a proc failure is not fatal: rc is not
         * propagated, and setup still RETURNs 0 below. */
        if (rc == 0) {
                lproc_osc_attach_seqstat(obd);
                sptlrpc_lprocfs_cliobd_attach(obd);
                ptlrpc_lprocfs_register_obd(obd);
        }

        /* We need to allocate a few requests more, because
         * brw_interpret tries to create new requests before freeing
         * previous ones, Ideally we want to have 2x max_rpcs_in_flight
         * reserved, but I'm afraid that might be too much wasted RAM
         * in fact, so 2 is just my guess and still should work. */
        cli->cl_import->imp_rq_pool =
                ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
                                    OST_MAXREQSIZE,
                                    ptlrpc_add_rqs_to_pool);

        INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
        ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
        RETURN(0);

out_ptlrpcd_work:
        /* Destroy whichever work items were successfully allocated. */
        if (cli->cl_writeback_work != NULL) {
                ptlrpcd_destroy_work(cli->cl_writeback_work);
                cli->cl_writeback_work = NULL;
        }
        if (cli->cl_lru_work != NULL) {
                ptlrpcd_destroy_work(cli->cl_lru_work);
                cli->cl_lru_work = NULL;
        }
out_client_setup:
        client_obd_cleanup(obd);
out_ptlrpcd:
        ptlrpcd_decref();
        RETURN(rc);
}
2749
2750 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
2751 {
2752         int rc = 0;
2753         ENTRY;
2754
2755         switch (stage) {
2756         case OBD_CLEANUP_EARLY: {
2757                 struct obd_import *imp;
2758                 imp = obd->u.cli.cl_import;
2759                 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
2760                 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
2761                 ptlrpc_deactivate_import(imp);
2762                 spin_lock(&imp->imp_lock);
2763                 imp->imp_pingable = 0;
2764                 spin_unlock(&imp->imp_lock);
2765                 break;
2766         }
2767         case OBD_CLEANUP_EXPORTS: {
2768                 struct client_obd *cli = &obd->u.cli;
2769                 /* LU-464
2770                  * for echo client, export may be on zombie list, wait for
2771                  * zombie thread to cull it, because cli.cl_import will be
2772                  * cleared in client_disconnect_export():
2773                  *   class_export_destroy() -> obd_cleanup() ->
2774                  *   echo_device_free() -> echo_client_cleanup() ->
2775                  *   obd_disconnect() -> osc_disconnect() ->
2776                  *   client_disconnect_export()
2777                  */
2778                 obd_zombie_barrier();
2779                 if (cli->cl_writeback_work) {
2780                         ptlrpcd_destroy_work(cli->cl_writeback_work);
2781                         cli->cl_writeback_work = NULL;
2782                 }
2783                 if (cli->cl_lru_work) {
2784                         ptlrpcd_destroy_work(cli->cl_lru_work);
2785                         cli->cl_lru_work = NULL;
2786                 }
2787                 obd_cleanup_client_import(obd);
2788                 ptlrpc_lprocfs_unregister_obd(obd);
2789                 lprocfs_obd_cleanup(obd);
2790                 break;
2791                 }
2792         }
2793         RETURN(rc);
2794 }
2795
/* Final teardown of an OSC obd: detach from the shared client cache,
 * free the quota cache, run generic client cleanup and drop the
 * ptlrpcd reference taken in osc_setup(). */
int osc_cleanup(struct obd_device *obd)
{
        struct client_obd *cli = &obd->u.cli;
        int rc;

        ENTRY;

        /* lru cleanup: unlink from the shared cache's LRU list and drop
         * the reference taken when the cache was attached (KEY_CACHE_SET). */
        if (cli->cl_cache != NULL) {
                LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
                spin_lock(&cli->cl_cache->ccc_lru_lock);
                list_del_init(&cli->cl_lru_osc);
                spin_unlock(&cli->cl_cache->ccc_lru_lock);
                cli->cl_lru_left = NULL;
                cl_cache_decref(cli->cl_cache);
                cli->cl_cache = NULL;
        }

        /* free memory of osc quota cache */
        osc_quota_cleanup(obd);

        rc = client_obd_cleanup(obd);

        /* balances ptlrpcd_addref() in osc_setup() */
        ptlrpcd_decref();
        RETURN(rc);
}
2822
2823 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
2824 {
2825         int rc = class_process_proc_param(PARAM_OSC, obd->obd_vars, lcfg, obd);
2826         return rc > 0 ? 0: rc;
2827 }
2828
/* obd_ops hook; @buf is a struct lustre_cfg which the base helper
 * consumes (@len is unused here). */
static int osc_process_config(struct obd_device *obd, size_t len, void *buf)
{
        return osc_process_config_base(obd, buf);
}
2833
/* Method table wiring the generic OBD layer to the OSC implementations;
 * connection management mostly reuses the generic client_* helpers. */
static struct obd_ops osc_obd_ops = {
        .o_owner                = THIS_MODULE,
        .o_setup                = osc_setup,
        .o_precleanup           = osc_precleanup,
        .o_cleanup              = osc_cleanup,
        .o_add_conn             = client_import_add_conn,
        .o_del_conn             = client_import_del_conn,
        .o_connect              = client_connect_import,
        .o_reconnect            = osc_reconnect,
        .o_disconnect           = osc_disconnect,
        .o_statfs               = osc_statfs,
        .o_statfs_async         = osc_statfs_async,
        .o_create               = osc_create,
        .o_destroy              = osc_destroy,
        .o_getattr              = osc_getattr,
        .o_setattr              = osc_setattr,
        .o_iocontrol            = osc_iocontrol,
        .o_set_info_async       = osc_set_info_async,
        .o_import_event         = osc_import_event,
        .o_process_config       = osc_process_config,
        .o_quotactl             = osc_quotactl,
};
2856
2857 static int __init osc_init(void)
2858 {
2859         bool enable_proc = true;
2860         struct obd_type *type;
2861         int rc;
2862         ENTRY;
2863
2864         /* print an address of _any_ initialized kernel symbol from this
2865          * module, to allow debugging with gdb that doesn't support data
2866          * symbols from modules.*/
2867         CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
2868
2869         rc = lu_kmem_init(osc_caches);
2870         if (rc)
2871                 RETURN(rc);
2872
2873         type = class_search_type(LUSTRE_OSP_NAME);
2874         if (type != NULL && type->typ_procsym != NULL)
2875                 enable_proc = false;
2876
2877         rc = class_register_type(&osc_obd_ops, NULL, enable_proc, NULL,
2878                                  LUSTRE_OSC_NAME, &osc_device_type);
2879         if (rc) {
2880                 lu_kmem_fini(osc_caches);
2881                 RETURN(rc);
2882         }
2883
2884         RETURN(rc);
2885 }
2886
/* Module exit: unregister the obd type first (no new devices possible),
 * then release the caches set up in osc_init(). */
static void /*__exit*/ osc_exit(void)
{
        class_unregister_type(LUSTRE_OSC_NAME);
        lu_kmem_fini(osc_caches);
}
2892
2893 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
2894 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
2895 MODULE_VERSION(LUSTRE_VERSION_STRING);
2896 MODULE_LICENSE("GPL");
2897
2898 module_init(osc_init);
2899 module_exit(osc_exit);