LU-4931 ladvise: Add feature of giving file access advices
fs/lustre-release.git: lustre/osc/osc_request.c
/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, 2015, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 */

#define DEBUG_SUBSYSTEM S_OSC

#include <libcfs/libcfs.h>

#include <lustre/lustre_user.h>

#include <lprocfs_status.h>
#include <lustre_debug.h>
#include <lustre_dlm.h>
#include <lustre_fid.h>
#include <lustre_ha.h>
#include <lustre_ioctl.h>
#include <lustre_net.h>
#include <lustre_obdo.h>
#include <lustre_param.h>
#include <obd.h>
#include <obd_cksum.h>
#include <obd_class.h>

#include "osc_cl_internal.h"
#include "osc_internal.h"

atomic_t osc_pool_req_count;
unsigned int osc_reqpool_maxreqcount;
struct ptlrpc_request_pool *osc_rq_pool;

/* max memory used for request pool, unit is MB */
static unsigned int osc_reqpool_mem_max = 5;
module_param(osc_reqpool_mem_max, uint, 0444);
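
/*
 * Sketch of how this MB budget could bound the pool size (the real
 * sizing happens at client setup time; the 16 KB per-request cost
 * below is only an assumption for illustration):
 *
 *	nreq = (osc_reqpool_mem_max << 20) / (16 << 10);
 *
 * e.g. the default 5 MB budget would allow ~320 pooled BRW requests.
 */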

struct osc_brw_async_args {
        struct obdo              *aa_oa;
        int                       aa_requested_nob;
        int                       aa_nio_count;
        u32                       aa_page_count;
        int                       aa_resends;
        struct brw_page         **aa_ppga;
        struct client_obd        *aa_cli;
        struct list_head          aa_oaps;
        struct list_head          aa_exts;
};

#define osc_grant_args osc_brw_async_args

struct osc_setattr_args {
        struct obdo             *sa_oa;
        obd_enqueue_update_f     sa_upcall;
        void                    *sa_cookie;
};

struct osc_fsync_args {
        struct osc_object       *fa_obj;
        struct obdo             *fa_oa;
        obd_enqueue_update_f    fa_upcall;
        void                    *fa_cookie;
};

struct osc_ladvise_args {
        struct obdo             *la_oa;
        obd_enqueue_update_f     la_upcall;
        void                    *la_cookie;
};

struct osc_enqueue_args {
        struct obd_export       *oa_exp;
        enum ldlm_type          oa_type;
        enum ldlm_mode          oa_mode;
        __u64                   *oa_flags;
        osc_enqueue_upcall_f    oa_upcall;
        void                    *oa_cookie;
        struct ost_lvb          *oa_lvb;
        struct lustre_handle    oa_lockh;
        unsigned int            oa_agl:1;
};

static void osc_release_ppga(struct brw_page **ppga, size_t count);
static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
                         void *data, int rc);

void osc_pack_req_body(struct ptlrpc_request *req, struct obdo *oa)
{
        struct ost_body *body;

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
}

static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        int                      rc;

        ENTRY;
        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;

        EXIT;
out:
        ptlrpc_req_finished(req);

        return rc;
}

static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        int                      rc;

        ENTRY;
        LASSERT(oa->o_valid & OBD_MD_FLGROUP);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        EXIT;
out:
        ptlrpc_req_finished(req);

        RETURN(rc);
}

static int osc_setattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_setattr_args *sa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
                             &body->oa);
out:
        rc = sa->sa_upcall(sa->sa_cookie, rc);
        RETURN(rc);
}

int osc_setattr_async(struct obd_export *exp, struct obdo *oa,
                      obd_enqueue_update_f upcall, void *cookie,
                      struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct osc_setattr_args *sa;
        int                      rc;

        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        /* do mds to ost setattr asynchronously */
        if (!rqset) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req);
        } else {
                req->rq_interpret_reply =
                        (ptlrpc_interpterer_t)osc_setattr_interpret;

                CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
                sa = ptlrpc_req_async_args(req);
                sa->sa_oa = oa;
                sa->sa_upcall = upcall;
                sa->sa_cookie = cookie;

                if (rqset == PTLRPCD_SET)
                        ptlrpcd_add_req(req);
                else
                        ptlrpc_set_add_req(rqset, req);
        }

        RETURN(0);
}
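
/*
 * Usage sketch for osc_setattr_async() (hypothetical caller; my_upcall
 * and my_cookie are illustrative names, not part of this file):
 *
 *	static int my_upcall(void *cookie, int rc)
 *	{
 *		return rc;
 *	}
 *
 *	rc = osc_setattr_async(exp, oa, my_upcall, my_cookie, PTLRPCD_SET);
 *
 * The upcall runs once osc_setattr_interpret() has unpacked the reply.
 * With rqset == NULL the request is fired and forgotten: no interpret
 * callback is installed, so the upcall is never invoked.
 */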

static int osc_ladvise_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 void *arg, int rc)
{
        struct osc_ladvise_args *la = arg;
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        *la->la_oa = body->oa;
out:
        rc = la->la_upcall(la->la_cookie, rc);
        RETURN(rc);
}

/**
 * If rqset is NULL, do not wait for the response. Upcall and cookie may
 * also be NULL in this case.
 */
int osc_ladvise_base(struct obd_export *exp, struct obdo *oa,
                     struct ladvise_hdr *ladvise_hdr,
                     obd_enqueue_update_f upcall, void *cookie,
                     struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        struct osc_ladvise_args *la;
        int                      rc;
        struct lu_ladvise       *req_ladvise;
        struct lu_ladvise       *ladvise = ladvise_hdr->lah_advise;
        int                      num_advise = ladvise_hdr->lah_count;
        struct ladvise_hdr      *req_ladvise_hdr;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_LADVISE);
        if (req == NULL)
                RETURN(-ENOMEM);

        req_capsule_set_size(&req->rq_pill, &RMF_OST_LADVISE, RCL_CLIENT,
                             num_advise * sizeof(*ladvise));
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_LADVISE);
        if (rc != 0) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL;
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
                             oa);

        req_ladvise_hdr = req_capsule_client_get(&req->rq_pill,
                                                 &RMF_OST_LADVISE_HDR);
        memcpy(req_ladvise_hdr, ladvise_hdr, sizeof(*ladvise_hdr));

        req_ladvise = req_capsule_client_get(&req->rq_pill, &RMF_OST_LADVISE);
        memcpy(req_ladvise, ladvise, sizeof(*ladvise) * num_advise);
        ptlrpc_request_set_replen(req);

        if (rqset == NULL) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req);
                RETURN(0);
        }

        req->rq_interpret_reply = osc_ladvise_interpret;
        CLASSERT(sizeof(*la) <= sizeof(req->rq_async_args));
        la = ptlrpc_req_async_args(req);
        la->la_oa = oa;
        la->la_upcall = upcall;
        la->la_cookie = cookie;

        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}
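
/*
 * Usage sketch for osc_ladvise_base() (hypothetical caller; field and
 * constant names assume the lustre_user.h definitions introduced with
 * this LU-4931 patch, and the advice array is assumed to sit right
 * after the header, as the memcpy() above relies on):
 *
 *	struct ladvise_hdr *hdr;
 *
 *	OBD_ALLOC(hdr, offsetof(typeof(*hdr), lah_advise[1]));
 *	hdr->lah_magic = LADVISE_MAGIC;
 *	hdr->lah_count = 1;
 *	hdr->lah_advise[0].lla_advice = LU_LADVISE_WILLREAD;
 *	hdr->lah_advise[0].lla_start = 0;
 *	hdr->lah_advise[0].lla_end = OBD_OBJECT_EOF;
 *	rc = osc_ladvise_base(exp, oa, hdr, NULL, NULL, NULL);
 *
 * Passing a NULL rqset fires the RPC without waiting, per the comment
 * above osc_ladvise_base().
 */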

static int osc_create(const struct lu_env *env, struct obd_export *exp,
                      struct obdo *oa)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        LASSERT(oa != NULL);
        LASSERT(oa->o_valid & OBD_MD_FLGROUP);
        LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
        if (req == NULL)
                GOTO(out, rc = -ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
        if (rc) {
                ptlrpc_request_free(req);
                GOTO(out, rc);
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out_req, rc = -EPROTO);

        CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;

        CDEBUG(D_HA, "transno: "LPD64"\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        RETURN(rc);
}

int osc_punch_base(struct obd_export *exp, struct obdo *oa,
                   obd_enqueue_update_f upcall, void *cookie,
                   struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct osc_setattr_args *sa;
        struct ost_body         *body;
        int                      rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
        CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
        sa = ptlrpc_req_async_args(req);
        sa->sa_oa = oa;
        sa->sa_upcall = upcall;
        sa->sa_cookie = cookie;
        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}
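
/*
 * Usage sketch for osc_punch_base() (hypothetical caller).  By
 * convention the punch range travels inside the obdo: the cl_io
 * truncate path is assumed to place the start offset in oa->o_size and
 * the end offset in oa->o_blocks (OBD_OBJECT_EOF meaning "to the end
 * of the object"):
 *
 *	oa->o_size = start;
 *	oa->o_blocks = OBD_OBJECT_EOF;
 *	oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
 *	rc = osc_punch_base(exp, oa, my_upcall, my_cookie, PTLRPCD_SET);
 */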

static int osc_sync_interpret(const struct lu_env *env,
                              struct ptlrpc_request *req,
                              void *arg, int rc)
{
        struct osc_fsync_args   *fa = arg;
        struct ost_body         *body;
        struct cl_attr          *attr = &osc_env_info(env)->oti_attr;
        unsigned long           valid = 0;
        struct cl_object        *obj;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL) {
                CERROR("can't unpack ost_body\n");
                GOTO(out, rc = -EPROTO);
        }

        *fa->fa_oa = body->oa;
        obj = osc2cl(fa->fa_obj);

        /* Update osc object's blocks attribute */
        cl_object_attr_lock(obj);
        if (body->oa.o_valid & OBD_MD_FLBLOCKS) {
                attr->cat_blocks = body->oa.o_blocks;
                valid |= CAT_BLOCKS;
        }

        if (valid != 0)
                cl_object_attr_update(env, obj, attr, valid);
        cl_object_attr_unlock(obj);

out:
        rc = fa->fa_upcall(fa->fa_cookie, rc);
        RETURN(rc);
}

int osc_sync_base(struct osc_object *obj, struct obdo *oa,
                  obd_enqueue_update_f upcall, void *cookie,
                  struct ptlrpc_request_set *rqset)
{
        struct obd_export     *exp = osc_export(obj);
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct osc_fsync_args *fa;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        /* overload the size and blocks fields in the oa with start/end */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = osc_sync_interpret;

        CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
        fa = ptlrpc_req_async_args(req);
        fa->fa_obj = obj;
        fa->fa_oa = oa;
        fa->fa_upcall = upcall;
        fa->fa_cookie = cookie;

        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}
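
/*
 * As the comment in osc_sync_base() notes, OST_SYNC overloads
 * oa->o_size/o_blocks with the byte range to flush.  Illustrative
 * (hypothetical) caller:
 *
 *	oa->o_size = start;
 *	oa->o_blocks = end;
 *	rc = osc_sync_base(osc_obj, oa, my_upcall, my_cookie, PTLRPCD_SET);
 */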

/* Find and cancel locally the locks matched by @mode in the resource found
 * by @objid. Found locks are added to the @cancels list. Returns the number
 * of locks added to @cancels. */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
                                   struct list_head *cancels,
                                   enum ldlm_mode mode, __u64 lock_flags)
{
        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        struct ldlm_res_id res_id;
        struct ldlm_resource *res;
        int count;
        ENTRY;

        /* Return, i.e. cancel nothing, only if ELC is supported (flag in
         * export) but disabled through procfs (flag in NS).
         *
         * This is different from the case where ELC is not supported at
         * all, in which we still want to cancel locks in advance and just
         * cancel them locally, without sending any RPC. */
        if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
                RETURN(0);

        ostid_build_res_name(&oa->o_oi, &res_id);
        res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
        if (IS_ERR(res))
                RETURN(0);

        LDLM_RESOURCE_ADDREF(res);
        count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
                                           lock_flags, 0, NULL);
        LDLM_RESOURCE_DELREF(res);
        ldlm_resource_putref(res);
        RETURN(count);
}

static int osc_destroy_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req, void *data,
                                 int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;

        atomic_dec(&cli->cl_destroy_in_flight);
        wake_up(&cli->cl_destroy_waitq);
        return 0;
}

static int osc_can_send_destroy(struct client_obd *cli)
{
        if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
            cli->cl_max_rpcs_in_flight) {
                /* The destroy request can be sent */
                return 1;
        }
        if (atomic_dec_return(&cli->cl_destroy_in_flight) <
            cli->cl_max_rpcs_in_flight) {
                /*
                 * The counter has been modified between the two atomic
                 * operations.
                 */
                wake_up(&cli->cl_destroy_waitq);
        }
        return 0;
}
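
/*
 * Worked example of the throttle above, with cl_max_rpcs_in_flight == 8:
 * the 9th concurrent caller's atomic_inc_return() yields 9 > 8, so it
 * backs out via atomic_dec_return().  If a destroy completed in between
 * (the decrement lands below 8), the waitqueue is woken so that a
 * sleeper in osc_destroy() retries osc_can_send_destroy() instead of
 * stalling until the next completion.
 */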

static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct client_obd     *cli = &exp->exp_obd->u.cli;
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct list_head       cancels = LIST_HEAD_INIT(cancels);
        int rc, count;
        ENTRY;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
        if (req == NULL) {
                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                RETURN(-ENOMEM);
        }

        rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
                               0, &cancels, count);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = osc_destroy_interpret;
        if (!osc_can_send_destroy(cli)) {
                struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);

                /*
                 * Wait until the number of on-going destroy RPCs drops
                 * below cl_max_rpcs_in_flight.
                 */
                l_wait_event_exclusive(cli->cl_destroy_waitq,
                                       osc_can_send_destroy(cli), &lwi);
        }

        /* Do not wait for response */
        ptlrpcd_add_req(req);
        RETURN(0);
}

static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;

        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        spin_lock(&cli->cl_loi_list_lock);
        if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data, GRANT_PARAM))
                oa->o_dirty = cli->cl_dirty_grant;
        else
                oa->o_dirty = cli->cl_dirty_pages << PAGE_CACHE_SHIFT;
        if (unlikely(cli->cl_dirty_pages - cli->cl_dirty_transit >
                     cli->cl_dirty_max_pages)) {
                CERROR("dirty %lu - %lu > dirty_max %lu\n",
                       cli->cl_dirty_pages, cli->cl_dirty_transit,
                       cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else if (unlikely(atomic_long_read(&obd_dirty_pages) -
                            atomic_long_read(&obd_dirty_transit_pages) >
                            (long)(obd_max_dirty_pages + 1))) {
                /* The atomic_read()s allowing the atomic_inc()s are not
                 * covered by a lock, so they may safely race and trip
                 * this CERROR() unless we add in a small fudge factor (+1). */
                CERROR("%s: dirty %ld - %ld > system dirty_max %ld\n",
                       cli_name(cli), atomic_long_read(&obd_dirty_pages),
                       atomic_long_read(&obd_dirty_transit_pages),
                       obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
                            0x7fffffff)) {
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty_pages, cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else {
                unsigned long nrpages;

                nrpages = cli->cl_max_pages_per_rpc;
                nrpages *= cli->cl_max_rpcs_in_flight + 1;
                nrpages = max(nrpages, cli->cl_dirty_max_pages);
                oa->o_undirty = nrpages << PAGE_CACHE_SHIFT;
                if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data,
                                 GRANT_PARAM)) {
                        int nrextents;

                        /* take extent tax into account when asking for more
                         * grant space */
                        nrextents = (nrpages + cli->cl_max_extent_pages - 1) /
                                     cli->cl_max_extent_pages;
                        oa->o_undirty += nrextents * cli->cl_grant_extent_tax;
                }
        }
        oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
        oa->o_dropped = cli->cl_lost_grant;
        cli->cl_lost_grant = 0;
        spin_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
}
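
/*
 * Worked example of the o_undirty sizing above (illustrative numbers):
 * with cl_max_pages_per_rpc = 256 (1 MB RPCs on 4 KB pages) and
 * cl_max_rpcs_in_flight = 8,
 *
 *	nrpages   = 256 * (8 + 1) = 2304 pages
 *	o_undirty = 2304 << PAGE_CACHE_SHIFT = 9 MB
 *
 * plus, when GRANT_PARAM was negotiated, one cl_grant_extent_tax per
 * ceil(2304 / cl_max_extent_pages) extents.
 */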

void osc_update_next_shrink(struct client_obd *cli)
{
        cli->cl_next_shrink_grant =
                cfs_time_shift(cli->cl_grant_shrink_interval);
        CDEBUG(D_CACHE, "next time %ld to shrink grant\n",
               cli->cl_next_shrink_grant);
}

static void __osc_update_grant(struct client_obd *cli, u64 grant)
{
        spin_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant += grant;
        spin_unlock(&cli->cl_loi_list_lock);
}

static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
{
        if (body->oa.o_valid & OBD_MD_FLGRANT) {
                CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
                __osc_update_grant(cli, body->oa.o_grant);
        }
}

static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
                              u32 keylen, void *key,
                              u32 vallen, void *val,
                              struct ptlrpc_request_set *set);

static int osc_shrink_grant_interpret(const struct lu_env *env,
                                      struct ptlrpc_request *req,
                                      void *aa, int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
        struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
        struct ost_body *body;

        if (rc != 0) {
                __osc_update_grant(cli, oa->o_grant);
                GOTO(out, rc);
        }

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        osc_update_grant(cli, body);
out:
        OBDO_FREE(oa);
        return rc;
}

static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
{
        spin_lock(&cli->cl_loi_list_lock);
        oa->o_grant = cli->cl_avail_grant / 4;
        cli->cl_avail_grant -= oa->o_grant;
        spin_unlock(&cli->cl_loi_list_lock);
        if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
                oa->o_valid |= OBD_MD_FLFLAGS;
                oa->o_flags = 0;
        }
        oa->o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);
}

/* Shrink the current grant, either from some large amount to enough for a
 * full set of in-flight RPCs, or if we have already shrunk to that limit
 * then to enough for a single RPC.  This avoids keeping more grant than
 * needed, and avoids shrinking the grant piecemeal. */
static int osc_shrink_grant(struct client_obd *cli)
{
        __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
                             (cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT);

        spin_lock(&cli->cl_loi_list_lock);
        if (cli->cl_avail_grant <= target_bytes)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
        spin_unlock(&cli->cl_loi_list_lock);

        return osc_shrink_grant_to_target(cli, target_bytes);
}
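
/*
 * Worked example: with 1 MB RPCs and 8 RPCs in flight, the first shrink
 * targets (8 + 1) * 1 MB = 9 MB; once cl_avail_grant is already at or
 * below that, the next shrink targets a single RPC's worth (1 MB),
 * matching the "full set, then single RPC" policy described above.
 */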

int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
{
        int                     rc = 0;
        struct ost_body        *body;
        ENTRY;

        spin_lock(&cli->cl_loi_list_lock);
        /* Don't shrink if we are already above or below the desired limit.
         * We don't want to shrink below a single RPC, as that will negatively
         * impact block allocation and long-term performance. */
        if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;

        if (target_bytes >= cli->cl_avail_grant) {
                spin_unlock(&cli->cl_loi_list_lock);
                RETURN(0);
        }
        spin_unlock(&cli->cl_loi_list_lock);

        OBD_ALLOC_PTR(body);
        if (!body)
                RETURN(-ENOMEM);

        osc_announce_cached(cli, &body->oa, 0);

        spin_lock(&cli->cl_loi_list_lock);
        body->oa.o_grant = cli->cl_avail_grant - target_bytes;
        cli->cl_avail_grant = target_bytes;
        spin_unlock(&cli->cl_loi_list_lock);
        if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
                body->oa.o_valid |= OBD_MD_FLFLAGS;
                body->oa.o_flags = 0;
        }
        body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);

        rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
                                sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
                                sizeof(*body), body, NULL);
        if (rc != 0)
                __osc_update_grant(cli, body->oa.o_grant);
        OBD_FREE_PTR(body);
        RETURN(rc);
}

static int osc_should_shrink_grant(struct client_obd *client)
{
        cfs_time_t time = cfs_time_current();
        cfs_time_t next_shrink = client->cl_next_shrink_grant;

        if ((client->cl_import->imp_connect_data.ocd_connect_flags &
             OBD_CONNECT_GRANT_SHRINK) == 0)
                return 0;

        if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
                /* Get the current RPC size directly, instead of going via:
                 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
                 * Keep comment here so that it can be found by searching. */
                int brw_size = client->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;

                if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
                    client->cl_avail_grant > brw_size)
                        return 1;
                else
                        osc_update_next_shrink(client);
        }
        return 0;
}

static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
{
        struct client_obd *client;

        list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
                if (osc_should_shrink_grant(client))
                        osc_shrink_grant(client);
        }
        return 0;
}

static int osc_add_shrink_grant(struct client_obd *client)
{
        int rc;

        rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
                                       TIMEOUT_GRANT,
                                       osc_grant_shrink_grant_cb, NULL,
                                       &client->cl_grant_shrink_list);
        if (rc) {
                CERROR("add grant client %s error %d\n", cli_name(client), rc);
                return rc;
        }
        CDEBUG(D_CACHE, "add grant client %s\n", cli_name(client));
        osc_update_next_shrink(client);
        return 0;
}

static int osc_del_shrink_grant(struct client_obd *client)
{
        return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
                                         TIMEOUT_GRANT);
}

static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        /*
         * ocd_grant is the total grant amount we're expected to hold: if we
         * have been evicted, it's the new avail_grant amount, and
         * cl_dirty_pages will drop to 0 as in-flight RPCs fail out;
         * otherwise, it's avail_grant + dirty.
         *
         * The race is tolerable here: if we're evicted, but imp_state has
         * already left EVICTED state, then cl_dirty_pages must be 0 already.
         */
        spin_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant = ocd->ocd_grant;
        if (cli->cl_import->imp_state != LUSTRE_IMP_EVICTED) {
                cli->cl_avail_grant -= cli->cl_reserved_grant;
                if (OCD_HAS_FLAG(ocd, GRANT_PARAM))
                        cli->cl_avail_grant -= cli->cl_dirty_grant;
                else
                        cli->cl_avail_grant -=
                                        cli->cl_dirty_pages << PAGE_CACHE_SHIFT;
        }

        if (cli->cl_avail_grant < 0) {
                CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
                      cli_name(cli), cli->cl_avail_grant,
                      ocd->ocd_grant, cli->cl_dirty_pages << PAGE_CACHE_SHIFT);
                /* workaround for servers which do not have the patch from
                 * LU-2679 */
                cli->cl_avail_grant = ocd->ocd_grant;
        }

        if (OCD_HAS_FLAG(ocd, GRANT_PARAM)) {
                u64 size;

                /* overhead for each extent insertion */
                cli->cl_grant_extent_tax = ocd->ocd_grant_tax_kb << 10;
                /* determine the appropriate chunk size used by osc_extent. */
                cli->cl_chunkbits = max_t(int, PAGE_CACHE_SHIFT,
                                          ocd->ocd_grant_blkbits);
                /* determine maximum extent size, in #pages */
                size = (u64)ocd->ocd_grant_max_blks << ocd->ocd_grant_blkbits;
                cli->cl_max_extent_pages = size >> PAGE_CACHE_SHIFT;
                if (cli->cl_max_extent_pages == 0)
                        cli->cl_max_extent_pages = 1;
        } else {
                cli->cl_grant_extent_tax = 0;
                cli->cl_chunkbits = PAGE_CACHE_SHIFT;
                cli->cl_max_extent_pages = DT_MAX_BRW_PAGES;
        }
        spin_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld "
                "chunk bits: %d cl_max_extent_pages: %d\n",
                cli_name(cli),
                cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits,
                cli->cl_max_extent_pages);

        if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
            list_empty(&cli->cl_grant_shrink_list))
                osc_add_shrink_grant(cli);
}
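
/*
 * Worked example of the GRANT_PARAM branch above (illustrative
 * numbers): with 4 KB pages, ocd_grant_blkbits = 12 and
 * ocd_grant_max_blks = 2048,
 *
 *	cl_chunkbits        = max(PAGE_CACHE_SHIFT, 12) = 12
 *	size                = 2048 << 12 = 8 MB
 *	cl_max_extent_pages = (8 MB) >> PAGE_CACHE_SHIFT = 2048
 */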

/* We assume that the reason this OSC got a short read is because it read
 * beyond the end of a stripe file; i.e. Lustre is reading a sparse file
 * via the LOV, and it _knows_ it's reading inside the file, it's just that
 * this stripe never got written at or beyond this stripe offset yet. */
static void handle_short_read(int nob_read, size_t page_count,
                              struct brw_page **pga)
{
        char *ptr;
        int i = 0;

        /* skip bytes read OK */
        while (nob_read > 0) {
                LASSERT(page_count > 0);

                if (pga[i]->count > nob_read) {
                        /* EOF inside this page */
                        ptr = kmap(pga[i]->pg) +
                                (pga[i]->off & ~PAGE_MASK);
                        memset(ptr + nob_read, 0, pga[i]->count - nob_read);
                        kunmap(pga[i]->pg);
                        page_count--;
                        i++;
                        break;
                }

                nob_read -= pga[i]->count;
                page_count--;
                i++;
        }

        /* zero remaining pages */
        while (page_count-- > 0) {
                ptr = kmap(pga[i]->pg) + (pga[i]->off & ~PAGE_MASK);
                memset(ptr, 0, pga[i]->count);
                kunmap(pga[i]->pg);
                i++;
        }
}
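
/*
 * Example: a 3-page (12 KB) read that returns nob_read = 5120 keeps
 * page 0 intact, zeroes the last 3 KB of page 1 (EOF fell inside it),
 * and zeroes page 2 entirely in the trailing loop.
 */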

static int check_write_rcs(struct ptlrpc_request *req,
                           int requested_nob, int niocount,
                           size_t page_count, struct brw_page **pga)
{
        int     i;
        __u32   *remote_rcs;

        remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
                                                  sizeof(*remote_rcs) *
                                                  niocount);
        if (remote_rcs == NULL) {
                CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
                return -EPROTO;
        }

        /* return error if any niobuf was in error */
        for (i = 0; i < niocount; i++) {
                if ((int)remote_rcs[i] < 0)
                        return remote_rcs[i];

                if (remote_rcs[i] != 0) {
                        CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
                                i, remote_rcs[i], req);
                        return -EPROTO;
                }
        }

        if (req->rq_bulk->bd_nob_transferred != requested_nob) {
                CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
                       req->rq_bulk->bd_nob_transferred, requested_nob);
                return -EPROTO;
        }

        return 0;
}

static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
{
        if (p1->flag != p2->flag) {
                unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
                                  OBD_BRW_SYNC       | OBD_BRW_ASYNC   |
                                  OBD_BRW_NOQUOTA    | OBD_BRW_SOFT_SYNC);

                /* warn if we try to combine flags that we don't know to be
                 * safe to combine */
                if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
                        CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
                              "report this at https://jira.hpdd.intel.com/\n",
                              p1->flag, p2->flag);
                }
                return 0;
        }

        return (p1->off + p1->count == p2->off);
}
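
/*
 * Example: two 4 KB pages at offsets 0 and 4096 with identical flags
 * merge into a single 8 KB niobuf in osc_brw_prep_request().  Pages
 * whose flags differ only in the masked-out bits above (e.g. one has
 * OBD_BRW_NOCACHE set) still refuse to merge; they just do so without
 * tripping the CWARN().
 */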

static u32 osc_checksum_bulk(int nob, size_t pg_count,
                             struct brw_page **pga, int opc,
                             cksum_type_t cksum_type)
{
        u32                             cksum;
        int                             i = 0;
        struct cfs_crypto_hash_desc     *hdesc;
        unsigned int                    bufsize;
        int                             err;
        unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);

        LASSERT(pg_count > 0);

        hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
        if (IS_ERR(hdesc)) {
                CERROR("Unable to initialize checksum hash %s\n",
                       cfs_crypto_hash_name(cfs_alg));
                return PTR_ERR(hdesc);
        }

        while (nob > 0 && pg_count > 0) {
                unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 && opc == OST_READ &&
                    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
                        unsigned char *ptr = kmap(pga[i]->pg);
                        int off = pga[i]->off & ~PAGE_MASK;

                        memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
                        kunmap(pga[i]->pg);
                }
                cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
                                            pga[i]->off & ~PAGE_MASK,
                                            count);
                LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
                               (int)(pga[i]->off & ~PAGE_MASK));

                nob -= pga[i]->count;
                pg_count--;
                i++;
        }

        bufsize = sizeof(cksum);
        err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);

        /* For sending we only compute the wrong checksum instead of
         * corrupting the data, so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                cksum++;

        return cksum;
}

static int
osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
                     u32 page_count, struct brw_page **pga,
                     struct ptlrpc_request **reqp, int resend)
{
        struct ptlrpc_request   *req;
        struct ptlrpc_bulk_desc *desc;
        struct ost_body         *body;
        struct obd_ioobj        *ioobj;
        struct niobuf_remote    *niobuf;
        int niocount, i, requested_nob, opc, rc;
        struct osc_brw_async_args *aa;
        struct req_capsule      *pill;
        struct brw_page *pg_prev;

        ENTRY;
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
                RETURN(-ENOMEM); /* Recoverable */
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
                RETURN(-EINVAL); /* Fatal */

        if ((cmd & OBD_BRW_WRITE) != 0) {
                opc = OST_WRITE;
                req = ptlrpc_request_alloc_pool(cli->cl_import,
                                                osc_rq_pool,
                                                &RQF_OST_BRW_WRITE);
        } else {
                opc = OST_READ;
                req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
        }
        if (req == NULL)
                RETURN(-ENOMEM);

        for (niocount = i = 1; i < page_count; i++) {
                if (!can_merge_pages(pga[i - 1], pga[i]))
                        niocount++;
        }

        pill = &req->rq_pill;
        req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
                             sizeof(*ioobj));
        req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
                             niocount * sizeof(*niobuf));

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);
        /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
         * retry logic */
        req->rq_no_retry_einprogress = 1;

        desc = ptlrpc_prep_bulk_imp(req, page_count,
                cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
                (opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE :
                        PTLRPC_BULK_PUT_SINK) |
                        PTLRPC_BULK_BUF_KIOV,
                OST_BULK_PORTAL,
                &ptlrpc_bulk_kiov_pin_ops);

        if (desc == NULL)
                GOTO(out, rc = -ENOMEM);
        /* NB request now owns desc and will free it when it gets freed */

        body = req_capsule_client_get(pill, &RMF_OST_BODY);
        ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;
        /* The high bits of ioo_max_brw tell the server the _maximum_ number
         * of bulks that might be sent for this request.  The actual number
         * is decided when the RPC is finally sent in ptlrpc_register_bulk().
         * It sends "max - 1", both for compatibility with old clients that
         * send "0", and so that the actual maximum is a power-of-two number,
         * not one less. LU-1431 */
        ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
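        /* Example: a 4 MB BRW carried over 1 MB LNet bulk MDs needs
         * bd_md_max_brw = 4, so the high bits carry 3 ("max - 1"); a
         * legacy client that leaves those bits at 0 is thus decoded as
         * one bulk, and the decoded maxima stay powers of two. */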
        LASSERT(page_count > 0);
        pg_prev = pga[0];
        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                struct brw_page *pg = pga[i];
                int poff = pg->off & ~PAGE_MASK;

                LASSERT(pg->count > 0);
                /* make sure there is no gap in the middle of page array */
                LASSERTF(page_count == 1 ||
                         (ergo(i == 0, poff + pg->count == PAGE_CACHE_SIZE) &&
                          ergo(i > 0 && i < page_count - 1,
                               poff == 0 && pg->count == PAGE_CACHE_SIZE)   &&
                          ergo(i == page_count - 1, poff == 0)),
                         "i: %d/%d pg: %p off: "LPU64", count: %u\n",
                         i, page_count, pg, pg->off, pg->count);
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
                         " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
                         i, page_count,
                         pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
                         pg_prev->pg, page_private(pg_prev->pg),
                         pg_prev->pg->index, pg_prev->off);
                LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
                        (pg->flag & OBD_BRW_SRVLOCK));

                desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff, pg->count);
                requested_nob += pg->count;

                if (i > 0 && can_merge_pages(pg_prev, pg)) {
                        niobuf--;
                        niobuf->rnb_len += pg->count;
                } else {
                        niobuf->rnb_offset = pg->off;
                        niobuf->rnb_len    = pg->count;
                        niobuf->rnb_flags  = pg->flag;
                }
                pg_prev = pg;
        }

        LASSERTF((void *)(niobuf - niocount) ==
                req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
                "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
                &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));

        osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob : 0);
        if (resend) {
                if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                        body->oa.o_valid |= OBD_MD_FLFLAGS;
                        body->oa.o_flags = 0;
                }
                body->oa.o_flags |= OBD_FL_RECOV_RESEND;
        }

        if (osc_should_shrink_grant(cli))
                osc_shrink_grant_local(cli, &body->oa);

        /* size[REQ_REC_OFF] still sizeof (*body) */
        if (opc == OST_WRITE) {
                if (cli->cl_checksum &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        /* store cl_cksum_type in a local variable since
                         * it can be changed via lprocfs */
                        cksum_type_t cksum_type = cli->cl_cksum_type;

                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                                oa->o_flags &= OBD_FL_LOCAL_MASK;
                                body->oa.o_flags = 0;
                        }
                        body->oa.o_flags |= cksum_type_pack(cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        body->oa.o_cksum = osc_checksum_bulk(requested_nob,
                                                             page_count, pga,
                                                             OST_WRITE,
                                                             cksum_type);
                        CDEBUG(D_PAGE, "checksum at write origin: %x\n",
                               body->oa.o_cksum);
                        /* save this in 'oa', too, for later checking */
                        oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        oa->o_flags |= cksum_type_pack(cksum_type);
                } else {
                        /* clear out the checksum flag, in case this is a
                         * resend but cl_checksum is no longer set. b=11238 */
                        oa->o_valid &= ~OBD_MD_FLCKSUM;
                }
                oa->o_cksum = body->oa.o_cksum;
                /* 1 RC per niobuf */
                req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
                                     sizeof(__u32) * niocount);
        } else {
                if (cli->cl_checksum &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                body->oa.o_flags = 0;
                        body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                }
        }
        ptlrpc_request_set_replen(req);

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oa = oa;
        aa->aa_requested_nob = requested_nob;
        aa->aa_nio_count = niocount;
        aa->aa_page_count = page_count;
        aa->aa_resends = 0;
        aa->aa_ppga = pga;
        aa->aa_cli = cli;
        INIT_LIST_HEAD(&aa->aa_oaps);

        *reqp = req;
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
                req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
                niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
        RETURN(0);

out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}

static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
                                __u32 client_cksum, __u32 server_cksum, int nob,
                                size_t page_count, struct brw_page **pga,
                                cksum_type_t client_cksum_type)
{
        __u32 new_cksum;
        char *msg;
        cksum_type_t cksum_type;

        if (server_cksum == client_cksum) {
                CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
                return 0;
        }

        cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
                                       oa->o_flags : 0);
        new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
                                      cksum_type);

        if (cksum_type != client_cksum_type)
                msg = "the server did not use the checksum type specified in "
                      "the original request - likely a protocol problem";
        else if (new_cksum == server_cksum)
                msg = "changed on the client after we checksummed it - "
                      "likely false positive due to mmap IO (bug 11742)";
        else if (new_cksum == client_cksum)
                msg = "changed in transit before arrival at OST";
        else
                msg = "changed in transit AND doesn't match the original - "
                      "likely false positive due to mmap IO (bug 11742)";

        LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
                           " object "DOSTID" extent ["LPU64"-"LPU64"]\n",
                           msg, libcfs_nid2str(peer->nid),
                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
                           POSTID(&oa->o_oi), pga[0]->off,
                           pga[page_count-1]->off + pga[page_count-1]->count - 1);
        CERROR("original client csum %x (type %x), server csum %x (type %x), "
               "client csum now %x\n", client_cksum, client_cksum_type,
               server_cksum, cksum_type, new_cksum);
        return 1;
}
1367
1368 /* Note rc enters this function as number of bytes transferred */
1369 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1370 {
1371         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1372         const lnet_process_id_t *peer =
1373                         &req->rq_import->imp_connection->c_peer;
1374         struct client_obd *cli = aa->aa_cli;
1375         struct ost_body *body;
1376         u32 client_cksum = 0;
1377         ENTRY;
1378
1379         if (rc < 0 && rc != -EDQUOT) {
1380                 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1381                 RETURN(rc);
1382         }
1383
1384         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1385         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1386         if (body == NULL) {
1387                 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1388                 RETURN(-EPROTO);
1389         }
1390
1391         /* set/clear over quota flag for a uid/gid */
1392         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1393             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1394                 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1395
1396                 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1397                        body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1398                        body->oa.o_flags);
1399                 osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
1400         }
1401
1402         osc_update_grant(cli, body);
1403
1404         if (rc < 0)
1405                 RETURN(rc);
1406
1407         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1408                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1409
1410         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1411                 if (rc > 0) {
1412                         CERROR("Unexpected +ve rc %d\n", rc);
1413                         RETURN(-EPROTO);
1414                 }
1415                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1416
1417                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1418                         RETURN(-EAGAIN);
1419
1420                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1421                     check_write_checksum(&body->oa, peer, client_cksum,
1422                                          body->oa.o_cksum, aa->aa_requested_nob,
1423                                          aa->aa_page_count, aa->aa_ppga,
1424                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1425                         RETURN(-EAGAIN);
1426
1427                 rc = check_write_rcs(req, aa->aa_requested_nob, aa->aa_nio_count,
1428                                      aa->aa_page_count, aa->aa_ppga);
1429                 GOTO(out, rc);
1430         }
1431
1432         /* The rest of this function executes only for OST_READs */
1433
1434         /* if unwrap_bulk failed, return -EAGAIN to retry */
1435         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1436         if (rc < 0)
1437                 GOTO(out, rc = -EAGAIN);
1438
1439         if (rc > aa->aa_requested_nob) {
1440                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1441                        aa->aa_requested_nob);
1442                 RETURN(-EPROTO);
1443         }
1444
1445         if (rc != req->rq_bulk->bd_nob_transferred) {
1446                 CERROR("Unexpected rc %d (%d transferred)\n",
1447                        rc, req->rq_bulk->bd_nob_transferred);
1448                 RETURN(-EPROTO);
1449         }
1450
1451         if (rc < aa->aa_requested_nob)
1452                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1453
1454         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1455                 static int cksum_counter;
1456                 u32        server_cksum = body->oa.o_cksum;
1457                 char      *via = "";
1458                 char      *router = "";
1459                 cksum_type_t cksum_type;
1460
1461                 cksum_type = cksum_type_unpack(body->oa.o_valid & OBD_MD_FLFLAGS ?
1462                                                body->oa.o_flags : 0);
1463                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1464                                                  aa->aa_ppga, OST_READ,
1465                                                  cksum_type);
1466
1467                 if (peer->nid != req->rq_bulk->bd_sender) {
1468                         via = " via ";
1469                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1470                 }
1471
1472                 if (server_cksum != client_cksum) {
1473                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1474                                            "%s%s%s inode "DFID" object "DOSTID
1475                                            " extent ["LPU64"-"LPU64"]\n",
1476                                            req->rq_import->imp_obd->obd_name,
1477                                            libcfs_nid2str(peer->nid),
1478                                            via, router,
1479                                            body->oa.o_valid & OBD_MD_FLFID ?
1480                                                 body->oa.o_parent_seq : (__u64)0,
1481                                            body->oa.o_valid & OBD_MD_FLFID ?
1482                                                 body->oa.o_parent_oid : 0,
1483                                            body->oa.o_valid & OBD_MD_FLFID ?
1484                                                 body->oa.o_parent_ver : 0,
1485                                            POSTID(&body->oa.o_oi),
1486                                            aa->aa_ppga[0]->off,
1487                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1488                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1489                                                                         1);
1490                         CERROR("client %x, server %x, cksum_type %x\n",
1491                                client_cksum, server_cksum, cksum_type);
1492                         cksum_counter = 0;
1493                         aa->aa_oa->o_cksum = client_cksum;
1494                         rc = -EAGAIN;
1495                 } else {
1496                         cksum_counter++;
1497                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1498                         rc = 0;
1499                 }
1500         } else if (unlikely(client_cksum)) {
1501                 static int cksum_missed;
1502
1503                 cksum_missed++;
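                /* x & -x isolates the lowest set bit, so this test is true
                 * only when cksum_missed is a power of two: the error is
                 * logged with exponential backoff (1st, 2nd, 4th, ... miss) */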
1504                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1505                         CERROR("Checksum %u requested from %s but not sent\n",
1506                                cksum_missed, libcfs_nid2str(peer->nid));
1507         } else {
1508                 rc = 0;
1509         }
1510 out:
1511         if (rc >= 0)
1512                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1513                                      aa->aa_oa, &body->oa);
1514
1515         RETURN(rc);
1516 }
1517
1518 static int osc_brw_redo_request(struct ptlrpc_request *request,
1519                                 struct osc_brw_async_args *aa, int rc)
1520 {
1521         struct ptlrpc_request *new_req;
1522         struct osc_brw_async_args *new_aa;
1523         struct osc_async_page *oap;
1524         ENTRY;
1525
1526         DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1527                   "redo for recoverable error %d", rc);
1528
1529         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1530                                   OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
1531                                   aa->aa_cli, aa->aa_oa, aa->aa_page_count,
1532                                   aa->aa_ppga, &new_req, 1);
1533         if (rc)
1534                 RETURN(rc);
1535
1536         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1537                 if (oap->oap_request != NULL) {
1538                         LASSERTF(request == oap->oap_request,
1539                                  "request %p != oap_request %p\n",
1540                                  request, oap->oap_request);
1541                         if (oap->oap_interrupted) {
1542                                 ptlrpc_req_finished(new_req);
1543                                 RETURN(-EINTR);
1544                         }
1545                 }
1546         }
1547         /* The new request takes over pga and oaps from the old request.
1548          * Note that copying a list_head doesn't work; it has to be moved. */
1549         aa->aa_resends++;
1550         new_req->rq_interpret_reply = request->rq_interpret_reply;
1551         new_req->rq_async_args = request->rq_async_args;
1552         new_req->rq_commit_cb = request->rq_commit_cb;
1553         /* Cap the resend delay at the current request timeout; this is
1554          * similar to what ptlrpc does (see after_reply()). */
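        /* e.g. with rq_timeout == 10s: the 3rd resend is delayed 3 seconds,
         * while the 15th is capped at 10 seconds */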
1555         if (aa->aa_resends > new_req->rq_timeout)
1556                 new_req->rq_sent = cfs_time_current_sec() + new_req->rq_timeout;
1557         else
1558                 new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1559         new_req->rq_generation_set = 1;
1560         new_req->rq_import_generation = request->rq_import_generation;
1561
1562         new_aa = ptlrpc_req_async_args(new_req);
1563
1564         INIT_LIST_HEAD(&new_aa->aa_oaps);
1565         list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1566         INIT_LIST_HEAD(&new_aa->aa_exts);
1567         list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1568         new_aa->aa_resends = aa->aa_resends;
1569
1570         list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1571                 if (oap->oap_request) {
1572                         ptlrpc_req_finished(oap->oap_request);
1573                         oap->oap_request = ptlrpc_request_addref(new_req);
1574                 }
1575         }
1576
1577         /* XXX: This code will run into problems if we ever want to add a
1578          * series of BRW RPCs into a self-defined ptlrpc_request_set and
1579          * wait for all of them to finish.  We should inherit the request
1580          * set from the old request. */
1581         ptlrpcd_add_req(new_req);
1582
1583         DEBUG_REQ(D_INFO, new_req, "new request");
1584         RETURN(0);
1585 }
1586
1587 /*
1588  * Ugh, we want disk allocation on the target to happen in offset order, so
1589  * we follow Sedgewick's advice and stick to the dead simple shellsort -- it
1590  * does fine for our small page arrays and doesn't require allocation.  It's
1591  * an insertion sort that swaps elements that are strides apart, shrinking
1592  * the stride until it's 1 and the array is sorted (sketch below).
1593  */
1594 static void sort_brw_pages(struct brw_page **array, int num)
1595 {
1596         int stride, i, j;
1597         struct brw_page *tmp;
1598
1599         if (num == 1)
1600                 return;
1601         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1602                 ;
1603
1604         do {
1605                 stride /= 3;
1606                 for (i = stride ; i < num ; i++) {
1607                         tmp = array[i];
1608                         j = i;
1609                         while (j >= stride && array[j - stride]->off > tmp->off) {
1610                                 array[j] = array[j - stride];
1611                                 j -= stride;
1612                         }
1613                         array[j] = tmp;
1614                 }
1615         } while (stride > 1);
1616 }
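/*
 * A minimal user-space sketch of the same shellsort on a plain int array,
 * illustrating the gap sequence 1, 4, 13, 40, ... generated above.  It is
 * kept inside "#if 0" because it is illustrative only; shellsort_ints is a
 * made-up name, not part of this driver.
 */
#if 0
static void shellsort_ints(int *a, int num)
{
        int stride, i, j, tmp;

        if (num <= 1)
                return;
        for (stride = 1; stride < num; stride = stride * 3 + 1)
                ;
        do {
                stride /= 3;
                for (i = stride; i < num; i++) {
                        tmp = a[i];
                        for (j = i; j >= stride && a[j - stride] > tmp;
                             j -= stride)
                                a[j] = a[j - stride];
                        a[j] = tmp;
                }
        } while (stride > 1);
}
#endif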
1617
1618 static void osc_release_ppga(struct brw_page **ppga, size_t count)
1619 {
1620         LASSERT(ppga != NULL);
1621         OBD_FREE(ppga, sizeof(*ppga) * count);
1622 }
1623
1624 static int brw_interpret(const struct lu_env *env,
1625                          struct ptlrpc_request *req, void *data, int rc)
1626 {
1627         struct osc_brw_async_args *aa = data;
1628         struct osc_extent *ext;
1629         struct osc_extent *tmp;
1630         struct client_obd *cli = aa->aa_cli;
1631         ENTRY;
1632
1633         rc = osc_brw_fini_request(req, rc);
1634         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1635         /* When the server returns -EINPROGRESS, the client should always
1636          * retry regardless of how many times the bulk was resent already. */
1637         if (osc_recoverable_error(rc)) {
1638                 if (req->rq_import_generation !=
1639                     req->rq_import->imp_generation) {
1640                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
1641                                DOSTID", rc = %d.\n",
1642                                req->rq_import->imp_obd->obd_name,
1643                                POSTID(&aa->aa_oa->o_oi), rc);
1644                 } else if (rc == -EINPROGRESS ||
1645                     client_should_resend(aa->aa_resends, aa->aa_cli)) {
1646                         rc = osc_brw_redo_request(req, aa, rc);
1647                 } else {
1648                         CERROR("%s: too many resend retries for object: "
1649                                DOSTID", rc = %d.\n",
1650                                req->rq_import->imp_obd->obd_name,
1651                                POSTID(&aa->aa_oa->o_oi), rc);
1652                 }
1653
1654                 if (rc == 0)
1655                         RETURN(0);
1656                 else if (rc == -EAGAIN || rc == -EINPROGRESS)
1657                         rc = -EIO;
1658         }
1659
1660         if (rc == 0) {
1661                 struct obdo *oa = aa->aa_oa;
1662                 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
1663                 unsigned long valid = 0;
1664                 struct cl_object *obj;
1665                 struct osc_async_page *last;
1666
1667                 last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
1668                 obj = osc2cl(last->oap_obj);
1669
1670                 cl_object_attr_lock(obj);
1671                 if (oa->o_valid & OBD_MD_FLBLOCKS) {
1672                         attr->cat_blocks = oa->o_blocks;
1673                         valid |= CAT_BLOCKS;
1674                 }
1675                 if (oa->o_valid & OBD_MD_FLMTIME) {
1676                         attr->cat_mtime = oa->o_mtime;
1677                         valid |= CAT_MTIME;
1678                 }
1679                 if (oa->o_valid & OBD_MD_FLATIME) {
1680                         attr->cat_atime = oa->o_atime;
1681                         valid |= CAT_ATIME;
1682                 }
1683                 if (oa->o_valid & OBD_MD_FLCTIME) {
1684                         attr->cat_ctime = oa->o_ctime;
1685                         valid |= CAT_CTIME;
1686                 }
1687
1688                 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1689                         struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
1690                         loff_t last_off = last->oap_count + last->oap_obj_off +
1691                                 last->oap_page_off;
1692
1693                         /* Change the file size if this is an out-of-quota
1694                          * or direct I/O write and it extends the file size */
1695                         if (loi->loi_lvb.lvb_size < last_off) {
1696                                 attr->cat_size = last_off;
1697                                 valid |= CAT_SIZE;
1698                         }
1699                         /* Extend KMS if it's not a lockless write */
1700                         if (loi->loi_kms < last_off &&
1701                             oap2osc_page(last)->ops_srvlock == 0) {
1702                                 attr->cat_kms = last_off;
1703                                 valid |= CAT_KMS;
1704                         }
1705                 }
1706
1707                 if (valid != 0)
1708                         cl_object_attr_update(env, obj, attr, valid);
1709                 cl_object_attr_unlock(obj);
1710         }
1711         OBDO_FREE(aa->aa_oa);
1712
1713         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
1714                 osc_inc_unstable_pages(req);
1715
1716         list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
1717                 list_del_init(&ext->oe_link);
1718                 osc_extent_finish(env, ext, 1, rc);
1719         }
1720         LASSERT(list_empty(&aa->aa_exts));
1721         LASSERT(list_empty(&aa->aa_oaps));
1722
1723         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1724         ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
1725
1726         spin_lock(&cli->cl_loi_list_lock);
1727         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1728          * is called so we know whether to go to sync BRWs or wait for more
1729          * RPCs to complete */
1730         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
1731                 cli->cl_w_in_flight--;
1732         else
1733                 cli->cl_r_in_flight--;
1734         osc_wake_cache_waiters(cli);
1735         spin_unlock(&cli->cl_loi_list_lock);
1736
1737         osc_io_unplug(env, cli, NULL);
1738         RETURN(rc);
1739 }
1740
1741 static void brw_commit(struct ptlrpc_request *req)
1742 {
1743         /* If osc_inc_unstable_pages (via osc_extent_finish) races with
1744          * this function being called via rq_commit_cb, we need to ensure
1745          * osc_dec_unstable_pages is still called; otherwise unstable
1746          * pages may be leaked (see the sketch after this function). */
1747         spin_lock(&req->rq_lock);
1748         if (likely(req->rq_unstable)) {
1749                 req->rq_unstable = 0;
1750                 spin_unlock(&req->rq_lock);
1751
1752                 osc_dec_unstable_pages(req);
1753         } else {
1754                 req->rq_committed = 1;
1755                 spin_unlock(&req->rq_lock);
1756         }
1757 }
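/*
 * The handoff above guarantees that exactly one of the two racing paths
 * performs the "dec".  A minimal user-space sketch of the pattern, assuming
 * POSIX threads; all names below are made up and are not Lustre API:
 */
#if 0
#include <pthread.h>
#include <stdbool.h>

static pthread_mutex_t lk = PTHREAD_MUTEX_INITIALIZER;
static bool unstable;  /* set by the "inc" path when it gets there first */
static bool committed; /* set by the commit path when it gets there first */

static void commit_path(void)
{
        pthread_mutex_lock(&lk);
        if (unstable) {
                unstable = false;        /* we own the "dec" */
                pthread_mutex_unlock(&lk);
                /* dec_unstable_pages() would run here */
        } else {
                committed = true;        /* the inc path will dec for us */
                pthread_mutex_unlock(&lk);
        }
}
#endif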
1758
1759 /**
1760  * Build an RPC from the list of extents @ext_list.  The caller must ensure
1761  * that the total number of pages in this list does not exceed the maximum
1762  * number of pages per RPC.  Extents in the list must be in OES_RPC state.
1763  */
1764 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
1765                   struct list_head *ext_list, int cmd)
1766 {
1767         struct ptlrpc_request           *req = NULL;
1768         struct osc_extent               *ext;
1769         struct brw_page                 **pga = NULL;
1770         struct osc_brw_async_args       *aa = NULL;
1771         struct obdo                     *oa = NULL;
1772         struct osc_async_page           *oap;
1773         struct osc_object               *obj = NULL;
1774         struct cl_req_attr              *crattr = NULL;
1775         loff_t                          starting_offset = OBD_OBJECT_EOF;
1776         loff_t                          ending_offset = 0;
1777         int                             mpflag = 0;
1778         int                             mem_tight = 0;
1779         int                             page_count = 0;
1780         bool                            soft_sync = false;
1781         bool                            interrupted = false;
1782         int                             i;
1783         int                             grant = 0;
1784         int                             rc;
1785         struct list_head                rpc_list = LIST_HEAD_INIT(rpc_list);
1786         struct ost_body                 *body;
1787         ENTRY;
1788         LASSERT(!list_empty(ext_list));
1789
1790         /* add pages into rpc_list to build BRW rpc */
1791         list_for_each_entry(ext, ext_list, oe_link) {
1792                 LASSERT(ext->oe_state == OES_RPC);
1793                 mem_tight |= ext->oe_memalloc;
1794                 grant += ext->oe_grants;
1795                 page_count += ext->oe_nr_pages;
1796                 if (obj == NULL)
1797                         obj = ext->oe_obj;
1798         }
1799
1800         soft_sync = osc_over_unstable_soft_limit(cli);
1801         if (mem_tight)
1802                 mpflag = cfs_memory_pressure_get_and_set();
1803
1804         OBD_ALLOC(pga, sizeof(*pga) * page_count);
1805         if (pga == NULL)
1806                 GOTO(out, rc = -ENOMEM);
1807
1808         OBDO_ALLOC(oa);
1809         if (oa == NULL)
1810                 GOTO(out, rc = -ENOMEM);
1811
1812         i = 0;
1813         list_for_each_entry(ext, ext_list, oe_link) {
1814                 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
1815                         if (mem_tight)
1816                                 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
1817                         if (soft_sync)
1818                                 oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
1819                         pga[i] = &oap->oap_brw_page;
1820                         pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
1821                         i++;
1822
1823                         list_add_tail(&oap->oap_rpc_item, &rpc_list);
1824                         if (starting_offset == OBD_OBJECT_EOF ||
1825                             starting_offset > oap->oap_obj_off)
1826                                 starting_offset = oap->oap_obj_off;
1827                         else
1828                                 LASSERT(oap->oap_page_off == 0);
1829                         if (ending_offset < oap->oap_obj_off + oap->oap_count)
1830                                 ending_offset = oap->oap_obj_off +
1831                                                 oap->oap_count;
1832                         else
1833                                 LASSERT(oap->oap_page_off + oap->oap_count ==
1834                                         PAGE_CACHE_SIZE);
1835                         if (oap->oap_interrupted)
1836                                 interrupted = true;
1837                 }
1838         }
1839
1840         /* first page in the list */
1841         oap = list_entry(rpc_list.next, typeof(*oap), oap_rpc_item);
1842
1843         crattr = &osc_env_info(env)->oti_req_attr;
1844         memset(crattr, 0, sizeof(*crattr));
1845         crattr->cra_type = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
1846         crattr->cra_flags = ~0ULL;
1847         crattr->cra_page = oap2cl_page(oap);
1848         crattr->cra_oa = oa;
1849         cl_req_attr_set(env, osc2cl(obj), crattr);
1850
1851         if (cmd == OBD_BRW_WRITE)
1852                 oa->o_grant_used = grant;
1853
1854         sort_brw_pages(pga, page_count);
1855         rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req, 0);
1856         if (rc != 0) {
1857                 CERROR("prep_req failed: %d\n", rc);
1858                 GOTO(out, rc);
1859         }
1860
1861         req->rq_commit_cb = brw_commit;
1862         req->rq_interpret_reply = brw_interpret;
1863         req->rq_memalloc = mem_tight != 0;
1864         oap->oap_request = ptlrpc_request_addref(req);
1865         if (interrupted && !req->rq_intr)
1866                 ptlrpc_mark_interrupted(req);
1867
1868         /* Need to update the timestamps after the request is built in case
1869          * we race with setattr (locally or in queue at OST).  If OST gets
1870          * later setattr before earlier BRW (as determined by the request xid),
1871          * the OST will not use BRW timestamps.  Sadly, there is no obvious
1872          * way to do this in a single call.  bug 10150 */
1873         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
1874         crattr->cra_oa = &body->oa;
1875         crattr->cra_flags = OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME;
1876         cl_req_attr_set(env, osc2cl(obj), crattr);
1877         lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
1878
1879         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1880         aa = ptlrpc_req_async_args(req);
1881         INIT_LIST_HEAD(&aa->aa_oaps);
1882         list_splice_init(&rpc_list, &aa->aa_oaps);
1883         INIT_LIST_HEAD(&aa->aa_exts);
1884         list_splice_init(ext_list, &aa->aa_exts);
1885
1886         spin_lock(&cli->cl_loi_list_lock);
1887         starting_offset >>= PAGE_CACHE_SHIFT;
1888         if (cmd == OBD_BRW_READ) {
1889                 cli->cl_r_in_flight++;
1890                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
1891                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
1892                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
1893                                       starting_offset + 1);
1894         } else {
1895                 cli->cl_w_in_flight++;
1896                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
1897                 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
1898                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
1899                                       starting_offset + 1);
1900         }
1901         spin_unlock(&cli->cl_loi_list_lock);
1902
1903         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %ur/%uw in flight",
1904                   page_count, aa, cli->cl_r_in_flight,
1905                   cli->cl_w_in_flight);
1906
1907         ptlrpcd_add_req(req);
1908         rc = 0;
1909         EXIT;
1910
1911 out:
1912         if (mem_tight != 0)
1913                 cfs_memory_pressure_restore(mpflag);
1914
1915         if (rc != 0) {
1916                 LASSERT(req == NULL);
1917
1918                 if (oa)
1919                         OBDO_FREE(oa);
1920                 if (pga)
1921                         OBD_FREE(pga, sizeof(*pga) * page_count);
1922                 /* this should happen rarely and is pretty bad; it makes
1923                  * the pending list not follow the dirty order */
1924                 while (!list_empty(ext_list)) {
1925                         ext = list_entry(ext_list->next, struct osc_extent,
1926                                          oe_link);
1927                         list_del_init(&ext->oe_link);
1928                         osc_extent_finish(env, ext, 0, rc);
1929                 }
1930         }
1931         RETURN(rc);
1932 }
1933
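/* Attach einfo->ei_cbdata to the lock if no ast_data is set yet, then report
 * whether the lock's ast_data matches ours: a set-if-empty-then-compare step
 * done under the lock's resource lock. */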
1934 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
1935                                         struct ldlm_enqueue_info *einfo)
1936 {
1937         void *data = einfo->ei_cbdata;
1938         int set = 0;
1939
1940         LASSERT(lock != NULL);
1941         LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
1942         LASSERT(lock->l_resource->lr_type == einfo->ei_type);
1943         LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
1944         LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
1945
1946         lock_res_and_lock(lock);
1947
1948         if (lock->l_ast_data == NULL)
1949                 lock->l_ast_data = data;
1950         if (lock->l_ast_data == data)
1951                 set = 1;
1952
1953         unlock_res_and_lock(lock);
1954
1955         return set;
1956 }
1957
1958 static int osc_set_data_with_check(struct lustre_handle *lockh,
1959                                    struct ldlm_enqueue_info *einfo)
1960 {
1961         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
1962         int set = 0;
1963
1964         if (lock != NULL) {
1965                 set = osc_set_lock_data_with_check(lock, einfo);
1966                 LDLM_LOCK_PUT(lock);
1967         } else
1968                 CERROR("lockh %p, data %p - client evicted?\n",
1969                        lockh, einfo->ei_cbdata);
1970         return set;
1971 }
1972
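/* Finish the enqueue: for an aborted intent enqueue, extract the real error
 * from the intent reply; invoke the caller's upcall; and drop the lock
 * reference that ldlm_cli_enqueue() took when the lock was granted or
 * matched. */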
1973 static int osc_enqueue_fini(struct ptlrpc_request *req,
1974                             osc_enqueue_upcall_f upcall, void *cookie,
1975                             struct lustre_handle *lockh, enum ldlm_mode mode,
1976                             __u64 *flags, int agl, int errcode)
1977 {
1978         bool intent = *flags & LDLM_FL_HAS_INTENT;
1979         int rc;
1980         ENTRY;
1981
1982         /* The request was created before ldlm_cli_enqueue call. */
1983         if (intent && errcode == ELDLM_LOCK_ABORTED) {
1984                 struct ldlm_reply *rep;
1985
1986                 rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1987                 LASSERT(rep != NULL);
1988
1989                 rep->lock_policy_res1 =
1990                         ptlrpc_status_ntoh(rep->lock_policy_res1);
1991                 if (rep->lock_policy_res1)
1992                         errcode = rep->lock_policy_res1;
1993                 if (!agl)
1994                         *flags |= LDLM_FL_LVB_READY;
1995         } else if (errcode == ELDLM_OK) {
1996                 *flags |= LDLM_FL_LVB_READY;
1997         }
1998
1999         /* Call the update callback. */
2000         rc = (*upcall)(cookie, lockh, errcode);
2001
2002         /* release the reference taken in ldlm_cli_enqueue() */
2003         if (errcode == ELDLM_LOCK_MATCHED)
2004                 errcode = ELDLM_OK;
2005         if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
2006                 ldlm_lock_decref(lockh, mode);
2007
2008         RETURN(rc);
2009 }
2010
2011 static int osc_enqueue_interpret(const struct lu_env *env,
2012                                  struct ptlrpc_request *req,
2013                                  struct osc_enqueue_args *aa, int rc)
2014 {
2015         struct ldlm_lock *lock;
2016         struct lustre_handle *lockh = &aa->oa_lockh;
2017         enum ldlm_mode mode = aa->oa_mode;
2018         struct ost_lvb *lvb = aa->oa_lvb;
2019         __u32 lvb_len = sizeof(*lvb);
2020         __u64 flags = 0;
2021
2022         ENTRY;
2023
2024         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2025          * be valid. */
2026         lock = ldlm_handle2lock(lockh);
2027         LASSERTF(lock != NULL,
2028                  "lockh "LPX64", req %p, aa %p - client evicted?\n",
2029                  lockh->cookie, req, aa);
2030
2031         /* Take an additional reference so that a blocking AST that
2032          * ldlm_cli_enqueue_fini() might post for a failed lock is
2033          * guaranteed to arrive after an upcall has been executed by
2034          * osc_enqueue_fini(). */
2035         ldlm_lock_addref(lockh, mode);
2036
2037         /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
2038         OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
2039
2040         /* Let the CP AST grant the lock first. */
2041         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2042
2043         if (aa->oa_agl) {
2044                 LASSERT(aa->oa_lvb == NULL);
2045                 LASSERT(aa->oa_flags == NULL);
2046                 aa->oa_flags = &flags;
2047         }
2048
2049         /* Complete obtaining the lock procedure. */
2050         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
2051                                    aa->oa_mode, aa->oa_flags, lvb, lvb_len,
2052                                    lockh, rc);
2053         /* Complete osc stuff. */
2054         rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
2055                               aa->oa_flags, aa->oa_agl, rc);
2056
2057         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2058
2059         ldlm_lock_decref(lockh, mode);
2060         LDLM_LOCK_PUT(lock);
2061         RETURN(rc);
2062 }
2063
2064 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2065
2066 /* When enqueuing asynchronously, locks are not ordered: we can obtain a lock
2067  * from the 2nd OSC before a lock from the 1st one.  This does not deadlock
2068  * with other synchronous requests, but holding some locks while trying to
2069  * obtain others may take a considerable amount of time in case of OST
2070  * failure; and when other sync requests cannot get the lock released by a
2071  * client, that client is evicted from the cluster -- such scenarios make
2072  * life difficult, so release locks just after they are obtained. */
2073 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2074                      __u64 *flags, union ldlm_policy_data *policy,
2075                      struct ost_lvb *lvb, int kms_valid,
2076                      osc_enqueue_upcall_f upcall, void *cookie,
2077                      struct ldlm_enqueue_info *einfo,
2078                      struct ptlrpc_request_set *rqset, int async, int agl)
2079 {
2080         struct obd_device *obd = exp->exp_obd;
2081         struct lustre_handle lockh = { 0 };
2082         struct ptlrpc_request *req = NULL;
2083         int intent = *flags & LDLM_FL_HAS_INTENT;
2084         __u64 match_flags = *flags;
2085         enum ldlm_mode mode;
2086         int rc;
2087         ENTRY;
2088
2089         /* Filesystem lock extents are extended to page boundaries so that
2090          * dealing with the page cache is a little smoother.  */
2091         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2092         policy->l_extent.end |= ~PAGE_MASK;
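        /* with 4 KiB pages, ~PAGE_MASK == 0xfff: start 0x1234 rounds down
         * to 0x1000 and end 0x5678 rounds up to 0x5fff */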
2093
2094         /*
2095          * kms is not valid when either the object is completely fresh (so
2096          * that no locks are cached), or the object was evicted.  In the
2097          * latter case a cached lock cannot be used, because it would prime
2098          * the inode state with a potentially stale LVB.
2099          */
2100         if (!kms_valid)
2101                 goto no_match;
2102
2103         /* Next, search for already existing extent locks that will cover us */
2104         /* If we're trying to read, we also search for an existing PW lock.  The
2105          * VFS and page cache already protect us locally, so lots of readers/
2106          * writers can share a single PW lock.
2107          *
2108          * There are problems with conversion deadlocks, so instead of
2109          * converting a read lock to a write lock, we'll just enqueue a new
2110          * one.
2111          *
2112          * At some point we should cancel the read lock instead of making them
2113          * send us a blocking callback, but there are problems with canceling
2114          * locks out from other users right now, too. */
2115         mode = einfo->ei_mode;
2116         if (einfo->ei_mode == LCK_PR)
2117                 mode |= LCK_PW;
2118         if (agl == 0)
2119                 match_flags |= LDLM_FL_LVB_READY;
2120         if (intent != 0)
2121                 match_flags |= LDLM_FL_BLOCK_GRANTED;
2122         mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id,
2123                                einfo->ei_type, policy, mode, &lockh, 0);
2124         if (mode) {
2125                 struct ldlm_lock *matched;
2126
2127                 if (*flags & LDLM_FL_TEST_LOCK)
2128                         RETURN(ELDLM_OK);
2129
2130                 matched = ldlm_handle2lock(&lockh);
2131                 if (agl) {
2132                         /* AGL enqueues DLM locks speculatively.  Therefore
2133                          * if a DLM lock already exists, just inform the
2134                          * caller to cancel the AGL process for this stripe. */
2135                         ldlm_lock_decref(&lockh, mode);
2136                         LDLM_LOCK_PUT(matched);
2137                         RETURN(-ECANCELED);
2138                 } else if (osc_set_lock_data_with_check(matched, einfo)) {
2139                         *flags |= LDLM_FL_LVB_READY;
2140
2141                         /* We already have a lock, and it's referenced. */
2142                         (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);
2143
2144                         ldlm_lock_decref(&lockh, mode);
2145                         LDLM_LOCK_PUT(matched);
2146                         RETURN(ELDLM_OK);
2147                 } else {
2148                         ldlm_lock_decref(&lockh, mode);
2149                         LDLM_LOCK_PUT(matched);
2150                 }
2151         }
2152
2153 no_match:
2154         if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK))
2155                 RETURN(-ENOLCK);
2156
2157         if (intent) {
2158                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2159                                            &RQF_LDLM_ENQUEUE_LVB);
2160                 if (req == NULL)
2161                         RETURN(-ENOMEM);
2162
2163                 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
2164                 if (rc) {
2165                         ptlrpc_request_free(req);
2166                         RETURN(rc);
2167                 }
2168
2169                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2170                                      sizeof(*lvb));
2171                 ptlrpc_request_set_replen(req);
2172         }
2173
2174         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2175         *flags &= ~LDLM_FL_BLOCK_GRANTED;
2176
2177         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2178                               sizeof(*lvb), LVB_T_OST, &lockh, async);
2179         if (async) {
2180                 if (!rc) {
2181                         struct osc_enqueue_args *aa;
2182                         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2183                         aa = ptlrpc_req_async_args(req);
2184                         aa->oa_exp    = exp;
2185                         aa->oa_mode   = einfo->ei_mode;
2186                         aa->oa_type   = einfo->ei_type;
2187                         lustre_handle_copy(&aa->oa_lockh, &lockh);
2188                         aa->oa_upcall = upcall;
2189                         aa->oa_cookie = cookie;
2190                         aa->oa_agl    = !!agl;
2191                         if (!agl) {
2192                                 aa->oa_flags  = flags;
2193                                 aa->oa_lvb    = lvb;
2194                         } else {
2195                                 /* AGL essentially enqueues a DLM lock in
2196                                  * advance, so we don't care about the
2197                                  * result of the AGL enqueue. */
2198                                 aa->oa_lvb    = NULL;
2199                                 aa->oa_flags  = NULL;
2200                         }
2201
2202                         req->rq_interpret_reply =
2203                                 (ptlrpc_interpterer_t)osc_enqueue_interpret;
2204                         if (rqset == PTLRPCD_SET)
2205                                 ptlrpcd_add_req(req);
2206                         else
2207                                 ptlrpc_set_add_req(rqset, req);
2208                 } else if (intent) {
2209                         ptlrpc_req_finished(req);
2210                 }
2211                 RETURN(rc);
2212         }
2213
2214         rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
2215                               flags, agl, rc);
2216         if (intent)
2217                 ptlrpc_req_finished(req);
2218
2219         RETURN(rc);
2220 }
2221
2222 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2223                    enum ldlm_type type, union ldlm_policy_data *policy,
2224                    enum ldlm_mode mode, __u64 *flags, void *data,
2225                    struct lustre_handle *lockh, int unref)
2226 {
2227         struct obd_device *obd = exp->exp_obd;
2228         __u64 lflags = *flags;
2229         enum ldlm_mode rc;
2230         ENTRY;
2231
2232         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2233                 RETURN(-EIO);
2234
2235         /* Filesystem lock extents are extended to page boundaries so that
2236          * dealing with the page cache is a little smoother */
2237         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2238         policy->l_extent.end |= ~PAGE_MASK;
2239
2240         /* Next, search for already existing extent locks that will cover us */
2241         /* If we're trying to read, we also search for an existing PW lock.  The
2242          * VFS and page cache already protect us locally, so lots of readers/
2243          * writers can share a single PW lock. */
2244         rc = mode;
2245         if (mode == LCK_PR)
2246                 rc |= LCK_PW;
2247         rc = ldlm_lock_match(obd->obd_namespace, lflags,
2248                              res_id, type, policy, rc, lockh, unref);
2249         if (rc) {
2250                 if (data != NULL) {
2251                         if (!osc_set_data_with_check(lockh, data)) {
2252                                 if (!(lflags & LDLM_FL_TEST_LOCK))
2253                                         ldlm_lock_decref(lockh, rc);
2254                                 RETURN(0);
2255                         }
2256                 }
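                /* we asked for PR but matched a PW lock: shift our reference
                 * from PW to PR so the caller can decref with the mode it
                 * requested */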
2257                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
2258                         ldlm_lock_addref(lockh, LCK_PR);
2259                         ldlm_lock_decref(lockh, LCK_PW);
2260                 }
2261                 RETURN(rc);
2262         }
2263         RETURN(rc);
2264 }
2265
2266 static int osc_statfs_interpret(const struct lu_env *env,
2267                                 struct ptlrpc_request *req,
2268                                 struct osc_async_args *aa, int rc)
2269 {
2270         struct obd_statfs *msfs;
2271         ENTRY;
2272
2273         if (rc == -EBADR)
2274                 /* The request has in fact never been sent
2275                  * due to issues at a higher level (LOV).
2276                  * Exit immediately since the caller is
2277                  * aware of the problem and takes care
2278                  * of the cleanup */
2279                 RETURN(rc);
2280
2281         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2282             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2283                 GOTO(out, rc = 0);
2284
2285         if (rc != 0)
2286                 GOTO(out, rc);
2287
2288         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2289         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2290         if (msfs == NULL)
2291                 GOTO(out, rc = -EPROTO);
2292
2293         *aa->aa_oi->oi_osfs = *msfs;
2294 out:
2295         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2296         RETURN(rc);
2297 }
2298
2299 static int osc_statfs_async(struct obd_export *exp,
2300                             struct obd_info *oinfo, __u64 max_age,
2301                             struct ptlrpc_request_set *rqset)
2302 {
2303         struct obd_device     *obd = class_exp2obd(exp);
2304         struct ptlrpc_request *req;
2305         struct osc_async_args *aa;
2306         int                    rc;
2307         ENTRY;
2308
2309         /* We could possibly pass max_age in the request (as an absolute
2310          * timestamp or a "seconds.usec ago") so the target can avoid doing
2311          * extra calls into the filesystem if that isn't necessary (e.g.
2312          * during mount that would help a bit).  Having relative timestamps
2313          * is not so great if request processing is slow, while absolute
2314          * timestamps are not ideal because they need time synchronization. */
2315         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2316         if (req == NULL)
2317                 RETURN(-ENOMEM);
2318
2319         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2320         if (rc) {
2321                 ptlrpc_request_free(req);
2322                 RETURN(rc);
2323         }
2324         ptlrpc_request_set_replen(req);
2325         req->rq_request_portal = OST_CREATE_PORTAL;
2326         ptlrpc_at_set_req_timeout(req);
2327
2328         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2329                 /* procfs requests must not wait here, to avoid deadlock */
2330                 req->rq_no_resend = 1;
2331                 req->rq_no_delay = 1;
2332         }
2333
2334         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2335         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2336         aa = ptlrpc_req_async_args(req);
2337         aa->aa_oi = oinfo;
2338
2339         ptlrpc_set_add_req(rqset, req);
2340         RETURN(0);
2341 }
2342
2343 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2344                       struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2345 {
2346         struct obd_device     *obd = class_exp2obd(exp);
2347         struct obd_statfs     *msfs;
2348         struct ptlrpc_request *req;
2349         struct obd_import     *imp = NULL;
2350         int rc;
2351         ENTRY;
2352
2353         /* Since the request might also come from lprocfs, we need to
2354          * sync this with client_disconnect_export(); see bug 15684 */
2355         down_read(&obd->u.cli.cl_sem);
2356         if (obd->u.cli.cl_import)
2357                 imp = class_import_get(obd->u.cli.cl_import);
2358         up_read(&obd->u.cli.cl_sem);
2359         if (!imp)
2360                 RETURN(-ENODEV);
2361
2362         /* We could possibly pass max_age in the request (as an absolute
2363          * timestamp or a "seconds.usec ago") so the target can avoid doing
2364          * extra calls into the filesystem if that isn't necessary (e.g.
2365          * during mount that would help a bit).  Having relative timestamps
2366          * is not so great if request processing is slow, while absolute
2367          * timestamps are not ideal because they need time synchronization. */
2368         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2369
2370         class_import_put(imp);
2371
2372         if (req == NULL)
2373                 RETURN(-ENOMEM);
2374
2375         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2376         if (rc) {
2377                 ptlrpc_request_free(req);
2378                 RETURN(rc);
2379         }
2380         ptlrpc_request_set_replen(req);
2381         req->rq_request_portal = OST_CREATE_PORTAL;
2382         ptlrpc_at_set_req_timeout(req);
2383
2384         if (flags & OBD_STATFS_NODELAY) {
2385                 /* procfs requests must not wait here, to avoid deadlock */
2386                 req->rq_no_resend = 1;
2387                 req->rq_no_delay = 1;
2388         }
2389
2390         rc = ptlrpc_queue_wait(req);
2391         if (rc)
2392                 GOTO(out, rc);
2393
2394         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2395         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2396         if (msfs == NULL)
2397                 GOTO(out, rc = -EPROTO);
2398
2399         *osfs = *msfs;
2400
2401         EXIT;
2402  out:
2403         ptlrpc_req_finished(req);
2404         return rc;
2405 }
2406
2407 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2408                          void *karg, void __user *uarg)
2409 {
2410         struct obd_device *obd = exp->exp_obd;
2411         struct obd_ioctl_data *data = karg;
2412         int err = 0;
2413         ENTRY;
2414
2415         if (!try_module_get(THIS_MODULE)) {
2416                 CERROR("%s: cannot get module '%s'\n", obd->obd_name,
2417                        module_name(THIS_MODULE));
2418                 return -EINVAL;
2419         }
2420         switch (cmd) {
2421         case OBD_IOC_CLIENT_RECOVER:
2422                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2423                                             data->ioc_inlbuf1, 0);
2424                 if (err > 0)
2425                         err = 0;
2426                 GOTO(out, err);
2427         case IOC_OSC_SET_ACTIVE:
2428                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2429                                                data->ioc_offset);
2430                 GOTO(out, err);
2431         case OBD_IOC_PING_TARGET:
2432                 err = ptlrpc_obd_ping(obd);
2433                 GOTO(out, err);
2434         default:
2435                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2436                        cmd, current_comm());
2437                 GOTO(out, err = -ENOTTY);
2438         }
2439 out:
2440         module_put(THIS_MODULE);
2441         return err;
2442 }
2443
2444 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
2445                               u32 keylen, void *key,
2446                               u32 vallen, void *val,
2447                               struct ptlrpc_request_set *set)
2448 {
2449         struct ptlrpc_request *req;
2450         struct obd_device     *obd = exp->exp_obd;
2451         struct obd_import     *imp = class_exp2cliimp(exp);
2452         char                  *tmp;
2453         int                    rc;
2454         ENTRY;
2455
2456         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
2457
2458         if (KEY_IS(KEY_CHECKSUM)) {
2459                 if (vallen != sizeof(int))
2460                         RETURN(-EINVAL);
2461                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
2462                 RETURN(0);
2463         }
2464
2465         if (KEY_IS(KEY_SPTLRPC_CONF)) {
2466                 sptlrpc_conf_client_adapt(obd);
2467                 RETURN(0);
2468         }
2469
2470         if (KEY_IS(KEY_FLUSH_CTX)) {
2471                 sptlrpc_import_flush_my_ctx(imp);
2472                 RETURN(0);
2473         }
2474
2475         if (KEY_IS(KEY_CACHE_SET)) {
2476                 struct client_obd *cli = &obd->u.cli;
2477
2478                 LASSERT(cli->cl_cache == NULL); /* only once */
2479                 cli->cl_cache = (struct cl_client_cache *)val;
2480                 cl_cache_incref(cli->cl_cache);
2481                 cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
2482
2483                 /* add this osc into entity list */
2484                 LASSERT(list_empty(&cli->cl_lru_osc));
2485                 spin_lock(&cli->cl_cache->ccc_lru_lock);
2486                 list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
2487                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
2488
2489                 RETURN(0);
2490         }
2491
2492         if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
2493                 struct client_obd *cli = &obd->u.cli;
2494                 long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
2495                 long target = *(long *)val;
2496
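                /* shrink at most half of this OSC's cached LRU pages, bounded
                 * by the caller's remaining target, and subtract the number
                 * actually freed from the target in *val */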
2497                 nr = osc_lru_shrink(env, cli, min(nr, target), true);
2498                 *(long *)val -= nr;
2499                 RETURN(0);
2500         }
2501
2502         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
2503                 RETURN(-EINVAL);
2504
2505         /* We pass all other commands directly to the OST.  Since nobody
2506            calls osc methods directly and everybody is supposed to go
2507            through LOV, we assume LOV checked invalid values for us.
2508            The only recognised values so far are evict_by_nid and mds_conn.
2509            Even if something bad goes through, we'd get a -EINVAL from OST
2510            anyway. */
2511
2512         req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
2513                                                 &RQF_OST_SET_GRANT_INFO :
2514                                                 &RQF_OBD_SET_INFO);
2515         if (req == NULL)
2516                 RETURN(-ENOMEM);
2517
2518         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2519                              RCL_CLIENT, keylen);
2520         if (!KEY_IS(KEY_GRANT_SHRINK))
2521                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
2522                                      RCL_CLIENT, vallen);
2523         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
2524         if (rc) {
2525                 ptlrpc_request_free(req);
2526                 RETURN(rc);
2527         }
2528
2529         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2530         memcpy(tmp, key, keylen);
2531         tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
2532                                                         &RMF_OST_BODY :
2533                                                         &RMF_SETINFO_VAL);
2534         memcpy(tmp, val, vallen);
2535
2536         if (KEY_IS(KEY_GRANT_SHRINK)) {
2537                 struct osc_grant_args *aa;
2538                 struct obdo *oa;
2539
2540                 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2541                 aa = ptlrpc_req_async_args(req);
2542                 OBDO_ALLOC(oa);
2543                 if (!oa) {
2544                         ptlrpc_req_finished(req);
2545                         RETURN(-ENOMEM);
2546                 }
2547                 *oa = ((struct ost_body *)val)->oa;
2548                 aa->aa_oa = oa;
2549                 req->rq_interpret_reply = osc_shrink_grant_interpret;
2550         }
2551
2552         ptlrpc_request_set_replen(req);
2553         if (!KEY_IS(KEY_GRANT_SHRINK)) {
2554                 LASSERT(set != NULL);
2555                 ptlrpc_set_add_req(set, req);
2556                 ptlrpc_check_set(NULL, set);
2557         } else {
2558                 ptlrpcd_add_req(req);
2559         }
2560
2561         RETURN(0);
2562 }
2563
2564 static int osc_reconnect(const struct lu_env *env,
2565                          struct obd_export *exp, struct obd_device *obd,
2566                          struct obd_uuid *cluuid,
2567                          struct obd_connect_data *data,
2568                          void *localdata)
2569 {
2570         struct client_obd *cli = &obd->u.cli;
2571
2572         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
2573                 long lost_grant;
2574                 long grant;
2575
2576                 spin_lock(&cli->cl_loi_list_lock);
2577                 grant = cli->cl_avail_grant + cli->cl_reserved_grant;
2578                 if (data->ocd_connect_flags & OBD_CONNECT_GRANT_PARAM)
2579                         grant += cli->cl_dirty_grant;
2580                 else
2581                         grant += cli->cl_dirty_pages << PAGE_CACHE_SHIFT;
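                /* ask the server to restore the grant we think we hold; with
                 * no grant at all, request two full-sized BRWs worth as a
                 * starting point */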
2582                 data->ocd_grant = grant ? : 2 * cli_brw_size(obd);
2583                 lost_grant = cli->cl_lost_grant;
2584                 cli->cl_lost_grant = 0;
2585                 spin_unlock(&cli->cl_loi_list_lock);
2586
2587                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
2588                        " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
2589                        data->ocd_version, data->ocd_grant, lost_grant);
2590         }
2591
2592         RETURN(0);
2593 }
2594
2595 static int osc_disconnect(struct obd_export *exp)
2596 {
2597         struct obd_device *obd = class_exp2obd(exp);
2598         int rc;
2599
2600         rc = client_disconnect_export(exp);
2601         /**
2602          * Initially we put del_shrink_grant before disconnect_export, but
2603          * that caused the following problem if setup (connect) and cleanup
2604          * (disconnect) are tangled together:
2605          *      connect p1                     disconnect p2
2606          *   ptlrpc_connect_import
2607          *     ...............               class_manual_cleanup
2608          *                                     osc_disconnect
2609          *                                     del_shrink_grant
2610          *   ptlrpc_connect_interrupt
2611          *     init_grant_shrink
2612          *   add this client to shrink list
2613          *                                      cleanup_osc
2614          * Bang! The pinger triggers the shrink.
2615          * So the osc should be removed from the shrink list only after we
2616          * are sure the import has been destroyed; see bug 18662.
2617          */
2618         if (obd->u.cli.cl_import == NULL)
2619                 osc_del_shrink_grant(&obd->u.cli);
2620         return rc;
2621 }
2622
2623 static int osc_ldlm_resource_invalidate(struct cfs_hash *hs,
2624         struct cfs_hash_bd *bd, struct hlist_node *hnode, void *arg)
2625 {
2626         struct lu_env *env = arg;
2627         struct ldlm_resource *res = cfs_hash_object(hs, hnode);
2628         struct ldlm_lock *lock;
2629         struct osc_object *osc = NULL;
2630         ENTRY;
2631
2632         lock_res(res);
2633         list_for_each_entry(lock, &res->lr_granted, l_res_link) {
2634                 if (lock->l_ast_data != NULL && osc == NULL) {
2635                         osc = lock->l_ast_data;
2636                         cl_object_get(osc2cl(osc));
2637                 }
2638
2639                 /* clear LDLM_FL_CLEANED flag to make sure it will be canceled
2640                  * by the 2nd round of ldlm_namespace_clean() call in
2641                  * osc_import_event(). */
2642                 ldlm_clear_cleaned(lock);
2643         }
2644         unlock_res(res);
2645
2646         if (osc != NULL) {
2647                 osc_object_invalidate(env, osc);
2648                 cl_object_put(env, osc2cl(osc));
2649         }
2650
2651         RETURN(0);
2652 }

static int osc_import_event(struct obd_device *obd,
                            struct obd_import *imp,
                            enum obd_import_event event)
{
        struct client_obd *cli;
        int rc = 0;

        ENTRY;
        LASSERT(imp->imp_obd == obd);

        switch (event) {
        case IMP_EVENT_DISCON: {
                cli = &obd->u.cli;
                spin_lock(&cli->cl_loi_list_lock);
                cli->cl_avail_grant = 0;
                cli->cl_lost_grant = 0;
                spin_unlock(&cli->cl_loi_list_lock);
                break;
        }
        case IMP_EVENT_INACTIVE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
                break;
        }
        case IMP_EVENT_INVALIDATE: {
                struct ldlm_namespace *ns = obd->obd_namespace;
                struct lu_env         *env;
                __u16                  refcheck;

                ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);

                env = cl_env_get(&refcheck);
                if (!IS_ERR(env)) {
                        osc_io_unplug(env, &obd->u.cli, NULL);

                        cfs_hash_for_each_nolock(ns->ns_rs_hash,
                                                 osc_ldlm_resource_invalidate,
                                                 env, 0);
                        cl_env_put(env, &refcheck);

                        ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
                } else {
                        rc = PTR_ERR(env);
                }
                break;
        }
        case IMP_EVENT_ACTIVE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
                break;
        }
        case IMP_EVENT_OCD: {
                struct obd_connect_data *ocd = &imp->imp_connect_data;

                if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
                        osc_init_grant(&obd->u.cli, ocd);

                /* See bug 7198 */
                if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
                        imp->imp_client->cli_request_portal =
                                OST_REQUEST_PORTAL;

                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
                break;
        }
        case IMP_EVENT_DEACTIVATE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
                break;
        }
        case IMP_EVENT_ACTIVATE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
                break;
        }
        default:
                CERROR("Unknown import event %d\n", event);
                LBUG();
        }
        RETURN(rc);
}
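
/*
 * Editor's recap of the handler above (no new behaviour):
 *
 *      IMP_EVENT_DISCON        zero cl_avail_grant/cl_lost_grant under
 *                              cl_loi_list_lock
 *      IMP_EVENT_INACTIVE      obd_notify_observer(OBD_NOTIFY_INACTIVE)
 *      IMP_EVENT_INVALIDATE    two cleanup passes around the resource walk
 *                              shown above, plus osc_io_unplug()
 *      IMP_EVENT_ACTIVE        obd_notify_observer(OBD_NOTIFY_ACTIVE)
 *      IMP_EVENT_OCD           re-initialise grants and, if negotiated,
 *                              switch RPCs to OST_REQUEST_PORTAL
 *      IMP_EVENT_DEACTIVATE    obd_notify_observer(OBD_NOTIFY_DEACTIVATE)
 *      IMP_EVENT_ACTIVATE      obd_notify_observer(OBD_NOTIFY_ACTIVATE)
 */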

/**
 * Determine whether the lock can be canceled before replaying the lock
 * during recovery, see bug16774 for detailed information.
 *
 * \retval zero the lock can't be canceled
 * \retval other ok to cancel
 */
static int osc_cancel_weight(struct ldlm_lock *lock)
{
        /*
         * Cancel all unused and granted extent locks.
         */
        if (lock->l_resource->lr_type == LDLM_EXTENT &&
            lock->l_granted_mode == lock->l_req_mode &&
            osc_ldlm_weigh_ast(lock) == 0)
                RETURN(1);

        RETURN(0);
}
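
/*
 * Editor's note: this weight callback is installed per-namespace via
 * ns_register_cancel() in osc_setup() below:
 *
 *      ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
 *
 * An unused, granted extent lock whose weight (as judged by
 * osc_ldlm_weigh_ast()) is zero protects no cached state worth keeping,
 * so it is cheaper to cancel it than to replay it during recovery.
 */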

static int brw_queue_work(const struct lu_env *env, void *data)
{
        struct client_obd *cli = data;

        CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);

        osc_io_unplug(env, cli, NULL);
        RETURN(0);
}
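
/*
 * Editor's sketch: brw_queue_work() is the body of the cl_writeback_work
 * item allocated in osc_setup() below. Elsewhere in the OSC, writeback is
 * kicked asynchronously with something like
 *
 *      ptlrpcd_queue_work(cli->cl_writeback_work);
 *
 * which eventually runs this function in ptlrpcd context.
 */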

int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
{
        struct client_obd *cli = &obd->u.cli;
        struct obd_type   *type;
        void              *handler;
        int                rc;
        int                adding;
        int                added;
        int                req_count;
        ENTRY;

        rc = ptlrpcd_addref();
        if (rc)
                RETURN(rc);

        rc = client_obd_setup(obd, lcfg);
        if (rc)
                GOTO(out_ptlrpcd, rc);

        handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
        if (IS_ERR(handler))
                GOTO(out_client_setup, rc = PTR_ERR(handler));
        cli->cl_writeback_work = handler;

        handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
        if (IS_ERR(handler))
                GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
        cli->cl_lru_work = handler;

        rc = osc_quota_setup(obd);
        if (rc)
                GOTO(out_ptlrpcd_work, rc);

        cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;

#ifdef CONFIG_PROC_FS
        obd->obd_vars = lprocfs_osc_obd_vars;
#endif
        /* If this is true then both the client (osc) and the server (osp)
         * are on the same node. The osp layer, if loaded first, will have
         * registered the osc proc directory; in that case this obd_device
         * attaches its proc tree to type->typ_procsym instead of
         * obd->obd_type->typ_procroot. */
        type = class_search_type(LUSTRE_OSP_NAME);
        if (type && type->typ_procsym) {
                obd->obd_proc_entry = lprocfs_register(obd->obd_name,
                                                       type->typ_procsym,
                                                       obd->obd_vars, obd);
                if (IS_ERR(obd->obd_proc_entry)) {
                        rc = PTR_ERR(obd->obd_proc_entry);
                        CERROR("error %d setting up lprocfs for %s\n", rc,
                               obd->obd_name);
                        obd->obd_proc_entry = NULL;
                }
        } else {
                rc = lprocfs_obd_setup(obd);
        }

        /* If the basic OSC proc tree construction succeeded then
         * let's do the rest. */
        if (rc == 0) {
                lproc_osc_attach_seqstat(obd);
                sptlrpc_lprocfs_cliobd_attach(obd);
                ptlrpc_lprocfs_register_obd(obd);
        }
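
        /*
         * Editor's note: the net effect of the branch above is only where
         * the entry lands in /proc: under the osp's typ_procsym when osc
         * and osp share a node, otherwise under the usual
         * obd_type->typ_procroot via lprocfs_obd_setup().
         */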

        /*
         * We try to control the total number of requests with an upper
         * limit, osc_reqpool_maxreqcount. A race may push the count
         * slightly over the limit, but that is harmless.
         */
        req_count = atomic_read(&osc_pool_req_count);
        if (req_count < osc_reqpool_maxreqcount) {
                adding = cli->cl_max_rpcs_in_flight + 2;
                if (req_count + adding > osc_reqpool_maxreqcount)
                        adding = osc_reqpool_maxreqcount - req_count;

                added = ptlrpc_add_rqs_to_pool(osc_rq_pool, adding);
                atomic_add(added, &osc_pool_req_count);
        }
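
        /*
         * Worked example (editor's illustration, numbers hypothetical):
         * with cl_max_rpcs_in_flight = 8 this client offers to add 10
         * requests; if osc_reqpool_maxreqcount is 40 and 35 are already
         * pooled, "adding" is clamped to 5, and osc_pool_req_count grows
         * by however many requests ptlrpc_add_rqs_to_pool() actually
         * managed to allocate.
         */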

        INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
        ns_register_cancel(obd->obd_namespace, osc_cancel_weight);

        spin_lock(&osc_shrink_lock);
        list_add_tail(&cli->cl_shrink_list, &osc_shrink_list);
        spin_unlock(&osc_shrink_lock);

        RETURN(0);

out_ptlrpcd_work:
        if (cli->cl_writeback_work != NULL) {
                ptlrpcd_destroy_work(cli->cl_writeback_work);
                cli->cl_writeback_work = NULL;
        }
        if (cli->cl_lru_work != NULL) {
                ptlrpcd_destroy_work(cli->cl_lru_work);
                cli->cl_lru_work = NULL;
        }
out_client_setup:
        client_obd_cleanup(obd);
out_ptlrpcd:
        ptlrpcd_decref();
        RETURN(rc);
}

static int osc_precleanup(struct obd_device *obd)
{
        struct client_obd *cli = &obd->u.cli;
        ENTRY;

        /* LU-464
         * for echo client, export may be on zombie list, wait for
         * zombie thread to cull it, because cli.cl_import will be
         * cleared in client_disconnect_export():
         *   class_export_destroy() -> obd_cleanup() ->
         *   echo_device_free() -> echo_client_cleanup() ->
         *   obd_disconnect() -> osc_disconnect() ->
         *   client_disconnect_export()
         */
        obd_zombie_barrier();
        if (cli->cl_writeback_work) {
                ptlrpcd_destroy_work(cli->cl_writeback_work);
                cli->cl_writeback_work = NULL;
        }

        if (cli->cl_lru_work) {
                ptlrpcd_destroy_work(cli->cl_lru_work);
                cli->cl_lru_work = NULL;
        }

        obd_cleanup_client_import(obd);
        ptlrpc_lprocfs_unregister_obd(obd);
        lprocfs_obd_cleanup(obd);
        RETURN(0);
}

int osc_cleanup(struct obd_device *obd)
{
        struct client_obd *cli = &obd->u.cli;
        int rc;

        ENTRY;

        spin_lock(&osc_shrink_lock);
        list_del(&cli->cl_shrink_list);
        spin_unlock(&osc_shrink_lock);

        /* lru cleanup */
        if (cli->cl_cache != NULL) {
                LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
                spin_lock(&cli->cl_cache->ccc_lru_lock);
                list_del_init(&cli->cl_lru_osc);
                spin_unlock(&cli->cl_cache->ccc_lru_lock);
                cli->cl_lru_left = NULL;
                cl_cache_decref(cli->cl_cache);
                cli->cl_cache = NULL;
        }

        /* free memory of osc quota cache */
        osc_quota_cleanup(obd);

        rc = client_obd_cleanup(obd);

        ptlrpcd_decref();
        RETURN(rc);
}

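/*
 * Editor's note: class_process_proc_param() returns the number of
 * parameters it consumed on success, so a positive return is normalised
 * to 0 below and only genuine errors propagate to the caller.
 */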
int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
{
        int rc = class_process_proc_param(PARAM_OSC, obd->obd_vars, lcfg, obd);

        return rc > 0 ? 0 : rc;
}

static int osc_process_config(struct obd_device *obd, size_t len, void *buf)
{
        return osc_process_config_base(obd, buf);
}

static struct obd_ops osc_obd_ops = {
        .o_owner                = THIS_MODULE,
        .o_setup                = osc_setup,
        .o_precleanup           = osc_precleanup,
        .o_cleanup              = osc_cleanup,
        .o_add_conn             = client_import_add_conn,
        .o_del_conn             = client_import_del_conn,
        .o_connect              = client_connect_import,
        .o_reconnect            = osc_reconnect,
        .o_disconnect           = osc_disconnect,
        .o_statfs               = osc_statfs,
        .o_statfs_async         = osc_statfs_async,
        .o_create               = osc_create,
        .o_destroy              = osc_destroy,
        .o_getattr              = osc_getattr,
        .o_setattr              = osc_setattr,
        .o_iocontrol            = osc_iocontrol,
        .o_set_info_async       = osc_set_info_async,
        .o_import_event         = osc_import_event,
        .o_process_config       = osc_process_config,
        .o_quotactl             = osc_quotactl,
};

static struct shrinker *osc_cache_shrinker;
struct list_head osc_shrink_list = LIST_HEAD_INIT(osc_shrink_list);
DEFINE_SPINLOCK(osc_shrink_lock);

#ifndef HAVE_SHRINKER_COUNT
static int osc_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask))
{
        struct shrink_control scv = {
                .nr_to_scan = shrink_param(sc, nr_to_scan),
                .gfp_mask   = shrink_param(sc, gfp_mask)
        };
#if !defined(HAVE_SHRINKER_WANT_SHRINK_PTR) && !defined(HAVE_SHRINK_CONTROL)
        struct shrinker *shrinker = NULL;
#endif

        (void)osc_cache_shrink_scan(shrinker, &scv);

        return osc_cache_shrink_count(shrinker, &scv);
}
#endif
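
/*
 * Editor's note on the compat shim above: kernels with HAVE_SHRINKER_COUNT
 * take separate ->count_objects()/->scan_objects() methods directly, while
 * older kernels expect a single ->shrink() callback. The wrapper emulates
 * the old contract by scanning first and then returning the remaining
 * object count, which is exactly what the legacy API required.
 */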

static int __init osc_init(void)
{
        bool enable_proc = true;
        struct obd_type *type;
        unsigned int reqpool_size;
        unsigned int reqsize;
        int rc;
        DEF_SHRINKER_VAR(osc_shvar, osc_cache_shrink,
                         osc_cache_shrink_count, osc_cache_shrink_scan);
        ENTRY;

        /* print an address of _any_ initialized kernel symbol from this
         * module, to allow debugging with gdb that doesn't support data
         * symbols from modules. */
        CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);

        rc = lu_kmem_init(osc_caches);
        if (rc)
                RETURN(rc);

        type = class_search_type(LUSTRE_OSP_NAME);
        if (type != NULL && type->typ_procsym != NULL)
                enable_proc = false;

        rc = class_register_type(&osc_obd_ops, NULL, enable_proc, NULL,
                                 LUSTRE_OSC_NAME, &osc_device_type);
        if (rc)
                GOTO(out_kmem, rc);

        osc_cache_shrinker = set_shrinker(DEFAULT_SEEKS, &osc_shvar);

        /* This is obviously far more memory than we will ever use; the
         * check only guards against overflow in the shift below. */
        if (osc_reqpool_mem_max >= 1 << 12 || osc_reqpool_mem_max == 0)
                GOTO(out_type, rc = -EINVAL);

        reqpool_size = osc_reqpool_mem_max << 20;

        /* round the request size up to the next power of two */
        reqsize = 1;
        while (reqsize < OST_IO_MAXREQSIZE)
                reqsize <<= 1;

        /*
         * We don't enlarge the request count in the OSC pool according to
         * cl_max_rpcs_in_flight. Allocation from the pool is only tried
         * after a normal allocation has failed, so a small OSC pool won't
         * cause much performance degradation in most cases.
         */
        osc_reqpool_maxreqcount = reqpool_size / reqsize;
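
        /*
         * Worked example (editor's illustration): with a hypothetical
         * osc_reqpool_mem_max of 5, reqpool_size is 5 MiB; if
         * OST_IO_MAXREQSIZE were just over 1 MiB, the loop above would
         * round reqsize up to 2 MiB and the pool would be capped at
         * 5 MiB / 2 MiB = 2 requests.
         */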

        atomic_set(&osc_pool_req_count, 0);
        osc_rq_pool = ptlrpc_init_rq_pool(0, OST_IO_MAXREQSIZE,
                                          ptlrpc_add_rqs_to_pool);

        if (osc_rq_pool != NULL)
                GOTO(out, rc);
        rc = -ENOMEM;
out_type:
        /* the shrinker has already been set by the time either failure
         * path lands here, so undo it as well */
        remove_shrinker(osc_cache_shrinker);
        class_unregister_type(LUSTRE_OSC_NAME);
out_kmem:
        lu_kmem_fini(osc_caches);
out:
        RETURN(rc);
}

static void __exit osc_exit(void)
{
        remove_shrinker(osc_cache_shrinker);
        class_unregister_type(LUSTRE_OSC_NAME);
        lu_kmem_fini(osc_caches);
        ptlrpc_free_rq_pool(osc_rq_pool);
}

MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
MODULE_VERSION(LUSTRE_VERSION_STRING);
MODULE_LICENSE("GPL");

module_init(osc_init);
module_exit(osc_exit);