LU-3285 mds: add IO locking to the MDC and MDT
fs/lustre-release.git: lustre/osc/osc_request.c
/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.gnu.org/licenses/gpl-2.0.html
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, 2016, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 */

#define DEBUG_SUBSYSTEM S_OSC

#include <libcfs/libcfs.h>

#include <lprocfs_status.h>
#include <lustre_debug.h>
#include <lustre_dlm.h>
#include <lustre_fid.h>
#include <lustre_ha.h>
#include <uapi/linux/lustre/lustre_ioctl.h>
#include <lustre_net.h>
#include <lustre_obdo.h>
#include <uapi/linux/lustre/lustre_param.h>
#include <obd.h>
#include <obd_cksum.h>
#include <obd_class.h>
#include <lustre_osc.h>

#include "osc_internal.h"

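/*
 * Note: OST_WRITE requests are allocated from osc_rq_pool (see
 * osc_brw_prep_request() below), so writeout can presumably still make
 * progress when regular allocation fails under memory pressure; the
 * total pool memory is capped by the osc_reqpool_mem_max module
 * parameter (in MB).
 */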
atomic_t osc_pool_req_count;
unsigned int osc_reqpool_maxreqcount;
struct ptlrpc_request_pool *osc_rq_pool;

/* max memory used for request pool, unit is MB */
static unsigned int osc_reqpool_mem_max = 5;
module_param(osc_reqpool_mem_max, uint, 0444);

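/*
 * The *_args structures below hold per-request state for the async
 * reply handlers.  Each one is stashed inside req->rq_async_args, so it
 * must fit within it; the CLASSERT()s at the call sites enforce this.
 */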
struct osc_brw_async_args {
        struct obdo              *aa_oa;
        int                       aa_requested_nob;
        int                       aa_nio_count;
        u32                       aa_page_count;
        int                       aa_resends;
        struct brw_page         **aa_ppga;
        struct client_obd        *aa_cli;
        struct list_head          aa_oaps;
        struct list_head          aa_exts;
};

#define osc_grant_args osc_brw_async_args

struct osc_setattr_args {
        struct obdo             *sa_oa;
        obd_enqueue_update_f     sa_upcall;
        void                    *sa_cookie;
};

struct osc_fsync_args {
        struct osc_object       *fa_obj;
        struct obdo             *fa_oa;
        obd_enqueue_update_f     fa_upcall;
        void                    *fa_cookie;
};

struct osc_ladvise_args {
        struct obdo             *la_oa;
        obd_enqueue_update_f     la_upcall;
        void                    *la_cookie;
};

static void osc_release_ppga(struct brw_page **ppga, size_t count);
static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
                         void *data, int rc);

void osc_pack_req_body(struct ptlrpc_request *req, struct obdo *oa)
{
        struct ost_body *body;

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
}

static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        int                      rc;

        ENTRY;
        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;

        EXIT;
out:
        ptlrpc_req_finished(req);

        return rc;
}

static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        int                      rc;

        ENTRY;
        LASSERT(oa->o_valid & OBD_MD_FLGROUP);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        EXIT;
out:
        ptlrpc_req_finished(req);

        RETURN(rc);
}

static int osc_setattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_setattr_args *sa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
                             &body->oa);
out:
        rc = sa->sa_upcall(sa->sa_cookie, rc);
        RETURN(rc);
}

int osc_setattr_async(struct obd_export *exp, struct obdo *oa,
                      obd_enqueue_update_f upcall, void *cookie,
                      struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct osc_setattr_args *sa;
        int                      rc;

        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        /* do MDS-to-OST setattr asynchronously */
        if (!rqset) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req);
        } else {
                req->rq_interpret_reply =
                        (ptlrpc_interpterer_t)osc_setattr_interpret;

                CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
                sa = ptlrpc_req_async_args(req);
                sa->sa_oa = oa;
                sa->sa_upcall = upcall;
                sa->sa_cookie = cookie;

                if (rqset == PTLRPCD_SET)
                        ptlrpcd_add_req(req);
                else
                        ptlrpc_set_add_req(rqset, req);
        }

        RETURN(0);
}

static int osc_ladvise_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 void *arg, int rc)
{
        struct osc_ladvise_args *la = arg;
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        *la->la_oa = body->oa;
out:
        rc = la->la_upcall(la->la_cookie, rc);
        RETURN(rc);
}

/**
 * If rqset is NULL, do not wait for the response. Upcall and cookie may
 * also be NULL in that case.
 */
int osc_ladvise_base(struct obd_export *exp, struct obdo *oa,
                     struct ladvise_hdr *ladvise_hdr,
                     obd_enqueue_update_f upcall, void *cookie,
                     struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        struct osc_ladvise_args *la;
        int                      rc;
        struct lu_ladvise       *req_ladvise;
        struct lu_ladvise       *ladvise = ladvise_hdr->lah_advise;
        int                      num_advise = ladvise_hdr->lah_count;
        struct ladvise_hdr      *req_ladvise_hdr;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_LADVISE);
        if (req == NULL)
                RETURN(-ENOMEM);

        req_capsule_set_size(&req->rq_pill, &RMF_OST_LADVISE, RCL_CLIENT,
                             num_advise * sizeof(*ladvise));
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_LADVISE);
        if (rc != 0) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL;
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
                             oa);

        req_ladvise_hdr = req_capsule_client_get(&req->rq_pill,
                                                 &RMF_OST_LADVISE_HDR);
        memcpy(req_ladvise_hdr, ladvise_hdr, sizeof(*ladvise_hdr));

        req_ladvise = req_capsule_client_get(&req->rq_pill, &RMF_OST_LADVISE);
        memcpy(req_ladvise, ladvise, sizeof(*ladvise) * num_advise);
        ptlrpc_request_set_replen(req);

        if (rqset == NULL) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req);
                RETURN(0);
        }

        req->rq_interpret_reply = osc_ladvise_interpret;
        CLASSERT(sizeof(*la) <= sizeof(req->rq_async_args));
        la = ptlrpc_req_async_args(req);
        la->la_oa = oa;
        la->la_upcall = upcall;
        la->la_cookie = cookie;

        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}

static int osc_create(const struct lu_env *env, struct obd_export *exp,
                      struct obdo *oa)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        LASSERT(oa != NULL);
        LASSERT(oa->o_valid & OBD_MD_FLGROUP);
        LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
        if (req == NULL)
                GOTO(out, rc = -ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
        if (rc) {
                ptlrpc_request_free(req);
                GOTO(out, rc);
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out_req, rc = -EPROTO);

        CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;

        CDEBUG(D_HA, "transno: %lld\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        RETURN(rc);
}

int osc_punch_send(struct obd_export *exp, struct obdo *oa,
                   obd_enqueue_update_f upcall, void *cookie)
{
        struct ptlrpc_request *req;
        struct osc_setattr_args *sa;
        struct obd_import *imp = class_exp2cliimp(exp);
        struct ost_body *body;
        int rc;

        ENTRY;

        req = ptlrpc_request_alloc(imp, &RQF_OST_PUNCH);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
        if (rc < 0) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_set_io_portal(req);

        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);

        lustre_set_wire_obdo(&imp->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
        CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
        sa = ptlrpc_req_async_args(req);
        sa->sa_oa = oa;
        sa->sa_upcall = upcall;
        sa->sa_cookie = cookie;

        ptlrpcd_add_req(req);

        RETURN(0);
}
EXPORT_SYMBOL(osc_punch_send);

static int osc_sync_interpret(const struct lu_env *env,
                              struct ptlrpc_request *req,
                              void *arg, int rc)
{
        struct osc_fsync_args   *fa = arg;
        struct ost_body         *body;
        struct cl_attr          *attr = &osc_env_info(env)->oti_attr;
        unsigned long            valid = 0;
        struct cl_object        *obj;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL) {
                CERROR("can't unpack ost_body\n");
                GOTO(out, rc = -EPROTO);
        }

        *fa->fa_oa = body->oa;
        obj = osc2cl(fa->fa_obj);

        /* Update osc object's blocks attribute */
        cl_object_attr_lock(obj);
        if (body->oa.o_valid & OBD_MD_FLBLOCKS) {
                attr->cat_blocks = body->oa.o_blocks;
                valid |= CAT_BLOCKS;
        }

        if (valid != 0)
                cl_object_attr_update(env, obj, attr, valid);
        cl_object_attr_unlock(obj);

out:
        rc = fa->fa_upcall(fa->fa_cookie, rc);
        RETURN(rc);
}

int osc_sync_base(struct osc_object *obj, struct obdo *oa,
                  obd_enqueue_update_f upcall, void *cookie,
                  struct ptlrpc_request_set *rqset)
{
        struct obd_export     *exp = osc_export(obj);
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct osc_fsync_args *fa;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        /* overload the size and blocks fields in the oa with start/end */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = osc_sync_interpret;

        CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
        fa = ptlrpc_req_async_args(req);
        fa->fa_obj = obj;
        fa->fa_oa = oa;
        fa->fa_upcall = upcall;
        fa->fa_cookie = cookie;

        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}

/* Find and cancel locally the locks matched by @mode in the resource found
 * by @objid. Found locks are added to the @cancels list. Returns the number
 * of locks added to the @cancels list. */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
                                   struct list_head *cancels,
                                   enum ldlm_mode mode, __u64 lock_flags)
{
        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        struct ldlm_res_id res_id;
        struct ldlm_resource *res;
        int count;
        ENTRY;

        /* Return, i.e. cancel nothing, only if ELC is supported (flag in
         * export) but disabled through procfs (flag in NS).
         *
         * This distinguishes this case from one where ELC is not supported
         * at all, in which we still want to cancel locks in advance and
         * just cancel them locally, without sending any RPC. */
        if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
                RETURN(0);

        ostid_build_res_name(&oa->o_oi, &res_id);
        res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
        if (IS_ERR(res))
                RETURN(0);

        LDLM_RESOURCE_ADDREF(res);
        count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
                                           lock_flags, 0, NULL);
        LDLM_RESOURCE_DELREF(res);
        ldlm_resource_putref(res);
        RETURN(count);
}

static int osc_destroy_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req, void *data,
                                 int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;

        atomic_dec(&cli->cl_destroy_in_flight);
        wake_up(&cli->cl_destroy_waitq);
        return 0;
}

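/*
 * Throttle destroy RPCs to at most cl_max_rpcs_in_flight.  The in-flight
 * counter is bumped optimistically; if that overshoots the limit, it is
 * dropped again, and if another thread changed the counter between the
 * two atomic operations, a waiter is woken so that it can retry.
 */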
static int osc_can_send_destroy(struct client_obd *cli)
{
        if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
            cli->cl_max_rpcs_in_flight) {
                /* The destroy request can be sent */
                return 1;
        }
        if (atomic_dec_return(&cli->cl_destroy_in_flight) <
            cli->cl_max_rpcs_in_flight) {
                /*
                 * The counter has been modified between the two atomic
                 * operations.
                 */
                wake_up(&cli->cl_destroy_waitq);
        }
        return 0;
}

static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct client_obd     *cli = &exp->exp_obd->u.cli;
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct list_head       cancels = LIST_HEAD_INIT(cancels);
        int rc, count;
        ENTRY;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
        if (req == NULL) {
                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                RETURN(-ENOMEM);
        }

        rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
                               0, &cancels, count);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = osc_destroy_interpret;
        if (!osc_can_send_destroy(cli)) {
                struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);

                /*
                 * Wait until the number of on-going destroy RPCs drops
                 * below cl_max_rpcs_in_flight.
                 */
                rc = l_wait_event_exclusive(cli->cl_destroy_waitq,
                                            osc_can_send_destroy(cli), &lwi);
                if (rc) {
                        ptlrpc_req_finished(req);
                        RETURN(rc);
                }
        }

        /* Do not wait for response */
        ptlrpcd_add_req(req);
        RETURN(0);
}

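/*
 * osc_announce_cached() piggybacks the client's cache accounting onto an
 * outgoing request body: o_dirty is how much dirty data is currently
 * cached, o_undirty is how much additional grant the client would like
 * (roughly one extra round of max-sized RPCs, see the arithmetic below),
 * and o_dropped reports grant that was lost since the last report.
 */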
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;

        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        spin_lock(&cli->cl_loi_list_lock);
        if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data, GRANT_PARAM))
                oa->o_dirty = cli->cl_dirty_grant;
        else
                oa->o_dirty = cli->cl_dirty_pages << PAGE_SHIFT;
        if (unlikely(cli->cl_dirty_pages - cli->cl_dirty_transit >
                     cli->cl_dirty_max_pages)) {
                CERROR("dirty %lu - %lu > dirty_max %lu\n",
                       cli->cl_dirty_pages, cli->cl_dirty_transit,
                       cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else if (unlikely(atomic_long_read(&obd_dirty_pages) -
                            atomic_long_read(&obd_dirty_transit_pages) >
                            (long)(obd_max_dirty_pages + 1))) {
                /* The atomic_read() and the atomic_inc() are not covered
                 * by a lock, so they may safely race and trip this
                 * CERROR() unless we add in a small fudge factor (+1). */
                CERROR("%s: dirty %ld - %ld > system dirty_max %ld\n",
                       cli_name(cli), atomic_long_read(&obd_dirty_pages),
                       atomic_long_read(&obd_dirty_transit_pages),
                       obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
                            0x7fffffff)) {
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty_pages, cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else {
                unsigned long nrpages;

                nrpages = cli->cl_max_pages_per_rpc;
                nrpages *= cli->cl_max_rpcs_in_flight + 1;
                nrpages = max(nrpages, cli->cl_dirty_max_pages);
                oa->o_undirty = nrpages << PAGE_SHIFT;
                if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data,
                                 GRANT_PARAM)) {
                        int nrextents;

                        /* take the extent tax into account when asking for
                         * more grant space */
                        nrextents = (nrpages + cli->cl_max_extent_pages - 1) /
                                     cli->cl_max_extent_pages;
                        oa->o_undirty += nrextents * cli->cl_grant_extent_tax;
                }
        }
        oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
        oa->o_dropped = cli->cl_lost_grant;
        cli->cl_lost_grant = 0;
        spin_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "dirty: %llu undirty: %u dropped %u grant: %llu\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
}

void osc_update_next_shrink(struct client_obd *cli)
{
        cli->cl_next_shrink_grant =
                cfs_time_shift(cli->cl_grant_shrink_interval);
        CDEBUG(D_CACHE, "next time %ld to shrink grant\n",
               cli->cl_next_shrink_grant);
}

static void __osc_update_grant(struct client_obd *cli, u64 grant)
{
        spin_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant += grant;
        spin_unlock(&cli->cl_loi_list_lock);
}

static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
{
        if (body->oa.o_valid & OBD_MD_FLGRANT) {
                CDEBUG(D_CACHE, "got %llu extra grant\n", body->oa.o_grant);
                __osc_update_grant(cli, body->oa.o_grant);
        }
}

static int osc_shrink_grant_interpret(const struct lu_env *env,
                                      struct ptlrpc_request *req,
                                      void *aa, int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
        struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
        struct ost_body *body;

        if (rc != 0) {
                __osc_update_grant(cli, oa->o_grant);
                GOTO(out, rc);
        }

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        osc_update_grant(cli, body);
out:
        OBDO_FREE(oa);
        return rc;
}

static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
{
        spin_lock(&cli->cl_loi_list_lock);
        oa->o_grant = cli->cl_avail_grant / 4;
        cli->cl_avail_grant -= oa->o_grant;
        spin_unlock(&cli->cl_loi_list_lock);
        if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
                oa->o_valid |= OBD_MD_FLFLAGS;
                oa->o_flags = 0;
        }
        oa->o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);
}

/* Shrink the current grant, either from some large amount to enough for a
 * full set of in-flight RPCs, or if we have already shrunk to that limit
 * then to enough for a single RPC.  This avoids keeping more grant than
 * needed, and avoids shrinking the grant piecemeal. */
static int osc_shrink_grant(struct client_obd *cli)
{
        __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
                             (cli->cl_max_pages_per_rpc << PAGE_SHIFT);

        spin_lock(&cli->cl_loi_list_lock);
        if (cli->cl_avail_grant <= target_bytes)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
        spin_unlock(&cli->cl_loi_list_lock);

        return osc_shrink_grant_to_target(cli, target_bytes);
}

int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
{
        int                     rc = 0;
        struct ost_body        *body;
        ENTRY;

        spin_lock(&cli->cl_loi_list_lock);
        /* Don't shrink if we are already above or below the desired limit.
         * We don't want to shrink below a single RPC, as that will negatively
         * impact block allocation and long-term performance. */
        if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_SHIFT)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;

        if (target_bytes >= cli->cl_avail_grant) {
                spin_unlock(&cli->cl_loi_list_lock);
                RETURN(0);
        }
        spin_unlock(&cli->cl_loi_list_lock);

        OBD_ALLOC_PTR(body);
        if (!body)
                RETURN(-ENOMEM);

        osc_announce_cached(cli, &body->oa, 0);

        spin_lock(&cli->cl_loi_list_lock);
        body->oa.o_grant = cli->cl_avail_grant - target_bytes;
        cli->cl_avail_grant = target_bytes;
        spin_unlock(&cli->cl_loi_list_lock);
        if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
                body->oa.o_valid |= OBD_MD_FLFLAGS;
                body->oa.o_flags = 0;
        }
        body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);

        rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
                                sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
                                sizeof(*body), body, NULL);
        if (rc != 0)
                __osc_update_grant(cli, body->oa.o_grant);
        OBD_FREE_PTR(body);
        RETURN(rc);
}

static int osc_should_shrink_grant(struct client_obd *client)
{
        cfs_time_t time = cfs_time_current();
        cfs_time_t next_shrink = client->cl_next_shrink_grant;

        if ((client->cl_import->imp_connect_data.ocd_connect_flags &
             OBD_CONNECT_GRANT_SHRINK) == 0)
                return 0;

        if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
                /* Get the current RPC size directly, instead of going via:
                 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
                 * Keep comment here so that it can be found by searching. */
                int brw_size = client->cl_max_pages_per_rpc << PAGE_SHIFT;

                if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
                    client->cl_avail_grant > brw_size)
                        return 1;
                else
                        osc_update_next_shrink(client);
        }
        return 0;
}

static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
{
        struct client_obd *client;

        list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
                if (osc_should_shrink_grant(client))
                        osc_shrink_grant(client);
        }
        return 0;
}

static int osc_add_shrink_grant(struct client_obd *client)
{
        int rc;

        rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
                                       TIMEOUT_GRANT,
                                       osc_grant_shrink_grant_cb, NULL,
                                       &client->cl_grant_shrink_list);
        if (rc) {
                CERROR("add grant client %s error %d\n", cli_name(client), rc);
                return rc;
        }
        CDEBUG(D_CACHE, "add grant client %s\n", cli_name(client));
        osc_update_next_shrink(client);
        return 0;
}

static int osc_del_shrink_grant(struct client_obd *client)
{
        return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
                                         TIMEOUT_GRANT);
}

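/*
 * Example of the GRANT_PARAM chunk alignment done below (values assumed
 * for illustration only): with PAGE_SHIFT = 12 and ocd_grant_blkbits = 16,
 * cl_chunkbits = 16, so a chunk is 16 pages and chunk_mask = ~15; a
 * cl_max_pages_per_rpc of 100 would be rounded up to 112, the next
 * multiple of 16.
 */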
void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        /*
         * ocd_grant is the total grant amount we expect to hold: if we have
         * been evicted, it's the new avail_grant amount, and cl_dirty_pages
         * will drop to 0 as in-flight RPCs fail out; otherwise, it's
         * avail_grant + dirty.
         *
         * The race is tolerable here: if we're evicted, but imp_state has
         * already left EVICTED state, then cl_dirty_pages must be 0 already.
         */
        spin_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant = ocd->ocd_grant;
        if (cli->cl_import->imp_state != LUSTRE_IMP_EVICTED) {
                cli->cl_avail_grant -= cli->cl_reserved_grant;
                if (OCD_HAS_FLAG(ocd, GRANT_PARAM))
                        cli->cl_avail_grant -= cli->cl_dirty_grant;
                else
                        cli->cl_avail_grant -=
                                        cli->cl_dirty_pages << PAGE_SHIFT;
        }

        if (OCD_HAS_FLAG(ocd, GRANT_PARAM)) {
                u64 size;
                int chunk_mask;

                /* overhead for each extent insertion */
                cli->cl_grant_extent_tax = ocd->ocd_grant_tax_kb << 10;
                /* determine the appropriate chunk size used by osc_extent. */
                cli->cl_chunkbits = max_t(int, PAGE_SHIFT,
                                          ocd->ocd_grant_blkbits);
                /* max_pages_per_rpc must be chunk aligned */
                chunk_mask = ~((1 << (cli->cl_chunkbits - PAGE_SHIFT)) - 1);
                cli->cl_max_pages_per_rpc = (cli->cl_max_pages_per_rpc +
                                             ~chunk_mask) & chunk_mask;
                /* determine maximum extent size, in #pages */
                size = (u64)ocd->ocd_grant_max_blks << ocd->ocd_grant_blkbits;
                cli->cl_max_extent_pages = size >> PAGE_SHIFT;
                if (cli->cl_max_extent_pages == 0)
                        cli->cl_max_extent_pages = 1;
        } else {
                cli->cl_grant_extent_tax = 0;
                cli->cl_chunkbits = PAGE_SHIFT;
                cli->cl_max_extent_pages = DT_MAX_BRW_PAGES;
        }
        spin_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE,
               "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld "
               "chunk bits: %d cl_max_extent_pages: %d\n",
               cli_name(cli),
               cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits,
               cli->cl_max_extent_pages);

        if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
            list_empty(&cli->cl_grant_shrink_list))
                osc_add_shrink_grant(cli);
}
EXPORT_SYMBOL(osc_init_grant);

/* We assume that the reason this OSC got a short read is that it read
 * beyond the end of a stripe file; i.e. Lustre is reading a sparse file
 * via the LOV, and it _knows_ it's reading inside the file, it's just that
 * this stripe never got written at or beyond this stripe offset yet. */
static void handle_short_read(int nob_read, size_t page_count,
                              struct brw_page **pga)
{
        char *ptr;
        int i = 0;

        /* skip bytes read OK */
        while (nob_read > 0) {
                LASSERT(page_count > 0);

                if (pga[i]->count > nob_read) {
                        /* EOF inside this page */
                        ptr = kmap(pga[i]->pg) +
                                (pga[i]->off & ~PAGE_MASK);
                        memset(ptr + nob_read, 0, pga[i]->count - nob_read);
                        kunmap(pga[i]->pg);
                        page_count--;
                        i++;
                        break;
                }

                nob_read -= pga[i]->count;
                page_count--;
                i++;
        }

        /* zero remaining pages */
        while (page_count-- > 0) {
                ptr = kmap(pga[i]->pg) + (pga[i]->off & ~PAGE_MASK);
                memset(ptr, 0, pga[i]->count);
                kunmap(pga[i]->pg);
                i++;
        }
}

static int check_write_rcs(struct ptlrpc_request *req,
                           int requested_nob, int niocount,
                           size_t page_count, struct brw_page **pga)
{
        int      i;
        __u32   *remote_rcs;

        remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
                                                  sizeof(*remote_rcs) *
                                                  niocount);
        if (remote_rcs == NULL) {
                CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
                return -EPROTO;
        }

        /* return error if any niobuf was in error */
        for (i = 0; i < niocount; i++) {
                if ((int)remote_rcs[i] < 0)
                        return remote_rcs[i];

                if (remote_rcs[i] != 0) {
                        CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
                               i, remote_rcs[i], req);
                        return -EPROTO;
                }
        }

        if (req->rq_bulk->bd_nob_transferred != requested_nob) {
                CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
                       req->rq_bulk->bd_nob_transferred, requested_nob);
                return -EPROTO;
        }

        return 0;
}

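/*
 * Two brw_pages are merged into a single niobuf only if they are
 * contiguous in file offset and their flags match exactly.  A mismatch
 * in flags not known to be safe to combine additionally triggers a
 * warning.
 */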
static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
{
        if (p1->flag != p2->flag) {
                unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
                                  OBD_BRW_SYNC       | OBD_BRW_ASYNC   |
                                  OBD_BRW_NOQUOTA    | OBD_BRW_SOFT_SYNC);

                /* warn if we try to combine flags that we don't know to be
                 * safe to combine */
                if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
                        CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
                              "report this at https://jira.hpdd.intel.com/\n",
                              p1->flag, p2->flag);
                }
                return 0;
        }

        return (p1->off + p1->count == p2->off);
}

static u32 osc_checksum_bulk(int nob, size_t pg_count,
                             struct brw_page **pga, int opc,
                             enum cksum_types cksum_type)
{
        u32                              cksum;
        int                              i = 0;
        struct cfs_crypto_hash_desc     *hdesc;
        unsigned int                     bufsize;
        unsigned char                    cfs_alg = cksum_obd2cfs(cksum_type);

        LASSERT(pg_count > 0);

        hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
        if (IS_ERR(hdesc)) {
                CERROR("Unable to initialize checksum hash %s\n",
                       cfs_crypto_hash_name(cfs_alg));
                return PTR_ERR(hdesc);
        }

        while (nob > 0 && pg_count > 0) {
                unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 && opc == OST_READ &&
                    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
                        unsigned char *ptr = kmap(pga[i]->pg);
                        int off = pga[i]->off & ~PAGE_MASK;

                        memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
                        kunmap(pga[i]->pg);
                }
                cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
                                            pga[i]->off & ~PAGE_MASK,
                                            count);
                LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
                               (int)(pga[i]->off & ~PAGE_MASK));

                nob -= pga[i]->count;
                pg_count--;
                i++;
        }

        bufsize = sizeof(cksum);
        cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);

        /* For sending, we only compute a wrong checksum instead of
         * corrupting the data, so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                cksum++;

        return cksum;
}

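/*
 * Build a single BRW RPC for @page_count pages: contiguous pages with
 * matching flags are coalesced into one niobuf each (niocount), a bulk
 * descriptor is attached for the data transfer, the cached-dirty/grant
 * state is announced in the request body, and a bulk checksum is added
 * for writes when client checksums are enabled (and requested from the
 * server for reads).
 */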
static int
osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
                     u32 page_count, struct brw_page **pga,
                     struct ptlrpc_request **reqp, int resend)
{
        struct ptlrpc_request   *req;
        struct ptlrpc_bulk_desc *desc;
        struct ost_body         *body;
        struct obd_ioobj        *ioobj;
        struct niobuf_remote    *niobuf;
        int niocount, i, requested_nob, opc, rc;
        struct osc_brw_async_args *aa;
        struct req_capsule      *pill;
        struct brw_page *pg_prev;

        ENTRY;
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
                RETURN(-ENOMEM); /* Recoverable */
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
                RETURN(-EINVAL); /* Fatal */

        if ((cmd & OBD_BRW_WRITE) != 0) {
                opc = OST_WRITE;
                req = ptlrpc_request_alloc_pool(cli->cl_import,
                                                osc_rq_pool,
                                                &RQF_OST_BRW_WRITE);
        } else {
                opc = OST_READ;
                req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
        }
        if (req == NULL)
                RETURN(-ENOMEM);

        for (niocount = i = 1; i < page_count; i++) {
                if (!can_merge_pages(pga[i - 1], pga[i]))
                        niocount++;
        }

        pill = &req->rq_pill;
        req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
                             sizeof(*ioobj));
        req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
                             niocount * sizeof(*niobuf));

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        osc_set_io_portal(req);

        ptlrpc_at_set_req_timeout(req);
        /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
         * retry logic */
        req->rq_no_retry_einprogress = 1;

        desc = ptlrpc_prep_bulk_imp(req, page_count,
                cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
                (opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE :
                        PTLRPC_BULK_PUT_SINK) |
                        PTLRPC_BULK_BUF_KIOV,
                OST_BULK_PORTAL,
                &ptlrpc_bulk_kiov_pin_ops);

        if (desc == NULL)
                GOTO(out, rc = -ENOMEM);
        /* NB request now owns desc and will free it when it gets freed */

        body = req_capsule_client_get(pill, &RMF_OST_BODY);
        ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;
        /* The high bits of ioo_max_brw tell the server the _maximum_ number
         * of bulks that might be sent for this request.  The actual number
         * is decided when the RPC is finally sent in ptlrpc_register_bulk().
         * It sends "max - 1" for old client compatibility sending "0", and
         * also so that the actual maximum is a power-of-two number, not one
         * less. LU-1431 */
        ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
        LASSERT(page_count > 0);
        pg_prev = pga[0];
        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                struct brw_page *pg = pga[i];
                int poff = pg->off & ~PAGE_MASK;

                LASSERT(pg->count > 0);
                /* make sure there is no gap in the middle of page array */
                LASSERTF(page_count == 1 ||
                         (ergo(i == 0, poff + pg->count == PAGE_SIZE) &&
                          ergo(i > 0 && i < page_count - 1,
                               poff == 0 && pg->count == PAGE_SIZE)   &&
                          ergo(i == page_count - 1, poff == 0)),
                         "i: %d/%d pg: %p off: %llu, count: %u\n",
                         i, page_count, pg, pg->off, pg->count);
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u pg %p [pri %lu ind %lu] off %llu"
                         " prev_pg %p [pri %lu ind %lu] off %llu\n",
                         i, page_count,
                         pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
                         pg_prev->pg, page_private(pg_prev->pg),
                         pg_prev->pg->index, pg_prev->off);
                LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
                        (pg->flag & OBD_BRW_SRVLOCK));

                desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff, pg->count);
                requested_nob += pg->count;

                if (i > 0 && can_merge_pages(pg_prev, pg)) {
                        niobuf--;
                        niobuf->rnb_len += pg->count;
                } else {
                        niobuf->rnb_offset = pg->off;
                        niobuf->rnb_len    = pg->count;
                        niobuf->rnb_flags  = pg->flag;
                }
                pg_prev = pg;
        }

        LASSERTF((void *)(niobuf - niocount) ==
                req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
                "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
                &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));

        osc_announce_cached(cli, &body->oa,
                            opc == OST_WRITE ? requested_nob : 0);
        if (resend) {
                if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                        body->oa.o_valid |= OBD_MD_FLFLAGS;
                        body->oa.o_flags = 0;
                }
                body->oa.o_flags |= OBD_FL_RECOV_RESEND;
        }

        if (osc_should_shrink_grant(cli))
                osc_shrink_grant_local(cli, &body->oa);

        /* size[REQ_REC_OFF] is still sizeof(*body) */
        if (opc == OST_WRITE) {
                if (cli->cl_checksum &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        /* store cl_cksum_type in a local variable since
                         * it can be changed via lprocfs */
                        enum cksum_types cksum_type = cli->cl_cksum_type;

                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                body->oa.o_flags = 0;

                        body->oa.o_flags |= cksum_type_pack(cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        body->oa.o_cksum = osc_checksum_bulk(requested_nob,
                                                             page_count, pga,
                                                             OST_WRITE,
                                                             cksum_type);
                        CDEBUG(D_PAGE, "checksum at write origin: %x\n",
                               body->oa.o_cksum);
                        /* save this in 'oa', too, for later checking */
                        oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        oa->o_flags |= cksum_type_pack(cksum_type);
                } else {
                        /* clear out the checksum flag, in case this is a
                         * resend but cl_checksum is no longer set. b=11238 */
                        oa->o_valid &= ~OBD_MD_FLCKSUM;
                }
                oa->o_cksum = body->oa.o_cksum;
                /* 1 RC per niobuf */
                req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
                                     sizeof(__u32) * niocount);
        } else {
                if (cli->cl_checksum &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                body->oa.o_flags = 0;
                        body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                }

                /* The client cksum has already been copied to the wire obdo
                 * in the previous lustre_set_wire_obdo(); in case a bulk
                 * read is resent due to a cksum error, this lets the server
                 * check+dump the pages on its side */
        }
        ptlrpc_request_set_replen(req);

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oa = oa;
        aa->aa_requested_nob = requested_nob;
        aa->aa_nio_count = niocount;
        aa->aa_page_count = page_count;
        aa->aa_resends = 0;
        aa->aa_ppga = pga;
        aa->aa_cli = cli;
        INIT_LIST_HEAD(&aa->aa_oaps);

        *reqp = req;
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
               req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
               niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
        RETURN(0);

out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}

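/*
 * Dump the raw bulk pages of a checksum-mismatched BRW to a file under
 * the debug path, named after the FID, byte range and both checksums,
 * so that the corrupted data can be inspected offline.
 */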
char dbgcksum_file_name[PATH_MAX];

static void dump_all_bulk_pages(struct obdo *oa, __u32 page_count,
                                struct brw_page **pga, __u32 server_cksum,
                                __u32 client_cksum)
{
        struct file *filp;
        int rc, i;
        unsigned int len;
        char *buf;
        mm_segment_t oldfs;

        /* will only keep a dump of the pages on the first error for the
         * same range in the file/fid, not during the resends/retries. */
        snprintf(dbgcksum_file_name, sizeof(dbgcksum_file_name),
                 "%s-checksum_dump-osc-"DFID":[%llu-%llu]-%x-%x",
                 (strncmp(libcfs_debug_file_path_arr, "NONE", 4) != 0 ?
                  libcfs_debug_file_path_arr :
                  LIBCFS_DEBUG_FILE_PATH_DEFAULT),
                 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : 0ULL,
                 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
                 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
                 pga[0]->off,
                 pga[page_count-1]->off + pga[page_count-1]->count - 1,
                 client_cksum, server_cksum);
        filp = filp_open(dbgcksum_file_name,
                         O_CREAT | O_EXCL | O_WRONLY | O_LARGEFILE, 0600);
        if (IS_ERR(filp)) {
                rc = PTR_ERR(filp);
                if (rc == -EEXIST)
                        CDEBUG(D_INFO, "%s: can't open to dump pages with "
                               "checksum error: rc = %d\n", dbgcksum_file_name,
                               rc);
                else
                        CERROR("%s: can't open to dump pages with checksum "
                               "error: rc = %d\n", dbgcksum_file_name, rc);
                return;
        }

        oldfs = get_fs();
        set_fs(KERNEL_DS);
        for (i = 0; i < page_count; i++) {
                len = pga[i]->count;
                buf = kmap(pga[i]->pg);
                while (len != 0) {
                        rc = vfs_write(filp, (__force const char __user *)buf,
                                       len, &filp->f_pos);
                        if (rc < 0) {
                                CERROR("%s: wanted to write %u but got %d "
                                       "error\n", dbgcksum_file_name, len, rc);
                                break;
                        }
                        len -= rc;
                        buf += rc;
                        CDEBUG(D_INFO, "%s: wrote %d bytes\n",
                               dbgcksum_file_name, rc);
                }
                kunmap(pga[i]->pg);
        }
        set_fs(oldfs);

        rc = ll_vfs_fsync_range(filp, 0, LLONG_MAX, 1);
        if (rc)
                CERROR("%s: sync returns %d\n", dbgcksum_file_name, rc);
        filp_close(filp, NULL);
}

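/*
 * On a client/server checksum mismatch for a write, re-checksum the pages
 * still attached to the RPC to pin down where the corruption happened:
 * if the recomputed value matches the server's, the pages changed on the
 * client after they were first checksummed (typically mmap I/O); if it
 * matches the original client checksum, the data changed in transit;
 * otherwise both happened.
 */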
1375 static int
1376 check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1377                                 __u32 client_cksum, __u32 server_cksum,
1378                                 struct osc_brw_async_args *aa)
1379 {
1380         __u32 new_cksum;
1381         char *msg;
1382         enum cksum_types cksum_type;
1383
1384         if (server_cksum == client_cksum) {
1385                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1386                 return 0;
1387         }
1388
1389         if (aa->aa_cli->cl_checksum_dump)
1390                 dump_all_bulk_pages(oa, aa->aa_page_count, aa->aa_ppga,
1391                                     server_cksum, client_cksum);
1392
1393         cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1394                                        oa->o_flags : 0);
1395         new_cksum = osc_checksum_bulk(aa->aa_requested_nob, aa->aa_page_count,
1396                                       aa->aa_ppga, OST_WRITE, cksum_type);
1397
1398         if (cksum_type != cksum_type_unpack(aa->aa_oa->o_flags))
1399                 msg = "the server did not use the checksum type specified in "
1400                       "the original request - likely a protocol problem";
1401         else if (new_cksum == server_cksum)
1402                 msg = "changed on the client after we checksummed it - "
1403                       "likely false positive due to mmap IO (bug 11742)";
1404         else if (new_cksum == client_cksum)
1405                 msg = "changed in transit before arrival at OST";
1406         else
1407                 msg = "changed in transit AND doesn't match the original - "
1408                       "likely false positive due to mmap IO (bug 11742)";
1409
1410         LCONSOLE_ERROR_MSG(0x132, "%s: BAD WRITE CHECKSUM: %s: from %s inode "
1411                            DFID " object "DOSTID" extent [%llu-%llu], original "
1412                            "client csum %x (type %x), server csum %x (type %x),"
1413                            " client csum now %x\n",
1414                            aa->aa_cli->cl_import->imp_obd->obd_name,
1415                            msg, libcfs_nid2str(peer->nid),
1416                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : 0ULL,
1417                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1418                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1419                            POSTID(&oa->o_oi), aa->aa_ppga[0]->off,
1420                            aa->aa_ppga[aa->aa_page_count - 1]->off +
1421                                 aa->aa_ppga[aa->aa_page_count-1]->count - 1,
1422                            client_cksum, cksum_type_unpack(aa->aa_oa->o_flags),
1423                            server_cksum, cksum_type, new_cksum);
1424         return 1;
1425 }
1426
1427 /* Note rc enters this function as number of bytes transferred */
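/* A return of -EAGAIN is treated as a recoverable error by brw_interpret()
 * and normally triggers a resend of the whole BRW, subject to the client's
 * resend limit (see osc_brw_redo_request). */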
1428 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1429 {
1430         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1431         const struct lnet_process_id *peer =
1432                         &req->rq_import->imp_connection->c_peer;
1433         struct client_obd *cli = aa->aa_cli;
1434         struct ost_body *body;
1435         u32 client_cksum = 0;
1436         ENTRY;
1437
1438         if (rc < 0 && rc != -EDQUOT) {
1439                 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d", rc);
1440                 RETURN(rc);
1441         }
1442
1443         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1444         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1445         if (body == NULL) {
1446                 DEBUG_REQ(D_INFO, req, "Can't unpack body");
1447                 RETURN(-EPROTO);
1448         }
1449
1450         /* set/clear over quota flag for a uid/gid/projid */
1451         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1452             body->oa.o_valid & (OBD_MD_FLALLQUOTA)) {
1453                 unsigned qid[LL_MAXQUOTAS] = {
1454                                          body->oa.o_uid, body->oa.o_gid,
1455                                          body->oa.o_projid };
1456                 CDEBUG(D_QUOTA, "setdq for [%u %u %u] with valid %#llx, flags %x\n",
1457                        body->oa.o_uid, body->oa.o_gid, body->oa.o_projid,
1458                        body->oa.o_valid, body->oa.o_flags);
1459                 osc_quota_setdq(cli, qid, body->oa.o_valid,
1460                                 body->oa.o_flags);
1461         }
1462
1463         osc_update_grant(cli, body);
1464
1465         if (rc < 0)
1466                 RETURN(rc);
1467
1468         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1469                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1470
1471         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1472                 if (rc > 0) {
1473                         CERROR("Unexpected positive rc %d\n", rc);
1474                         RETURN(-EPROTO);
1475                 }
1476                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1477
1478                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1479                         RETURN(-EAGAIN);
1480
1481                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1482                     check_write_checksum(&body->oa, peer, client_cksum,
1483                                          body->oa.o_cksum, aa))
1484                         RETURN(-EAGAIN);
1485
1486                 rc = check_write_rcs(req, aa->aa_requested_nob, aa->aa_nio_count,
1487                                      aa->aa_page_count, aa->aa_ppga);
1488                 GOTO(out, rc);
1489         }
1490
1491         /* The rest of this function executes only for OST_READs */
1492
1493         /* if unwrap_bulk failed, return -EAGAIN to retry */
1494         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1495         if (rc < 0)
1496                 GOTO(out, rc = -EAGAIN);
1497
1498         if (rc > aa->aa_requested_nob) {
1499                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1500                        aa->aa_requested_nob);
1501                 RETURN(-EPROTO);
1502         }
1503
1504         if (rc != req->rq_bulk->bd_nob_transferred) {
1505                 CERROR("Unexpected rc %d (%d transferred)\n",
1506                        rc, req->rq_bulk->bd_nob_transferred);
1507                 RETURN(-EPROTO);
1508         }
1509
1510         if (rc < aa->aa_requested_nob)
1511                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1512
1513         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1514                 static int cksum_counter;
1515                 u32        server_cksum = body->oa.o_cksum;
1516                 char      *via = "";
1517                 char      *router = "";
1518                 enum cksum_types cksum_type;
1519
1520                 cksum_type = cksum_type_unpack(body->oa.o_valid & OBD_MD_FLFLAGS ?
1521                                                body->oa.o_flags : 0);
1522                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1523                                                  aa->aa_ppga, OST_READ,
1524                                                  cksum_type);
1525
1526                 if (peer->nid != req->rq_bulk->bd_sender) {
1527                         via = " via ";
1528                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1529                 }
1530
1531                 if (server_cksum != client_cksum) {
1532                         struct ost_body *clbody;
1533                         u32 page_count = aa->aa_page_count;
1534
1535                         clbody = req_capsule_client_get(&req->rq_pill,
1536                                                         &RMF_OST_BODY);
1537                         if (cli->cl_checksum_dump)
1538                                 dump_all_bulk_pages(&clbody->oa, page_count,
1539                                                     aa->aa_ppga, server_cksum,
1540                                                     client_cksum);
1541
1542                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1543                                            "%s%s%s inode "DFID" object "DOSTID
1544                                            " extent [%llu-%llu], client %x, "
1545                                            "server %x, cksum_type %x\n",
1546                                            req->rq_import->imp_obd->obd_name,
1547                                            libcfs_nid2str(peer->nid),
1548                                            via, router,
1549                                            clbody->oa.o_valid & OBD_MD_FLFID ?
1550                                                 clbody->oa.o_parent_seq : 0ULL,
1551                                            clbody->oa.o_valid & OBD_MD_FLFID ?
1552                                                 clbody->oa.o_parent_oid : 0,
1553                                            clbody->oa.o_valid & OBD_MD_FLFID ?
1554                                                 clbody->oa.o_parent_ver : 0,
1555                                            POSTID(&body->oa.o_oi),
1556                                            aa->aa_ppga[0]->off,
1557                                            aa->aa_ppga[page_count-1]->off +
1558                                            aa->aa_ppga[page_count-1]->count - 1,
1559                                            client_cksum, server_cksum,
1560                                            cksum_type);
1561                         cksum_counter = 0;
1562                         aa->aa_oa->o_cksum = client_cksum;
1563                         rc = -EAGAIN;
1564                 } else {
1565                         cksum_counter++;
1566                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1567                         rc = 0;
1568                 }
1569         } else if (unlikely(client_cksum)) {
1570                 static int cksum_missed;
1571
1572                 cksum_missed++;
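                /* (x & -x) == x only when x is a power of two, so this logs
                 * on the 1st, 2nd, 4th, 8th, ... missed checksum,
                 * rate-limiting the console output. */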
1573                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1574                         CERROR("Checksum %u requested from %s but not sent\n",
1575                                cksum_missed, libcfs_nid2str(peer->nid));
1576         } else {
1577                 rc = 0;
1578         }
1579 out:
1580         if (rc >= 0)
1581                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1582                                      aa->aa_oa, &body->oa);
1583
1584         RETURN(rc);
1585 }
1586
1587 static int osc_brw_redo_request(struct ptlrpc_request *request,
1588                                 struct osc_brw_async_args *aa, int rc)
1589 {
1590         struct ptlrpc_request *new_req;
1591         struct osc_brw_async_args *new_aa;
1592         struct osc_async_page *oap;
1593         ENTRY;
1594
1595         DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1596                   "redo for recoverable error %d", rc);
1597
1598         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1599                                   OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
1600                                   aa->aa_cli, aa->aa_oa, aa->aa_page_count,
1601                                   aa->aa_ppga, &new_req, 1);
1602         if (rc)
1603                 RETURN(rc);
1604
1605         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1606                 if (oap->oap_request != NULL) {
1607                         LASSERTF(request == oap->oap_request,
1608                                  "request %p != oap_request %p\n",
1609                                  request, oap->oap_request);
1610                         if (oap->oap_interrupted) {
1611                                 ptlrpc_req_finished(new_req);
1612                                 RETURN(-EINTR);
1613                         }
1614                 }
1615         }
1616         /* New request takes over pga and oaps from old request.
1617          * Note that copying a list_head doesn't work, need to move it... */
1618         aa->aa_resends++;
1619         new_req->rq_interpret_reply = request->rq_interpret_reply;
1620         new_req->rq_async_args = request->rq_async_args;
1621         new_req->rq_commit_cb = request->rq_commit_cb;
1622         /* cap resend delay to the current request timeout, this is similar to
1623          * what ptlrpc does (see after_reply()) */
1624         if (aa->aa_resends > new_req->rq_timeout)
1625                 new_req->rq_sent = ktime_get_real_seconds() + new_req->rq_timeout;
1626         else
1627                 new_req->rq_sent = ktime_get_real_seconds() + aa->aa_resends;
1628         new_req->rq_generation_set = 1;
1629         new_req->rq_import_generation = request->rq_import_generation;
1630
1631         new_aa = ptlrpc_req_async_args(new_req);
1632
1633         INIT_LIST_HEAD(&new_aa->aa_oaps);
1634         list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1635         INIT_LIST_HEAD(&new_aa->aa_exts);
1636         list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1637         new_aa->aa_resends = aa->aa_resends;
1638
1639         list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1640                 if (oap->oap_request) {
1641                         ptlrpc_req_finished(oap->oap_request);
1642                         oap->oap_request = ptlrpc_request_addref(new_req);
1643                 }
1644         }
1645
1646         /* XXX: This code will run into problem if we're going to support
1647          * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
1648          * and wait for all of them to be finished. We should inherit request
1649          * set from old request. */
1650         ptlrpcd_add_req(new_req);
1651
1652         DEBUG_REQ(D_INFO, new_req, "new request");
1653         RETURN(0);
1654 }
1655
1656 /*
1657  * ugh, we want disk allocation on the target to happen in offset order.  We'll
1658  * follow Sedgewick's advice and stick to the dead simple shellsort -- it'll do
1659  * fine for our small page arrays and doesn't require allocation.  It's an
1660  * insertion sort that swaps elements that are strides apart, shrinking the
1661  * stride down until it's 1 and the array is sorted.
1662  */
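/*
 * For example, with num = 16 the stride loop below generates 1, 4, 13, 40
 * and stops at 40 (>= num), so the do/while sorts with strides 13, 4 and
 * finally 1 -- the classic h = 3h + 1 gap sequence.
 */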
1663 static void sort_brw_pages(struct brw_page **array, int num)
1664 {
1665         int stride, i, j;
1666         struct brw_page *tmp;
1667
1668         if (num == 1)
1669                 return;
1670         for (stride = 1; stride < num; stride = (stride * 3) + 1)
1671                 ;
1672
1673         do {
1674                 stride /= 3;
1675                 for (i = stride; i < num; i++) {
1676                         tmp = array[i];
1677                         j = i;
1678                         while (j >= stride && array[j - stride]->off > tmp->off) {
1679                                 array[j] = array[j - stride];
1680                                 j -= stride;
1681                         }
1682                         array[j] = tmp;
1683                 }
1684         } while (stride > 1);
1685 }
1686
1687 static void osc_release_ppga(struct brw_page **ppga, size_t count)
1688 {
1689         LASSERT(ppga != NULL);
1690         OBD_FREE(ppga, sizeof(*ppga) * count);
1691 }
1692
1693 static int brw_interpret(const struct lu_env *env,
1694                          struct ptlrpc_request *req, void *data, int rc)
1695 {
1696         struct osc_brw_async_args *aa = data;
1697         struct osc_extent *ext;
1698         struct osc_extent *tmp;
1699         struct client_obd *cli = aa->aa_cli;
1700         ENTRY;
1701
1702         rc = osc_brw_fini_request(req, rc);
1703         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1704         /* When the server returns -EINPROGRESS, the client should always
1705          * retry regardless of how many times the bulk was already resent. */
1706         if (osc_recoverable_error(rc)) {
1707                 if (req->rq_import_generation !=
1708                     req->rq_import->imp_generation) {
1709                         CDEBUG(D_HA, "%s: resend across eviction for object: "
1710                                DOSTID", rc = %d.\n",
1711                                req->rq_import->imp_obd->obd_name,
1712                                POSTID(&aa->aa_oa->o_oi), rc);
1713                 } else if (rc == -EINPROGRESS ||
1714                     client_should_resend(aa->aa_resends, aa->aa_cli)) {
1715                         rc = osc_brw_redo_request(req, aa, rc);
1716                 } else {
1717                         CERROR("%s: too many resend retries for object: "
1718                                "%llu:%llu, rc = %d.\n",
1719                                req->rq_import->imp_obd->obd_name,
1720                                POSTID(&aa->aa_oa->o_oi), rc);
1721                 }
1722
1723                 if (rc == 0)
1724                         RETURN(0);
1725                 else if (rc == -EAGAIN || rc == -EINPROGRESS)
1726                         rc = -EIO;
1727         }
1728
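        /* On success, fold the attributes the server returned (blocks,
         * mtime/atime/ctime, and for writes possibly size and KMS) back into
         * the cl_object so the cached inode state stays fresh. */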
1729         if (rc == 0) {
1730                 struct obdo *oa = aa->aa_oa;
1731                 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
1732                 unsigned long valid = 0;
1733                 struct cl_object *obj;
1734                 struct osc_async_page *last;
1735
1736                 last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
1737                 obj = osc2cl(last->oap_obj);
1738
1739                 cl_object_attr_lock(obj);
1740                 if (oa->o_valid & OBD_MD_FLBLOCKS) {
1741                         attr->cat_blocks = oa->o_blocks;
1742                         valid |= CAT_BLOCKS;
1743                 }
1744                 if (oa->o_valid & OBD_MD_FLMTIME) {
1745                         attr->cat_mtime = oa->o_mtime;
1746                         valid |= CAT_MTIME;
1747                 }
1748                 if (oa->o_valid & OBD_MD_FLATIME) {
1749                         attr->cat_atime = oa->o_atime;
1750                         valid |= CAT_ATIME;
1751                 }
1752                 if (oa->o_valid & OBD_MD_FLCTIME) {
1753                         attr->cat_ctime = oa->o_ctime;
1754                         valid |= CAT_CTIME;
1755                 }
1756
1757                 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1758                         struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
1759                         loff_t last_off = last->oap_count + last->oap_obj_off +
1760                                 last->oap_page_off;
1761
1762                         /* Change the file size if this is an out-of-quota
1763                          * or direct I/O write that extends the file size. */
1764                         if (loi->loi_lvb.lvb_size < last_off) {
1765                                 attr->cat_size = last_off;
1766                                 valid |= CAT_SIZE;
1767                         }
1768                         /* Extend KMS if it's not a lockless write */
1769                         if (loi->loi_kms < last_off &&
1770                             oap2osc_page(last)->ops_srvlock == 0) {
1771                                 attr->cat_kms = last_off;
1772                                 valid |= CAT_KMS;
1773                         }
1774                 }
1775
1776                 if (valid != 0)
1777                         cl_object_attr_update(env, obj, attr, valid);
1778                 cl_object_attr_unlock(obj);
1779         }
1780         OBDO_FREE(aa->aa_oa);
1781
1782         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
1783                 osc_inc_unstable_pages(req);
1784
1785         list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
1786                 list_del_init(&ext->oe_link);
1787                 osc_extent_finish(env, ext, 1, rc);
1788         }
1789         LASSERT(list_empty(&aa->aa_exts));
1790         LASSERT(list_empty(&aa->aa_oaps));
1791
1792         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1793         ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
1794
1795         spin_lock(&cli->cl_loi_list_lock);
1796         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1797          * is called so we know whether to go to sync BRWs or wait for more
1798          * RPCs to complete */
1799         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
1800                 cli->cl_w_in_flight--;
1801         else
1802                 cli->cl_r_in_flight--;
1803         osc_wake_cache_waiters(cli);
1804         spin_unlock(&cli->cl_loi_list_lock);
1805
1806         osc_io_unplug(env, cli, NULL);
1807         RETURN(rc);
1808 }
1809
1810 static void brw_commit(struct ptlrpc_request *req)
1811 {
1812         /* If osc_inc_unstable_pages (via osc_extent_finish) races with
1813          * this function, called via the rq_commit_cb, we need to ensure
1814          * osc_dec_unstable_pages is still called. Otherwise unstable
1815          * pages may be leaked. */
1816         spin_lock(&req->rq_lock);
1817         if (likely(req->rq_unstable)) {
1818                 req->rq_unstable = 0;
1819                 spin_unlock(&req->rq_lock);
1820
1821                 osc_dec_unstable_pages(req);
1822         } else {
1823                 req->rq_committed = 1;
1824                 spin_unlock(&req->rq_lock);
1825         }
1826 }
1827
1828 /**
1829  * Build an RPC from the list of extents @ext_list. The caller must ensure
1830  * that the total number of pages in this list does not exceed the maximum
1831  * pages per RPC. Extents in the list must be in OES_RPC state.
1832  */
1833 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
1834                   struct list_head *ext_list, int cmd)
1835 {
1836         struct ptlrpc_request           *req = NULL;
1837         struct osc_extent               *ext;
1838         struct brw_page                 **pga = NULL;
1839         struct osc_brw_async_args       *aa = NULL;
1840         struct obdo                     *oa = NULL;
1841         struct osc_async_page           *oap;
1842         struct osc_object               *obj = NULL;
1843         struct cl_req_attr              *crattr = NULL;
1844         loff_t                          starting_offset = OBD_OBJECT_EOF;
1845         loff_t                          ending_offset = 0;
1846         int                             mpflag = 0;
1847         int                             mem_tight = 0;
1848         int                             page_count = 0;
1849         bool                            soft_sync = false;
1850         bool                            interrupted = false;
1851         int                             i;
1852         int                             grant = 0;
1853         int                             rc;
1854         struct list_head                rpc_list = LIST_HEAD_INIT(rpc_list);
1855         struct ost_body                 *body;
1856         ENTRY;
1857         LASSERT(!list_empty(ext_list));
1858
1859         /* add pages into rpc_list to build BRW rpc */
1860         list_for_each_entry(ext, ext_list, oe_link) {
1861                 LASSERT(ext->oe_state == OES_RPC);
1862                 mem_tight |= ext->oe_memalloc;
1863                 grant += ext->oe_grants;
1864                 page_count += ext->oe_nr_pages;
1865                 if (obj == NULL)
1866                         obj = ext->oe_obj;
1867         }
1868
1869         soft_sync = osc_over_unstable_soft_limit(cli);
1870         if (mem_tight)
1871                 mpflag = cfs_memory_pressure_get_and_set();
1872
1873         OBD_ALLOC(pga, sizeof(*pga) * page_count);
1874         if (pga == NULL)
1875                 GOTO(out, rc = -ENOMEM);
1876
1877         OBDO_ALLOC(oa);
1878         if (oa == NULL)
1879                 GOTO(out, rc = -ENOMEM);
1880
1881         i = 0;
1882         list_for_each_entry(ext, ext_list, oe_link) {
1883                 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
1884                         if (mem_tight)
1885                                 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
1886                         if (soft_sync)
1887                                 oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
1888                         pga[i] = &oap->oap_brw_page;
1889                         pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
1890                         i++;
1891
1892                         list_add_tail(&oap->oap_rpc_item, &rpc_list);
1893                         if (starting_offset == OBD_OBJECT_EOF ||
1894                             starting_offset > oap->oap_obj_off)
1895                                 starting_offset = oap->oap_obj_off;
1896                         else
1897                                 LASSERT(oap->oap_page_off == 0);
1898                         if (ending_offset < oap->oap_obj_off + oap->oap_count)
1899                                 ending_offset = oap->oap_obj_off +
1900                                                 oap->oap_count;
1901                         else
1902                                 LASSERT(oap->oap_page_off + oap->oap_count ==
1903                                         PAGE_SIZE);
1904                         if (oap->oap_interrupted)
1905                                 interrupted = true;
1906                 }
1907         }
1908
1909         /* first page in the list */
1910         oap = list_entry(rpc_list.next, typeof(*oap), oap_rpc_item);
1911
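        /* Ask the layers above (via cl_req_attr_set) to fill in the request
         * attributes; cra_flags = ~0ULL requests every attribute they know. */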
1912         crattr = &osc_env_info(env)->oti_req_attr;
1913         memset(crattr, 0, sizeof(*crattr));
1914         crattr->cra_type = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
1915         crattr->cra_flags = ~0ULL;
1916         crattr->cra_page = oap2cl_page(oap);
1917         crattr->cra_oa = oa;
1918         cl_req_attr_set(env, osc2cl(obj), crattr);
1919
1920         if (cmd == OBD_BRW_WRITE)
1921                 oa->o_grant_used = grant;
1922
1923         sort_brw_pages(pga, page_count);
1924         rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req, 0);
1925         if (rc != 0) {
1926                 CERROR("prep_req failed: %d\n", rc);
1927                 GOTO(out, rc);
1928         }
1929
1930         req->rq_commit_cb = brw_commit;
1931         req->rq_interpret_reply = brw_interpret;
1932         req->rq_memalloc = mem_tight != 0;
1933         oap->oap_request = ptlrpc_request_addref(req);
1934         if (interrupted && !req->rq_intr)
1935                 ptlrpc_mark_interrupted(req);
1936
1937         /* Need to update the timestamps after the request is built in case
1938          * we race with setattr (locally or in queue at OST).  If OST gets
1939          * later setattr before earlier BRW (as determined by the request xid),
1940          * the OST will not use BRW timestamps.  Sadly, there is no obvious
1941          * way to do this in a single call.  bug 10150 */
1942         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
1943         crattr->cra_oa = &body->oa;
1944         crattr->cra_flags = OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME;
1945         cl_req_attr_set(env, osc2cl(obj), crattr);
1946         lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
1947
1948         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1949         aa = ptlrpc_req_async_args(req);
1950         INIT_LIST_HEAD(&aa->aa_oaps);
1951         list_splice_init(&rpc_list, &aa->aa_oaps);
1952         INIT_LIST_HEAD(&aa->aa_exts);
1953         list_splice_init(ext_list, &aa->aa_exts);
1954
1955         spin_lock(&cli->cl_loi_list_lock);
1956         starting_offset >>= PAGE_SHIFT;
1957         if (cmd == OBD_BRW_READ) {
1958                 cli->cl_r_in_flight++;
1959                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
1960                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
1961                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
1962                                       starting_offset + 1);
1963         } else {
1964                 cli->cl_w_in_flight++;
1965                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
1966                 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
1967                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
1968                                       starting_offset + 1);
1969         }
1970         spin_unlock(&cli->cl_loi_list_lock);
1971
1972         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %ur/%uw in flight",
1973                   page_count, aa, cli->cl_r_in_flight,
1974                   cli->cl_w_in_flight);
1975         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_IO, cfs_fail_val);
1976
1977         ptlrpcd_add_req(req);
1978         rc = 0;
1979         EXIT;
1980
1981 out:
1982         if (mem_tight != 0)
1983                 cfs_memory_pressure_restore(mpflag);
1984
1985         if (rc != 0) {
1986                 LASSERT(req == NULL);
1987
1988                 if (oa)
1989                         OBDO_FREE(oa);
1990                 if (pga)
1991                         OBD_FREE(pga, sizeof(*pga) * page_count);
1992                 /* This should happen rarely and is pretty bad; it makes the
1993                  * pending list not follow the dirty order. */
1994                 while (!list_empty(ext_list)) {
1995                         ext = list_entry(ext_list->next, struct osc_extent,
1996                                          oe_link);
1997                         list_del_init(&ext->oe_link);
1998                         osc_extent_finish(env, ext, 0, rc);
1999                 }
2000         }
2001         RETURN(rc);
2002 }
2003
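/*
 * Attach @data (the osc_object) to the lock's l_ast_data if it is not set
 * yet.  Returns 1 if l_ast_data ends up pointing at @data (either we just
 * set it or it already did), 0 if the lock already belongs to a different
 * object.
 */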
2004 static int osc_set_lock_data(struct ldlm_lock *lock, void *data)
2005 {
2006         int set = 0;
2007
2008         LASSERT(lock != NULL);
2009
2010         lock_res_and_lock(lock);
2011
2012         if (lock->l_ast_data == NULL)
2013                 lock->l_ast_data = data;
2014         if (lock->l_ast_data == data)
2015                 set = 1;
2016
2017         unlock_res_and_lock(lock);
2018
2019         return set;
2020 }
2021
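/*
 * Common tail of a lock enqueue: for intent enqueues that were aborted,
 * recover the real error from the server's lock_policy_res1; mark the LVB
 * ready where appropriate; invoke the caller's upcall; and drop the
 * reference that ldlm_cli_enqueue() took on a successfully matched lock.
 */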
2022 int osc_enqueue_fini(struct ptlrpc_request *req, osc_enqueue_upcall_f upcall,
2023                      void *cookie, struct lustre_handle *lockh,
2024                      enum ldlm_mode mode, __u64 *flags, bool speculative,
2025                      int errcode)
2026 {
2027         bool intent = *flags & LDLM_FL_HAS_INTENT;
2028         int rc;
2029         ENTRY;
2030
2031         /* The request was created before ldlm_cli_enqueue call. */
2032         if (intent && errcode == ELDLM_LOCK_ABORTED) {
2033                 struct ldlm_reply *rep;
2034
2035                 rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
2036                 LASSERT(rep != NULL);
2037
2038                 rep->lock_policy_res1 =
2039                         ptlrpc_status_ntoh(rep->lock_policy_res1);
2040                 if (rep->lock_policy_res1)
2041                         errcode = rep->lock_policy_res1;
2042                 if (!speculative)
2043                         *flags |= LDLM_FL_LVB_READY;
2044         } else if (errcode == ELDLM_OK) {
2045                 *flags |= LDLM_FL_LVB_READY;
2046         }
2047
2048         /* Call the update callback. */
2049         rc = (*upcall)(cookie, lockh, errcode);
2050
2051         /* release the reference taken in ldlm_cli_enqueue() */
2052         if (errcode == ELDLM_LOCK_MATCHED)
2053                 errcode = ELDLM_OK;
2054         if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
2055                 ldlm_lock_decref(lockh, mode);
2056
2057         RETURN(rc);
2058 }
2059
2060 int osc_enqueue_interpret(const struct lu_env *env, struct ptlrpc_request *req,
2061                           struct osc_enqueue_args *aa, int rc)
2062 {
2063         struct ldlm_lock *lock;
2064         struct lustre_handle *lockh = &aa->oa_lockh;
2065         enum ldlm_mode mode = aa->oa_mode;
2066         struct ost_lvb *lvb = aa->oa_lvb;
2067         __u32 lvb_len = sizeof(*lvb);
2068         __u64 flags = 0;
2069
2070         ENTRY;
2071
2072         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2073          * be valid. */
2074         lock = ldlm_handle2lock(lockh);
2075         LASSERTF(lock != NULL,
2076                  "lockh %#llx, req %p, aa %p - client evicted?\n",
2077                  lockh->cookie, req, aa);
2078
2079         /* Take an additional reference so that a blocking AST that
2080          * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2081          * to arrive after an upcall has been executed by
2082          * osc_enqueue_fini(). */
2083         ldlm_lock_addref(lockh, mode);
2084
2085         /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
2086         OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
2087
2088         /* Let CP AST to grant the lock first. */
2089         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2090
2091         if (aa->oa_speculative) {
2092                 LASSERT(aa->oa_lvb == NULL);
2093                 LASSERT(aa->oa_flags == NULL);
2094                 aa->oa_flags = &flags;
2095         }
2096
2097         /* Complete obtaining the lock procedure. */
2098         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
2099                                    aa->oa_mode, aa->oa_flags, lvb, lvb_len,
2100                                    lockh, rc);
2101         /* Complete osc stuff. */
2102         rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
2103                               aa->oa_flags, aa->oa_speculative, rc);
2104
2105         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2106
2107         ldlm_lock_decref(lockh, mode);
2108         LDLM_LOCK_PUT(lock);
2109         RETURN(rc);
2110 }
2111
2112 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
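/* PTLRPCD_SET is a sentinel value, not a real set: passing it as the rqset
 * tells osc_enqueue_base() to hand the request straight to ptlrpcd instead
 * of adding it to a caller-owned request set. */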
2113
2114 /* When enqueuing asynchronously, locks are not ordered: we can obtain a lock
2115  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2116  * other synchronous requests; however, holding some locks while waiting to
2117  * obtain others may take a considerable amount of time in case of OST failure,
2118  * and if a client does not release a lock that other sync requests are waiting
2119  * on, that client is evicted from the cluster -- such scenarios make life
2120  * difficult, so release locks just after they are obtained. */
2121 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2122                      __u64 *flags, union ldlm_policy_data *policy,
2123                      struct ost_lvb *lvb, int kms_valid,
2124                      osc_enqueue_upcall_f upcall, void *cookie,
2125                      struct ldlm_enqueue_info *einfo,
2126                      struct ptlrpc_request_set *rqset, int async,
2127                      bool speculative)
2128 {
2129         struct obd_device *obd = exp->exp_obd;
2130         struct lustre_handle lockh = { 0 };
2131         struct ptlrpc_request *req = NULL;
2132         int intent = *flags & LDLM_FL_HAS_INTENT;
2133         __u64 match_flags = *flags;
2134         enum ldlm_mode mode;
2135         int rc;
2136         ENTRY;
2137
2138         /* Filesystem lock extents are extended to page boundaries so that
2139          * dealing with the page cache is a little smoother.  */
2140         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2141         policy->l_extent.end |= ~PAGE_MASK;
2142
2143         /*
2144          * kms is not valid when either object is completely fresh (so that no
2145          * locks are cached), or object was evicted. In the latter case cached
2146          * lock cannot be used, because it would prime inode state with
2147          * potentially stale LVB.
2148          */
2149         if (!kms_valid)
2150                 goto no_match;
2151
2152         /* Next, search for already existing extent locks that will cover us */
2153         /* If we're trying to read, we also search for an existing PW lock.  The
2154          * VFS and page cache already protect us locally, so lots of readers/
2155          * writers can share a single PW lock.
2156          *
2157          * There are problems with conversion deadlocks, so instead of
2158          * converting a read lock to a write lock, we'll just enqueue a new
2159          * one.
2160          *
2161          * At some point we should cancel the read lock instead of making them
2162          * send us a blocking callback, but there are problems with canceling
2163          * locks out from other users right now, too. */
2164         mode = einfo->ei_mode;
2165         if (einfo->ei_mode == LCK_PR)
2166                 mode |= LCK_PW;
2167         /* Normal lock requests must wait for the LVB to be ready before
2168          * matching a lock; speculative lock requests do not need to,
2169          * because they will not actually use the lock. */
2170         if (!speculative)
2171                 match_flags |= LDLM_FL_LVB_READY;
2172         if (intent != 0)
2173                 match_flags |= LDLM_FL_BLOCK_GRANTED;
2174         mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id,
2175                                einfo->ei_type, policy, mode, &lockh, 0);
2176         if (mode) {
2177                 struct ldlm_lock *matched;
2178
2179                 if (*flags & LDLM_FL_TEST_LOCK)
2180                         RETURN(ELDLM_OK);
2181
2182                 matched = ldlm_handle2lock(&lockh);
2183                 if (speculative) {
2184                         /* This DLM lock request is speculative, and does not
2185                          * have an associated IO request. Therefore if there
2186                          * is already a DLM lock, it will just inform the
2187                          * caller to cancel the request for this stripe. */
2188                         lock_res_and_lock(matched);
2189                         if (ldlm_extent_equal(&policy->l_extent,
2190                             &matched->l_policy_data.l_extent))
2191                                 rc = -EEXIST;
2192                         else
2193                                 rc = -ECANCELED;
2194                         unlock_res_and_lock(matched);
2195
2196                         ldlm_lock_decref(&lockh, mode);
2197                         LDLM_LOCK_PUT(matched);
2198                         RETURN(rc);
2199                 } else if (osc_set_lock_data(matched, einfo->ei_cbdata)) {
2200                         *flags |= LDLM_FL_LVB_READY;
2201
2202                         /* We already have a lock, and it's referenced. */
2203                         (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);
2204
2205                         ldlm_lock_decref(&lockh, mode);
2206                         LDLM_LOCK_PUT(matched);
2207                         RETURN(ELDLM_OK);
2208                 } else {
2209                         ldlm_lock_decref(&lockh, mode);
2210                         LDLM_LOCK_PUT(matched);
2211                 }
2212         }
2213
2214 no_match:
2215         if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK))
2216                 RETURN(-ENOLCK);
2217
2218         if (intent) {
2219                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2220                                            &RQF_LDLM_ENQUEUE_LVB);
2221                 if (req == NULL)
2222                         RETURN(-ENOMEM);
2223
2224                 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
2225                 if (rc) {
2226                         ptlrpc_request_free(req);
2227                         RETURN(rc);
2228                 }
2229
2230                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2231                                      sizeof(*lvb));
2232                 ptlrpc_request_set_replen(req);
2233         }
2234
2235         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2236         *flags &= ~LDLM_FL_BLOCK_GRANTED;
2237
2238         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2239                               sizeof(*lvb), LVB_T_OST, &lockh, async);
2240         if (async) {
2241                 if (!rc) {
2242                         struct osc_enqueue_args *aa;
2243                         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2244                         aa = ptlrpc_req_async_args(req);
2245                         aa->oa_exp         = exp;
2246                         aa->oa_mode        = einfo->ei_mode;
2247                         aa->oa_type        = einfo->ei_type;
2248                         lustre_handle_copy(&aa->oa_lockh, &lockh);
2249                         aa->oa_upcall      = upcall;
2250                         aa->oa_cookie      = cookie;
2251                         aa->oa_speculative = speculative;
2252                         if (!speculative) {
2253                                 aa->oa_flags  = flags;
2254                                 aa->oa_lvb    = lvb;
2255                         } else {
2256                                 /* speculative locks are essentially to enqueue
2257                                  * a DLM lock  in advance, so we don't care
2258                                  * about the result of the enqueue. */
2259                                 aa->oa_lvb    = NULL;
2260                                 aa->oa_flags  = NULL;
2261                         }
2262
2263                         req->rq_interpret_reply =
2264                                 (ptlrpc_interpterer_t)osc_enqueue_interpret;
2265                         if (rqset == PTLRPCD_SET)
2266                                 ptlrpcd_add_req(req);
2267                         else
2268                                 ptlrpc_set_add_req(rqset, req);
2269                 } else if (intent) {
2270                         ptlrpc_req_finished(req);
2271                 }
2272                 RETURN(rc);
2273         }
2274
2275         rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
2276                               flags, speculative, rc);
2277         if (intent)
2278                 ptlrpc_req_finished(req);
2279
2280         RETURN(rc);
2281 }
2282
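/*
 * Lock-match-only variant of enqueue: look for an already granted extent
 * lock covering the extent (PR requests may match a PW lock too) and, if
 * @data is given, claim the lock for that object via osc_set_lock_data().
 * Never sends an enqueue RPC; returns the matched lock mode, or 0 if no
 * usable lock was found.
 */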
2283 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2284                    enum ldlm_type type, union ldlm_policy_data *policy,
2285                    enum ldlm_mode mode, __u64 *flags, void *data,
2286                    struct lustre_handle *lockh, int unref)
2287 {
2288         struct obd_device *obd = exp->exp_obd;
2289         __u64 lflags = *flags;
2290         enum ldlm_mode rc;
2291         ENTRY;
2292
2293         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2294                 RETURN(-EIO);
2295
2296         /* Filesystem lock extents are extended to page boundaries so that
2297          * dealing with the page cache is a little smoother */
2298         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2299         policy->l_extent.end |= ~PAGE_MASK;
2300
2301         /* Next, search for already existing extent locks that will cover us */
2302         /* If we're trying to read, we also search for an existing PW lock.  The
2303          * VFS and page cache already protect us locally, so lots of readers/
2304          * writers can share a single PW lock. */
2305         rc = mode;
2306         if (mode == LCK_PR)
2307                 rc |= LCK_PW;
2308         rc = ldlm_lock_match(obd->obd_namespace, lflags,
2309                              res_id, type, policy, rc, lockh, unref);
2310         if (rc == 0 || lflags & LDLM_FL_TEST_LOCK)
2311                 RETURN(rc);
2312
2313         if (data != NULL) {
2314                 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2315
2316                 LASSERT(lock != NULL);
2317                 if (!osc_set_lock_data(lock, data)) {
2318                         ldlm_lock_decref(lockh, rc);
2319                         rc = 0;
2320                 }
2321                 LDLM_LOCK_PUT(lock);
2322         }
2323         RETURN(rc);
2324 }
2325
2326 static int osc_statfs_interpret(const struct lu_env *env,
2327                                 struct ptlrpc_request *req,
2328                                 struct osc_async_args *aa, int rc)
2329 {
2330         struct obd_statfs *msfs;
2331         ENTRY;
2332
2333         if (rc == -EBADR)
2334                 /* The request has in fact never been sent
2335                  * due to issues at a higher level (LOV).
2336                  * Exit immediately since the caller is
2337                  * aware of the problem and takes care
2338                  * of the cleanup. */
2339                 RETURN(rc);
2340
2341         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2342             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2343                 GOTO(out, rc = 0);
2344
2345         if (rc != 0)
2346                 GOTO(out, rc);
2347
2348         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2349         if (msfs == NULL) {
2350                 GOTO(out, rc = -EPROTO);
2351         }
2352
2353         *aa->aa_oi->oi_osfs = *msfs;
2354 out:
2355         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2356         RETURN(rc);
2357 }
2358
2359 static int osc_statfs_async(struct obd_export *exp,
2360                             struct obd_info *oinfo, __u64 max_age,
2361                             struct ptlrpc_request_set *rqset)
2362 {
2363         struct obd_device     *obd = class_exp2obd(exp);
2364         struct ptlrpc_request *req;
2365         struct osc_async_args *aa;
2366         int                    rc;
2367         ENTRY;
2368
2369         /* We could possibly pass max_age in the request (as an absolute
2370          * timestamp or a "seconds.usec ago") so the target can avoid doing
2371          * extra calls into the filesystem if that isn't necessary (e.g.
2372          * during mount that would help a bit).  Having relative timestamps
2373          * is not so great if request processing is slow, while absolute
2374          * timestamps are not ideal because they need time synchronization. */
2375         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2376         if (req == NULL)
2377                 RETURN(-ENOMEM);
2378
2379         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2380         if (rc) {
2381                 ptlrpc_request_free(req);
2382                 RETURN(rc);
2383         }
2384         ptlrpc_request_set_replen(req);
2385         req->rq_request_portal = OST_CREATE_PORTAL;
2386         ptlrpc_at_set_req_timeout(req);
2387
2388         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2389                 /* procfs requests should not block waiting, to avoid deadlock */
2390                 req->rq_no_resend = 1;
2391                 req->rq_no_delay = 1;
2392         }
2393
2394         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2395         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2396         aa = ptlrpc_req_async_args(req);
2397         aa->aa_oi = oinfo;
2398
2399         ptlrpc_set_add_req(rqset, req);
2400         RETURN(0);
2401 }
2402
2403 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2404                       struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2405 {
2406         struct obd_device     *obd = class_exp2obd(exp);
2407         struct obd_statfs     *msfs;
2408         struct ptlrpc_request *req;
2409         struct obd_import     *imp = NULL;
2410         int rc;
2411         ENTRY;
2412
2413         /* Since the request might also come from lprocfs, we need to
2414          * sync this with client_disconnect_export() (bug 15684). */
2415         down_read(&obd->u.cli.cl_sem);
2416         if (obd->u.cli.cl_import)
2417                 imp = class_import_get(obd->u.cli.cl_import);
2418         up_read(&obd->u.cli.cl_sem);
2419         if (!imp)
2420                 RETURN(-ENODEV);
2421
2422         /* We could possibly pass max_age in the request (as an absolute
2423          * timestamp or a "seconds.usec ago") so the target can avoid doing
2424          * extra calls into the filesystem if that isn't necessary (e.g.
2425          * during mount that would help a bit).  Having relative timestamps
2426          * is not so great if request processing is slow, while absolute
2427          * timestamps are not ideal because they need time synchronization. */
2428         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2429
2430         class_import_put(imp);
2431
2432         if (req == NULL)
2433                 RETURN(-ENOMEM);
2434
2435         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2436         if (rc) {
2437                 ptlrpc_request_free(req);
2438                 RETURN(rc);
2439         }
2440         ptlrpc_request_set_replen(req);
2441         req->rq_request_portal = OST_CREATE_PORTAL;
2442         ptlrpc_at_set_req_timeout(req);
2443
2444         if (flags & OBD_STATFS_NODELAY) {
2445                 /* procfs requests should not block waiting, to avoid deadlock */
2446                 req->rq_no_resend = 1;
2447                 req->rq_no_delay = 1;
2448         }
2449
2450         rc = ptlrpc_queue_wait(req);
2451         if (rc)
2452                 GOTO(out, rc);
2453
2454         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2455         if (msfs == NULL) {
2456                 GOTO(out, rc = -EPROTO);
2457         }
2458
2459         *osfs = *msfs;
2460
2461         EXIT;
2462 out:
2463         ptlrpc_req_finished(req);
2464         return rc;
2465 }
2466
2467 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2468                          void *karg, void __user *uarg)
2469 {
2470         struct obd_device *obd = exp->exp_obd;
2471         struct obd_ioctl_data *data = karg;
2472         int err = 0;
2473         ENTRY;
2474
2475         if (!try_module_get(THIS_MODULE)) {
2476                 CERROR("%s: cannot get module '%s'\n", obd->obd_name,
2477                        module_name(THIS_MODULE));
2478                 return -EINVAL;
2479         }
2480         switch (cmd) {
2481         case OBD_IOC_CLIENT_RECOVER:
2482                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2483                                             data->ioc_inlbuf1, 0);
2484                 if (err > 0)
2485                         err = 0;
2486                 GOTO(out, err);
2487         case IOC_OSC_SET_ACTIVE:
2488                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2489                                                data->ioc_offset);
2490                 GOTO(out, err);
2491         case OBD_IOC_PING_TARGET:
2492                 err = ptlrpc_obd_ping(obd);
2493                 GOTO(out, err);
2494         default:
2495                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2496                        cmd, current_comm());
2497                 GOTO(out, err = -ENOTTY);
2498         }
2499 out:
2500         module_put(THIS_MODULE);
2501         return err;
2502 }
2503
2504 int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
2505                        u32 keylen, void *key, u32 vallen, void *val,
2506                        struct ptlrpc_request_set *set)
2507 {
2508         struct ptlrpc_request *req;
2509         struct obd_device     *obd = exp->exp_obd;
2510         struct obd_import     *imp = class_exp2cliimp(exp);
2511         char                  *tmp;
2512         int                    rc;
2513         ENTRY;
2514
2515         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
2516
2517         if (KEY_IS(KEY_CHECKSUM)) {
2518                 if (vallen != sizeof(int))
2519                         RETURN(-EINVAL);
2520                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
2521                 RETURN(0);
2522         }
2523
2524         if (KEY_IS(KEY_SPTLRPC_CONF)) {
2525                 sptlrpc_conf_client_adapt(obd);
2526                 RETURN(0);
2527         }
2528
2529         if (KEY_IS(KEY_FLUSH_CTX)) {
2530                 sptlrpc_import_flush_my_ctx(imp);
2531                 RETURN(0);
2532         }
2533
2534         if (KEY_IS(KEY_CACHE_SET)) {
2535                 struct client_obd *cli = &obd->u.cli;
2536
2537                 LASSERT(cli->cl_cache == NULL); /* only once */
2538                 cli->cl_cache = (struct cl_client_cache *)val;
2539                 cl_cache_incref(cli->cl_cache);
2540                 cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
2541
2542                 /* add this osc into entity list */
2543                 LASSERT(list_empty(&cli->cl_lru_osc));
2544                 spin_lock(&cli->cl_cache->ccc_lru_lock);
2545                 list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
2546                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
2547
2548                 RETURN(0);
2549         }
2550
2551         if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
2552                 struct client_obd *cli = &obd->u.cli;
2553                 long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
2554                 long target = *(long *)val;
2555
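                /* Shrink at most half of this OSC's LRU pages, bounded by
                 * the caller's target; report how many were actually freed
                 * by decrementing the target in place. */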
2556                 nr = osc_lru_shrink(env, cli, min(nr, target), true);
2557                 *(long *)val -= nr;
2558                 RETURN(0);
2559         }
2560
2561         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
2562                 RETURN(-EINVAL);
2563
2564         /* We pass all other commands directly to OST. Since nobody calls osc
2565          * methods directly and everybody is supposed to go through LOV, we
2566          * assume lov checked invalid values for us.
2567          * The only recognised values so far are evict_by_nid and mds_conn.
2568          * Even if something bad goes through, we'd get a -EINVAL from OST
2569          * anyway. */
2570
2571         req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
2572                                                 &RQF_OST_SET_GRANT_INFO :
2573                                                 &RQF_OBD_SET_INFO);
2574         if (req == NULL)
2575                 RETURN(-ENOMEM);
2576
2577         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2578                              RCL_CLIENT, keylen);
2579         if (!KEY_IS(KEY_GRANT_SHRINK))
2580                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
2581                                      RCL_CLIENT, vallen);
2582         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
2583         if (rc) {
2584                 ptlrpc_request_free(req);
2585                 RETURN(rc);
2586         }
2587
2588         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2589         memcpy(tmp, key, keylen);
2590         tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
2591                                                         &RMF_OST_BODY :
2592                                                         &RMF_SETINFO_VAL);
2593         memcpy(tmp, val, vallen);
2594
2595         if (KEY_IS(KEY_GRANT_SHRINK)) {
2596                 struct osc_grant_args *aa;
2597                 struct obdo *oa;
2598
2599                 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2600                 aa = ptlrpc_req_async_args(req);
2601                 OBDO_ALLOC(oa);
2602                 if (!oa) {
2603                         ptlrpc_req_finished(req);
2604                         RETURN(-ENOMEM);
2605                 }
2606                 *oa = ((struct ost_body *)val)->oa;
2607                 aa->aa_oa = oa;
2608                 req->rq_interpret_reply = osc_shrink_grant_interpret;
2609         }
2610
2611         ptlrpc_request_set_replen(req);
2612         if (!KEY_IS(KEY_GRANT_SHRINK)) {
2613                 LASSERT(set != NULL);
2614                 ptlrpc_set_add_req(set, req);
2615                 ptlrpc_check_set(NULL, set);
2616         } else {
2617                 ptlrpcd_add_req(req);
2618         }
2619
2620         RETURN(0);
2621 }
2622 EXPORT_SYMBOL(osc_set_info_async);
2623
2624 static int osc_reconnect(const struct lu_env *env,
2625                          struct obd_export *exp, struct obd_device *obd,
2626                          struct obd_uuid *cluuid,
2627                          struct obd_connect_data *data,
2628                          void *localdata)
2629 {
2630         struct client_obd *cli = &obd->u.cli;
2631         ENTRY;
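
        /* On (re)connect, ask the server for enough grant to cover what we
         * already hold plus what our dirty pages consume; if we hold nothing,
         * request two BRW-sized chunks by default. */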
2632         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
2633                 long lost_grant;
2634                 long grant;
2635
2636                 spin_lock(&cli->cl_loi_list_lock);
2637                 grant = cli->cl_avail_grant + cli->cl_reserved_grant;
2638                 if (data->ocd_connect_flags & OBD_CONNECT_GRANT_PARAM)
2639                         grant += cli->cl_dirty_grant;
2640                 else
2641                         grant += cli->cl_dirty_pages << PAGE_SHIFT;
2642                 data->ocd_grant = grant ? : 2 * cli_brw_size(obd);
                lost_grant = cli->cl_lost_grant;
                cli->cl_lost_grant = 0;
                spin_unlock(&cli->cl_loi_list_lock);

                CDEBUG(D_RPCTRACE, "ocd_connect_flags: %#llx ocd_version: %d ocd_grant: %d, lost: %ld.\n",
                       data->ocd_connect_flags,
                       data->ocd_version, data->ocd_grant, lost_grant);
        }

        RETURN(0);
}

static int osc_disconnect(struct obd_export *exp)
{
        struct obd_device *obd = class_exp2obd(exp);
        int rc;

        rc = client_disconnect_export(exp);
        /*
         * Initially we put del_shrink_grant before disconnect_export, but it
         * causes the following problem if setup (connect) and cleanup
         * (disconnect) are tangled together.
         *      connect p1                     disconnect p2
         *   ptlrpc_connect_import
         *     ...............               class_manual_cleanup
         *                                     osc_disconnect
         *                                     del_shrink_grant
         *   ptlrpc_connect_interpret
         *     init_grant_shrink
         *   add this client to shrink list
         *                                      cleanup_osc
         * Bang! The pinger triggers the shrink.
         * So the OSC should be removed from the shrink list only after we
         * are sure the import has been destroyed. BUG18662
         */
        if (obd->u.cli.cl_import == NULL)
                osc_del_shrink_grant(&obd->u.cli);
        return rc;
}

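/* cfs_hash iterator callback: clear the LDLM_FL_CLEANED flag on every
 * granted lock of a resource so that a later ldlm_namespace_cleanup() pass
 * will cancel it, and invalidate the osc object those locks cover. */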
int osc_ldlm_resource_invalidate(struct cfs_hash *hs, struct cfs_hash_bd *bd,
                                 struct hlist_node *hnode, void *arg)
{
        struct lu_env *env = arg;
        struct ldlm_resource *res = cfs_hash_object(hs, hnode);
        struct ldlm_lock *lock;
        struct osc_object *osc = NULL;
        ENTRY;

        lock_res(res);
        list_for_each_entry(lock, &res->lr_granted, l_res_link) {
                if (lock->l_ast_data != NULL && osc == NULL) {
                        osc = lock->l_ast_data;
                        cl_object_get(osc2cl(osc));
                }

                /* clear LDLM_FL_CLEANED flag to make sure it will be canceled
                 * by the 2nd round of ldlm_namespace_clean() call in
                 * osc_import_event(). */
                ldlm_clear_cleaned(lock);
        }
        unlock_res(res);

        if (osc != NULL) {
                osc_object_invalidate(env, osc);
                cl_object_put(env, osc2cl(osc));
        }

        RETURN(0);
}
EXPORT_SYMBOL(osc_ldlm_resource_invalidate);

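/* React to import state changes: zero the grant on disconnect, flush and
 * invalidate all cached pages and locks when the import is invalidated, and
 * forward the remaining events to the observer (e.g. the LOV layer). */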
static int osc_import_event(struct obd_device *obd,
                            struct obd_import *imp,
                            enum obd_import_event event)
{
        struct client_obd *cli;
        int rc = 0;

        ENTRY;
        LASSERT(imp->imp_obd == obd);

        switch (event) {
        case IMP_EVENT_DISCON: {
                cli = &obd->u.cli;
                spin_lock(&cli->cl_loi_list_lock);
                cli->cl_avail_grant = 0;
                cli->cl_lost_grant = 0;
                spin_unlock(&cli->cl_loi_list_lock);
                break;
        }
        case IMP_EVENT_INACTIVE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE);
                break;
        }
        case IMP_EVENT_INVALIDATE: {
                struct ldlm_namespace *ns = obd->obd_namespace;
                struct lu_env         *env;
                __u16                  refcheck;

                ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);

                env = cl_env_get(&refcheck);
                if (!IS_ERR(env)) {
                        osc_io_unplug(env, &obd->u.cli, NULL);

                        cfs_hash_for_each_nolock(ns->ns_rs_hash,
                                                 osc_ldlm_resource_invalidate,
                                                 env, 0);
                        cl_env_put(env, &refcheck);

                        ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
                } else {
                        rc = PTR_ERR(env);
                }
                break;
        }
        case IMP_EVENT_ACTIVE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE);
                break;
        }
        case IMP_EVENT_OCD: {
                struct obd_connect_data *ocd = &imp->imp_connect_data;

                if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
                        osc_init_grant(&obd->u.cli, ocd);

                /* See bug 7198 */
                if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
                        imp->imp_client->cli_request_portal =
                                OST_REQUEST_PORTAL;

                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD);
                break;
        }
        case IMP_EVENT_DEACTIVATE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE);
                break;
        }
        case IMP_EVENT_ACTIVATE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE);
                break;
        }
        default:
                CERROR("Unknown import event %d\n", event);
                LBUG();
        }
        RETURN(rc);
}

/**
 * Determine whether the lock can be canceled before replaying the lock
 * during recovery, see bug16774 for detailed information.
 *
 * \retval zero the lock can't be canceled
 * \retval other ok to cancel
 */
static int osc_cancel_weight(struct ldlm_lock *lock)
{
        /*
         * Cancel all unused, granted extent locks.
         */
        if (lock->l_resource->lr_type == LDLM_EXTENT &&
            lock->l_granted_mode == lock->l_req_mode &&
            osc_ldlm_weigh_ast(lock) == 0)
                RETURN(1);

        RETURN(0);
}

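/* ptlrpcd work callback: flush any pending writeback for this client. */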
static int brw_queue_work(const struct lu_env *env, void *data)
{
        struct client_obd *cli = data;

        CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);

        osc_io_unplug(env, cli, NULL);
        RETURN(0);
}

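/* Shared client setup, exported so other OSC-like devices can reuse it:
 * take a ptlrpcd reference, initialize the client import, allocate the
 * writeback and LRU work items and set up quota and grant-shrink state.
 * Everything is unwound on failure. */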
int osc_setup_common(struct obd_device *obd, struct lustre_cfg *lcfg)
{
        struct client_obd *cli = &obd->u.cli;
        void *handler;
        int rc;

        ENTRY;

        rc = ptlrpcd_addref();
        if (rc)
                RETURN(rc);

        rc = client_obd_setup(obd, lcfg);
        if (rc)
                GOTO(out_ptlrpcd, rc);

        handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
        if (IS_ERR(handler))
                GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
        cli->cl_writeback_work = handler;

        handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
        if (IS_ERR(handler))
                GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
        cli->cl_lru_work = handler;

        rc = osc_quota_setup(obd);
        if (rc)
                GOTO(out_ptlrpcd_work, rc);

        cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;

        INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
        RETURN(rc);

out_ptlrpcd_work:
        if (cli->cl_writeback_work != NULL) {
                ptlrpcd_destroy_work(cli->cl_writeback_work);
                cli->cl_writeback_work = NULL;
        }
        if (cli->cl_lru_work != NULL) {
                ptlrpcd_destroy_work(cli->cl_lru_work);
                cli->cl_lru_work = NULL;
        }
        client_obd_cleanup(obd);
out_ptlrpcd:
        ptlrpcd_decref();
        RETURN(rc);
}
EXPORT_SYMBOL(osc_setup_common);

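/* Full OSC device setup: run the common setup above, register the lprocfs
 * tree (under the OSP entry when client and server share a node), seed the
 * shared request pool and hook this client into the grant-shrink and cache
 * shrinker lists. */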
int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
{
        struct client_obd *cli = &obd->u.cli;
        struct obd_type   *type;
        int                adding;
        int                added;
        int                req_count;
        int                rc;

        ENTRY;

        rc = osc_setup_common(obd, lcfg);
        if (rc < 0)
                RETURN(rc);

#ifdef CONFIG_PROC_FS
        obd->obd_vars = lprocfs_osc_obd_vars;
#endif
        /* If this is true then both the client (osc) and the server (osp)
         * are on the same node. If the osp layer is loaded first, it will
         * register the osc proc directory, and this obd_device will attach
         * its proc tree to type->typ_procsym instead of
         * obd->obd_type->typ_procroot.
         */
        type = class_search_type(LUSTRE_OSP_NAME);
        if (type && type->typ_procsym) {
                obd->obd_proc_entry = lprocfs_register(obd->obd_name,
                                                       type->typ_procsym,
                                                       obd->obd_vars, obd);
                if (IS_ERR(obd->obd_proc_entry)) {
                        rc = PTR_ERR(obd->obd_proc_entry);
                        CERROR("error %d setting up lprocfs for %s\n", rc,
                               obd->obd_name);
                        obd->obd_proc_entry = NULL;
                }
        }

        rc = lprocfs_obd_setup(obd, false);
        if (!rc) {
                /* If the basic OSC proc tree construction succeeded then
                 * let's do the rest.
                 */
                lproc_osc_attach_seqstat(obd);
                sptlrpc_lprocfs_cliobd_attach(obd);
                ptlrpc_lprocfs_register_obd(obd);
        }

        /*
         * We try to control the total number of requests with an upper limit
         * of osc_reqpool_maxreqcount. There might be some race which causes
         * an over-limit allocation, but that is fine.
         */
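        /* For example, with cl_max_rpcs_in_flight == 8 this tries to add
         * 8 + 2 = 10 requests to the pool, clamped so the global request
         * count never (intentionally) exceeds osc_reqpool_maxreqcount. */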
        req_count = atomic_read(&osc_pool_req_count);
        if (req_count < osc_reqpool_maxreqcount) {
                adding = cli->cl_max_rpcs_in_flight + 2;
                if (req_count + adding > osc_reqpool_maxreqcount)
                        adding = osc_reqpool_maxreqcount - req_count;

                added = ptlrpc_add_rqs_to_pool(osc_rq_pool, adding);
                atomic_add(added, &osc_pool_req_count);
        }

        INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
        ns_register_cancel(obd->obd_namespace, osc_cancel_weight);

        spin_lock(&osc_shrink_lock);
        list_add_tail(&cli->cl_shrink_list, &osc_shrink_list);
        spin_unlock(&osc_shrink_lock);

        RETURN(0);
}

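/* Destroy the writeback and LRU work items and clean up the client import;
 * exported so other client devices can reuse the common teardown. */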
int osc_precleanup_common(struct obd_device *obd)
{
        struct client_obd *cli = &obd->u.cli;
        ENTRY;

        /* LU-464
         * for echo client, export may be on zombie list, wait for
         * zombie thread to cull it, because cli.cl_import will be
         * cleared in client_disconnect_export():
         *   class_export_destroy() -> obd_cleanup() ->
         *   echo_device_free() -> echo_client_cleanup() ->
         *   obd_disconnect() -> osc_disconnect() ->
         *   client_disconnect_export()
         */
        obd_zombie_barrier();
        if (cli->cl_writeback_work) {
                ptlrpcd_destroy_work(cli->cl_writeback_work);
                cli->cl_writeback_work = NULL;
        }

        if (cli->cl_lru_work) {
                ptlrpcd_destroy_work(cli->cl_lru_work);
                cli->cl_lru_work = NULL;
        }

        obd_cleanup_client_import(obd);
        RETURN(0);
}
EXPORT_SYMBOL(osc_precleanup_common);

static int osc_precleanup(struct obd_device *obd)
{
        ENTRY;

        osc_precleanup_common(obd);

        ptlrpc_lprocfs_unregister_obd(obd);
        lprocfs_obd_cleanup(obd);
        RETURN(0);
}

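/* Final teardown: drop this client from the shrink list, release its LRU
 * cache reference, free the quota state and clean up the client import. */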
int osc_cleanup_common(struct obd_device *obd)
{
        struct client_obd *cli = &obd->u.cli;
        int rc;

        ENTRY;

        spin_lock(&osc_shrink_lock);
        list_del(&cli->cl_shrink_list);
        spin_unlock(&osc_shrink_lock);

        /* lru cleanup */
        if (cli->cl_cache != NULL) {
                LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
                spin_lock(&cli->cl_cache->ccc_lru_lock);
                list_del_init(&cli->cl_lru_osc);
                spin_unlock(&cli->cl_cache->ccc_lru_lock);
                cli->cl_lru_left = NULL;
                cl_cache_decref(cli->cl_cache);
                cli->cl_cache = NULL;
        }

        /* free memory of osc quota cache */
        osc_quota_cleanup(obd);

        rc = client_obd_cleanup(obd);

        ptlrpcd_decref();
        RETURN(rc);
}
EXPORT_SYMBOL(osc_cleanup_common);

int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
{
        int rc = class_process_proc_param(PARAM_OSC, obd->obd_vars, lcfg, obd);

        return rc > 0 ? 0 : rc;
}

static int osc_process_config(struct obd_device *obd, size_t len, void *buf)
{
        return osc_process_config_base(obd, buf);
}

static struct obd_ops osc_obd_ops = {
        .o_owner                = THIS_MODULE,
        .o_setup                = osc_setup,
        .o_precleanup           = osc_precleanup,
        .o_cleanup              = osc_cleanup_common,
        .o_add_conn             = client_import_add_conn,
        .o_del_conn             = client_import_del_conn,
        .o_connect              = client_connect_import,
        .o_reconnect            = osc_reconnect,
        .o_disconnect           = osc_disconnect,
        .o_statfs               = osc_statfs,
        .o_statfs_async         = osc_statfs_async,
        .o_create               = osc_create,
        .o_destroy              = osc_destroy,
        .o_getattr              = osc_getattr,
        .o_setattr              = osc_setattr,
        .o_iocontrol            = osc_iocontrol,
        .o_set_info_async       = osc_set_info_async,
        .o_import_event         = osc_import_event,
        .o_process_config       = osc_process_config,
        .o_quotactl             = osc_quotactl,
};

static struct shrinker *osc_cache_shrinker;
struct list_head osc_shrink_list = LIST_HEAD_INIT(osc_shrink_list);
DEFINE_SPINLOCK(osc_shrink_lock);

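/* Compatibility wrapper for older kernels where a shrinker has a single
 * shrink() method instead of separate count_objects()/scan_objects()
 * callbacks: run the scan pass, then return the remaining object count. */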
#ifndef HAVE_SHRINKER_COUNT
static int osc_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask))
{
        struct shrink_control scv = {
                .nr_to_scan = shrink_param(sc, nr_to_scan),
                .gfp_mask   = shrink_param(sc, gfp_mask)
        };
#if !defined(HAVE_SHRINKER_WANT_SHRINK_PTR) && !defined(HAVE_SHRINK_CONTROL)
        struct shrinker *shrinker = NULL;
#endif

        (void)osc_cache_shrink_scan(shrinker, &scv);

        return osc_cache_shrink_count(shrinker, &scv);
}
#endif

static int __init osc_init(void)
{
        bool enable_proc = true;
        struct obd_type *type;
        unsigned int reqpool_size;
        unsigned int reqsize;
        int rc;
        DEF_SHRINKER_VAR(osc_shvar, osc_cache_shrink,
                         osc_cache_shrink_count, osc_cache_shrink_scan);
        ENTRY;

        /* print an address of _any_ initialized kernel symbol from this
         * module, to allow debugging with gdb that doesn't support data
         * symbols from modules. */
        CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);

        rc = lu_kmem_init(osc_caches);
        if (rc)
                RETURN(rc);

        type = class_search_type(LUSTRE_OSP_NAME);
        if (type != NULL && type->typ_procsym != NULL)
                enable_proc = false;

        rc = class_register_type(&osc_obd_ops, NULL, enable_proc, NULL,
                                 LUSTRE_OSC_NAME, &osc_device_type);
        if (rc)
                GOTO(out_kmem, rc);

        osc_cache_shrinker = set_shrinker(DEFAULT_SEEKS, &osc_shvar);

        /* 4 GiB of request pool would obviously be far too much memory; this
         * check only keeps the megabytes-to-bytes shift below from
         * overflowing. */
        if (osc_reqpool_mem_max >= 1 << 12 || osc_reqpool_mem_max == 0)
                GOTO(out_type, rc = -EINVAL);

        reqpool_size = osc_reqpool_mem_max << 20;

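        /* round the request buffer size up to the smallest power of two
         * that is >= OST_IO_MAXREQSIZE */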
        reqsize = 1;
        while (reqsize < OST_IO_MAXREQSIZE)
                reqsize = reqsize << 1;

        /*
         * We don't enlarge the request count in the OSC pool according to
         * cl_max_rpcs_in_flight. Allocation from the pool is only attempted
         * after a normal allocation has failed, so a small OSC pool won't
         * cause much performance degradation in most cases.
         */
        osc_reqpool_maxreqcount = reqpool_size / reqsize;
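        /* e.g. if reqsize rounds up to 64 KiB, the default 5 MiB pool caps
         * the pool at 5 MiB / 64 KiB = 80 requests */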

        atomic_set(&osc_pool_req_count, 0);
        osc_rq_pool = ptlrpc_init_rq_pool(0, OST_IO_MAXREQSIZE,
                                          ptlrpc_add_rqs_to_pool);

        if (osc_rq_pool != NULL)
                GOTO(out, rc);
        rc = -ENOMEM;
out_type:
        class_unregister_type(LUSTRE_OSC_NAME);
out_kmem:
        lu_kmem_fini(osc_caches);
out:
        RETURN(rc);
}

static void __exit osc_exit(void)
{
        remove_shrinker(osc_cache_shrinker);
        class_unregister_type(LUSTRE_OSC_NAME);
        lu_kmem_fini(osc_caches);
        ptlrpc_free_rq_pool(osc_rq_pool);
}

MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
MODULE_VERSION(LUSTRE_VERSION_STRING);
MODULE_LICENSE("GPL");

module_init(osc_init);
module_exit(osc_exit);