/* Source: lustre/osc/osc_request.c (fs/lustre-release.git)
 * Patch view: LU-8175 ldlm: conflicting PW & PR extent locks on a client
 */
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2015, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_OSC
38
39 #include <libcfs/libcfs.h>
40
41 #include <lustre/lustre_user.h>
42
43 #include <lprocfs_status.h>
44 #include <lustre_debug.h>
45 #include <lustre_dlm.h>
46 #include <lustre_fid.h>
47 #include <lustre_ha.h>
48 #include <lustre_ioctl.h>
49 #include <lustre_net.h>
50 #include <lustre_obdo.h>
51 #include <lustre_param.h>
52 #include <obd.h>
53 #include <obd_cksum.h>
54 #include <obd_class.h>
55
56 #include "osc_cl_internal.h"
57 #include "osc_internal.h"
58
/* Shared OSC request-pool state; the pool itself is created elsewhere
 * (not visible in this chunk). */
atomic_t osc_pool_req_count;
unsigned int osc_reqpool_maxreqcount;
struct ptlrpc_request_pool *osc_rq_pool;

/* max memory used for request pool, unit is MB */
static unsigned int osc_reqpool_mem_max = 5;
module_param(osc_reqpool_mem_max, uint, 0444);
66
/* Per-RPC context for asynchronous bulk read/write (BRW) requests, kept in
 * the request's rq_async_args and consumed by brw_interpret(). */
struct osc_brw_async_args {
        struct obdo              *aa_oa;        /* attributes sent/returned */
        int                       aa_requested_nob; /* bytes requested */
        int                       aa_nio_count;     /* I/O descriptor count */
        u32                       aa_page_count;    /* entries in aa_ppga */
        int                       aa_resends;       /* resend attempts so far */
        struct brw_page **aa_ppga;                  /* pages for the bulk */
        struct client_obd        *aa_cli;
        struct list_head          aa_oaps;
        struct list_head          aa_exts;
};

/* Grant-shrink RPCs reuse this layout; osc_shrink_grant_interpret() only
 * touches aa_oa. */
#define osc_grant_args osc_brw_async_args
80
/* Callback context for async setattr/punch RPCs (osc_setattr_interpret). */
struct osc_setattr_args {
        struct obdo             *sa_oa;     /* updated from the reply body */
        obd_enqueue_update_f     sa_upcall; /* completion callback */
        void                    *sa_cookie; /* opaque argument for sa_upcall */
};
86
/* Callback context for async OST_SYNC RPCs (osc_sync_interpret). */
struct osc_fsync_args {
        struct osc_object       *fa_obj;    /* object whose attrs we update */
        struct obdo             *fa_oa;     /* updated from the reply body */
        obd_enqueue_update_f    fa_upcall;  /* completion callback */
        void                    *fa_cookie; /* opaque argument for fa_upcall */
};
93
/* Callback context for async OST_LADVISE RPCs (osc_ladvise_interpret). */
struct osc_ladvise_args {
        struct obdo             *la_oa;     /* updated from the reply body */
        obd_enqueue_update_f     la_upcall; /* completion callback */
        void                    *la_cookie; /* opaque argument for la_upcall */
};
99
/* Callback context for asynchronous DLM lock enqueues. */
struct osc_enqueue_args {
        struct obd_export       *oa_exp;    /* export the enqueue went to */
        enum ldlm_type          oa_type;    /* lock type (e.g. extent) */
        enum ldlm_mode          oa_mode;    /* requested lock mode */
        __u64                   *oa_flags;  /* LDLM flags in/out */
        osc_enqueue_upcall_f    oa_upcall;  /* completion callback */
        void                    *oa_cookie; /* opaque argument for oa_upcall */
        struct ost_lvb          *oa_lvb;    /* lock value block from server */
        struct lustre_handle    oa_lockh;   /* handle of the enqueued lock */
        unsigned int            oa_agl:1;   /* asynchronous glimpse lock? */
};
111
/* Forward declarations; definitions appear later in this file. */
static void osc_release_ppga(struct brw_page **ppga, size_t count);
static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
                         void *data, int rc);
115
116 void osc_pack_req_body(struct ptlrpc_request *req, struct obdo *oa)
117 {
118         struct ost_body *body;
119
120         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
121         LASSERT(body);
122
123         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
124 }
125
/* Fetch attributes of the object named by @oa from the OST with a
 * synchronous OST_GETATTR RPC; the reply is unpacked back into @oa.
 * Returns 0 on success or a negative errno. */
static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        int                      rc;

        ENTRY;
        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        /* Identify the target object in the request body. */
        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        /* Preferred I/O size is a client-side property of this export,
         * not something the server reply carries. */
        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;

        EXIT;
out:
        ptlrpc_req_finished(req);

        return rc;
}
168
169 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
170                        struct obdo *oa)
171 {
172         struct ptlrpc_request   *req;
173         struct ost_body         *body;
174         int                      rc;
175
176         ENTRY;
177         LASSERT(oa->o_valid & OBD_MD_FLGROUP);
178
179         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
180         if (req == NULL)
181                 RETURN(-ENOMEM);
182
183         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
184         if (rc) {
185                 ptlrpc_request_free(req);
186                 RETURN(rc);
187         }
188
189         osc_pack_req_body(req, oa);
190
191         ptlrpc_request_set_replen(req);
192
193         rc = ptlrpc_queue_wait(req);
194         if (rc)
195                 GOTO(out, rc);
196
197         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
198         if (body == NULL)
199                 GOTO(out, rc = -EPROTO);
200
201         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
202
203         EXIT;
204 out:
205         ptlrpc_req_finished(req);
206
207         RETURN(rc);
208 }
209
/* Reply interpreter for asynchronous OST_SETATTR/OST_PUNCH requests: on
 * success copy the server's attributes into sa_oa, then invoke the caller's
 * upcall with the final status. */
static int osc_setattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_setattr_args *sa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
                             &body->oa);
out:
        /* The upcall runs even on error so the caller can clean up. */
        rc = sa->sa_upcall(sa->sa_cookie, rc);
        RETURN(rc);
}
230
/* Send an OST_SETATTR RPC asynchronously.  If @rqset is NULL the request
 * is handed to ptlrpcd fire-and-forget (no interpret callback, so @upcall
 * and @cookie are unused in that case); otherwise completion is reported
 * through @upcall via osc_setattr_interpret(). */
int osc_setattr_async(struct obd_export *exp, struct obdo *oa,
                      obd_enqueue_update_f upcall, void *cookie,
                      struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct osc_setattr_args *sa;
        int                      rc;

        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        /* do mds to ost setattr asynchronously */
        if (!rqset) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req);
        } else {
                req->rq_interpret_reply =
                        (ptlrpc_interpterer_t)osc_setattr_interpret;

                /* Callback context lives inside the request itself. */
                CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
                sa = ptlrpc_req_async_args(req);
                sa->sa_oa = oa;
                sa->sa_upcall = upcall;
                sa->sa_cookie = cookie;

                if (rqset == PTLRPCD_SET)
                        ptlrpcd_add_req(req);
                else
                        ptlrpc_set_add_req(rqset, req);
        }

        RETURN(0);
}
277
/* Reply interpreter for OST_LADVISE: on success copy the reply obdo back to
 * the caller's buffer, then invoke the completion upcall with the status. */
static int osc_ladvise_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 void *arg, int rc)
{
        struct osc_ladvise_args *la = arg;
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        *la->la_oa = body->oa;
out:
        /* The upcall runs even on error so the caller can clean up. */
        rc = la->la_upcall(la->la_cookie, rc);
        RETURN(rc);
}
298
/**
 * Send an OST_LADVISE RPC carrying the advice array in @ladvise_hdr.
 *
 * If rqset is NULL, do not wait for response. Upcall and cookie could also
 * be NULL in this case
 *
 * \retval 0 on successful dispatch, negative errno on setup failure.
 */
int osc_ladvise_base(struct obd_export *exp, struct obdo *oa,
                     struct ladvise_hdr *ladvise_hdr,
                     obd_enqueue_update_f upcall, void *cookie,
                     struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        struct osc_ladvise_args *la;
        int                      rc;
        struct lu_ladvise       *req_ladvise;
        struct lu_ladvise       *ladvise = ladvise_hdr->lah_advise;
        int                      num_advise = ladvise_hdr->lah_count;
        struct ladvise_hdr      *req_ladvise_hdr;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_LADVISE);
        if (req == NULL)
                RETURN(-ENOMEM);

        /* The advice buffer is variable-length: size it before packing. */
        req_capsule_set_size(&req->rq_pill, &RMF_OST_LADVISE, RCL_CLIENT,
                             num_advise * sizeof(*ladvise));
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_LADVISE);
        if (rc != 0) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL;
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
                             oa);

        /* Copy the header and the advice array into the request. */
        req_ladvise_hdr = req_capsule_client_get(&req->rq_pill,
                                                 &RMF_OST_LADVISE_HDR);
        memcpy(req_ladvise_hdr, ladvise_hdr, sizeof(*ladvise_hdr));

        req_ladvise = req_capsule_client_get(&req->rq_pill, &RMF_OST_LADVISE);
        memcpy(req_ladvise, ladvise, sizeof(*ladvise) * num_advise);
        ptlrpc_request_set_replen(req);

        if (rqset == NULL) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req);
                RETURN(0);
        }

        req->rq_interpret_reply = osc_ladvise_interpret;
        CLASSERT(sizeof(*la) <= sizeof(req->rq_async_args));
        la = ptlrpc_req_async_args(req);
        la->la_oa = oa;
        la->la_upcall = upcall;
        la->la_cookie = cookie;

        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}
365
/* Create an object on the OST with a synchronous OST_CREATE RPC and read
 * the resulting attributes back into @oa.  Only used for echo-client
 * objects (see the fid_seq_is_echo assertion). */
static int osc_create(const struct lu_env *env, struct obd_export *exp,
                      struct obdo *oa)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        LASSERT(oa != NULL);
        LASSERT(oa->o_valid & OBD_MD_FLGROUP);
        LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
        if (req == NULL)
                GOTO(out, rc = -ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
        if (rc) {
                ptlrpc_request_free(req);
                GOTO(out, rc);
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out_req, rc = -EPROTO);

        CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        /* Preferred I/O size comes from the client side, not the reply. */
        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;

        CDEBUG(D_HA, "transno: %lld\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        RETURN(rc);
}
416
/* Send an asynchronous OST_PUNCH (truncate/hole-punch) RPC.  Completion is
 * reported through @upcall via osc_setattr_interpret().  @rqset must be
 * non-NULL here (PTLRPCD_SET routes to ptlrpcd); there is no fire-and-
 * forget path in this function. */
int osc_punch_base(struct obd_export *exp, struct obdo *oa,
                   obd_enqueue_update_f upcall, void *cookie,
                   struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct osc_setattr_args *sa;
        struct ost_body         *body;
        int                      rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        /* Reuse the setattr interpreter: punch replies carry the same
         * ost_body attribute payload. */
        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
        CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
        sa = ptlrpc_req_async_args(req);
        sa->sa_oa = oa;
        sa->sa_upcall = upcall;
        sa->sa_cookie = cookie;
        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}
458
/* Reply interpreter for OST_SYNC: copy the reply obdo back to the caller,
 * refresh the osc object's cached blocks attribute from the reply, then
 * invoke the completion upcall with the final status. */
static int osc_sync_interpret(const struct lu_env *env,
                              struct ptlrpc_request *req,
                              void *arg, int rc)
{
        struct osc_fsync_args   *fa = arg;
        struct ost_body         *body;
        struct cl_attr          *attr = &osc_env_info(env)->oti_attr;
        unsigned long           valid = 0;
        struct cl_object        *obj;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL) {
                CERROR("can't unpack ost_body\n");
                GOTO(out, rc = -EPROTO);
        }

        *fa->fa_oa = body->oa;
        obj = osc2cl(fa->fa_obj);

        /* Update osc object's blocks attribute */
        cl_object_attr_lock(obj);
        if (body->oa.o_valid & OBD_MD_FLBLOCKS) {
                attr->cat_blocks = body->oa.o_blocks;
                valid |= CAT_BLOCKS;
        }

        if (valid != 0)
                cl_object_attr_update(env, obj, attr, valid);
        cl_object_attr_unlock(obj);

out:
        /* The upcall runs even on error so the caller can clean up. */
        rc = fa->fa_upcall(fa->fa_cookie, rc);
        RETURN(rc);
}
497
/* Send an asynchronous OST_SYNC RPC for @obj.  The oa's size/blocks fields
 * are overloaded to carry the [start, end] byte range to flush (see the
 * in-body comment).  Completion goes through @upcall via
 * osc_sync_interpret(); @rqset must be non-NULL (PTLRPCD_SET for ptlrpcd). */
int osc_sync_base(struct osc_object *obj, struct obdo *oa,
                  obd_enqueue_update_f upcall, void *cookie,
                  struct ptlrpc_request_set *rqset)
{
        struct obd_export     *exp = osc_export(obj);
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct osc_fsync_args *fa;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        /* overload the size and blocks fields in the oa with start/end */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = osc_sync_interpret;

        CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
        fa = ptlrpc_req_async_args(req);
        fa->fa_obj = obj;
        fa->fa_oa = oa;
        fa->fa_upcall = upcall;
        fa->fa_cookie = cookie;

        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN (0);
}
541
/* Find and cancel locally locks matched by @mode in the resource found by
 * @objid. Found locks are added into @cancel list. Returns the amount of
 * locks added to @cancels list. */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
                                   struct list_head *cancels,
                                   enum ldlm_mode mode, __u64 lock_flags)
{
        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        struct ldlm_res_id res_id;
        struct ldlm_resource *res;
        int count;
        ENTRY;

        /* Return, i.e. cancel nothing, only if ELC is supported (flag in
         * export) but disabled through procfs (flag in NS).
         *
         * This distinguishes from a case when ELC is not supported originally,
         * when we still want to cancel locks in advance and just cancel them
         * locally, without sending any RPC. */
        if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
                RETURN(0);

        /* Build the resource name from the object id and look it up;
         * a missing resource simply means there is nothing to cancel. */
        ostid_build_res_name(&oa->o_oi, &res_id);
        res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
        if (IS_ERR(res))
                RETURN(0);

        LDLM_RESOURCE_ADDREF(res);
        count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
                                           lock_flags, 0, NULL);
        LDLM_RESOURCE_DELREF(res);
        ldlm_resource_putref(res);
        RETURN(count);
}
576
577 static int osc_destroy_interpret(const struct lu_env *env,
578                                  struct ptlrpc_request *req, void *data,
579                                  int rc)
580 {
581         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
582
583         atomic_dec(&cli->cl_destroy_in_flight);
584         wake_up(&cli->cl_destroy_waitq);
585         return 0;
586 }
587
/* Reserve a destroy-RPC slot: optimistically bump the in-flight counter and
 * keep the reservation (return 1) if we are within cl_max_rpcs_in_flight;
 * otherwise undo it (return 0).  The inc/dec pair is deliberately lock-free,
 * so a concurrent completion may slip in between the two atomics — the
 * second check catches that and re-wakes a waiter so no sender is lost. */
static int osc_can_send_destroy(struct client_obd *cli)
{
        if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
            cli->cl_max_rpcs_in_flight) {
                /* The destroy request can be sent */
                return 1;
        }
        if (atomic_dec_return(&cli->cl_destroy_in_flight) <
            cli->cl_max_rpcs_in_flight) {
                /*
                 * The counter has been modified between the two atomic
                 * operations.
                 */
                wake_up(&cli->cl_destroy_waitq);
        }
        return 0;
}
605
/* Destroy the object named by @oa: cancel cached PW locks on it locally
 * (piggybacking the cancels on the RPC via ELC), throttle to at most
 * cl_max_rpcs_in_flight concurrent destroys, then fire the OST_DESTROY
 * RPC without waiting for the reply. */
static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct client_obd     *cli = &exp->exp_obd->u.cli;
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct list_head       cancels = LIST_HEAD_INIT(cancels);
        int rc, count;
        ENTRY;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        /* Collect local PW locks to cancel; DISCARD_DATA drops dirty pages
         * under them since the object is going away anyway. */
        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
        if (req == NULL) {
                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                RETURN(-ENOMEM);
        }

        rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
                               0, &cancels, count);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = osc_destroy_interpret;
        if (!osc_can_send_destroy(cli)) {
                struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);

                /*
                 * Wait until the number of on-going destroy RPCs drops
                 * under max_rpc_in_flight
                 */
                l_wait_event_exclusive(cli->cl_destroy_waitq,
                                       osc_can_send_destroy(cli), &lwi);
        }

        /* Do not wait for response */
        ptlrpcd_add_req(req);
        RETURN(0);
}
662
/* Fill @oa's dirty/undirty/grant/dropped fields with this client's cache
 * accounting so the server can manage grant space.  o_undirty is how much
 * more dirty data the client would like to be allowed to cache; it is
 * zeroed (ask for nothing) whenever the counters look inconsistent.
 * NOTE(review): @writing_bytes is unused here — presumably kept for the
 * method signature; confirm against callers. */
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;

        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        spin_lock(&cli->cl_loi_list_lock);
        if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data, GRANT_PARAM))
                oa->o_dirty = cli->cl_dirty_grant;
        else
                oa->o_dirty = cli->cl_dirty_pages << PAGE_CACHE_SHIFT;
        if (unlikely(cli->cl_dirty_pages - cli->cl_dirty_transit >
                     cli->cl_dirty_max_pages)) {
                CERROR("dirty %lu - %lu > dirty_max %lu\n",
                       cli->cl_dirty_pages, cli->cl_dirty_transit,
                       cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else if (unlikely(atomic_long_read(&obd_dirty_pages) -
                            atomic_long_read(&obd_dirty_transit_pages) >
                            (long)(obd_max_dirty_pages + 1))) {
                /* The atomic_read() allowing the atomic_inc() are
                 * not covered by a lock thus they may safely race and trip
                 * this CERROR() unless we add in a small fudge factor (+1). */
                CERROR("%s: dirty %ld - %ld > system dirty_max %ld\n",
                       cli_name(cli), atomic_long_read(&obd_dirty_pages),
                       atomic_long_read(&obd_dirty_transit_pages),
                       obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
                            0x7fffffff)) {
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty_pages, cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else {
                unsigned long nrpages;

                /* Ask for enough to keep max_rpcs_in_flight+1 full RPCs
                 * of dirty data cached. */
                nrpages = cli->cl_max_pages_per_rpc;
                nrpages *= cli->cl_max_rpcs_in_flight + 1;
                nrpages = max(nrpages, cli->cl_dirty_max_pages);
                oa->o_undirty = nrpages << PAGE_CACHE_SHIFT;
                if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data,
                                 GRANT_PARAM)) {
                        int nrextents;

                        /* take extent tax into account when asking for more
                         * grant space */
                        nrextents = (nrpages + cli->cl_max_extent_pages - 1)  /
                                     cli->cl_max_extent_pages;
                        oa->o_undirty += nrextents * cli->cl_grant_extent_tax;
                }
        }
        oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
        oa->o_dropped = cli->cl_lost_grant;
        cli->cl_lost_grant = 0;
        spin_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "dirty: %llu undirty: %u dropped %u grant: %llu\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
}
723
724 void osc_update_next_shrink(struct client_obd *cli)
725 {
726         cli->cl_next_shrink_grant =
727                 cfs_time_shift(cli->cl_grant_shrink_interval);
728         CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
729                cli->cl_next_shrink_grant);
730 }
731
732 static void __osc_update_grant(struct client_obd *cli, u64 grant)
733 {
734         spin_lock(&cli->cl_loi_list_lock);
735         cli->cl_avail_grant += grant;
736         spin_unlock(&cli->cl_loi_list_lock);
737 }
738
739 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
740 {
741         if (body->oa.o_valid & OBD_MD_FLGRANT) {
742                 CDEBUG(D_CACHE, "got %llu extra grant\n", body->oa.o_grant);
743                 __osc_update_grant(cli, body->oa.o_grant);
744         }
745 }
746
/* Forward declaration; defined later in this file and used by the
 * grant-shrink path below. */
static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
                              u32 keylen, void *key,
                              u32 vallen, void *val,
                              struct ptlrpc_request_set *set);
751
/* Completion callback for a grant-shrink set_info RPC: on failure the
 * grant we tried to give back is restored locally; on success any grant
 * the server returned in the reply is absorbed.  Frees the obdo that was
 * allocated for the request. */
static int osc_shrink_grant_interpret(const struct lu_env *env,
                                      struct ptlrpc_request *req,
                                      void *aa, int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
        struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
        struct ost_body *body;

        if (rc != 0) {
                /* Shrink failed: the server never saw the grant, take it
                 * back into cl_avail_grant. */
                __osc_update_grant(cli, oa->o_grant);
                GOTO(out, rc);
        }

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        osc_update_grant(cli, body);
out:
        OBDO_FREE(oa);
        return rc;
}
772
773 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
774 {
775         spin_lock(&cli->cl_loi_list_lock);
776         oa->o_grant = cli->cl_avail_grant / 4;
777         cli->cl_avail_grant -= oa->o_grant;
778         spin_unlock(&cli->cl_loi_list_lock);
779         if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
780                 oa->o_valid |= OBD_MD_FLFLAGS;
781                 oa->o_flags = 0;
782         }
783         oa->o_flags |= OBD_FL_SHRINK_GRANT;
784         osc_update_next_shrink(cli);
785 }
786
787 /* Shrink the current grant, either from some large amount to enough for a
788  * full set of in-flight RPCs, or if we have already shrunk to that limit
789  * then to enough for a single RPC.  This avoids keeping more grant than
790  * needed, and avoids shrinking the grant piecemeal. */
791 static int osc_shrink_grant(struct client_obd *cli)
792 {
793         __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
794                              (cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT);
795
796         spin_lock(&cli->cl_loi_list_lock);
797         if (cli->cl_avail_grant <= target_bytes)
798                 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
799         spin_unlock(&cli->cl_loi_list_lock);
800
801         return osc_shrink_grant_to_target(cli, target_bytes);
802 }
803
/* Give grant back to the server until only @target_bytes remain locally.
 * The request is sent as a KEY_GRANT_SHRINK set_info RPC; on send failure
 * the grant is restored locally.  Returns 0 or a negative errno. */
int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
{
        int                     rc = 0;
        struct ost_body        *body;
        ENTRY;

        spin_lock(&cli->cl_loi_list_lock);
        /* Don't shrink if we are already above or below the desired limit
         * We don't want to shrink below a single RPC, as that will negatively
         * impact block allocation and long-term performance. */
        if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;

        if (target_bytes >= cli->cl_avail_grant) {
                spin_unlock(&cli->cl_loi_list_lock);
                RETURN(0);
        }
        spin_unlock(&cli->cl_loi_list_lock);

        OBD_ALLOC_PTR(body);
        if (!body)
                RETURN(-ENOMEM);

        osc_announce_cached(cli, &body->oa, 0);

        /* Re-take the lock: cl_avail_grant may have moved since the check
         * above, so compute the give-back amount from the current value. */
        spin_lock(&cli->cl_loi_list_lock);
        body->oa.o_grant = cli->cl_avail_grant - target_bytes;
        cli->cl_avail_grant = target_bytes;
        spin_unlock(&cli->cl_loi_list_lock);
        if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
                body->oa.o_valid |= OBD_MD_FLFLAGS;
                body->oa.o_flags = 0;
        }
        body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);

        rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
                                sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
                                sizeof(*body), body, NULL);
        if (rc != 0)
                /* Send failed: reclaim the grant we tried to give back. */
                __osc_update_grant(cli, body->oa.o_grant);
        OBD_FREE_PTR(body);
        RETURN(rc);
}
848
849 static int osc_should_shrink_grant(struct client_obd *client)
850 {
851         cfs_time_t time = cfs_time_current();
852         cfs_time_t next_shrink = client->cl_next_shrink_grant;
853
854         if ((client->cl_import->imp_connect_data.ocd_connect_flags &
855              OBD_CONNECT_GRANT_SHRINK) == 0)
856                 return 0;
857
858         if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
859                 /* Get the current RPC size directly, instead of going via:
860                  * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
861                  * Keep comment here so that it can be found by searching. */
862                 int brw_size = client->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
863
864                 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
865                     client->cl_avail_grant > brw_size)
866                         return 1;
867                 else
868                         osc_update_next_shrink(client);
869         }
870         return 0;
871 }
872
873 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
874 {
875         struct client_obd *client;
876
877         list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
878                 if (osc_should_shrink_grant(client))
879                         osc_shrink_grant(client);
880         }
881         return 0;
882 }
883
884 static int osc_add_shrink_grant(struct client_obd *client)
885 {
886         int rc;
887
888         rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
889                                        TIMEOUT_GRANT,
890                                        osc_grant_shrink_grant_cb, NULL,
891                                        &client->cl_grant_shrink_list);
892         if (rc) {
893                 CERROR("add grant client %s error %d\n", cli_name(client), rc);
894                 return rc;
895         }
896         CDEBUG(D_CACHE, "add grant client %s\n", cli_name(client));
897         osc_update_next_shrink(client);
898         return 0;
899 }
900
/* Unregister @client from the periodic grant-shrink timeout list;
 * inverse of osc_add_shrink_grant(). */
static int osc_del_shrink_grant(struct client_obd *client)
{
        return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
                                         TIMEOUT_GRANT);
}
906
/* Initialize this client's grant accounting from the server's connect
 * reply @ocd: compute cl_avail_grant, the extent chunk size, the maximum
 * extent size, and register for periodic grant shrinking if the server
 * supports it.  Called at (re)connect time. */
static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        /*
         * ocd_grant is the total grant amount we're expect to hold: if we've
         * been evicted, it's the new avail_grant amount, cl_dirty_pages will
         * drop to 0 as inflight RPCs fail out; otherwise, it's avail_grant +
         * dirty.
         *
         * race is tolerable here: if we're evicted, but imp_state already
         * left EVICTED state, then cl_dirty_pages must be 0 already.
         */
        spin_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant = ocd->ocd_grant;
        if (cli->cl_import->imp_state != LUSTRE_IMP_EVICTED) {
                /* not evicted: subtract what we already consume locally */
                cli->cl_avail_grant -= cli->cl_reserved_grant;
                if (OCD_HAS_FLAG(ocd, GRANT_PARAM))
                        cli->cl_avail_grant -= cli->cl_dirty_grant;
                else
                        cli->cl_avail_grant -=
                                        cli->cl_dirty_pages << PAGE_CACHE_SHIFT;
        }

        if (cli->cl_avail_grant < 0) {
                CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
                      cli_name(cli), cli->cl_avail_grant,
                      ocd->ocd_grant, cli->cl_dirty_pages << PAGE_CACHE_SHIFT);
                /* workaround for servers which do not have the patch from
                 * LU-2679 */
                cli->cl_avail_grant = ocd->ocd_grant;
        }

        if (OCD_HAS_FLAG(ocd, GRANT_PARAM)) {
                u64 size;
                int chunk_mask;

                /* overhead for each extent insertion */
                cli->cl_grant_extent_tax = ocd->ocd_grant_tax_kb << 10;
                /* determine the appropriate chunk size used by osc_extent. */
                cli->cl_chunkbits = max_t(int, PAGE_SHIFT,
                                          ocd->ocd_grant_blkbits);
                /* max_pages_per_rpc must be chunk aligned */
                chunk_mask = ~((1 << (cli->cl_chunkbits - PAGE_SHIFT)) - 1);
                /* round max_pages_per_rpc up to the next chunk boundary */
                cli->cl_max_pages_per_rpc = (cli->cl_max_pages_per_rpc +
                                             ~chunk_mask) & chunk_mask;
                /* determine maximum extent size, in #pages */
                size = (u64)ocd->ocd_grant_max_blks << ocd->ocd_grant_blkbits;
                cli->cl_max_extent_pages = size >> PAGE_SHIFT;
                if (cli->cl_max_extent_pages == 0)
                        cli->cl_max_extent_pages = 1;
        } else {
                /* old server without grant parameters: use defaults */
                cli->cl_grant_extent_tax = 0;
                cli->cl_chunkbits = PAGE_SHIFT;
                cli->cl_max_extent_pages = DT_MAX_BRW_PAGES;
        }
        spin_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
                "chunk bits: %d cl_max_extent_pages: %d\n",
                cli_name(cli),
                cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits,
                cli->cl_max_extent_pages);

        if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
            list_empty(&cli->cl_grant_shrink_list))
                osc_add_shrink_grant(cli);
}
973
974 /* We assume that the reason this OSC got a short read is because it read
975  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
976  * via the LOV, and it _knows_ it's reading inside the file, it's just that
977  * this stripe never got written at or beyond this stripe offset yet. */
978 static void handle_short_read(int nob_read, size_t page_count,
979                               struct brw_page **pga)
980 {
981         char *ptr;
982         int i = 0;
983
984         /* skip bytes read OK */
985         while (nob_read > 0) {
986                 LASSERT (page_count > 0);
987
988                 if (pga[i]->count > nob_read) {
989                         /* EOF inside this page */
990                         ptr = kmap(pga[i]->pg) +
991                                 (pga[i]->off & ~PAGE_MASK);
992                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
993                         kunmap(pga[i]->pg);
994                         page_count--;
995                         i++;
996                         break;
997                 }
998
999                 nob_read -= pga[i]->count;
1000                 page_count--;
1001                 i++;
1002         }
1003
1004         /* zero remaining pages */
1005         while (page_count-- > 0) {
1006                 ptr = kmap(pga[i]->pg) + (pga[i]->off & ~PAGE_MASK);
1007                 memset(ptr, 0, pga[i]->count);
1008                 kunmap(pga[i]->pg);
1009                 i++;
1010         }
1011 }
1012
1013 static int check_write_rcs(struct ptlrpc_request *req,
1014                            int requested_nob, int niocount,
1015                            size_t page_count, struct brw_page **pga)
1016 {
1017         int     i;
1018         __u32   *remote_rcs;
1019
1020         remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1021                                                   sizeof(*remote_rcs) *
1022                                                   niocount);
1023         if (remote_rcs == NULL) {
1024                 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1025                 return(-EPROTO);
1026         }
1027
1028         /* return error if any niobuf was in error */
1029         for (i = 0; i < niocount; i++) {
1030                 if ((int)remote_rcs[i] < 0)
1031                         return(remote_rcs[i]);
1032
1033                 if (remote_rcs[i] != 0) {
1034                         CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1035                                 i, remote_rcs[i], req);
1036                         return(-EPROTO);
1037                 }
1038         }
1039
1040         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1041                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1042                        req->rq_bulk->bd_nob_transferred, requested_nob);
1043                 return(-EPROTO);
1044         }
1045
1046         return (0);
1047 }
1048
1049 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1050 {
1051         if (p1->flag != p2->flag) {
1052                 unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
1053                                   OBD_BRW_SYNC       | OBD_BRW_ASYNC   |
1054                                   OBD_BRW_NOQUOTA    | OBD_BRW_SOFT_SYNC);
1055
1056                 /* warn if we try to combine flags that we don't know to be
1057                  * safe to combine */
1058                 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1059                         CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1060                               "report this at https://jira.hpdd.intel.com/\n",
1061                               p1->flag, p2->flag);
1062                 }
1063                 return 0;
1064         }
1065
1066         return (p1->off + p1->count == p2->off);
1067 }
1068
/* Compute the bulk checksum over the first @nob bytes of the page array
 * @pga using the algorithm selected by @cksum_type.  Two fault-injection
 * hooks are wired in for testing: OBD_FAIL_OSC_CHECKSUM_RECEIVE corrupts
 * read data before checksumming, OBD_FAIL_OSC_CHECKSUM_SEND skews the
 * returned write checksum without touching the data.
 * NOTE(review): on hash-init failure this returns PTR_ERR() squeezed into
 * the u32 return type, indistinguishable from a real checksum — confirm
 * callers tolerate this. */
static u32 osc_checksum_bulk(int nob, size_t pg_count,
                             struct brw_page **pga, int opc,
                             cksum_type_t cksum_type)
{
        u32                             cksum;
        int                             i = 0;
        struct cfs_crypto_hash_desc     *hdesc;
        unsigned int                    bufsize;
        int                             err;
        unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);

        LASSERT(pg_count > 0);

        hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
        if (IS_ERR(hdesc)) {
                CERROR("Unable to initialize checksum hash %s\n",
                       cfs_crypto_hash_name(cfs_alg));
                return PTR_ERR(hdesc);
        }

        while (nob > 0 && pg_count > 0) {
                /* only the part of the last page covered by @nob counts */
                unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 && opc == OST_READ &&
                    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
                        unsigned char *ptr = kmap(pga[i]->pg);
                        int off = pga[i]->off & ~PAGE_MASK;

                        memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
                        kunmap(pga[i]->pg);
                }
                cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
                                            pga[i]->off & ~PAGE_MASK,
                                            count);
                LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
                               (int)(pga[i]->off & ~PAGE_MASK));

                /* nob may go negative on a final partial page; the loop
                 * condition terminates in that case */
                nob -= pga[i]->count;
                pg_count--;
                i++;
        }

        bufsize = sizeof(cksum);
        err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);

        /* For sending we only compute the wrong checksum instead
         * of corrupting the data so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                cksum++;

        return cksum;
}
1123
/* Build a BRW (bulk read/write) RPC for @page_count pages described by
 * @pga: allocate the request (from the pre-allocated pool for writes, so
 * writeback cannot deadlock on memory), merge contiguous pages into remote
 * niobufs, attach the bulk descriptor, fill in grant/checksum fields, and
 * return the ready-to-send request in *@reqp.  @resend marks the request
 * as a recovery resend.  Returns 0 on success or a negative errno. */
static int
osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
                     u32 page_count, struct brw_page **pga,
                     struct ptlrpc_request **reqp, int resend)
{
        struct ptlrpc_request   *req;
        struct ptlrpc_bulk_desc *desc;
        struct ost_body         *body;
        struct obd_ioobj        *ioobj;
        struct niobuf_remote    *niobuf;
        int niocount, i, requested_nob, opc, rc;
        struct osc_brw_async_args *aa;
        struct req_capsule      *pill;
        struct brw_page *pg_prev;

        ENTRY;
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
                RETURN(-ENOMEM); /* Recoverable */
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
                RETURN(-EINVAL); /* Fatal */

        if ((cmd & OBD_BRW_WRITE) != 0) {
                opc = OST_WRITE;
                req = ptlrpc_request_alloc_pool(cli->cl_import,
                                                osc_rq_pool,
                                                &RQF_OST_BRW_WRITE);
        } else {
                opc = OST_READ;
                req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
        }
        if (req == NULL)
                RETURN(-ENOMEM);

        /* count the remote niobufs needed: adjacent mergeable pages share
         * one niobuf */
        for (niocount = i = 1; i < page_count; i++) {
                if (!can_merge_pages(pga[i - 1], pga[i]))
                        niocount++;
        }

        pill = &req->rq_pill;
        req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
                             sizeof(*ioobj));
        req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
                             niocount * sizeof(*niobuf));

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);
        /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
         * retry logic */
        req->rq_no_retry_einprogress = 1;

        desc = ptlrpc_prep_bulk_imp(req, page_count,
                cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
                (opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE :
                        PTLRPC_BULK_PUT_SINK) |
                        PTLRPC_BULK_BUF_KIOV,
                OST_BULK_PORTAL,
                &ptlrpc_bulk_kiov_pin_ops);

        if (desc == NULL)
                GOTO(out, rc = -ENOMEM);
        /* NB request now owns desc and will free it when it gets freed */

        body = req_capsule_client_get(pill, &RMF_OST_BODY);
        ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;
        /* The high bits of ioo_max_brw tells server _maximum_ number of bulks
         * that might be send for this request.  The actual number is decided
         * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
         * "max - 1" for old client compatibility sending "0", and also so the
         * the actual maximum is a power-of-two number, not one less. LU-1431 */
        ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
        LASSERT(page_count > 0);
        pg_prev = pga[0];
        /* add each page to the bulk descriptor, merging adjacent pages
         * into the previous niobuf when possible */
        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                struct brw_page *pg = pga[i];
                int poff = pg->off & ~PAGE_MASK;

                LASSERT(pg->count > 0);
                /* make sure there is no gap in the middle of page array */
                LASSERTF(page_count == 1 ||
                         (ergo(i == 0, poff + pg->count == PAGE_CACHE_SIZE) &&
                          ergo(i > 0 && i < page_count - 1,
                               poff == 0 && pg->count == PAGE_CACHE_SIZE)   &&
                          ergo(i == page_count - 1, poff == 0)),
                         "i: %d/%d pg: %p off: %llu, count: %u\n",
                         i, page_count, pg, pg->off, pg->count);
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u pg %p [pri %lu ind %lu] off %llu"
                         " prev_pg %p [pri %lu ind %lu] off %llu\n",
                         i, page_count,
                         pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
                         pg_prev->pg, page_private(pg_prev->pg),
                         pg_prev->pg->index, pg_prev->off);
                LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
                        (pg->flag & OBD_BRW_SRVLOCK));

                desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff, pg->count);
                requested_nob += pg->count;

                if (i > 0 && can_merge_pages(pg_prev, pg)) {
                        niobuf--;
                        niobuf->rnb_len += pg->count;
                } else {
                        niobuf->rnb_offset = pg->off;
                        niobuf->rnb_len    = pg->count;
                        niobuf->rnb_flags  = pg->flag;
                }
                pg_prev = pg;
        }

        LASSERTF((void *)(niobuf - niocount) ==
                req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
                "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
                &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));

        osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
        if (resend) {
                if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                        body->oa.o_valid |= OBD_MD_FLFLAGS;
                        body->oa.o_flags = 0;
                }
                body->oa.o_flags |= OBD_FL_RECOV_RESEND;
        }

        /* piggyback a grant shrink on this RPC if one is due */
        if (osc_should_shrink_grant(cli))
                osc_shrink_grant_local(cli, &body->oa);

        /* size[REQ_REC_OFF] still sizeof (*body) */
        if (opc == OST_WRITE) {
                if (cli->cl_checksum &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        /* store cl_cksum_type in a local variable since
                         * it can be changed via lprocfs */
                        cksum_type_t cksum_type = cli->cl_cksum_type;

                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                                oa->o_flags &= OBD_FL_LOCAL_MASK;
                                body->oa.o_flags = 0;
                        }
                        body->oa.o_flags |= cksum_type_pack(cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        body->oa.o_cksum = osc_checksum_bulk(requested_nob,
                                                             page_count, pga,
                                                             OST_WRITE,
                                                             cksum_type);
                        CDEBUG(D_PAGE, "checksum at write origin: %x\n",
                               body->oa.o_cksum);
                        /* save this in 'oa', too, for later checking */
                        oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        oa->o_flags |= cksum_type_pack(cksum_type);
                } else {
                        /* clear out the checksum flag, in case this is a
                         * resend but cl_checksum is no longer set. b=11238 */
                        oa->o_valid &= ~OBD_MD_FLCKSUM;
                }
                oa->o_cksum = body->oa.o_cksum;
                /* 1 RC per niobuf */
                req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
                                     sizeof(__u32) * niocount);
        } else {
                if (cli->cl_checksum &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                body->oa.o_flags = 0;
                        body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                }
        }
        ptlrpc_request_set_replen(req);

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oa = oa;
        aa->aa_requested_nob = requested_nob;
        aa->aa_nio_count = niocount;
        aa->aa_page_count = page_count;
        aa->aa_resends = 0;
        aa->aa_ppga = pga;
        aa->aa_cli = cli;
        INIT_LIST_HEAD(&aa->aa_oaps);

        *reqp = req;
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
                req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
                niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
        RETURN(0);

 out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}
1327
/* Diagnose a write checksum mismatch reported by the server: recompute
 * the checksum over the client's pages (using the server's checksum type)
 * to classify where the corruption happened, then log a console error.
 * Returns 0 when client and server checksums agree, 1 on mismatch. */
static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
                                __u32 client_cksum, __u32 server_cksum, int nob,
                                size_t page_count, struct brw_page **pga,
                                cksum_type_t client_cksum_type)
{
        __u32 new_cksum;
        char *msg;
        cksum_type_t cksum_type;

        if (server_cksum == client_cksum) {
                CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
                return 0;
        }

        /* recompute with the type the server actually used, which may
         * differ from the type we requested */
        cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
                                       oa->o_flags : 0);
        new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
                                      cksum_type);

        if (cksum_type != client_cksum_type)
                msg = "the server did not use the checksum type specified in "
                      "the original request - likely a protocol problem";
        else if (new_cksum == server_cksum)
                msg = "changed on the client after we checksummed it - "
                      "likely false positive due to mmap IO (bug 11742)";
        else if (new_cksum == client_cksum)
                msg = "changed in transit before arrival at OST";
        else
                msg = "changed in transit AND doesn't match the original - "
                      "likely false positive due to mmap IO (bug 11742)";

        LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
                           " object "DOSTID" extent [%llu-%llu]\n",
                           msg, libcfs_nid2str(peer->nid),
                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
                           POSTID(&oa->o_oi), pga[0]->off,
                           pga[page_count-1]->off + pga[page_count-1]->count - 1);
        CERROR("original client csum %x (type %x), server csum %x (type %x), "
               "client csum now %x\n", client_cksum, client_cksum_type,
               server_cksum, cksum_type, new_cksum);
        return 1;
}
1372
1373 /* Note rc enters this function as number of bytes transferred */
1374 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1375 {
1376         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1377         const lnet_process_id_t *peer =
1378                         &req->rq_import->imp_connection->c_peer;
1379         struct client_obd *cli = aa->aa_cli;
1380         struct ost_body *body;
1381         u32 client_cksum = 0;
1382         ENTRY;
1383
1384         if (rc < 0 && rc != -EDQUOT) {
1385                 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1386                 RETURN(rc);
1387         }
1388
1389         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1390         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1391         if (body == NULL) {
1392                 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1393                 RETURN(-EPROTO);
1394         }
1395
1396         /* set/clear over quota flag for a uid/gid */
1397         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1398             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1399                 unsigned int qid[LL_MAXQUOTAS] =
1400                                         {body->oa.o_uid, body->oa.o_gid};
1401
1402                 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid %#llx, flags %x\n",
1403                        body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1404                        body->oa.o_flags);
1405                 osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
1406         }
1407
1408         osc_update_grant(cli, body);
1409
1410         if (rc < 0)
1411                 RETURN(rc);
1412
1413         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1414                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1415
1416         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1417                 if (rc > 0) {
1418                         CERROR("Unexpected +ve rc %d\n", rc);
1419                         RETURN(-EPROTO);
1420                 }
1421                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1422
1423                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1424                         RETURN(-EAGAIN);
1425
1426                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1427                     check_write_checksum(&body->oa, peer, client_cksum,
1428                                          body->oa.o_cksum, aa->aa_requested_nob,
1429                                          aa->aa_page_count, aa->aa_ppga,
1430                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1431                         RETURN(-EAGAIN);
1432
1433                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1434                                      aa->aa_page_count, aa->aa_ppga);
1435                 GOTO(out, rc);
1436         }
1437
1438         /* The rest of this function executes only for OST_READs */
1439
1440         /* if unwrap_bulk failed, return -EAGAIN to retry */
1441         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1442         if (rc < 0)
1443                 GOTO(out, rc = -EAGAIN);
1444
1445         if (rc > aa->aa_requested_nob) {
1446                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1447                        aa->aa_requested_nob);
1448                 RETURN(-EPROTO);
1449         }
1450
1451         if (rc != req->rq_bulk->bd_nob_transferred) {
1452                 CERROR ("Unexpected rc %d (%d transferred)\n",
1453                         rc, req->rq_bulk->bd_nob_transferred);
1454                 return (-EPROTO);
1455         }
1456
1457         if (rc < aa->aa_requested_nob)
1458                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1459
1460         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1461                 static int cksum_counter;
1462                 u32        server_cksum = body->oa.o_cksum;
1463                 char      *via = "";
1464                 char      *router = "";
1465                 cksum_type_t cksum_type;
1466
1467                 cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
1468                                                body->oa.o_flags : 0);
1469                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1470                                                  aa->aa_ppga, OST_READ,
1471                                                  cksum_type);
1472
1473                 if (peer->nid != req->rq_bulk->bd_sender) {
1474                         via = " via ";
1475                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1476                 }
1477
1478                 if (server_cksum != client_cksum) {
1479                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1480                                            "%s%s%s inode "DFID" object "DOSTID
1481                                            " extent [%llu-%llu]\n",
1482                                            req->rq_import->imp_obd->obd_name,
1483                                            libcfs_nid2str(peer->nid),
1484                                            via, router,
1485                                            body->oa.o_valid & OBD_MD_FLFID ?
1486                                                 body->oa.o_parent_seq : (__u64)0,
1487                                            body->oa.o_valid & OBD_MD_FLFID ?
1488                                                 body->oa.o_parent_oid : 0,
1489                                            body->oa.o_valid & OBD_MD_FLFID ?
1490                                                 body->oa.o_parent_ver : 0,
1491                                            POSTID(&body->oa.o_oi),
1492                                            aa->aa_ppga[0]->off,
1493                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1494                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1495                                                                         1);
1496                         CERROR("client %x, server %x, cksum_type %x\n",
1497                                client_cksum, server_cksum, cksum_type);
1498                         cksum_counter = 0;
1499                         aa->aa_oa->o_cksum = client_cksum;
1500                         rc = -EAGAIN;
1501                 } else {
1502                         cksum_counter++;
1503                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1504                         rc = 0;
1505                 }
1506         } else if (unlikely(client_cksum)) {
1507                 static int cksum_missed;
1508
1509                 cksum_missed++;
1510                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1511                         CERROR("Checksum %u requested from %s but not sent\n",
1512                                cksum_missed, libcfs_nid2str(peer->nid));
1513         } else {
1514                 rc = 0;
1515         }
1516 out:
1517         if (rc >= 0)
1518                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1519                                      aa->aa_oa, &body->oa);
1520
1521         RETURN(rc);
1522 }
1523
/* Rebuild and resend a bulk RPC after a recoverable error (e.g. -EINPROGRESS
 * from the server, or a checksum mismatch detected in osc_brw_fini_request).
 *
 * A brand new request is prepared over the same obdo and page array; it
 * inherits the old request's interpret/commit callbacks and async args,
 * takes over its page and extent lists, and is handed to ptlrpcd.
 *
 * \param request  the failed BRW request being replaced
 * \param aa       async args shared with the old request (pages, extents)
 * \param rc       the recoverable error that triggered the redo
 *
 * \retval 0       new request queued successfully
 * \retval -EINTR  a page's caller was interrupted; resend abandoned
 * \retval <0      osc_brw_prep_request() failure
 */
static int osc_brw_redo_request(struct ptlrpc_request *request,
				struct osc_brw_async_args *aa, int rc)
{
	struct ptlrpc_request *new_req;
	struct osc_brw_async_args *new_aa;
	struct osc_async_page *oap;
	ENTRY;

	DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
		  "redo for recoverable error %d", rc);

	/* Prepare a fresh request of the same direction (read vs. write,
	 * taken from the old request's opcode) over the same pages. */
	rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
				OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
				  aa->aa_cli, aa->aa_oa, aa->aa_page_count,
				  aa->aa_ppga, &new_req, 1);
	if (rc)
		RETURN(rc);

	/* Give up (rather than resend) if any page's caller was
	 * interrupted while the old request was in flight. */
	list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
		if (oap->oap_request != NULL) {
			LASSERTF(request == oap->oap_request,
				 "request %p != oap_request %p\n",
				 request, oap->oap_request);
			if (oap->oap_interrupted) {
				ptlrpc_req_finished(new_req);
				RETURN(-EINTR);
			}
		}
	}
	/* New request takes over pga and oaps from old request.
	 * Note that copying a list_head doesn't work, need to move it... */
	aa->aa_resends++;
	new_req->rq_interpret_reply = request->rq_interpret_reply;
	new_req->rq_async_args = request->rq_async_args;
	new_req->rq_commit_cb = request->rq_commit_cb;
	/* cap resend delay to the current request timeout, this is similar to
	 * what ptlrpc does (see after_reply()) */
	if (aa->aa_resends > new_req->rq_timeout)
		new_req->rq_sent = cfs_time_current_sec() + new_req->rq_timeout;
	else
		new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
	new_req->rq_generation_set = 1;
	new_req->rq_import_generation = request->rq_import_generation;

	new_aa = ptlrpc_req_async_args(new_req);

	INIT_LIST_HEAD(&new_aa->aa_oaps);
	list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
	INIT_LIST_HEAD(&new_aa->aa_exts);
	list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
	new_aa->aa_resends = aa->aa_resends;

	/* Re-point each page's request reference at the new RPC so later
	 * completion handling sees the request actually in flight. */
	list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
		if (oap->oap_request) {
			ptlrpc_req_finished(oap->oap_request);
			oap->oap_request = ptlrpc_request_addref(new_req);
		}
	}

	/* XXX: This code will run into problem if we're going to support
	 * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
	 * and wait for all of them to be finished. We should inherit request
	 * set from old request. */
	ptlrpcd_add_req(new_req);

	DEBUG_REQ(D_INFO, new_req, "new request");
	RETURN(0);
}
1592
1593 /*
1594  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1595  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1596  * fine for our small page arrays and doesn't require allocation.  its an
1597  * insertion sort that swaps elements that are strides apart, shrinking the
1598  * stride down until its '1' and the array is sorted.
1599  */
1600 static void sort_brw_pages(struct brw_page **array, int num)
1601 {
1602         int stride, i, j;
1603         struct brw_page *tmp;
1604
1605         if (num == 1)
1606                 return;
1607         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1608                 ;
1609
1610         do {
1611                 stride /= 3;
1612                 for (i = stride ; i < num ; i++) {
1613                         tmp = array[i];
1614                         j = i;
1615                         while (j >= stride && array[j - stride]->off > tmp->off) {
1616                                 array[j] = array[j - stride];
1617                                 j -= stride;
1618                         }
1619                         array[j] = tmp;
1620                 }
1621         } while (stride > 1);
1622 }
1623
/* Free the brw_page pointer array allocated in osc_build_rpc().  Only the
 * array of pointers is released here; the pages it refers to are owned and
 * cleaned up elsewhere (via the osc_async_page machinery). */
static void osc_release_ppga(struct brw_page **ppga, size_t count)
{
	LASSERT(ppga != NULL);
	OBD_FREE(ppga, sizeof(*ppga) * count);
}
1629
/* Interpret callback for a BRW (bulk read/write) RPC, run from ptlrpcd.
 *
 * Finishes the request, redoes it on recoverable errors (unconditionally for
 * -EINPROGRESS, otherwise bounded by client_should_resend()), folds the
 * attributes returned by the OST into the client object, finishes every
 * extent covered by the RPC, and updates the in-flight RPC accounting.
 *
 * \param env   execution environment
 * \param req   the completed BRW request
 * \param data  the osc_brw_async_args stored in the request
 * \param rc    request status from ptlrpc
 *
 * \retval 0 on success or when a redo was queued; negative errno otherwise.
 */
static int brw_interpret(const struct lu_env *env,
			 struct ptlrpc_request *req, void *data, int rc)
{
	struct osc_brw_async_args *aa = data;
	struct osc_extent *ext;
	struct osc_extent *tmp;
	struct client_obd *cli = aa->aa_cli;
	ENTRY;

	rc = osc_brw_fini_request(req, rc);
	CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
	/* When server return -EINPROGRESS, client should always retry
	 * regardless of the number of times the bulk was resent already. */
	if (osc_recoverable_error(rc)) {
		if (req->rq_import_generation !=
		    req->rq_import->imp_generation) {
			/* The import was evicted and reconnected since this
			 * RPC was sent: do not resend across the eviction. */
			CDEBUG(D_HA, "%s: resend cross eviction for object: "
			       ""DOSTID", rc = %d.\n",
			       req->rq_import->imp_obd->obd_name,
			       POSTID(&aa->aa_oa->o_oi), rc);
		} else if (rc == -EINPROGRESS ||
		    client_should_resend(aa->aa_resends, aa->aa_cli)) {
			rc = osc_brw_redo_request(req, aa, rc);
		} else {
			CERROR("%s: too many resent retries for object: "
			       "%llu:%llu, rc = %d.\n",
			       req->rq_import->imp_obd->obd_name,
			       POSTID(&aa->aa_oa->o_oi), rc);
		}

		if (rc == 0)
			RETURN(0);
		else if (rc == -EAGAIN || rc == -EINPROGRESS)
			rc = -EIO;
	}

	if (rc == 0) {
		struct obdo *oa = aa->aa_oa;
		struct cl_attr *attr = &osc_env_info(env)->oti_attr;
		unsigned long valid = 0;
		struct cl_object *obj;
		struct osc_async_page *last;

		last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
		obj = osc2cl(last->oap_obj);

		/* Propagate the attributes returned by the OST into the
		 * cl_object, under the attribute lock. */
		cl_object_attr_lock(obj);
		if (oa->o_valid & OBD_MD_FLBLOCKS) {
			attr->cat_blocks = oa->o_blocks;
			valid |= CAT_BLOCKS;
		}
		if (oa->o_valid & OBD_MD_FLMTIME) {
			attr->cat_mtime = oa->o_mtime;
			valid |= CAT_MTIME;
		}
		if (oa->o_valid & OBD_MD_FLATIME) {
			attr->cat_atime = oa->o_atime;
			valid |= CAT_ATIME;
		}
		if (oa->o_valid & OBD_MD_FLCTIME) {
			attr->cat_ctime = oa->o_ctime;
			valid |= CAT_CTIME;
		}

		if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
			struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
			/* End offset of the last byte written by this RPC. */
			loff_t last_off = last->oap_count + last->oap_obj_off +
				last->oap_page_off;

			/* Change file size if this is an out of quota or
			 * direct IO write and it extends the file size */
			if (loi->loi_lvb.lvb_size < last_off) {
				attr->cat_size = last_off;
				valid |= CAT_SIZE;
			}
			/* Extend KMS if it's not a lockless write */
			if (loi->loi_kms < last_off &&
			    oap2osc_page(last)->ops_srvlock == 0) {
				attr->cat_kms = last_off;
				valid |= CAT_KMS;
			}
		}

		if (valid != 0)
			cl_object_attr_update(env, obj, attr, valid);
		cl_object_attr_unlock(obj);
	}
	OBDO_FREE(aa->aa_oa);

	/* Successfully written pages stay "unstable" until the server commits
	 * them; accounted here, released in brw_commit()/osc_dec_unstable_pages. */
	if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
		osc_inc_unstable_pages(req);

	list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
		list_del_init(&ext->oe_link);
		osc_extent_finish(env, ext, 1, rc);
	}
	LASSERT(list_empty(&aa->aa_exts));
	LASSERT(list_empty(&aa->aa_oaps));

	osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
	ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);

	spin_lock(&cli->cl_loi_list_lock);
	/* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
	 * is called so we know whether to go to sync BRWs or wait for more
	 * RPCs to complete */
	if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
		cli->cl_w_in_flight--;
	else
		cli->cl_r_in_flight--;
	osc_wake_cache_waiters(cli);
	spin_unlock(&cli->cl_loi_list_lock);

	osc_io_unplug(env, cli, NULL);
	RETURN(rc);
}
1746
1747 static void brw_commit(struct ptlrpc_request *req)
1748 {
1749         /* If osc_inc_unstable_pages (via osc_extent_finish) races with
1750          * this called via the rq_commit_cb, I need to ensure
1751          * osc_dec_unstable_pages is still called. Otherwise unstable
1752          * pages may be leaked. */
1753         spin_lock(&req->rq_lock);
1754         if (likely(req->rq_unstable)) {
1755                 req->rq_unstable = 0;
1756                 spin_unlock(&req->rq_lock);
1757
1758                 osc_dec_unstable_pages(req);
1759         } else {
1760                 req->rq_committed = 1;
1761                 spin_unlock(&req->rq_lock);
1762         }
1763 }
1764
/**
 * Build an RPC by the list of extent @ext_list. The caller must ensure
 * that the total pages in this list are NOT over max pages per RPC.
 * Extents in the list must be in OES_RPC state.
 *
 * On success the request is handed to ptlrpcd; completion then runs through
 * brw_commit() and brw_interpret().  On failure every extent in @ext_list is
 * finished with the error and all allocations made here are released.
 *
 * \param env       execution environment
 * \param cli       client OBD the RPC is built for
 * \param ext_list  extents (OES_RPC state) to pack into one BRW RPC
 * \param cmd       OBD_BRW_READ or OBD_BRW_WRITE
 *
 * \retval 0 on success, negative errno on failure.
 */
int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
		  struct list_head *ext_list, int cmd)
{
	struct ptlrpc_request		*req = NULL;
	struct osc_extent		*ext;
	struct brw_page			**pga = NULL;
	struct osc_brw_async_args	*aa = NULL;
	struct obdo			*oa = NULL;
	struct osc_async_page		*oap;
	struct osc_object		*obj = NULL;
	struct cl_req_attr		*crattr = NULL;
	loff_t				starting_offset = OBD_OBJECT_EOF;
	loff_t				ending_offset = 0;
	int				mpflag = 0;
	int				mem_tight = 0;
	int				page_count = 0;
	bool				soft_sync = false;
	bool				interrupted = false;
	int				i;
	int				grant = 0;
	int				rc;
	struct list_head		rpc_list = LIST_HEAD_INIT(rpc_list);
	struct ost_body			*body;
	ENTRY;
	LASSERT(!list_empty(ext_list));

	/* add pages into rpc_list to build BRW rpc */
	list_for_each_entry(ext, ext_list, oe_link) {
		LASSERT(ext->oe_state == OES_RPC);
		mem_tight |= ext->oe_memalloc;
		grant += ext->oe_grants;
		page_count += ext->oe_nr_pages;
		if (obj == NULL)
			obj = ext->oe_obj;
	}

	soft_sync = osc_over_unstable_soft_limit(cli);
	if (mem_tight)
		mpflag = cfs_memory_pressure_get_and_set();

	OBD_ALLOC(pga, sizeof(*pga) * page_count);
	if (pga == NULL)
		GOTO(out, rc = -ENOMEM);

	OBDO_ALLOC(oa);
	if (oa == NULL)
		GOTO(out, rc = -ENOMEM);

	/* Flatten every extent's pages into the pga array and the rpc_list,
	 * tracking the overall [starting_offset, ending_offset) range. */
	i = 0;
	list_for_each_entry(ext, ext_list, oe_link) {
		list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
			if (mem_tight)
				oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
			if (soft_sync)
				oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
			pga[i] = &oap->oap_brw_page;
			pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
			i++;

			list_add_tail(&oap->oap_rpc_item, &rpc_list);
			if (starting_offset == OBD_OBJECT_EOF ||
			    starting_offset > oap->oap_obj_off)
				starting_offset = oap->oap_obj_off;
			else
				LASSERT(oap->oap_page_off == 0);
			if (ending_offset < oap->oap_obj_off + oap->oap_count)
				ending_offset = oap->oap_obj_off +
						oap->oap_count;
			else
				LASSERT(oap->oap_page_off + oap->oap_count ==
					PAGE_CACHE_SIZE);
			if (oap->oap_interrupted)
				interrupted = true;
		}
	}

	/* first page in the list */
	oap = list_entry(rpc_list.next, typeof(*oap), oap_rpc_item);

	crattr = &osc_env_info(env)->oti_req_attr;
	memset(crattr, 0, sizeof(*crattr));
	crattr->cra_type = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
	crattr->cra_flags = ~0ULL;
	crattr->cra_page = oap2cl_page(oap);
	crattr->cra_oa = oa;
	cl_req_attr_set(env, osc2cl(obj), crattr);

	if (cmd == OBD_BRW_WRITE)
		oa->o_grant_used = grant;

	sort_brw_pages(pga, page_count);
	rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req, 0);
	if (rc != 0) {
		CERROR("prep_req failed: %d\n", rc);
		GOTO(out, rc);
	}

	req->rq_commit_cb = brw_commit;
	req->rq_interpret_reply = brw_interpret;
	req->rq_memalloc = mem_tight != 0;
	oap->oap_request = ptlrpc_request_addref(req);
	if (interrupted && !req->rq_intr)
		ptlrpc_mark_interrupted(req);

	/* Need to update the timestamps after the request is built in case
	 * we race with setattr (locally or in queue at OST).  If OST gets
	 * later setattr before earlier BRW (as determined by the request xid),
	 * the OST will not use BRW timestamps.  Sadly, there is no obvious
	 * way to do this in a single call.  bug 10150 */
	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
	crattr->cra_oa = &body->oa;
	crattr->cra_flags = OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME;
	cl_req_attr_set(env, osc2cl(obj), crattr);
	lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);

	/* The async args travel inside the request; brw_interpret() and
	 * osc_brw_redo_request() read them back out. */
	CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
	aa = ptlrpc_req_async_args(req);
	INIT_LIST_HEAD(&aa->aa_oaps);
	list_splice_init(&rpc_list, &aa->aa_oaps);
	INIT_LIST_HEAD(&aa->aa_exts);
	list_splice_init(ext_list, &aa->aa_exts);

	spin_lock(&cli->cl_loi_list_lock);
	starting_offset >>= PAGE_CACHE_SHIFT;
	if (cmd == OBD_BRW_READ) {
		cli->cl_r_in_flight++;
		lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
		lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
		lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
				      starting_offset + 1);
	} else {
		cli->cl_w_in_flight++;
		lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
		lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
		lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
				      starting_offset + 1);
	}
	spin_unlock(&cli->cl_loi_list_lock);

	DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %ur/%uw in flight",
		  page_count, aa, cli->cl_r_in_flight,
		  cli->cl_w_in_flight);
	OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_IO, cfs_fail_val);

	ptlrpcd_add_req(req);
	rc = 0;
	EXIT;

out:
	if (mem_tight != 0)
		cfs_memory_pressure_restore(mpflag);

	if (rc != 0) {
		LASSERT(req == NULL);

		if (oa)
			OBDO_FREE(oa);
		if (pga)
			OBD_FREE(pga, sizeof(*pga) * page_count);
		/* this should happen rarely and is pretty bad, it makes the
		 * pending list not follow the dirty order */
		while (!list_empty(ext_list)) {
			ext = list_entry(ext_list->next, struct osc_extent,
					 oe_link);
			list_del_init(&ext->oe_link);
			osc_extent_finish(env, ext, 0, rc);
		}
	}
	RETURN(rc);
}
1940
1941 static int osc_set_lock_data(struct ldlm_lock *lock, void *data)
1942 {
1943         int set = 0;
1944
1945         LASSERT(lock != NULL);
1946
1947         lock_res_and_lock(lock);
1948
1949         if (lock->l_ast_data == NULL)
1950                 lock->l_ast_data = data;
1951         if (lock->l_ast_data == data)
1952                 set = 1;
1953
1954         unlock_res_and_lock(lock);
1955
1956         return set;
1957 }
1958
1959 static int osc_enqueue_fini(struct ptlrpc_request *req,
1960                             osc_enqueue_upcall_f upcall, void *cookie,
1961                             struct lustre_handle *lockh, enum ldlm_mode mode,
1962                             __u64 *flags, int agl, int errcode)
1963 {
1964         bool intent = *flags & LDLM_FL_HAS_INTENT;
1965         int rc;
1966         ENTRY;
1967
1968         /* The request was created before ldlm_cli_enqueue call. */
1969         if (intent && errcode == ELDLM_LOCK_ABORTED) {
1970                 struct ldlm_reply *rep;
1971
1972                 rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1973                 LASSERT(rep != NULL);
1974
1975                 rep->lock_policy_res1 =
1976                         ptlrpc_status_ntoh(rep->lock_policy_res1);
1977                 if (rep->lock_policy_res1)
1978                         errcode = rep->lock_policy_res1;
1979                 if (!agl)
1980                         *flags |= LDLM_FL_LVB_READY;
1981         } else if (errcode == ELDLM_OK) {
1982                 *flags |= LDLM_FL_LVB_READY;
1983         }
1984
1985         /* Call the update callback. */
1986         rc = (*upcall)(cookie, lockh, errcode);
1987
1988         /* release the reference taken in ldlm_cli_enqueue() */
1989         if (errcode == ELDLM_LOCK_MATCHED)
1990                 errcode = ELDLM_OK;
1991         if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
1992                 ldlm_lock_decref(lockh, mode);
1993
1994         RETURN(rc);
1995 }
1996
/* Interpret callback for an asynchronous lock enqueue RPC, set up in
 * osc_enqueue_base().  Completes the LDLM half via ldlm_cli_enqueue_fini()
 * and then the OSC half via osc_enqueue_fini(), with careful reference
 * juggling so blocking ASTs cannot overtake the upcall. */
static int osc_enqueue_interpret(const struct lu_env *env,
				 struct ptlrpc_request *req,
				 struct osc_enqueue_args *aa, int rc)
{
	struct ldlm_lock *lock;
	struct lustre_handle *lockh = &aa->oa_lockh;
	enum ldlm_mode mode = aa->oa_mode;
	struct ost_lvb *lvb = aa->oa_lvb;
	__u32 lvb_len = sizeof(*lvb);
	__u64 flags = 0;

	ENTRY;

	/* ldlm_cli_enqueue is holding a reference on the lock, so it must
	 * be valid. */
	lock = ldlm_handle2lock(lockh);
	LASSERTF(lock != NULL,
		 "lockh %#llx, req %p, aa %p - client evicted?\n",
		 lockh->cookie, req, aa);

	/* Take an additional reference so that a blocking AST that
	 * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
	 * to arrive after an upcall has been executed by
	 * osc_enqueue_fini(). */
	ldlm_lock_addref(lockh, mode);

	/* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
	OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);

	/* Let CP AST to grant the lock first. */
	OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);

	if (aa->oa_agl) {
		/* AGL enqueues carry no flags/LVB (see osc_enqueue_base());
		 * give the fini paths a local flags word to write into. */
		LASSERT(aa->oa_lvb == NULL);
		LASSERT(aa->oa_flags == NULL);
		aa->oa_flags = &flags;
	}

	/* Complete obtaining the lock procedure. */
	rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
				   aa->oa_mode, aa->oa_flags, lvb, lvb_len,
				   lockh, rc);
	/* Complete osc stuff. */
	rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
			      aa->oa_flags, aa->oa_agl, rc);

	OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);

	/* Drop the extra reference taken above, and the one from
	 * ldlm_handle2lock(). */
	ldlm_lock_decref(lockh, mode);
	LDLM_LOCK_PUT(lock);
	RETURN(rc);
}
2049
/* Sentinel request-set pointer: callers of osc_enqueue_base() pass this to
 * mean "hand the request to the ptlrpcd daemon" instead of a real set. */
struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2051
/* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
 * other synchronous requests, however keeping some locks and trying to obtain
 * others may take a considerable amount of time in a case of ost failure; and
 * when other sync requests do not get released lock from a client, the client
 * is evicted from the cluster -- such scenarios make the life difficult, so
 * release locks just after they are obtained. */
int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
		     __u64 *flags, union ldlm_policy_data *policy,
		     struct ost_lvb *lvb, int kms_valid,
		     osc_enqueue_upcall_f upcall, void *cookie,
		     struct ldlm_enqueue_info *einfo,
		     struct ptlrpc_request_set *rqset, int async, int agl)
{
	struct obd_device *obd = exp->exp_obd;
	struct lustre_handle lockh = { 0 };
	struct ptlrpc_request *req = NULL;
	int intent = *flags & LDLM_FL_HAS_INTENT;
	__u64 match_flags = *flags;
	enum ldlm_mode mode;
	int rc;
	ENTRY;

	/* Filesystem lock extents are extended to page boundaries so that
	 * dealing with the page cache is a little smoother.  */
	policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
	policy->l_extent.end |= ~PAGE_MASK;

	/*
	 * kms is not valid when either object is completely fresh (so that no
	 * locks are cached), or object was evicted. In the latter case cached
	 * lock cannot be used, because it would prime inode state with
	 * potentially stale LVB.
	 */
	if (!kms_valid)
		goto no_match;

	/* Next, search for already existing extent locks that will cover us */
	/* If we're trying to read, we also search for an existing PW lock.  The
	 * VFS and page cache already protect us locally, so lots of readers/
	 * writers can share a single PW lock.
	 *
	 * There are problems with conversion deadlocks, so instead of
	 * converting a read lock to a write lock, we'll just enqueue a new
	 * one.
	 *
	 * At some point we should cancel the read lock instead of making them
	 * send us a blocking callback, but there are problems with canceling
	 * locks out from other users right now, too. */
	mode = einfo->ei_mode;
	if (einfo->ei_mode == LCK_PR)
		mode |= LCK_PW;
	if (agl == 0)
		match_flags |= LDLM_FL_LVB_READY;
	if (intent != 0)
		match_flags |= LDLM_FL_BLOCK_GRANTED;
	mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id,
			       einfo->ei_type, policy, mode, &lockh, 0);
	if (mode) {
		struct ldlm_lock *matched;

		if (*flags & LDLM_FL_TEST_LOCK)
			RETURN(ELDLM_OK);

		matched = ldlm_handle2lock(&lockh);
		if (agl) {
			/* AGL enqueues DLM locks speculatively. Therefore if
			 * a DLM lock already exists, it will just inform the
			 * caller to cancel the AGL process for this stripe. */
			ldlm_lock_decref(&lockh, mode);
			LDLM_LOCK_PUT(matched);
			RETURN(-ECANCELED);
		} else if (osc_set_lock_data(matched, einfo->ei_cbdata)) {
			*flags |= LDLM_FL_LVB_READY;

			/* We already have a lock, and it's referenced. */
			(*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);

			ldlm_lock_decref(&lockh, mode);
			LDLM_LOCK_PUT(matched);
			RETURN(ELDLM_OK);
		} else {
			/* Matched lock already carries somebody else's
			 * ast_data: drop it and enqueue a new one below. */
			ldlm_lock_decref(&lockh, mode);
			LDLM_LOCK_PUT(matched);
		}
	}

no_match:
	if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK))
		RETURN(-ENOLCK);

	if (intent) {
		req = ptlrpc_request_alloc(class_exp2cliimp(exp),
					   &RQF_LDLM_ENQUEUE_LVB);
		if (req == NULL)
			RETURN(-ENOMEM);

		rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
		if (rc) {
			ptlrpc_request_free(req);
			RETURN(rc);
		}

		req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
				     sizeof *lvb);
		ptlrpc_request_set_replen(req);
	}

	/* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
	*flags &= ~LDLM_FL_BLOCK_GRANTED;

	rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
			      sizeof(*lvb), LVB_T_OST, &lockh, async);
	if (async) {
		if (!rc) {
			struct osc_enqueue_args *aa;
			CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
			aa = ptlrpc_req_async_args(req);
			aa->oa_exp    = exp;
			aa->oa_mode   = einfo->ei_mode;
			aa->oa_type   = einfo->ei_type;
			lustre_handle_copy(&aa->oa_lockh, &lockh);
			aa->oa_upcall = upcall;
			aa->oa_cookie = cookie;
			aa->oa_agl    = !!agl;
			if (!agl) {
				aa->oa_flags  = flags;
				aa->oa_lvb    = lvb;
			} else {
				/* AGL is essentially to enqueue an DLM lock
				 * in advance, so we don't care about the
				 * result of AGL enqueue. */
				aa->oa_lvb    = NULL;
				aa->oa_flags  = NULL;
			}

			req->rq_interpret_reply =
				(ptlrpc_interpterer_t)osc_enqueue_interpret;
			if (rqset == PTLRPCD_SET)
				ptlrpcd_add_req(req);
			else
				ptlrpc_set_add_req(rqset, req);
		} else if (intent) {
			ptlrpc_req_finished(req);
		}
		RETURN(rc);
	}

	rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
			      flags, agl, rc);
	if (intent)
		ptlrpc_req_finished(req);

	RETURN(rc);
}
2207
/**
 * Look up an already-granted DLM lock on the client that covers the given
 * extent, without issuing an RPC.
 *
 * \param[in]     exp     export to the OST
 * \param[in]     res_id  resource (object) to match against
 * \param[in]     type    lock type (extent locks for OSC)
 * \param[in,out] policy  extent to cover; rounded out to page boundaries here
 * \param[in]     mode    minimum mode required (LCK_PR/LCK_PW)
 * \param[in]     flags   LDLM_FL_* match flags (e.g. LDLM_FL_TEST_LOCK)
 * \param[in]     data    osc_object to attach to the matched lock, may be NULL
 * \param[out]    lockh   handle of the matched lock
 * \param[in]     unref   also match locks that are being destroyed
 *
 * \retval 0 if no matching lock was found
 * \retval matched lock mode (reference held unless LDLM_FL_TEST_LOCK)
 */
int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
		   enum ldlm_type type, union ldlm_policy_data *policy,
		   enum ldlm_mode mode, __u64 *flags, void *data,
		   struct lustre_handle *lockh, int unref)
{
	struct obd_device *obd = exp->exp_obd;
	__u64 lflags = *flags;
	enum ldlm_mode rc;
	ENTRY;

	if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
		RETURN(-EIO);

	/* Filesystem lock extents are extended to page boundaries so that
	 * dealing with the page cache is a little smoother */
	policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
	policy->l_extent.end |= ~PAGE_MASK;

	/* Next, search for already existing extent locks that will cover us */
	/* If we're trying to read, we also search for an existing PW lock.  The
	 * VFS and page cache already protect us locally, so lots of readers/
	 * writers can share a single PW lock. */
	rc = mode;
	if (mode == LCK_PR)
		rc |= LCK_PW;
	/* ldlm_lock_match() returns the actual mode matched (or 0), so from
	 * here on 'rc' is the granted mode, not the requested mask. */
	rc = ldlm_lock_match(obd->obd_namespace, lflags,
			     res_id, type, policy, rc, lockh, unref);
	/* With LDLM_FL_TEST_LOCK no reference was taken, so there is nothing
	 * more to do regardless of the result. */
	if (rc == 0 || lflags & LDLM_FL_TEST_LOCK)
		RETURN(rc);

	if (data != NULL) {
		struct ldlm_lock *lock = ldlm_handle2lock(lockh);

		LASSERT(lock != NULL);
		if (!osc_set_lock_data(lock, data)) {
			/* Lock already belongs to another object; drop the
			 * reference taken by the match (note: 'rc' is the
			 * matched mode here) and report no match. */
			ldlm_lock_decref(lockh, rc);
			rc = 0;
		}
		LDLM_LOCK_PUT(lock);
	}
	RETURN(rc);
}
2250
/**
 * Completion callback for an asynchronous OST_STATFS RPC: copy the server's
 * reply into the caller's buffer and invoke the caller's up-call.
 *
 * \param[in] env  execution environment (unused here)
 * \param[in] req  the completed statfs request
 * \param[in] aa   async args carrying the obd_info (oi_osfs, oi_cb_up)
 * \param[in] rc   RPC result
 *
 * \retval result of the oi_cb_up() up-call (or 'rc' for -EBADR)
 */
static int osc_statfs_interpret(const struct lu_env *env,
				struct ptlrpc_request *req,
				struct osc_async_args *aa, int rc)
{
	struct obd_statfs *msfs;
	ENTRY;

	if (rc == -EBADR)
		/* The request has in fact never been sent
		 * due to issues at a higher level (LOV).
		 * Exit immediately since the caller is
		 * aware of the problem and takes care
		 * of the clean up */
		 RETURN(rc);

	/* For NODELAY statfs a temporarily unreachable OST is not an error;
	 * report success with whatever was cached by the caller. */
	if ((rc == -ENOTCONN || rc == -EAGAIN) &&
	    (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
		GOTO(out, rc = 0);

	if (rc != 0)
		GOTO(out, rc);

	msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
	if (msfs == NULL) {
		GOTO(out, rc = -EPROTO);
	}

	*aa->aa_oi->oi_osfs = *msfs;
out:
	/* The up-call must run on every path (except -EBADR above) so the
	 * upper layer's pending-statfs accounting stays balanced. */
	rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
	RETURN(rc);
}
2283
/**
 * Send an OST_STATFS request asynchronously via a request set; the reply is
 * handled by osc_statfs_interpret().
 *
 * \param[in] exp      export to the OST
 * \param[in] oinfo    carries the result buffer and completion up-call
 * \param[in] max_age  freshness hint; note it is not transmitted to the
 *                     server (see the comment below)
 * \param[in] rqset    request set the RPC is added to
 *
 * \retval zero on success, negative errno on allocation/pack failure
 */
static int osc_statfs_async(struct obd_export *exp,
			    struct obd_info *oinfo, __u64 max_age,
			    struct ptlrpc_request_set *rqset)
{
	struct obd_device     *obd = class_exp2obd(exp);
	struct ptlrpc_request *req;
	struct osc_async_args *aa;
	int                    rc;
	ENTRY;

	/* We could possibly pass max_age in the request (as an absolute
	 * timestamp or a "seconds.usec ago") so the target can avoid doing
	 * extra calls into the filesystem if that isn't necessary (e.g.
	 * during mount that would help a bit).  Having relative timestamps
	 * is not so great if request processing is slow, while absolute
	 * timestamps are not ideal because they need time synchronization. */
	req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
	if (req == NULL)
		RETURN(-ENOMEM);

	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
	if (rc) {
		ptlrpc_request_free(req);
		RETURN(rc);
	}
	ptlrpc_request_set_replen(req);
	req->rq_request_portal = OST_CREATE_PORTAL;
	ptlrpc_at_set_req_timeout(req);

	if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
		/* procfs requests must not wait for a stuck server: fail
		 * fast instead of resending, to avoid deadlock */
		req->rq_no_resend = 1;
		req->rq_no_delay = 1;
	}

	req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
	CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
	aa = ptlrpc_req_async_args(req);
	aa->aa_oi = oinfo;

	ptlrpc_set_add_req(rqset, req);
	RETURN(0);
}
2327
/**
 * Synchronous OST_STATFS: send the RPC and wait for the reply.
 *
 * \param[in]  env      execution environment (unused here)
 * \param[in]  exp      export to the OST
 * \param[out] osfs     buffer the server's statistics are copied into
 * \param[in]  max_age  freshness hint; not transmitted (see comment below)
 * \param[in]  flags    OBD_STATFS_* flags (NODELAY honored)
 *
 * \retval zero on success, negative errno on failure
 */
static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
		      struct obd_statfs *osfs, __u64 max_age, __u32 flags)
{
	struct obd_device     *obd = class_exp2obd(exp);
	struct obd_statfs     *msfs;
	struct ptlrpc_request *req;
	struct obd_import     *imp = NULL;
	int rc;
	ENTRY;

	/* Since the request might also come from lprocfs, we need to
	 * synchronize with client_disconnect_export(): take a private
	 * reference on the import under cl_sem so it cannot be freed
	 * underneath us (bug 15684). */
	down_read(&obd->u.cli.cl_sem);
	if (obd->u.cli.cl_import)
		imp = class_import_get(obd->u.cli.cl_import);
	up_read(&obd->u.cli.cl_sem);
	if (!imp)
		RETURN(-ENODEV);

	/* We could possibly pass max_age in the request (as an absolute
	 * timestamp or a "seconds.usec ago") so the target can avoid doing
	 * extra calls into the filesystem if that isn't necessary (e.g.
	 * during mount that would help a bit).  Having relative timestamps
	 * is not so great if request processing is slow, while absolute
	 * timestamps are not ideal because they need time synchronization. */
	req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);

	/* The request holds its own import reference from here on. */
	class_import_put(imp);

	if (req == NULL)
		RETURN(-ENOMEM);

	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
	if (rc) {
		ptlrpc_request_free(req);
		RETURN(rc);
	}
	ptlrpc_request_set_replen(req);
	req->rq_request_portal = OST_CREATE_PORTAL;
	ptlrpc_at_set_req_timeout(req);

	if (flags & OBD_STATFS_NODELAY) {
		/* procfs requests must not wait for a stuck server: fail
		 * fast instead of resending, to avoid deadlock */
		req->rq_no_resend = 1;
		req->rq_no_delay = 1;
	}

	rc = ptlrpc_queue_wait(req);
	if (rc)
		GOTO(out, rc);

	msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
	if (msfs == NULL) {
		GOTO(out, rc = -EPROTO);
	}

	*osfs = *msfs;

	EXIT;
 out:
	ptlrpc_req_finished(req);
	return rc;
}
2391
/**
 * Handle device ioctls for the OSC (recovery, activation, ping).
 *
 * \param[in] cmd   ioctl command
 * \param[in] exp   export the ioctl was issued on
 * \param[in] len   size of karg (unused here)
 * \param[in] karg  kernel-space ioctl data (struct obd_ioctl_data)
 * \param[in] uarg  original user-space pointer (unused here)
 *
 * \retval zero on success, negative errno on failure (-ENOTTY if unknown)
 */
static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
			 void *karg, void __user *uarg)
{
	struct obd_device *obd = exp->exp_obd;
	struct obd_ioctl_data *data = karg;
	int err = 0;
	ENTRY;

	/* Pin the module while a (possibly long-running) ioctl is in
	 * flight so it cannot be unloaded underneath us. */
	if (!try_module_get(THIS_MODULE)) {
		CERROR("%s: cannot get module '%s'\n", obd->obd_name,
		       module_name(THIS_MODULE));
		return -EINVAL;
	}
	switch (cmd) {
	case OBD_IOC_CLIENT_RECOVER:
		err = ptlrpc_recover_import(obd->u.cli.cl_import,
					    data->ioc_inlbuf1, 0);
		/* positive values are informational, not errors */
		if (err > 0)
			err = 0;
		GOTO(out, err);
	case IOC_OSC_SET_ACTIVE:
		err = ptlrpc_set_import_active(obd->u.cli.cl_import,
					       data->ioc_offset);
		GOTO(out, err);
	case OBD_IOC_PING_TARGET:
		err = ptlrpc_obd_ping(obd);
		GOTO(out, err);
	default:
		CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
		       cmd, current_comm());
		GOTO(out, err = -ENOTTY);
	}
out:
	/* balance the try_module_get() above on every path */
	module_put(THIS_MODULE);
	return err;
}
2428
/**
 * Handle obd_set_info_async() for the OSC.
 *
 * Some keys are consumed locally (checksum toggle, sptlrpc config, LRU
 * cache attach/shrink); everything else is forwarded to the OST as an
 * OST_SET_INFO RPC.  KEY_GRANT_SHRINK uses a dedicated request format and
 * is dispatched through ptlrpcd instead of the caller's set.
 *
 * \param[in] env     execution environment
 * \param[in] exp     export to the OST
 * \param[in] keylen  length of \a key
 * \param[in] key     KEY_* identifier
 * \param[in] vallen  length of \a val
 * \param[in] val     value buffer; interpretation depends on \a key
 * \param[in] set     request set for forwarded RPCs; may be NULL only
 *                    for KEY_GRANT_SHRINK
 *
 * \retval zero on success, negative errno on failure
 */
static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
			      u32 keylen, void *key,
			      u32 vallen, void *val,
			      struct ptlrpc_request_set *set)
{
	struct ptlrpc_request *req;
	struct obd_device     *obd = exp->exp_obd;
	struct obd_import     *imp = class_exp2cliimp(exp);
	char                  *tmp;
	int                    rc;
	ENTRY;

	OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);

	/* Enable/disable wire checksums for this client. */
	if (KEY_IS(KEY_CHECKSUM)) {
		if (vallen != sizeof(int))
			RETURN(-EINVAL);
		exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
		RETURN(0);
	}

	if (KEY_IS(KEY_SPTLRPC_CONF)) {
		sptlrpc_conf_client_adapt(obd);
		RETURN(0);
	}

	if (KEY_IS(KEY_FLUSH_CTX)) {
		sptlrpc_import_flush_my_ctx(imp);
		RETURN(0);
	}

	/* Attach this OSC to the client-wide page cache (LRU) shared with
	 * the llite layer; done exactly once at setup. */
	if (KEY_IS(KEY_CACHE_SET)) {
		struct client_obd *cli = &obd->u.cli;

		LASSERT(cli->cl_cache == NULL); /* only once */
		cli->cl_cache = (struct cl_client_cache *)val;
		cl_cache_incref(cli->cl_cache);
		cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;

		/* add this osc into entity list */
		LASSERT(list_empty(&cli->cl_lru_osc));
		spin_lock(&cli->cl_cache->ccc_lru_lock);
		list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
		spin_unlock(&cli->cl_cache->ccc_lru_lock);

		RETURN(0);
	}

	/* Shrink this OSC's share of the LRU; *val carries the remaining
	 * target in and the not-yet-freed remainder out. */
	if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
		struct client_obd *cli = &obd->u.cli;
		long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
		long target = *(long *)val;

		nr = osc_lru_shrink(env, cli, min(nr, target), true);
		*(long *)val -= nr;
		RETURN(0);
	}

	if (!set && !KEY_IS(KEY_GRANT_SHRINK))
		RETURN(-EINVAL);

	/* We pass all other commands directly to OST. Since nobody calls osc
	   methods directly and everybody is supposed to go through LOV, we
	   assume lov checked invalid values for us.
	   The only recognised values so far are evict_by_nid and mds_conn.
	   Even if something bad goes through, we'd get a -EINVAL from OST
	   anyway. */

	req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
						&RQF_OST_SET_GRANT_INFO :
						&RQF_OBD_SET_INFO);
	if (req == NULL)
		RETURN(-ENOMEM);

	/* Buffer sizes must be declared before packing the request. */
	req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
			     RCL_CLIENT, keylen);
	if (!KEY_IS(KEY_GRANT_SHRINK))
		req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
				     RCL_CLIENT, vallen);
	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
	if (rc) {
		ptlrpc_request_free(req);
		RETURN(rc);
	}

	tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
	memcpy(tmp, key, keylen);
	/* grant shrink carries an ost_body instead of a raw value buffer */
	tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
							&RMF_OST_BODY :
							&RMF_SETINFO_VAL);
	memcpy(tmp, val, vallen);

	if (KEY_IS(KEY_GRANT_SHRINK)) {
		struct osc_grant_args *aa;
		struct obdo *oa;

		CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
		aa = ptlrpc_req_async_args(req);
		/* keep a private copy of the obdo for the interpret
		 * callback; freed there */
		OBDO_ALLOC(oa);
		if (!oa) {
			ptlrpc_req_finished(req);
			RETURN(-ENOMEM);
		}
		*oa = ((struct ost_body *)val)->oa;
		aa->aa_oa = oa;
		req->rq_interpret_reply = osc_shrink_grant_interpret;
	}

	ptlrpc_request_set_replen(req);
	if (!KEY_IS(KEY_GRANT_SHRINK)) {
		LASSERT(set != NULL);
		ptlrpc_set_add_req(set, req);
		ptlrpc_check_set(NULL, set);
	} else {
		/* grant shrink is fire-and-forget through ptlrpcd */
		ptlrpcd_add_req(req);
	}

	RETURN(0);
}
2548
/**
 * Reconnect hook: recompute the grant this client asks the server to
 * honor, based on what is currently dirty/reserved locally.
 *
 * \param[in] env        execution environment (unused)
 * \param[in] exp        export being reconnected (unused)
 * \param[in] obd        client obd device
 * \param[in] cluuid     client UUID (unused)
 * \param[in] data       connect data; ocd_grant is filled in here
 * \param[in] localdata  unused
 *
 * \retval always zero
 */
static int osc_reconnect(const struct lu_env *env,
			 struct obd_export *exp, struct obd_device *obd,
			 struct obd_uuid *cluuid,
			 struct obd_connect_data *data,
			 void *localdata)
{
	struct client_obd *cli = &obd->u.cli;

	if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
		long lost_grant;
		long grant;

		/* grant counters are shared with the I/O path; sample them
		 * consistently under cl_loi_list_lock */
		spin_lock(&cli->cl_loi_list_lock);
		grant = cli->cl_avail_grant + cli->cl_reserved_grant;
		if (data->ocd_connect_flags & OBD_CONNECT_GRANT_PARAM)
			grant += cli->cl_dirty_grant;
		else
			grant += cli->cl_dirty_pages << PAGE_CACHE_SHIFT;
		/* never ask for zero grant; fall back to two full RPCs */
		data->ocd_grant = grant ? : 2 * cli_brw_size(obd);
		lost_grant = cli->cl_lost_grant;
		cli->cl_lost_grant = 0;
		spin_unlock(&cli->cl_loi_list_lock);

		CDEBUG(D_RPCTRACE, "ocd_connect_flags: %#llx ocd_version: %d"
		       " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
		       data->ocd_version, data->ocd_grant, lost_grant);
	}

	RETURN(0);
}
2579
/**
 * Disconnect this OSC from the OST and, once the import is gone, remove
 * it from the grant-shrink list (ordering rationale in the comment below).
 *
 * \param[in] exp  export being disconnected
 *
 * \retval result of client_disconnect_export()
 */
static int osc_disconnect(struct obd_export *exp)
{
	struct obd_device *obd = class_exp2obd(exp);
	int rc;

	rc = client_disconnect_export(exp);
	/**
	 * Initially we put del_shrink_grant before disconnect_export, but it
	 * causes the following problem if setup (connect) and cleanup
	 * (disconnect) are tangled together.
	 *      connect p1                     disconnect p2
	 *   ptlrpc_connect_import
	 *     ...............               class_manual_cleanup
	 *                                     osc_disconnect
	 *                                     del_shrink_grant
	 *   ptlrpc_connect_interrupt
	 *     init_grant_shrink
	 *   add this client to shrink list
	 *                                      cleanup_osc
	 * Bang! pinger trigger the shrink.
	 * So the osc should be disconnected from the shrink list, after we
	 * are sure the import has been destroyed. BUG18662
	 */
	if (obd->u.cli.cl_import == NULL)
		osc_del_shrink_grant(&obd->u.cli);
	return rc;
}
2607
/**
 * cfs_hash iterator callback run on every LDLM resource when the import
 * is invalidated: invalidate the osc_object behind the resource's granted
 * locks and re-arm the locks for a second cleanup pass.
 *
 * \param[in] hs     namespace resource hash
 * \param[in] bd     hash bucket descriptor (unused)
 * \param[in] hnode  hash node of the resource being visited
 * \param[in] arg    struct lu_env * passed through from the caller
 *
 * \retval always zero (continue iteration)
 */
static int osc_ldlm_resource_invalidate(struct cfs_hash *hs,
	struct cfs_hash_bd *bd, struct hlist_node *hnode, void *arg)
{
	struct lu_env *env = arg;
	struct ldlm_resource *res = cfs_hash_object(hs, hnode);
	struct ldlm_lock *lock;
	struct osc_object *osc = NULL;
	ENTRY;

	lock_res(res);
	list_for_each_entry(lock, &res->lr_granted, l_res_link) {
		/* all granted locks on one resource share the same object;
		 * grab it from the first lock that has one attached */
		if (lock->l_ast_data != NULL && osc == NULL) {
			osc = lock->l_ast_data;
			cl_object_get(osc2cl(osc));
		}

		/* clear LDLM_FL_CLEANED flag to make sure it will be canceled
		 * by the 2nd round of ldlm_namespace_clean() call in
		 * osc_import_event(). */
		ldlm_clear_cleaned(lock);
	}
	unlock_res(res);

	/* invalidate outside the resource lock */
	if (osc != NULL) {
		osc_object_invalidate(env, osc);
		cl_object_put(env, osc2cl(osc));
	}

	RETURN(0);
}
2638
/**
 * React to import state changes (disconnect, invalidate, activate, ...)
 * and forward notable ones to the observer (usually LOV).
 *
 * \param[in] obd    this client obd device
 * \param[in] imp    the import the event happened on
 * \param[in] event  which IMP_EVENT_* occurred
 *
 * \retval zero on success, negative errno on failure
 */
static int osc_import_event(struct obd_device *obd,
			    struct obd_import *imp,
			    enum obd_import_event event)
{
	struct client_obd *cli;
	int rc = 0;

	ENTRY;
	LASSERT(imp->imp_obd == obd);

	switch (event) {
	case IMP_EVENT_DISCON: {
		/* the server's grant is meaningless across a disconnect */
		cli = &obd->u.cli;
		spin_lock(&cli->cl_loi_list_lock);
		cli->cl_avail_grant = 0;
		cli->cl_lost_grant = 0;
		spin_unlock(&cli->cl_loi_list_lock);
		break;
	}
	case IMP_EVENT_INACTIVE: {
		rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
		break;
	}
	case IMP_EVENT_INVALIDATE: {
		struct ldlm_namespace *ns = obd->obd_namespace;
		struct lu_env         *env;
		__u16                  refcheck;

		/* first pass: drop everything droppable right away */
		ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);

		env = cl_env_get(&refcheck);
		if (!IS_ERR(env)) {
			osc_io_unplug(env, &obd->u.cli, NULL);

			/* invalidate objects and un-mark locks so the
			 * second cleanup pass below can cancel them */
			cfs_hash_for_each_nolock(ns->ns_rs_hash,
						 osc_ldlm_resource_invalidate,
						 env, 0);
			cl_env_put(env, &refcheck);

			ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
		} else
			rc = PTR_ERR(env);
		break;
	}
	case IMP_EVENT_ACTIVE: {
		rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
		break;
	}
	case IMP_EVENT_OCD: {
		struct obd_connect_data *ocd = &imp->imp_connect_data;

		if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
			osc_init_grant(&obd->u.cli, ocd);

		/* See bug 7198 */
		if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
			imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;

		rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
		break;
	}
	case IMP_EVENT_DEACTIVATE: {
		rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
		break;
	}
	case IMP_EVENT_ACTIVATE: {
		rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
		break;
	}
	default:
		CERROR("Unknown import event %d\n", event);
		LBUG();
	}
	RETURN(rc);
}
2714
2715 /**
2716  * Determine whether the lock can be canceled before replaying the lock
2717  * during recovery, see bug16774 for detailed information.
2718  *
2719  * \retval zero the lock can't be canceled
2720  * \retval other ok to cancel
2721  */
2722 static int osc_cancel_weight(struct ldlm_lock *lock)
2723 {
2724         /*
2725          * Cancel all unused and granted extent lock.
2726          */
2727         if (lock->l_resource->lr_type == LDLM_EXTENT &&
2728             lock->l_granted_mode == lock->l_req_mode &&
2729             osc_ldlm_weigh_ast(lock) == 0)
2730                 RETURN(1);
2731
2732         RETURN(0);
2733 }
2734
2735 static int brw_queue_work(const struct lu_env *env, void *data)
2736 {
2737         struct client_obd *cli = data;
2738
2739         CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
2740
2741         osc_io_unplug(env, cli, NULL);
2742         RETURN(0);
2743 }
2744
/**
 * Set up one OSC obd device: client import, ptlrpcd work items, quota,
 * procfs entries, request-pool growth and shrinker registration.
 *
 * \param[in] obd   the obd device being set up
 * \param[in] lcfg  setup configuration record
 *
 * \retval zero on success, negative errno on failure (partial setup is
 *         undone via the out_* labels)
 */
int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
{
	struct client_obd *cli = &obd->u.cli;
	struct obd_type   *type;
	void              *handler;
	int                rc;
	int                adding;
	int                added;
	int                req_count;
	ENTRY;

	rc = ptlrpcd_addref();
	if (rc)
		RETURN(rc);

	rc = client_obd_setup(obd, lcfg);
	if (rc)
		GOTO(out_ptlrpcd, rc);

	/* dedicated ptlrpcd work item for deferred writeback */
	handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
	if (IS_ERR(handler))
		GOTO(out_client_setup, rc = PTR_ERR(handler));
	cli->cl_writeback_work = handler;

	/* dedicated ptlrpcd work item for LRU shrinking */
	handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
	if (IS_ERR(handler))
		GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
	cli->cl_lru_work = handler;

	rc = osc_quota_setup(obd);
	if (rc)
		GOTO(out_ptlrpcd_work, rc);

	cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;

#ifdef CONFIG_PROC_FS
	obd->obd_vars = lprocfs_osc_obd_vars;
#endif
	/* If this is true then both client (osc) and server (osp) are on the
	 * same node. The osp layer if loaded first will register the osc proc
	 * directory. In that case this obd_device will be attached its proc
	 * tree to type->typ_procsym instead of obd->obd_type->typ_procroot. */
	type = class_search_type(LUSTRE_OSP_NAME);
	if (type && type->typ_procsym) {
		obd->obd_proc_entry = lprocfs_register(obd->obd_name,
						       type->typ_procsym,
						       obd->obd_vars, obd);
		if (IS_ERR(obd->obd_proc_entry)) {
			rc = PTR_ERR(obd->obd_proc_entry);
			CERROR("error %d setting up lprocfs for %s\n", rc,
			       obd->obd_name);
			obd->obd_proc_entry = NULL;
		}
	} else {
		rc = lprocfs_obd_setup(obd);
	}

	/* If the basic OSC proc tree construction succeeded then
	 * lets do the rest.  Note: procfs failure is deliberately
	 * non-fatal; setup continues and returns success below. */
	if (rc == 0) {
		lproc_osc_attach_seqstat(obd);
		sptlrpc_lprocfs_cliobd_attach(obd);
		ptlrpc_lprocfs_register_obd(obd);
	}

	/*
	 * We try to control the total number of requests with a upper limit
	 * osc_reqpool_maxreqcount. There might be some race which will cause
	 * over-limit allocation, but it is fine.
	 */
	req_count = atomic_read(&osc_pool_req_count);
	if (req_count < osc_reqpool_maxreqcount) {
		adding = cli->cl_max_rpcs_in_flight + 2;
		if (req_count + adding > osc_reqpool_maxreqcount)
			adding = osc_reqpool_maxreqcount - req_count;

		added = ptlrpc_add_rqs_to_pool(osc_rq_pool, adding);
		atomic_add(added, &osc_pool_req_count);
	}

	INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
	ns_register_cancel(obd->obd_namespace, osc_cancel_weight);

	/* make this device visible to the global cache shrinker */
	spin_lock(&osc_shrink_lock);
	list_add_tail(&cli->cl_shrink_list, &osc_shrink_list);
	spin_unlock(&osc_shrink_lock);

	RETURN(0);

out_ptlrpcd_work:
	if (cli->cl_writeback_work != NULL) {
		ptlrpcd_destroy_work(cli->cl_writeback_work);
		cli->cl_writeback_work = NULL;
	}
	if (cli->cl_lru_work != NULL) {
		ptlrpcd_destroy_work(cli->cl_lru_work);
		cli->cl_lru_work = NULL;
	}
out_client_setup:
	client_obd_cleanup(obd);
out_ptlrpcd:
	ptlrpcd_decref();
	RETURN(rc);
}
2849
/**
 * First stage of device teardown: stop the ptlrpcd work items and tear
 * down the import and procfs state before osc_cleanup() runs.
 *
 * \param[in] obd  the obd device being cleaned up
 *
 * \retval always zero
 */
static int osc_precleanup(struct obd_device *obd)
{
	struct client_obd *cli = &obd->u.cli;
	ENTRY;

	/* LU-464
	 * for echo client, export may be on zombie list, wait for
	 * zombie thread to cull it, because cli.cl_import will be
	 * cleared in client_disconnect_export():
	 *   class_export_destroy() -> obd_cleanup() ->
	 *   echo_device_free() -> echo_client_cleanup() ->
	 *   obd_disconnect() -> osc_disconnect() ->
	 *   client_disconnect_export()
	 */
	obd_zombie_barrier();
	if (cli->cl_writeback_work) {
		ptlrpcd_destroy_work(cli->cl_writeback_work);
		cli->cl_writeback_work = NULL;
	}

	if (cli->cl_lru_work) {
		ptlrpcd_destroy_work(cli->cl_lru_work);
		cli->cl_lru_work = NULL;
	}

	obd_cleanup_client_import(obd);
	ptlrpc_lprocfs_unregister_obd(obd);
	lprocfs_obd_cleanup(obd);
	RETURN(0);
}
2880
/**
 * Final stage of device teardown: detach from the shrinker list and the
 * shared client cache, release quota state and the client obd.
 *
 * \param[in] obd  the obd device being cleaned up
 *
 * \retval result of client_obd_cleanup()
 */
int osc_cleanup(struct obd_device *obd)
{
	struct client_obd *cli = &obd->u.cli;
	int rc;

	ENTRY;

	/* stop the global cache shrinker from seeing this device */
	spin_lock(&osc_shrink_lock);
	list_del(&cli->cl_shrink_list);
	spin_unlock(&osc_shrink_lock);

	/* lru cleanup */
	if (cli->cl_cache != NULL) {
		LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
		spin_lock(&cli->cl_cache->ccc_lru_lock);
		list_del_init(&cli->cl_lru_osc);
		spin_unlock(&cli->cl_cache->ccc_lru_lock);
		cli->cl_lru_left = NULL;
		cl_cache_decref(cli->cl_cache);
		cli->cl_cache = NULL;
	}

	/* free memory of osc quota cache */
	osc_quota_cleanup(obd);

	rc = client_obd_cleanup(obd);

	/* balances ptlrpcd_addref() in osc_setup() */
	ptlrpcd_decref();
	RETURN(rc);
}
2911
2912 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
2913 {
2914         int rc = class_process_proc_param(PARAM_OSC, obd->obd_vars, lcfg, obd);
2915         return rc > 0 ? 0: rc;
2916 }
2917
/* obd_ops entry point: thin wrapper so osc_process_config_base() can also
 * be reused by layers (e.g. osp) with a different signature. */
static int osc_process_config(struct obd_device *obd, size_t len, void *buf)
{
	return osc_process_config_base(obd, buf);
}
2922
/* Method table wiring the generic OBD framework to the OSC implementations;
 * the client_* entries are generic client-side helpers shared with other
 * client obd types. */
static struct obd_ops osc_obd_ops = {
	.o_owner                = THIS_MODULE,
	.o_setup                = osc_setup,
	.o_precleanup           = osc_precleanup,
	.o_cleanup              = osc_cleanup,
	.o_add_conn             = client_import_add_conn,
	.o_del_conn             = client_import_del_conn,
	.o_connect              = client_connect_import,
	.o_reconnect            = osc_reconnect,
	.o_disconnect           = osc_disconnect,
	.o_statfs               = osc_statfs,
	.o_statfs_async         = osc_statfs_async,
	.o_create               = osc_create,
	.o_destroy              = osc_destroy,
	.o_getattr              = osc_getattr,
	.o_setattr              = osc_setattr,
	.o_iocontrol            = osc_iocontrol,
	.o_set_info_async       = osc_set_info_async,
	.o_import_event         = osc_import_event,
	.o_process_config       = osc_process_config,
	.o_quotactl             = osc_quotactl,
};
2945
/* Kernel memory shrinker registered in osc_init(). */
static struct shrinker *osc_cache_shrinker;
/* All OSC devices visible to the shrinker; protected by osc_shrink_lock. */
struct list_head osc_shrink_list = LIST_HEAD_INIT(osc_shrink_list);
DEFINE_SPINLOCK(osc_shrink_lock);
2949
#ifndef HAVE_SHRINKER_COUNT
/* Compatibility wrapper for kernels whose shrinker API uses one combined
 * callback instead of separate count/scan operations: do the scan, then
 * report how many cached objects remain. */
static int osc_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask))
{
	struct shrink_control scv = {
		.nr_to_scan = shrink_param(sc, nr_to_scan),
		.gfp_mask   = shrink_param(sc, gfp_mask)
	};
#if !defined(HAVE_SHRINKER_WANT_SHRINK_PTR) && !defined(HAVE_SHRINK_CONTROL)
	/* very old API passes no shrinker pointer at all */
	struct shrinker *shrinker = NULL;
#endif

	(void)osc_cache_shrink_scan(shrinker, &scv);

	return osc_cache_shrink_count(shrinker, &scv);
}
#endif
2966
2967 static int __init osc_init(void)
2968 {
2969         bool enable_proc = true;
2970         struct obd_type *type;
2971         unsigned int reqpool_size;
2972         unsigned int reqsize;
2973         int rc;
2974         DEF_SHRINKER_VAR(osc_shvar, osc_cache_shrink,
2975                          osc_cache_shrink_count, osc_cache_shrink_scan);
2976         ENTRY;
2977
2978         /* print an address of _any_ initialized kernel symbol from this
2979          * module, to allow debugging with gdb that doesn't support data
2980          * symbols from modules.*/
2981         CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
2982
2983         rc = lu_kmem_init(osc_caches);
2984         if (rc)
2985                 RETURN(rc);
2986
2987         type = class_search_type(LUSTRE_OSP_NAME);
2988         if (type != NULL && type->typ_procsym != NULL)
2989                 enable_proc = false;
2990
2991         rc = class_register_type(&osc_obd_ops, NULL, enable_proc, NULL,
2992                                  LUSTRE_OSC_NAME, &osc_device_type);
2993         if (rc)
2994                 GOTO(out_kmem, rc);
2995
2996         osc_cache_shrinker = set_shrinker(DEFAULT_SEEKS, &osc_shvar);
2997
2998         /* This is obviously too much memory, only prevent overflow here */
2999         if (osc_reqpool_mem_max >= 1 << 12 || osc_reqpool_mem_max == 0)
3000                 GOTO(out_type, rc = -EINVAL);
3001
3002         reqpool_size = osc_reqpool_mem_max << 20;
3003
3004         reqsize = 1;
3005         while (reqsize < OST_IO_MAXREQSIZE)
3006                 reqsize = reqsize << 1;
3007
3008         /*
3009          * We don't enlarge the request count in OSC pool according to
3010          * cl_max_rpcs_in_flight. The allocation from the pool will only be
3011          * tried after normal allocation failed. So a small OSC pool won't
3012          * cause much performance degression in most of cases.
3013          */
3014         osc_reqpool_maxreqcount = reqpool_size / reqsize;
3015
3016         atomic_set(&osc_pool_req_count, 0);
3017         osc_rq_pool = ptlrpc_init_rq_pool(0, OST_IO_MAXREQSIZE,
3018                                           ptlrpc_add_rqs_to_pool);
3019
3020         if (osc_rq_pool != NULL)
3021                 GOTO(out, rc);
3022         rc = -ENOMEM;
3023 out_type:
3024         class_unregister_type(LUSTRE_OSC_NAME);
3025 out_kmem:
3026         lu_kmem_fini(osc_caches);
3027 out:
3028         RETURN(rc);
3029 }
3030
/* Module unload: undo osc_init() — unregister the shrinker and obd type,
 * then release the kmem caches and the request pool. */
static void __exit osc_exit(void)
{
	remove_shrinker(osc_cache_shrinker);
	class_unregister_type(LUSTRE_OSC_NAME);
	lu_kmem_fini(osc_caches);
	ptlrpc_free_rq_pool(osc_rq_pool);
}
3038
/* Standard kernel module metadata and entry points. */
MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
MODULE_VERSION(LUSTRE_VERSION_STRING);
MODULE_LICENSE("GPL");

module_init(osc_init);
module_exit(osc_exit);