/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.gnu.org/licenses/gpl-2.0.html
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, 2015, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 */

#define DEBUG_SUBSYSTEM S_OSC

#include <libcfs/libcfs.h>

#include <lustre/lustre_user.h>

#include <lprocfs_status.h>
#include <lustre_debug.h>
#include <lustre_dlm.h>
#include <lustre_fid.h>
#include <lustre_ha.h>
#include <lustre_ioctl.h>
#include <lustre_net.h>
#include <lustre_obdo.h>
#include <lustre_param.h>
#include <obd.h>
#include <obd_cksum.h>
#include <obd_class.h>

#include "osc_cl_internal.h"
#include "osc_internal.h"

atomic_t osc_pool_req_count;
unsigned int osc_reqpool_maxreqcount;
struct ptlrpc_request_pool *osc_rq_pool;

/* max memory used for request pool, unit is MB */
static unsigned int osc_reqpool_mem_max = 5;
module_param(osc_reqpool_mem_max, uint, 0444);

struct osc_brw_async_args {
        struct obdo              *aa_oa;
        int                       aa_requested_nob;
        int                       aa_nio_count;
        u32                       aa_page_count;
        int                       aa_resends;
        struct brw_page         **aa_ppga;
        struct client_obd        *aa_cli;
        struct list_head          aa_oaps;
        struct list_head          aa_exts;
};

#define osc_grant_args osc_brw_async_args

struct osc_setattr_args {
        struct obdo             *sa_oa;
        obd_enqueue_update_f     sa_upcall;
        void                    *sa_cookie;
};

struct osc_fsync_args {
        struct osc_object       *fa_obj;
        struct obdo             *fa_oa;
        obd_enqueue_update_f    fa_upcall;
        void                    *fa_cookie;
};

struct osc_ladvise_args {
        struct obdo             *la_oa;
        obd_enqueue_update_f     la_upcall;
        void                    *la_cookie;
};

struct osc_enqueue_args {
        struct obd_export       *oa_exp;
        enum ldlm_type          oa_type;
        enum ldlm_mode          oa_mode;
        __u64                   *oa_flags;
        osc_enqueue_upcall_f    oa_upcall;
        void                    *oa_cookie;
        struct ost_lvb          *oa_lvb;
        struct lustre_handle    oa_lockh;
        unsigned int            oa_agl:1;
};

static void osc_release_ppga(struct brw_page **ppga, size_t count);
static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
                         void *data, int rc);

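/*
 * Pack @oa into the request's OST body.  lustre_set_wire_obdo() converts
 * the obdo to its wire format according to the import's connect data, so
 * only fields the peer understands go on the wire.
 */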
void osc_pack_req_body(struct ptlrpc_request *req, struct obdo *oa)
{
        struct ost_body *body;

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
}

static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        int                      rc;

        ENTRY;
        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;

        EXIT;
out:
        ptlrpc_req_finished(req);

        return rc;
}

static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        int                      rc;

        ENTRY;
        LASSERT(oa->o_valid & OBD_MD_FLGROUP);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        EXIT;
out:
        ptlrpc_req_finished(req);

        RETURN(rc);
}

static int osc_setattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_setattr_args *sa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
                             &body->oa);
out:
        rc = sa->sa_upcall(sa->sa_cookie, rc);
        RETURN(rc);
}

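/*
 * Send an OST_SETATTR request.  If @rqset is NULL the request is handed
 * straight to ptlrpcd and no reply processing is done; otherwise
 * osc_setattr_interpret() will invoke @upcall with @cookie once the reply
 * (or an error) arrives.
 */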
int osc_setattr_async(struct obd_export *exp, struct obdo *oa,
                      obd_enqueue_update_f upcall, void *cookie,
                      struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct osc_setattr_args *sa;
        int                      rc;

        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        /* do mds to ost setattr asynchronously */
        if (!rqset) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req);
        } else {
                req->rq_interpret_reply =
                        (ptlrpc_interpterer_t)osc_setattr_interpret;

                CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
                sa = ptlrpc_req_async_args(req);
                sa->sa_oa = oa;
                sa->sa_upcall = upcall;
                sa->sa_cookie = cookie;

                if (rqset == PTLRPCD_SET)
                        ptlrpcd_add_req(req);
                else
                        ptlrpc_set_add_req(rqset, req);
        }

        RETURN(0);
}

static int osc_ladvise_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 void *arg, int rc)
{
        struct osc_ladvise_args *la = arg;
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        *la->la_oa = body->oa;
out:
        rc = la->la_upcall(la->la_cookie, rc);
        RETURN(rc);
}

/**
 * If rqset is NULL, do not wait for the response; upcall and cookie may
 * also be NULL in that case.
 */
int osc_ladvise_base(struct obd_export *exp, struct obdo *oa,
                     struct ladvise_hdr *ladvise_hdr,
                     obd_enqueue_update_f upcall, void *cookie,
                     struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        struct osc_ladvise_args *la;
        int                      rc;
        struct lu_ladvise       *req_ladvise;
        struct lu_ladvise       *ladvise = ladvise_hdr->lah_advise;
        int                      num_advise = ladvise_hdr->lah_count;
        struct ladvise_hdr      *req_ladvise_hdr;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_LADVISE);
        if (req == NULL)
                RETURN(-ENOMEM);

        req_capsule_set_size(&req->rq_pill, &RMF_OST_LADVISE, RCL_CLIENT,
                             num_advise * sizeof(*ladvise));
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_LADVISE);
        if (rc != 0) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL;
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
                             oa);

        req_ladvise_hdr = req_capsule_client_get(&req->rq_pill,
                                                 &RMF_OST_LADVISE_HDR);
        memcpy(req_ladvise_hdr, ladvise_hdr, sizeof(*ladvise_hdr));

        req_ladvise = req_capsule_client_get(&req->rq_pill, &RMF_OST_LADVISE);
        memcpy(req_ladvise, ladvise, sizeof(*ladvise) * num_advise);
        ptlrpc_request_set_replen(req);

        if (rqset == NULL) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req);
                RETURN(0);
        }

        req->rq_interpret_reply = osc_ladvise_interpret;
        CLASSERT(sizeof(*la) <= sizeof(req->rq_async_args));
        la = ptlrpc_req_async_args(req);
        la->la_oa = oa;
        la->la_upcall = upcall;
        la->la_cookie = cookie;

        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}

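/*
 * Synchronous object creation.  The fid_seq_is_echo() LASSERT below
 * restricts this path to echo-sequence objects, i.e. it appears to be
 * used only by the echo client.
 */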
static int osc_create(const struct lu_env *env, struct obd_export *exp,
                      struct obdo *oa)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        LASSERT(oa != NULL);
        LASSERT(oa->o_valid & OBD_MD_FLGROUP);
        LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
        if (req == NULL)
                GOTO(out, rc = -ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
        if (rc) {
                ptlrpc_request_free(req);
                GOTO(out, rc);
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out_req, rc = -EPROTO);

        CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;

        CDEBUG(D_HA, "transno: %lld\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        RETURN(rc);
}

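/*
 * Issue an OST_PUNCH request for the object described by @oa.  Completion
 * is always asynchronous: osc_setattr_interpret() invokes @upcall with
 * @cookie once the reply arrives.
 */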
int osc_punch_base(struct obd_export *exp, struct obdo *oa,
                   obd_enqueue_update_f upcall, void *cookie,
                   struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct osc_setattr_args *sa;
        struct ost_body         *body;
        int                      rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
        CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
        sa = ptlrpc_req_async_args(req);
        sa->sa_oa = oa;
        sa->sa_upcall = upcall;
        sa->sa_cookie = cookie;
        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}

static int osc_sync_interpret(const struct lu_env *env,
                              struct ptlrpc_request *req,
                              void *arg, int rc)
{
        struct osc_fsync_args   *fa = arg;
        struct ost_body         *body;
        struct cl_attr          *attr = &osc_env_info(env)->oti_attr;
        unsigned long           valid = 0;
        struct cl_object        *obj;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL) {
                CERROR("can't unpack ost_body\n");
                GOTO(out, rc = -EPROTO);
        }

        *fa->fa_oa = body->oa;
        obj = osc2cl(fa->fa_obj);

        /* Update osc object's blocks attribute */
        cl_object_attr_lock(obj);
        if (body->oa.o_valid & OBD_MD_FLBLOCKS) {
                attr->cat_blocks = body->oa.o_blocks;
                valid |= CAT_BLOCKS;
        }

        if (valid != 0)
                cl_object_attr_update(env, obj, attr, valid);
        cl_object_attr_unlock(obj);

out:
        rc = fa->fa_upcall(fa->fa_cookie, rc);
        RETURN(rc);
}

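/*
 * Flush the given object out to stable storage on the OST.  As noted in
 * the body below, the size and blocks fields of @oa are overloaded with
 * the start/end of the range to sync; the reply updates the osc object's
 * blocks attribute in osc_sync_interpret().
 */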
int osc_sync_base(struct osc_object *obj, struct obdo *oa,
                  obd_enqueue_update_f upcall, void *cookie,
                  struct ptlrpc_request_set *rqset)
{
        struct obd_export     *exp = osc_export(obj);
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct osc_fsync_args *fa;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        /* overload the size and blocks fields in the oa with start/end */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = osc_sync_interpret;

        CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
        fa = ptlrpc_req_async_args(req);
        fa->fa_obj = obj;
        fa->fa_oa = oa;
        fa->fa_upcall = upcall;
        fa->fa_cookie = cookie;

        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}

/* Find and cancel locally all locks matching @mode in the resource derived
 * from @oa's object id. Found locks are added to the @cancels list. Returns
 * the number of locks added to @cancels. */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
                                   struct list_head *cancels,
                                   enum ldlm_mode mode, __u64 lock_flags)
{
        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        struct ldlm_res_id res_id;
        struct ldlm_resource *res;
        int count;
        ENTRY;

        /* Return, i.e. cancel nothing, only if ELC is supported (flag in
         * export) but disabled through procfs (flag in NS).
         *
         * This is distinct from the case where ELC is not supported at all:
         * there we still want to cancel locks in advance, just locally,
         * without sending any RPC. */
        if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
                RETURN(0);

        ostid_build_res_name(&oa->o_oi, &res_id);
        res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
        if (IS_ERR(res))
                RETURN(0);

        LDLM_RESOURCE_ADDREF(res);
        count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
                                           lock_flags, 0, NULL);
        LDLM_RESOURCE_DELREF(res);
        ldlm_resource_putref(res);
        RETURN(count);
}

static int osc_destroy_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req, void *data,
                                 int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;

        atomic_dec(&cli->cl_destroy_in_flight);
        wake_up(&cli->cl_destroy_waitq);
        return 0;
}

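/*
 * Optimistically reserve a destroy RPC slot: increment first, and back
 * off if the limit was exceeded.  The wake_up() on the decrement path
 * covers the window where another destroy completed between the two
 * atomic operations, so no wakeup is lost.
 */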
static int osc_can_send_destroy(struct client_obd *cli)
{
        if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
            cli->cl_max_rpcs_in_flight) {
                /* The destroy request can be sent */
                return 1;
        }
        if (atomic_dec_return(&cli->cl_destroy_in_flight) <
            cli->cl_max_rpcs_in_flight) {
                /*
                 * The counter has been modified between the two atomic
                 * operations.
                 */
                wake_up(&cli->cl_destroy_waitq);
        }
        return 0;
}

static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct client_obd     *cli = &exp->exp_obd->u.cli;
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct list_head       cancels = LIST_HEAD_INIT(cancels);
        int rc, count;
        ENTRY;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
        if (req == NULL) {
                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                RETURN(-ENOMEM);
        }

        rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
                               0, &cancels, count);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = osc_destroy_interpret;
        if (!osc_can_send_destroy(cli)) {
                struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);

                /*
                 * Wait until the number of on-going destroy RPCs drops
                 * below cl_max_rpcs_in_flight.
                 */
                l_wait_event_exclusive(cli->cl_destroy_waitq,
                                       osc_can_send_destroy(cli), &lwi);
        }

        /* Do not wait for response */
        ptlrpcd_add_req(req);
        RETURN(0);
}

static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;

        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        spin_lock(&cli->cl_loi_list_lock);
        if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data, GRANT_PARAM))
                oa->o_dirty = cli->cl_dirty_grant;
        else
                oa->o_dirty = cli->cl_dirty_pages << PAGE_SHIFT;
        if (unlikely(cli->cl_dirty_pages - cli->cl_dirty_transit >
                     cli->cl_dirty_max_pages)) {
                CERROR("dirty %lu - %lu > dirty_max %lu\n",
                       cli->cl_dirty_pages, cli->cl_dirty_transit,
                       cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else if (unlikely(atomic_long_read(&obd_dirty_pages) -
                            atomic_long_read(&obd_dirty_transit_pages) >
                            (long)(obd_max_dirty_pages + 1))) {
                /* The atomic_read() and atomic_inc() calls are not covered
                 * by a lock, so they may race and trip this CERROR() unless
                 * we add in a small fudge factor (+1). */
                CERROR("%s: dirty %ld - %ld > system dirty_max %ld\n",
                       cli_name(cli), atomic_long_read(&obd_dirty_pages),
                       atomic_long_read(&obd_dirty_transit_pages),
                       obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
                            0x7fffffff)) {
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty_pages, cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else {
                unsigned long nrpages;

                nrpages = cli->cl_max_pages_per_rpc;
                nrpages *= cli->cl_max_rpcs_in_flight + 1;
                nrpages = max(nrpages, cli->cl_dirty_max_pages);
                oa->o_undirty = nrpages << PAGE_SHIFT;
                if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data,
                                 GRANT_PARAM)) {
                        int nrextents;

                        /* take extent tax into account when asking for more
                         * grant space */
                        nrextents = (nrpages + cli->cl_max_extent_pages - 1) /
                                     cli->cl_max_extent_pages;
                        oa->o_undirty += nrextents * cli->cl_grant_extent_tax;
                }
        }
        oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
        oa->o_dropped = cli->cl_lost_grant;
        cli->cl_lost_grant = 0;
        spin_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "dirty: %llu undirty: %u dropped %u grant: %llu\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
}

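/*
 * Worked example for the o_undirty computation in osc_announce_cached()
 * (illustrative values only): with cl_max_pages_per_rpc = 256,
 * cl_max_rpcs_in_flight = 8 and 4 KiB pages,
 *   nrpages = 256 * (8 + 1) = 2304, i.e. o_undirty = 9 MiB
 * (unless cl_dirty_max_pages is larger).  With GRANT_PARAM and, say,
 * cl_max_extent_pages = 512, the extent tax is added for
 *   (2304 + 511) / 512 = 5 extents.
 */
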
void osc_update_next_shrink(struct client_obd *cli)
{
        cli->cl_next_shrink_grant =
                cfs_time_shift(cli->cl_grant_shrink_interval);
        CDEBUG(D_CACHE, "next time %ld to shrink grant\n",
               cli->cl_next_shrink_grant);
}

static void __osc_update_grant(struct client_obd *cli, u64 grant)
{
        spin_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant += grant;
        spin_unlock(&cli->cl_loi_list_lock);
}

static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
{
        if (body->oa.o_valid & OBD_MD_FLGRANT) {
                CDEBUG(D_CACHE, "got %llu extra grant\n", body->oa.o_grant);
                __osc_update_grant(cli, body->oa.o_grant);
        }
}

static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
                              u32 keylen, void *key,
                              u32 vallen, void *val,
                              struct ptlrpc_request_set *set);

static int osc_shrink_grant_interpret(const struct lu_env *env,
                                      struct ptlrpc_request *req,
                                      void *aa, int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
        struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
        struct ost_body *body;

        if (rc != 0) {
                __osc_update_grant(cli, oa->o_grant);
                GOTO(out, rc);
        }

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        osc_update_grant(cli, body);
out:
        OBDO_FREE(oa);
        return rc;
}

static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
{
        spin_lock(&cli->cl_loi_list_lock);
        oa->o_grant = cli->cl_avail_grant / 4;
        cli->cl_avail_grant -= oa->o_grant;
        spin_unlock(&cli->cl_loi_list_lock);
        if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
                oa->o_valid |= OBD_MD_FLFLAGS;
                oa->o_flags = 0;
        }
        oa->o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);
}

/* Shrink the current grant, either from some large amount to enough for a
 * full set of in-flight RPCs, or if we have already shrunk to that limit
 * then to enough for a single RPC.  This avoids keeping more grant than
 * needed, and avoids shrinking the grant piecemeal. */
static int osc_shrink_grant(struct client_obd *cli)
{
        __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
                             (cli->cl_max_pages_per_rpc << PAGE_SHIFT);

        spin_lock(&cli->cl_loi_list_lock);
        if (cli->cl_avail_grant <= target_bytes)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
        spin_unlock(&cli->cl_loi_list_lock);

        return osc_shrink_grant_to_target(cli, target_bytes);
}

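/*
 * Illustrative numbers for osc_shrink_grant() (not from a real system):
 * with cl_max_pages_per_rpc = 256 and cl_max_rpcs_in_flight = 8 on 4 KiB
 * pages, the first-stage target is (8 + 1) * 1 MiB = 9 MiB; once
 * cl_avail_grant is already at or below that, the target drops to a
 * single RPC's worth, 1 MiB.
 */
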
int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
{
        int                     rc = 0;
        struct ost_body        *body;
        ENTRY;

        spin_lock(&cli->cl_loi_list_lock);
        /* Don't shrink if we are already above or below the desired limit.
         * We don't want to shrink below a single RPC, as that will negatively
         * impact block allocation and long-term performance. */
        if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_SHIFT)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;

        if (target_bytes >= cli->cl_avail_grant) {
                spin_unlock(&cli->cl_loi_list_lock);
                RETURN(0);
        }
        spin_unlock(&cli->cl_loi_list_lock);

        OBD_ALLOC_PTR(body);
        if (!body)
                RETURN(-ENOMEM);

        osc_announce_cached(cli, &body->oa, 0);

        spin_lock(&cli->cl_loi_list_lock);
        body->oa.o_grant = cli->cl_avail_grant - target_bytes;
        cli->cl_avail_grant = target_bytes;
        spin_unlock(&cli->cl_loi_list_lock);
        if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
                body->oa.o_valid |= OBD_MD_FLFLAGS;
                body->oa.o_flags = 0;
        }
        body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);

        rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
                                sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
                                sizeof(*body), body, NULL);
        if (rc != 0)
                __osc_update_grant(cli, body->oa.o_grant);
        OBD_FREE_PTR(body);
        RETURN(rc);
}

static int osc_should_shrink_grant(struct client_obd *client)
{
        cfs_time_t time = cfs_time_current();
        cfs_time_t next_shrink = client->cl_next_shrink_grant;

        if ((client->cl_import->imp_connect_data.ocd_connect_flags &
             OBD_CONNECT_GRANT_SHRINK) == 0)
                return 0;

        if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
                /* Get the current RPC size directly, instead of going via:
                 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
                 * Keep comment here so that it can be found by searching. */
                int brw_size = client->cl_max_pages_per_rpc << PAGE_SHIFT;

                if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
                    client->cl_avail_grant > brw_size)
                        return 1;
                else
                        osc_update_next_shrink(client);
        }
        return 0;
}

static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
{
        struct client_obd *client;

        list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
                if (osc_should_shrink_grant(client))
                        osc_shrink_grant(client);
        }
        return 0;
}

static int osc_add_shrink_grant(struct client_obd *client)
{
        int rc;

        rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
                                       TIMEOUT_GRANT,
                                       osc_grant_shrink_grant_cb, NULL,
                                       &client->cl_grant_shrink_list);
        if (rc) {
                CERROR("add grant client %s error %d\n", cli_name(client), rc);
                return rc;
        }
        CDEBUG(D_CACHE, "add grant client %s\n", cli_name(client));
        osc_update_next_shrink(client);
        return 0;
}

static int osc_del_shrink_grant(struct client_obd *client)
{
        return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
                                         TIMEOUT_GRANT);
}

static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        /*
         * ocd_grant is the total grant amount we expect to hold: if we have
         * been evicted, it's the new avail_grant amount, and cl_dirty_pages
         * will drop to 0 as in-flight RPCs fail out; otherwise, it's
         * avail_grant + dirty.
         *
         * The race is tolerable here: if we're evicted, but imp_state has
         * already left EVICTED state, then cl_dirty_pages must be 0 already.
         */
        spin_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant = ocd->ocd_grant;
        if (cli->cl_import->imp_state != LUSTRE_IMP_EVICTED) {
                cli->cl_avail_grant -= cli->cl_reserved_grant;
                if (OCD_HAS_FLAG(ocd, GRANT_PARAM))
                        cli->cl_avail_grant -= cli->cl_dirty_grant;
                else
                        cli->cl_avail_grant -=
                                        cli->cl_dirty_pages << PAGE_SHIFT;
        }

        if (cli->cl_avail_grant < 0) {
                CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
                      cli_name(cli), cli->cl_avail_grant,
                      ocd->ocd_grant, cli->cl_dirty_pages << PAGE_SHIFT);
                /* workaround for servers which do not have the patch from
                 * LU-2679 */
                cli->cl_avail_grant = ocd->ocd_grant;
        }

        if (OCD_HAS_FLAG(ocd, GRANT_PARAM)) {
                u64 size;
                int chunk_mask;

                /* overhead for each extent insertion */
                cli->cl_grant_extent_tax = ocd->ocd_grant_tax_kb << 10;
                /* determine the appropriate chunk size used by osc_extent. */
                cli->cl_chunkbits = max_t(int, PAGE_SHIFT,
                                          ocd->ocd_grant_blkbits);
                /* max_pages_per_rpc must be chunk aligned */
                chunk_mask = ~((1 << (cli->cl_chunkbits - PAGE_SHIFT)) - 1);
                cli->cl_max_pages_per_rpc = (cli->cl_max_pages_per_rpc +
                                             ~chunk_mask) & chunk_mask;
                /* determine maximum extent size, in #pages */
                size = (u64)ocd->ocd_grant_max_blks << ocd->ocd_grant_blkbits;
                cli->cl_max_extent_pages = size >> PAGE_SHIFT;
                if (cli->cl_max_extent_pages == 0)
                        cli->cl_max_extent_pages = 1;
        } else {
                cli->cl_grant_extent_tax = 0;
                cli->cl_chunkbits = PAGE_SHIFT;
                cli->cl_max_extent_pages = DT_MAX_BRW_PAGES;
        }
        spin_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld "
                "chunk bits: %d cl_max_extent_pages: %d\n",
                cli_name(cli),
                cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits,
                cli->cl_max_extent_pages);

        if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
            list_empty(&cli->cl_grant_shrink_list))
                osc_add_shrink_grant(cli);
}

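/*
 * Worked example for the GRANT_PARAM branch of osc_init_grant()
 * (illustrative values only): with PAGE_SHIFT = 12 and
 * ocd_grant_blkbits = 16, cl_chunkbits = 16, so a chunk is 16 pages and
 * chunk_mask = ~15; a cl_max_pages_per_rpc of 100 is rounded up to
 * (100 + 15) & ~15 = 112.  With ocd_grant_max_blks = 2048, the maximum
 * extent covers (2048 << 16) >> 12 = 32768 pages.
 */
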
/* We assume that the reason this OSC got a short read is that it read
 * beyond the end of a stripe file; i.e. Lustre is reading a sparse file
 * via the LOV, and it _knows_ it's reading inside the file; it's just that
 * this stripe never got written at or beyond this stripe offset yet. */
static void handle_short_read(int nob_read, size_t page_count,
                              struct brw_page **pga)
{
        char *ptr;
        int i = 0;

        /* skip bytes read OK */
        while (nob_read > 0) {
                LASSERT(page_count > 0);

                if (pga[i]->count > nob_read) {
                        /* EOF inside this page */
                        ptr = kmap(pga[i]->pg) +
                                (pga[i]->off & ~PAGE_MASK);
                        memset(ptr + nob_read, 0, pga[i]->count - nob_read);
                        kunmap(pga[i]->pg);
                        page_count--;
                        i++;
                        break;
                }

                nob_read -= pga[i]->count;
                page_count--;
                i++;
        }

        /* zero remaining pages */
        while (page_count-- > 0) {
                ptr = kmap(pga[i]->pg) + (pga[i]->off & ~PAGE_MASK);
                memset(ptr, 0, pga[i]->count);
                kunmap(pga[i]->pg);
                i++;
        }
}

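/*
 * Example for handle_short_read() (illustrative): a 3-page read of
 * 4096-byte pages that returns nob_read = 6144 leaves page 0 intact,
 * zeroes page 1 from offset 2048 onwards, and zeroes page 2 entirely.
 */
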
static int check_write_rcs(struct ptlrpc_request *req,
                           int requested_nob, int niocount,
                           size_t page_count, struct brw_page **pga)
{
        int     i;
        __u32   *remote_rcs;

        remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
                                                  sizeof(*remote_rcs) *
                                                  niocount);
        if (remote_rcs == NULL) {
                CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
                return(-EPROTO);
        }

        /* return error if any niobuf was in error */
        for (i = 0; i < niocount; i++) {
                if ((int)remote_rcs[i] < 0)
                        return(remote_rcs[i]);

                if (remote_rcs[i] != 0) {
                        CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
                                i, remote_rcs[i], req);
                        return(-EPROTO);
                }
        }

        if (req->rq_bulk->bd_nob_transferred != requested_nob) {
                CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
                       req->rq_bulk->bd_nob_transferred, requested_nob);
                return(-EPROTO);
        }

        return (0);
}

static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
{
        if (p1->flag != p2->flag) {
                unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
                                  OBD_BRW_SYNC       | OBD_BRW_ASYNC   |
                                  OBD_BRW_NOQUOTA    | OBD_BRW_SOFT_SYNC);

                /* warn if we try to combine flags that we don't know to be
                 * safe to combine */
                if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
                        CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
                              "report this at https://jira.hpdd.intel.com/\n",
                              p1->flag, p2->flag);
                }
                return 0;
        }

        return (p1->off + p1->count == p2->off);
}

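/*
 * Two brw_pages merge into one remote niobuf only when their flags match
 * and they are byte-contiguous: e.g. (off 0, count 4096) followed by
 * (off 4096, count 4096) merges; any gap or flag mismatch starts a new
 * niobuf (see the niocount computation in osc_brw_prep_request() below).
 */
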
static u32 osc_checksum_bulk(int nob, size_t pg_count,
                             struct brw_page **pga, int opc,
                             cksum_type_t cksum_type)
{
        u32                             cksum;
        int                             i = 0;
        struct cfs_crypto_hash_desc     *hdesc;
        unsigned int                    bufsize;
        int                             err;
        unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);

        LASSERT(pg_count > 0);

        hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
        if (IS_ERR(hdesc)) {
                CERROR("Unable to initialize checksum hash %s\n",
                       cfs_crypto_hash_name(cfs_alg));
                return PTR_ERR(hdesc);
        }

        while (nob > 0 && pg_count > 0) {
                unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 && opc == OST_READ &&
                    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
                        unsigned char *ptr = kmap(pga[i]->pg);
                        int off = pga[i]->off & ~PAGE_MASK;

                        memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
                        kunmap(pga[i]->pg);
                }
                cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
                                            pga[i]->off & ~PAGE_MASK,
                                            count);
                LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
                               (int)(pga[i]->off & ~PAGE_MASK));

                nob -= pga[i]->count;
                pg_count--;
                i++;
        }

        bufsize = sizeof(cksum);
        err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);

        /* For sending we only compute the wrong checksum instead
         * of corrupting the data so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                cksum++;

        return cksum;
}

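/*
 * Build a bulk read/write RPC from the page array @pga: contiguous pages
 * with compatible flags are merged into a single niobuf, cached-state and
 * grant information is piggy-backed on the request via
 * osc_announce_cached(), and for writes an optional bulk checksum is
 * computed.  On success *@reqp owns the prepared request.
 */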
static int
osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
                     u32 page_count, struct brw_page **pga,
                     struct ptlrpc_request **reqp, int resend)
{
        struct ptlrpc_request   *req;
        struct ptlrpc_bulk_desc *desc;
        struct ost_body         *body;
        struct obd_ioobj        *ioobj;
        struct niobuf_remote    *niobuf;
        int niocount, i, requested_nob, opc, rc;
        struct osc_brw_async_args *aa;
        struct req_capsule      *pill;
        struct brw_page *pg_prev;

        ENTRY;
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
                RETURN(-ENOMEM); /* Recoverable */
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
                RETURN(-EINVAL); /* Fatal */

        if ((cmd & OBD_BRW_WRITE) != 0) {
                opc = OST_WRITE;
                req = ptlrpc_request_alloc_pool(cli->cl_import,
                                                osc_rq_pool,
                                                &RQF_OST_BRW_WRITE);
        } else {
                opc = OST_READ;
                req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
        }
        if (req == NULL)
                RETURN(-ENOMEM);

        for (niocount = i = 1; i < page_count; i++) {
                if (!can_merge_pages(pga[i - 1], pga[i]))
                        niocount++;
        }

        pill = &req->rq_pill;
        req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
                             sizeof(*ioobj));
        req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
                             niocount * sizeof(*niobuf));

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);
        /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
         * retry logic */
        req->rq_no_retry_einprogress = 1;

        desc = ptlrpc_prep_bulk_imp(req, page_count,
                cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
                (opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE :
                        PTLRPC_BULK_PUT_SINK) |
                        PTLRPC_BULK_BUF_KIOV,
                OST_BULK_PORTAL,
                &ptlrpc_bulk_kiov_pin_ops);

        if (desc == NULL)
                GOTO(out, rc = -ENOMEM);
        /* NB request now owns desc and will free it when it gets freed */

        body = req_capsule_client_get(pill, &RMF_OST_BODY);
        ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;
        /* The high bits of ioo_max_brw tell the server the _maximum_ number
         * of bulks that might be sent for this request.  The actual number
         * is decided when the RPC is finally sent in ptlrpc_register_bulk().
         * It sends "max - 1" for compatibility with old clients that send
         * "0", and also so that the actual maximum is a power-of-two
         * number, not one less. LU-1431 */
        ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
        LASSERT(page_count > 0);
        pg_prev = pga[0];
        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                struct brw_page *pg = pga[i];
                int poff = pg->off & ~PAGE_MASK;

                LASSERT(pg->count > 0);
                /* make sure there is no gap in the middle of page array */
                LASSERTF(page_count == 1 ||
                         (ergo(i == 0, poff + pg->count == PAGE_SIZE) &&
                          ergo(i > 0 && i < page_count - 1,
                               poff == 0 && pg->count == PAGE_SIZE)   &&
                          ergo(i == page_count - 1, poff == 0)),
                         "i: %d/%d pg: %p off: %llu, count: %u\n",
                         i, page_count, pg, pg->off, pg->count);
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u pg %p [pri %lu ind %lu] off %llu"
                         " prev_pg %p [pri %lu ind %lu] off %llu\n",
                         i, page_count,
                         pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
                         pg_prev->pg, page_private(pg_prev->pg),
                         pg_prev->pg->index, pg_prev->off);
                LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
                        (pg->flag & OBD_BRW_SRVLOCK));

                desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff, pg->count);
                requested_nob += pg->count;

                if (i > 0 && can_merge_pages(pg_prev, pg)) {
                        niobuf--;
                        niobuf->rnb_len += pg->count;
                } else {
                        niobuf->rnb_offset = pg->off;
                        niobuf->rnb_len    = pg->count;
                        niobuf->rnb_flags  = pg->flag;
                }
                pg_prev = pg;
        }

        LASSERTF((void *)(niobuf - niocount) ==
                req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
                "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
                &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));

        osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob : 0);
        if (resend) {
                if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                        body->oa.o_valid |= OBD_MD_FLFLAGS;
                        body->oa.o_flags = 0;
                }
                body->oa.o_flags |= OBD_FL_RECOV_RESEND;
        }

        if (osc_should_shrink_grant(cli))
                osc_shrink_grant_local(cli, &body->oa);

        /* size[REQ_REC_OFF] still sizeof (*body) */
        if (opc == OST_WRITE) {
                if (cli->cl_checksum &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        /* store cl_cksum_type in a local variable since
                         * it can be changed via lprocfs */
                        cksum_type_t cksum_type = cli->cl_cksum_type;

                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                                oa->o_flags &= OBD_FL_LOCAL_MASK;
                                body->oa.o_flags = 0;
                        }
                        body->oa.o_flags |= cksum_type_pack(cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        body->oa.o_cksum = osc_checksum_bulk(requested_nob,
                                                             page_count, pga,
                                                             OST_WRITE,
                                                             cksum_type);
                        CDEBUG(D_PAGE, "checksum at write origin: %x\n",
                               body->oa.o_cksum);
                        /* save this in 'oa', too, for later checking */
                        oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        oa->o_flags |= cksum_type_pack(cksum_type);
                } else {
                        /* clear out the checksum flag, in case this is a
                         * resend but cl_checksum is no longer set. b=11238 */
                        oa->o_valid &= ~OBD_MD_FLCKSUM;
                }
                oa->o_cksum = body->oa.o_cksum;
                /* 1 RC per niobuf */
                req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
                                     sizeof(__u32) * niocount);
        } else {
                if (cli->cl_checksum &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                body->oa.o_flags = 0;
                        body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                }
        }
        ptlrpc_request_set_replen(req);

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oa = oa;
        aa->aa_requested_nob = requested_nob;
        aa->aa_nio_count = niocount;
        aa->aa_page_count = page_count;
        aa->aa_resends = 0;
        aa->aa_ppga = pga;
        aa->aa_cli = cli;
        INIT_LIST_HEAD(&aa->aa_oaps);

        *reqp = req;
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
                req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
                niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
        RETURN(0);

 out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}

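/*
 * Classify a write checksum mismatch by recomputing the checksum locally:
 * the recomputed value indicates whether the pages changed after they
 * were checksummed (e.g. under mmap IO) or were corrupted in transit.
 * Returns 0 when the checksums agree and 1 otherwise, after logging the
 * most likely cause.
 */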
static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
                                __u32 client_cksum, __u32 server_cksum, int nob,
                                size_t page_count, struct brw_page **pga,
                                cksum_type_t client_cksum_type)
{
        __u32 new_cksum;
        char *msg;
        cksum_type_t cksum_type;

        if (server_cksum == client_cksum) {
                CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
                return 0;
        }

        cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
                                       oa->o_flags : 0);
        new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
                                      cksum_type);

        if (cksum_type != client_cksum_type)
                msg = "the server did not use the checksum type specified in "
                      "the original request - likely a protocol problem";
        else if (new_cksum == server_cksum)
                msg = "changed on the client after we checksummed it - "
                      "likely false positive due to mmap IO (bug 11742)";
        else if (new_cksum == client_cksum)
                msg = "changed in transit before arrival at OST";
        else
                msg = "changed in transit AND doesn't match the original - "
                      "likely false positive due to mmap IO (bug 11742)";

        LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
                           " object "DOSTID" extent [%llu-%llu]\n",
                           msg, libcfs_nid2str(peer->nid),
                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
                           POSTID(&oa->o_oi), pga[0]->off,
                           pga[page_count-1]->off + pga[page_count-1]->count - 1);
        CERROR("original client csum %x (type %x), server csum %x (type %x), "
               "client csum now %x\n", client_cksum, client_cksum_type,
               server_cksum, cksum_type, new_cksum);
        return 1;
}

1369 /* Note rc enters this function as number of bytes transferred */
1370 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1371 {
1372         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1373         const lnet_process_id_t *peer =
1374                         &req->rq_import->imp_connection->c_peer;
1375         struct client_obd *cli = aa->aa_cli;
1376         struct ost_body *body;
1377         u32 client_cksum = 0;
1378         ENTRY;
1379
1380         if (rc < 0 && rc != -EDQUOT) {
1381                 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1382                 RETURN(rc);
1383         }
1384
1385         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1386         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1387         if (body == NULL) {
1388                 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1389                 RETURN(-EPROTO);
1390         }
1391
1392         /* set/clear the over-quota flag for a uid/gid */
1393         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1394             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1395                 unsigned int qid[LL_MAXQUOTAS] =
1396                                         {body->oa.o_uid, body->oa.o_gid};
1397
1398                 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid %#llx, flags %x\n",
1399                        body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1400                        body->oa.o_flags);
1401                 osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
1402         }
1403
1404         osc_update_grant(cli, body);
1405
1406         if (rc < 0)
1407                 RETURN(rc);
1408
1409         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1410                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1411
1412         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1413                 if (rc > 0) {
1414                         CERROR("Unexpected +ve rc %d\n", rc);
1415                         RETURN(-EPROTO);
1416                 }
1417                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1418
1419                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1420                         RETURN(-EAGAIN);
1421
1422                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1423                     check_write_checksum(&body->oa, peer, client_cksum,
1424                                          body->oa.o_cksum, aa->aa_requested_nob,
1425                                          aa->aa_page_count, aa->aa_ppga,
1426                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1427                         RETURN(-EAGAIN);
1428
1429                 rc = check_write_rcs(req, aa->aa_requested_nob, aa->aa_nio_count,
1430                                      aa->aa_page_count, aa->aa_ppga);
1431                 GOTO(out, rc);
1432         }
1433
1434         /* The rest of this function executes only for OST_READs */
1435
1436         /* if unwrap_bulk failed, return -EAGAIN to retry */
1437         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1438         if (rc < 0)
1439                 GOTO(out, rc = -EAGAIN);
1440
1441         if (rc > aa->aa_requested_nob) {
1442                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1443                        aa->aa_requested_nob);
1444                 RETURN(-EPROTO);
1445         }
1446
1447         if (rc != req->rq_bulk->bd_nob_transferred) {
1448                 CERROR("Unexpected rc %d (%d transferred)\n",
1449                        rc, req->rq_bulk->bd_nob_transferred);
1450                 RETURN(-EPROTO);
1451         }
1452
1453         if (rc < aa->aa_requested_nob)
1454                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1455
1456         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1457                 static int cksum_counter;
1458                 u32        server_cksum = body->oa.o_cksum;
1459                 char      *via = "";
1460                 char      *router = "";
1461                 cksum_type_t cksum_type;
1462
1463                 cksum_type = cksum_type_unpack(body->oa.o_valid & OBD_MD_FLFLAGS ?
1464                                                body->oa.o_flags : 0);
1465                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1466                                                  aa->aa_ppga, OST_READ,
1467                                                  cksum_type);
1468
1469                 if (peer->nid != req->rq_bulk->bd_sender) {
1470                         via = " via ";
1471                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1472                 }
1473
1474                 if (server_cksum != client_cksum) {
1475                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1476                                            "%s%s%s inode "DFID" object "DOSTID
1477                                            " extent [%llu-%llu]\n",
1478                                            req->rq_import->imp_obd->obd_name,
1479                                            libcfs_nid2str(peer->nid),
1480                                            via, router,
1481                                            body->oa.o_valid & OBD_MD_FLFID ?
1482                                                 body->oa.o_parent_seq : (__u64)0,
1483                                            body->oa.o_valid & OBD_MD_FLFID ?
1484                                                 body->oa.o_parent_oid : 0,
1485                                            body->oa.o_valid & OBD_MD_FLFID ?
1486                                                 body->oa.o_parent_ver : 0,
1487                                            POSTID(&body->oa.o_oi),
1488                                            aa->aa_ppga[0]->off,
1489                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1490                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1491                                                                         1);
1492                         CERROR("client %x, server %x, cksum_type %x\n",
1493                                client_cksum, server_cksum, cksum_type);
1494                         cksum_counter = 0;
1495                         aa->aa_oa->o_cksum = client_cksum;
1496                         rc = -EAGAIN;
1497                 } else {
1498                         cksum_counter++;
1499                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1500                         rc = 0;
1501                 }
1502         } else if (unlikely(client_cksum)) {
1503                 static int cksum_missed;
1504
1505                 cksum_missed++;
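                /* Log with exponential backoff: x & -x == x only when x is
                 * a power of two, so this fires at 1, 2, 4, 8, ... misses. */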
1506                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1507                         CERROR("Checksum %u requested from %s but not sent\n",
1508                                cksum_missed, libcfs_nid2str(peer->nid));
1509         } else {
1510                 rc = 0;
1511         }
1512 out:
1513         if (rc >= 0)
1514                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1515                                      aa->aa_oa, &body->oa);
1516
1517         RETURN(rc);
1518 }
1519
1520 static int osc_brw_redo_request(struct ptlrpc_request *request,
1521                                 struct osc_brw_async_args *aa, int rc)
1522 {
1523         struct ptlrpc_request *new_req;
1524         struct osc_brw_async_args *new_aa;
1525         struct osc_async_page *oap;
1526         ENTRY;
1527
1528         DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1529                   "redo for recoverable error %d", rc);
1530
1531         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1532                                 OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
1533                                   aa->aa_cli, aa->aa_oa, aa->aa_page_count,
1534                                   aa->aa_ppga, &new_req, 1);
1535         if (rc)
1536                 RETURN(rc);
1537
1538         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1539                 if (oap->oap_request != NULL) {
1540                         LASSERTF(request == oap->oap_request,
1541                                  "request %p != oap_request %p\n",
1542                                  request, oap->oap_request);
1543                         if (oap->oap_interrupted) {
1544                                 ptlrpc_req_finished(new_req);
1545                                 RETURN(-EINTR);
1546                         }
1547                 }
1548         }
1549         /* The new request takes over pga and oaps from the old request.
1550          * A list_head cannot be copied, only moved (see sketch below). */
1551         aa->aa_resends++;
1552         new_req->rq_interpret_reply = request->rq_interpret_reply;
1553         new_req->rq_async_args = request->rq_async_args;
1554         new_req->rq_commit_cb = request->rq_commit_cb;
1555         /* Cap the resend delay to the current request timeout; this is
1556          * similar to what ptlrpc does (see after_reply()). */
1557         if (aa->aa_resends > new_req->rq_timeout)
1558                 new_req->rq_sent = cfs_time_current_sec() + new_req->rq_timeout;
1559         else
1560                 new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1561         new_req->rq_generation_set = 1;
1562         new_req->rq_import_generation = request->rq_import_generation;
1563
1564         new_aa = ptlrpc_req_async_args(new_req);
1565
1566         INIT_LIST_HEAD(&new_aa->aa_oaps);
1567         list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1568         INIT_LIST_HEAD(&new_aa->aa_exts);
1569         list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1570         new_aa->aa_resends = aa->aa_resends;
1571
1572         list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1573                 if (oap->oap_request) {
1574                         ptlrpc_req_finished(oap->oap_request);
1575                         oap->oap_request = ptlrpc_request_addref(new_req);
1576                 }
1577         }
1578
1579         /* XXX: This code will run into problems if we ever support adding
1580          * a series of BRW RPCs into a self-defined ptlrpc_request_set and
1581          * waiting for all of them to finish. We should inherit the request
1582          * set from the old request. */
1583         ptlrpcd_add_req(new_req);
1584
1585         DEBUG_REQ(D_INFO, new_req, "new request");
1586         RETURN(0);
1587 }
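/*
 * Standalone sketch (not part of the original code) of why a struct
 * list_head cannot simply be assigned, as noted in the comment above:
 * after "new = old" the first and last nodes still point back at &old,
 * so the copied list is corrupt.  list_splice_init() instead rewires
 * the neighbouring nodes to the new head and reinitializes the old one.
 * The snippet below re-creates the kernel list semantics in isolation:
 */
#if 0
struct list_head { struct list_head *next, *prev; };

static void splice_demo(void)
{
        struct list_head old, new, node;

        /* a one-element list hanging off 'old' */
        old.next = old.prev = &node;
        node.next = node.prev = &old;

        new = old;              /* WRONG: node.{next,prev} still point at &old */

        /* what list_splice_init(&old, &new) effectively does instead */
        new.next = old.next;            /* the new head adopts the nodes... */
        new.prev = old.prev;
        new.next->prev = &new;          /* ...and the nodes adopt the new head */
        new.prev->next = &new;
        old.next = old.prev = &old;     /* the old head becomes empty again */
}
#endif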
1588
1589 /*
1590  * We want disk allocation on the target to happen in offset order, so we
1591  * follow Sedgewick's advice and stick to the dead simple shellsort: it does
1592  * fine for our small page arrays and requires no allocation.  It is an
1593  * insertion sort that swaps elements that are strides apart, shrinking the
1594  * stride down until it is 1 and the array is sorted.
1595  */
1596 static void sort_brw_pages(struct brw_page **array, int num)
1597 {
1598         int stride, i, j;
1599         struct brw_page *tmp;
1600
1601         if (num == 1)
1602                 return;
1603         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1604                 ;
1605
1606         do {
1607                 stride /= 3;
1608                 for (i = stride ; i < num ; i++) {
1609                         tmp = array[i];
1610                         j = i;
1611                         while (j >= stride && array[j - stride]->off > tmp->off) {
1612                                 array[j] = array[j - stride];
1613                                 j -= stride;
1614                         }
1615                         array[j] = tmp;
1616                 }
1617         } while (stride > 1);
1618 }
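/*
 * Standalone illustration (not part of the original code): the same
 * (3 * h) + 1 gap sequence applied to a plain integer array.  For
 * num = 10 the stride grows 1 -> 4 -> 13, is immediately divided back
 * to 4, and the array is then insertion-sorted at strides 4 and 1.
 * Compile separately in user space to experiment:
 */
#if 0
#include <stdio.h>

static void shellsort(int *a, int num)
{
        int stride, i, j, tmp;

        for (stride = 1; stride < num; stride = (stride * 3) + 1)
                ;
        do {
                stride /= 3;
                for (i = stride; i < num; i++) {
                        tmp = a[i];
                        for (j = i; j >= stride && a[j - stride] > tmp;
                             j -= stride)
                                a[j] = a[j - stride];
                        a[j] = tmp;
                }
        } while (stride > 1);
}

int main(void)
{
        int a[] = { 9, 3, 7, 1, 8, 2, 6, 0, 5, 4 };
        int i;

        shellsort(a, sizeof(a) / sizeof(a[0]));
        for (i = 0; i < 10; i++)
                printf("%d ", a[i]);    /* 0 1 2 3 4 5 6 7 8 9 */
        printf("\n");
        return 0;
}
#endif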
1619
1620 static void osc_release_ppga(struct brw_page **ppga, size_t count)
1621 {
1622         LASSERT(ppga != NULL);
1623         OBD_FREE(ppga, sizeof(*ppga) * count);
1624 }
1625
1626 static int brw_interpret(const struct lu_env *env,
1627                          struct ptlrpc_request *req, void *data, int rc)
1628 {
1629         struct osc_brw_async_args *aa = data;
1630         struct osc_extent *ext;
1631         struct osc_extent *tmp;
1632         struct client_obd *cli = aa->aa_cli;
1633         ENTRY;
1634
1635         rc = osc_brw_fini_request(req, rc);
1636         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1637         /* When the server returns -EINPROGRESS, the client should always
1638          * retry, regardless of how often the bulk was already resent. */
1639         if (osc_recoverable_error(rc)) {
1640                 if (req->rq_import_generation !=
1641                     req->rq_import->imp_generation) {
1642                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
1643                                DOSTID", rc = %d.\n",
1644                                req->rq_import->imp_obd->obd_name,
1645                                POSTID(&aa->aa_oa->o_oi), rc);
1646                 } else if (rc == -EINPROGRESS ||
1647                     client_should_resend(aa->aa_resends, aa->aa_cli)) {
1648                         rc = osc_brw_redo_request(req, aa, rc);
1649                 } else {
1650                         CERROR("%s: too many resent retries for object: "
1651                                "%llu:%llu, rc = %d.\n",
1652                                req->rq_import->imp_obd->obd_name,
1653                                POSTID(&aa->aa_oa->o_oi), rc);
1654                 }
1655
1656                 if (rc == 0)
1657                         RETURN(0);
1658                 else if (rc == -EAGAIN || rc == -EINPROGRESS)
1659                         rc = -EIO;
1660         }
1661
1662         if (rc == 0) {
1663                 struct obdo *oa = aa->aa_oa;
1664                 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
1665                 unsigned long valid = 0;
1666                 struct cl_object *obj;
1667                 struct osc_async_page *last;
1668
1669                 last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
1670                 obj = osc2cl(last->oap_obj);
1671
1672                 cl_object_attr_lock(obj);
1673                 if (oa->o_valid & OBD_MD_FLBLOCKS) {
1674                         attr->cat_blocks = oa->o_blocks;
1675                         valid |= CAT_BLOCKS;
1676                 }
1677                 if (oa->o_valid & OBD_MD_FLMTIME) {
1678                         attr->cat_mtime = oa->o_mtime;
1679                         valid |= CAT_MTIME;
1680                 }
1681                 if (oa->o_valid & OBD_MD_FLATIME) {
1682                         attr->cat_atime = oa->o_atime;
1683                         valid |= CAT_ATIME;
1684                 }
1685                 if (oa->o_valid & OBD_MD_FLCTIME) {
1686                         attr->cat_ctime = oa->o_ctime;
1687                         valid |= CAT_CTIME;
1688                 }
1689
1690                 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1691                         struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
1692                         loff_t last_off = last->oap_count + last->oap_obj_off +
1693                                 last->oap_page_off;
1694
1695                         /* Change the file size if this is an out-of-quota
1696                          * or direct I/O write that extends the file size */
1697                         if (loi->loi_lvb.lvb_size < last_off) {
1698                                 attr->cat_size = last_off;
1699                                 valid |= CAT_SIZE;
1700                         }
1701                         /* Extend KMS if it's not a lockless write */
1702                         if (loi->loi_kms < last_off &&
1703                             oap2osc_page(last)->ops_srvlock == 0) {
1704                                 attr->cat_kms = last_off;
1705                                 valid |= CAT_KMS;
1706                         }
1707                 }
1708
1709                 if (valid != 0)
1710                         cl_object_attr_update(env, obj, attr, valid);
1711                 cl_object_attr_unlock(obj);
1712         }
1713         OBDO_FREE(aa->aa_oa);
1714
1715         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
1716                 osc_inc_unstable_pages(req);
1717
1718         list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
1719                 list_del_init(&ext->oe_link);
1720                 osc_extent_finish(env, ext, 1, rc);
1721         }
1722         LASSERT(list_empty(&aa->aa_exts));
1723         LASSERT(list_empty(&aa->aa_oaps));
1724
1725         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1726         ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
1727
1728         spin_lock(&cli->cl_loi_list_lock);
1729         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1730          * is called so we know whether to go to sync BRWs or wait for more
1731          * RPCs to complete */
1732         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
1733                 cli->cl_w_in_flight--;
1734         else
1735                 cli->cl_r_in_flight--;
1736         osc_wake_cache_waiters(cli);
1737         spin_unlock(&cli->cl_loi_list_lock);
1738
1739         osc_io_unplug(env, cli, NULL);
1740         RETURN(rc);
1741 }
1742
1743 static void brw_commit(struct ptlrpc_request *req)
1744 {
1745         /* If osc_inc_unstable_pages() (via osc_extent_finish()) races with
1746          * this function being called via rq_commit_cb, we need to ensure
1747          * osc_dec_unstable_pages() is still called. Otherwise unstable
1748          * pages may be leaked. */
1749         spin_lock(&req->rq_lock);
1750         if (likely(req->rq_unstable)) {
1751                 req->rq_unstable = 0;
1752                 spin_unlock(&req->rq_lock);
1753
1754                 osc_dec_unstable_pages(req);
1755         } else {
1756                 req->rq_committed = 1;
1757                 spin_unlock(&req->rq_lock);
1758         }
1759 }
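/*
 * The callback above is a lock-protected flag handoff: if the racing path
 * (osc_inc_unstable_pages() via osc_extent_finish()) already marked the
 * request unstable, the commit callback performs the decrement itself;
 * otherwise it leaves rq_committed set so the racing path knows the commit
 * already happened.  A hypothetical generic shape of the same pattern
 * (the names below are illustrative, not Lustre API):
 */
#if 0
        spin_lock(&obj->lock);
        if (obj->armed) {
                /* we won the race: disarm and do the work ourselves */
                obj->armed = 0;
                spin_unlock(&obj->lock);
                do_cleanup(obj);        /* may block, so run outside the lock */
        } else {
                /* record that we ran; the other path does the cleanup */
                obj->done = 1;
                spin_unlock(&obj->lock);
        }
#endif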
1760
1761 /**
1762  * Build an RPC from the list of extents @ext_list. The caller must ensure
1763  * that the total number of pages in this list does not exceed the maximum
1764  * pages per RPC. Extents in the list must be in OES_RPC state.
1765  */
1766 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
1767                   struct list_head *ext_list, int cmd)
1768 {
1769         struct ptlrpc_request           *req = NULL;
1770         struct osc_extent               *ext;
1771         struct brw_page                 **pga = NULL;
1772         struct osc_brw_async_args       *aa = NULL;
1773         struct obdo                     *oa = NULL;
1774         struct osc_async_page           *oap;
1775         struct osc_object               *obj = NULL;
1776         struct cl_req_attr              *crattr = NULL;
1777         loff_t                          starting_offset = OBD_OBJECT_EOF;
1778         loff_t                          ending_offset = 0;
1779         int                             mpflag = 0;
1780         int                             mem_tight = 0;
1781         int                             page_count = 0;
1782         bool                            soft_sync = false;
1783         bool                            interrupted = false;
1784         int                             i;
1785         int                             grant = 0;
1786         int                             rc;
1787         struct list_head                rpc_list = LIST_HEAD_INIT(rpc_list);
1788         struct ost_body                 *body;
1789         ENTRY;
1790         LASSERT(!list_empty(ext_list));
1791
1792         /* add pages into rpc_list to build BRW rpc */
1793         list_for_each_entry(ext, ext_list, oe_link) {
1794                 LASSERT(ext->oe_state == OES_RPC);
1795                 mem_tight |= ext->oe_memalloc;
1796                 grant += ext->oe_grants;
1797                 page_count += ext->oe_nr_pages;
1798                 if (obj == NULL)
1799                         obj = ext->oe_obj;
1800         }
1801
1802         soft_sync = osc_over_unstable_soft_limit(cli);
1803         if (mem_tight)
1804                 mpflag = cfs_memory_pressure_get_and_set();
1805
1806         OBD_ALLOC(pga, sizeof(*pga) * page_count);
1807         if (pga == NULL)
1808                 GOTO(out, rc = -ENOMEM);
1809
1810         OBDO_ALLOC(oa);
1811         if (oa == NULL)
1812                 GOTO(out, rc = -ENOMEM);
1813
1814         i = 0;
1815         list_for_each_entry(ext, ext_list, oe_link) {
1816                 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
1817                         if (mem_tight)
1818                                 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
1819                         if (soft_sync)
1820                                 oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
1821                         pga[i] = &oap->oap_brw_page;
1822                         pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
1823                         i++;
1824
1825                         list_add_tail(&oap->oap_rpc_item, &rpc_list);
1826                         if (starting_offset == OBD_OBJECT_EOF ||
1827                             starting_offset > oap->oap_obj_off)
1828                                 starting_offset = oap->oap_obj_off;
1829                         else
1830                                 LASSERT(oap->oap_page_off == 0);
1831                         if (ending_offset < oap->oap_obj_off + oap->oap_count)
1832                                 ending_offset = oap->oap_obj_off +
1833                                                 oap->oap_count;
1834                         else
1835                                 LASSERT(oap->oap_page_off + oap->oap_count ==
1836                                         PAGE_SIZE);
1837                         if (oap->oap_interrupted)
1838                                 interrupted = true;
1839                 }
1840         }
1841
1842         /* first page in the list */
1843         oap = list_entry(rpc_list.next, typeof(*oap), oap_rpc_item);
1844
1845         crattr = &osc_env_info(env)->oti_req_attr;
1846         memset(crattr, 0, sizeof(*crattr));
1847         crattr->cra_type = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
1848         crattr->cra_flags = ~0ULL;
1849         crattr->cra_page = oap2cl_page(oap);
1850         crattr->cra_oa = oa;
1851         cl_req_attr_set(env, osc2cl(obj), crattr);
1852
1853         if (cmd == OBD_BRW_WRITE)
1854                 oa->o_grant_used = grant;
1855
1856         sort_brw_pages(pga, page_count);
1857         rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req, 0);
1858         if (rc != 0) {
1859                 CERROR("prep_req failed: %d\n", rc);
1860                 GOTO(out, rc);
1861         }
1862
1863         req->rq_commit_cb = brw_commit;
1864         req->rq_interpret_reply = brw_interpret;
1865         req->rq_memalloc = mem_tight != 0;
1866         oap->oap_request = ptlrpc_request_addref(req);
1867         if (interrupted && !req->rq_intr)
1868                 ptlrpc_mark_interrupted(req);
1869
1870         /* Need to update the timestamps after the request is built in case
1871          * we race with setattr (locally or in queue at the OST).  If the OST
1872          * gets a later setattr before an earlier BRW (as determined by the
1873          * request xid), it will not use the BRW timestamps.  Sadly, there is
1874          * no obvious way to do this in a single call.  bug 10150 */
1875         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
1876         crattr->cra_oa = &body->oa;
1877         crattr->cra_flags = OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME;
1878         cl_req_attr_set(env, osc2cl(obj), crattr);
1879         lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
1880
1881         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1882         aa = ptlrpc_req_async_args(req);
1883         INIT_LIST_HEAD(&aa->aa_oaps);
1884         list_splice_init(&rpc_list, &aa->aa_oaps);
1885         INIT_LIST_HEAD(&aa->aa_exts);
1886         list_splice_init(ext_list, &aa->aa_exts);
1887
1888         spin_lock(&cli->cl_loi_list_lock);
1889         starting_offset >>= PAGE_SHIFT;
1890         if (cmd == OBD_BRW_READ) {
1891                 cli->cl_r_in_flight++;
1892                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
1893                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
1894                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
1895                                       starting_offset + 1);
1896         } else {
1897                 cli->cl_w_in_flight++;
1898                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
1899                 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
1900                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
1901                                       starting_offset + 1);
1902         }
1903         spin_unlock(&cli->cl_loi_list_lock);
1904
1905         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %ur/%uw in flight",
1906                   page_count, aa, cli->cl_r_in_flight,
1907                   cli->cl_w_in_flight);
1908         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_IO, cfs_fail_val);
1909
1910         ptlrpcd_add_req(req);
1911         rc = 0;
1912         EXIT;
1913
1914 out:
1915         if (mem_tight != 0)
1916                 cfs_memory_pressure_restore(mpflag);
1917
1918         if (rc != 0) {
1919                 LASSERT(req == NULL);
1920
1921                 if (oa)
1922                         OBDO_FREE(oa);
1923                 if (pga)
1924                         OBD_FREE(pga, sizeof(*pga) * page_count);
1925                 /* This should happen rarely and is pretty bad; it makes the
1926                  * pending list not follow the dirty order. */
1927                 while (!list_empty(ext_list)) {
1928                         ext = list_entry(ext_list->next, struct osc_extent,
1929                                          oe_link);
1930                         list_del_init(&ext->oe_link);
1931                         osc_extent_finish(env, ext, 0, rc);
1932                 }
1933         }
1934         RETURN(rc);
1935 }
1936
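/* Atomically (under the lock) set @data as the lock's l_ast_data if none
 * is set yet.  Returns 1 if l_ast_data equals @data afterwards, whether we
 * just set it or it was already set to @data; 0 if the lock already
 * carries different ast data. */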
1937 static int osc_set_lock_data(struct ldlm_lock *lock, void *data)
1938 {
1939         int set = 0;
1940
1941         LASSERT(lock != NULL);
1942
1943         lock_res_and_lock(lock);
1944
1945         if (lock->l_ast_data == NULL)
1946                 lock->l_ast_data = data;
1947         if (lock->l_ast_data == data)
1948                 set = 1;
1949
1950         unlock_res_and_lock(lock);
1951
1952         return set;
1953 }
1954
1955 static int osc_enqueue_fini(struct ptlrpc_request *req,
1956                             osc_enqueue_upcall_f upcall, void *cookie,
1957                             struct lustre_handle *lockh, enum ldlm_mode mode,
1958                             __u64 *flags, int agl, int errcode)
1959 {
1960         bool intent = *flags & LDLM_FL_HAS_INTENT;
1961         int rc;
1962         ENTRY;
1963
1964         /* The request was created before ldlm_cli_enqueue call. */
1965         if (intent && errcode == ELDLM_LOCK_ABORTED) {
1966                 struct ldlm_reply *rep;
1967
1968                 rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1969                 LASSERT(rep != NULL);
1970
1971                 rep->lock_policy_res1 =
1972                         ptlrpc_status_ntoh(rep->lock_policy_res1);
1973                 if (rep->lock_policy_res1)
1974                         errcode = rep->lock_policy_res1;
1975                 if (!agl)
1976                         *flags |= LDLM_FL_LVB_READY;
1977         } else if (errcode == ELDLM_OK) {
1978                 *flags |= LDLM_FL_LVB_READY;
1979         }
1980
1981         /* Call the update callback. */
1982         rc = (*upcall)(cookie, lockh, errcode);
1983
1984         /* release the reference taken in ldlm_cli_enqueue() */
1985         if (errcode == ELDLM_LOCK_MATCHED)
1986                 errcode = ELDLM_OK;
1987         if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
1988                 ldlm_lock_decref(lockh, mode);
1989
1990         RETURN(rc);
1991 }
1992
1993 static int osc_enqueue_interpret(const struct lu_env *env,
1994                                  struct ptlrpc_request *req,
1995                                  struct osc_enqueue_args *aa, int rc)
1996 {
1997         struct ldlm_lock *lock;
1998         struct lustre_handle *lockh = &aa->oa_lockh;
1999         enum ldlm_mode mode = aa->oa_mode;
2000         struct ost_lvb *lvb = aa->oa_lvb;
2001         __u32 lvb_len = sizeof(*lvb);
2002         __u64 flags = 0;
2003
2004         ENTRY;
2005
2006         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2007          * be valid. */
2008         lock = ldlm_handle2lock(lockh);
2009         LASSERTF(lock != NULL,
2010                  "lockh %#llx, req %p, aa %p - client evicted?\n",
2011                  lockh->cookie, req, aa);
2012
2013         /* Take an additional reference so that a blocking AST that
2014          * ldlm_cli_enqueue_fini() might post for a failed lock is guaranteed
2015          * to arrive after the upcall has been executed by
2016          * osc_enqueue_fini(). */
2017         ldlm_lock_addref(lockh, mode);
2018
2019         /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
2020         OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
2021
2022         /* Let the CP AST grant the lock first. */
2023         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2024
2025         if (aa->oa_agl) {
2026                 LASSERT(aa->oa_lvb == NULL);
2027                 LASSERT(aa->oa_flags == NULL);
2028                 aa->oa_flags = &flags;
2029         }
2030
2031         /* Complete obtaining the lock procedure. */
2032         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
2033                                    aa->oa_mode, aa->oa_flags, lvb, lvb_len,
2034                                    lockh, rc);
2035         /* Complete osc stuff. */
2036         rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
2037                               aa->oa_flags, aa->oa_agl, rc);
2038
2039         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2040
2041         ldlm_lock_decref(lockh, mode);
2042         LDLM_LOCK_PUT(lock);
2043         RETURN(rc);
2044 }
2045
2046 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2047
2048 /* When enqueuing asynchronously, locks are not ordered, so we can obtain a
2049  * lock from the 2nd OSC before a lock from the 1st one. This does not deadlock
2050  * with other synchronous requests, but holding some locks while trying to
2051  * obtain others may take a considerable amount of time in case of OST failure;
2052  * and when other sync requests cannot get a lock released by a client, that
2053  * client is evicted from the cluster -- such scenarios make life difficult, so
2054  * release locks just after they are obtained. */
2055 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2056                      __u64 *flags, union ldlm_policy_data *policy,
2057                      struct ost_lvb *lvb, int kms_valid,
2058                      osc_enqueue_upcall_f upcall, void *cookie,
2059                      struct ldlm_enqueue_info *einfo,
2060                      struct ptlrpc_request_set *rqset, int async, int agl)
2061 {
2062         struct obd_device *obd = exp->exp_obd;
2063         struct lustre_handle lockh = { 0 };
2064         struct ptlrpc_request *req = NULL;
2065         int intent = *flags & LDLM_FL_HAS_INTENT;
2066         __u64 match_flags = *flags;
2067         enum ldlm_mode mode;
2068         int rc;
2069         ENTRY;
2070
2071         /* Filesystem lock extents are extended to page boundaries so that
2072          * dealing with the page cache is a little smoother.  */
2073         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2074         policy->l_extent.end |= ~PAGE_MASK;
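        /* For example, with 4 KiB pages an extent of [5000, 6000] becomes
         * [4096, 8191]: the start is rounded down to the first byte of its
         * page and the end up to the last byte of its page. */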
2075
2076         /*
2077          * kms is not valid when either object is completely fresh (so that no
2078          * locks are cached), or object was evicted. In the latter case cached
2079          * lock cannot be used, because it would prime inode state with
2080          * potentially stale LVB.
2081          */
2082         if (!kms_valid)
2083                 goto no_match;
2084
2085         /* Next, search for already existing extent locks that will cover us */
2086         /* If we're trying to read, we also search for an existing PW lock.  The
2087          * VFS and page cache already protect us locally, so lots of readers/
2088          * writers can share a single PW lock.
2089          *
2090          * There are problems with conversion deadlocks, so instead of
2091          * converting a read lock to a write lock, we'll just enqueue a new
2092          * one.
2093          *
2094          * At some point we should cancel the read lock instead of making them
2095          * send us a blocking callback, but there are problems with canceling
2096          * locks out from other users right now, too. */
2097         mode = einfo->ei_mode;
2098         if (einfo->ei_mode == LCK_PR)
2099                 mode |= LCK_PW;
2100         if (agl == 0)
2101                 match_flags |= LDLM_FL_LVB_READY;
2102         if (intent != 0)
2103                 match_flags |= LDLM_FL_BLOCK_GRANTED;
2104         mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id,
2105                                einfo->ei_type, policy, mode, &lockh, 0);
2106         if (mode) {
2107                 struct ldlm_lock *matched;
2108
2109                 if (*flags & LDLM_FL_TEST_LOCK)
2110                         RETURN(ELDLM_OK);
2111
2112                 matched = ldlm_handle2lock(&lockh);
2113                 if (agl) {
2114                         /* AGL enqueues DLM locks speculatively. Therefore if
2115                          * a DLM lock already exists, just inform the caller
2116                          * to cancel the AGL process for this stripe. */
2117                         ldlm_lock_decref(&lockh, mode);
2118                         LDLM_LOCK_PUT(matched);
2119                         RETURN(-ECANCELED);
2120                 } else if (osc_set_lock_data(matched, einfo->ei_cbdata)) {
2121                         *flags |= LDLM_FL_LVB_READY;
2122
2123                         /* We already have a lock, and it's referenced. */
2124                         (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);
2125
2126                         ldlm_lock_decref(&lockh, mode);
2127                         LDLM_LOCK_PUT(matched);
2128                         RETURN(ELDLM_OK);
2129                 } else {
2130                         ldlm_lock_decref(&lockh, mode);
2131                         LDLM_LOCK_PUT(matched);
2132                 }
2133         }
2134
2135 no_match:
2136         if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK))
2137                 RETURN(-ENOLCK);
2138
2139         if (intent) {
2140                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2141                                            &RQF_LDLM_ENQUEUE_LVB);
2142                 if (req == NULL)
2143                         RETURN(-ENOMEM);
2144
2145                 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
2146                 if (rc) {
2147                         ptlrpc_request_free(req);
2148                         RETURN(rc);
2149                 }
2150
2151                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2152                                      sizeof(*lvb));
2153                 ptlrpc_request_set_replen(req);
2154         }
2155
2156         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2157         *flags &= ~LDLM_FL_BLOCK_GRANTED;
2158
2159         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2160                               sizeof(*lvb), LVB_T_OST, &lockh, async);
2161         if (async) {
2162                 if (!rc) {
2163                         struct osc_enqueue_args *aa;
2164                         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2165                         aa = ptlrpc_req_async_args(req);
2166                         aa->oa_exp    = exp;
2167                         aa->oa_mode   = einfo->ei_mode;
2168                         aa->oa_type   = einfo->ei_type;
2169                         lustre_handle_copy(&aa->oa_lockh, &lockh);
2170                         aa->oa_upcall = upcall;
2171                         aa->oa_cookie = cookie;
2172                         aa->oa_agl    = !!agl;
2173                         if (!agl) {
2174                                 aa->oa_flags  = flags;
2175                                 aa->oa_lvb    = lvb;
2176                         } else {
2177                                 /* AGL essentially enqueues a DLM lock in
2178                                  * advance, so we don't care about the
2179                                  * result of the AGL enqueue. */
2180                                 aa->oa_lvb    = NULL;
2181                                 aa->oa_flags  = NULL;
2182                         }
2183
2184                         req->rq_interpret_reply =
2185                                 (ptlrpc_interpterer_t)osc_enqueue_interpret;
2186                         if (rqset == PTLRPCD_SET)
2187                                 ptlrpcd_add_req(req);
2188                         else
2189                                 ptlrpc_set_add_req(rqset, req);
2190                 } else if (intent) {
2191                         ptlrpc_req_finished(req);
2192                 }
2193                 RETURN(rc);
2194         }
2195
2196         rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
2197                               flags, agl, rc);
2198         if (intent)
2199                 ptlrpc_req_finished(req);
2200
2201         RETURN(rc);
2202 }
2203
2204 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2205                    enum ldlm_type type, union ldlm_policy_data *policy,
2206                    enum ldlm_mode mode, __u64 *flags, void *data,
2207                    struct lustre_handle *lockh, int unref)
2208 {
2209         struct obd_device *obd = exp->exp_obd;
2210         __u64 lflags = *flags;
2211         enum ldlm_mode rc;
2212         ENTRY;
2213
2214         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2215                 RETURN(-EIO);
2216
2217         /* Filesystem lock extents are extended to page boundaries so that
2218          * dealing with the page cache is a little smoother */
2219         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2220         policy->l_extent.end |= ~PAGE_MASK;
2221
2222         /* Next, search for already existing extent locks that will cover us */
2223         /* If we're trying to read, we also search for an existing PW lock.  The
2224          * VFS and page cache already protect us locally, so lots of readers/
2225          * writers can share a single PW lock. */
2226         rc = mode;
2227         if (mode == LCK_PR)
2228                 rc |= LCK_PW;
2229         rc = ldlm_lock_match(obd->obd_namespace, lflags,
2230                              res_id, type, policy, rc, lockh, unref);
2231         if (rc == 0 || lflags & LDLM_FL_TEST_LOCK)
2232                 RETURN(rc);
2233
2234         if (data != NULL) {
2235                 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2236
2237                 LASSERT(lock != NULL);
2238                 if (!osc_set_lock_data(lock, data)) {
2239                         ldlm_lock_decref(lockh, rc);
2240                         rc = 0;
2241                 }
2242                 LDLM_LOCK_PUT(lock);
2243         }
2244         RETURN(rc);
2245 }
2246
2247 static int osc_statfs_interpret(const struct lu_env *env,
2248                                 struct ptlrpc_request *req,
2249                                 struct osc_async_args *aa, int rc)
2250 {
2251         struct obd_statfs *msfs;
2252         ENTRY;
2253
2254         if (rc == -EBADR)
2255                 /* The request has in fact never been sent
2256                  * due to issues at a higher level (LOV).
2257                  * Exit immediately since the caller is
2258                  * aware of the problem and takes care
2259          * of the cleanup. */
2260                 RETURN(rc);
2261
2262         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2263             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2264                 GOTO(out, rc = 0);
2265
2266         if (rc != 0)
2267                 GOTO(out, rc);
2268
2269         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2270         if (msfs == NULL) {
2271                 GOTO(out, rc = -EPROTO);
2272         }
2273
2274         *aa->aa_oi->oi_osfs = *msfs;
2275 out:
2276         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2277         RETURN(rc);
2278 }
2279
2280 static int osc_statfs_async(struct obd_export *exp,
2281                             struct obd_info *oinfo, __u64 max_age,
2282                             struct ptlrpc_request_set *rqset)
2283 {
2284         struct obd_device     *obd = class_exp2obd(exp);
2285         struct ptlrpc_request *req;
2286         struct osc_async_args *aa;
2287         int                    rc;
2288         ENTRY;
2289
2290         /* We could possibly pass max_age in the request (as an absolute
2291          * timestamp or a "seconds.usec ago") so the target can avoid doing
2292          * extra calls into the filesystem if that isn't necessary (e.g.
2293          * during mount that would help a bit).  Having relative timestamps
2294          * is not so great if request processing is slow, while absolute
2295          * timestamps are not ideal because they need time synchronization. */
2296         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2297         if (req == NULL)
2298                 RETURN(-ENOMEM);
2299
2300         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2301         if (rc) {
2302                 ptlrpc_request_free(req);
2303                 RETURN(rc);
2304         }
2305         ptlrpc_request_set_replen(req);
2306         req->rq_request_portal = OST_CREATE_PORTAL;
2307         ptlrpc_at_set_req_timeout(req);
2308
2309         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2310                 /* procfs requests should not wait for statfs, to avoid deadlock */
2311                 req->rq_no_resend = 1;
2312                 req->rq_no_delay = 1;
2313         }
2314
2315         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2316         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2317         aa = ptlrpc_req_async_args(req);
2318         aa->aa_oi = oinfo;
2319
2320         ptlrpc_set_add_req(rqset, req);
2321         RETURN(0);
2322 }
2323
2324 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2325                       struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2326 {
2327         struct obd_device     *obd = class_exp2obd(exp);
2328         struct obd_statfs     *msfs;
2329         struct ptlrpc_request *req;
2330         struct obd_import     *imp = NULL;
2331         int rc;
2332         ENTRY;
2333
2334         /* Since the request might also come from lprocfs, we need to sync
2335          * this with client_disconnect_export() (Bug 15684). */
2336         down_read(&obd->u.cli.cl_sem);
2337         if (obd->u.cli.cl_import)
2338                 imp = class_import_get(obd->u.cli.cl_import);
2339         up_read(&obd->u.cli.cl_sem);
2340         if (!imp)
2341                 RETURN(-ENODEV);
2342
2343         /* We could possibly pass max_age in the request (as an absolute
2344          * timestamp or a "seconds.usec ago") so the target can avoid doing
2345          * extra calls into the filesystem if that isn't necessary (e.g.
2346          * during mount that would help a bit).  Having relative timestamps
2347          * is not so great if request processing is slow, while absolute
2348          * timestamps are not ideal because they need time synchronization. */
2349         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2350
2351         class_import_put(imp);
2352
2353         if (req == NULL)
2354                 RETURN(-ENOMEM);
2355
2356         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2357         if (rc) {
2358                 ptlrpc_request_free(req);
2359                 RETURN(rc);
2360         }
2361         ptlrpc_request_set_replen(req);
2362         req->rq_request_portal = OST_CREATE_PORTAL;
2363         ptlrpc_at_set_req_timeout(req);
2364
2365         if (flags & OBD_STATFS_NODELAY) {
2366                 /* procfs requests should not wait for statfs, to avoid deadlock */
2367                 req->rq_no_resend = 1;
2368                 req->rq_no_delay = 1;
2369         }
2370
2371         rc = ptlrpc_queue_wait(req);
2372         if (rc)
2373                 GOTO(out, rc);
2374
2375         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2376         if (msfs == NULL) {
2377                 GOTO(out, rc = -EPROTO);
2378         }
2379
2380         *osfs = *msfs;
2381
2382         EXIT;
2383  out:
2384         ptlrpc_req_finished(req);
2385         return rc;
2386 }
2387
2388 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2389                          void *karg, void __user *uarg)
2390 {
2391         struct obd_device *obd = exp->exp_obd;
2392         struct obd_ioctl_data *data = karg;
2393         int err = 0;
2394         ENTRY;
2395
2396         if (!try_module_get(THIS_MODULE)) {
2397                 CERROR("%s: cannot get module '%s'\n", obd->obd_name,
2398                        module_name(THIS_MODULE));
2399                 return -EINVAL;
2400         }
2401         switch (cmd) {
2402         case OBD_IOC_CLIENT_RECOVER:
2403                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2404                                             data->ioc_inlbuf1, 0);
2405                 if (err > 0)
2406                         err = 0;
2407                 GOTO(out, err);
2408         case IOC_OSC_SET_ACTIVE:
2409                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2410                                                data->ioc_offset);
2411                 GOTO(out, err);
2412         case OBD_IOC_PING_TARGET:
2413                 err = ptlrpc_obd_ping(obd);
2414                 GOTO(out, err);
2415         default:
2416                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2417                        cmd, current_comm());
2418                 GOTO(out, err = -ENOTTY);
2419         }
2420 out:
2421         module_put(THIS_MODULE);
2422         return err;
2423 }
2424
2425 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
2426                               u32 keylen, void *key,
2427                               u32 vallen, void *val,
2428                               struct ptlrpc_request_set *set)
2429 {
2430         struct ptlrpc_request *req;
2431         struct obd_device     *obd = exp->exp_obd;
2432         struct obd_import     *imp = class_exp2cliimp(exp);
2433         char                  *tmp;
2434         int                    rc;
2435         ENTRY;
2436
2437         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
2438
2439         if (KEY_IS(KEY_CHECKSUM)) {
2440                 if (vallen != sizeof(int))
2441                         RETURN(-EINVAL);
2442                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
2443                 RETURN(0);
2444         }
2445
2446         if (KEY_IS(KEY_SPTLRPC_CONF)) {
2447                 sptlrpc_conf_client_adapt(obd);
2448                 RETURN(0);
2449         }
2450
2451         if (KEY_IS(KEY_FLUSH_CTX)) {
2452                 sptlrpc_import_flush_my_ctx(imp);
2453                 RETURN(0);
2454         }
2455
2456         if (KEY_IS(KEY_CACHE_SET)) {
2457                 struct client_obd *cli = &obd->u.cli;
2458
2459                 LASSERT(cli->cl_cache == NULL); /* only once */
2460                 cli->cl_cache = (struct cl_client_cache *)val;
2461                 cl_cache_incref(cli->cl_cache);
2462                 cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
2463
2464                 /* add this osc into entity list */
2465                 LASSERT(list_empty(&cli->cl_lru_osc));
2466                 spin_lock(&cli->cl_cache->ccc_lru_lock);
2467                 list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
2468                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
2469
2470                 RETURN(0);
2471         }
2472
2473         if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
2474                 struct client_obd *cli = &obd->u.cli;
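                /* Shrink at most half of what this OSC currently holds on
                 * its LRU, and never more than the caller's remaining
                 * target; report back how much was actually shrunk. */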
2475                 long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
2476                 long target = *(long *)val;
2477
2478                 nr = osc_lru_shrink(env, cli, min(nr, target), true);
2479                 *(long *)val -= nr;
2480                 RETURN(0);
2481         }
2482
2483         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
2484                 RETURN(-EINVAL);
2485
2486         /* We pass all other commands directly to OST. Since nobody calls osc
2487          * methods directly and everybody is supposed to go through LOV, we
2488          * assume lov checked invalid values for us.
2489          * The only recognised values so far are evict_by_nid and mds_conn.
2490          * Even if something bad goes through, we'd get a -EINVAL from OST
2491          * anyway. */
2492
2493         req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
2494                                                 &RQF_OST_SET_GRANT_INFO :
2495                                                 &RQF_OBD_SET_INFO);
2496         if (req == NULL)
2497                 RETURN(-ENOMEM);
2498
2499         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2500                              RCL_CLIENT, keylen);
2501         if (!KEY_IS(KEY_GRANT_SHRINK))
2502                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
2503                                      RCL_CLIENT, vallen);
2504         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
2505         if (rc) {
2506                 ptlrpc_request_free(req);
2507                 RETURN(rc);
2508         }
2509
2510         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2511         memcpy(tmp, key, keylen);
2512         tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
2513                                                         &RMF_OST_BODY :
2514                                                         &RMF_SETINFO_VAL);
2515         memcpy(tmp, val, vallen);
2516
2517         if (KEY_IS(KEY_GRANT_SHRINK)) {
2518                 struct osc_grant_args *aa;
2519                 struct obdo *oa;
2520
2521                 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2522                 aa = ptlrpc_req_async_args(req);
2523                 OBDO_ALLOC(oa);
2524                 if (!oa) {
2525                         ptlrpc_req_finished(req);
2526                         RETURN(-ENOMEM);
2527                 }
2528                 *oa = ((struct ost_body *)val)->oa;
2529                 aa->aa_oa = oa;
2530                 req->rq_interpret_reply = osc_shrink_grant_interpret;
2531         }
2532
2533         ptlrpc_request_set_replen(req);
2534         if (!KEY_IS(KEY_GRANT_SHRINK)) {
2535                 LASSERT(set != NULL);
2536                 ptlrpc_set_add_req(set, req);
2537                 ptlrpc_check_set(NULL, set);
2538         } else {
2539                 ptlrpcd_add_req(req);
2540         }
2541
2542         RETURN(0);
2543 }
2544
2545 static int osc_reconnect(const struct lu_env *env,
2546                          struct obd_export *exp, struct obd_device *obd,
2547                          struct obd_uuid *cluuid,
2548                          struct obd_connect_data *data,
2549                          void *localdata)
2550 {
2551         struct client_obd *cli = &obd->u.cli;
2552
2553         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
2554                 long lost_grant;
2555                 long grant;
2556
2557                 spin_lock(&cli->cl_loi_list_lock);
2558                 grant = cli->cl_avail_grant + cli->cl_reserved_grant;
2559                 if (data->ocd_connect_flags & OBD_CONNECT_GRANT_PARAM)
2560                         grant += cli->cl_dirty_grant;
2561                 else
2562                         grant += cli->cl_dirty_pages << PAGE_SHIFT;
2563                 data->ocd_grant = grant ? : 2 * cli_brw_size(obd);
2564                 lost_grant = cli->cl_lost_grant;
2565                 cli->cl_lost_grant = 0;
2566                 spin_unlock(&cli->cl_loi_list_lock);
2567
2568                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: %#llx ocd_version: %d"
2569                        " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
2570                        data->ocd_version, data->ocd_grant, lost_grant);
2571         }
2572
2573         RETURN(0);
2574 }
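/*
 * Worked example with hypothetical numbers: cl_avail_grant = 1 MiB,
 * cl_reserved_grant = 256 KiB and 64 dirty pages of 4 KiB each (without
 * OBD_CONNECT_GRANT_PARAM) give ocd_grant = 1024 + 256 + 256 KiB =
 * 1536 KiB requested back on reconnect; if that sum were zero, the
 * client would fall back to 2 * cli_brw_size(obd).
 */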
2575
2576 static int osc_disconnect(struct obd_export *exp)
2577 {
2578         struct obd_device *obd = class_exp2obd(exp);
2579         int rc;
2580
2581         rc = client_disconnect_export(exp);
2582         /**
2583          * Initially we put del_shrink_grant before disconnect_export, but it
2584          * causes the following problem if setup (connect) and cleanup
2585          * (disconnect) are tangled together.
2586          *      connect p1                     disconnect p2
2587          *   ptlrpc_connect_import
2588          *     ...............               class_manual_cleanup
2589          *                                     osc_disconnect
2590          *                                     del_shrink_grant
2591          *   ptlrpc_connect_interrupt
2592          *     init_grant_shrink
2593          *   add this client to shrink list
2594          *                                      cleanup_osc
2595          * Bang! The pinger triggers the shrink.
2596          * So the osc should be disconnected from the shrink list after we
2597          * are sure the import has been destroyed. BUG18662
2598          */
2599         if (obd->u.cli.cl_import == NULL)
2600                 osc_del_shrink_grant(&obd->u.cli);
2601         return rc;
2602 }
2603
2604 static int osc_ldlm_resource_invalidate(struct cfs_hash *hs,
2605         struct cfs_hash_bd *bd, struct hlist_node *hnode, void *arg)
2606 {
2607         struct lu_env *env = arg;
2608         struct ldlm_resource *res = cfs_hash_object(hs, hnode);
2609         struct ldlm_lock *lock;
2610         struct osc_object *osc = NULL;
2611         ENTRY;
2612
2613         lock_res(res);
2614         list_for_each_entry(lock, &res->lr_granted, l_res_link) {
2615                 if (lock->l_ast_data != NULL && osc == NULL) {
2616                         osc = lock->l_ast_data;
2617                         cl_object_get(osc2cl(osc));
2618                 }
2619
2620                 /* Clear the LDLM_FL_CLEANED flag to make sure the lock will
2621                  * be canceled by the 2nd round of the ldlm_namespace_clean()
2622                  * call in osc_import_event(). */
2623                 ldlm_clear_cleaned(lock);
2624         }
2625         unlock_res(res);
2626
2627         if (osc != NULL) {
2628                 osc_object_invalidate(env, osc);
2629                 cl_object_put(env, osc2cl(osc));
2630         }
2631
2632         RETURN(0);
2633 }
2634
2635 static int osc_import_event(struct obd_device *obd,
2636                             struct obd_import *imp,
2637                             enum obd_import_event event)
2638 {
2639         struct client_obd *cli;
2640         int rc = 0;
2641
2642         ENTRY;
2643         LASSERT(imp->imp_obd == obd);
2644
2645         switch (event) {
2646         case IMP_EVENT_DISCON: {
2647                 cli = &obd->u.cli;
2648                 spin_lock(&cli->cl_loi_list_lock);
2649                 cli->cl_avail_grant = 0;
2650                 cli->cl_lost_grant = 0;
2651                 spin_unlock(&cli->cl_loi_list_lock);
2652                 break;
2653         }
2654         case IMP_EVENT_INACTIVE: {
2655                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
2656                 break;
2657         }
2658         case IMP_EVENT_INVALIDATE: {
2659                 struct ldlm_namespace *ns = obd->obd_namespace;
2660                 struct lu_env         *env;
2661                 __u16                  refcheck;
2662
2663                 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
2664
2665                 env = cl_env_get(&refcheck);
2666                 if (!IS_ERR(env)) {
2667                         osc_io_unplug(env, &obd->u.cli, NULL);
2668
2669                         cfs_hash_for_each_nolock(ns->ns_rs_hash,
2670                                                  osc_ldlm_resource_invalidate,
2671                                                  env, 0);
2672                         cl_env_put(env, &refcheck);
2673
2674                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
2675                 } else
2676                         rc = PTR_ERR(env);
2677                 break;
2678         }
2679         case IMP_EVENT_ACTIVE: {
2680                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
2681                 break;
2682         }
2683         case IMP_EVENT_OCD: {
2684                 struct obd_connect_data *ocd = &imp->imp_connect_data;
2685
2686                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
2687                         osc_init_grant(&obd->u.cli, ocd);
2688
2689                 /* See bug 7198 */
2690                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
2691                         imp->imp_client->cli_request_portal = OST_REQUEST_PORTAL;
2692
2693                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
2694                 break;
2695         }
2696         case IMP_EVENT_DEACTIVATE: {
2697                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
2698                 break;
2699         }
2700         case IMP_EVENT_ACTIVATE: {
2701                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
2702                 break;
2703         }
2704         default:
2705                 CERROR("Unknown import event %d\n", event);
2706                 LBUG();
2707         }
2708         RETURN(rc);
2709 }
2710
2711 /**
2712  * Determine whether the lock can be canceled rather than replayed
2713  * during recovery; see bug 16774 for details.
2714  *
2715  * \retval zero  the lock cannot be canceled
2716  * \retval other it is OK to cancel the lock
2717  */
2718 static int osc_cancel_weight(struct ldlm_lock *lock)
2719 {
2720         /*
2721          * Cancel all unused, granted extent locks.
2722          */
2723         if (lock->l_resource->lr_type == LDLM_EXTENT &&
2724             lock->l_granted_mode == lock->l_req_mode &&
2725             osc_ldlm_weigh_ast(lock) == 0)
2726                 RETURN(1);
2727
2728         RETURN(0);
2729 }
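/*
 * A minimal sketch of how this predicate is wired up: osc_setup() below
 * passes it to ns_register_cancel(), which (in the current LDLM helpers)
 * simply stores the callback in the namespace for the cancel path to
 * consult:
 *
 *      ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
 */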
2730
2731 static int brw_queue_work(const struct lu_env *env, void *data)
2732 {
2733         struct client_obd *cli = data;
2734
2735         CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
2736
2737         osc_io_unplug(env, cli, NULL);
2738         RETURN(0);
2739 }
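/*
 * brw_queue_work() is not invoked directly: osc_setup() below binds it to
 * cli->cl_writeback_work via ptlrpcd_alloc_work(), so it runs in ptlrpcd
 * context whenever the writeback work item is queued.
 */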
2740
2741 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
2742 {
2743         struct client_obd *cli = &obd->u.cli;
2744         struct obd_type   *type;
2745         void              *handler;
2746         int                rc;
2747         int                adding;
2748         int                added;
2749         int                req_count;
2750         ENTRY;
2751
2752         rc = ptlrpcd_addref();
2753         if (rc)
2754                 RETURN(rc);
2755
2756         rc = client_obd_setup(obd, lcfg);
2757         if (rc)
2758                 GOTO(out_ptlrpcd, rc);
2759
2760         handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
2761         if (IS_ERR(handler))
2762                 GOTO(out_client_setup, rc = PTR_ERR(handler));
2763         cli->cl_writeback_work = handler;
2764
2765         handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
2766         if (IS_ERR(handler))
2767                 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
2768         cli->cl_lru_work = handler;
2769
2770         rc = osc_quota_setup(obd);
2771         if (rc)
2772                 GOTO(out_ptlrpcd_work, rc);
2773
2774         cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
2775
2776 #ifdef CONFIG_PROC_FS
2777         obd->obd_vars = lprocfs_osc_obd_vars;
2778 #endif
2779         /* If this is true then both the client (osc) and server (osp) are
2780          * on the same node. If the osp layer is loaded first, it registers
2781          * the osc proc directory; this obd_device then attaches its proc
2782          * tree to type->typ_procsym instead of obd->obd_type->typ_procroot. */
2783         type = class_search_type(LUSTRE_OSP_NAME);
2784         if (type && type->typ_procsym) {
2785                 obd->obd_proc_entry = lprocfs_register(obd->obd_name,
2786                                                        type->typ_procsym,
2787                                                        obd->obd_vars, obd);
2788                 if (IS_ERR(obd->obd_proc_entry)) {
2789                         rc = PTR_ERR(obd->obd_proc_entry);
2790                         CERROR("error %d setting up lprocfs for %s\n", rc,
2791                                obd->obd_name);
2792                         obd->obd_proc_entry = NULL;
2793                 }
2794         } else {
2795                 rc = lprocfs_obd_setup(obd);
2796         }
2797
2798         /* If the basic OSC proc tree construction succeeded then
2799          * let's do the rest. */
2800         if (rc == 0) {
2801                 lproc_osc_attach_seqstat(obd);
2802                 sptlrpc_lprocfs_cliobd_attach(obd);
2803                 ptlrpc_lprocfs_register_obd(obd);
2804         }
2805
2806         /*
2807          * We try to control the total number of requests with an upper
2808          * limit of osc_reqpool_maxreqcount. There might be a race that
2809          * causes an over-limit allocation, but that is fine.
2810          */
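        /*
         * Worked example (hypothetical numbers): with a pool cap of 1280
         * requests, 1275 requests already in the pool and
         * cl_max_rpcs_in_flight = 8, adding starts as 8 + 2 = 10 and is
         * clamped to 1280 - 1275 = 5 before calling
         * ptlrpc_add_rqs_to_pool().
         */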
2811         req_count = atomic_read(&osc_pool_req_count);
2812         if (req_count < osc_reqpool_maxreqcount) {
2813                 adding = cli->cl_max_rpcs_in_flight + 2;
2814                 if (req_count + adding > osc_reqpool_maxreqcount)
2815                         adding = osc_reqpool_maxreqcount - req_count;
2816
2817                 added = ptlrpc_add_rqs_to_pool(osc_rq_pool, adding);
2818                 atomic_add(added, &osc_pool_req_count);
2819         }
2820
2821         INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
2822         ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
2823
2824         spin_lock(&osc_shrink_lock);
2825         list_add_tail(&cli->cl_shrink_list, &osc_shrink_list);
2826         spin_unlock(&osc_shrink_lock);
2827
2828         RETURN(0);
2829
2830 out_ptlrpcd_work:
2831         if (cli->cl_writeback_work != NULL) {
2832                 ptlrpcd_destroy_work(cli->cl_writeback_work);
2833                 cli->cl_writeback_work = NULL;
2834         }
2835         if (cli->cl_lru_work != NULL) {
2836                 ptlrpcd_destroy_work(cli->cl_lru_work);
2837                 cli->cl_lru_work = NULL;
2838         }
2839 out_client_setup:
2840         client_obd_cleanup(obd);
2841 out_ptlrpcd:
2842         ptlrpcd_decref();
2843         RETURN(rc);
2844 }
2845
2846 static int osc_precleanup(struct obd_device *obd)
2847 {
2848         struct client_obd *cli = &obd->u.cli;
2849         ENTRY;
2850
2851         /* LU-464
2852          * For the echo client, the export may be on the zombie list; wait
2853          * for the zombie thread to cull it, because cli.cl_import will be
2854          * cleared in client_disconnect_export():
2855          *   class_export_destroy() -> obd_cleanup() ->
2856          *   echo_device_free() -> echo_client_cleanup() ->
2857          *   obd_disconnect() -> osc_disconnect() ->
2858          *   client_disconnect_export()
2859          */
2860         obd_zombie_barrier();
2861         if (cli->cl_writeback_work) {
2862                 ptlrpcd_destroy_work(cli->cl_writeback_work);
2863                 cli->cl_writeback_work = NULL;
2864         }
2865
2866         if (cli->cl_lru_work) {
2867                 ptlrpcd_destroy_work(cli->cl_lru_work);
2868                 cli->cl_lru_work = NULL;
2869         }
2870
2871         obd_cleanup_client_import(obd);
2872         ptlrpc_lprocfs_unregister_obd(obd);
2873         lprocfs_obd_cleanup(obd);
2874         RETURN(0);
2875 }
2876
2877 int osc_cleanup(struct obd_device *obd)
2878 {
2879         struct client_obd *cli = &obd->u.cli;
2880         int rc;
2881
2882         ENTRY;
2883
2884         spin_lock(&osc_shrink_lock);
2885         list_del(&cli->cl_shrink_list);
2886         spin_unlock(&osc_shrink_lock);
2887
2888         /* lru cleanup */
2889         if (cli->cl_cache != NULL) {
2890                 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
2891                 spin_lock(&cli->cl_cache->ccc_lru_lock);
2892                 list_del_init(&cli->cl_lru_osc);
2893                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
2894                 cli->cl_lru_left = NULL;
2895                 cl_cache_decref(cli->cl_cache);
2896                 cli->cl_cache = NULL;
2897         }
2898
2899         /* free memory of osc quota cache */
2900         osc_quota_cleanup(obd);
2901
2902         rc = client_obd_cleanup(obd);
2903
2904         ptlrpcd_decref();
2905         RETURN(rc);
2906 }
2907
2908 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
2909 {
2910         int rc = class_process_proc_param(PARAM_OSC, obd->obd_vars, lcfg, obd);
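        /* A positive return from class_process_proc_param() is not an error,
         * so fold it to 0; callers then see only 0 or a negative errno. */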
2911         return rc > 0 ? 0 : rc;
2912 }
2913
2914 static int osc_process_config(struct obd_device *obd, size_t len, void *buf)
2915 {
2916         return osc_process_config_base(obd, buf);
2917 }
2918
2919 static struct obd_ops osc_obd_ops = {
2920         .o_owner                = THIS_MODULE,
2921         .o_setup                = osc_setup,
2922         .o_precleanup           = osc_precleanup,
2923         .o_cleanup              = osc_cleanup,
2924         .o_add_conn             = client_import_add_conn,
2925         .o_del_conn             = client_import_del_conn,
2926         .o_connect              = client_connect_import,
2927         .o_reconnect            = osc_reconnect,
2928         .o_disconnect           = osc_disconnect,
2929         .o_statfs               = osc_statfs,
2930         .o_statfs_async         = osc_statfs_async,
2931         .o_create               = osc_create,
2932         .o_destroy              = osc_destroy,
2933         .o_getattr              = osc_getattr,
2934         .o_setattr              = osc_setattr,
2935         .o_iocontrol            = osc_iocontrol,
2936         .o_set_info_async       = osc_set_info_async,
2937         .o_import_event         = osc_import_event,
2938         .o_process_config       = osc_process_config,
2939         .o_quotactl             = osc_quotactl,
2940 };
2941
2942 static struct shrinker *osc_cache_shrinker;
2943 struct list_head osc_shrink_list = LIST_HEAD_INIT(osc_shrink_list);
2944 DEFINE_SPINLOCK(osc_shrink_lock);
2945
2946 #ifndef HAVE_SHRINKER_COUNT
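/*
 * Compat wrapper for kernels that predate the split count/scan shrinker
 * API: emulate it via the single legacy callback by scanning first and
 * then returning the remaining object count.
 */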
2947 static int osc_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask))
2948 {
2949         struct shrink_control scv = {
2950                 .nr_to_scan = shrink_param(sc, nr_to_scan),
2951                 .gfp_mask   = shrink_param(sc, gfp_mask)
2952         };
2953 #if !defined(HAVE_SHRINKER_WANT_SHRINK_PTR) && !defined(HAVE_SHRINK_CONTROL)
2954         struct shrinker *shrinker = NULL;
2955 #endif
2956
2957         (void)osc_cache_shrink_scan(shrinker, &scv);
2958
2959         return osc_cache_shrink_count(shrinker, &scv);
2960 }
2961 #endif
2962
2963 static int __init osc_init(void)
2964 {
2965         bool enable_proc = true;
2966         struct obd_type *type;
2967         unsigned int reqpool_size;
2968         unsigned int reqsize;
2969         int rc;
2970         DEF_SHRINKER_VAR(osc_shvar, osc_cache_shrink,
2971                          osc_cache_shrink_count, osc_cache_shrink_scan);
2972         ENTRY;
2973
2974         /* Print the address of _any_ initialized kernel symbol from this
2975          * module, to allow debugging with a gdb that doesn't support data
2976          * symbols from modules. */
2977         CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
2978
2979         rc = lu_kmem_init(osc_caches);
2980         if (rc)
2981                 RETURN(rc);
2982
2983         type = class_search_type(LUSTRE_OSP_NAME);
2984         if (type != NULL && type->typ_procsym != NULL)
2985                 enable_proc = false;
2986
2987         rc = class_register_type(&osc_obd_ops, NULL, enable_proc, NULL,
2988                                  LUSTRE_OSC_NAME, &osc_device_type);
2989         if (rc)
2990                 GOTO(out_kmem, rc);
2991
2992         osc_cache_shrinker = set_shrinker(DEFAULT_SEEKS, &osc_shvar);
2993
2994         /* This is obviously too much memory; only prevent overflow (or zero) here */
2995         if (osc_reqpool_mem_max >= 1 << 12 || osc_reqpool_mem_max == 0)
2996                 GOTO(out_type, rc = -EINVAL);
2997
2998         reqpool_size = osc_reqpool_mem_max << 20;
2999
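        /* Round the pool request size up to the smallest power of two that
         * is >= OST_IO_MAXREQSIZE. */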
3000         reqsize = 1;
3001         while (reqsize < OST_IO_MAXREQSIZE)
3002                 reqsize = reqsize << 1;
3003
3004         /*
3005          * We don't enlarge the request count in the OSC pool according to
3006          * cl_max_rpcs_in_flight. Allocation from the pool is only tried
3007          * after a normal allocation has failed, so a small OSC pool won't
3008          * cause much performance degradation in most cases.
3009          */
3010         osc_reqpool_maxreqcount = reqpool_size / reqsize;
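        /*
         * Worked example (hypothetical sizes): with the default
         * osc_reqpool_mem_max of 5 MB and OST_IO_MAXREQSIZE rounding up to,
         * say, 64 KB, the pool is capped at (5 << 20) / (64 << 10) = 80
         * requests.
         */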
3011
3012         atomic_set(&osc_pool_req_count, 0);
3013         osc_rq_pool = ptlrpc_init_rq_pool(0, OST_IO_MAXREQSIZE,
3014                                           ptlrpc_add_rqs_to_pool);
3015
3016         if (osc_rq_pool != NULL)
3017                 GOTO(out, rc);
3018         rc = -ENOMEM;
3019 out_type:
3020         class_unregister_type(LUSTRE_OSC_NAME);
3021 out_kmem:
3022         lu_kmem_fini(osc_caches);
3023 out:
3024         RETURN(rc);
3025 }
3026
3027 static void __exit osc_exit(void)
3028 {
3029         remove_shrinker(osc_cache_shrinker);
3030         class_unregister_type(LUSTRE_OSC_NAME);
3031         lu_kmem_fini(osc_caches);
3032         ptlrpc_free_rq_pool(osc_rq_pool);
3033 }
3034
3035 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
3036 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3037 MODULE_VERSION(LUSTRE_VERSION_STRING);
3038 MODULE_LICENSE("GPL");
3039
3040 module_init(osc_init);
3041 module_exit(osc_exit);