lustre/osc/osc_request.c
/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, 2014, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 */

#define DEBUG_SUBSYSTEM S_OSC

#include <libcfs/libcfs.h>

#include <lustre_dlm.h>
#include <lustre_net.h>
#include <lustre/lustre_user.h>
#include <obd_cksum.h>
#include <lustre_ha.h>
#include <lprocfs_status.h>
#include <lustre_ioctl.h>
#include <lustre_debug.h>
#include <lustre_param.h>
#include <lustre_fid.h>
#include <obd_class.h>
#include "osc_internal.h"
#include "osc_cl_internal.h"

struct osc_brw_async_args {
        struct obdo              *aa_oa;
        int                       aa_requested_nob;
        int                       aa_nio_count;
        u32                       aa_page_count;
        int                       aa_resends;
        struct brw_page         **aa_ppga;
        struct client_obd        *aa_cli;
        struct list_head          aa_oaps;
        struct list_head          aa_exts;
        struct cl_req            *aa_clerq;
};

#define osc_grant_args osc_brw_async_args

struct osc_setattr_args {
        struct obdo             *sa_oa;
        obd_enqueue_update_f     sa_upcall;
        void                    *sa_cookie;
};

struct osc_fsync_args {
        struct osc_object       *fa_obj;
        struct obdo             *fa_oa;
        obd_enqueue_update_f    fa_upcall;
        void                    *fa_cookie;
};

struct osc_enqueue_args {
        struct obd_export       *oa_exp;
        ldlm_type_t             oa_type;
        ldlm_mode_t             oa_mode;
        __u64                   *oa_flags;
        osc_enqueue_upcall_f    oa_upcall;
        void                    *oa_cookie;
        struct ost_lvb          *oa_lvb;
        struct lustre_handle    oa_lockh;
        unsigned int            oa_agl:1;
};

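/*
 * Note: each of the *_args structs above is copied into the fixed-size
 * req->rq_async_args scratch area of its request (the CLASSERT()s below
 * verify that it fits), so the interpret callback can recover its context
 * without a separate allocation.  Illustrative pattern, not a complete
 * call site:
 *
 *      CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
 *      sa = ptlrpc_req_async_args(req);
 *      sa->sa_upcall = upcall;         // invoked from the interpret cb
 *      sa->sa_cookie = cookie;
 */
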
static void osc_release_ppga(struct brw_page **ppga, size_t count);
static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
                         void *data, int rc);

void osc_pack_req_body(struct ptlrpc_request *req, struct obdo *oa)
{
        struct ost_body *body;

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
}

static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        int                      rc;

        ENTRY;
        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;

        EXIT;
out:
        ptlrpc_req_finished(req);

        return rc;
}
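
/*
 * osc_getattr() above (and osc_setattr() below) follow the canonical
 * synchronous OST RPC sequence: allocate from the request format, pack
 * the opcode, fill the client-side buffers, set the reply length, queue
 * and wait, then unpack the reply capsule.  A minimal sketch, using a
 * hypothetical RQF_OST_FOO format for illustration:
 *
 *      req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_FOO);
 *      rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_FOO);
 *      // ... fill RMF_* client buffers ...
 *      ptlrpc_request_set_replen(req);
 *      rc = ptlrpc_queue_wait(req);
 *      // ... req_capsule_server_get() the reply fields ...
 *      ptlrpc_req_finished(req);
 */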

static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        int                      rc;

        ENTRY;
        LASSERT(oa->o_valid & OBD_MD_FLGROUP);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        EXIT;
out:
        ptlrpc_req_finished(req);

        RETURN(rc);
}

static int osc_setattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_setattr_args *sa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
                             &body->oa);
out:
        rc = sa->sa_upcall(sa->sa_cookie, rc);
        RETURN(rc);
}

int osc_setattr_async(struct obd_export *exp, struct obdo *oa,
                      obd_enqueue_update_f upcall, void *cookie,
                      struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct osc_setattr_args *sa;
        int                      rc;

        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        /* do MDS-to-OST setattr asynchronously */
        if (!rqset) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
        } else {
                req->rq_interpret_reply =
                        (ptlrpc_interpterer_t)osc_setattr_interpret;

                CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
                sa = ptlrpc_req_async_args(req);
                sa->sa_oa = oa;
                sa->sa_upcall = upcall;
                sa->sa_cookie = cookie;

                if (rqset == PTLRPCD_SET)
                        ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
                else
                        ptlrpc_set_add_req(rqset, req);
        }

        RETURN(0);
}

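/*
 * Illustrative call site (not from the original source): a caller of
 * osc_setattr_async() supplies an upcall and an opaque cookie; passing
 * PTLRPCD_SET hands the request to the ptlrpcd daemons instead of a
 * caller-managed set.  my_setattr_upcall is hypothetical:
 *
 *      static int my_setattr_upcall(void *cookie, int rc)
 *      {
 *              complete((struct completion *)cookie);
 *              return rc;
 *      }
 *
 *      rc = osc_setattr_async(exp, oa, my_setattr_upcall, &done,
 *                             PTLRPCD_SET);
 */
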
static int osc_create(const struct lu_env *env, struct obd_export *exp,
                      struct obdo *oa)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        LASSERT(oa != NULL);
        LASSERT(oa->o_valid & OBD_MD_FLGROUP);
        LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
        if (req == NULL)
                GOTO(out, rc = -ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
        if (rc) {
                ptlrpc_request_free(req);
                GOTO(out, rc);
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out_req, rc = -EPROTO);

        CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;

        CDEBUG(D_HA, "transno: "LPD64"\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        RETURN(rc);
}

int osc_punch_base(struct obd_export *exp, struct obdo *oa,
                   obd_enqueue_update_f upcall, void *cookie,
                   struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct osc_setattr_args *sa;
        struct ost_body         *body;
        int                      rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
        CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
        sa = ptlrpc_req_async_args(req);
        sa->sa_oa = oa;
        sa->sa_upcall = upcall;
        sa->sa_cookie = cookie;
        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}
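
/*
 * Note (a sketch based on how callers use OST_PUNCH elsewhere in the
 * tree, not stated in this file): the start and end of the punched range
 * travel in oa->o_size and oa->o_blocks respectively, so truncate-to-N
 * is expressed as o_size = N, o_blocks = OBD_OBJECT_EOF before calling
 * osc_punch_base().
 */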

static int osc_sync_interpret(const struct lu_env *env,
                              struct ptlrpc_request *req,
                              void *arg, int rc)
{
        struct osc_fsync_args   *fa = arg;
        struct ost_body         *body;
        struct cl_attr          *attr = &osc_env_info(env)->oti_attr;
        unsigned long           valid = 0;
        struct cl_object        *obj;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL) {
                CERROR("can't unpack ost_body\n");
                GOTO(out, rc = -EPROTO);
        }

        *fa->fa_oa = body->oa;
        obj = osc2cl(fa->fa_obj);

        /* Update osc object's blocks attribute */
        cl_object_attr_lock(obj);
        if (body->oa.o_valid & OBD_MD_FLBLOCKS) {
                attr->cat_blocks = body->oa.o_blocks;
                valid |= CAT_BLOCKS;
        }

        if (valid != 0)
                cl_object_attr_update(env, obj, attr, valid);
        cl_object_attr_unlock(obj);

out:
        rc = fa->fa_upcall(fa->fa_cookie, rc);
        RETURN(rc);
}

int osc_sync_base(struct osc_object *obj, struct obdo *oa,
                  obd_enqueue_update_f upcall, void *cookie,
                  struct ptlrpc_request_set *rqset)
{
        struct obd_export     *exp = osc_export(obj);
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct osc_fsync_args *fa;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        /* overload the size and blocks fields in the oa with start/end */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = osc_sync_interpret;

        CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
        fa = ptlrpc_req_async_args(req);
        fa->fa_obj = obj;
        fa->fa_oa = oa;
        fa->fa_upcall = upcall;
        fa->fa_cookie = cookie;

        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}

/* Find and cancel locally all locks matched by @mode in the resource
 * found from @oa.  Found locks are added to the @cancels list.  Returns
 * the number of locks added to that list. */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
                                   struct list_head *cancels,
                                   ldlm_mode_t mode, __u64 lock_flags)
{
        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        struct ldlm_res_id res_id;
        struct ldlm_resource *res;
        int count;
        ENTRY;

        /* Return early, i.e. cancel nothing, only if ELC is supported
         * (flag in export) but disabled through procfs (flag in NS).
         *
         * This distinguishes it from the case when ELC was never supported,
         * where we still want to cancel locks in advance and just cancel
         * them locally, without sending any RPC. */
        if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
                RETURN(0);

        ostid_build_res_name(&oa->o_oi, &res_id);
        res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
        if (IS_ERR(res))
                RETURN(0);

        LDLM_RESOURCE_ADDREF(res);
        count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
                                           lock_flags, 0, NULL);
        LDLM_RESOURCE_DELREF(res);
        ldlm_resource_putref(res);
        RETURN(count);
}

static int osc_destroy_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req, void *data,
                                 int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;

        atomic_dec(&cli->cl_destroy_in_flight);
        wake_up(&cli->cl_destroy_waitq);
        return 0;
}

static int osc_can_send_destroy(struct client_obd *cli)
{
        if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
            cli->cl_max_rpcs_in_flight) {
                /* The destroy request can be sent */
                return 1;
        }
        if (atomic_dec_return(&cli->cl_destroy_in_flight) <
            cli->cl_max_rpcs_in_flight) {
                /*
                 * The counter has been modified between the two atomic
                 * operations.
                 */
                wake_up(&cli->cl_destroy_waitq);
        }
        return 0;
}
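
/*
 * Worked example of the race handled above (illustrative): with
 * cl_max_rpcs_in_flight == 8 and 8 destroys already in flight, a caller
 * bumps the counter to 9, fails the first test, and must undo its
 * increment.  If a destroy completed in between (osc_destroy_interpret()
 * dropped the counter to 8, so our decrement returns 7 < 8), a slot has
 * opened up, so we wake cl_destroy_waitq and a waiter can retry
 * osc_can_send_destroy().
 */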

static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct client_obd     *cli = &exp->exp_obd->u.cli;
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct list_head       cancels = LIST_HEAD_INIT(cancels);
        int rc, count;
        ENTRY;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
        if (req == NULL) {
                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                RETURN(-ENOMEM);
        }

        rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
                               0, &cancels, count);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = osc_destroy_interpret;
        if (!osc_can_send_destroy(cli)) {
                struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);

                /*
                 * Wait until the number of on-going destroy RPCs drops
                 * below cl_max_rpcs_in_flight.
                 */
                l_wait_event_exclusive(cli->cl_destroy_waitq,
                                       osc_can_send_destroy(cli), &lwi);
        }

        /* Do not wait for response */
        ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
        RETURN(0);
}

static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;

        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        spin_lock(&cli->cl_loi_list_lock);
        oa->o_dirty = cli->cl_dirty_pages << PAGE_CACHE_SHIFT;
        if (unlikely(cli->cl_dirty_pages - cli->cl_dirty_transit >
                     cli->cl_dirty_max_pages)) {
                CERROR("dirty %lu - %lu > dirty_max %lu\n",
                       cli->cl_dirty_pages, cli->cl_dirty_transit,
                       cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else if (unlikely(atomic_long_read(&obd_dirty_pages) -
                            atomic_long_read(&obd_dirty_transit_pages) >
                            (obd_max_dirty_pages + 1))) {
                /* The atomic_read() and the atomic_inc() are not covered
                 * by a lock, so they may safely race and trip this CERROR()
                 * unless we add in a small fudge factor (+1). */
                CERROR("%s: dirty %ld - %ld > system dirty_max %lu\n",
                       cli->cl_import->imp_obd->obd_name,
                       atomic_long_read(&obd_dirty_pages),
                       atomic_long_read(&obd_dirty_transit_pages),
                       obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
                            0x7fffffff)) {
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty_pages, cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else {
                unsigned long max_in_flight = (cli->cl_max_pages_per_rpc <<
                                      PAGE_CACHE_SHIFT) *
                                     (cli->cl_max_rpcs_in_flight + 1);
                oa->o_undirty = max(cli->cl_dirty_max_pages << PAGE_CACHE_SHIFT,
                                    max_in_flight);
        }
        oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
        oa->o_dropped = cli->cl_lost_grant;
        cli->cl_lost_grant = 0;
        spin_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
}
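
/*
 * Sketch of the arithmetic in the common case above (numbers are
 * illustrative): o_undirty tells the server how much more dirty data the
 * client is prepared to cache, at least enough for a full pipeline of
 * RPCs.  With cl_max_pages_per_rpc = 256 (1 MiB RPCs on 4 KiB pages) and
 * cl_max_rpcs_in_flight = 8, max_in_flight = 1 MiB * 9 = 9 MiB, and
 * o_undirty = max(dirty-max bytes, 9 MiB).
 */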

void osc_update_next_shrink(struct client_obd *cli)
{
        cli->cl_next_shrink_grant =
                cfs_time_shift(cli->cl_grant_shrink_interval);
        CDEBUG(D_CACHE, "next time %ld to shrink grant\n",
               cli->cl_next_shrink_grant);
}

static void __osc_update_grant(struct client_obd *cli, u64 grant)
{
        spin_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant += grant;
        spin_unlock(&cli->cl_loi_list_lock);
}

static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
{
        if (body->oa.o_valid & OBD_MD_FLGRANT) {
                CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
                __osc_update_grant(cli, body->oa.o_grant);
        }
}

static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
                              u32 keylen, void *key,
                              u32 vallen, void *val,
                              struct ptlrpc_request_set *set);

static int osc_shrink_grant_interpret(const struct lu_env *env,
                                      struct ptlrpc_request *req,
                                      void *aa, int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
        struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
        struct ost_body *body;

        if (rc != 0) {
                __osc_update_grant(cli, oa->o_grant);
                GOTO(out, rc);
        }

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        osc_update_grant(cli, body);
out:
        OBDO_FREE(oa);
        return rc;
}

static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
{
        spin_lock(&cli->cl_loi_list_lock);
        oa->o_grant = cli->cl_avail_grant / 4;
        cli->cl_avail_grant -= oa->o_grant;
        spin_unlock(&cli->cl_loi_list_lock);
        if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
                oa->o_valid |= OBD_MD_FLFLAGS;
                oa->o_flags = 0;
        }
        oa->o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);
}

/* Shrink the current grant, either from some large amount to enough for a
 * full set of in-flight RPCs, or if we have already shrunk to that limit
 * then to enough for a single RPC.  This avoids keeping more grant than
 * needed, and avoids shrinking the grant piecemeal. */
static int osc_shrink_grant(struct client_obd *cli)
{
        __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
                             (cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT);

        spin_lock(&cli->cl_loi_list_lock);
        if (cli->cl_avail_grant <= target_bytes)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
        spin_unlock(&cli->cl_loi_list_lock);

        return osc_shrink_grant_to_target(cli, target_bytes);
}
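
/*
 * Illustrative numbers for osc_shrink_grant() (not from the source): with
 * 1 MiB RPCs and cl_max_rpcs_in_flight = 8, the first-stage target is
 * 9 MiB; once cl_avail_grant is already at or below that, the target
 * drops to a single RPC's worth, 1 MiB.
 */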

int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
{
        int                     rc = 0;
        struct ost_body        *body;
        ENTRY;

        spin_lock(&cli->cl_loi_list_lock);
        /* Don't shrink if we are already above or below the desired limit.
         * We don't want to shrink below a single RPC, as that will negatively
         * impact block allocation and long-term performance. */
        if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;

        if (target_bytes >= cli->cl_avail_grant) {
                spin_unlock(&cli->cl_loi_list_lock);
                RETURN(0);
        }
        spin_unlock(&cli->cl_loi_list_lock);

        OBD_ALLOC_PTR(body);
        if (!body)
                RETURN(-ENOMEM);

        osc_announce_cached(cli, &body->oa, 0);

        spin_lock(&cli->cl_loi_list_lock);
        body->oa.o_grant = cli->cl_avail_grant - target_bytes;
        cli->cl_avail_grant = target_bytes;
        spin_unlock(&cli->cl_loi_list_lock);
        if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
                body->oa.o_valid |= OBD_MD_FLFLAGS;
                body->oa.o_flags = 0;
        }
        body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);

        rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
                                sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
                                sizeof(*body), body, NULL);
        if (rc != 0)
                __osc_update_grant(cli, body->oa.o_grant);
        OBD_FREE_PTR(body);
        RETURN(rc);
}

static int osc_should_shrink_grant(struct client_obd *client)
{
        cfs_time_t time = cfs_time_current();
        cfs_time_t next_shrink = client->cl_next_shrink_grant;

        if ((client->cl_import->imp_connect_data.ocd_connect_flags &
             OBD_CONNECT_GRANT_SHRINK) == 0)
                return 0;

        if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
                /* Get the current RPC size directly, instead of going via:
                 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
                 * Keep comment here so that it can be found by searching. */
                int brw_size = client->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;

                if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
                    client->cl_avail_grant > brw_size)
                        return 1;
                else
                        osc_update_next_shrink(client);
        }
        return 0;
}

static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
{
        struct client_obd *client;

        list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
                if (osc_should_shrink_grant(client))
                        osc_shrink_grant(client);
        }
        return 0;
}

static int osc_add_shrink_grant(struct client_obd *client)
{
        int rc;

        rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
                                       TIMEOUT_GRANT,
                                       osc_grant_shrink_grant_cb, NULL,
                                       &client->cl_grant_shrink_list);
        if (rc) {
                CERROR("add grant client %s error %d\n",
                       client->cl_import->imp_obd->obd_name, rc);
                return rc;
        }
        CDEBUG(D_CACHE, "add grant client %s\n",
               client->cl_import->imp_obd->obd_name);
        osc_update_next_shrink(client);
        return 0;
}

static int osc_del_shrink_grant(struct client_obd *client)
{
        return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
                                         TIMEOUT_GRANT);
}

static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        /*
         * ocd_grant is the total grant amount we expect to hold: if we have
         * been evicted, it's the new avail_grant amount, and cl_dirty_pages
         * will drop to 0 as in-flight RPCs fail out; otherwise, it's
         * avail_grant + dirty.
         *
         * A race is tolerable here: if we're evicted, but imp_state has
         * already left the EVICTED state, then cl_dirty_pages must be 0
         * already.
         */
        spin_lock(&cli->cl_loi_list_lock);
        if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
                cli->cl_avail_grant = ocd->ocd_grant;
        else
                cli->cl_avail_grant = ocd->ocd_grant -
                                      (cli->cl_dirty_pages << PAGE_CACHE_SHIFT);

        if (cli->cl_avail_grant < 0) {
                CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
                      cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant,
                      ocd->ocd_grant, cli->cl_dirty_pages << PAGE_CACHE_SHIFT);
                /* workaround for servers which do not have the patch from
                 * LU-2679 */
                cli->cl_avail_grant = ocd->ocd_grant;
        }

        /* determine the appropriate chunk size used by osc_extent. */
        cli->cl_chunkbits = max_t(int, PAGE_CACHE_SHIFT, ocd->ocd_blocksize);
        spin_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld. "
               "chunk bits: %d.\n", cli->cl_import->imp_obd->obd_name,
               cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits);

        if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
            list_empty(&cli->cl_grant_shrink_list))
                osc_add_shrink_grant(cli);
}
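
/*
 * Worked example (illustrative): if the server granted ocd_grant = 2 MiB
 * at reconnect while the client still holds 512 KiB of dirty pages, then
 * cl_avail_grant = 2 MiB - 512 KiB = 1.5 MiB.  After an eviction the
 * dirty pages are discarded as their RPCs fail, so the full 2 MiB is
 * taken as available.
 */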

/* We assume that the reason this OSC got a short read is because it read
 * beyond the end of a stripe file; i.e. Lustre is reading a sparse file
 * via the LOV, and it _knows_ it's reading inside the file, it's just that
 * this stripe never got written at or beyond this stripe offset yet. */
static void handle_short_read(int nob_read, size_t page_count,
                              struct brw_page **pga)
{
        char *ptr;
        int i = 0;

        /* skip bytes read OK */
        while (nob_read > 0) {
                LASSERT(page_count > 0);

                if (pga[i]->count > nob_read) {
                        /* EOF inside this page */
                        ptr = kmap(pga[i]->pg) +
                                (pga[i]->off & ~PAGE_MASK);
                        memset(ptr + nob_read, 0, pga[i]->count - nob_read);
                        kunmap(pga[i]->pg);
                        page_count--;
                        i++;
                        break;
                }

                nob_read -= pga[i]->count;
                page_count--;
                i++;
        }

        /* zero remaining pages */
        while (page_count-- > 0) {
                ptr = kmap(pga[i]->pg) + (pga[i]->off & ~PAGE_MASK);
                memset(ptr, 0, pga[i]->count);
                kunmap(pga[i]->pg);
                i++;
        }
}
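
/*
 * Example (illustrative): a 3-page read of 4096-byte pages that returns
 * nob_read = 5000 leaves page 0 intact (fully read), zeroes bytes
 * 904..4095 of page 1 (EOF lands inside it), and zeroes page 2 entirely.
 */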

static int check_write_rcs(struct ptlrpc_request *req,
                           int requested_nob, int niocount,
                           size_t page_count, struct brw_page **pga)
{
        int     i;
        __u32   *remote_rcs;

        remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
                                                  sizeof(*remote_rcs) *
                                                  niocount);
        if (remote_rcs == NULL) {
                CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
                return(-EPROTO);
        }

        /* return error if any niobuf was in error */
        for (i = 0; i < niocount; i++) {
                if ((int)remote_rcs[i] < 0)
                        return(remote_rcs[i]);

                if (remote_rcs[i] != 0) {
                        CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
                                i, remote_rcs[i], req);
                        return(-EPROTO);
                }
        }

        if (req->rq_bulk->bd_nob_transferred != requested_nob) {
                CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
                       req->rq_bulk->bd_nob_transferred, requested_nob);
                return(-EPROTO);
        }

        return (0);
}

static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
{
        if (p1->flag != p2->flag) {
                unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
                                  OBD_BRW_SYNC       | OBD_BRW_ASYNC   |
                                  OBD_BRW_NOQUOTA    | OBD_BRW_SOFT_SYNC);

                /* warn if we try to combine flags that we don't know to be
                 * safe to combine */
                if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
                        CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
                              "report this at https://jira.hpdd.intel.com/\n",
                              p1->flag, p2->flag);
                }
                return 0;
        }

        return (p1->off + p1->count == p2->off);
}
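
/*
 * Illustrative: two 4096-byte pages at file offsets 0 and 4096 with
 * identical flags merge into a single niobuf; pages at offsets 0 and
 * 8192 (a gap), or with any differing flags, start a new niobuf.  The
 * warning above fires only when the differing bits fall outside the
 * known-safe-to-mix set masked out there.
 */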

static u32 osc_checksum_bulk(int nob, size_t pg_count,
                             struct brw_page **pga, int opc,
                             cksum_type_t cksum_type)
{
        u32                             cksum;
        int                             i = 0;
        struct cfs_crypto_hash_desc     *hdesc;
        unsigned int                    bufsize;
        int                             err;
        unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);

        LASSERT(pg_count > 0);

        hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
        if (IS_ERR(hdesc)) {
                CERROR("Unable to initialize checksum hash %s\n",
                       cfs_crypto_hash_name(cfs_alg));
                return PTR_ERR(hdesc);
        }

        while (nob > 0 && pg_count > 0) {
                unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 && opc == OST_READ &&
                    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
                        unsigned char *ptr = kmap(pga[i]->pg);
                        int off = pga[i]->off & ~PAGE_MASK;

                        memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
                        kunmap(pga[i]->pg);
                }
                cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
                                            pga[i]->off & ~PAGE_MASK,
                                            count);
                LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
                               (int)(pga[i]->off & ~PAGE_MASK));

                nob -= pga[i]->count;
                pg_count--;
                i++;
        }

        bufsize = sizeof(cksum);
        err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);

        /* For sending we only compute the wrong checksum instead
         * of corrupting the data, so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                cksum++;

        return cksum;
}

static int
osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
                     u32 page_count, struct brw_page **pga,
                     struct ptlrpc_request **reqp, int resend)
{
        struct ptlrpc_request   *req;
        struct ptlrpc_bulk_desc *desc;
        struct ost_body         *body;
        struct obd_ioobj        *ioobj;
        struct niobuf_remote    *niobuf;
        int niocount, i, requested_nob, opc, rc;
        struct osc_brw_async_args *aa;
        struct req_capsule      *pill;
        struct brw_page *pg_prev;

        ENTRY;
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
                RETURN(-ENOMEM); /* Recoverable */
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
                RETURN(-EINVAL); /* Fatal */

        if ((cmd & OBD_BRW_WRITE) != 0) {
                opc = OST_WRITE;
                req = ptlrpc_request_alloc_pool(cli->cl_import,
                                                cli->cl_import->imp_rq_pool,
                                                &RQF_OST_BRW_WRITE);
        } else {
                opc = OST_READ;
                req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
        }
        if (req == NULL)
                RETURN(-ENOMEM);

        for (niocount = i = 1; i < page_count; i++) {
                if (!can_merge_pages(pga[i - 1], pga[i]))
                        niocount++;
        }

        pill = &req->rq_pill;
        req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
                             sizeof(*ioobj));
        req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
                             niocount * sizeof(*niobuf));

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);
        /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
         * retry logic */
        req->rq_no_retry_einprogress = 1;

        desc = ptlrpc_prep_bulk_imp(req, page_count,
                cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
                (opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE :
                        PTLRPC_BULK_PUT_SINK) |
                        PTLRPC_BULK_BUF_KIOV,
                OST_BULK_PORTAL,
                &ptlrpc_bulk_kiov_pin_ops);

        if (desc == NULL)
                GOTO(out, rc = -ENOMEM);
        /* NB request now owns desc and will free it when it gets freed */

        body = req_capsule_client_get(pill, &RMF_OST_BODY);
        ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;
        /* The high bits of ioo_max_brw tell the server the _maximum_ number
         * of bulks that might be sent for this request.  The actual number
         * is decided when the RPC is finally sent in ptlrpc_register_bulk().
         * It sends "max - 1" for compatibility with old clients sending "0",
         * and also so that the actual maximum is a power-of-two number, not
         * one less. LU-1431 */
        ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
        LASSERT(page_count > 0);
        pg_prev = pga[0];
        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                struct brw_page *pg = pga[i];
                int poff = pg->off & ~PAGE_MASK;

                LASSERT(pg->count > 0);
                /* make sure there is no gap in the middle of page array */
                LASSERTF(page_count == 1 ||
                         (ergo(i == 0, poff + pg->count == PAGE_CACHE_SIZE) &&
                          ergo(i > 0 && i < page_count - 1,
                               poff == 0 && pg->count == PAGE_CACHE_SIZE)   &&
                          ergo(i == page_count - 1, poff == 0)),
                         "i: %d/%d pg: %p off: "LPU64", count: %u\n",
                         i, page_count, pg, pg->off, pg->count);
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
                         " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
                         i, page_count,
                         pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
                         pg_prev->pg, page_private(pg_prev->pg),
                         pg_prev->pg->index, pg_prev->off);
                LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
                        (pg->flag & OBD_BRW_SRVLOCK));

                desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff, pg->count);
                requested_nob += pg->count;

                if (i > 0 && can_merge_pages(pg_prev, pg)) {
                        niobuf--;
                        niobuf->rnb_len += pg->count;
                } else {
                        niobuf->rnb_offset = pg->off;
                        niobuf->rnb_len    = pg->count;
                        niobuf->rnb_flags  = pg->flag;
                }
                pg_prev = pg;
        }

        LASSERTF((void *)(niobuf - niocount) ==
                req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
                "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
                &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));

        osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob : 0);
        if (resend) {
                if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                        body->oa.o_valid |= OBD_MD_FLFLAGS;
                        body->oa.o_flags = 0;
                }
                body->oa.o_flags |= OBD_FL_RECOV_RESEND;
        }

        if (osc_should_shrink_grant(cli))
                osc_shrink_grant_local(cli, &body->oa);

        /* size[REQ_REC_OFF] still sizeof (*body) */
        if (opc == OST_WRITE) {
                if (cli->cl_checksum &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        /* store cl_cksum_type in a local variable since
                         * it can be changed via lprocfs */
                        cksum_type_t cksum_type = cli->cl_cksum_type;

                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                                oa->o_flags &= OBD_FL_LOCAL_MASK;
                                body->oa.o_flags = 0;
                        }
                        body->oa.o_flags |= cksum_type_pack(cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        body->oa.o_cksum = osc_checksum_bulk(requested_nob,
                                                             page_count, pga,
                                                             OST_WRITE,
                                                             cksum_type);
                        CDEBUG(D_PAGE, "checksum at write origin: %x\n",
                               body->oa.o_cksum);
                        /* save this in 'oa', too, for later checking */
                        oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        oa->o_flags |= cksum_type_pack(cksum_type);
                } else {
                        /* clear out the checksum flag, in case this is a
                         * resend but cl_checksum is no longer set. b=11238 */
                        oa->o_valid &= ~OBD_MD_FLCKSUM;
                }
                oa->o_cksum = body->oa.o_cksum;
                /* 1 RC per niobuf */
                req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
                                     sizeof(__u32) * niocount);
        } else {
                if (cli->cl_checksum &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                body->oa.o_flags = 0;
                        body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                }
        }
        ptlrpc_request_set_replen(req);

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oa = oa;
        aa->aa_requested_nob = requested_nob;
        aa->aa_nio_count = niocount;
        aa->aa_page_count = page_count;
        aa->aa_resends = 0;
        aa->aa_ppga = pga;
        aa->aa_cli = cli;
        INIT_LIST_HEAD(&aa->aa_oaps);

        *reqp = req;
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
               req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
               niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
        RETURN(0);

 out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}

static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
                                __u32 client_cksum, __u32 server_cksum, int nob,
                                size_t page_count, struct brw_page **pga,
                                cksum_type_t client_cksum_type)
{
        __u32 new_cksum;
        char *msg;
        cksum_type_t cksum_type;

        if (server_cksum == client_cksum) {
                CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
                return 0;
        }

        cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
                                       oa->o_flags : 0);
        new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
                                      cksum_type);

        if (cksum_type != client_cksum_type)
                msg = "the server did not use the checksum type specified in "
                      "the original request - likely a protocol problem";
        else if (new_cksum == server_cksum)
                msg = "changed on the client after we checksummed it - "
                      "likely false positive due to mmap IO (bug 11742)";
        else if (new_cksum == client_cksum)
                msg = "changed in transit before arrival at OST";
        else
                msg = "changed in transit AND doesn't match the original - "
                      "likely false positive due to mmap IO (bug 11742)";

        LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
                           " object "DOSTID" extent ["LPU64"-"LPU64"]\n",
                           msg, libcfs_nid2str(peer->nid),
                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
                           POSTID(&oa->o_oi), pga[0]->off,
                           pga[page_count-1]->off + pga[page_count-1]->count - 1);
        CERROR("original client csum %x (type %x), server csum %x (type %x), "
               "client csum now %x\n", client_cksum, client_cksum_type,
               server_cksum, cksum_type, new_cksum);
        return 1;
}
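
/*
 * Summary of the diagnosis above (paraphrase): the client recomputes the
 * checksum over the pages it still holds and compares all three values:
 *   recomputed == server  -> the pages changed under us after the original
 *                            checksum (typically mmap IO);
 *   recomputed == client  -> the data was corrupted on the wire before the
 *                            OST saw it;
 *   matches neither       -> the pages changed AND differ from the server's
 *                            view.
 * A checksum-type mismatch instead points at a protocol problem.
 */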
1230
1231 /* Note rc enters this function as number of bytes transferred */
1232 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1233 {
1234         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1235         const lnet_process_id_t *peer =
1236                         &req->rq_import->imp_connection->c_peer;
1237         struct client_obd *cli = aa->aa_cli;
1238         struct ost_body *body;
1239         u32 client_cksum = 0;
1240         ENTRY;
1241
1242         if (rc < 0 && rc != -EDQUOT) {
1243                 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1244                 RETURN(rc);
1245         }
1246
1247         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1248         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1249         if (body == NULL) {
1250                 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1251                 RETURN(-EPROTO);
1252         }
1253
1254         /* set/clear over quota flag for a uid/gid */
1255         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1256             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1257                 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1258
1259                 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1260                        body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1261                        body->oa.o_flags);
1262                 osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
1263         }
1264
1265         osc_update_grant(cli, body);
1266
1267         if (rc < 0)
1268                 RETURN(rc);
1269
1270         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1271                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1272
1273         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1274                 if (rc > 0) {
1275                         CERROR("Unexpected +ve rc %d\n", rc);
1276                         RETURN(-EPROTO);
1277                 }
1278                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1279
1280                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1281                         RETURN(-EAGAIN);
1282
1283                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1284                     check_write_checksum(&body->oa, peer, client_cksum,
1285                                          body->oa.o_cksum, aa->aa_requested_nob,
1286                                          aa->aa_page_count, aa->aa_ppga,
1287                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1288                         RETURN(-EAGAIN);
1289
1290                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1291                                      aa->aa_page_count, aa->aa_ppga);
1292                 GOTO(out, rc);
1293         }
1294
1295         /* The rest of this function executes only for OST_READs */
1296
1297         /* if unwrap_bulk failed, return -EAGAIN to retry */
1298         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1299         if (rc < 0)
1300                 GOTO(out, rc = -EAGAIN);
1301
1302         if (rc > aa->aa_requested_nob) {
1303                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1304                        aa->aa_requested_nob);
1305                 RETURN(-EPROTO);
1306         }
1307
1308         if (rc != req->rq_bulk->bd_nob_transferred) {
1309                 CERROR ("Unexpected rc %d (%d transferred)\n",
1310                         rc, req->rq_bulk->bd_nob_transferred);
1311                 return (-EPROTO);
1312         }
1313
1314         if (rc < aa->aa_requested_nob)
1315                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1316
1317         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1318                 static int cksum_counter;
1319                 u32        server_cksum = body->oa.o_cksum;
1320                 char      *via = "";
1321                 char      *router = "";
1322                 cksum_type_t cksum_type;
1323
1324                 cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
1325                                                body->oa.o_flags : 0);
1326                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1327                                                  aa->aa_ppga, OST_READ,
1328                                                  cksum_type);
1329
1330                 if (peer->nid != req->rq_bulk->bd_sender) {
1331                         via = " via ";
1332                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1333                 }
1334
1335                 if (server_cksum != client_cksum) {
1336                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1337                                            "%s%s%s inode "DFID" object "DOSTID
1338                                            " extent ["LPU64"-"LPU64"]\n",
1339                                            req->rq_import->imp_obd->obd_name,
1340                                            libcfs_nid2str(peer->nid),
1341                                            via, router,
1342                                            body->oa.o_valid & OBD_MD_FLFID ?
1343                                                 body->oa.o_parent_seq : (__u64)0,
1344                                            body->oa.o_valid & OBD_MD_FLFID ?
1345                                                 body->oa.o_parent_oid : 0,
1346                                            body->oa.o_valid & OBD_MD_FLFID ?
1347                                                 body->oa.o_parent_ver : 0,
1348                                            POSTID(&body->oa.o_oi),
1349                                            aa->aa_ppga[0]->off,
1350                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1351                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1352                                                                         1);
1353                         CERROR("client %x, server %x, cksum_type %x\n",
1354                                client_cksum, server_cksum, cksum_type);
1355                         cksum_counter = 0;
1356                         aa->aa_oa->o_cksum = client_cksum;
1357                         rc = -EAGAIN;
1358                 } else {
1359                         cksum_counter++;
1360                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1361                         rc = 0;
1362                 }
1363         } else if (unlikely(client_cksum)) {
1364                 static int cksum_missed;
1365
1366                 cksum_missed++;
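                /* (x & -x) == x holds only when x is a power of two, so
                 * this logs the 1st, 2nd, 4th, 8th, ... missed checksum
                 * rather than flooding the console on every read. */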
1367                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1368                         CERROR("Checksum %u requested from %s but not sent\n",
1369                                cksum_missed, libcfs_nid2str(peer->nid));
1370         } else {
1371                 rc = 0;
1372         }
1373 out:
1374         if (rc >= 0)
1375                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1376                                      aa->aa_oa, &body->oa);
1377
1378         RETURN(rc);
1379 }
1380
1381 static int osc_brw_redo_request(struct ptlrpc_request *request,
1382                                 struct osc_brw_async_args *aa, int rc)
1383 {
1384         struct ptlrpc_request *new_req;
1385         struct osc_brw_async_args *new_aa;
1386         struct osc_async_page *oap;
1387         ENTRY;
1388
1389         DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1390                   "redo for recoverable error %d", rc);
1391
1392         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1393                                   OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
1394                                   aa->aa_cli, aa->aa_oa, aa->aa_page_count,
1395                                   aa->aa_ppga, &new_req, 1);
1396         if (rc)
1397                 RETURN(rc);
1398
1399         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1400                 if (oap->oap_request != NULL) {
1401                         LASSERTF(request == oap->oap_request,
1402                                  "request %p != oap_request %p\n",
1403                                  request, oap->oap_request);
1404                         if (oap->oap_interrupted) {
1405                                 ptlrpc_req_finished(new_req);
1406                                 RETURN(-EINTR);
1407                         }
1408                 }
1409         }
1410         /* The new request takes over pga and oaps from the old request.
1411          * Note that copying a list_head doesn't work; it needs to be moved. */
1412         aa->aa_resends++;
1413         new_req->rq_interpret_reply = request->rq_interpret_reply;
1414         new_req->rq_async_args = request->rq_async_args;
1415         new_req->rq_commit_cb = request->rq_commit_cb;
1416         /* Cap the resend delay to the current request timeout; this is
1417          * similar to what ptlrpc does (see after_reply()) */
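        /* Net effect: linear backoff, the Nth resend is delayed by
         * min(N, rq_timeout) seconds before it is re-sent. */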
1418         if (aa->aa_resends > new_req->rq_timeout)
1419                 new_req->rq_sent = cfs_time_current_sec() + new_req->rq_timeout;
1420         else
1421                 new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1422         new_req->rq_generation_set = 1;
1423         new_req->rq_import_generation = request->rq_import_generation;
1424
1425         new_aa = ptlrpc_req_async_args(new_req);
1426
1427         INIT_LIST_HEAD(&new_aa->aa_oaps);
1428         list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1429         INIT_LIST_HEAD(&new_aa->aa_exts);
1430         list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1431         new_aa->aa_resends = aa->aa_resends;
1432
1433         list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1434                 if (oap->oap_request) {
1435                         ptlrpc_req_finished(oap->oap_request);
1436                         oap->oap_request = ptlrpc_request_addref(new_req);
1437                 }
1438         }
1439
1440         /* XXX: This code will run into problems if we are going to support
1441          * adding a series of BRW RPCs into a self-defined ptlrpc_request_set
1442          * and waiting for all of them to finish. We should inherit the
1443          * request set from the old request. */
1444         ptlrpcd_add_req(new_req, PDL_POLICY_SAME, -1);
1445
1446         DEBUG_REQ(D_INFO, new_req, "new request");
1447         RETURN(0);
1448 }
1449
1450 /*
1451  * Ugh, we want disk allocation on the target to happen in offset order.  We'll
1452  * follow Sedgewick's advice and stick to the dead simple shellsort -- it'll do
1453  * fine for our small page arrays and doesn't require allocation.  It's an
1454  * insertion sort that swaps elements that are strides apart, shrinking the
1455  * stride down until it's 1 and the array is sorted.
1456  */
1457 static void sort_brw_pages(struct brw_page **array, int num)
1458 {
1459         int stride, i, j;
1460         struct brw_page *tmp;
1461
1462         if (num == 1)
1463                 return;
1464         for (stride = 1; stride < num; stride = (stride * 3) + 1)
1465                 ;
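        /* stride is now the first term of the 3*h+1 sequence
         * (1, 4, 13, 40, ...) to reach num; the loop below shrinks
         * it back down by thirds. */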
1466
1467         do {
1468                 stride /= 3;
1469                 for (i = stride; i < num; i++) {
1470                         tmp = array[i];
1471                         j = i;
1472                         while (j >= stride && array[j - stride]->off > tmp->off) {
1473                                 array[j] = array[j - stride];
1474                                 j -= stride;
1475                         }
1476                         array[j] = tmp;
1477                 }
1478         } while (stride > 1);
1479 }
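/* A quick illustration with hypothetical offsets: pages queued as
 *   { 3*PAGE_SIZE, 0, 2*PAGE_SIZE, 1*PAGE_SIZE }
 * come out ordered by ->off as
 *   { 0, 1*PAGE_SIZE, 2*PAGE_SIZE, 3*PAGE_SIZE },
 * so the OST sees the extent in ascending disk order. */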
1480
1481 static void osc_release_ppga(struct brw_page **ppga, size_t count)
1482 {
1483         LASSERT(ppga != NULL);
1484         OBD_FREE(ppga, sizeof(*ppga) * count);
1485 }
1486
1487 static int brw_interpret(const struct lu_env *env,
1488                          struct ptlrpc_request *req, void *data, int rc)
1489 {
1490         struct osc_brw_async_args *aa = data;
1491         struct osc_extent *ext;
1492         struct osc_extent *tmp;
1493         struct client_obd *cli = aa->aa_cli;
1494         ENTRY;
1495
1496         rc = osc_brw_fini_request(req, rc);
1497         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1498         /* When the server returns -EINPROGRESS, the client should always
1499          * retry, regardless of how many times the bulk was already resent. */
1500         if (osc_recoverable_error(rc)) {
1501                 if (req->rq_import_generation !=
1502                     req->rq_import->imp_generation) {
1503                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
1504                                DOSTID", rc = %d.\n",
1505                                req->rq_import->imp_obd->obd_name,
1506                                POSTID(&aa->aa_oa->o_oi), rc);
1507                 } else if (rc == -EINPROGRESS ||
1508                     client_should_resend(aa->aa_resends, aa->aa_cli)) {
1509                         rc = osc_brw_redo_request(req, aa, rc);
1510                 } else {
1511                         CERROR("%s: too many resend retries for object: "
1512                                DOSTID", rc = %d.\n",
1513                                req->rq_import->imp_obd->obd_name,
1514                                POSTID(&aa->aa_oa->o_oi), rc);
1515                 }
1516
1517                 if (rc == 0)
1518                         RETURN(0);
1519                 else if (rc == -EAGAIN || rc == -EINPROGRESS)
1520                         rc = -EIO;
1521         }
1522
1523         if (rc == 0) {
1524                 struct obdo *oa = aa->aa_oa;
1525                 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
1526                 unsigned long valid = 0;
1527                 struct cl_object *obj;
1528                 struct osc_async_page *last;
1529
1530                 last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
1531                 obj = osc2cl(last->oap_obj);
1532
1533                 cl_object_attr_lock(obj);
1534                 if (oa->o_valid & OBD_MD_FLBLOCKS) {
1535                         attr->cat_blocks = oa->o_blocks;
1536                         valid |= CAT_BLOCKS;
1537                 }
1538                 if (oa->o_valid & OBD_MD_FLMTIME) {
1539                         attr->cat_mtime = oa->o_mtime;
1540                         valid |= CAT_MTIME;
1541                 }
1542                 if (oa->o_valid & OBD_MD_FLATIME) {
1543                         attr->cat_atime = oa->o_atime;
1544                         valid |= CAT_ATIME;
1545                 }
1546                 if (oa->o_valid & OBD_MD_FLCTIME) {
1547                         attr->cat_ctime = oa->o_ctime;
1548                         valid |= CAT_CTIME;
1549                 }
1550
1551                 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1552                         struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
1553                         loff_t last_off = last->oap_count + last->oap_obj_off +
1554                                 last->oap_page_off;
1555
1556                         /* Change the file size if this is an out-of-quota
1557                          * or direct I/O write and it extends the file size */
1558                         if (loi->loi_lvb.lvb_size < last_off) {
1559                                 attr->cat_size = last_off;
1560                                 valid |= CAT_SIZE;
1561                         }
1562                         /* Extend KMS if it's not a lockless write */
1563                         if (loi->loi_kms < last_off &&
1564                             oap2osc_page(last)->ops_srvlock == 0) {
1565                                 attr->cat_kms = last_off;
1566                                 valid |= CAT_KMS;
1567                         }
1568                 }
1569
1570                 if (valid != 0)
1571                         cl_object_attr_update(env, obj, attr, valid);
1572                 cl_object_attr_unlock(obj);
1573         }
1574         OBDO_FREE(aa->aa_oa);
1575
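        /* Successfully written pages are counted as "unstable" until the
         * server commits them to disk; brw_commit() below drops that
         * count when rq_commit_cb fires. */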
1576         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
1577                 osc_inc_unstable_pages(req);
1578
1579         list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
1580                 list_del_init(&ext->oe_link);
1581                 osc_extent_finish(env, ext, 1, rc);
1582         }
1583         LASSERT(list_empty(&aa->aa_exts));
1584         LASSERT(list_empty(&aa->aa_oaps));
1585
1586         cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
1587                           req->rq_bulk->bd_nob_transferred);
1588         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1589         ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
1590
1591         spin_lock(&cli->cl_loi_list_lock);
1592         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1593          * is called so we know whether to go to sync BRWs or wait for more
1594          * RPCs to complete */
1595         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
1596                 cli->cl_w_in_flight--;
1597         else
1598                 cli->cl_r_in_flight--;
1599         osc_wake_cache_waiters(cli);
1600         spin_unlock(&cli->cl_loi_list_lock);
1601
1602         osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
1603         RETURN(rc);
1604 }
1605
1606 static void brw_commit(struct ptlrpc_request *req)
1607 {
1608         /* If osc_inc_unstable_pages (via osc_extent_finish) races with
1609          * this function being called via rq_commit_cb, we need to ensure
1610          * osc_dec_unstable_pages is still called.  Otherwise unstable
1611          * pages may be leaked. */
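        /* Whichever of the two racing paths runs second does the
         * accounting: if the increment already happened (rq_unstable is
         * set), undo it here; otherwise record rq_committed so the
         * racing increment path can see the commit already happened. */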
1612         spin_lock(&req->rq_lock);
1613         if (likely(req->rq_unstable)) {
1614                 req->rq_unstable = 0;
1615                 spin_unlock(&req->rq_lock);
1616
1617                 osc_dec_unstable_pages(req);
1618         } else {
1619                 req->rq_committed = 1;
1620                 spin_unlock(&req->rq_lock);
1621         }
1622 }
1623
1624 /**
1625  * Build an RPC from the list of extents @ext_list. The caller must ensure
1626  * that the total number of pages in this list does not exceed the maximum
1627  * pages per RPC. Extents in the list must be in OES_RPC state.
1628  */
1629 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
1630                   struct list_head *ext_list, int cmd, pdl_policy_t pol)
1631 {
1632         struct ptlrpc_request           *req = NULL;
1633         struct osc_extent               *ext;
1634         struct brw_page                 **pga = NULL;
1635         struct osc_brw_async_args       *aa = NULL;
1636         struct obdo                     *oa = NULL;
1637         struct osc_async_page           *oap;
1638         struct osc_async_page           *tmp;
1639         struct cl_req                   *clerq = NULL;
1640         enum cl_req_type                crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE :
1641                                                                       CRT_READ;
1642         struct cl_req_attr              *crattr = NULL;
1643         loff_t                          starting_offset = OBD_OBJECT_EOF;
1644         loff_t                          ending_offset = 0;
1645         int                             mpflag = 0;
1646         int                             mem_tight = 0;
1647         int                             page_count = 0;
1648         bool                            soft_sync = false;
1649         int                             i;
1650         int                             rc;
1651         struct list_head                rpc_list = LIST_HEAD_INIT(rpc_list);
1652         struct ost_body                 *body;
1653         ENTRY;
1654         LASSERT(!list_empty(ext_list));
1655
1656         /* add pages into rpc_list to build BRW rpc */
1657         list_for_each_entry(ext, ext_list, oe_link) {
1658                 LASSERT(ext->oe_state == OES_RPC);
1659                 mem_tight |= ext->oe_memalloc;
1660                 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
1661                         ++page_count;
1662                         list_add_tail(&oap->oap_rpc_item, &rpc_list);
1663                         if (starting_offset == OBD_OBJECT_EOF ||
1664                             starting_offset > oap->oap_obj_off)
1665                                 starting_offset = oap->oap_obj_off;
1666                         else
1667                                 LASSERT(oap->oap_page_off == 0);
1668                         if (ending_offset < oap->oap_obj_off + oap->oap_count)
1669                                 ending_offset = oap->oap_obj_off +
1670                                                 oap->oap_count;
1671                         else
1672                                 LASSERT(oap->oap_page_off + oap->oap_count ==
1673                                         PAGE_CACHE_SIZE);
1674                 }
1675         }
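        /* The asserts above encode the BRW layout rule: only the first
         * page of the RPC may start at a non-zero offset within its
         * page, and only the last page may end short of a full page. */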
1676
1677         soft_sync = osc_over_unstable_soft_limit(cli);
1678         if (mem_tight)
1679                 mpflag = cfs_memory_pressure_get_and_set();
1680
1681         OBD_ALLOC(crattr, sizeof(*crattr));
1682         if (crattr == NULL)
1683                 GOTO(out, rc = -ENOMEM);
1684
1685         OBD_ALLOC(pga, sizeof(*pga) * page_count);
1686         if (pga == NULL)
1687                 GOTO(out, rc = -ENOMEM);
1688
1689         OBDO_ALLOC(oa);
1690         if (oa == NULL)
1691                 GOTO(out, rc = -ENOMEM);
1692
1693         i = 0;
1694         list_for_each_entry(oap, &rpc_list, oap_rpc_item) {
1695                 struct cl_page *page = oap2cl_page(oap);
1696                 if (clerq == NULL) {
1697                         clerq = cl_req_alloc(env, page, crt,
1698                                              1 /* only 1-object rpcs for now */);
1699                         if (IS_ERR(clerq))
1700                                 GOTO(out, rc = PTR_ERR(clerq));
1701                 }
1702                 if (mem_tight)
1703                         oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
1704                 if (soft_sync)
1705                         oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
1706                 pga[i] = &oap->oap_brw_page;
1707                 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
1708                 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
1709                        pga[i]->pg, page_index(oap->oap_page), oap,
1710                        pga[i]->flag);
1711                 i++;
1712                 cl_req_page_add(env, clerq, page);
1713         }
1714
1715         /* always get the data for the obdo for the rpc */
1716         LASSERT(clerq != NULL);
1717         crattr->cra_oa = oa;
1718         cl_req_attr_set(env, clerq, crattr, ~0ULL);
1719
1720         rc = cl_req_prep(env, clerq);
1721         if (rc != 0) {
1722                 CERROR("cl_req_prep failed: %d\n", rc);
1723                 GOTO(out, rc);
1724         }
1725
1726         sort_brw_pages(pga, page_count);
1727         rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req, 0);
1728         if (rc != 0) {
1729                 CERROR("prep_req failed: %d\n", rc);
1730                 GOTO(out, rc);
1731         }
1732
1733         req->rq_commit_cb = brw_commit;
1734         req->rq_interpret_reply = brw_interpret;
1735
1736         if (mem_tight != 0)
1737                 req->rq_memalloc = 1;
1738
1739         /* We need to update the timestamps after the request is built in case
1740          * we race with setattr (locally or in queue at the OST).  If the OST
1741          * gets a later setattr before an earlier BRW (as determined by the
1742          * request xid), the OST will not use the BRW timestamps.  Sadly, there
1743          * is no obvious way to do this in a single call.  bug 10150 */
1744         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
1745         crattr->cra_oa = &body->oa;
1746         cl_req_attr_set(env, clerq, crattr,
1747                         OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
1748
1749         lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
1750
1751         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1752         aa = ptlrpc_req_async_args(req);
1753         INIT_LIST_HEAD(&aa->aa_oaps);
1754         list_splice_init(&rpc_list, &aa->aa_oaps);
1755         INIT_LIST_HEAD(&aa->aa_exts);
1756         list_splice_init(ext_list, &aa->aa_exts);
1757         aa->aa_clerq = clerq;
1758
1759         /* Queued sync pages can be torn down while the pages
1760          * are between the pending list and the RPC */
1761         tmp = NULL;
1762         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1763                 /* only one oap gets a request reference */
1764                 if (tmp == NULL)
1765                         tmp = oap;
1766                 if (oap->oap_interrupted && !req->rq_intr) {
1767                         CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
1768                                         oap, req);
1769                         ptlrpc_mark_interrupted(req);
1770                 }
1771         }
1772         if (tmp != NULL)
1773                 tmp->oap_request = ptlrpc_request_addref(req);
1774
1775         spin_lock(&cli->cl_loi_list_lock);
1776         starting_offset >>= PAGE_CACHE_SHIFT;
1777         if (cmd == OBD_BRW_READ) {
1778                 cli->cl_r_in_flight++;
1779                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
1780                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
1781                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
1782                                       starting_offset + 1);
1783         } else {
1784                 cli->cl_w_in_flight++;
1785                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
1786                 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
1787                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
1788                                       starting_offset + 1);
1789         }
1790         spin_unlock(&cli->cl_loi_list_lock);
1791
1792         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %ur/%uw in flight",
1793                   page_count, aa, cli->cl_r_in_flight,
1794                   cli->cl_w_in_flight);
1795
1796         /* XXX: Maybe the caller can check the RPC bulk descriptor to
1797          * see which CPU/NUMA node the majority of pages were allocated
1798          * on, and try to assign the async RPC to the CPU core
1799          * (PDL_POLICY_PREFERRED) to reduce cross-CPU memory traffic.
1800          *
1801          * But on the other hand, we expect that multiple ptlrpcd
1802          * threads and the initial write sponsor can run in parallel,
1803          * especially when data checksum is enabled, which is CPU-bound
1804          * operation and single ptlrpcd thread cannot process in time.
1805          * So more ptlrpcd threads sharing BRW load
1806          * (with PDL_POLICY_ROUND) seems better.
1807          */
1808         ptlrpcd_add_req(req, pol, -1);
1809         rc = 0;
1810         EXIT;
1811
1812 out:
1813         if (mem_tight != 0)
1814                 cfs_memory_pressure_restore(mpflag);
1815
1816         if (crattr != NULL)
1817                 OBD_FREE(crattr, sizeof(*crattr));
1818
1819         if (rc != 0) {
1820                 LASSERT(req == NULL);
1821
1822                 if (oa)
1823                         OBDO_FREE(oa);
1824                 if (pga)
1825                         OBD_FREE(pga, sizeof(*pga) * page_count);
1826                 /* This should happen rarely and is pretty bad; it makes
1827                  * the pending list not follow the dirty order */
1828                 while (!list_empty(ext_list)) {
1829                         ext = list_entry(ext_list->next, struct osc_extent,
1830                                          oe_link);
1831                         list_del_init(&ext->oe_link);
1832                         osc_extent_finish(env, ext, 0, rc);
1833                 }
1834                 if (clerq && !IS_ERR(clerq))
1835                         cl_req_completion(env, clerq, rc);
1836         }
1837         RETURN(rc);
1838 }
1839
1840 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
1841                                         struct ldlm_enqueue_info *einfo)
1842 {
1843         void *data = einfo->ei_cbdata;
1844         int set = 0;
1845
1846         LASSERT(lock != NULL);
1847         LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
1848         LASSERT(lock->l_resource->lr_type == einfo->ei_type);
1849         LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
1850         LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
1851
1852         lock_res_and_lock(lock);
1853
1854         if (lock->l_ast_data == NULL)
1855                 lock->l_ast_data = data;
1856         if (lock->l_ast_data == data)
1857                 set = 1;
1858
1859         unlock_res_and_lock(lock);
1860
1861         return set;
1862 }
1863
1864 static int osc_set_data_with_check(struct lustre_handle *lockh,
1865                                    struct ldlm_enqueue_info *einfo)
1866 {
1867         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
1868         int set = 0;
1869
1870         if (lock != NULL) {
1871                 set = osc_set_lock_data_with_check(lock, einfo);
1872                 LDLM_LOCK_PUT(lock);
1873         } else
1874                 CERROR("lockh %p, data %p - client evicted?\n",
1875                        lockh, einfo->ei_cbdata);
1876         return set;
1877 }
1878
1879 static int osc_enqueue_fini(struct ptlrpc_request *req,
1880                             osc_enqueue_upcall_f upcall, void *cookie,
1881                             struct lustre_handle *lockh, ldlm_mode_t mode,
1882                             __u64 *flags, int agl, int errcode)
1883 {
1884         bool intent = *flags & LDLM_FL_HAS_INTENT;
1885         int rc;
1886         ENTRY;
1887
1888         /* The request was created before ldlm_cli_enqueue call. */
1889         if (intent && errcode == ELDLM_LOCK_ABORTED) {
1890                 struct ldlm_reply *rep;
1891
1892                 rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1893                 LASSERT(rep != NULL);
1894
1895                 rep->lock_policy_res1 =
1896                         ptlrpc_status_ntoh(rep->lock_policy_res1);
1897                 if (rep->lock_policy_res1)
1898                         errcode = rep->lock_policy_res1;
1899                 if (!agl)
1900                         *flags |= LDLM_FL_LVB_READY;
1901         } else if (errcode == ELDLM_OK) {
1902                 *flags |= LDLM_FL_LVB_READY;
1903         }
1904
1905         /* Call the update callback. */
1906         rc = (*upcall)(cookie, lockh, errcode);
1907
1908         /* release the reference taken in ldlm_cli_enqueue() */
1909         if (errcode == ELDLM_LOCK_MATCHED)
1910                 errcode = ELDLM_OK;
1911         if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
1912                 ldlm_lock_decref(lockh, mode);
1913
1914         RETURN(rc);
1915 }
1916
1917 static int osc_enqueue_interpret(const struct lu_env *env,
1918                                  struct ptlrpc_request *req,
1919                                  struct osc_enqueue_args *aa, int rc)
1920 {
1921         struct ldlm_lock *lock;
1922         struct lustre_handle *lockh = &aa->oa_lockh;
1923         ldlm_mode_t mode = aa->oa_mode;
1924         struct ost_lvb *lvb = aa->oa_lvb;
1925         __u32 lvb_len = sizeof(*lvb);
1926         __u64 flags = 0;
1927
1928         ENTRY;
1929
1930         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
1931          * be valid. */
1932         lock = ldlm_handle2lock(lockh);
1933         LASSERTF(lock != NULL,
1934                  "lockh "LPX64", req %p, aa %p - client evicted?\n",
1935                  lockh->cookie, req, aa);
1936
1937         /* Take an additional reference so that a blocking AST that
1938          * ldlm_cli_enqueue_fini() might post for a failed lock is guaranteed
1939          * to arrive after an upcall has been executed by
1940          * osc_enqueue_fini(). */
1941         ldlm_lock_addref(lockh, mode);
1942
1943         /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
1944         OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
1945
1946         /* Let the CP AST grant the lock first. */
1947         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
1948
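        /* For AGL the caller deliberately saved no flags or LVB (see
         * osc_enqueue_base()), so point oa_flags at a scratch word to
         * give ldlm_cli_enqueue_fini() somewhere to write. */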
1949         if (aa->oa_agl) {
1950                 LASSERT(aa->oa_lvb == NULL);
1951                 LASSERT(aa->oa_flags == NULL);
1952                 aa->oa_flags = &flags;
1953         }
1954
1955         /* Complete obtaining the lock procedure. */
1956         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
1957                                    aa->oa_mode, aa->oa_flags, lvb, lvb_len,
1958                                    lockh, rc);
1959         /* Complete osc stuff. */
1960         rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
1961                               aa->oa_flags, aa->oa_agl, rc);
1962
1963         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
1964
1965         ldlm_lock_decref(lockh, mode);
1966         LDLM_LOCK_PUT(lock);
1967         RETURN(rc);
1968 }
1969
1970 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
1971
1972 /* When enqueuing asynchronously, locks are not ordered, so we can obtain a
1973  * lock from the 2nd OSC before a lock from the 1st one. This does not deadlock
1974  * with other synchronous requests; however, keeping some locks while trying to
1975  * obtain others may take a considerable amount of time in the case of OST
1976  * failure, and when a client fails to release a lock that other sync requests
1977  * are waiting on, the client is evicted from the cluster -- such scenarios
1978  * make life difficult, so release locks just after they are obtained. */
1979 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
1980                      __u64 *flags, ldlm_policy_data_t *policy,
1981                      struct ost_lvb *lvb, int kms_valid,
1982                      osc_enqueue_upcall_f upcall, void *cookie,
1983                      struct ldlm_enqueue_info *einfo,
1984                      struct ptlrpc_request_set *rqset, int async, int agl)
1985 {
1986         struct obd_device *obd = exp->exp_obd;
1987         struct lustre_handle lockh = { 0 };
1988         struct ptlrpc_request *req = NULL;
1989         int intent = *flags & LDLM_FL_HAS_INTENT;
1990         __u64 match_lvb = agl ? 0 : LDLM_FL_LVB_READY;
1991         ldlm_mode_t mode;
1992         int rc;
1993         ENTRY;
1994
1995         /* Filesystem lock extents are extended to page boundaries so that
1996          * dealing with the page cache is a little smoother.  */
1997         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
1998         policy->l_extent.end |= ~PAGE_MASK;
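        /* For example, with 4 KiB pages (~PAGE_MASK == 0xfff) a request
         * for bytes [5000, 6000] is widened to the page-aligned extent
         * [4096, 8191]. */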
1999
2000         /*
2001          * kms is not valid when either object is completely fresh (so that no
2002          * locks are cached), or object was evicted. In the latter case cached
2003          * lock cannot be used, because it would prime inode state with
2004          * potentially stale LVB.
2005          */
2006         if (!kms_valid)
2007                 goto no_match;
2008
2009         /* Next, search for already existing extent locks that will cover us */
2010         /* If we're trying to read, we also search for an existing PW lock.  The
2011          * VFS and page cache already protect us locally, so lots of readers/
2012          * writers can share a single PW lock.
2013          *
2014          * There are problems with conversion deadlocks, so instead of
2015          * converting a read lock to a write lock, we'll just enqueue a new
2016          * one.
2017          *
2018          * At some point we should cancel the read lock instead of making them
2019          * send us a blocking callback, but there are problems with canceling
2020          * locks out from other users right now, too. */
2021         mode = einfo->ei_mode;
2022         if (einfo->ei_mode == LCK_PR)
2023                 mode |= LCK_PW;
2024         mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
2025                                einfo->ei_type, policy, mode, &lockh, 0);
2026         if (mode) {
2027                 struct ldlm_lock *matched;
2028
2029                 if (*flags & LDLM_FL_TEST_LOCK)
2030                         RETURN(ELDLM_OK);
2031
2032                 matched = ldlm_handle2lock(&lockh);
2033                 if (agl) {
2034                         /* AGL enqueues DLM locks speculatively. Therefore,
2035                          * if a DLM lock already exists, it will just inform
2036                          * the caller to cancel the AGL process for this stripe. */
2037                         ldlm_lock_decref(&lockh, mode);
2038                         LDLM_LOCK_PUT(matched);
2039                         RETURN(-ECANCELED);
2040                 } else if (osc_set_lock_data_with_check(matched, einfo)) {
2041                         *flags |= LDLM_FL_LVB_READY;
2042
2043                         /* We already have a lock, and it's referenced. */
2044                         (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);
2045
2046                         ldlm_lock_decref(&lockh, mode);
2047                         LDLM_LOCK_PUT(matched);
2048                         RETURN(ELDLM_OK);
2049                 } else {
2050                         ldlm_lock_decref(&lockh, mode);
2051                         LDLM_LOCK_PUT(matched);
2052                 }
2053         }
2054
2055 no_match:
2056         if (*flags & LDLM_FL_TEST_LOCK)
2057                 RETURN(-ENOLCK);
2058
2059         if (intent) {
2060                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2061                                            &RQF_LDLM_ENQUEUE_LVB);
2062                 if (req == NULL)
2063                         RETURN(-ENOMEM);
2064
2065                 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
2066                 if (rc) {
2067                         ptlrpc_request_free(req);
2068                         RETURN(rc);
2069                 }
2070
2071                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2072                                      sizeof(*lvb));
2073                 ptlrpc_request_set_replen(req);
2074         }
2075
2076         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2077         *flags &= ~LDLM_FL_BLOCK_GRANTED;
2078
2079         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2080                               sizeof(*lvb), LVB_T_OST, &lockh, async);
2081         if (async) {
2082                 if (!rc) {
2083                         struct osc_enqueue_args *aa;
2084                         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2085                         aa = ptlrpc_req_async_args(req);
2086                         aa->oa_exp    = exp;
2087                         aa->oa_mode   = einfo->ei_mode;
2088                         aa->oa_type   = einfo->ei_type;
2089                         lustre_handle_copy(&aa->oa_lockh, &lockh);
2090                         aa->oa_upcall = upcall;
2091                         aa->oa_cookie = cookie;
2092                         aa->oa_agl    = !!agl;
2093                         if (!agl) {
2094                                 aa->oa_flags  = flags;
2095                                 aa->oa_lvb    = lvb;
2096                         } else {
2097                                 /* AGL essentially enqueues a DLM lock in
2098                                  * advance, so we don't care about the
2099                                  * result of the AGL enqueue. */
2100                                 aa->oa_lvb    = NULL;
2101                                 aa->oa_flags  = NULL;
2102                         }
2103
2104                         req->rq_interpret_reply =
2105                                 (ptlrpc_interpterer_t)osc_enqueue_interpret;
2106                         if (rqset == PTLRPCD_SET)
2107                                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
2108                         else
2109                                 ptlrpc_set_add_req(rqset, req);
2110                 } else if (intent) {
2111                         ptlrpc_req_finished(req);
2112                 }
2113                 RETURN(rc);
2114         }
2115
2116         rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
2117                               flags, agl, rc);
2118         if (intent)
2119                 ptlrpc_req_finished(req);
2120
2121         RETURN(rc);
2122 }
2123
2124 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2125                    __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2126                    __u64 *flags, void *data, struct lustre_handle *lockh,
2127                    int unref)
2128 {
2129         struct obd_device *obd = exp->exp_obd;
2130         __u64 lflags = *flags;
2131         ldlm_mode_t rc;
2132         ENTRY;
2133
2134         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2135                 RETURN(-EIO);
2136
2137         /* Filesystem lock extents are extended to page boundaries so that
2138          * dealing with the page cache is a little smoother */
2139         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2140         policy->l_extent.end |= ~PAGE_MASK;
2141
2142         /* Next, search for already existing extent locks that will cover us */
2143         /* If we're trying to read, we also search for an existing PW lock.  The
2144          * VFS and page cache already protect us locally, so lots of readers/
2145          * writers can share a single PW lock. */
2146         rc = mode;
2147         if (mode == LCK_PR)
2148                 rc |= LCK_PW;
2149         rc = ldlm_lock_match(obd->obd_namespace, lflags,
2150                              res_id, type, policy, rc, lockh, unref);
2151         if (rc) {
2152                 if (data != NULL) {
2153                         if (!osc_set_data_with_check(lockh, data)) {
2154                                 if (!(lflags & LDLM_FL_TEST_LOCK))
2155                                         ldlm_lock_decref(lockh, rc);
2156                                 RETURN(0);
2157                         }
2158                 }
2159                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
2160                         ldlm_lock_addref(lockh, LCK_PR);
2161                         ldlm_lock_decref(lockh, LCK_PW);
2162                 }
2163                 RETURN(rc);
2164         }
2165         RETURN(rc);
2166 }
2167
2168 static int osc_statfs_interpret(const struct lu_env *env,
2169                                 struct ptlrpc_request *req,
2170                                 struct osc_async_args *aa, int rc)
2171 {
2172         struct obd_statfs *msfs;
2173         ENTRY;
2174
2175         if (rc == -EBADR)
2176                 /* The request has in fact never been sent
2177                  * due to issues at a higher level (LOV).
2178                  * Exit immediately since the caller is
2179                  * aware of the problem and takes care
2180                  * of the cleanup */
2181                 RETURN(rc);
2182
2183         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2184             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2185                 GOTO(out, rc = 0);
2186
2187         if (rc != 0)
2188                 GOTO(out, rc);
2189
2190         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2191         if (msfs == NULL) {
2192                 GOTO(out, rc = -EPROTO);
2193         }
2194
2195         *aa->aa_oi->oi_osfs = *msfs;
2196 out:
2197         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2198         RETURN(rc);
2199 }
2200
2201 static int osc_statfs_async(struct obd_export *exp,
2202                             struct obd_info *oinfo, __u64 max_age,
2203                             struct ptlrpc_request_set *rqset)
2204 {
2205         struct obd_device     *obd = class_exp2obd(exp);
2206         struct ptlrpc_request *req;
2207         struct osc_async_args *aa;
2208         int                    rc;
2209         ENTRY;
2210
2211         /* We could possibly pass max_age in the request (as an absolute
2212          * timestamp or a "seconds.usec ago") so the target can avoid doing
2213          * extra calls into the filesystem if that isn't necessary (e.g.
2214          * during mount that would help a bit).  Having relative timestamps
2215          * is not so great if request processing is slow, while absolute
2216          * timestamps are not ideal because they need time synchronization. */
2217         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2218         if (req == NULL)
2219                 RETURN(-ENOMEM);
2220
2221         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2222         if (rc) {
2223                 ptlrpc_request_free(req);
2224                 RETURN(rc);
2225         }
2226         ptlrpc_request_set_replen(req);
2227         req->rq_request_portal = OST_CREATE_PORTAL;
2228         ptlrpc_at_set_req_timeout(req);
2229
2230         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2231                 /* procfs requests must not block on stat, to avoid deadlock */
2232                 req->rq_no_resend = 1;
2233                 req->rq_no_delay = 1;
2234         }
2235
2236         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2237         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2238         aa = ptlrpc_req_async_args(req);
2239         aa->aa_oi = oinfo;
2240
2241         ptlrpc_set_add_req(rqset, req);
2242         RETURN(0);
2243 }
2244
2245 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2246                       struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2247 {
2248         struct obd_device     *obd = class_exp2obd(exp);
2249         struct obd_statfs     *msfs;
2250         struct ptlrpc_request *req;
2251         struct obd_import     *imp = NULL;
2252         int rc;
2253         ENTRY;
2254
2255         /* Since the request might also come from lprocfs, we need to sync
2256          * this with client_disconnect_export (Bug 15684) */
2257         down_read(&obd->u.cli.cl_sem);
2258         if (obd->u.cli.cl_import)
2259                 imp = class_import_get(obd->u.cli.cl_import);
2260         up_read(&obd->u.cli.cl_sem);
2261         if (!imp)
2262                 RETURN(-ENODEV);
2263
2264         /* We could possibly pass max_age in the request (as an absolute
2265          * timestamp or a "seconds.usec ago") so the target can avoid doing
2266          * extra calls into the filesystem if that isn't necessary (e.g.
2267          * during mount that would help a bit).  Having relative timestamps
2268          * is not so great if request processing is slow, while absolute
2269          * timestamps are not ideal because they need time synchronization. */
2270         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2271
2272         class_import_put(imp);
2273
2274         if (req == NULL)
2275                 RETURN(-ENOMEM);
2276
2277         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2278         if (rc) {
2279                 ptlrpc_request_free(req);
2280                 RETURN(rc);
2281         }
2282         ptlrpc_request_set_replen(req);
2283         req->rq_request_portal = OST_CREATE_PORTAL;
2284         ptlrpc_at_set_req_timeout(req);
2285
2286         if (flags & OBD_STATFS_NODELAY) {
2287                 /* procfs requests must not block on stat, to avoid deadlock */
2288                 req->rq_no_resend = 1;
2289                 req->rq_no_delay = 1;
2290         }
2291
2292         rc = ptlrpc_queue_wait(req);
2293         if (rc)
2294                 GOTO(out, rc);
2295
2296         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2297         if (msfs == NULL) {
2298                 GOTO(out, rc = -EPROTO);
2299         }
2300
2301         *osfs = *msfs;
2302
2303         EXIT;
2304  out:
2305         ptlrpc_req_finished(req);
2306         return rc;
2307 }
2308
2309 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2310                          void *karg, void *uarg)
2311 {
2312         struct obd_device *obd = exp->exp_obd;
2313         struct obd_ioctl_data *data = karg;
2314         int err = 0;
2315         ENTRY;
2316
2317         if (!try_module_get(THIS_MODULE)) {
2318                 CERROR("%s: cannot get module '%s'\n", obd->obd_name,
2319                        module_name(THIS_MODULE));
2320                 return -EINVAL;
2321         }
2322         switch (cmd) {
2323         case OBD_IOC_CLIENT_RECOVER:
2324                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2325                                             data->ioc_inlbuf1, 0);
2326                 if (err > 0)
2327                         err = 0;
2328                 GOTO(out, err);
2329         case IOC_OSC_SET_ACTIVE:
2330                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2331                                                data->ioc_offset);
2332                 GOTO(out, err);
2333         case OBD_IOC_PING_TARGET:
2334                 err = ptlrpc_obd_ping(obd);
2335                 GOTO(out, err);
2336         default:
2337                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2338                        cmd, current_comm());
2339                 GOTO(out, err = -ENOTTY);
2340         }
2341 out:
2342         module_put(THIS_MODULE);
2343         return err;
2344 }
2345
2346 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
2347                               u32 keylen, void *key,
2348                               u32 vallen, void *val,
2349                               struct ptlrpc_request_set *set)
2350 {
2351         struct ptlrpc_request *req;
2352         struct obd_device     *obd = exp->exp_obd;
2353         struct obd_import     *imp = class_exp2cliimp(exp);
2354         char                  *tmp;
2355         int                    rc;
2356         ENTRY;
2357
2358         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
2359
2360         if (KEY_IS(KEY_CHECKSUM)) {
2361                 if (vallen != sizeof(int))
2362                         RETURN(-EINVAL);
2363                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
2364                 RETURN(0);
2365         }
2366
2367         if (KEY_IS(KEY_SPTLRPC_CONF)) {
2368                 sptlrpc_conf_client_adapt(obd);
2369                 RETURN(0);
2370         }
2371
2372         if (KEY_IS(KEY_FLUSH_CTX)) {
2373                 sptlrpc_import_flush_my_ctx(imp);
2374                 RETURN(0);
2375         }
2376
2377         if (KEY_IS(KEY_CACHE_SET)) {
2378                 struct client_obd *cli = &obd->u.cli;
2379
2380                 LASSERT(cli->cl_cache == NULL); /* only once */
2381                 cli->cl_cache = (struct cl_client_cache *)val;
2382                 cl_cache_incref(cli->cl_cache);
2383                 cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
2384
2385                 /* add this osc into entity list */
2386                 LASSERT(list_empty(&cli->cl_lru_osc));
2387                 spin_lock(&cli->cl_cache->ccc_lru_lock);
2388                 list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
2389                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
2390
2391                 RETURN(0);
2392         }
2393
2394         if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
2395                 struct client_obd *cli = &obd->u.cli;
2396                 long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
2397                 long target = *(long *)val;
2398
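                /* Shrink at most half of the pages on our LRU, and never
                 * more than the caller's target; report back how much of
                 * the target remains unmet. */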
2399                 nr = osc_lru_shrink(env, cli, min(nr, target), true);
2400                 *(long *)val -= nr;
2401                 RETURN(0);
2402         }
2403
2404         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
2405                 RETURN(-EINVAL);
2406
2407         /* We pass all other commands directly to OST. Since nobody calls osc
2408            methods directly and everybody is supposed to go through LOV, we
2409            assume LOV checked invalid values for us.
2410            The only recognised values so far are evict_by_nid and mds_conn.
2411            Even if something bad goes through, we'd get a -EINVAL from OST
2412            anyway. */
2413
2414         req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
2415                                                 &RQF_OST_SET_GRANT_INFO :
2416                                                 &RQF_OBD_SET_INFO);
2417         if (req == NULL)
2418                 RETURN(-ENOMEM);
2419
2420         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2421                              RCL_CLIENT, keylen);
2422         if (!KEY_IS(KEY_GRANT_SHRINK))
2423                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
2424                                      RCL_CLIENT, vallen);
2425         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
2426         if (rc) {
2427                 ptlrpc_request_free(req);
2428                 RETURN(rc);
2429         }
2430
2431         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2432         memcpy(tmp, key, keylen);
2433         tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
2434                                                         &RMF_OST_BODY :
2435                                                         &RMF_SETINFO_VAL);
2436         memcpy(tmp, val, vallen);
2437
2438         if (KEY_IS(KEY_GRANT_SHRINK)) {
2439                 struct osc_grant_args *aa;
2440                 struct obdo *oa;
2441
2442                 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2443                 aa = ptlrpc_req_async_args(req);
2444                 OBDO_ALLOC(oa);
2445                 if (!oa) {
2446                         ptlrpc_req_finished(req);
2447                         RETURN(-ENOMEM);
2448                 }
2449                 *oa = ((struct ost_body *)val)->oa;
2450                 aa->aa_oa = oa;
2451                 req->rq_interpret_reply = osc_shrink_grant_interpret;
2452         }
2453
2454         ptlrpc_request_set_replen(req);
2455         if (!KEY_IS(KEY_GRANT_SHRINK)) {
2456                 LASSERT(set != NULL);
2457                 ptlrpc_set_add_req(set, req);
2458                 ptlrpc_check_set(NULL, set);
2459         } else
2460                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
2461
2462         RETURN(0);
2463 }
2464
2465 static int osc_reconnect(const struct lu_env *env,
2466                          struct obd_export *exp, struct obd_device *obd,
2467                          struct obd_uuid *cluuid,
2468                          struct obd_connect_data *data,
2469                          void *localdata)
2470 {
2471         struct client_obd *cli = &obd->u.cli;
2472
2473         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
2474                 long lost_grant;
2475
2476                 spin_lock(&cli->cl_loi_list_lock);
2477                 data->ocd_grant = (cli->cl_avail_grant +
2478                                   (cli->cl_dirty_pages << PAGE_CACHE_SHIFT)) ?:
2479                                   2 * cli_brw_size(obd);
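                /* GNU "?:" shorthand: if the client holds no grant and
                 * has no dirty pages, fall back to requesting two full
                 * BRW RPCs worth of grant from the server. */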
2480                 lost_grant = cli->cl_lost_grant;
2481                 cli->cl_lost_grant = 0;
2482                 spin_unlock(&cli->cl_loi_list_lock);
2483
2484                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
2485                        " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
2486                        data->ocd_version, data->ocd_grant, lost_grant);
2487         }
2488
2489         RETURN(0);
2490 }
2491
2492 static int osc_disconnect(struct obd_export *exp)
2493 {
2494         struct obd_device *obd = class_exp2obd(exp);
2495         int rc;
2496
2497         rc = client_disconnect_export(exp);
2498         /**
2499          * Initially we put del_shrink_grant before disconnect_export, but it
2500          * causes the following problem if setup (connect) and cleanup
2501          * (disconnect) are tangled together.
2502          *      connect p1                     disconnect p2
2503          *   ptlrpc_connect_import
2504          *     ...............               class_manual_cleanup
2505          *                                     osc_disconnect
2506          *                                     del_shrink_grant
2507          *   ptlrpc_connect_interrupt
2508          *     init_grant_shrink
2509          *   add this client to shrink list
2510          *                                      cleanup_osc
2511          * Bang! pinger trigger the shrink.
2512          * So the osc should be disconnected from the shrink list, after we
2513          * are sure the import has been destroyed. BUG18662
2514          */
2515         if (obd->u.cli.cl_import == NULL)
2516                 osc_del_shrink_grant(&obd->u.cli);
2517         return rc;
2518 }
2519
2520 static int osc_import_event(struct obd_device *obd,
2521                             struct obd_import *imp,
2522                             enum obd_import_event event)
2523 {
2524         struct client_obd *cli;
2525         int rc = 0;
2526
2527         ENTRY;
2528         LASSERT(imp->imp_obd == obd);
2529
2530         switch (event) {
2531         case IMP_EVENT_DISCON: {
2532                 cli = &obd->u.cli;
2533                 spin_lock(&cli->cl_loi_list_lock);
2534                 cli->cl_avail_grant = 0;
2535                 cli->cl_lost_grant = 0;
2536                 spin_unlock(&cli->cl_loi_list_lock);
2537                 break;
2538         }
2539         case IMP_EVENT_INACTIVE: {
2540                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
2541                 break;
2542         }
2543         case IMP_EVENT_INVALIDATE: {
2544                 struct ldlm_namespace *ns = obd->obd_namespace;
2545                 struct lu_env         *env;
2546                 int                    refcheck;
2547
2548                 env = cl_env_get(&refcheck);
2549                 if (!IS_ERR(env)) {
2550                         /* Reset grants */
2551                         cli = &obd->u.cli;
2552                         /* all pages go to failing rpcs due to the invalid
2553                          * import */
2554                         osc_io_unplug(env, cli, NULL, PDL_POLICY_ROUND);
2555
2556                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
2557                         cl_env_put(env, &refcheck);
2558                 } else
2559                         rc = PTR_ERR(env);
2560                 break;
2561         }
2562         case IMP_EVENT_ACTIVE: {
2563                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
2564                 break;
2565         }
2566         case IMP_EVENT_OCD: {
2567                 struct obd_connect_data *ocd = &imp->imp_connect_data;
2568
2569                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
2570                         osc_init_grant(&obd->u.cli, ocd);
2571
2572                 /* See bug 7198 */
2573                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
2574                         imp->imp_client->cli_request_portal = OST_REQUEST_PORTAL;
2575
2576                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
2577                 break;
2578         }
2579         case IMP_EVENT_DEACTIVATE: {
2580                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
2581                 break;
2582         }
2583         case IMP_EVENT_ACTIVATE: {
2584                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
2585                 break;
2586         }
2587         default:
2588                 CERROR("Unknown import event %d\n", event);
2589                 LBUG();
2590         }
2591         RETURN(rc);
2592 }
2593
2594 /**
2595  * Determine whether the lock can be canceled before replaying the lock
2596  * during recovery, see bug16774 for detailed information.
2597  *
2598  * \retval zero the lock can't be canceled
2599  * \retval other ok to cancel
2600  */
2601 static int osc_cancel_weight(struct ldlm_lock *lock)
2602 {
2603         /*
2604          * Cancel all unused and granted extent lock.
2605          */
2606         if (lock->l_resource->lr_type == LDLM_EXTENT &&
2607             lock->l_granted_mode == lock->l_req_mode &&
2608             osc_ldlm_weigh_ast(lock) == 0)
2609                 RETURN(1);
2610
2611         RETURN(0);
2612 }
2613
2614 static int brw_queue_work(const struct lu_env *env, void *data)
2615 {
2616         struct client_obd *cli = data;
2617
2618         CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
2619
2620         osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
2621         RETURN(0);
2622 }
2623
int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
{
        struct client_obd *cli = &obd->u.cli;
        struct obd_type   *type;
        void              *handler;
        int                rc;
        ENTRY;

        rc = ptlrpcd_addref();
        if (rc)
                RETURN(rc);

        rc = client_obd_setup(obd, lcfg);
        if (rc)
                GOTO(out_ptlrpcd, rc);

        handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
        if (IS_ERR(handler))
                GOTO(out_client_setup, rc = PTR_ERR(handler));
        cli->cl_writeback_work = handler;

        handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
        if (IS_ERR(handler))
                GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
        cli->cl_lru_work = handler;

        rc = osc_quota_setup(obd);
        if (rc)
                GOTO(out_ptlrpcd_work, rc);

        cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;

#ifdef CONFIG_PROC_FS
        obd->obd_vars = lprocfs_osc_obd_vars;
#endif
        /* If the osp layer is loaded first, it will have registered the
         * osc proc directory; that happens only when the client (osc) and
         * server (osp) are on the same node.  In that case this obd_device
         * attaches its proc tree to type->typ_procsym instead of
         * obd->obd_type->typ_procroot. */
        type = class_search_type(LUSTRE_OSP_NAME);
        if (type && type->typ_procsym) {
                obd->obd_proc_entry = lprocfs_register(obd->obd_name,
                                                       type->typ_procsym,
                                                       obd->obd_vars, obd);
                if (IS_ERR(obd->obd_proc_entry)) {
                        rc = PTR_ERR(obd->obd_proc_entry);
                        CERROR("error %d setting up lprocfs for %s\n", rc,
                               obd->obd_name);
                        obd->obd_proc_entry = NULL;
                }
        } else {
                rc = lprocfs_obd_setup(obd);
        }

        /* If the basic OSC proc tree construction succeeded then
         * let's do the rest. */
        if (rc == 0) {
                lproc_osc_attach_seqstat(obd);
                sptlrpc_lprocfs_cliobd_attach(obd);
                ptlrpc_lprocfs_register_obd(obd);
        }

        /* We need to allocate a few more requests, because brw_interpret
         * tries to create new requests before freeing previous ones.
         * Ideally we would reserve 2x max_rpcs_in_flight, but that might
         * waste too much RAM, so an extra 2 is just a guess that should
         * still work. */
        cli->cl_import->imp_rq_pool =
                ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
                                    OST_MAXREQSIZE,
                                    ptlrpc_add_rqs_to_pool);

        INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
        ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
        RETURN(0);

out_ptlrpcd_work:
        if (cli->cl_writeback_work != NULL) {
                ptlrpcd_destroy_work(cli->cl_writeback_work);
                cli->cl_writeback_work = NULL;
        }
        if (cli->cl_lru_work != NULL) {
                ptlrpcd_destroy_work(cli->cl_lru_work);
                cli->cl_lru_work = NULL;
        }
out_client_setup:
        client_obd_cleanup(obd);
out_ptlrpcd:
        ptlrpcd_decref();
        RETURN(rc);
}

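/**
 * Early stages of OSC shutdown, driven by the generic OBD cleanup
 * state machine.
 *
 * OBD_CLEANUP_EARLY deactivates the import and stops pinging it;
 * OBD_CLEANUP_EXPORTS tears down the ptlrpcd work items, the client
 * import and the lprocfs entries.
 */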
static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
{
        int rc = 0;
        ENTRY;

        switch (stage) {
        case OBD_CLEANUP_EARLY: {
                struct obd_import *imp;
                imp = obd->u.cli.cl_import;
                CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
                /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
                ptlrpc_deactivate_import(imp);
                spin_lock(&imp->imp_lock);
                imp->imp_pingable = 0;
                spin_unlock(&imp->imp_lock);
                break;
        }
        case OBD_CLEANUP_EXPORTS: {
                struct client_obd *cli = &obd->u.cli;
                /* LU-464
                 * For the echo client, the export may be on the zombie
                 * list; wait for the zombie thread to cull it, because
                 * cli.cl_import will be cleared in
                 * client_disconnect_export():
                 *   class_export_destroy() -> obd_cleanup() ->
                 *   echo_device_free() -> echo_client_cleanup() ->
                 *   obd_disconnect() -> osc_disconnect() ->
                 *   client_disconnect_export()
                 */
                obd_zombie_barrier();
                if (cli->cl_writeback_work) {
                        ptlrpcd_destroy_work(cli->cl_writeback_work);
                        cli->cl_writeback_work = NULL;
                }
                if (cli->cl_lru_work) {
                        ptlrpcd_destroy_work(cli->cl_lru_work);
                        cli->cl_lru_work = NULL;
                }
                obd_cleanup_client_import(obd);
                ptlrpc_lprocfs_unregister_obd(obd);
                lprocfs_obd_cleanup(obd);
                break;
        }
        }
        RETURN(rc);
}

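/**
 * Final OSC teardown: detach from the shared LRU cache, free the quota
 * cache, run the generic client cleanup and drop the ptlrpcd reference
 * taken in osc_setup().
 */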
int osc_cleanup(struct obd_device *obd)
{
        struct client_obd *cli = &obd->u.cli;
        int rc;

        ENTRY;

        /* lru cleanup */
        if (cli->cl_cache != NULL) {
                LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
                spin_lock(&cli->cl_cache->ccc_lru_lock);
                list_del_init(&cli->cl_lru_osc);
                spin_unlock(&cli->cl_cache->ccc_lru_lock);
                cli->cl_lru_left = NULL;
                cl_cache_decref(cli->cl_cache);
                cli->cl_cache = NULL;
        }

        /* free memory of osc quota cache */
        osc_quota_cleanup(obd);

        rc = client_obd_cleanup(obd);

        ptlrpcd_decref();
        RETURN(rc);
}

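/**
 * Apply an "osc.*" parameter from a configuration log by matching it
 * against this device's lprocfs variables.
 */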
int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
{
        int rc = class_process_proc_param(PARAM_OSC, obd->obd_vars, lcfg, obd);
        return rc > 0 ? 0 : rc;
}

static int osc_process_config(struct obd_device *obd, size_t len, void *buf)
{
        return osc_process_config_base(obd, buf);
}

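/*
 * Method table through which the generic OBD layer dispatches operations
 * on OSC devices; e.g. an obd_setup() call on an OSC device ends up in
 * osc_setup() via this table.
 */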
static struct obd_ops osc_obd_ops = {
        .o_owner                = THIS_MODULE,
        .o_setup                = osc_setup,
        .o_precleanup           = osc_precleanup,
        .o_cleanup              = osc_cleanup,
        .o_add_conn             = client_import_add_conn,
        .o_del_conn             = client_import_del_conn,
        .o_connect              = client_connect_import,
        .o_reconnect            = osc_reconnect,
        .o_disconnect           = osc_disconnect,
        .o_statfs               = osc_statfs,
        .o_statfs_async         = osc_statfs_async,
        .o_create               = osc_create,
        .o_destroy              = osc_destroy,
        .o_getattr              = osc_getattr,
        .o_setattr              = osc_setattr,
        .o_iocontrol            = osc_iocontrol,
        .o_set_info_async       = osc_set_info_async,
        .o_import_event         = osc_import_event,
        .o_process_config       = osc_process_config,
        .o_quotactl             = osc_quotactl,
};

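/**
 * Module init: set up the OSC slab caches and register the OSC type
 * with the class driver.  Proc registration is skipped when the OSP
 * module already provides the osc proc entry.
 */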
static int __init osc_init(void)
{
        bool enable_proc = true;
        struct obd_type *type;
        int rc;
        ENTRY;

        /* Print the address of _any_ initialized kernel symbol from this
         * module, to allow debugging with a gdb that doesn't support data
         * symbols from modules. */
        CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);

        rc = lu_kmem_init(osc_caches);
        if (rc)
                RETURN(rc);

        type = class_search_type(LUSTRE_OSP_NAME);
        if (type != NULL && type->typ_procsym != NULL)
                enable_proc = false;

        rc = class_register_type(&osc_obd_ops, NULL, enable_proc, NULL,
                                 LUSTRE_OSC_NAME, &osc_device_type);
        if (rc) {
                lu_kmem_fini(osc_caches);
                RETURN(rc);
        }

        RETURN(rc);
}

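/**
 * Module teardown: unregister the OSC type and release the slab caches.
 */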
static void /*__exit*/ osc_exit(void)
{
        class_unregister_type(LUSTRE_OSC_NAME);
        lu_kmem_fini(osc_caches);
}

MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
MODULE_VERSION(LUSTRE_VERSION_STRING);
MODULE_LICENSE("GPL");

module_init(osc_init);
module_exit(osc_exit);