Whamcloud - gitweb
LU-1757 brw: add short io osc/ost transfer.
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2016, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  */
32
33 #define DEBUG_SUBSYSTEM S_OSC
34
35 #include <libcfs/libcfs.h>
36
37 #include <lprocfs_status.h>
38 #include <lustre_debug.h>
39 #include <lustre_dlm.h>
40 #include <lustre_fid.h>
41 #include <lustre_ha.h>
42 #include <uapi/linux/lustre/lustre_ioctl.h>
43 #include <lustre_net.h>
44 #include <lustre_obdo.h>
45 #include <uapi/linux/lustre/lustre_param.h>
46 #include <obd.h>
47 #include <obd_cksum.h>
48 #include <obd_class.h>
49 #include <lustre_osc.h>
50
51 #include "osc_internal.h"
52
53 atomic_t osc_pool_req_count;
54 unsigned int osc_reqpool_maxreqcount;
55 struct ptlrpc_request_pool *osc_rq_pool;
56
57 /* max memory used for request pool, unit is MB */
58 static unsigned int osc_reqpool_mem_max = 5;
59 module_param(osc_reqpool_mem_max, uint, 0444);
60
61 #define osc_grant_args osc_brw_async_args
62
63 struct osc_setattr_args {
64         struct obdo             *sa_oa;
65         obd_enqueue_update_f     sa_upcall;
66         void                    *sa_cookie;
67 };
68
69 struct osc_fsync_args {
70         struct osc_object       *fa_obj;
71         struct obdo             *fa_oa;
72         obd_enqueue_update_f    fa_upcall;
73         void                    *fa_cookie;
74 };
75
76 struct osc_ladvise_args {
77         struct obdo             *la_oa;
78         obd_enqueue_update_f     la_upcall;
79         void                    *la_cookie;
80 };
81
82 struct osc_enqueue_args {
83         struct obd_export       *oa_exp;
84         enum ldlm_type          oa_type;
85         enum ldlm_mode          oa_mode;
86         __u64                   *oa_flags;
87         osc_enqueue_upcall_f    oa_upcall;
88         void                    *oa_cookie;
89         struct ost_lvb          *oa_lvb;
90         struct lustre_handle    oa_lockh;
91         bool                    oa_speculative;
92 };
93
94 static void osc_release_ppga(struct brw_page **ppga, size_t count);
95 static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
96                          void *data, int rc);
97
98 void osc_pack_req_body(struct ptlrpc_request *req, struct obdo *oa)
99 {
100         struct ost_body *body;
101
102         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
103         LASSERT(body);
104
105         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
106 }
107
/* Fetch current attributes for the object identified by @oa via a
 * synchronous OST_GETATTR RPC.  On success @oa is refreshed from the
 * reply, and the export's preferred brw size is reported as o_blksize.
 *
 * Returns 0 on success, -ENOMEM if the request cannot be allocated,
 * -EPROTO on a malformed reply, or the RPC/pack error otherwise. */
static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        int                      rc;

        ENTRY;
        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        /* Identify the target object in the request body. */
        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        /* Synchronous send: block until the reply arrives. */
        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        /* Convert the wire-format reply obdo back into local format. */
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        /* Advertise the optimal bulk I/O size as the blocksize. */
        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;

        EXIT;
out:
        ptlrpc_req_finished(req);

        return rc;
}
150
/* Apply the attributes in @oa to the object on the OST with a
 * synchronous OST_SETATTR RPC; on success @oa is updated from the
 * server's reply.  The caller must have set OBD_MD_FLGROUP in o_valid.
 *
 * Returns 0 on success or a negative errno on allocation, packing,
 * RPC, or reply-unpacking (-EPROTO) failure. */
static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        int                      rc;

        ENTRY;
        LASSERT(oa->o_valid & OBD_MD_FLGROUP);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        /* Pack the new attributes into the request body. */
        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        /* Reflect the attributes the server actually stored back to @oa. */
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        EXIT;
out:
        ptlrpc_req_finished(req);

        RETURN(rc);
}
191
/* Reply callback shared by osc_setattr_async() and osc_punch_base():
 * on success, unpack the server's obdo into sa->sa_oa; in all cases,
 * forward the final status to the caller's upcall with its cookie.
 * Returns whatever the upcall returns. */
static int osc_setattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_setattr_args *sa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
                             &body->oa);
out:
        /* Always notify the caller, even on error, passing rc through. */
        rc = sa->sa_upcall(sa->sa_cookie, rc);
        RETURN(rc);
}
212
/* Send an OST_SETATTR RPC asynchronously.  If @rqset is NULL the request
 * is handed to ptlrpcd fire-and-forget (no interpret callback is set, so
 * @upcall/@cookie are unused in that case); otherwise the reply is routed
 * through osc_setattr_interpret() to @upcall.  If @rqset is the special
 * PTLRPCD_SET token the request is still run by ptlrpcd.
 * Returns 0 on successful dispatch or a negative errno. */
int osc_setattr_async(struct obd_export *exp, struct obdo *oa,
                      obd_enqueue_update_f upcall, void *cookie,
                      struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct osc_setattr_args *sa;
        int                      rc;

        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        /* do mds to ost setattr asynchronously */
        if (!rqset) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req);
        } else {
                req->rq_interpret_reply =
                        (ptlrpc_interpterer_t)osc_setattr_interpret;

                /* Stash callback state in the request's async-args area. */
                CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
                sa = ptlrpc_req_async_args(req);
                sa->sa_oa = oa;
                sa->sa_upcall = upcall;
                sa->sa_cookie = cookie;

                if (rqset == PTLRPCD_SET)
                        ptlrpcd_add_req(req);
                else
                        ptlrpc_set_add_req(rqset, req);
        }

        RETURN(0);
}
259
/* Reply callback for osc_ladvise_base(): on success, copy the server's
 * obdo straight into la->la_oa; in all cases, forward the final status
 * to the caller's upcall with its cookie.  Returns the upcall's result. */
static int osc_ladvise_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 void *arg, int rc)
{
        struct osc_ladvise_args *la = arg;
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        /* Plain struct copy — no wire conversion here, unlike setattr. */
        *la->la_oa = body->oa;
out:
        rc = la->la_upcall(la->la_cookie, rc);
        RETURN(rc);
}
280
/**
 * Send an OST_LADVISE RPC carrying @num_advise lu_ladvise entries from
 * @ladvise_hdr to the OST identified by @exp/@oa.
 *
 * If rqset is NULL, do not wait for response. Upcall and cookie could also
 * be NULL in this case.  Otherwise the reply is delivered to @upcall via
 * osc_ladvise_interpret(); PTLRPCD_SET routes the request through ptlrpcd.
 *
 * Returns 0 on successful dispatch or a negative errno.
 */
int osc_ladvise_base(struct obd_export *exp, struct obdo *oa,
                     struct ladvise_hdr *ladvise_hdr,
                     obd_enqueue_update_f upcall, void *cookie,
                     struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        struct osc_ladvise_args *la;
        int                      rc;
        struct lu_ladvise       *req_ladvise;
        struct lu_ladvise       *ladvise = ladvise_hdr->lah_advise;
        int                      num_advise = ladvise_hdr->lah_count;
        struct ladvise_hdr      *req_ladvise_hdr;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_LADVISE);
        if (req == NULL)
                RETURN(-ENOMEM);

        /* The ladvise field is variable-length; size it before packing. */
        req_capsule_set_size(&req->rq_pill, &RMF_OST_LADVISE, RCL_CLIENT,
                             num_advise * sizeof(*ladvise));
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_LADVISE);
        if (rc != 0) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL;
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
                             oa);

        /* Copy the advice header and the advice array into the request. */
        req_ladvise_hdr = req_capsule_client_get(&req->rq_pill,
                                                 &RMF_OST_LADVISE_HDR);
        memcpy(req_ladvise_hdr, ladvise_hdr, sizeof(*ladvise_hdr));

        req_ladvise = req_capsule_client_get(&req->rq_pill, &RMF_OST_LADVISE);
        memcpy(req_ladvise, ladvise, sizeof(*ladvise) * num_advise);
        ptlrpc_request_set_replen(req);

        if (rqset == NULL) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req);
                RETURN(0);
        }

        req->rq_interpret_reply = osc_ladvise_interpret;
        CLASSERT(sizeof(*la) <= sizeof(req->rq_async_args));
        la = ptlrpc_req_async_args(req);
        la->la_oa = oa;
        la->la_upcall = upcall;
        la->la_cookie = cookie;

        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}
347
/* Create an object on the OST with a synchronous OST_CREATE RPC.  The
 * asserted fid_seq_is_echo() shows this path only serves echo-client
 * object creation.  On success @oa is refreshed from the reply and
 * o_blksize is set to the export's preferred brw size.
 *
 * Returns 0 on success or a negative errno. */
static int osc_create(const struct lu_env *env, struct obd_export *exp,
                      struct obdo *oa)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        LASSERT(oa != NULL);
        LASSERT(oa->o_valid & OBD_MD_FLGROUP);
        LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
        if (req == NULL)
                GOTO(out, rc = -ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
        if (rc) {
                ptlrpc_request_free(req);
                GOTO(out, rc);
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out_req, rc = -EPROTO);

        CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;

        CDEBUG(D_HA, "transno: %lld\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        RETURN(rc);
}
398
/* Send an asynchronous OST_PUNCH (truncate/hole-punch) RPC described by
 * @oa.  The reply is handled by osc_setattr_interpret(), which updates
 * @oa and then calls @upcall with @cookie.  PTLRPCD_SET routes the
 * request through ptlrpcd; otherwise it is added to @rqset.
 * Returns 0 on successful dispatch or a negative errno. */
int osc_punch_base(struct obd_export *exp, struct obdo *oa,
                   obd_enqueue_update_f upcall, void *cookie,
                   struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct osc_setattr_args *sa;
        struct ost_body         *body;
        int                      rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        /* Punch replies share the setattr interpret path. */
        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
        CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
        sa = ptlrpc_req_async_args(req);
        sa->sa_oa = oa;
        sa->sa_upcall = upcall;
        sa->sa_cookie = cookie;
        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}
440
/* Reply callback for osc_sync_base(): copy the server's obdo into
 * fa->fa_oa, refresh the osc object's cached blocks attribute from the
 * reply (under the cl_object attribute lock), and forward the final
 * status to the caller's upcall.  Returns the upcall's result. */
static int osc_sync_interpret(const struct lu_env *env,
                              struct ptlrpc_request *req,
                              void *arg, int rc)
{
        struct osc_fsync_args   *fa = arg;
        struct ost_body         *body;
        struct cl_attr          *attr = &osc_env_info(env)->oti_attr;
        unsigned long           valid = 0;
        struct cl_object        *obj;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL) {
                CERROR("can't unpack ost_body\n");
                GOTO(out, rc = -EPROTO);
        }

        *fa->fa_oa = body->oa;
        obj = osc2cl(fa->fa_obj);

        /* Update osc object's blocks attribute */
        cl_object_attr_lock(obj);
        if (body->oa.o_valid & OBD_MD_FLBLOCKS) {
                attr->cat_blocks = body->oa.o_blocks;
                valid |= CAT_BLOCKS;
        }

        if (valid != 0)
                cl_object_attr_update(env, obj, attr, valid);
        cl_object_attr_unlock(obj);

out:
        rc = fa->fa_upcall(fa->fa_cookie, rc);
        RETURN(rc);
}
479
480 int osc_sync_base(struct osc_object *obj, struct obdo *oa,
481                   obd_enqueue_update_f upcall, void *cookie,
482                   struct ptlrpc_request_set *rqset)
483 {
484         struct obd_export     *exp = osc_export(obj);
485         struct ptlrpc_request *req;
486         struct ost_body       *body;
487         struct osc_fsync_args *fa;
488         int                    rc;
489         ENTRY;
490
491         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
492         if (req == NULL)
493                 RETURN(-ENOMEM);
494
495         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
496         if (rc) {
497                 ptlrpc_request_free(req);
498                 RETURN(rc);
499         }
500
501         /* overload the size and blocks fields in the oa with start/end */
502         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
503         LASSERT(body);
504         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
505
506         ptlrpc_request_set_replen(req);
507         req->rq_interpret_reply = osc_sync_interpret;
508
509         CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
510         fa = ptlrpc_req_async_args(req);
511         fa->fa_obj = obj;
512         fa->fa_oa = oa;
513         fa->fa_upcall = upcall;
514         fa->fa_cookie = cookie;
515
516         if (rqset == PTLRPCD_SET)
517                 ptlrpcd_add_req(req);
518         else
519                 ptlrpc_set_add_req(rqset, req);
520
521         RETURN (0);
522 }
523
/* Find and cancel locally locks matched by @mode in the resource found by
 * @objid. Found locks are added into @cancel list. Returns the amount of
 * locks added to @cancels list. */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
                                   struct list_head *cancels,
                                   enum ldlm_mode mode, __u64 lock_flags)
{
        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        struct ldlm_res_id res_id;
        struct ldlm_resource *res;
        int count;
        ENTRY;

        /* Return, i.e. cancel nothing, only if ELC is supported (flag in
         * export) but disabled through procfs (flag in NS).
         *
         * This distinguishes from a case when ELC is not supported originally,
         * when we still want to cancel locks in advance and just cancel them
         * locally, without sending any RPC. */
        if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
                RETURN(0);

        /* Look the resource up by object id; no resource means no locks. */
        ostid_build_res_name(&oa->o_oi, &res_id);
        res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
        if (IS_ERR(res))
                RETURN(0);

        /* Hold a debug reference across the local cancel scan. */
        LDLM_RESOURCE_ADDREF(res);
        count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
                                           lock_flags, 0, NULL);
        LDLM_RESOURCE_DELREF(res);
        ldlm_resource_putref(res);
        RETURN(count);
}
558
559 static int osc_destroy_interpret(const struct lu_env *env,
560                                  struct ptlrpc_request *req, void *data,
561                                  int rc)
562 {
563         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
564
565         atomic_dec(&cli->cl_destroy_in_flight);
566         wake_up(&cli->cl_destroy_waitq);
567         return 0;
568 }
569
/* Try to take an in-flight slot for a destroy RPC.  Returns 1 (slot
 * taken) if the incremented counter stays within cl_max_rpcs_in_flight;
 * otherwise undoes the increment and returns 0.  No lock covers the two
 * atomics, so a concurrent decrement may slip between them; the second
 * check detects that window and wakes waiters so none are lost. */
static int osc_can_send_destroy(struct client_obd *cli)
{
        if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
            cli->cl_max_rpcs_in_flight) {
                /* The destroy request can be sent */
                return 1;
        }
        if (atomic_dec_return(&cli->cl_destroy_in_flight) <
            cli->cl_max_rpcs_in_flight) {
                /*
                 * The counter has been modified between the two atomic
                 * operations.
                 */
                wake_up(&cli->cl_destroy_waitq);
        }
        return 0;
}
587
/* Destroy the object described by @oa on the OST.  Any locally-granted
 * PW locks on the object are cancelled in advance (with data discarded)
 * and piggy-backed on the OST_DESTROY request via early lock cancel.
 * The number of concurrent destroy RPCs is throttled to
 * cl_max_rpcs_in_flight; the request itself is sent via ptlrpcd without
 * waiting for the reply.
 *
 * Returns 0 once the request is queued, or a negative errno. */
static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct client_obd     *cli = &exp->exp_obd->u.cli;
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct list_head       cancels = LIST_HEAD_INIT(cancels);
        int rc, count;
        ENTRY;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        /* Cancel cached PW locks locally; their handles ride in the RPC. */
        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
        if (req == NULL) {
                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                RETURN(-ENOMEM);
        }

        rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
                               0, &cancels, count);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = osc_destroy_interpret;
        if (!osc_can_send_destroy(cli)) {
                struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);

                /*
                 * Wait until the number of on-going destroy RPCs drops
                 * under max_rpc_in_flight
                 */
                rc = l_wait_event_exclusive(cli->cl_destroy_waitq,
                                            osc_can_send_destroy(cli), &lwi);
                if (rc) {
                        ptlrpc_req_finished(req);
                        RETURN(rc);
                }
        }

        /* Do not wait for response */
        ptlrpcd_add_req(req);
        RETURN(0);
}
648
/* Fill the cache-accounting fields of @oa (o_dirty, o_undirty, o_grant,
 * o_dropped) from the client_obd state, under cl_loi_list_lock, so the
 * server can see how much the client has cached and how much more grant
 * it wants.  The OBD_MD_FLBLOCKS|OBD_MD_FLGRANT bits must not be set on
 * entry.  NOTE(review): @writing_bytes is currently unused here. */
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;

        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        spin_lock(&cli->cl_loi_list_lock);
        /* With GRANT_PARAM the server wants byte-accurate grant usage;
         * otherwise report dirty pages converted to bytes. */
        if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data, GRANT_PARAM))
                oa->o_dirty = cli->cl_dirty_grant;
        else
                oa->o_dirty = cli->cl_dirty_pages << PAGE_SHIFT;
        if (unlikely(cli->cl_dirty_pages - cli->cl_dirty_transit >
                     cli->cl_dirty_max_pages)) {
                CERROR("dirty %lu - %lu > dirty_max %lu\n",
                       cli->cl_dirty_pages, cli->cl_dirty_transit,
                       cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else if (unlikely(atomic_long_read(&obd_dirty_pages) -
                            atomic_long_read(&obd_dirty_transit_pages) >
                            (long)(obd_max_dirty_pages + 1))) {
                /* The atomic_read() allowing the atomic_inc() are
                 * not covered by a lock thus they may safely race and trip
                 * this CERROR() unless we add in a small fudge factor (+1). */
                CERROR("%s: dirty %ld - %ld > system dirty_max %ld\n",
                       cli_name(cli), atomic_long_read(&obd_dirty_pages),
                       atomic_long_read(&obd_dirty_transit_pages),
                       obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
                            0x7fffffff)) {
                /* Sanity check: a gap this large indicates corrupted
                 * accounting, so don't ask for more grant. */
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty_pages, cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else {
                unsigned long nrpages;

                /* Ask for enough headroom to keep a full pipeline of RPCs
                 * (max_rpcs_in_flight + 1) busy, or the dirty cache limit,
                 * whichever is larger. */
                nrpages = cli->cl_max_pages_per_rpc;
                nrpages *= cli->cl_max_rpcs_in_flight + 1;
                nrpages = max(nrpages, cli->cl_dirty_max_pages);
                oa->o_undirty = nrpages << PAGE_SHIFT;
                if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data,
                                 GRANT_PARAM)) {
                        int nrextents;

                        /* take extent tax into account when asking for more
                         * grant space */
                        nrextents = (nrpages + cli->cl_max_extent_pages - 1)  /
                                     cli->cl_max_extent_pages;
                        oa->o_undirty += nrextents * cli->cl_grant_extent_tax;
                }
        }
        oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
        oa->o_dropped = cli->cl_lost_grant;
        cli->cl_lost_grant = 0;
        spin_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "dirty: %llu undirty: %u dropped %u grant: %llu\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
}
709
710 void osc_update_next_shrink(struct client_obd *cli)
711 {
712         cli->cl_next_shrink_grant = ktime_get_seconds() +
713                                     cli->cl_grant_shrink_interval;
714
715         CDEBUG(D_CACHE, "next time %lld to shrink grant\n",
716                cli->cl_next_shrink_grant);
717 }
718
/* Fold @grant bytes returned by the server back into the client's
 * available grant, under cl_loi_list_lock. */
static void __osc_update_grant(struct client_obd *cli, u64 grant)
{
        spin_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant += grant;
        spin_unlock(&cli->cl_loi_list_lock);
}
725
726 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
727 {
728         if (body->oa.o_valid & OBD_MD_FLGRANT) {
729                 CDEBUG(D_CACHE, "got %llu extra grant\n", body->oa.o_grant);
730                 __osc_update_grant(cli, body->oa.o_grant);
731         }
732 }
733
734 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
735                               u32 keylen, void *key,
736                               u32 vallen, void *val,
737                               struct ptlrpc_request_set *set);
738
/* Completion callback for a grant-shrink set_info RPC.  On failure the
 * grant we tried to give back (stashed in oa->o_grant) is restored to
 * cl_avail_grant; on success any replacement grant in the reply body is
 * absorbed.  The obdo allocated by the sender is freed either way. */
static int osc_shrink_grant_interpret(const struct lu_env *env,
                                      struct ptlrpc_request *req,
                                      void *aa, int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
        struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
        struct ost_body *body;

        if (rc != 0) {
                /* Server never saw the shrink: take the grant back. */
                __osc_update_grant(cli, oa->o_grant);
                GOTO(out, rc);
        }

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        osc_update_grant(cli, body);
out:
        OBDO_FREE(oa);
        return rc;
}
759
/* Give back a quarter of the available grant: move it from
 * cl_avail_grant into oa->o_grant (under cl_loi_list_lock), mark the
 * obdo with OBD_FL_SHRINK_GRANT so the server knows this is a
 * give-back, and re-arm the shrink timer. */
static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
{
        spin_lock(&cli->cl_loi_list_lock);
        oa->o_grant = cli->cl_avail_grant / 4;
        cli->cl_avail_grant -= oa->o_grant;
        spin_unlock(&cli->cl_loi_list_lock);
        /* Make sure o_flags is valid before OR-ing in the shrink flag. */
        if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
                oa->o_valid |= OBD_MD_FLFLAGS;
                oa->o_flags = 0;
        }
        oa->o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);
}
773
/* Shrink the current grant, either from some large amount to enough for a
 * full set of in-flight RPCs, or if we have already shrunk to that limit
 * then to enough for a single RPC.  This avoids keeping more grant than
 * needed, and avoids shrinking the grant piecemeal. */
static int osc_shrink_grant(struct client_obd *cli)
{
        /* Grant needed to keep a full pipeline of RPCs in flight. */
        __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
                             (cli->cl_max_pages_per_rpc << PAGE_SHIFT);

        spin_lock(&cli->cl_loi_list_lock);
        /* Already at or below the pipeline watermark: shrink all the way
         * down to a single RPC worth of grant instead. */
        if (cli->cl_avail_grant <= target_bytes)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
        spin_unlock(&cli->cl_loi_list_lock);

        return osc_shrink_grant_to_target(cli, target_bytes);
}
790
/* Return grant above @target_bytes to the server.  The target is clamped
 * to at least one RPC worth of grant; if we already hold no more than the
 * target, nothing is sent.  The give-back travels as a KEY_GRANT_SHRINK
 * set_info RPC; on send failure the deducted grant is restored locally.
 * Returns 0 on success or a negative errno. */
int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
{
        int                     rc = 0;
        struct ost_body        *body;
        ENTRY;

        spin_lock(&cli->cl_loi_list_lock);
        /* Don't shrink if we are already above or below the desired limit
         * We don't want to shrink below a single RPC, as that will negatively
         * impact block allocation and long-term performance. */
        if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_SHIFT)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;

        if (target_bytes >= cli->cl_avail_grant) {
                spin_unlock(&cli->cl_loi_list_lock);
                RETURN(0);
        }
        spin_unlock(&cli->cl_loi_list_lock);

        OBD_ALLOC_PTR(body);
        if (!body)
                RETURN(-ENOMEM);

        osc_announce_cached(cli, &body->oa, 0);

        /* Deduct the surplus from avail_grant and carry it in o_grant. */
        spin_lock(&cli->cl_loi_list_lock);
        body->oa.o_grant = cli->cl_avail_grant - target_bytes;
        cli->cl_avail_grant = target_bytes;
        spin_unlock(&cli->cl_loi_list_lock);
        if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
                body->oa.o_valid |= OBD_MD_FLFLAGS;
                body->oa.o_flags = 0;
        }
        body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);

        rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
                                sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
                                sizeof(*body), body, NULL);
        if (rc != 0)
                __osc_update_grant(cli, body->oa.o_grant);
        OBD_FREE_PTR(body);
        RETURN(rc);
}
835
836 static int osc_should_shrink_grant(struct client_obd *client)
837 {
838         time64_t next_shrink = client->cl_next_shrink_grant;
839
840         if ((client->cl_import->imp_connect_data.ocd_connect_flags &
841              OBD_CONNECT_GRANT_SHRINK) == 0)
842                 return 0;
843
844         if (ktime_get_seconds() >= next_shrink - 5) {
845                 /* Get the current RPC size directly, instead of going via:
846                  * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
847                  * Keep comment here so that it can be found by searching. */
848                 int brw_size = client->cl_max_pages_per_rpc << PAGE_SHIFT;
849
850                 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
851                     client->cl_avail_grant > brw_size)
852                         return 1;
853                 else
854                         osc_update_next_shrink(client);
855         }
856         return 0;
857 }
858
859 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
860 {
861         struct client_obd *client;
862
863         list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
864                 if (osc_should_shrink_grant(client))
865                         osc_shrink_grant(client);
866         }
867         return 0;
868 }
869
870 static int osc_add_shrink_grant(struct client_obd *client)
871 {
872         int rc;
873
874         rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
875                                        TIMEOUT_GRANT,
876                                        osc_grant_shrink_grant_cb, NULL,
877                                        &client->cl_grant_shrink_list);
878         if (rc) {
879                 CERROR("add grant client %s error %d\n", cli_name(client), rc);
880                 return rc;
881         }
882         CDEBUG(D_CACHE, "add grant client %s\n", cli_name(client));
883         osc_update_next_shrink(client);
884         return 0;
885 }
886
/* Unregister @client from the periodic grant-shrink timer; inverse of
 * osc_add_shrink_grant().  Returns the ptlrpc_del_timeout_client() result. */
static int osc_del_shrink_grant(struct client_obd *client)
{
        return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
                                         TIMEOUT_GRANT);
}
892
/* Initialize client-side grant accounting and extent sizing from the
 * connect data @ocd returned by the server; runs at (re)connect time.
 * Also registers the client for periodic grant shrinking when the server
 * supports OBD_CONNECT_GRANT_SHRINK. */
static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        /*
         * ocd_grant is the total grant amount we're expect to hold: if we've
         * been evicted, it's the new avail_grant amount, cl_dirty_pages will
         * drop to 0 as inflight RPCs fail out; otherwise, it's avail_grant +
         * dirty.
         *
         * race is tolerable here: if we're evicted, but imp_state already
         * left EVICTED state, then cl_dirty_pages must be 0 already.
         */
        spin_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant = ocd->ocd_grant;
        if (cli->cl_import->imp_state != LUSTRE_IMP_EVICTED) {
                /* subtract grant already reserved or consumed locally */
                cli->cl_avail_grant -= cli->cl_reserved_grant;
                if (OCD_HAS_FLAG(ocd, GRANT_PARAM))
                        cli->cl_avail_grant -= cli->cl_dirty_grant;
                else
                        cli->cl_avail_grant -=
                                        cli->cl_dirty_pages << PAGE_SHIFT;
        }

        if (OCD_HAS_FLAG(ocd, GRANT_PARAM)) {
                u64 size;
                int chunk_mask;

                /* overhead for each extent insertion */
                cli->cl_grant_extent_tax = ocd->ocd_grant_tax_kb << 10;
                /* determine the appropriate chunk size used by osc_extent. */
                cli->cl_chunkbits = max_t(int, PAGE_SHIFT,
                                          ocd->ocd_grant_blkbits);
                /* max_pages_per_rpc must be chunk aligned */
                chunk_mask = ~((1 << (cli->cl_chunkbits - PAGE_SHIFT)) - 1);
                cli->cl_max_pages_per_rpc = (cli->cl_max_pages_per_rpc +
                                             ~chunk_mask) & chunk_mask;
                /* determine maximum extent size, in #pages */
                size = (u64)ocd->ocd_grant_max_blks << ocd->ocd_grant_blkbits;
                cli->cl_max_extent_pages = size >> PAGE_SHIFT;
                if (cli->cl_max_extent_pages == 0)
                        cli->cl_max_extent_pages = 1;
        } else {
                /* server sent no grant parameters: fall back to page-sized
                 * chunks and the historical maximum extent size */
                cli->cl_grant_extent_tax = 0;
                cli->cl_chunkbits = PAGE_SHIFT;
                cli->cl_max_extent_pages = DT_MAX_BRW_PAGES;
        }
        spin_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
                "chunk bits: %d cl_max_extent_pages: %d\n",
                cli_name(cli),
                cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits,
                cli->cl_max_extent_pages);

        /* register with the grant-shrink timer only once per client */
        if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
            list_empty(&cli->cl_grant_shrink_list))
                osc_add_shrink_grant(cli);
}
950
951 /* We assume that the reason this OSC got a short read is because it read
952  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
953  * via the LOV, and it _knows_ it's reading inside the file, it's just that
954  * this stripe never got written at or beyond this stripe offset yet. */
955 static void handle_short_read(int nob_read, size_t page_count,
956                               struct brw_page **pga)
957 {
958         char *ptr;
959         int i = 0;
960
961         /* skip bytes read OK */
962         while (nob_read > 0) {
963                 LASSERT (page_count > 0);
964
965                 if (pga[i]->count > nob_read) {
966                         /* EOF inside this page */
967                         ptr = kmap(pga[i]->pg) +
968                                 (pga[i]->off & ~PAGE_MASK);
969                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
970                         kunmap(pga[i]->pg);
971                         page_count--;
972                         i++;
973                         break;
974                 }
975
976                 nob_read -= pga[i]->count;
977                 page_count--;
978                 i++;
979         }
980
981         /* zero remaining pages */
982         while (page_count-- > 0) {
983                 ptr = kmap(pga[i]->pg) + (pga[i]->off & ~PAGE_MASK);
984                 memset(ptr, 0, pga[i]->count);
985                 kunmap(pga[i]->pg);
986                 i++;
987         }
988 }
989
990 static int check_write_rcs(struct ptlrpc_request *req,
991                            int requested_nob, int niocount,
992                            size_t page_count, struct brw_page **pga)
993 {
994         int     i;
995         __u32   *remote_rcs;
996
997         remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
998                                                   sizeof(*remote_rcs) *
999                                                   niocount);
1000         if (remote_rcs == NULL) {
1001                 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1002                 return(-EPROTO);
1003         }
1004
1005         /* return error if any niobuf was in error */
1006         for (i = 0; i < niocount; i++) {
1007                 if ((int)remote_rcs[i] < 0)
1008                         return(remote_rcs[i]);
1009
1010                 if (remote_rcs[i] != 0) {
1011                         CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1012                                 i, remote_rcs[i], req);
1013                         return(-EPROTO);
1014                 }
1015         }
1016         if (req->rq_bulk != NULL &&
1017             req->rq_bulk->bd_nob_transferred != requested_nob) {
1018                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1019                        req->rq_bulk->bd_nob_transferred, requested_nob);
1020                 return(-EPROTO);
1021         }
1022
1023         return (0);
1024 }
1025
1026 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1027 {
1028         if (p1->flag != p2->flag) {
1029                 unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
1030                                   OBD_BRW_SYNC       | OBD_BRW_ASYNC   |
1031                                   OBD_BRW_NOQUOTA    | OBD_BRW_SOFT_SYNC);
1032
1033                 /* warn if we try to combine flags that we don't know to be
1034                  * safe to combine */
1035                 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1036                         CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1037                               "report this at https://jira.hpdd.intel.com/\n",
1038                               p1->flag, p2->flag);
1039                 }
1040                 return 0;
1041         }
1042
1043         return (p1->off + p1->count == p2->off);
1044 }
1045
/* Compute the bulk checksum over the first @nob bytes spanned by the
 * @pg_count pages in @pga, using the hash algorithm mapped from
 * @cksum_type.  @opc (OST_READ/OST_WRITE) selects which fault-injection
 * hooks apply.  Returns the checksum; on hash-init failure the (negative)
 * PTR_ERR value is returned through the u32 return type, matching the
 * historical behavior callers rely on. */
static u32 osc_checksum_bulk(int nob, size_t pg_count,
                             struct brw_page **pga, int opc,
                             enum cksum_types cksum_type)
{
        u32                             cksum;
        int                             i = 0;
        struct cfs_crypto_hash_desc     *hdesc;
        unsigned int                    bufsize;
        unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);

        LASSERT(pg_count > 0);

        hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
        if (IS_ERR(hdesc)) {
                CERROR("Unable to initialize checksum hash %s\n",
                       cfs_crypto_hash_name(cfs_alg));
                return PTR_ERR(hdesc);
        }

        while (nob > 0 && pg_count > 0) {
                /* only the first 'nob' bytes of the final page count */
                unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 && opc == OST_READ &&
                    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
                        unsigned char *ptr = kmap(pga[i]->pg);
                        int off = pga[i]->off & ~PAGE_MASK;

                        memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
                        kunmap(pga[i]->pg);
                }
                cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
                                            pga[i]->off & ~PAGE_MASK,
                                            count);
                LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
                               (int)(pga[i]->off & ~PAGE_MASK));

                /* nob may go negative here on a short final page; the
                 * loop condition then terminates the walk */
                nob -= pga[i]->count;
                pg_count--;
                i++;
        }

        bufsize = sizeof(cksum);
        cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);

        /* For sending we only compute the wrong checksum instead
         * of corrupting the data so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                cksum++;

        return cksum;
}
1099
1100 static int
1101 osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
1102                      u32 page_count, struct brw_page **pga,
1103                      struct ptlrpc_request **reqp, int resend)
1104 {
1105         struct ptlrpc_request   *req;
1106         struct ptlrpc_bulk_desc *desc;
1107         struct ost_body         *body;
1108         struct obd_ioobj        *ioobj;
1109         struct niobuf_remote    *niobuf;
1110         int niocount, i, requested_nob, opc, rc, short_io_size;
1111         struct osc_brw_async_args *aa;
1112         struct req_capsule      *pill;
1113         struct brw_page *pg_prev;
1114         void *short_io_buf;
1115
1116         ENTRY;
1117         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1118                 RETURN(-ENOMEM); /* Recoverable */
1119         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1120                 RETURN(-EINVAL); /* Fatal */
1121
1122         if ((cmd & OBD_BRW_WRITE) != 0) {
1123                 opc = OST_WRITE;
1124                 req = ptlrpc_request_alloc_pool(cli->cl_import,
1125                                                 osc_rq_pool,
1126                                                 &RQF_OST_BRW_WRITE);
1127         } else {
1128                 opc = OST_READ;
1129                 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
1130         }
1131         if (req == NULL)
1132                 RETURN(-ENOMEM);
1133
1134         for (niocount = i = 1; i < page_count; i++) {
1135                 if (!can_merge_pages(pga[i - 1], pga[i]))
1136                         niocount++;
1137         }
1138
1139         pill = &req->rq_pill;
1140         req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1141                              sizeof(*ioobj));
1142         req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1143                              niocount * sizeof(*niobuf));
1144
1145         for (i = 0; i < page_count; i++)
1146                 short_io_size += pga[i]->count;
1147
1148         /* Check if we can do a short io. */
1149         if (!(short_io_size <= cli->cl_short_io_bytes && niocount == 1 &&
1150             imp_connect_shortio(cli->cl_import)))
1151                 short_io_size = 0;
1152
1153         req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_CLIENT,
1154                              opc == OST_READ ? 0 : short_io_size);
1155         if (opc == OST_READ)
1156                 req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_SERVER,
1157                                      short_io_size);
1158
1159         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1160         if (rc) {
1161                 ptlrpc_request_free(req);
1162                 RETURN(rc);
1163         }
1164         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1165         ptlrpc_at_set_req_timeout(req);
1166
1167         /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
1168          * retry logic */
1169         req->rq_no_retry_einprogress = 1;
1170
1171         if (short_io_size != 0) {
1172                 desc = NULL;
1173                 short_io_buf = NULL;
1174                 goto no_bulk;
1175         }
1176
1177         desc = ptlrpc_prep_bulk_imp(req, page_count,
1178                 cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
1179                 (opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE :
1180                         PTLRPC_BULK_PUT_SINK) |
1181                         PTLRPC_BULK_BUF_KIOV,
1182                 OST_BULK_PORTAL,
1183                 &ptlrpc_bulk_kiov_pin_ops);
1184
1185         if (desc == NULL)
1186                 GOTO(out, rc = -ENOMEM);
1187         /* NB request now owns desc and will free it when it gets freed */
1188 no_bulk:
1189         body = req_capsule_client_get(pill, &RMF_OST_BODY);
1190         ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1191         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1192         LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1193
1194         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
1195
1196         obdo_to_ioobj(oa, ioobj);
1197         ioobj->ioo_bufcnt = niocount;
1198         /* The high bits of ioo_max_brw tells server _maximum_ number of bulks
1199          * that might be send for this request.  The actual number is decided
1200          * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
1201          * "max - 1" for old client compatibility sending "0", and also so the
1202          * the actual maximum is a power-of-two number, not one less. LU-1431 */
1203         if (desc != NULL)
1204                 ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
1205         else /* short io */
1206                 ioobj_max_brw_set(ioobj, 0);
1207
1208         if (short_io_size != 0) {
1209                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1210                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1211                         body->oa.o_flags = 0;
1212                 }
1213                 body->oa.o_flags |= OBD_FL_SHORT_IO;
1214                 CDEBUG(D_CACHE, "Using short io for data transfer, size = %d\n",
1215                        short_io_size);
1216                 if (opc == OST_WRITE) {
1217                         short_io_buf = req_capsule_client_get(pill,
1218                                                               &RMF_SHORT_IO);
1219                         LASSERT(short_io_buf != NULL);
1220                 }
1221         }
1222
1223         LASSERT(page_count > 0);
1224         pg_prev = pga[0];
1225         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1226                 struct brw_page *pg = pga[i];
1227                 int poff = pg->off & ~PAGE_MASK;
1228
1229                 LASSERT(pg->count > 0);
1230                 /* make sure there is no gap in the middle of page array */
1231                 LASSERTF(page_count == 1 ||
1232                          (ergo(i == 0, poff + pg->count == PAGE_SIZE) &&
1233                           ergo(i > 0 && i < page_count - 1,
1234                                poff == 0 && pg->count == PAGE_SIZE)   &&
1235                           ergo(i == page_count - 1, poff == 0)),
1236                          "i: %d/%d pg: %p off: %llu, count: %u\n",
1237                          i, page_count, pg, pg->off, pg->count);
1238                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1239                          "i %d p_c %u pg %p [pri %lu ind %lu] off %llu"
1240                          " prev_pg %p [pri %lu ind %lu] off %llu\n",
1241                          i, page_count,
1242                          pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1243                          pg_prev->pg, page_private(pg_prev->pg),
1244                          pg_prev->pg->index, pg_prev->off);
1245                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1246                         (pg->flag & OBD_BRW_SRVLOCK));
1247                 if (short_io_size != 0 && opc == OST_WRITE) {
1248                         unsigned char *ptr = ll_kmap_atomic(pg->pg, KM_USER0);
1249
1250                         LASSERT(short_io_size >= requested_nob + pg->count);
1251                         memcpy(short_io_buf + requested_nob,
1252                                ptr + poff,
1253                                pg->count);
1254                         ll_kunmap_atomic(ptr, KM_USER0);
1255                 } else if (short_io_size == 0) {
1256                         desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff,
1257                                                          pg->count);
1258                 }
1259                 requested_nob += pg->count;
1260
1261                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1262                         niobuf--;
1263                         niobuf->rnb_len += pg->count;
1264                 } else {
1265                         niobuf->rnb_offset = pg->off;
1266                         niobuf->rnb_len    = pg->count;
1267                         niobuf->rnb_flags  = pg->flag;
1268                 }
1269                 pg_prev = pg;
1270         }
1271
1272         LASSERTF((void *)(niobuf - niocount) ==
1273                 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1274                 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1275                 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1276
1277         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1278         if (resend) {
1279                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1280                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1281                         body->oa.o_flags = 0;
1282                 }
1283                 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1284         }
1285
1286         if (osc_should_shrink_grant(cli))
1287                 osc_shrink_grant_local(cli, &body->oa);
1288
1289         /* size[REQ_REC_OFF] still sizeof (*body) */
1290         if (opc == OST_WRITE) {
1291                 if (cli->cl_checksum &&
1292                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1293                         /* store cl_cksum_type in a local variable since
1294                          * it can be changed via lprocfs */
1295                         enum cksum_types cksum_type = cli->cl_cksum_type;
1296
1297                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1298                                 body->oa.o_flags = 0;
1299
1300                         body->oa.o_flags |= cksum_type_pack(cksum_type);
1301                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1302                         body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1303                                                              page_count, pga,
1304                                                              OST_WRITE,
1305                                                              cksum_type);
1306                         CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1307                                body->oa.o_cksum);
1308                         /* save this in 'oa', too, for later checking */
1309                         oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1310                         oa->o_flags |= cksum_type_pack(cksum_type);
1311                 } else {
1312                         /* clear out the checksum flag, in case this is a
1313                          * resend but cl_checksum is no longer set. b=11238 */
1314                         oa->o_valid &= ~OBD_MD_FLCKSUM;
1315                 }
1316                 oa->o_cksum = body->oa.o_cksum;
1317                 /* 1 RC per niobuf */
1318                 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1319                                      sizeof(__u32) * niocount);
1320         } else {
1321                 if (cli->cl_checksum &&
1322                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1323                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1324                                 body->oa.o_flags = 0;
1325                         body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1326                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1327                 }
1328
1329                 /* Client cksum has been already copied to wire obdo in previous
1330                  * lustre_set_wire_obdo(), and in the case a bulk-read is being
1331                  * resent due to cksum error, this will allow Server to
1332                  * check+dump pages on its side */
1333         }
1334         ptlrpc_request_set_replen(req);
1335
1336         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1337         aa = ptlrpc_req_async_args(req);
1338         aa->aa_oa = oa;
1339         aa->aa_requested_nob = requested_nob;
1340         aa->aa_nio_count = niocount;
1341         aa->aa_page_count = page_count;
1342         aa->aa_resends = 0;
1343         aa->aa_ppga = pga;
1344         aa->aa_cli = cli;
1345         INIT_LIST_HEAD(&aa->aa_oaps);
1346
1347         *reqp = req;
1348         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1349         CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
1350                 req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
1351                 niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
1352         RETURN(0);
1353
1354  out:
1355         ptlrpc_req_finished(req);
1356         RETURN(rc);
1357 }
1358
/* Path of the most recent checksum-dump file; also exported for lproc use. */
char dbgcksum_file_name[PATH_MAX];

/* Dump the full set of bulk pages to a uniquely-named debug file so a
 * checksum mismatch can be analyzed offline.  The file name encodes the
 * FID (when oa carries one), the byte range and both checksums; O_EXCL
 * ensures only the first error for a given range is kept. */
static void dump_all_bulk_pages(struct obdo *oa, __u32 page_count,
                                struct brw_page **pga, __u32 server_cksum,
                                __u32 client_cksum)
{
        struct file *filp;
        int rc, i;
        unsigned int len;
        char *buf;
        mm_segment_t oldfs;

        /* will only keep dump of pages on first error for the same range in
         * file/fid, not during the resends/retries. */
        snprintf(dbgcksum_file_name, sizeof(dbgcksum_file_name),
                 "%s-checksum_dump-osc-"DFID":[%llu-%llu]-%x-%x",
                 (strncmp(libcfs_debug_file_path_arr, "NONE", 4) != 0 ?
                  libcfs_debug_file_path_arr :
                  LIBCFS_DEBUG_FILE_PATH_DEFAULT),
                 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : 0ULL,
                 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
                 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
                 pga[0]->off,
                 pga[page_count-1]->off + pga[page_count-1]->count - 1,
                 client_cksum, server_cksum);
        filp = filp_open(dbgcksum_file_name,
                         O_CREAT | O_EXCL | O_WRONLY | O_LARGEFILE, 0600);
        if (IS_ERR(filp)) {
                rc = PTR_ERR(filp);
                /* -EEXIST means a dump for this range already exists and is
                 * expected; anything else is a real failure */
                if (rc == -EEXIST)
                        CDEBUG(D_INFO, "%s: can't open to dump pages with "
                               "checksum error: rc = %d\n", dbgcksum_file_name,
                               rc);
                else
                        CERROR("%s: can't open to dump pages with checksum "
                               "error: rc = %d\n", dbgcksum_file_name, rc);
                return;
        }

        /* allow vfs_write() to take kernel-space buffers */
        oldfs = get_fs();
        set_fs(KERNEL_DS);
        for (i = 0; i < page_count; i++) {
                len = pga[i]->count;
                buf = kmap(pga[i]->pg);
                /* loop: vfs_write() may write less than requested */
                while (len != 0) {
                        rc = vfs_write(filp, (__force const char __user *)buf,
                                       len, &filp->f_pos);
                        if (rc < 0) {
                                CERROR("%s: wanted to write %u but got %d "
                                       "error\n", dbgcksum_file_name, len, rc);
                                break;
                        }
                        len -= rc;
                        buf += rc;
                        CDEBUG(D_INFO, "%s: wrote %d bytes\n",
                               dbgcksum_file_name, rc);
                }
                kunmap(pga[i]->pg);
        }
        set_fs(oldfs);

        /* push the dump to disk so it survives a subsequent crash */
        rc = ll_vfs_fsync_range(filp, 0, LLONG_MAX, 1);
        if (rc)
                CERROR("%s: sync returns %d\n", dbgcksum_file_name, rc);
        filp_close(filp, NULL);
        return;
}
1426
1427 static int
1428 check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1429                                 __u32 client_cksum, __u32 server_cksum,
1430                                 struct osc_brw_async_args *aa)
1431 {
1432         __u32 new_cksum;
1433         char *msg;
1434         enum cksum_types cksum_type;
1435
1436         if (server_cksum == client_cksum) {
1437                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1438                 return 0;
1439         }
1440
1441         if (aa->aa_cli->cl_checksum_dump)
1442                 dump_all_bulk_pages(oa, aa->aa_page_count, aa->aa_ppga,
1443                                     server_cksum, client_cksum);
1444
1445         cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1446                                        oa->o_flags : 0);
1447         new_cksum = osc_checksum_bulk(aa->aa_requested_nob, aa->aa_page_count,
1448                                       aa->aa_ppga, OST_WRITE, cksum_type);
1449
1450         if (cksum_type != cksum_type_unpack(aa->aa_oa->o_flags))
1451                 msg = "the server did not use the checksum type specified in "
1452                       "the original request - likely a protocol problem";
1453         else if (new_cksum == server_cksum)
1454                 msg = "changed on the client after we checksummed it - "
1455                       "likely false positive due to mmap IO (bug 11742)";
1456         else if (new_cksum == client_cksum)
1457                 msg = "changed in transit before arrival at OST";
1458         else
1459                 msg = "changed in transit AND doesn't match the original - "
1460                       "likely false positive due to mmap IO (bug 11742)";
1461
1462         LCONSOLE_ERROR_MSG(0x132, "%s: BAD WRITE CHECKSUM: %s: from %s inode "
1463                            DFID " object "DOSTID" extent [%llu-%llu], original "
1464                            "client csum %x (type %x), server csum %x (type %x),"
1465                            " client csum now %x\n",
1466                            aa->aa_cli->cl_import->imp_obd->obd_name,
1467                            msg, libcfs_nid2str(peer->nid),
1468                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1469                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1470                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1471                            POSTID(&oa->o_oi), aa->aa_ppga[0]->off,
1472                            aa->aa_ppga[aa->aa_page_count - 1]->off +
1473                                 aa->aa_ppga[aa->aa_page_count-1]->count - 1,
1474                            client_cksum, cksum_type_unpack(aa->aa_oa->o_flags),
1475                            server_cksum, cksum_type, new_cksum);
1476         return 1;
1477 }
1478
1479 /* Note rc enters this function as number of bytes transferred */
1480 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1481 {
1482         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1483         const struct lnet_process_id *peer =
1484                         &req->rq_import->imp_connection->c_peer;
1485         struct client_obd *cli = aa->aa_cli;
1486         struct ost_body *body;
1487         u32 client_cksum = 0;
1488         ENTRY;
1489
1490         if (rc < 0 && rc != -EDQUOT) {
1491                 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1492                 RETURN(rc);
1493         }
1494
1495         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1496         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1497         if (body == NULL) {
1498                 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1499                 RETURN(-EPROTO);
1500         }
1501
1502         /* set/clear over quota flag for a uid/gid/projid */
1503         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1504             body->oa.o_valid & (OBD_MD_FLALLQUOTA)) {
1505                 unsigned qid[LL_MAXQUOTAS] = {
1506                                          body->oa.o_uid, body->oa.o_gid,
1507                                          body->oa.o_projid };
1508                 CDEBUG(D_QUOTA, "setdq for [%u %u %u] with valid %#llx, flags %x\n",
1509                        body->oa.o_uid, body->oa.o_gid, body->oa.o_projid,
1510                        body->oa.o_valid, body->oa.o_flags);
1511                        osc_quota_setdq(cli, qid, body->oa.o_valid,
1512                                        body->oa.o_flags);
1513         }
1514
1515         osc_update_grant(cli, body);
1516
1517         if (rc < 0)
1518                 RETURN(rc);
1519
1520         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1521                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1522
1523         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1524                 if (rc > 0) {
1525                         CERROR("Unexpected +ve rc %d\n", rc);
1526                         RETURN(-EPROTO);
1527                 }
1528
1529                 if (req->rq_bulk != NULL &&
1530                     sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1531                         RETURN(-EAGAIN);
1532
1533                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1534                     check_write_checksum(&body->oa, peer, client_cksum,
1535                                          body->oa.o_cksum, aa))
1536                         RETURN(-EAGAIN);
1537
1538                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1539                                      aa->aa_page_count, aa->aa_ppga);
1540                 GOTO(out, rc);
1541         }
1542
1543         /* The rest of this function executes only for OST_READs */
1544
1545         if (req->rq_bulk == NULL) {
1546                 rc = req_capsule_get_size(&req->rq_pill, &RMF_SHORT_IO,
1547                                           RCL_SERVER);
1548                 LASSERT(rc == req->rq_status);
1549         } else {
1550                 /* if unwrap_bulk failed, return -EAGAIN to retry */
1551                 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1552         }
1553         if (rc < 0)
1554                 GOTO(out, rc = -EAGAIN);
1555
1556         if (rc > aa->aa_requested_nob) {
1557                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1558                        aa->aa_requested_nob);
1559                 RETURN(-EPROTO);
1560         }
1561
1562         if (req->rq_bulk != NULL && rc != req->rq_bulk->bd_nob_transferred) {
1563                 CERROR ("Unexpected rc %d (%d transferred)\n",
1564                         rc, req->rq_bulk->bd_nob_transferred);
1565                 return (-EPROTO);
1566         }
1567
1568         if (req->rq_bulk == NULL) {
1569                 /* short io */
1570                 int nob, pg_count, i = 0;
1571                 unsigned char *buf;
1572
1573                 CDEBUG(D_CACHE, "Using short io read, size %d\n", rc);
1574                 pg_count = aa->aa_page_count;
1575                 buf = req_capsule_server_sized_get(&req->rq_pill, &RMF_SHORT_IO,
1576                                                    rc);
1577                 nob = rc;
1578                 while (nob > 0 && pg_count > 0) {
1579                         unsigned char *ptr;
1580                         int count = aa->aa_ppga[i]->count > nob ?
1581                                     nob : aa->aa_ppga[i]->count;
1582
1583                         CDEBUG(D_CACHE, "page %p count %d\n",
1584                                aa->aa_ppga[i]->pg, count);
1585                         ptr = ll_kmap_atomic(aa->aa_ppga[i]->pg, KM_USER0);
1586                         memcpy(ptr + (aa->aa_ppga[i]->off & ~PAGE_MASK), buf,
1587                                count);
1588                         ll_kunmap_atomic((void *) ptr, KM_USER0);
1589
1590                         buf += count;
1591                         nob -= count;
1592                         i++;
1593                         pg_count--;
1594                 }
1595         }
1596
1597         if (rc < aa->aa_requested_nob)
1598                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1599
1600         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1601                 static int cksum_counter;
1602                 u32        server_cksum = body->oa.o_cksum;
1603                 char      *via = "";
1604                 char      *router = "";
1605                 enum cksum_types cksum_type;
1606
1607                 cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
1608                                                body->oa.o_flags : 0);
1609                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1610                                                  aa->aa_ppga, OST_READ,
1611                                                  cksum_type);
1612
1613                 if (req->rq_bulk != NULL &&
1614                     peer->nid != req->rq_bulk->bd_sender) {
1615                         via = " via ";
1616                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1617                 }
1618
1619                 if (server_cksum != client_cksum) {
1620                         struct ost_body *clbody;
1621                         u32 page_count = aa->aa_page_count;
1622
1623                         clbody = req_capsule_client_get(&req->rq_pill,
1624                                                         &RMF_OST_BODY);
1625                         if (cli->cl_checksum_dump)
1626                                 dump_all_bulk_pages(&clbody->oa, page_count,
1627                                                     aa->aa_ppga, server_cksum,
1628                                                     client_cksum);
1629
1630                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1631                                            "%s%s%s inode "DFID" object "DOSTID
1632                                            " extent [%llu-%llu], client %x, "
1633                                            "server %x, cksum_type %x\n",
1634                                            req->rq_import->imp_obd->obd_name,
1635                                            libcfs_nid2str(peer->nid),
1636                                            via, router,
1637                                            clbody->oa.o_valid & OBD_MD_FLFID ?
1638                                                 clbody->oa.o_parent_seq : 0ULL,
1639                                            clbody->oa.o_valid & OBD_MD_FLFID ?
1640                                                 clbody->oa.o_parent_oid : 0,
1641                                            clbody->oa.o_valid & OBD_MD_FLFID ?
1642                                                 clbody->oa.o_parent_ver : 0,
1643                                            POSTID(&body->oa.o_oi),
1644                                            aa->aa_ppga[0]->off,
1645                                            aa->aa_ppga[page_count-1]->off +
1646                                            aa->aa_ppga[page_count-1]->count - 1,
1647                                            client_cksum, server_cksum,
1648                                            cksum_type);
1649                         cksum_counter = 0;
1650                         aa->aa_oa->o_cksum = client_cksum;
1651                         rc = -EAGAIN;
1652                 } else {
1653                         cksum_counter++;
1654                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1655                         rc = 0;
1656                 }
1657         } else if (unlikely(client_cksum)) {
1658                 static int cksum_missed;
1659
1660                 cksum_missed++;
1661                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1662                         CERROR("Checksum %u requested from %s but not sent\n",
1663                                cksum_missed, libcfs_nid2str(peer->nid));
1664         } else {
1665                 rc = 0;
1666         }
1667 out:
1668         if (rc >= 0)
1669                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1670                                      aa->aa_oa, &body->oa);
1671
1672         RETURN(rc);
1673 }
1674
/*
 * Rebuild and resend a bulk BRW RPC after a recoverable failure.
 *
 * A brand-new request is prepared from the obdo and page array saved in
 * the async args @aa; the old request's interpret/commit callbacks, async
 * args and import generation are carried over, and the oap and extent
 * lists are moved (not copied) onto the new request.  The new request is
 * handed to ptlrpcd for sending.
 *
 * \param request  the failed BRW request being redone
 * \param aa       async args holding the cli, obdo, pages, oaps and extents
 * \param rc       error that triggered the redo (used for log level/output)
 *
 * \retval 0       new request queued successfully
 * \retval -EINTR  a page's I/O was interrupted; new request is dropped
 * \retval <0      osc_brw_prep_request() failure
 */
static int osc_brw_redo_request(struct ptlrpc_request *request,
                                struct osc_brw_async_args *aa, int rc)
{
        struct ptlrpc_request *new_req;
        struct osc_brw_async_args *new_aa;
        struct osc_async_page *oap;
        ENTRY;

        /* -EINPROGRESS resends are expected; anything else is an error */
        DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
                  "redo for recoverable error %d", rc);

        rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
                                OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
                                  aa->aa_cli, aa->aa_oa, aa->aa_page_count,
                                  aa->aa_ppga, &new_req, 1);
        if (rc)
                RETURN(rc);

        /* Verify ownership and bail out early if the caller gave up waiting;
         * each oap still referencing a request must reference the old one. */
        list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request != NULL) {
                        LASSERTF(request == oap->oap_request,
                                 "request %p != oap_request %p\n",
                                 request, oap->oap_request);
                        if (oap->oap_interrupted) {
                                ptlrpc_req_finished(new_req);
                                RETURN(-EINTR);
                        }
                }
        }
        /* New request takes over pga and oaps from old request.
         * Note that copying a list_head doesn't work, need to move it... */
        aa->aa_resends++;
        new_req->rq_interpret_reply = request->rq_interpret_reply;
        new_req->rq_async_args = request->rq_async_args;
        new_req->rq_commit_cb = request->rq_commit_cb;
        /* cap resend delay to the current request timeout, this is similar to
         * what ptlrpc does (see after_reply()) */
        if (aa->aa_resends > new_req->rq_timeout)
                new_req->rq_sent = ktime_get_real_seconds() + new_req->rq_timeout;
        else
                new_req->rq_sent = ktime_get_real_seconds() + aa->aa_resends;
        new_req->rq_generation_set = 1;
        new_req->rq_import_generation = request->rq_import_generation;

        new_aa = ptlrpc_req_async_args(new_req);

        INIT_LIST_HEAD(&new_aa->aa_oaps);
        list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
        INIT_LIST_HEAD(&new_aa->aa_exts);
        list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
        new_aa->aa_resends = aa->aa_resends;

        /* re-point each oap's request reference at the new request */
        list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request) {
                        ptlrpc_req_finished(oap->oap_request);
                        oap->oap_request = ptlrpc_request_addref(new_req);
                }
        }

        /* XXX: This code will run into problem if we're going to support
         * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
         * and wait for all of them to be finished. We should inherit request
         * set from old request. */
        ptlrpcd_add_req(new_req);

        DEBUG_REQ(D_INFO, new_req, "new request");
        RETURN(0);
}
1743
1744 /*
1745  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1746  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1747  * fine for our small page arrays and doesn't require allocation.  its an
1748  * insertion sort that swaps elements that are strides apart, shrinking the
1749  * stride down until its '1' and the array is sorted.
1750  */
1751 static void sort_brw_pages(struct brw_page **array, int num)
1752 {
1753         int stride, i, j;
1754         struct brw_page *tmp;
1755
1756         if (num == 1)
1757                 return;
1758         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1759                 ;
1760
1761         do {
1762                 stride /= 3;
1763                 for (i = stride ; i < num ; i++) {
1764                         tmp = array[i];
1765                         j = i;
1766                         while (j >= stride && array[j - stride]->off > tmp->off) {
1767                                 array[j] = array[j - stride];
1768                                 j -= stride;
1769                         }
1770                         array[j] = tmp;
1771                 }
1772         } while (stride > 1);
1773 }
1774
/* Free the brw_page pointer array built for a BRW RPC.  Only the array of
 * pointers is released; the brw_page structures it points at live inside
 * their osc_async_pages and are not owned here. */
static void osc_release_ppga(struct brw_page **ppga, size_t count)
{
        LASSERT(ppga != NULL);
        OBD_FREE(ppga, sizeof(*ppga) * count);
}
1780
/*
 * rq_interpret_reply callback for bulk BRW (OST_READ/OST_WRITE) RPCs.
 *
 * Finalizes the reply via osc_brw_fini_request(), retries recoverable
 * errors through osc_brw_redo_request(), and on success publishes the
 * blocks/time attributes (and, for writes, size/KMS) returned by the OST
 * into the cl_object.  Regardless of status it finishes the extents
 * attached to the RPC, releases the page array, updates the in-flight
 * RPC accounting and kicks osc_io_unplug() to start more I/O.
 *
 * \retval 0   on success, or when the request was re-queued for resend
 * \retval <0  final error for this RPC
 */
static int brw_interpret(const struct lu_env *env,
                         struct ptlrpc_request *req, void *data, int rc)
{
        struct osc_brw_async_args *aa = data;
        struct osc_extent *ext;
        struct osc_extent *tmp;
        struct client_obd *cli = aa->aa_cli;
        unsigned long           transferred = 0;
        ENTRY;

        rc = osc_brw_fini_request(req, rc);
        CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
        /* When server return -EINPROGRESS, client should always retry
         * regardless of the number of times the bulk was resent already. */
        if (osc_recoverable_error(rc)) {
                if (req->rq_import_generation !=
                    req->rq_import->imp_generation) {
                        /* import was evicted/reconnected meanwhile: do not
                         * resend across the eviction, just report it */
                        CDEBUG(D_HA, "%s: resend cross eviction for object: "
                               ""DOSTID", rc = %d.\n",
                               req->rq_import->imp_obd->obd_name,
                               POSTID(&aa->aa_oa->o_oi), rc);
                } else if (rc == -EINPROGRESS ||
                    client_should_resend(aa->aa_resends, aa->aa_cli)) {
                        rc = osc_brw_redo_request(req, aa, rc);
                } else {
                        CERROR("%s: too many resent retries for object: "
                               "%llu:%llu, rc = %d.\n",
                               req->rq_import->imp_obd->obd_name,
                               POSTID(&aa->aa_oa->o_oi), rc);
                }

                /* rc == 0 means the redo request now owns pages/extents */
                if (rc == 0)
                        RETURN(0);
                else if (rc == -EAGAIN || rc == -EINPROGRESS)
                        rc = -EIO;
        }

        if (rc == 0) {
                /* mirror attributes returned by the OST into the client
                 * object's cached attributes */
                struct obdo *oa = aa->aa_oa;
                struct cl_attr *attr = &osc_env_info(env)->oti_attr;
                unsigned long valid = 0;
                struct cl_object *obj;
                struct osc_async_page *last;

                last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
                obj = osc2cl(last->oap_obj);

                cl_object_attr_lock(obj);
                if (oa->o_valid & OBD_MD_FLBLOCKS) {
                        attr->cat_blocks = oa->o_blocks;
                        valid |= CAT_BLOCKS;
                }
                if (oa->o_valid & OBD_MD_FLMTIME) {
                        attr->cat_mtime = oa->o_mtime;
                        valid |= CAT_MTIME;
                }
                if (oa->o_valid & OBD_MD_FLATIME) {
                        attr->cat_atime = oa->o_atime;
                        valid |= CAT_ATIME;
                }
                if (oa->o_valid & OBD_MD_FLCTIME) {
                        attr->cat_ctime = oa->o_ctime;
                        valid |= CAT_CTIME;
                }

                if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
                        struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
                        loff_t last_off = last->oap_count + last->oap_obj_off +
                                last->oap_page_off;

                        /* Change file size if this is an out of quota or
                         * direct IO write and it extends the file size */
                        if (loi->loi_lvb.lvb_size < last_off) {
                                attr->cat_size = last_off;
                                valid |= CAT_SIZE;
                        }
                        /* Extend KMS if it's not a lockless write */
                        if (loi->loi_kms < last_off &&
                            oap2osc_page(last)->ops_srvlock == 0) {
                                attr->cat_kms = last_off;
                                valid |= CAT_KMS;
                        }
                }

                if (valid != 0)
                        cl_object_attr_update(env, obj, attr, valid);
                cl_object_attr_unlock(obj);
        }
        OBDO_FREE(aa->aa_oa);

        /* successful writes pin pages as "unstable" until the server
         * commits them to disk (see brw_commit()) */
        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
                osc_inc_unstable_pages(req);

        list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
                list_del_init(&ext->oe_link);
                osc_extent_finish(env, ext, 1, rc);
        }
        LASSERT(list_empty(&aa->aa_exts));
        LASSERT(list_empty(&aa->aa_oaps));

        transferred = (req->rq_bulk == NULL ? /* short io */
                       aa->aa_requested_nob :
                       req->rq_bulk->bd_nob_transferred);

        osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
        ptlrpc_lprocfs_brw(req, transferred);

        spin_lock(&cli->cl_loi_list_lock);
        /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
         * is called so we know whether to go to sync BRWs or wait for more
         * RPCs to complete */
        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
                cli->cl_w_in_flight--;
        else
                cli->cl_r_in_flight--;
        osc_wake_cache_waiters(cli);
        spin_unlock(&cli->cl_loi_list_lock);

        osc_io_unplug(env, cli, NULL);
        RETURN(rc);
}
1902
1903 static void brw_commit(struct ptlrpc_request *req)
1904 {
1905         /* If osc_inc_unstable_pages (via osc_extent_finish) races with
1906          * this called via the rq_commit_cb, I need to ensure
1907          * osc_dec_unstable_pages is still called. Otherwise unstable
1908          * pages may be leaked. */
1909         spin_lock(&req->rq_lock);
1910         if (likely(req->rq_unstable)) {
1911                 req->rq_unstable = 0;
1912                 spin_unlock(&req->rq_lock);
1913
1914                 osc_dec_unstable_pages(req);
1915         } else {
1916                 req->rq_committed = 1;
1917                 spin_unlock(&req->rq_lock);
1918         }
1919 }
1920
1921 /**
1922  * Build an RPC by the list of extent @ext_list. The caller must ensure
1923  * that the total pages in this list are NOT over max pages per RPC.
1924  * Extents in the list must be in OES_RPC state.
1925  */
1926 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
1927                   struct list_head *ext_list, int cmd)
1928 {
1929         struct ptlrpc_request           *req = NULL;
1930         struct osc_extent               *ext;
1931         struct brw_page                 **pga = NULL;
1932         struct osc_brw_async_args       *aa = NULL;
1933         struct obdo                     *oa = NULL;
1934         struct osc_async_page           *oap;
1935         struct osc_object               *obj = NULL;
1936         struct cl_req_attr              *crattr = NULL;
1937         loff_t                          starting_offset = OBD_OBJECT_EOF;
1938         loff_t                          ending_offset = 0;
1939         int                             mpflag = 0;
1940         int                             mem_tight = 0;
1941         int                             page_count = 0;
1942         bool                            soft_sync = false;
1943         bool                            interrupted = false;
1944         int                             i;
1945         int                             grant = 0;
1946         int                             rc;
1947         struct list_head                rpc_list = LIST_HEAD_INIT(rpc_list);
1948         struct ost_body                 *body;
1949         ENTRY;
1950         LASSERT(!list_empty(ext_list));
1951
1952         /* add pages into rpc_list to build BRW rpc */
1953         list_for_each_entry(ext, ext_list, oe_link) {
1954                 LASSERT(ext->oe_state == OES_RPC);
1955                 mem_tight |= ext->oe_memalloc;
1956                 grant += ext->oe_grants;
1957                 page_count += ext->oe_nr_pages;
1958                 if (obj == NULL)
1959                         obj = ext->oe_obj;
1960         }
1961
1962         soft_sync = osc_over_unstable_soft_limit(cli);
1963         if (mem_tight)
1964                 mpflag = cfs_memory_pressure_get_and_set();
1965
1966         OBD_ALLOC(pga, sizeof(*pga) * page_count);
1967         if (pga == NULL)
1968                 GOTO(out, rc = -ENOMEM);
1969
1970         OBDO_ALLOC(oa);
1971         if (oa == NULL)
1972                 GOTO(out, rc = -ENOMEM);
1973
1974         i = 0;
1975         list_for_each_entry(ext, ext_list, oe_link) {
1976                 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
1977                         if (mem_tight)
1978                                 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
1979                         if (soft_sync)
1980                                 oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
1981                         pga[i] = &oap->oap_brw_page;
1982                         pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
1983                         i++;
1984
1985                         list_add_tail(&oap->oap_rpc_item, &rpc_list);
1986                         if (starting_offset == OBD_OBJECT_EOF ||
1987                             starting_offset > oap->oap_obj_off)
1988                                 starting_offset = oap->oap_obj_off;
1989                         else
1990                                 LASSERT(oap->oap_page_off == 0);
1991                         if (ending_offset < oap->oap_obj_off + oap->oap_count)
1992                                 ending_offset = oap->oap_obj_off +
1993                                                 oap->oap_count;
1994                         else
1995                                 LASSERT(oap->oap_page_off + oap->oap_count ==
1996                                         PAGE_SIZE);
1997                         if (oap->oap_interrupted)
1998                                 interrupted = true;
1999                 }
2000         }
2001
2002         /* first page in the list */
2003         oap = list_entry(rpc_list.next, typeof(*oap), oap_rpc_item);
2004
2005         crattr = &osc_env_info(env)->oti_req_attr;
2006         memset(crattr, 0, sizeof(*crattr));
2007         crattr->cra_type = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2008         crattr->cra_flags = ~0ULL;
2009         crattr->cra_page = oap2cl_page(oap);
2010         crattr->cra_oa = oa;
2011         cl_req_attr_set(env, osc2cl(obj), crattr);
2012
2013         if (cmd == OBD_BRW_WRITE)
2014                 oa->o_grant_used = grant;
2015
2016         sort_brw_pages(pga, page_count);
2017         rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req, 0);
2018         if (rc != 0) {
2019                 CERROR("prep_req failed: %d\n", rc);
2020                 GOTO(out, rc);
2021         }
2022
2023         req->rq_commit_cb = brw_commit;
2024         req->rq_interpret_reply = brw_interpret;
2025         req->rq_memalloc = mem_tight != 0;
2026         oap->oap_request = ptlrpc_request_addref(req);
2027         if (interrupted && !req->rq_intr)
2028                 ptlrpc_mark_interrupted(req);
2029
2030         /* Need to update the timestamps after the request is built in case
2031          * we race with setattr (locally or in queue at OST).  If OST gets
2032          * later setattr before earlier BRW (as determined by the request xid),
2033          * the OST will not use BRW timestamps.  Sadly, there is no obvious
2034          * way to do this in a single call.  bug 10150 */
2035         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
2036         crattr->cra_oa = &body->oa;
2037         crattr->cra_flags = OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME;
2038         cl_req_attr_set(env, osc2cl(obj), crattr);
2039         lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
2040
2041         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2042         aa = ptlrpc_req_async_args(req);
2043         INIT_LIST_HEAD(&aa->aa_oaps);
2044         list_splice_init(&rpc_list, &aa->aa_oaps);
2045         INIT_LIST_HEAD(&aa->aa_exts);
2046         list_splice_init(ext_list, &aa->aa_exts);
2047
2048         spin_lock(&cli->cl_loi_list_lock);
2049         starting_offset >>= PAGE_SHIFT;
2050         if (cmd == OBD_BRW_READ) {
2051                 cli->cl_r_in_flight++;
2052                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2053                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2054                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2055                                       starting_offset + 1);
2056         } else {
2057                 cli->cl_w_in_flight++;
2058                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2059                 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
2060                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2061                                       starting_offset + 1);
2062         }
2063         spin_unlock(&cli->cl_loi_list_lock);
2064
2065         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %ur/%uw in flight",
2066                   page_count, aa, cli->cl_r_in_flight,
2067                   cli->cl_w_in_flight);
2068         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_IO, cfs_fail_val);
2069
2070         ptlrpcd_add_req(req);
2071         rc = 0;
2072         EXIT;
2073
2074 out:
2075         if (mem_tight != 0)
2076                 cfs_memory_pressure_restore(mpflag);
2077
2078         if (rc != 0) {
2079                 LASSERT(req == NULL);
2080
2081                 if (oa)
2082                         OBDO_FREE(oa);
2083                 if (pga)
2084                         OBD_FREE(pga, sizeof(*pga) * page_count);
2085                 /* this should happen rarely and is pretty bad, it makes the
2086                  * pending list not follow the dirty order */
2087                 while (!list_empty(ext_list)) {
2088                         ext = list_entry(ext_list->next, struct osc_extent,
2089                                          oe_link);
2090                         list_del_init(&ext->oe_link);
2091                         osc_extent_finish(env, ext, 0, rc);
2092                 }
2093         }
2094         RETURN(rc);
2095 }
2096
2097 static int osc_set_lock_data(struct ldlm_lock *lock, void *data)
2098 {
2099         int set = 0;
2100
2101         LASSERT(lock != NULL);
2102
2103         lock_res_and_lock(lock);
2104
2105         if (lock->l_ast_data == NULL)
2106                 lock->l_ast_data = data;
2107         if (lock->l_ast_data == data)
2108                 set = 1;
2109
2110         unlock_res_and_lock(lock);
2111
2112         return set;
2113 }
2114
2115 static int osc_enqueue_fini(struct ptlrpc_request *req,
2116                             osc_enqueue_upcall_f upcall, void *cookie,
2117                             struct lustre_handle *lockh, enum ldlm_mode mode,
2118                             __u64 *flags, bool speculative, int errcode)
2119 {
2120         bool intent = *flags & LDLM_FL_HAS_INTENT;
2121         int rc;
2122         ENTRY;
2123
2124         /* The request was created before ldlm_cli_enqueue call. */
2125         if (intent && errcode == ELDLM_LOCK_ABORTED) {
2126                 struct ldlm_reply *rep;
2127
2128                 rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
2129                 LASSERT(rep != NULL);
2130
2131                 rep->lock_policy_res1 =
2132                         ptlrpc_status_ntoh(rep->lock_policy_res1);
2133                 if (rep->lock_policy_res1)
2134                         errcode = rep->lock_policy_res1;
2135                 if (!speculative)
2136                         *flags |= LDLM_FL_LVB_READY;
2137         } else if (errcode == ELDLM_OK) {
2138                 *flags |= LDLM_FL_LVB_READY;
2139         }
2140
2141         /* Call the update callback. */
2142         rc = (*upcall)(cookie, lockh, errcode);
2143
2144         /* release the reference taken in ldlm_cli_enqueue() */
2145         if (errcode == ELDLM_LOCK_MATCHED)
2146                 errcode = ELDLM_OK;
2147         if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
2148                 ldlm_lock_decref(lockh, mode);
2149
2150         RETURN(rc);
2151 }
2152
/*
 * rq_interpret_reply callback for an asynchronous LDLM lock enqueue.
 *
 * Completes the enqueue via ldlm_cli_enqueue_fini(), then runs the OSC
 * upcall through osc_enqueue_fini().  An extra lock reference is held
 * across both calls so that any blocking AST posted for a failed lock is
 * guaranteed to arrive only after the upcall has executed.
 *
 * Returns the result of osc_enqueue_fini() (i.e. the upcall's result).
 */
static int osc_enqueue_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_enqueue_args *aa, int rc)
{
        struct ldlm_lock *lock;
        struct lustre_handle *lockh = &aa->oa_lockh;
        enum ldlm_mode mode = aa->oa_mode;
        struct ost_lvb *lvb = aa->oa_lvb;
        __u32 lvb_len = sizeof(*lvb);
        __u64 flags = 0;

        ENTRY;

        /* ldlm_cli_enqueue is holding a reference on the lock, so it must
         * be valid. */
        lock = ldlm_handle2lock(lockh);
        LASSERTF(lock != NULL,
                 "lockh %#llx, req %p, aa %p - client evicted?\n",
                 lockh->cookie, req, aa);

        /* Take an additional reference so that a blocking AST that
         * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
         * to arrive after an upcall has been executed by
         * osc_enqueue_fini(). */
        ldlm_lock_addref(lockh, mode);

        /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
        OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);

        /* Let CP AST to grant the lock first. */
        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);

        if (aa->oa_speculative) {
                /* speculative enqueues carry no LVB and no caller-owned
                 * flags word; substitute a local one for the fini calls */
                LASSERT(aa->oa_lvb == NULL);
                LASSERT(aa->oa_flags == NULL);
                aa->oa_flags = &flags;
        }

        /* Complete obtaining the lock procedure. */
        rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
                                   aa->oa_mode, aa->oa_flags, lvb, lvb_len,
                                   lockh, rc);
        /* Complete osc stuff. */
        rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
                              aa->oa_flags, aa->oa_speculative, rc);

        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);

        /* drop the extra reference taken above, now the upcall has run */
        ldlm_lock_decref(lockh, mode);
        LDLM_LOCK_PUT(lock);
        RETURN(rc);
}
2205
/* Sentinel value, not a dereferenceable set: presumably passed as the rqset
 * argument to request "send via the ptlrpcd daemon" handling — the consumer
 * is outside this chunk, confirm at callers before relying on it. */
struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2207
/* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
 * other synchronous requests, however keeping some locks and trying to obtain
 * others may take a considerable amount of time in a case of ost failure; and
 * when other sync requests do not get released lock from a client, the client
 * is evicted from the cluster -- such scenarios make the life difficult, so
 * release locks just after they are obtained. */
/**
 * Obtain a DLM extent lock for an OSC object, preferring an already-cached
 * local lock over a new enqueue RPC.
 *
 * On completion (synchronously, or from the async interpret callback) the
 * \a upcall is invoked with \a cookie.
 *
 * \param[in]     exp          export to enqueue on
 * \param[in]     res_id       resource identifying the object
 * \param[in,out] flags        LDLM_FL_* flags in/out
 * \param[in,out] policy       requested extent; rounded to page boundaries
 * \param[in,out] lvb          lock value block filled from the server reply
 * \param[in]     kms_valid    non-zero if cached locks are safe to match
 * \param[in]     upcall       completion callback
 * \param[in]     cookie       opaque argument passed to \a upcall
 * \param[in]     einfo        lock type, mode and DLM callbacks
 * \param[in]     rqset        set for async requests, or PTLRPCD_SET
 * \param[in]     async        non-zero for an asynchronous enqueue
 * \param[in]     speculative  lock requested ahead of any IO (no LVB needed)
 *
 * \retval ELDLM_OK on success, negative errno on failure
 */
int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
                     __u64 *flags, union ldlm_policy_data *policy,
                     struct ost_lvb *lvb, int kms_valid,
                     osc_enqueue_upcall_f upcall, void *cookie,
                     struct ldlm_enqueue_info *einfo,
                     struct ptlrpc_request_set *rqset, int async,
                     bool speculative)
{
        struct obd_device *obd = exp->exp_obd;
        struct lustre_handle lockh = { 0 };
        struct ptlrpc_request *req = NULL;
        int intent = *flags & LDLM_FL_HAS_INTENT;
        __u64 match_flags = *flags;
        enum ldlm_mode mode;
        int rc;
        ENTRY;

        /* Filesystem lock extents are extended to page boundaries so that
         * dealing with the page cache is a little smoother.  */
        policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
        policy->l_extent.end |= ~PAGE_MASK;

        /*
         * kms is not valid when either object is completely fresh (so that no
         * locks are cached), or object was evicted. In the latter case cached
         * lock cannot be used, because it would prime inode state with
         * potentially stale LVB.
         */
        if (!kms_valid)
                goto no_match;

        /* Next, search for already existing extent locks that will cover us */
        /* If we're trying to read, we also search for an existing PW lock.  The
         * VFS and page cache already protect us locally, so lots of readers/
         * writers can share a single PW lock.
         *
         * There are problems with conversion deadlocks, so instead of
         * converting a read lock to a write lock, we'll just enqueue a new
         * one.
         *
         * At some point we should cancel the read lock instead of making them
         * send us a blocking callback, but there are problems with canceling
         * locks out from other users right now, too. */
        mode = einfo->ei_mode;
        if (einfo->ei_mode == LCK_PR)
                mode |= LCK_PW;
        /* Normal lock requests must wait for the LVB to be ready before
         * matching a lock; speculative lock requests do not need to,
         * because they will not actually use the lock. */
        if (!speculative)
                match_flags |= LDLM_FL_LVB_READY;
        if (intent != 0)
                match_flags |= LDLM_FL_BLOCK_GRANTED;
        mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id,
                               einfo->ei_type, policy, mode, &lockh, 0);
        if (mode) {
                struct ldlm_lock *matched;

                if (*flags & LDLM_FL_TEST_LOCK)
                        RETURN(ELDLM_OK);

                matched = ldlm_handle2lock(&lockh);
                if (speculative) {
                        /* This DLM lock request is speculative, and does not
                         * have an associated IO request. Therefore if there
                         * is already a DLM lock, it will just inform the
                         * caller to cancel the request for this stripe.*/
                        lock_res_and_lock(matched);
                        if (ldlm_extent_equal(&policy->l_extent,
                            &matched->l_policy_data.l_extent))
                                rc = -EEXIST;
                        else
                                rc = -ECANCELED;
                        unlock_res_and_lock(matched);

                        ldlm_lock_decref(&lockh, mode);
                        LDLM_LOCK_PUT(matched);
                        RETURN(rc);
                } else if (osc_set_lock_data(matched, einfo->ei_cbdata)) {
                        *flags |= LDLM_FL_LVB_READY;

                        /* We already have a lock, and it's referenced. */
                        (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);

                        ldlm_lock_decref(&lockh, mode);
                        LDLM_LOCK_PUT(matched);
                        RETURN(ELDLM_OK);
                } else {
                        /* The matched lock's ast data belongs to someone
                         * else; drop our reference and fall through to a
                         * fresh enqueue. */
                        ldlm_lock_decref(&lockh, mode);
                        LDLM_LOCK_PUT(matched);
                }
        }

no_match:
        if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK))
                RETURN(-ENOLCK);

        if (intent) {
                req = ptlrpc_request_alloc(class_exp2cliimp(exp),
                                           &RQF_LDLM_ENQUEUE_LVB);
                if (req == NULL)
                        RETURN(-ENOMEM);

                rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
                if (rc) {
                        ptlrpc_request_free(req);
                        RETURN(rc);
                }

                req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
                                     sizeof *lvb);
                ptlrpc_request_set_replen(req);
        }

        /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
        *flags &= ~LDLM_FL_BLOCK_GRANTED;

        rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
                              sizeof(*lvb), LVB_T_OST, &lockh, async);
        if (async) {
                if (!rc) {
                        struct osc_enqueue_args *aa;
                        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
                        aa = ptlrpc_req_async_args(req);
                        aa->oa_exp         = exp;
                        aa->oa_mode        = einfo->ei_mode;
                        aa->oa_type        = einfo->ei_type;
                        lustre_handle_copy(&aa->oa_lockh, &lockh);
                        aa->oa_upcall      = upcall;
                        aa->oa_cookie      = cookie;
                        aa->oa_speculative = speculative;
                        if (!speculative) {
                                aa->oa_flags  = flags;
                                aa->oa_lvb    = lvb;
                        } else {
                                /* speculative locks are essentially to enqueue
                                 * a DLM lock  in advance, so we don't care
                                 * about the result of the enqueue. */
                                aa->oa_lvb    = NULL;
                                aa->oa_flags  = NULL;
                        }

                        req->rq_interpret_reply =
                                (ptlrpc_interpterer_t)osc_enqueue_interpret;
                        if (rqset == PTLRPCD_SET)
                                ptlrpcd_add_req(req);
                        else
                                ptlrpc_set_add_req(rqset, req);
                } else if (intent) {
                        ptlrpc_req_finished(req);
                }
                RETURN(rc);
        }

        rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
                              flags, speculative, rc);
        if (intent)
                ptlrpc_req_finished(req);

        RETURN(rc);
}
2376
2377 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2378                    enum ldlm_type type, union ldlm_policy_data *policy,
2379                    enum ldlm_mode mode, __u64 *flags, void *data,
2380                    struct lustre_handle *lockh, int unref)
2381 {
2382         struct obd_device *obd = exp->exp_obd;
2383         __u64 lflags = *flags;
2384         enum ldlm_mode rc;
2385         ENTRY;
2386
2387         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2388                 RETURN(-EIO);
2389
2390         /* Filesystem lock extents are extended to page boundaries so that
2391          * dealing with the page cache is a little smoother */
2392         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2393         policy->l_extent.end |= ~PAGE_MASK;
2394
2395         /* Next, search for already existing extent locks that will cover us */
2396         /* If we're trying to read, we also search for an existing PW lock.  The
2397          * VFS and page cache already protect us locally, so lots of readers/
2398          * writers can share a single PW lock. */
2399         rc = mode;
2400         if (mode == LCK_PR)
2401                 rc |= LCK_PW;
2402         rc = ldlm_lock_match(obd->obd_namespace, lflags,
2403                              res_id, type, policy, rc, lockh, unref);
2404         if (rc == 0 || lflags & LDLM_FL_TEST_LOCK)
2405                 RETURN(rc);
2406
2407         if (data != NULL) {
2408                 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2409
2410                 LASSERT(lock != NULL);
2411                 if (!osc_set_lock_data(lock, data)) {
2412                         ldlm_lock_decref(lockh, rc);
2413                         rc = 0;
2414                 }
2415                 LDLM_LOCK_PUT(lock);
2416         }
2417         RETURN(rc);
2418 }
2419
/* Reply interpreter for an asynchronous OST_STATFS request: copy the
 * server's obd_statfs into the caller's buffer and fire the completion
 * upcall (oi_cb_up) with the final status. */
static int osc_statfs_interpret(const struct lu_env *env,
                                struct ptlrpc_request *req,
                                struct osc_async_args *aa, int rc)
{
        struct obd_statfs *msfs;
        ENTRY;

        if (rc == -EBADR)
                /* The request has in fact never been sent
                 * due to issues at a higher level (LOV).
                 * Exit immediately since the caller is
                 * aware of the problem and takes care
                 * of the clean up */
                 RETURN(rc);

        /* With OBD_STATFS_NODELAY the caller asked for best-effort stats:
         * treat a disconnected/retryable import as success with no data. */
        if ((rc == -ENOTCONN || rc == -EAGAIN) &&
            (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
                GOTO(out, rc = 0);

        if (rc != 0)
                GOTO(out, rc);

        msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
        if (msfs == NULL) {
                GOTO(out, rc = -EPROTO);
        }

        *aa->aa_oi->oi_osfs = *msfs;
out:
        /* The upcall always runs (except for -EBADR above) so the caller
         * can complete its aggregation even on error. */
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        RETURN(rc);
}
2452
/* Send an OST_STATFS request asynchronously; the reply is delivered to
 * oinfo via osc_statfs_interpret().  max_age is currently unused on the
 * wire (see the comment below).  Returns 0 on successful submission. */
static int osc_statfs_async(struct obd_export *exp,
                            struct obd_info *oinfo, __u64 max_age,
                            struct ptlrpc_request_set *rqset)
{
        struct obd_device     *obd = class_exp2obd(exp);
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        int                    rc;
        ENTRY;

        /* We could possibly pass max_age in the request (as an absolute
         * timestamp or a "seconds.usec ago") so the target can avoid doing
         * extra calls into the filesystem if that isn't necessary (e.g.
         * during mount that would help a bit).  Having relative timestamps
         * is not so great if request processing is slow, while absolute
         * timestamps are not ideal because they need time synchronization. */
        req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        ptlrpc_request_set_replen(req);
        req->rq_request_portal = OST_CREATE_PORTAL;
        ptlrpc_at_set_req_timeout(req);

        if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
                /* procfs-originated requests must not block waiting for a
                 * recovering target, to avoid deadlock */
                req->rq_no_resend = 1;
                req->rq_no_delay = 1;
        }

        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oi = oinfo;

        ptlrpc_set_add_req(rqset, req);
        RETURN(0);
}
2496
/* Synchronous OST_STATFS: send the request, wait for the reply and copy
 * the server's obd_statfs into *osfs.  max_age is currently unused on the
 * wire (see the comment below). */
static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
                      struct obd_statfs *osfs, __u64 max_age, __u32 flags)
{
        struct obd_device     *obd = class_exp2obd(exp);
        struct obd_statfs     *msfs;
        struct ptlrpc_request *req;
        struct obd_import     *imp = NULL;
        int rc;
        ENTRY;

        /* The request might also come from lprocfs, so take an import
         * reference under cl_sem to synchronize against
         * client_disconnect_export() tearing cl_import down (Bug15684). */
        down_read(&obd->u.cli.cl_sem);
        if (obd->u.cli.cl_import)
                imp = class_import_get(obd->u.cli.cl_import);
        up_read(&obd->u.cli.cl_sem);
        if (!imp)
                RETURN(-ENODEV);

        /* We could possibly pass max_age in the request (as an absolute
         * timestamp or a "seconds.usec ago") so the target can avoid doing
         * extra calls into the filesystem if that isn't necessary (e.g.
         * during mount that would help a bit).  Having relative timestamps
         * is not so great if request processing is slow, while absolute
         * timestamps are not ideal because they need time synchronization. */
        req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);

        /* The request holds its own import reference from here on. */
        class_import_put(imp);

        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        ptlrpc_request_set_replen(req);
        req->rq_request_portal = OST_CREATE_PORTAL;
        ptlrpc_at_set_req_timeout(req);

        if (flags & OBD_STATFS_NODELAY) {
                /* procfs-originated requests must not block waiting for a
                 * recovering target, to avoid deadlock */
                req->rq_no_resend = 1;
                req->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
        if (msfs == NULL) {
                GOTO(out, rc = -EPROTO);
        }

        *osfs = *msfs;

        EXIT;
 out:
        ptlrpc_req_finished(req);
        return rc;
}
2560
/* OSC ioctl dispatcher.  Pins the module for the duration of the call so
 * it cannot be unloaded mid-ioctl; unknown commands return -ENOTTY. */
static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                         void *karg, void __user *uarg)
{
        struct obd_device *obd = exp->exp_obd;
        struct obd_ioctl_data *data = karg;
        int err = 0;
        ENTRY;

        if (!try_module_get(THIS_MODULE)) {
                CERROR("%s: cannot get module '%s'\n", obd->obd_name,
                       module_name(THIS_MODULE));
                return -EINVAL;
        }
        switch (cmd) {
        case OBD_IOC_CLIENT_RECOVER:
                err = ptlrpc_recover_import(obd->u.cli.cl_import,
                                            data->ioc_inlbuf1, 0);
                /* positive values are informational, not errors */
                if (err > 0)
                        err = 0;
                GOTO(out, err);
        case IOC_OSC_SET_ACTIVE:
                err = ptlrpc_set_import_active(obd->u.cli.cl_import,
                                               data->ioc_offset);
                GOTO(out, err);
        case OBD_IOC_PING_TARGET:
                err = ptlrpc_obd_ping(obd);
                GOTO(out, err);
        default:
                CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
                       cmd, current_comm());
                GOTO(out, err = -ENOTTY);
        }
out:
        module_put(THIS_MODULE);
        return err;
}
2597
/* Handle obd_set_info_async() keys for the OSC.
 *
 * Keys handled locally (no RPC): KEY_CHECKSUM, KEY_SPTLRPC_CONF,
 * KEY_FLUSH_CTX, KEY_CACHE_SET, KEY_CACHE_LRU_SHRINK.
 * All other keys are packed into an OST_SET_INFO (or, for
 * KEY_GRANT_SHRINK, OST_SET_GRANT_INFO) request and sent to the OST;
 * grant-shrink requests go via ptlrpcd, everything else via \a set. */
static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
                              u32 keylen, void *key,
                              u32 vallen, void *val,
                              struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *req;
        struct obd_device     *obd = exp->exp_obd;
        struct obd_import     *imp = class_exp2cliimp(exp);
        char                  *tmp;
        int                    rc;
        ENTRY;

        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);

        if (KEY_IS(KEY_CHECKSUM)) {
                if (vallen != sizeof(int))
                        RETURN(-EINVAL);
                exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
                RETURN(0);
        }

        if (KEY_IS(KEY_SPTLRPC_CONF)) {
                sptlrpc_conf_client_adapt(obd);
                RETURN(0);
        }

        if (KEY_IS(KEY_FLUSH_CTX)) {
                sptlrpc_import_flush_my_ctx(imp);
                RETURN(0);
        }

        if (KEY_IS(KEY_CACHE_SET)) {
                struct client_obd *cli = &obd->u.cli;

                LASSERT(cli->cl_cache == NULL); /* only once */
                /* share the client cache passed in by the upper layer and
                 * take a reference on it */
                cli->cl_cache = (struct cl_client_cache *)val;
                cl_cache_incref(cli->cl_cache);
                cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;

                /* add this osc into entity list */
                LASSERT(list_empty(&cli->cl_lru_osc));
                spin_lock(&cli->cl_cache->ccc_lru_lock);
                list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
                spin_unlock(&cli->cl_cache->ccc_lru_lock);

                RETURN(0);
        }

        if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
                struct client_obd *cli = &obd->u.cli;
                /* shrink at most half of what this OSC currently caches */
                long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
                long target = *(long *)val;

                nr = osc_lru_shrink(env, cli, min(nr, target), true);
                /* report back how many pages remain to be shrunk */
                *(long *)val -= nr;
                RETURN(0);
        }

        if (!set && !KEY_IS(KEY_GRANT_SHRINK))
                RETURN(-EINVAL);

        /* We pass all other commands directly to OST. Since nobody calls osc
           methods directly and everybody is supposed to go through LOV, we
           assume lov checked invalid values for us.
           The only recognised values so far are evict_by_nid and mds_conn.
           Even if something bad goes through, we'd get a -EINVAL from OST
           anyway. */

        req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
                                                &RQF_OST_SET_GRANT_INFO :
                                                &RQF_OBD_SET_INFO);
        if (req == NULL)
                RETURN(-ENOMEM);

        req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
                             RCL_CLIENT, keylen);
        if (!KEY_IS(KEY_GRANT_SHRINK))
                req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
                                     RCL_CLIENT, vallen);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
        memcpy(tmp, key, keylen);
        /* grant-shrink requests carry an ost_body instead of a raw value */
        tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
                                                        &RMF_OST_BODY :
                                                        &RMF_SETINFO_VAL);
        memcpy(tmp, val, vallen);

        if (KEY_IS(KEY_GRANT_SHRINK)) {
                struct osc_grant_args *aa;
                struct obdo *oa;

                CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
                aa = ptlrpc_req_async_args(req);
                /* copy the obdo so the interpret callback owns it; it is
                 * freed by osc_shrink_grant_interpret */
                OBDO_ALLOC(oa);
                if (!oa) {
                        ptlrpc_req_finished(req);
                        RETURN(-ENOMEM);
                }
                *oa = ((struct ost_body *)val)->oa;
                aa->aa_oa = oa;
                req->rq_interpret_reply = osc_shrink_grant_interpret;
        }

        ptlrpc_request_set_replen(req);
        if (!KEY_IS(KEY_GRANT_SHRINK)) {
                LASSERT(set != NULL);
                ptlrpc_set_add_req(set, req);
                ptlrpc_check_set(NULL, set);
        } else {
                ptlrpcd_add_req(req);
        }

        RETURN(0);
}
2717
2718 static int osc_reconnect(const struct lu_env *env,
2719                          struct obd_export *exp, struct obd_device *obd,
2720                          struct obd_uuid *cluuid,
2721                          struct obd_connect_data *data,
2722                          void *localdata)
2723 {
2724         struct client_obd *cli = &obd->u.cli;
2725
2726         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
2727                 long lost_grant;
2728                 long grant;
2729
2730                 spin_lock(&cli->cl_loi_list_lock);
2731                 grant = cli->cl_avail_grant + cli->cl_reserved_grant;
2732                 if (data->ocd_connect_flags & OBD_CONNECT_GRANT_PARAM)
2733                         grant += cli->cl_dirty_grant;
2734                 else
2735                         grant += cli->cl_dirty_pages << PAGE_SHIFT;
2736                 data->ocd_grant = grant ? : 2 * cli_brw_size(obd);
2737                 lost_grant = cli->cl_lost_grant;
2738                 cli->cl_lost_grant = 0;
2739                 spin_unlock(&cli->cl_loi_list_lock);
2740
2741                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: %#llx ocd_version: %d"
2742                        " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
2743                        data->ocd_version, data->ocd_grant, lost_grant);
2744         }
2745
2746         RETURN(0);
2747 }
2748
2749 static int osc_disconnect(struct obd_export *exp)
2750 {
2751         struct obd_device *obd = class_exp2obd(exp);
2752         int rc;
2753
2754         rc = client_disconnect_export(exp);
2755         /**
2756          * Initially we put del_shrink_grant before disconnect_export, but it
2757          * causes the following problem if setup (connect) and cleanup
2758          * (disconnect) are tangled together.
2759          *      connect p1                     disconnect p2
2760          *   ptlrpc_connect_import
2761          *     ...............               class_manual_cleanup
2762          *                                     osc_disconnect
2763          *                                     del_shrink_grant
2764          *   ptlrpc_connect_interrupt
2765          *     init_grant_shrink
2766          *   add this client to shrink list
2767          *                                      cleanup_osc
2768          * Bang! pinger trigger the shrink.
2769          * So the osc should be disconnected from the shrink list, after we
2770          * are sure the import has been destroyed. BUG18662
2771          */
2772         if (obd->u.cli.cl_import == NULL)
2773                 osc_del_shrink_grant(&obd->u.cli);
2774         return rc;
2775 }
2776
/* cfs_hash iterator callback used on import invalidation: for each LDLM
 * resource, clear the CLEANED flag on its granted locks (so the second
 * ldlm_namespace_cleanup() pass in osc_import_event() cancels them) and
 * invalidate the osc_object attached to the resource's locks, if any. */
static int osc_ldlm_resource_invalidate(struct cfs_hash *hs,
        struct cfs_hash_bd *bd, struct hlist_node *hnode, void *arg)
{
        struct lu_env *env = arg;
        struct ldlm_resource *res = cfs_hash_object(hs, hnode);
        struct ldlm_lock *lock;
        struct osc_object *osc = NULL;
        ENTRY;

        lock_res(res);
        list_for_each_entry(lock, &res->lr_granted, l_res_link) {
                /* all granted locks on one resource share the same object;
                 * grab a reference to the first one found */
                if (lock->l_ast_data != NULL && osc == NULL) {
                        osc = lock->l_ast_data;
                        cl_object_get(osc2cl(osc));
                }

                /* clear LDLM_FL_CLEANED flag to make sure it will be canceled
                 * by the 2nd round of ldlm_namespace_clean() call in
                 * osc_import_event(). */
                ldlm_clear_cleaned(lock);
        }
        unlock_res(res);

        /* invalidate outside the resource lock */
        if (osc != NULL) {
                osc_object_invalidate(env, osc);
                cl_object_put(env, osc2cl(osc));
        }

        RETURN(0);
}
2807
/* React to import state changes (disconnect, invalidate, activate, ...).
 * Most events are forwarded to the OBD observer; INVALIDATE additionally
 * cleans the local lock namespace and invalidates cached objects. */
static int osc_import_event(struct obd_device *obd,
                            struct obd_import *imp,
                            enum obd_import_event event)
{
        struct client_obd *cli;
        int rc = 0;

        ENTRY;
        LASSERT(imp->imp_obd == obd);

        switch (event) {
        case IMP_EVENT_DISCON: {
                /* grant is meaningless while disconnected; it will be
                 * renegotiated on reconnect (IMP_EVENT_OCD) */
                cli = &obd->u.cli;
                spin_lock(&cli->cl_loi_list_lock);
                cli->cl_avail_grant = 0;
                cli->cl_lost_grant = 0;
                spin_unlock(&cli->cl_loi_list_lock);
                break;
        }
        case IMP_EVENT_INACTIVE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE);
                break;
        }
        case IMP_EVENT_INVALIDATE: {
                struct ldlm_namespace *ns = obd->obd_namespace;
                struct lu_env         *env;
                __u16                  refcheck;

                /* first pass: cancel local locks */
                ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);

                env = cl_env_get(&refcheck);
                if (!IS_ERR(env)) {
                        osc_io_unplug(env, &obd->u.cli, NULL);

                        /* invalidate cached objects and un-mark their locks
                         * so the second cleanup pass catches them */
                        cfs_hash_for_each_nolock(ns->ns_rs_hash,
                                                 osc_ldlm_resource_invalidate,
                                                 env, 0);
                        cl_env_put(env, &refcheck);

                        ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
                } else
                        rc = PTR_ERR(env);
                break;
        }
        case IMP_EVENT_ACTIVE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE);
                break;
        }
        case IMP_EVENT_OCD: {
                struct obd_connect_data *ocd = &imp->imp_connect_data;

                if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
                        osc_init_grant(&obd->u.cli, ocd);

                /* See bug 7198 */
                if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
                        imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;

                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD);
                break;
        }
        case IMP_EVENT_DEACTIVATE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE);
                break;
        }
        case IMP_EVENT_ACTIVATE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE);
                break;
        }
        default:
                CERROR("Unknown import event %d\n", event);
                LBUG();
        }
        RETURN(rc);
}
2883
2884 /**
2885  * Determine whether the lock can be canceled before replaying the lock
2886  * during recovery, see bug16774 for detailed information.
2887  *
2888  * \retval zero the lock can't be canceled
2889  * \retval other ok to cancel
2890  */
2891 static int osc_cancel_weight(struct ldlm_lock *lock)
2892 {
2893         /*
2894          * Cancel all unused and granted extent lock.
2895          */
2896         if (lock->l_resource->lr_type == LDLM_EXTENT &&
2897             lock->l_granted_mode == lock->l_req_mode &&
2898             osc_ldlm_weigh_ast(lock) == 0)
2899                 RETURN(1);
2900
2901         RETURN(0);
2902 }
2903
2904 static int brw_queue_work(const struct lu_env *env, void *data)
2905 {
2906         struct client_obd *cli = data;
2907
2908         CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
2909
2910         osc_io_unplug(env, cli, NULL);
2911         RETURN(0);
2912 }
2913
/* Set up an OSC device: client obd, ptlrpcd writeback/LRU work items,
 * quota, procfs entries, the shared request pool, grant shrinking and the
 * OSC cache-shrinker registration.  Error paths unwind in reverse order
 * via the out_* labels. */
int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
{
        struct client_obd *cli = &obd->u.cli;
        struct obd_type   *type;
        void              *handler;
        int                rc;
        int                adding;
        int                added;
        int                req_count;
        ENTRY;

        rc = ptlrpcd_addref();
        if (rc)
                RETURN(rc);

        rc = client_obd_setup(obd, lcfg);
        if (rc)
                GOTO(out_ptlrpcd, rc);

        /* work item that flushes dirty pages (see brw_queue_work) */
        handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
        if (IS_ERR(handler))
                GOTO(out_client_setup, rc = PTR_ERR(handler));
        cli->cl_writeback_work = handler;

        /* work item that trims the page LRU */
        handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
        if (IS_ERR(handler))
                GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
        cli->cl_lru_work = handler;

        rc = osc_quota_setup(obd);
        if (rc)
                GOTO(out_ptlrpcd_work, rc);

        cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;

#ifdef CONFIG_PROC_FS
        obd->obd_vars = lprocfs_osc_obd_vars;
#endif
        /* If this is true then both client (osc) and server (osp) are on the
         * same node. The osp layer if loaded first will register the osc proc
         * directory. In that case this obd_device will be attached its proc
         * tree to type->typ_procsym instead of obd->obd_type->typ_procroot.
         */
        type = class_search_type(LUSTRE_OSP_NAME);
        if (type && type->typ_procsym) {
                obd->obd_proc_entry = lprocfs_register(obd->obd_name,
                                                       type->typ_procsym,
                                                       obd->obd_vars, obd);
                if (IS_ERR(obd->obd_proc_entry)) {
                        /* proc failure is not fatal; continue without it */
                        rc = PTR_ERR(obd->obd_proc_entry);
                        CERROR("error %d setting up lprocfs for %s\n", rc,
                               obd->obd_name);
                        obd->obd_proc_entry = NULL;
                }
        }

        rc = lprocfs_obd_setup(obd, false);
        if (!rc) {
                /* If the basic OSC proc tree construction succeeded then
                 * lets do the rest.
                 */
                lproc_osc_attach_seqstat(obd);
                sptlrpc_lprocfs_cliobd_attach(obd);
                ptlrpc_lprocfs_register_obd(obd);
        }

        /*
         * We try to control the total number of requests with a upper limit
         * osc_reqpool_maxreqcount. There might be some race which will cause
         * over-limit allocation, but it is fine.
         */
        req_count = atomic_read(&osc_pool_req_count);
        if (req_count < osc_reqpool_maxreqcount) {
                adding = cli->cl_max_rpcs_in_flight + 2;
                if (req_count + adding > osc_reqpool_maxreqcount)
                        adding = osc_reqpool_maxreqcount - req_count;

                added = ptlrpc_add_rqs_to_pool(osc_rq_pool, adding);
                atomic_add(added, &osc_pool_req_count);
        }

        INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
        ns_register_cancel(obd->obd_namespace, osc_cancel_weight);

        spin_lock(&osc_shrink_lock);
        list_add_tail(&cli->cl_shrink_list, &osc_shrink_list);
        spin_unlock(&osc_shrink_lock);

        RETURN(0);

out_ptlrpcd_work:
        if (cli->cl_writeback_work != NULL) {
                ptlrpcd_destroy_work(cli->cl_writeback_work);
                cli->cl_writeback_work = NULL;
        }
        if (cli->cl_lru_work != NULL) {
                ptlrpcd_destroy_work(cli->cl_lru_work);
                cli->cl_lru_work = NULL;
        }
out_client_setup:
        client_obd_cleanup(obd);
out_ptlrpcd:
        ptlrpcd_decref();
        RETURN(rc);
}
3019
/* Pre-cleanup hook for the OSC obd: destroy the ptlrpcd work items and
 * the lprocfs state set up by osc_setup() before the device is torn down.
 * Always returns 0.
 */
static int osc_precleanup(struct obd_device *obd)
{
	struct client_obd *cli = &obd->u.cli;
	ENTRY;

	/* LU-464
	 * for echo client, export may be on zombie list, wait for
	 * zombie thread to cull it, because cli.cl_import will be
	 * cleared in client_disconnect_export():
	 *   class_export_destroy() -> obd_cleanup() ->
	 *   echo_device_free() -> echo_client_cleanup() ->
	 *   obd_disconnect() -> osc_disconnect() ->
	 *   client_disconnect_export()
	 */
	obd_zombie_barrier();
	/* Destroy the writeback and LRU work items; NULL them so a repeated
	 * precleanup (or the error path in osc_setup) won't double-free. */
	if (cli->cl_writeback_work) {
		ptlrpcd_destroy_work(cli->cl_writeback_work);
		cli->cl_writeback_work = NULL;
	}

	if (cli->cl_lru_work) {
		ptlrpcd_destroy_work(cli->cl_lru_work);
		cli->cl_lru_work = NULL;
	}

	/* Drop the client import, then unwind the proc/stats registration
	 * done in osc_setup() (reverse order of setup). */
	obd_cleanup_client_import(obd);
	ptlrpc_lprocfs_unregister_obd(obd);
	lprocfs_obd_cleanup(obd);
	RETURN(0);
}
3050
/* Final cleanup for the OSC obd: detach from the global shrink list and
 * the shared LRU cache, release quota state, then tear down the generic
 * client obd and drop the ptlrpcd reference taken in osc_setup().
 * Returns the result of client_obd_cleanup().
 */
int osc_cleanup(struct obd_device *obd)
{
	struct client_obd *cli = &obd->u.cli;
	int rc;

	ENTRY;

	/* Remove this client from the list walked by the cache shrinker. */
	spin_lock(&osc_shrink_lock);
	list_del(&cli->cl_shrink_list);
	spin_unlock(&osc_shrink_lock);

	/* lru cleanup */
	if (cli->cl_cache != NULL) {
		LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
		spin_lock(&cli->cl_cache->ccc_lru_lock);
		list_del_init(&cli->cl_lru_osc);
		spin_unlock(&cli->cl_cache->ccc_lru_lock);
		cli->cl_lru_left = NULL;
		cl_cache_decref(cli->cl_cache);
		cli->cl_cache = NULL;
	}

	/* free memory of osc quota cache */
	osc_quota_cleanup(obd);

	rc = client_obd_cleanup(obd);

	/* Balances the ptlrpcd_addref() done during setup. */
	ptlrpcd_decref();
	RETURN(rc);
}
3081
3082 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
3083 {
3084         int rc = class_process_proc_param(PARAM_OSC, obd->obd_vars, lcfg, obd);
3085         return rc > 0 ? 0: rc;
3086 }
3087
3088 static int osc_process_config(struct obd_device *obd, size_t len, void *buf)
3089 {
3090         return osc_process_config_base(obd, buf);
3091 }
3092
/* Method table registered for the OSC obd type; generic client_* handlers
 * are used where the OSC needs no specialization (connect/conn management).
 */
static struct obd_ops osc_obd_ops = {
	.o_owner                = THIS_MODULE,
	.o_setup                = osc_setup,
	.o_precleanup           = osc_precleanup,
	.o_cleanup              = osc_cleanup,
	.o_add_conn             = client_import_add_conn,
	.o_del_conn             = client_import_del_conn,
	.o_connect              = client_connect_import,
	.o_reconnect            = osc_reconnect,
	.o_disconnect           = osc_disconnect,
	.o_statfs               = osc_statfs,
	.o_statfs_async         = osc_statfs_async,
	.o_create               = osc_create,
	.o_destroy              = osc_destroy,
	.o_getattr              = osc_getattr,
	.o_setattr              = osc_setattr,
	.o_iocontrol            = osc_iocontrol,
	.o_set_info_async       = osc_set_info_async,
	.o_import_event         = osc_import_event,
	.o_process_config       = osc_process_config,
	.o_quotactl             = osc_quotactl,
};
3115
/* Memory-pressure shrinker registered in osc_init(). */
static struct shrinker *osc_cache_shrinker;
/* All client_obds participating in cache shrinking, guarded by
 * osc_shrink_lock; entries are added in osc_setup() and removed in
 * osc_cleanup(). */
struct list_head osc_shrink_list = LIST_HEAD_INIT(osc_shrink_list);
DEFINE_SPINLOCK(osc_shrink_lock);
3119
#ifndef HAVE_SHRINKER_COUNT
/* Compat shim for kernels whose shrinker API uses a single combined
 * callback rather than separate count/scan methods: repackage the legacy
 * arguments into a struct shrink_control, run a scan pass, and return the
 * remaining object count as the old API expects.
 */
static int osc_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask))
{
	struct shrink_control scv = {
		.nr_to_scan = shrink_param(sc, nr_to_scan),
		.gfp_mask   = shrink_param(sc, gfp_mask)
	};
#if !defined(HAVE_SHRINKER_WANT_SHRINK_PTR) && !defined(HAVE_SHRINK_CONTROL)
	/* On these kernels the callback receives no shrinker pointer;
	 * the scan/count helpers tolerate NULL. */
	struct shrinker *shrinker = NULL;
#endif

	(void)osc_cache_shrink_scan(shrinker, &scv);

	return osc_cache_shrink_count(shrinker, &scv);
}
#endif
3136
3137 static int __init osc_init(void)
3138 {
3139         bool enable_proc = true;
3140         struct obd_type *type;
3141         unsigned int reqpool_size;
3142         unsigned int reqsize;
3143         int rc;
3144         DEF_SHRINKER_VAR(osc_shvar, osc_cache_shrink,
3145                          osc_cache_shrink_count, osc_cache_shrink_scan);
3146         ENTRY;
3147
3148         /* print an address of _any_ initialized kernel symbol from this
3149          * module, to allow debugging with gdb that doesn't support data
3150          * symbols from modules.*/
3151         CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3152
3153         rc = lu_kmem_init(osc_caches);
3154         if (rc)
3155                 RETURN(rc);
3156
3157         type = class_search_type(LUSTRE_OSP_NAME);
3158         if (type != NULL && type->typ_procsym != NULL)
3159                 enable_proc = false;
3160
3161         rc = class_register_type(&osc_obd_ops, NULL, enable_proc, NULL,
3162                                  LUSTRE_OSC_NAME, &osc_device_type);
3163         if (rc)
3164                 GOTO(out_kmem, rc);
3165
3166         osc_cache_shrinker = set_shrinker(DEFAULT_SEEKS, &osc_shvar);
3167
3168         /* This is obviously too much memory, only prevent overflow here */
3169         if (osc_reqpool_mem_max >= 1 << 12 || osc_reqpool_mem_max == 0)
3170                 GOTO(out_type, rc = -EINVAL);
3171
3172         reqpool_size = osc_reqpool_mem_max << 20;
3173
3174         reqsize = 1;
3175         while (reqsize < OST_IO_MAXREQSIZE)
3176                 reqsize = reqsize << 1;
3177
3178         /*
3179          * We don't enlarge the request count in OSC pool according to
3180          * cl_max_rpcs_in_flight. The allocation from the pool will only be
3181          * tried after normal allocation failed. So a small OSC pool won't
3182          * cause much performance degression in most of cases.
3183          */
3184         osc_reqpool_maxreqcount = reqpool_size / reqsize;
3185
3186         atomic_set(&osc_pool_req_count, 0);
3187         osc_rq_pool = ptlrpc_init_rq_pool(0, OST_IO_MAXREQSIZE,
3188                                           ptlrpc_add_rqs_to_pool);
3189
3190         if (osc_rq_pool != NULL)
3191                 GOTO(out, rc);
3192         rc = -ENOMEM;
3193 out_type:
3194         class_unregister_type(LUSTRE_OSC_NAME);
3195 out_kmem:
3196         lu_kmem_fini(osc_caches);
3197 out:
3198         RETURN(rc);
3199 }
3200
/* Module exit: unwind osc_init() — unregister the shrinker first so no
 * memory-pressure callback can run into freed module state, then drop the
 * obd type, kmem caches, and finally the request pool.
 */
static void __exit osc_exit(void)
{
	remove_shrinker(osc_cache_shrinker);
	class_unregister_type(LUSTRE_OSC_NAME);
	lu_kmem_fini(osc_caches);
	ptlrpc_free_rq_pool(osc_rq_pool);
}
3208
/* Standard kernel module metadata and entry points. */
MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
MODULE_VERSION(LUSTRE_VERSION_STRING);
MODULE_LICENSE("GPL");

module_init(osc_init);
module_exit(osc_exit);