LU-10776 osc: Do not request more than 2GiB grant
lustre/osc/osc_request.c (fs/lustre-release.git)
/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.gnu.org/licenses/gpl-2.0.html
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, 2017, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 */

#define DEBUG_SUBSYSTEM S_OSC

#include <lprocfs_status.h>
#include <lustre_debug.h>
#include <lustre_dlm.h>
#include <lustre_fid.h>
#include <lustre_ha.h>
#include <uapi/linux/lustre/lustre_ioctl.h>
#include <lustre_net.h>
#include <lustre_obdo.h>
#include <uapi/linux/lustre/lustre_param.h>
#include <obd.h>
#include <obd_cksum.h>
#include <obd_class.h>
#include <lustre_osc.h>

#include "osc_internal.h"

atomic_t osc_pool_req_count;
unsigned int osc_reqpool_maxreqcount;
struct ptlrpc_request_pool *osc_rq_pool;

/* max memory used for request pool, unit is MB */
static unsigned int osc_reqpool_mem_max = 5;
module_param(osc_reqpool_mem_max, uint, 0444);

#define osc_grant_args osc_brw_async_args

struct osc_setattr_args {
        struct obdo             *sa_oa;
        obd_enqueue_update_f     sa_upcall;
        void                    *sa_cookie;
};

struct osc_fsync_args {
        struct osc_object       *fa_obj;
        struct obdo             *fa_oa;
        obd_enqueue_update_f    fa_upcall;
        void                    *fa_cookie;
};

struct osc_ladvise_args {
        struct obdo             *la_oa;
        obd_enqueue_update_f     la_upcall;
        void                    *la_cookie;
};

static void osc_release_ppga(struct brw_page **ppga, size_t count);
static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
                         void *data, int rc);

void osc_pack_req_body(struct ptlrpc_request *req, struct obdo *oa)
{
        struct ost_body *body;

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
}

static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        int                      rc;

        ENTRY;
        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;

        EXIT;
out:
        ptlrpc_req_finished(req);

        return rc;
}

static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        int                      rc;

        ENTRY;
        LASSERT(oa->o_valid & OBD_MD_FLGROUP);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        EXIT;
out:
        ptlrpc_req_finished(req);

        RETURN(rc);
}

static int osc_setattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_setattr_args *sa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
                             &body->oa);
out:
        rc = sa->sa_upcall(sa->sa_cookie, rc);
        RETURN(rc);
}

int osc_setattr_async(struct obd_export *exp, struct obdo *oa,
                      obd_enqueue_update_f upcall, void *cookie,
                      struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct osc_setattr_args *sa;
        int                      rc;

        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        /* do MDS-to-OST setattr asynchronously */
        if (!rqset) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req);
        } else {
                req->rq_interpret_reply =
                        (ptlrpc_interpterer_t)osc_setattr_interpret;

                CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
                sa = ptlrpc_req_async_args(req);
                sa->sa_oa = oa;
                sa->sa_upcall = upcall;
                sa->sa_cookie = cookie;

                if (rqset == PTLRPCD_SET)
                        ptlrpcd_add_req(req);
                else
                        ptlrpc_set_add_req(rqset, req);
        }

        RETURN(0);
}

static int osc_ladvise_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 void *arg, int rc)
{
        struct osc_ladvise_args *la = arg;
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        *la->la_oa = body->oa;
out:
        rc = la->la_upcall(la->la_cookie, rc);
        RETURN(rc);
}

/**
 * If rqset is NULL, do not wait for the response. The upcall and cookie may
 * also be NULL in that case.
 */
int osc_ladvise_base(struct obd_export *exp, struct obdo *oa,
                     struct ladvise_hdr *ladvise_hdr,
                     obd_enqueue_update_f upcall, void *cookie,
                     struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        struct osc_ladvise_args *la;
        int                      rc;
        struct lu_ladvise       *req_ladvise;
        struct lu_ladvise       *ladvise = ladvise_hdr->lah_advise;
        int                      num_advise = ladvise_hdr->lah_count;
        struct ladvise_hdr      *req_ladvise_hdr;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_LADVISE);
        if (req == NULL)
                RETURN(-ENOMEM);

        req_capsule_set_size(&req->rq_pill, &RMF_OST_LADVISE, RCL_CLIENT,
                             num_advise * sizeof(*ladvise));
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_LADVISE);
        if (rc != 0) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL;
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
                             oa);

        req_ladvise_hdr = req_capsule_client_get(&req->rq_pill,
                                                 &RMF_OST_LADVISE_HDR);
        memcpy(req_ladvise_hdr, ladvise_hdr, sizeof(*ladvise_hdr));

        req_ladvise = req_capsule_client_get(&req->rq_pill, &RMF_OST_LADVISE);
        memcpy(req_ladvise, ladvise, sizeof(*ladvise) * num_advise);
        ptlrpc_request_set_replen(req);

        if (rqset == NULL) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req);
                RETURN(0);
        }

        req->rq_interpret_reply = osc_ladvise_interpret;
        CLASSERT(sizeof(*la) <= sizeof(req->rq_async_args));
        la = ptlrpc_req_async_args(req);
        la->la_oa = oa;
        la->la_upcall = upcall;
        la->la_cookie = cookie;

        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}
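
/*
 * Usage sketch (hypothetical caller, not from this file): a fire-and-forget
 * ladvise passes NULL for rqset, upcall, and cookie, so the request is
 * queued on ptlrpcd and no reply handling occurs:
 *
 *	rc = osc_ladvise_base(exp, oa, ladvise_hdr, NULL, NULL, NULL);
 *
 * With a non-NULL rqset (or PTLRPCD_SET), osc_ladvise_interpret() runs on
 * reply and invokes the upcall with the cookie.
 */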

static int osc_create(const struct lu_env *env, struct obd_export *exp,
                      struct obdo *oa)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        LASSERT(oa != NULL);
        LASSERT(oa->o_valid & OBD_MD_FLGROUP);
        LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
        if (req == NULL)
                GOTO(out, rc = -ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
        if (rc) {
                ptlrpc_request_free(req);
                GOTO(out, rc);
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out_req, rc = -EPROTO);

        CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;

        CDEBUG(D_HA, "transno: %lld\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        RETURN(rc);
}

int osc_punch_send(struct obd_export *exp, struct obdo *oa,
                   obd_enqueue_update_f upcall, void *cookie)
{
        struct ptlrpc_request *req;
        struct osc_setattr_args *sa;
        struct obd_import *imp = class_exp2cliimp(exp);
        struct ost_body *body;
        int rc;

        ENTRY;

        req = ptlrpc_request_alloc(imp, &RQF_OST_PUNCH);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
        if (rc < 0) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_set_io_portal(req);

        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);

        lustre_set_wire_obdo(&imp->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
        CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
        sa = ptlrpc_req_async_args(req);
        sa->sa_oa = oa;
        sa->sa_upcall = upcall;
        sa->sa_cookie = cookie;

        ptlrpcd_add_req(req);

        RETURN(0);
}
EXPORT_SYMBOL(osc_punch_send);

static int osc_sync_interpret(const struct lu_env *env,
                              struct ptlrpc_request *req,
                              void *arg, int rc)
{
        struct osc_fsync_args   *fa = arg;
        struct ost_body         *body;
        struct cl_attr          *attr = &osc_env_info(env)->oti_attr;
        unsigned long           valid = 0;
        struct cl_object        *obj;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL) {
                CERROR("can't unpack ost_body\n");
                GOTO(out, rc = -EPROTO);
        }

        *fa->fa_oa = body->oa;
        obj = osc2cl(fa->fa_obj);

        /* Update osc object's blocks attribute */
        cl_object_attr_lock(obj);
        if (body->oa.o_valid & OBD_MD_FLBLOCKS) {
                attr->cat_blocks = body->oa.o_blocks;
                valid |= CAT_BLOCKS;
        }

        if (valid != 0)
                cl_object_attr_update(env, obj, attr, valid);
        cl_object_attr_unlock(obj);

out:
        rc = fa->fa_upcall(fa->fa_cookie, rc);
        RETURN(rc);
}

int osc_sync_base(struct osc_object *obj, struct obdo *oa,
                  obd_enqueue_update_f upcall, void *cookie,
                  struct ptlrpc_request_set *rqset)
{
        struct obd_export     *exp = osc_export(obj);
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct osc_fsync_args *fa;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        /* overload the size and blocks fields in the oa with start/end */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = osc_sync_interpret;

        CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
        fa = ptlrpc_req_async_args(req);
        fa->fa_obj = obj;
        fa->fa_oa = oa;
        fa->fa_upcall = upcall;
        fa->fa_cookie = cookie;

        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}
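
/*
 * Usage sketch (hypothetical caller, not from this file): per the "overload"
 * comment in osc_sync_base(), a caller syncing bytes [start, end] of an
 * object would set oa->o_size = start and oa->o_blocks = end before calling
 * in; on reply, osc_sync_interpret() copies the returned oa back and
 * refreshes the cached blocks attribute if OBD_MD_FLBLOCKS is set.
 */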

/* Find and cancel locally the locks matched by @mode in the resource found
 * by @objid. Found locks are added into the @cancels list. Returns the
 * number of locks added to the @cancels list. */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
                                   struct list_head *cancels,
                                   enum ldlm_mode mode, __u64 lock_flags)
{
        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        struct ldlm_res_id res_id;
        struct ldlm_resource *res;
        int count;
        ENTRY;

        /* Return, i.e. cancel nothing, only if ELC is supported (flag in
         * export) but disabled through procfs (flag in NS).
         *
         * This is distinct from the case where ELC is not supported at all,
         * in which case we still want to cancel locks in advance and simply
         * cancel them locally, without sending any RPC. */
        if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
                RETURN(0);

        ostid_build_res_name(&oa->o_oi, &res_id);
        res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
        if (IS_ERR(res))
                RETURN(0);

        LDLM_RESOURCE_ADDREF(res);
        count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
                                           lock_flags, 0, NULL);
        LDLM_RESOURCE_DELREF(res);
        ldlm_resource_putref(res);
        RETURN(count);
}

static int osc_destroy_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req, void *data,
                                 int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;

        atomic_dec(&cli->cl_destroy_in_flight);
        wake_up(&cli->cl_destroy_waitq);
        return 0;
}

static int osc_can_send_destroy(struct client_obd *cli)
{
        if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
            cli->cl_max_rpcs_in_flight) {
                /* The destroy request can be sent */
                return 1;
        }
        if (atomic_dec_return(&cli->cl_destroy_in_flight) <
            cli->cl_max_rpcs_in_flight) {
                /*
                 * The counter has been modified between the two atomic
                 * operations.
                 */
                wake_up(&cli->cl_destroy_waitq);
        }
        return 0;
}
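
/*
 * Worked example (illustrative): with cl_max_rpcs_in_flight == 8 and eight
 * destroys already in flight, atomic_inc_return() above yields 9, so the
 * caller must wait.  The matching atomic_dec_return() yields 8, which is not
 * < 8, so no wake-up happens here; the wake-up comes from
 * osc_destroy_interpret() when an in-flight destroy completes.
 */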

static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct client_obd     *cli = &exp->exp_obd->u.cli;
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct list_head       cancels = LIST_HEAD_INIT(cancels);
        int rc, count;
        ENTRY;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
        if (req == NULL) {
                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                RETURN(-ENOMEM);
        }

        rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
                               0, &cancels, count);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = osc_destroy_interpret;
        if (!osc_can_send_destroy(cli)) {
                struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);

                /*
                 * Wait until the number of on-going destroy RPCs drops
                 * below cl_max_rpcs_in_flight.
                 */
                rc = l_wait_event_exclusive(cli->cl_destroy_waitq,
                                            osc_can_send_destroy(cli), &lwi);
                if (rc) {
                        ptlrpc_req_finished(req);
                        RETURN(rc);
                }
        }

        /* Do not wait for response */
        ptlrpcd_add_req(req);
        RETURN(0);
}

static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;

        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        spin_lock(&cli->cl_loi_list_lock);
        if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data, GRANT_PARAM))
                oa->o_dirty = cli->cl_dirty_grant;
        else
                oa->o_dirty = cli->cl_dirty_pages << PAGE_SHIFT;
        if (unlikely(cli->cl_dirty_pages - cli->cl_dirty_transit >
                     cli->cl_dirty_max_pages)) {
                CERROR("dirty %lu - %lu > dirty_max %lu\n",
                       cli->cl_dirty_pages, cli->cl_dirty_transit,
                       cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else if (unlikely(atomic_long_read(&obd_dirty_pages) -
                            atomic_long_read(&obd_dirty_transit_pages) >
                            (long)(obd_max_dirty_pages + 1))) {
                /* The atomic_read() and atomic_inc() are not covered by a
                 * lock, thus they may safely race and trip this CERROR()
                 * unless we add in a small fudge factor (+1). */
                CERROR("%s: dirty %ld - %ld > system dirty_max %ld\n",
                       cli_name(cli), atomic_long_read(&obd_dirty_pages),
                       atomic_long_read(&obd_dirty_transit_pages),
                       obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
                            0x7fffffff)) {
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty_pages, cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else {
                unsigned long nrpages;
                unsigned long undirty;

                nrpages = cli->cl_max_pages_per_rpc;
                nrpages *= cli->cl_max_rpcs_in_flight + 1;
                nrpages = max(nrpages, cli->cl_dirty_max_pages);
                undirty = nrpages << PAGE_SHIFT;
                if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data,
                                 GRANT_PARAM)) {
                        int nrextents;

                        /* take extent tax into account when asking for more
                         * grant space */
                        nrextents = (nrpages + cli->cl_max_extent_pages - 1)  /
                                     cli->cl_max_extent_pages;
                        undirty += nrextents * cli->cl_grant_extent_tax;
                }
                /* Do not ask for more than OBD_MAX_GRANT - a margin for server
                 * to add extent tax, etc.
                 */
                oa->o_undirty = min(undirty, OBD_MAX_GRANT -
                                    (PTLRPC_MAX_BRW_PAGES << PAGE_SHIFT)*4UL);
        }
        oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
        oa->o_dropped = cli->cl_lost_grant;
        cli->cl_lost_grant = 0;
        spin_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "dirty: %llu undirty: %u dropped %u grant: %llu\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
}
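
/*
 * Worked example for the o_undirty cap above (illustrative, assuming 4KiB
 * pages, PTLRPC_MAX_BRW_PAGES == 4096 (a 16MiB maximum BRW), and a 2GiB
 * OBD_MAX_GRANT, per this change's subject): the client asks for at most
 * 2GiB - 4 * 16MiB = 2GiB - 64MiB of undirty grant.  The margin leaves the
 * server room to add extent tax and round up without the total exceeding
 * 2GiB (see LU-10776).
 */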

void osc_update_next_shrink(struct client_obd *cli)
{
        cli->cl_next_shrink_grant = ktime_get_seconds() +
                                    cli->cl_grant_shrink_interval;

        CDEBUG(D_CACHE, "next time %lld to shrink grant\n",
               cli->cl_next_shrink_grant);
}

static void __osc_update_grant(struct client_obd *cli, u64 grant)
{
        spin_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant += grant;
        spin_unlock(&cli->cl_loi_list_lock);
}

static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
{
        if (body->oa.o_valid & OBD_MD_FLGRANT) {
                CDEBUG(D_CACHE, "got %llu extra grant\n", body->oa.o_grant);
                __osc_update_grant(cli, body->oa.o_grant);
        }
}

static int osc_shrink_grant_interpret(const struct lu_env *env,
                                      struct ptlrpc_request *req,
                                      void *aa, int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
        struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
        struct ost_body *body;

        if (rc != 0) {
                __osc_update_grant(cli, oa->o_grant);
                GOTO(out, rc);
        }

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        osc_update_grant(cli, body);
out:
        OBDO_FREE(oa);
        return rc;
}

static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
{
        spin_lock(&cli->cl_loi_list_lock);
        oa->o_grant = cli->cl_avail_grant / 4;
        cli->cl_avail_grant -= oa->o_grant;
        spin_unlock(&cli->cl_loi_list_lock);
        if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
                oa->o_valid |= OBD_MD_FLFLAGS;
                oa->o_flags = 0;
        }
        oa->o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);
}

/* Shrink the current grant, either from some large amount to enough for a
 * full set of in-flight RPCs, or if we have already shrunk to that limit
 * then to enough for a single RPC.  This avoids keeping more grant than
 * needed, and avoids shrinking the grant piecemeal. */
static int osc_shrink_grant(struct client_obd *cli)
{
        __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
                             (cli->cl_max_pages_per_rpc << PAGE_SHIFT);

        spin_lock(&cli->cl_loi_list_lock);
        if (cli->cl_avail_grant <= target_bytes)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
        spin_unlock(&cli->cl_loi_list_lock);

        return osc_shrink_grant_to_target(cli, target_bytes);
}
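
/*
 * Illustrative numbers for the two-step shrink above: with
 * cl_max_rpcs_in_flight == 8 and cl_max_pages_per_rpc == 256 (1MiB RPCs),
 * target_bytes starts at (8 + 1) * 1MiB = 9MiB; if cl_avail_grant is already
 * at or below that, the target drops to a single RPC's worth, 1MiB.
 */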

int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
{
        int                     rc = 0;
        struct ost_body        *body;
        ENTRY;

        spin_lock(&cli->cl_loi_list_lock);
        /* Don't shrink if we are already above or below the desired limit.
         * We don't want to shrink below a single RPC, as that will negatively
         * impact block allocation and long-term performance. */
        if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_SHIFT)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;

        if (target_bytes >= cli->cl_avail_grant) {
                spin_unlock(&cli->cl_loi_list_lock);
                RETURN(0);
        }
        spin_unlock(&cli->cl_loi_list_lock);

        OBD_ALLOC_PTR(body);
        if (!body)
                RETURN(-ENOMEM);

        osc_announce_cached(cli, &body->oa, 0);

        spin_lock(&cli->cl_loi_list_lock);
        body->oa.o_grant = cli->cl_avail_grant - target_bytes;
        cli->cl_avail_grant = target_bytes;
        spin_unlock(&cli->cl_loi_list_lock);
        if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
                body->oa.o_valid |= OBD_MD_FLFLAGS;
                body->oa.o_flags = 0;
        }
        body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);

        rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
                                sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
                                sizeof(*body), body, NULL);
        if (rc != 0)
                __osc_update_grant(cli, body->oa.o_grant);
        OBD_FREE_PTR(body);
        RETURN(rc);
}

static int osc_should_shrink_grant(struct client_obd *client)
{
        time64_t next_shrink = client->cl_next_shrink_grant;

        if ((client->cl_import->imp_connect_data.ocd_connect_flags &
             OBD_CONNECT_GRANT_SHRINK) == 0)
                return 0;

        if (ktime_get_seconds() >= next_shrink - 5) {
                /* Get the current RPC size directly, instead of going via:
                 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
                 * Keep comment here so that it can be found by searching. */
                int brw_size = client->cl_max_pages_per_rpc << PAGE_SHIFT;

                if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
                    client->cl_avail_grant > brw_size)
                        return 1;
                else
                        osc_update_next_shrink(client);
        }
        return 0;
}
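
/*
 * Timing note (as coded above): the check fires up to five seconds before
 * cl_next_shrink_grant.  Shrinking proceeds only when the import is FULL and
 * more than one RPC's worth of grant is held; otherwise the next shrink time
 * is simply pushed out by another cl_grant_shrink_interval.
 */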

static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
{
        struct client_obd *client;

        list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
                if (osc_should_shrink_grant(client))
                        osc_shrink_grant(client);
        }
        return 0;
}

static int osc_add_shrink_grant(struct client_obd *client)
{
        int rc;

        rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
                                       TIMEOUT_GRANT,
                                       osc_grant_shrink_grant_cb, NULL,
                                       &client->cl_grant_shrink_list);
        if (rc) {
                CERROR("add grant client %s error %d\n", cli_name(client), rc);
                return rc;
        }
        CDEBUG(D_CACHE, "add grant client %s\n", cli_name(client));
        osc_update_next_shrink(client);
        return 0;
}

static int osc_del_shrink_grant(struct client_obd *client)
{
        return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
                                         TIMEOUT_GRANT);
}

void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        /*
         * ocd_grant is the total grant amount we expect to hold: if we have
         * been evicted, it's the new avail_grant amount, and cl_dirty_pages
         * will drop to 0 as in-flight RPCs fail out; otherwise, it's
         * avail_grant + dirty.
         *
         * A race is tolerable here: if we're evicted but imp_state has
         * already left the EVICTED state, then cl_dirty_pages must already
         * be 0.
         */
        spin_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant = ocd->ocd_grant;
        if (cli->cl_import->imp_state != LUSTRE_IMP_EVICTED) {
                cli->cl_avail_grant -= cli->cl_reserved_grant;
                if (OCD_HAS_FLAG(ocd, GRANT_PARAM))
                        cli->cl_avail_grant -= cli->cl_dirty_grant;
                else
                        cli->cl_avail_grant -=
                                        cli->cl_dirty_pages << PAGE_SHIFT;
        }

        if (OCD_HAS_FLAG(ocd, GRANT_PARAM)) {
                u64 size;
                int chunk_mask;

                /* overhead for each extent insertion */
                cli->cl_grant_extent_tax = ocd->ocd_grant_tax_kb << 10;
                /* determine the appropriate chunk size used by osc_extent. */
                cli->cl_chunkbits = max_t(int, PAGE_SHIFT,
                                          ocd->ocd_grant_blkbits);
                /* max_pages_per_rpc must be chunk aligned */
                chunk_mask = ~((1 << (cli->cl_chunkbits - PAGE_SHIFT)) - 1);
                cli->cl_max_pages_per_rpc = (cli->cl_max_pages_per_rpc +
                                             ~chunk_mask) & chunk_mask;
                /* determine maximum extent size, in #pages */
                size = (u64)ocd->ocd_grant_max_blks << ocd->ocd_grant_blkbits;
                cli->cl_max_extent_pages = size >> PAGE_SHIFT;
                if (cli->cl_max_extent_pages == 0)
                        cli->cl_max_extent_pages = 1;
        } else {
                cli->cl_grant_extent_tax = 0;
                cli->cl_chunkbits = PAGE_SHIFT;
                cli->cl_max_extent_pages = DT_MAX_BRW_PAGES;
        }
        spin_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE,
               "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld, chunk bits: %d cl_max_extent_pages: %d\n",
               cli_name(cli),
               cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits,
               cli->cl_max_extent_pages);

        if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
            list_empty(&cli->cl_grant_shrink_list))
                osc_add_shrink_grant(cli);
}
EXPORT_SYMBOL(osc_init_grant);
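
/*
 * Chunk-alignment example for osc_init_grant() (illustrative numbers): with
 * PAGE_SHIFT == 12 and ocd_grant_blkbits == 16, cl_chunkbits becomes 16
 * (64KiB chunks, i.e. 16 pages) and chunk_mask is ~15, so a
 * cl_max_pages_per_rpc of, say, 100 is rounded up to 112, the next multiple
 * of 16 pages.
 */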

/* We assume that the reason this OSC got a short read is because it read
 * beyond the end of a stripe file; i.e. Lustre is reading a sparse file
 * via the LOV, and it _knows_ it's reading inside the file, it's just that
 * this stripe never got written at or beyond this stripe offset yet. */
static void handle_short_read(int nob_read, size_t page_count,
                              struct brw_page **pga)
{
        char *ptr;
        int i = 0;

        /* skip bytes read OK */
        while (nob_read > 0) {
                LASSERT(page_count > 0);

                if (pga[i]->count > nob_read) {
                        /* EOF inside this page */
                        ptr = kmap(pga[i]->pg) +
                                (pga[i]->off & ~PAGE_MASK);
                        memset(ptr + nob_read, 0, pga[i]->count - nob_read);
                        kunmap(pga[i]->pg);
                        page_count--;
                        i++;
                        break;
                }

                nob_read -= pga[i]->count;
                page_count--;
                i++;
        }

        /* zero remaining pages */
        while (page_count-- > 0) {
                ptr = kmap(pga[i]->pg) + (pga[i]->off & ~PAGE_MASK);
                memset(ptr, 0, pga[i]->count);
                kunmap(pga[i]->pg);
                i++;
        }
}
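
/*
 * Example (illustrative): a three-page read (3 x 4096 bytes) that returns
 * nob_read == 5000 leaves page 0 fully valid, zeroes page 1 from offset
 * 5000 - 4096 = 904 to its end, and zeroes all of page 2.
 */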

static int check_write_rcs(struct ptlrpc_request *req,
                           int requested_nob, int niocount,
                           size_t page_count, struct brw_page **pga)
{
        int     i;
        __u32   *remote_rcs;

        remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
                                                  sizeof(*remote_rcs) *
                                                  niocount);
        if (remote_rcs == NULL) {
                CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
                return -EPROTO;
        }

        /* return error if any niobuf was in error */
        for (i = 0; i < niocount; i++) {
                if ((int)remote_rcs[i] < 0)
                        return remote_rcs[i];

                if (remote_rcs[i] != 0) {
                        CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
                                i, remote_rcs[i], req);
                        return -EPROTO;
                }
        }
        if (req->rq_bulk != NULL &&
            req->rq_bulk->bd_nob_transferred != requested_nob) {
                CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
                       req->rq_bulk->bd_nob_transferred, requested_nob);
                return -EPROTO;
        }

        return 0;
}

static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
{
        if (p1->flag != p2->flag) {
                unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
                                  OBD_BRW_SYNC       | OBD_BRW_ASYNC   |
                                  OBD_BRW_NOQUOTA    | OBD_BRW_SOFT_SYNC);

                /* warn if we try to combine flags that we don't know to be
                 * safe to combine */
                if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
                        CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
                              "report this at https://jira.hpdd.intel.com/\n",
                              p1->flag, p2->flag);
                }
                return 0;
        }

        return (p1->off + p1->count == p2->off);
}
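
/*
 * Example (illustrative): two brw_pages covering [0, 4096) and [4096, 8192)
 * with identical flags merge into one niobuf of length 8192 in
 * osc_brw_prep_request().  Any flag difference prevents merging; differences
 * outside the known-safe mask above additionally trigger the CWARN.
 */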

static int osc_checksum_bulk(int nob, size_t pg_count,
                             struct brw_page **pga, int opc,
                             enum cksum_types cksum_type,
                             u32 *cksum)
{
        int                             i = 0;
        struct cfs_crypto_hash_desc     *hdesc;
        unsigned int                    bufsize;
        unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);

        LASSERT(pg_count > 0);

        hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
        if (IS_ERR(hdesc)) {
                CERROR("Unable to initialize checksum hash %s\n",
                       cfs_crypto_hash_name(cfs_alg));
                return PTR_ERR(hdesc);
        }

        while (nob > 0 && pg_count > 0) {
                unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 && opc == OST_READ &&
                    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
                        unsigned char *ptr = kmap(pga[i]->pg);
                        int off = pga[i]->off & ~PAGE_MASK;

                        memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
                        kunmap(pga[i]->pg);
                }
                cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
                                            pga[i]->off & ~PAGE_MASK,
                                            count);
                LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
                               (int)(pga[i]->off & ~PAGE_MASK));

                nob -= pga[i]->count;
                pg_count--;
                i++;
        }

        bufsize = sizeof(*cksum);
        cfs_crypto_hash_final(hdesc, (unsigned char *)cksum, &bufsize);

        /* For sending we only compute the wrong checksum instead
         * of corrupting the data so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                (*cksum)++;

        return 0;
}
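
/*
 * Note on the fault-injection hooks above (as implemented here):
 * OBD_FAIL_OSC_CHECKSUM_RECEIVE corrupts the first page of a read before
 * hashing, so the mismatch with the server's checksum is detected, while
 * OBD_FAIL_OSC_CHECKSUM_SEND leaves the data intact and only bumps the
 * computed checksum, so a resend after the simulated failure still carries
 * good data.
 */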

static int
osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
                     u32 page_count, struct brw_page **pga,
                     struct ptlrpc_request **reqp, int resend)
{
        struct ptlrpc_request   *req;
        struct ptlrpc_bulk_desc *desc;
        struct ost_body         *body;
        struct obd_ioobj        *ioobj;
        struct niobuf_remote    *niobuf;
        int niocount, i, requested_nob, opc, rc, short_io_size = 0;
        struct osc_brw_async_args *aa;
        struct req_capsule      *pill;
        struct brw_page *pg_prev;
        void *short_io_buf;

        ENTRY;
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
                RETURN(-ENOMEM); /* Recoverable */
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
                RETURN(-EINVAL); /* Fatal */

        if ((cmd & OBD_BRW_WRITE) != 0) {
                opc = OST_WRITE;
                req = ptlrpc_request_alloc_pool(cli->cl_import,
                                                osc_rq_pool,
                                                &RQF_OST_BRW_WRITE);
        } else {
                opc = OST_READ;
                req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
        }
        if (req == NULL)
                RETURN(-ENOMEM);

        for (niocount = i = 1; i < page_count; i++) {
                if (!can_merge_pages(pga[i - 1], pga[i]))
                        niocount++;
        }

        pill = &req->rq_pill;
        req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
                             sizeof(*ioobj));
        req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
                             niocount * sizeof(*niobuf));

        for (i = 0; i < page_count; i++)
                short_io_size += pga[i]->count;

        /* Check if we can do a short io. */
        if (!(short_io_size <= cli->cl_short_io_bytes && niocount == 1 &&
            imp_connect_shortio(cli->cl_import)))
                short_io_size = 0;

        req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_CLIENT,
                             opc == OST_READ ? 0 : short_io_size);
        if (opc == OST_READ)
                req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_SERVER,
                                     short_io_size);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        osc_set_io_portal(req);

        ptlrpc_at_set_req_timeout(req);
        /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
         * retry logic */
        req->rq_no_retry_einprogress = 1;

        if (short_io_size != 0) {
                desc = NULL;
                short_io_buf = NULL;
                goto no_bulk;
        }

        desc = ptlrpc_prep_bulk_imp(req, page_count,
                cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
                (opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE :
                        PTLRPC_BULK_PUT_SINK) |
                        PTLRPC_BULK_BUF_KIOV,
                OST_BULK_PORTAL,
                &ptlrpc_bulk_kiov_pin_ops);

        if (desc == NULL)
                GOTO(out, rc = -ENOMEM);
        /* NB request now owns desc and will free it when it gets freed */
no_bulk:
        body = req_capsule_client_get(pill, &RMF_OST_BODY);
        ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        /* For READ and WRITE, we can't fill o_uid and o_gid using from_kuid()
         * and from_kgid(), because these requests are handled asynchronously.
         * Fortunately, the oa already contains valid o_uid and o_gid for
         * these two operations.
         * Besides, filling o_uid and o_gid is enough for nrs-tbf, see LU-9658.
         * OBD_MD_FLUID and OBD_MD_FLGID are not set, in order to avoid
         * breaking other process logic. */
        body->oa.o_uid = oa->o_uid;
        body->oa.o_gid = oa->o_gid;

        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;
        /* The high bits of ioo_max_brw tell the server the _maximum_ number
         * of bulks that might be sent for this request.  The actual number
         * is decided when the RPC is finally sent in ptlrpc_register_bulk().
         * It sends "max - 1" for compatibility with old clients that send
         * "0", and also so that the actual maximum is a power-of-two number,
         * not one less. LU-1431 */
        if (desc != NULL)
                ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
        else /* short io */
                ioobj_max_brw_set(ioobj, 0);

        if (short_io_size != 0) {
                if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                        body->oa.o_valid |= OBD_MD_FLFLAGS;
                        body->oa.o_flags = 0;
                }
                body->oa.o_flags |= OBD_FL_SHORT_IO;
                CDEBUG(D_CACHE, "Using short io for data transfer, size = %d\n",
                       short_io_size);
                if (opc == OST_WRITE) {
                        short_io_buf = req_capsule_client_get(pill,
                                                              &RMF_SHORT_IO);
                        LASSERT(short_io_buf != NULL);
                }
        }

        LASSERT(page_count > 0);
        pg_prev = pga[0];
        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                struct brw_page *pg = pga[i];
                int poff = pg->off & ~PAGE_MASK;

                LASSERT(pg->count > 0);
                /* make sure there is no gap in the middle of page array */
                LASSERTF(page_count == 1 ||
                         (ergo(i == 0, poff + pg->count == PAGE_SIZE) &&
                          ergo(i > 0 && i < page_count - 1,
                               poff == 0 && pg->count == PAGE_SIZE)   &&
                          ergo(i == page_count - 1, poff == 0)),
                         "i: %d/%d pg: %p off: %llu, count: %u\n",
                         i, page_count, pg, pg->off, pg->count);
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u pg %p [pri %lu ind %lu] off %llu"
                         " prev_pg %p [pri %lu ind %lu] off %llu\n",
                         i, page_count,
                         pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
                         pg_prev->pg, page_private(pg_prev->pg),
                         pg_prev->pg->index, pg_prev->off);
                LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
                        (pg->flag & OBD_BRW_SRVLOCK));
                if (short_io_size != 0 && opc == OST_WRITE) {
                        unsigned char *ptr = ll_kmap_atomic(pg->pg, KM_USER0);

                        LASSERT(short_io_size >= requested_nob + pg->count);
                        memcpy(short_io_buf + requested_nob,
                               ptr + poff,
                               pg->count);
                        ll_kunmap_atomic(ptr, KM_USER0);
                } else if (short_io_size == 0) {
                        desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff,
                                                         pg->count);
                }
                requested_nob += pg->count;

                if (i > 0 && can_merge_pages(pg_prev, pg)) {
                        niobuf--;
                        niobuf->rnb_len += pg->count;
                } else {
                        niobuf->rnb_offset = pg->off;
                        niobuf->rnb_len    = pg->count;
                        niobuf->rnb_flags  = pg->flag;
                }
                pg_prev = pg;
        }

        LASSERTF((void *)(niobuf - niocount) ==
                req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
                "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
                &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));

        osc_announce_cached(cli, &body->oa,
                            opc == OST_WRITE ? requested_nob : 0);
        if (resend) {
                if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                        body->oa.o_valid |= OBD_MD_FLFLAGS;
                        body->oa.o_flags = 0;
                }
                body->oa.o_flags |= OBD_FL_RECOV_RESEND;
        }

        if (osc_should_shrink_grant(cli))
                osc_shrink_grant_local(cli, &body->oa);

        /* size[REQ_REC_OFF] still sizeof (*body) */
        if (opc == OST_WRITE) {
                if (cli->cl_checksum &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        /* store cl_cksum_type in a local variable since
                         * it can be changed via lprocfs */
                        enum cksum_types cksum_type = cli->cl_cksum_type;

                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                body->oa.o_flags = 0;

                        body->oa.o_flags |= cksum_type_pack(cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;

                        rc = osc_checksum_bulk(requested_nob, page_count,
                                               pga, OST_WRITE, cksum_type,
                                               &body->oa.o_cksum);
                        if (rc < 0) {
                                CDEBUG(D_PAGE, "failed to checksum, rc = %d\n",
                                       rc);
                                GOTO(out, rc);
                        }
                        CDEBUG(D_PAGE, "checksum at write origin: %x\n",
                               body->oa.o_cksum);

                        /* save this in 'oa', too, for later checking */
                        oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        oa->o_flags |= cksum_type_pack(cksum_type);
                } else {
                        /* clear out the checksum flag, in case this is a
                         * resend but cl_checksum is no longer set. b=11238 */
                        oa->o_valid &= ~OBD_MD_FLCKSUM;
                }
                oa->o_cksum = body->oa.o_cksum;
                /* 1 RC per niobuf */
                req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
                                     sizeof(__u32) * niocount);
        } else {
                if (cli->cl_checksum &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                body->oa.o_flags = 0;
                        body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                }

                /* The client cksum has already been copied to the wire obdo
                 * in the previous lustre_set_wire_obdo(); in the case that a
                 * bulk read is being resent due to a cksum error, this lets
                 * the server check and dump the pages on its side. */
        }
        ptlrpc_request_set_replen(req);

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oa = oa;
        aa->aa_requested_nob = requested_nob;
        aa->aa_nio_count = niocount;
        aa->aa_page_count = page_count;
        aa->aa_resends = 0;
        aa->aa_ppga = pga;
        aa->aa_cli = cli;
        INIT_LIST_HEAD(&aa->aa_oaps);

        *reqp = req;
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
                req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
                niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
        RETURN(0);

 out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}
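
/*
 * Short I/O vs. bulk, in brief (summarizing the logic above): a single
 * contiguous niobuf no larger than cl_short_io_bytes, on an import that
 * supports short I/O (imp_connect_shortio()), is copied inline into the
 * RMF_SHORT_IO buffer and no bulk descriptor is allocated (ioo_max_brw is
 * set to 0); everything else goes through ptlrpc_prep_bulk_imp() with one
 * kiov fragment per page.
 */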
1363
1364 char dbgcksum_file_name[PATH_MAX];
1365
1366 static void dump_all_bulk_pages(struct obdo *oa, __u32 page_count,
1367                                 struct brw_page **pga, __u32 server_cksum,
1368                                 __u32 client_cksum)
1369 {
1370         struct file *filp;
1371         int rc, i;
1372         unsigned int len;
1373         char *buf;
1374
1375         /* will only keep dump of pages on first error for the same range in
1376          * file/fid, not during the resends/retries. */
1377         snprintf(dbgcksum_file_name, sizeof(dbgcksum_file_name),
1378                  "%s-checksum_dump-osc-"DFID":[%llu-%llu]-%x-%x",
1379                  (strncmp(libcfs_debug_file_path_arr, "NONE", 4) != 0 ?
1380                   libcfs_debug_file_path_arr :
1381                   LIBCFS_DEBUG_FILE_PATH_DEFAULT),
1382                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : 0ULL,
1383                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1384                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1385                  pga[0]->off,
1386                  pga[page_count-1]->off + pga[page_count-1]->count - 1,
1387                  client_cksum, server_cksum);
1388         filp = filp_open(dbgcksum_file_name,
1389                          O_CREAT | O_EXCL | O_WRONLY | O_LARGEFILE, 0600);
1390         if (IS_ERR(filp)) {
1391                 rc = PTR_ERR(filp);
1392                 if (rc == -EEXIST)
1393                         CDEBUG(D_INFO, "%s: can't open to dump pages with "
1394                                "checksum error: rc = %d\n", dbgcksum_file_name,
1395                                rc);
1396                 else
1397                         CERROR("%s: can't open to dump pages with checksum "
1398                                "error: rc = %d\n", dbgcksum_file_name, rc);
1399                 return;
1400         }
1401
1402         for (i = 0; i < page_count; i++) {
1403                 len = pga[i]->count;
1404                 buf = kmap(pga[i]->pg);
1405                 while (len != 0) {
1406                         rc = cfs_kernel_write(filp, buf, len, &filp->f_pos);
1407                         if (rc < 0) {
1408                                 CERROR("%s: wanted to write %u bytes but "
1409                                        "got error %d\n", dbgcksum_file_name, len, rc);
1410                                 break;
1411                         }
1412                         len -= rc;
1413                         buf += rc;
1414                         CDEBUG(D_INFO, "%s: wrote %d bytes\n",
1415                                dbgcksum_file_name, rc);
1416                 }
1417                 kunmap(pga[i]->pg);
1418         }
1419
1420         rc = ll_vfs_fsync_range(filp, 0, LLONG_MAX, 1);
1421         if (rc)
1422                 CERROR("%s: sync returned rc = %d\n", dbgcksum_file_name, rc);
1423         filp_close(filp, NULL);
1424         return;
1425 }
1426
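/* Compare the client-computed and server-reported checksums for a bulk
 * write; returns 0 if they match, or 1 after logging (and optionally
 * dumping the pages) if the data was corrupted somewhere along the way. */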
1427 static int
1428 check_write_checksum(struct obdo *oa, const struct lnet_process_id *peer,
1429                      __u32 client_cksum, __u32 server_cksum,
1430                      struct osc_brw_async_args *aa)
1431 {
1432         __u32 new_cksum;
1433         char *msg;
1434         enum cksum_types cksum_type;
1435         int rc;
1436
1437         if (server_cksum == client_cksum) {
1438                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1439                 return 0;
1440         }
1441
1442         if (aa->aa_cli->cl_checksum_dump)
1443                 dump_all_bulk_pages(oa, aa->aa_page_count, aa->aa_ppga,
1444                                     server_cksum, client_cksum);
1445
1446         cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1447                                        oa->o_flags : 0);
1448         rc = osc_checksum_bulk(aa->aa_requested_nob, aa->aa_page_count,
1449                                aa->aa_ppga, OST_WRITE, cksum_type,
1450                                &new_cksum);
1451
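        /* Diagnose where the corruption happened: if the recomputed cksum
         * now matches what the server saw, the pages changed on the client
         * after the original checksum (e.g. mmap IO); if it still matches
         * the original client cksum, the data changed in transit. */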
1452         if (rc < 0)
1453                 msg = "failed to calculate the client write checksum";
1454         else if (cksum_type != cksum_type_unpack(aa->aa_oa->o_flags))
1455                 msg = "the server did not use the checksum type specified in "
1456                       "the original request - likely a protocol problem";
1457         else if (new_cksum == server_cksum)
1458                 msg = "changed on the client after we checksummed it - "
1459                       "likely false positive due to mmap IO (bug 11742)";
1460         else if (new_cksum == client_cksum)
1461                 msg = "changed in transit before arrival at OST";
1462         else
1463                 msg = "changed in transit AND doesn't match the original - "
1464                       "likely false positive due to mmap IO (bug 11742)";
1465
1466         LCONSOLE_ERROR_MSG(0x132, "%s: BAD WRITE CHECKSUM: %s: from %s inode "
1467                            DFID " object "DOSTID" extent [%llu-%llu], original "
1468                            "client csum %x (type %x), server csum %x (type %x),"
1469                            " client csum now %x\n",
1470                            aa->aa_cli->cl_import->imp_obd->obd_name,
1471                            msg, libcfs_nid2str(peer->nid),
1472                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1473                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1474                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1475                            POSTID(&oa->o_oi), aa->aa_ppga[0]->off,
1476                            aa->aa_ppga[aa->aa_page_count - 1]->off +
1477                                 aa->aa_ppga[aa->aa_page_count-1]->count - 1,
1478                            client_cksum, cksum_type_unpack(aa->aa_oa->o_flags),
1479                            server_cksum, cksum_type, new_cksum);
1480         return 1;
1481 }
1482
1483 /* Note: rc enters this function as the number of bytes transferred. */
1484 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1485 {
1486         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1487         const struct lnet_process_id *peer =
1488                         &req->rq_import->imp_connection->c_peer;
1489         struct client_obd *cli = aa->aa_cli;
1490         struct ost_body *body;
1491         u32 client_cksum = 0;
1492         ENTRY;
1493
1494         if (rc < 0 && rc != -EDQUOT) {
1495                 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d", rc);
1496                 RETURN(rc);
1497         }
1498
1499         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1500         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1501         if (body == NULL) {
1502                 DEBUG_REQ(D_INFO, req, "Can't unpack body");
1503                 RETURN(-EPROTO);
1504         }
1505
1506         /* set/clear over quota flag for a uid/gid/projid */
1507         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1508             body->oa.o_valid & (OBD_MD_FLALLQUOTA)) {
1509                 unsigned qid[LL_MAXQUOTAS] = {
1510                                          body->oa.o_uid, body->oa.o_gid,
1511                                          body->oa.o_projid };
1512                 CDEBUG(D_QUOTA, "setdq for [%u %u %u] with valid %#llx, flags %x\n",
1513                        body->oa.o_uid, body->oa.o_gid, body->oa.o_projid,
1514                        body->oa.o_valid, body->oa.o_flags);
1515                 osc_quota_setdq(cli, qid, body->oa.o_valid,
1516                                 body->oa.o_flags);
1517         }
1518
1519         osc_update_grant(cli, body);
1520
1521         if (rc < 0)
1522                 RETURN(rc);
1523
1524         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1525                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1526
1527         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1528                 if (rc > 0) {
1529                         CERROR("Unexpected positive rc %d\n", rc);
1530                         RETURN(-EPROTO);
1531                 }
1532
1533                 if (req->rq_bulk != NULL &&
1534                     sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1535                         RETURN(-EAGAIN);
1536
1537                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1538                     check_write_checksum(&body->oa, peer, client_cksum,
1539                                          body->oa.o_cksum, aa))
1540                         RETURN(-EAGAIN);
1541
1542                 rc = check_write_rcs(req, aa->aa_requested_nob,
1543                                      aa->aa_nio_count, aa->aa_page_count, aa->aa_ppga);
1544                 GOTO(out, rc);
1545         }
1546
1547         /* The rest of this function executes only for OST_READs */
1548
1549         if (req->rq_bulk == NULL) {
1550                 rc = req_capsule_get_size(&req->rq_pill, &RMF_SHORT_IO,
1551                                           RCL_SERVER);
1552                 LASSERT(rc == req->rq_status);
1553         } else {
1554                 /* if unwrap_bulk failed, return -EAGAIN to retry */
1555                 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1556         }
1557         if (rc < 0)
1558                 GOTO(out, rc = -EAGAIN);
1559
1560         if (rc > aa->aa_requested_nob) {
1561                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1562                        aa->aa_requested_nob);
1563                 RETURN(-EPROTO);
1564         }
1565
1566         if (req->rq_bulk != NULL && rc != req->rq_bulk->bd_nob_transferred) {
1567                 CERROR("Unexpected rc %d (%d transferred)\n",
1568                        rc, req->rq_bulk->bd_nob_transferred);
1569                 RETURN(-EPROTO);
1570         }
1571
1572         if (req->rq_bulk == NULL) {
1573                 /* short io */
1574                 int nob, pg_count, i = 0;
1575                 unsigned char *buf;
1576
1577                 CDEBUG(D_CACHE, "Using short io read, size %d\n", rc);
1578                 pg_count = aa->aa_page_count;
1579                 buf = req_capsule_server_sized_get(&req->rq_pill, &RMF_SHORT_IO,
1580                                                    rc);
1581                 nob = rc;
1582                 while (nob > 0 && pg_count > 0) {
1583                         unsigned char *ptr;
1584                         int count = aa->aa_ppga[i]->count > nob ?
1585                                     nob : aa->aa_ppga[i]->count;
1586
1587                         CDEBUG(D_CACHE, "page %p count %d\n",
1588                                aa->aa_ppga[i]->pg, count);
1589                         ptr = ll_kmap_atomic(aa->aa_ppga[i]->pg, KM_USER0);
1590                         memcpy(ptr + (aa->aa_ppga[i]->off & ~PAGE_MASK), buf,
1591                                count);
1592                         ll_kunmap_atomic((void *) ptr, KM_USER0);
1593
1594                         buf += count;
1595                         nob -= count;
1596                         i++;
1597                         pg_count--;
1598                 }
1599         }
1600
1601         if (rc < aa->aa_requested_nob)
1602                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1603
1604         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1605                 static int cksum_counter;
1606                 u32        server_cksum = body->oa.o_cksum;
1607                 char      *via = "";
1608                 char      *router = "";
1609                 enum cksum_types cksum_type;
1610
1611                 cksum_type = cksum_type_unpack(body->oa.o_valid &
1612                                 OBD_MD_FLFLAGS ? body->oa.o_flags : 0);
1613                 rc = osc_checksum_bulk(rc, aa->aa_page_count, aa->aa_ppga,
1614                                        OST_READ, cksum_type, &client_cksum);
1615                 if (rc < 0) {
1616                         CDEBUG(D_PAGE,
1617                                "failed to calculate checksum, rc = %d\n", rc);
1618                         GOTO(out, rc);
1619                 }
1620                 if (req->rq_bulk != NULL &&
1621                     peer->nid != req->rq_bulk->bd_sender) {
1622                         via = " via ";
1623                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1624                 }
1625
1626                 if (server_cksum != client_cksum) {
1627                         struct ost_body *clbody;
1628                         u32 page_count = aa->aa_page_count;
1629
1630                         clbody = req_capsule_client_get(&req->rq_pill,
1631                                                         &RMF_OST_BODY);
1632                         if (cli->cl_checksum_dump)
1633                                 dump_all_bulk_pages(&clbody->oa, page_count,
1634                                                     aa->aa_ppga, server_cksum,
1635                                                     client_cksum);
1636
1637                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1638                                            "%s%s%s inode "DFID" object "DOSTID
1639                                            " extent [%llu-%llu], client %x, "
1640                                            "server %x, cksum_type %x\n",
1641                                            req->rq_import->imp_obd->obd_name,
1642                                            libcfs_nid2str(peer->nid),
1643                                            via, router,
1644                                            clbody->oa.o_valid & OBD_MD_FLFID ?
1645                                                 clbody->oa.o_parent_seq : 0ULL,
1646                                            clbody->oa.o_valid & OBD_MD_FLFID ?
1647                                                 clbody->oa.o_parent_oid : 0,
1648                                            clbody->oa.o_valid & OBD_MD_FLFID ?
1649                                                 clbody->oa.o_parent_ver : 0,
1650                                            POSTID(&body->oa.o_oi),
1651                                            aa->aa_ppga[0]->off,
1652                                            aa->aa_ppga[page_count-1]->off +
1653                                            aa->aa_ppga[page_count-1]->count - 1,
1654                                            client_cksum, server_cksum,
1655                                            cksum_type);
1656                         cksum_counter = 0;
1657                         aa->aa_oa->o_cksum = client_cksum;
1658                         rc = -EAGAIN;
1659                 } else {
1660                         cksum_counter++;
1661                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1662                         rc = 0;
1663                 }
1664         } else if (unlikely(client_cksum)) {
1665                 static int cksum_missed;
1666
1667                 cksum_missed++;
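                /* cksum_missed & (-cksum_missed) isolates the lowest set bit,
                 * so this is true only when cksum_missed is a power of two:
                 * the message is emitted on the 1st, 2nd, 4th, 8th, ...
                 * occurrence instead of every time. */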
1668                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1669                         CERROR("Checksum %u requested from %s but not sent\n",
1670                                cksum_missed, libcfs_nid2str(peer->nid));
1671         } else {
1672                 rc = 0;
1673         }
1674 out:
1675         if (rc >= 0)
1676                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1677                                      aa->aa_oa, &body->oa);
1678
1679         RETURN(rc);
1680 }
1681
1682 static int osc_brw_redo_request(struct ptlrpc_request *request,
1683                                 struct osc_brw_async_args *aa, int rc)
1684 {
1685         struct ptlrpc_request *new_req;
1686         struct osc_brw_async_args *new_aa;
1687         struct osc_async_page *oap;
1688         ENTRY;
1689
1690         DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1691                   "redo for recoverable error %d", rc);
1692
1693         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1694                                 OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
1695                                   aa->aa_cli, aa->aa_oa, aa->aa_page_count,
1696                                   aa->aa_ppga, &new_req, 1);
1697         if (rc)
1698                 RETURN(rc);
1699
1700         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1701                 if (oap->oap_request != NULL) {
1702                         LASSERTF(request == oap->oap_request,
1703                                  "request %p != oap_request %p\n",
1704                                  request, oap->oap_request);
1705                         if (oap->oap_interrupted) {
1706                                 ptlrpc_req_finished(new_req);
1707                                 RETURN(-EINTR);
1708                         }
1709                 }
1710         }
1711         /* The new request takes over pga and oaps from the old request.
1712          * Note that copying a list_head doesn't work; it has to be moved. */
1713         aa->aa_resends++;
1714         new_req->rq_interpret_reply = request->rq_interpret_reply;
1715         new_req->rq_async_args = request->rq_async_args;
1716         new_req->rq_commit_cb = request->rq_commit_cb;
1717         /* Cap the resend delay to the current request timeout; this is
1718          * similar to what ptlrpc does (see after_reply()). */
1719         if (aa->aa_resends > new_req->rq_timeout)
1720                 new_req->rq_sent = ktime_get_real_seconds() + new_req->rq_timeout;
1721         else
1722                 new_req->rq_sent = ktime_get_real_seconds() + aa->aa_resends;
1723         new_req->rq_generation_set = 1;
1724         new_req->rq_import_generation = request->rq_import_generation;
1725
1726         new_aa = ptlrpc_req_async_args(new_req);
1727
1728         INIT_LIST_HEAD(&new_aa->aa_oaps);
1729         list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1730         INIT_LIST_HEAD(&new_aa->aa_exts);
1731         list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1732         new_aa->aa_resends = aa->aa_resends;
1733
1734         list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1735                 if (oap->oap_request) {
1736                         ptlrpc_req_finished(oap->oap_request);
1737                         oap->oap_request = ptlrpc_request_addref(new_req);
1738                 }
1739         }
1740
1741         /* XXX: This code will run into problems if we ever support adding
1742          * a series of BRW RPCs to a self-defined ptlrpc_request_set and
1743          * waiting for all of them to finish. We should inherit the request
1744          * set from the old request. */
1745         ptlrpcd_add_req(new_req);
1746
1747         DEBUG_REQ(D_INFO, new_req, "new request");
1748         RETURN(0);
1749 }
1750
1751 /*
1752  * We want disk allocation on the target to happen in offset order, so we
1753  * follow Sedgewick's advice and stick to the dead-simple shellsort; it does
1754  * fine for our small page arrays and doesn't require allocation.  It is an
1755  * insertion sort that swaps elements that are strides apart, shrinking the
1756  * stride down until it is 1 and the array is sorted.
1757  */
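/*
 * For example, with num = 100 the first loop below generates strides
 * 1, 4, 13, 40, 121 and stops at 121; the do-while then sorts with
 * strides 40, 13, 4 and finally 1, where the last pass is a plain
 * insertion sort over an almost-sorted array.
 */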
1758 static void sort_brw_pages(struct brw_page **array, int num)
1759 {
1760         int stride, i, j;
1761         struct brw_page *tmp;
1762
1763         if (num == 1)
1764                 return;
1765         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1766                 ;
1767
1768         do {
1769                 stride /= 3;
1770                 for (i = stride ; i < num ; i++) {
1771                         tmp = array[i];
1772                         j = i;
1773                         while (j >= stride && array[j - stride]->off > tmp->off) {
1774                                 array[j] = array[j - stride];
1775                                 j -= stride;
1776                         }
1777                         array[j] = tmp;
1778                 }
1779         } while (stride > 1);
1780 }
1781
1782 static void osc_release_ppga(struct brw_page **ppga, size_t count)
1783 {
1784         LASSERT(ppga != NULL);
1785         OBD_FREE(ppga, sizeof(*ppga) * count);
1786 }
1787
1788 static int brw_interpret(const struct lu_env *env,
1789                          struct ptlrpc_request *req, void *data, int rc)
1790 {
1791         struct osc_brw_async_args *aa = data;
1792         struct osc_extent *ext;
1793         struct osc_extent *tmp;
1794         struct client_obd *cli = aa->aa_cli;
1795         unsigned long           transferred = 0;
1796         ENTRY;
1797
1798         rc = osc_brw_fini_request(req, rc);
1799         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1800         /* When the server returns -EINPROGRESS, the client should always
1801          * retry, regardless of how many times the bulk was already resent. */
1802         if (osc_recoverable_error(rc) && !req->rq_no_delay) {
1803                 if (req->rq_import_generation !=
1804                     req->rq_import->imp_generation) {
1805                         CDEBUG(D_HA, "%s: resend across eviction for object "
1806                                DOSTID", rc = %d\n",
1807                                req->rq_import->imp_obd->obd_name,
1808                                POSTID(&aa->aa_oa->o_oi), rc);
1809                 } else if (rc == -EINPROGRESS ||
1810                     client_should_resend(aa->aa_resends, aa->aa_cli)) {
1811                         rc = osc_brw_redo_request(req, aa, rc);
1812                 } else {
1813                         CERROR("%s: too many resend retries for object: "
1814                                "%llu:%llu, rc = %d.\n",
1815                                req->rq_import->imp_obd->obd_name,
1816                                POSTID(&aa->aa_oa->o_oi), rc);
1817                 }
1818
1819                 if (rc == 0)
1820                         RETURN(0);
1821                 else if (rc == -EAGAIN || rc == -EINPROGRESS)
1822                         rc = -EIO;
1823         }
1824
1825         if (rc == 0) {
1826                 struct obdo *oa = aa->aa_oa;
1827                 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
1828                 unsigned long valid = 0;
1829                 struct cl_object *obj;
1830                 struct osc_async_page *last;
1831
1832                 last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
1833                 obj = osc2cl(last->oap_obj);
1834
1835                 cl_object_attr_lock(obj);
1836                 if (oa->o_valid & OBD_MD_FLBLOCKS) {
1837                         attr->cat_blocks = oa->o_blocks;
1838                         valid |= CAT_BLOCKS;
1839                 }
1840                 if (oa->o_valid & OBD_MD_FLMTIME) {
1841                         attr->cat_mtime = oa->o_mtime;
1842                         valid |= CAT_MTIME;
1843                 }
1844                 if (oa->o_valid & OBD_MD_FLATIME) {
1845                         attr->cat_atime = oa->o_atime;
1846                         valid |= CAT_ATIME;
1847                 }
1848                 if (oa->o_valid & OBD_MD_FLCTIME) {
1849                         attr->cat_ctime = oa->o_ctime;
1850                         valid |= CAT_CTIME;
1851                 }
1852
1853                 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1854                         struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
1855                         loff_t last_off = last->oap_count + last->oap_obj_off +
1856                                 last->oap_page_off;
1857
1858                         /* Change the file size if this is an out-of-quota
1859                          * or direct IO write and it extends the file size. */
1860                         if (loi->loi_lvb.lvb_size < last_off) {
1861                                 attr->cat_size = last_off;
1862                                 valid |= CAT_SIZE;
1863                         }
1864                         /* Extend KMS if it's not a lockless write */
1865                         if (loi->loi_kms < last_off &&
1866                             oap2osc_page(last)->ops_srvlock == 0) {
1867                                 attr->cat_kms = last_off;
1868                                 valid |= CAT_KMS;
1869                         }
1870                 }
1871
1872                 if (valid != 0)
1873                         cl_object_attr_update(env, obj, attr, valid);
1874                 cl_object_attr_unlock(obj);
1875         }
1876         OBDO_FREE(aa->aa_oa);
1877
1878         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
1879                 osc_inc_unstable_pages(req);
1880
1881         list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
1882                 list_del_init(&ext->oe_link);
1883                 osc_extent_finish(env, ext, 1,
1884                                   rc && req->rq_no_delay ? -EWOULDBLOCK : rc);
1885         }
1886         LASSERT(list_empty(&aa->aa_exts));
1887         LASSERT(list_empty(&aa->aa_oaps));
1888
1889         transferred = (req->rq_bulk == NULL ? /* short io */
1890                        aa->aa_requested_nob :
1891                        req->rq_bulk->bd_nob_transferred);
1892
1893         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1894         ptlrpc_lprocfs_brw(req, transferred);
1895
1896         spin_lock(&cli->cl_loi_list_lock);
1897         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1898          * is called so we know whether to go to sync BRWs or wait for more
1899          * RPCs to complete */
1900         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
1901                 cli->cl_w_in_flight--;
1902         else
1903                 cli->cl_r_in_flight--;
1904         osc_wake_cache_waiters(cli);
1905         spin_unlock(&cli->cl_loi_list_lock);
1906
1907         osc_io_unplug(env, cli, NULL);
1908         RETURN(rc);
1909 }
1910
1911 static void brw_commit(struct ptlrpc_request *req)
1912 {
1913         /* If osc_inc_unstable_pages() (via osc_extent_finish()) races with
1914          * this function being called via rq_commit_cb, we must ensure that
1915          * osc_dec_unstable_pages() is still called. Otherwise unstable
1916          * pages may be leaked. */
1917         spin_lock(&req->rq_lock);
1918         if (likely(req->rq_unstable)) {
1919                 req->rq_unstable = 0;
1920                 spin_unlock(&req->rq_lock);
1921
1922                 osc_dec_unstable_pages(req);
1923         } else {
1924                 req->rq_committed = 1;
1925                 spin_unlock(&req->rq_lock);
1926         }
1927 }
1928
1929 /**
1930  * Build an RPC from the list of extents @ext_list. The caller must ensure
1931  * that the total pages in this list are NOT over max pages per RPC.
1932  * Extents in the list must be in OES_RPC state.
1933  */
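/*
 * A minimal usage sketch (hypothetical caller; the real callers live in the
 * OSC cache code), assuming @ext is already in OES_RPC state:
 *
 *	struct list_head rpclist = LIST_HEAD_INIT(rpclist);
 *
 *	list_move_tail(&ext->oe_link, &rpclist);
 *	rc = osc_build_rpc(env, cli, &rpclist, OBD_BRW_WRITE);
 */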
1934 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
1935                   struct list_head *ext_list, int cmd)
1936 {
1937         struct ptlrpc_request           *req = NULL;
1938         struct osc_extent               *ext;
1939         struct brw_page                 **pga = NULL;
1940         struct osc_brw_async_args       *aa = NULL;
1941         struct obdo                     *oa = NULL;
1942         struct osc_async_page           *oap;
1943         struct osc_object               *obj = NULL;
1944         struct cl_req_attr              *crattr = NULL;
1945         loff_t                          starting_offset = OBD_OBJECT_EOF;
1946         loff_t                          ending_offset = 0;
1947         int                             mpflag = 0;
1948         int                             mem_tight = 0;
1949         int                             page_count = 0;
1950         bool                            soft_sync = false;
1951         bool                            interrupted = false;
1952         bool                            ndelay = false;
1953         int                             i;
1954         int                             grant = 0;
1955         int                             rc;
1956         __u32                           layout_version = 0;
1957         struct list_head                rpc_list = LIST_HEAD_INIT(rpc_list);
1958         struct ost_body                 *body;
1959         ENTRY;
1960         LASSERT(!list_empty(ext_list));
1961
1962         /* add pages into rpc_list to build BRW rpc */
1963         list_for_each_entry(ext, ext_list, oe_link) {
1964                 LASSERT(ext->oe_state == OES_RPC);
1965                 mem_tight |= ext->oe_memalloc;
1966                 grant += ext->oe_grants;
1967                 page_count += ext->oe_nr_pages;
1968                 layout_version = MAX(layout_version, ext->oe_layout_version);
1969                 if (obj == NULL)
1970                         obj = ext->oe_obj;
1971         }
1972
1973         soft_sync = osc_over_unstable_soft_limit(cli);
1974         if (mem_tight)
1975                 mpflag = cfs_memory_pressure_get_and_set();
1976
1977         OBD_ALLOC(pga, sizeof(*pga) * page_count);
1978         if (pga == NULL)
1979                 GOTO(out, rc = -ENOMEM);
1980
1981         OBDO_ALLOC(oa);
1982         if (oa == NULL)
1983                 GOTO(out, rc = -ENOMEM);
1984
1985         i = 0;
1986         list_for_each_entry(ext, ext_list, oe_link) {
1987                 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
1988                         if (mem_tight)
1989                                 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
1990                         if (soft_sync)
1991                                 oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
1992                         pga[i] = &oap->oap_brw_page;
1993                         pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
1994                         i++;
1995
1996                         list_add_tail(&oap->oap_rpc_item, &rpc_list);
1997                         if (starting_offset == OBD_OBJECT_EOF ||
1998                             starting_offset > oap->oap_obj_off)
1999                                 starting_offset = oap->oap_obj_off;
2000                         else
2001                                 LASSERT(oap->oap_page_off == 0);
2002                         if (ending_offset < oap->oap_obj_off + oap->oap_count)
2003                                 ending_offset = oap->oap_obj_off +
2004                                                 oap->oap_count;
2005                         else
2006                                 LASSERT(oap->oap_page_off + oap->oap_count ==
2007                                         PAGE_SIZE);
2008                         if (oap->oap_interrupted)
2009                                 interrupted = true;
2010                 }
2011                 if (ext->oe_ndelay)
2012                         ndelay = true;
2013         }
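        /* The LASSERTs above enforce that only the first (lowest-offset)
         * page may start mid-page and only the last page may end mid-page,
         * i.e. the pages of one RPC cover a single contiguous byte range. */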
2014
2015         /* first page in the list */
2016         oap = list_entry(rpc_list.next, typeof(*oap), oap_rpc_item);
2017
2018         crattr = &osc_env_info(env)->oti_req_attr;
2019         memset(crattr, 0, sizeof(*crattr));
2020         crattr->cra_type = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2021         crattr->cra_flags = ~0ULL;
2022         crattr->cra_page = oap2cl_page(oap);
2023         crattr->cra_oa = oa;
2024         cl_req_attr_set(env, osc2cl(obj), crattr);
2025
2026         if (cmd == OBD_BRW_WRITE) {
2027                 oa->o_grant_used = grant;
2028                 if (layout_version > 0) {
2029                         CDEBUG(D_LAYOUT, DFID": write with layout version %u\n",
2030                                PFID(&oa->o_oi.oi_fid), layout_version);
2031
2032                         oa->o_layout_version = layout_version;
2033                         oa->o_valid |= OBD_MD_LAYOUT_VERSION;
2034                 }
2035         }
2036
2037         sort_brw_pages(pga, page_count);
2038         rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req, 0);
2039         if (rc != 0) {
2040                 CERROR("prep_req failed: %d\n", rc);
2041                 GOTO(out, rc);
2042         }
2043
2044         req->rq_commit_cb = brw_commit;
2045         req->rq_interpret_reply = brw_interpret;
2046         req->rq_memalloc = mem_tight != 0;
2047         oap->oap_request = ptlrpc_request_addref(req);
2048         if (interrupted && !req->rq_intr)
2049                 ptlrpc_mark_interrupted(req);
2050         if (ndelay) {
2051                 req->rq_no_resend = req->rq_no_delay = 1;
2052                 /* We should probably set a shorter timeout value here, to
2053                  * handle ETIMEDOUT in brw_interpret() correctly. */
2054                 /* lustre_msg_set_timeout(req, req->rq_timeout / 2); */
2055         }
2056
2057         /* Need to update the timestamps after the request is built in case
2058          * we race with setattr (locally or in queue at the OST).  If the OST
2059          * receives a later setattr before an earlier BRW (as determined by
2060          * the request xid), it will not use the BRW timestamps.  Sadly, there
2061          * is no obvious way to do this in a single call.  bug 10150 */
2062         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
2063         crattr->cra_oa = &body->oa;
2064         crattr->cra_flags = OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME;
2065         cl_req_attr_set(env, osc2cl(obj), crattr);
2066         lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
2067
2068         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2069         aa = ptlrpc_req_async_args(req);
2070         INIT_LIST_HEAD(&aa->aa_oaps);
2071         list_splice_init(&rpc_list, &aa->aa_oaps);
2072         INIT_LIST_HEAD(&aa->aa_exts);
2073         list_splice_init(ext_list, &aa->aa_exts);
2074
2075         spin_lock(&cli->cl_loi_list_lock);
2076         starting_offset >>= PAGE_SHIFT;
2077         if (cmd == OBD_BRW_READ) {
2078                 cli->cl_r_in_flight++;
2079                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2080                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2081                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2082                                       starting_offset + 1);
2083         } else {
2084                 cli->cl_w_in_flight++;
2085                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2086                 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
2087                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2088                                       starting_offset + 1);
2089         }
2090         spin_unlock(&cli->cl_loi_list_lock);
2091
2092         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %ur/%uw in flight",
2093                   page_count, aa, cli->cl_r_in_flight,
2094                   cli->cl_w_in_flight);
2095         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_IO, cfs_fail_val);
2096
2097         ptlrpcd_add_req(req);
2098         rc = 0;
2099         EXIT;
2100
2101 out:
2102         if (mem_tight != 0)
2103                 cfs_memory_pressure_restore(mpflag);
2104
2105         if (rc != 0) {
2106                 LASSERT(req == NULL);
2107
2108                 if (oa)
2109                         OBDO_FREE(oa);
2110                 if (pga)
2111                         OBD_FREE(pga, sizeof(*pga) * page_count);
2112                 /* This should happen rarely and is pretty bad: it makes the
2113                  * pending list not follow the dirty order. */
2114                 while (!list_empty(ext_list)) {
2115                         ext = list_entry(ext_list->next, struct osc_extent,
2116                                          oe_link);
2117                         list_del_init(&ext->oe_link);
2118                         osc_extent_finish(env, ext, 0, rc);
2119                 }
2120         }
2121         RETURN(rc);
2122 }
2123
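/* Attach @data to @lock's l_ast_data if it is not set yet; return 1 if the
 * lock now carries @data (either just set or already equal), 0 if the lock
 * is already owned by a different object. */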
2124 static int osc_set_lock_data(struct ldlm_lock *lock, void *data)
2125 {
2126         int set = 0;
2127
2128         LASSERT(lock != NULL);
2129
2130         lock_res_and_lock(lock);
2131
2132         if (lock->l_ast_data == NULL)
2133                 lock->l_ast_data = data;
2134         if (lock->l_ast_data == data)
2135                 set = 1;
2136
2137         unlock_res_and_lock(lock);
2138
2139         return set;
2140 }
2141
2142 int osc_enqueue_fini(struct ptlrpc_request *req, osc_enqueue_upcall_f upcall,
2143                      void *cookie, struct lustre_handle *lockh,
2144                      enum ldlm_mode mode, __u64 *flags, bool speculative,
2145                      int errcode)
2146 {
2147         bool intent = *flags & LDLM_FL_HAS_INTENT;
2148         int rc;
2149         ENTRY;
2150
2151         /* The request was created before the ldlm_cli_enqueue() call. */
2152         if (intent && errcode == ELDLM_LOCK_ABORTED) {
2153                 struct ldlm_reply *rep;
2154
2155                 rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
2156                 LASSERT(rep != NULL);
2157
2158                 rep->lock_policy_res1 =
2159                         ptlrpc_status_ntoh(rep->lock_policy_res1);
2160                 if (rep->lock_policy_res1)
2161                         errcode = rep->lock_policy_res1;
2162                 if (!speculative)
2163                         *flags |= LDLM_FL_LVB_READY;
2164         } else if (errcode == ELDLM_OK) {
2165                 *flags |= LDLM_FL_LVB_READY;
2166         }
2167
2168         /* Call the update callback. */
2169         rc = (*upcall)(cookie, lockh, errcode);
2170
2171         /* release the reference taken in ldlm_cli_enqueue() */
2172         if (errcode == ELDLM_LOCK_MATCHED)
2173                 errcode = ELDLM_OK;
2174         if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
2175                 ldlm_lock_decref(lockh, mode);
2176
2177         RETURN(rc);
2178 }
2179
2180 int osc_enqueue_interpret(const struct lu_env *env, struct ptlrpc_request *req,
2181                           struct osc_enqueue_args *aa, int rc)
2182 {
2183         struct ldlm_lock *lock;
2184         struct lustre_handle *lockh = &aa->oa_lockh;
2185         enum ldlm_mode mode = aa->oa_mode;
2186         struct ost_lvb *lvb = aa->oa_lvb;
2187         __u32 lvb_len = sizeof(*lvb);
2188         __u64 flags = 0;
2189
2190         ENTRY;
2191
2192         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2193          * be valid. */
2194         lock = ldlm_handle2lock(lockh);
2195         LASSERTF(lock != NULL,
2196                  "lockh %#llx, req %p, aa %p - client evicted?\n",
2197                  lockh->cookie, req, aa);
2198
2199         /* Take an additional reference so that a blocking AST that
2200          * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2201          * to arrive after an upcall has been executed by
2202          * osc_enqueue_fini(). */
2203         ldlm_lock_addref(lockh, mode);
2204
2205         /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
2206         OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
2207
2208         /* Let the CP AST grant the lock first. */
2209         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2210
2211         if (aa->oa_speculative) {
2212                 LASSERT(aa->oa_lvb == NULL);
2213                 LASSERT(aa->oa_flags == NULL);
2214                 aa->oa_flags = &flags;
2215         }
2216
2217         /* Complete obtaining the lock procedure. */
2218         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
2219                                    aa->oa_mode, aa->oa_flags, lvb, lvb_len,
2220                                    lockh, rc);
2221         /* Complete osc stuff. */
2222         rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
2223                               aa->oa_flags, aa->oa_speculative, rc);
2224
2225         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2226
2227         ldlm_lock_decref(lockh, mode);
2228         LDLM_LOCK_PUT(lock);
2229         RETURN(rc);
2230 }
2231
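/* Sentinel value: callers pass PTLRPCD_SET instead of a real request set to
 * ask osc_enqueue_base() to hand the request straight to ptlrpcd. */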
2232 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2233
2234 /* When enqueuing asynchronously, locks are not ordered, so we can obtain a
2235  * lock from the 2nd OSC before a lock from the 1st one. This does not
2236  * deadlock with other synchronous requests, but holding some locks while
2237  * trying to obtain others may take considerable time in the case of OST
2238  * failure; and when a client does not release locks that other sync requests
2239  * are waiting for, it is evicted from the cluster. Such scenarios make life
2240  * difficult, so release locks just after they are obtained. */
2241 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2242                      __u64 *flags, union ldlm_policy_data *policy,
2243                      struct ost_lvb *lvb, int kms_valid,
2244                      osc_enqueue_upcall_f upcall, void *cookie,
2245                      struct ldlm_enqueue_info *einfo,
2246                      struct ptlrpc_request_set *rqset, int async,
2247                      bool speculative)
2248 {
2249         struct obd_device *obd = exp->exp_obd;
2250         struct lustre_handle lockh = { 0 };
2251         struct ptlrpc_request *req = NULL;
2252         int intent = *flags & LDLM_FL_HAS_INTENT;
2253         __u64 match_flags = *flags;
2254         enum ldlm_mode mode;
2255         int rc;
2256         ENTRY;
2257
2258         /* Filesystem lock extents are extended to page boundaries so that
2259          * dealing with the page cache is a little smoother.  */
2260         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2261         policy->l_extent.end |= ~PAGE_MASK;
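        /* For example, with a 4096-byte PAGE_SIZE, ~PAGE_MASK is 4095, so an
         * extent [5000, 9000] becomes [4096, 12287]: the start is rounded
         * down to its page and the end up to the last byte of its page. */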
2262
2263         /*
2264          * kms is not valid when either object is completely fresh (so that no
2265          * locks are cached), or object was evicted. In the latter case cached
2266          * lock cannot be used, because it would prime inode state with
2267          * potentially stale LVB.
2268          */
2269         if (!kms_valid)
2270                 goto no_match;
2271
2272         /* Next, search for already existing extent locks that will cover us */
2273         /* If we're trying to read, we also search for an existing PW lock.  The
2274          * VFS and page cache already protect us locally, so lots of readers/
2275          * writers can share a single PW lock.
2276          *
2277          * There are problems with conversion deadlocks, so instead of
2278          * converting a read lock to a write lock, we'll just enqueue a new
2279          * one.
2280          *
2281          * At some point we should cancel the read lock instead of making them
2282          * send us a blocking callback, but there are problems with canceling
2283          * locks out from other users right now, too. */
2284         mode = einfo->ei_mode;
2285         if (einfo->ei_mode == LCK_PR)
2286                 mode |= LCK_PW;
2287         /* Normal lock requests must wait for the LVB to be ready before
2288          * matching a lock; speculative lock requests do not need to,
2289          * because they will not actually use the lock. */
2290         if (!speculative)
2291                 match_flags |= LDLM_FL_LVB_READY;
2292         if (intent != 0)
2293                 match_flags |= LDLM_FL_BLOCK_GRANTED;
2294         mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id,
2295                                einfo->ei_type, policy, mode, &lockh, 0);
2296         if (mode) {
2297                 struct ldlm_lock *matched;
2298
2299                 if (*flags & LDLM_FL_TEST_LOCK)
2300                         RETURN(ELDLM_OK);
2301
2302                 matched = ldlm_handle2lock(&lockh);
2303                 if (speculative) {
2304                         /* This DLM lock request is speculative, and does not
2305                          * have an associated IO request. Therefore, if there
2306                          * is already a DLM lock, it will just inform the
2307                          * caller to cancel the request for this stripe. */
2308                         lock_res_and_lock(matched);
2309                         if (ldlm_extent_equal(&policy->l_extent,
2310                             &matched->l_policy_data.l_extent))
2311                                 rc = -EEXIST;
2312                         else
2313                                 rc = -ECANCELED;
2314                         unlock_res_and_lock(matched);
2315
2316                         ldlm_lock_decref(&lockh, mode);
2317                         LDLM_LOCK_PUT(matched);
2318                         RETURN(rc);
2319                 } else if (osc_set_lock_data(matched, einfo->ei_cbdata)) {
2320                         *flags |= LDLM_FL_LVB_READY;
2321
2322                         /* We already have a lock, and it's referenced. */
2323                         (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);
2324
2325                         ldlm_lock_decref(&lockh, mode);
2326                         LDLM_LOCK_PUT(matched);
2327                         RETURN(ELDLM_OK);
2328                 } else {
2329                         ldlm_lock_decref(&lockh, mode);
2330                         LDLM_LOCK_PUT(matched);
2331                 }
2332         }
2333
2334 no_match:
2335         if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK))
2336                 RETURN(-ENOLCK);
2337
2338         if (intent) {
2339                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2340                                            &RQF_LDLM_ENQUEUE_LVB);
2341                 if (req == NULL)
2342                         RETURN(-ENOMEM);
2343
2344                 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
2345                 if (rc) {
2346                         ptlrpc_request_free(req);
2347                         RETURN(rc);
2348                 }
2349
2350                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2351                                      sizeof(*lvb));
2352                 ptlrpc_request_set_replen(req);
2353         }
2354
2355         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2356         *flags &= ~LDLM_FL_BLOCK_GRANTED;
2357
2358         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2359                               sizeof(*lvb), LVB_T_OST, &lockh, async);
2360         if (async) {
2361                 if (!rc) {
2362                         struct osc_enqueue_args *aa;
2363                         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2364                         aa = ptlrpc_req_async_args(req);
2365                         aa->oa_exp         = exp;
2366                         aa->oa_mode        = einfo->ei_mode;
2367                         aa->oa_type        = einfo->ei_type;
2368                         lustre_handle_copy(&aa->oa_lockh, &lockh);
2369                         aa->oa_upcall      = upcall;
2370                         aa->oa_cookie      = cookie;
2371                         aa->oa_speculative = speculative;
2372                         if (!speculative) {
2373                                 aa->oa_flags  = flags;
2374                                 aa->oa_lvb    = lvb;
2375                         } else {
2376                                 /* Speculative locks essentially enqueue a
2377                                  * DLM lock in advance, so we don't care
2378                                  * about the result of the enqueue. */
2379                                 aa->oa_lvb    = NULL;
2380                                 aa->oa_flags  = NULL;
2381                         }
2382
2383                         req->rq_interpret_reply =
2384                                 (ptlrpc_interpterer_t)osc_enqueue_interpret;
2385                         if (rqset == PTLRPCD_SET)
2386                                 ptlrpcd_add_req(req);
2387                         else
2388                                 ptlrpc_set_add_req(rqset, req);
2389                 } else if (intent) {
2390                         ptlrpc_req_finished(req);
2391                 }
2392                 RETURN(rc);
2393         }
2394
2395         rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
2396                               flags, speculative, rc);
2397         if (intent)
2398                 ptlrpc_req_finished(req);
2399
2400         RETURN(rc);
2401 }
2402
2403 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2404                    enum ldlm_type type, union ldlm_policy_data *policy,
2405                    enum ldlm_mode mode, __u64 *flags, void *data,
2406                    struct lustre_handle *lockh, int unref)
2407 {
2408         struct obd_device *obd = exp->exp_obd;
2409         __u64 lflags = *flags;
2410         enum ldlm_mode rc;
2411         ENTRY;
2412
2413         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2414                 RETURN(-EIO);
2415
2416         /* Filesystem lock extents are extended to page boundaries so that
2417          * dealing with the page cache is a little smoother */
2418         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2419         policy->l_extent.end |= ~PAGE_MASK;
2420
2421         /* Next, search for already existing extent locks that will cover us */
2422         /* If we're trying to read, we also search for an existing PW lock.  The
2423          * VFS and page cache already protect us locally, so lots of readers/
2424          * writers can share a single PW lock. */
2425         rc = mode;
2426         if (mode == LCK_PR)
2427                 rc |= LCK_PW;
2428         rc = ldlm_lock_match(obd->obd_namespace, lflags,
2429                              res_id, type, policy, rc, lockh, unref);
2430         if (rc == 0 || lflags & LDLM_FL_TEST_LOCK)
2431                 RETURN(rc);
2432
2433         if (data != NULL) {
2434                 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2435
2436                 LASSERT(lock != NULL);
2437                 if (!osc_set_lock_data(lock, data)) {
2438                         ldlm_lock_decref(lockh, rc);
2439                         rc = 0;
2440                 }
2441                 LDLM_LOCK_PUT(lock);
2442         }
2443         RETURN(rc);
2444 }
2445
2446 static int osc_statfs_interpret(const struct lu_env *env,
2447                                 struct ptlrpc_request *req,
2448                                 struct osc_async_args *aa, int rc)
2449 {
2450         struct obd_statfs *msfs;
2451         ENTRY;
2452
2453         if (rc == -EBADR)
2454                 /* The request has in fact never been sent
2455                  * due to issues at a higher level (LOV).
2456                  * Exit immediately since the caller is
2457                  * aware of the problem and takes care
2458                  * of the cleanup. */
2459                 RETURN(rc);
2460
2461         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2462             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2463                 GOTO(out, rc = 0);
2464
2465         if (rc != 0)
2466                 GOTO(out, rc);
2467
2468         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2469         if (msfs == NULL) {
2470                 GOTO(out, rc = -EPROTO);
2471         }
2472
2473         *aa->aa_oi->oi_osfs = *msfs;
2474 out:
2475         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2476         RETURN(rc);
2477 }
2478
2479 static int osc_statfs_async(struct obd_export *exp,
2480                             struct obd_info *oinfo, time64_t max_age,
2481                             struct ptlrpc_request_set *rqset)
2482 {
2483         struct obd_device     *obd = class_exp2obd(exp);
2484         struct ptlrpc_request *req;
2485         struct osc_async_args *aa;
2486         int                    rc;
2487         ENTRY;
2488
2489         /* We could possibly pass max_age in the request (as an absolute
2490          * timestamp or a "seconds.usec ago") so the target can avoid doing
2491          * extra calls into the filesystem if that isn't necessary (e.g.
2492          * during mount that would help a bit).  Having relative timestamps
2493          * is not so great if request processing is slow, while absolute
2494          * timestamps are not ideal because they need time synchronization. */
2495         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2496         if (req == NULL)
2497                 RETURN(-ENOMEM);
2498
2499         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2500         if (rc) {
2501                 ptlrpc_request_free(req);
2502                 RETURN(rc);
2503         }
2504         ptlrpc_request_set_replen(req);
2505         req->rq_request_portal = OST_CREATE_PORTAL;
2506         ptlrpc_at_set_req_timeout(req);
2507
2508         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2509                 /* procfs requests must not wait for recovery, to avoid deadlock */
2510                 req->rq_no_resend = 1;
2511                 req->rq_no_delay = 1;
2512         }
2513
2514         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2515         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2516         aa = ptlrpc_req_async_args(req);
2517         aa->aa_oi = oinfo;
2518
2519         ptlrpc_set_add_req(rqset, req);
2520         RETURN(0);
2521 }
2522
2523 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2524                       struct obd_statfs *osfs, time64_t max_age, __u32 flags)
2525 {
2526         struct obd_device     *obd = class_exp2obd(exp);
2527         struct obd_statfs     *msfs;
2528         struct ptlrpc_request *req;
2529         struct obd_import     *imp = NULL;
2530         int rc;
2531         ENTRY;
2532
2533         /* Since the request might also come from lprocfs, we need to sync
2534          * this with client_disconnect_export() (bug 15684). */
2535         down_read(&obd->u.cli.cl_sem);
2536         if (obd->u.cli.cl_import)
2537                 imp = class_import_get(obd->u.cli.cl_import);
2538         up_read(&obd->u.cli.cl_sem);
2539         if (!imp)
2540                 RETURN(-ENODEV);
2541
2542         /* We could possibly pass max_age in the request (as an absolute
2543          * timestamp or a "seconds.usec ago") so the target can avoid doing
2544          * extra calls into the filesystem if that isn't necessary (e.g.
2545          * during mount that would help a bit).  Having relative timestamps
2546          * is not so great if request processing is slow, while absolute
2547          * timestamps are not ideal because they need time synchronization. */
2548         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2549
2550         class_import_put(imp);
2551
2552         if (req == NULL)
2553                 RETURN(-ENOMEM);
2554
2555         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2556         if (rc) {
2557                 ptlrpc_request_free(req);
2558                 RETURN(rc);
2559         }
2560         ptlrpc_request_set_replen(req);
2561         req->rq_request_portal = OST_CREATE_PORTAL;
2562         ptlrpc_at_set_req_timeout(req);
2563
2564         if (flags & OBD_STATFS_NODELAY) {
2565                 /* procfs requests must not wait for recovery, to avoid deadlock */
2566                 req->rq_no_resend = 1;
2567                 req->rq_no_delay = 1;
2568         }
2569
2570         rc = ptlrpc_queue_wait(req);
2571         if (rc)
2572                 GOTO(out, rc);
2573
2574         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2575         if (msfs == NULL) {
2576                 GOTO(out, rc = -EPROTO);
2577         }
2578
2579         *osfs = *msfs;
2580
2581         EXIT;
2582  out:
2583         ptlrpc_req_finished(req);
2584         return rc;
2585 }
2586
2587 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2588                          void *karg, void __user *uarg)
2589 {
2590         struct obd_device *obd = exp->exp_obd;
2591         struct obd_ioctl_data *data = karg;
2592         int err = 0;
2593         ENTRY;
2594
2595         if (!try_module_get(THIS_MODULE)) {
2596                 CERROR("%s: cannot get module '%s'\n", obd->obd_name,
2597                        module_name(THIS_MODULE));
2598                 return -EINVAL;
2599         }
2600         switch (cmd) {
2601         case OBD_IOC_CLIENT_RECOVER:
2602                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2603                                             data->ioc_inlbuf1, 0);
2604                 if (err > 0)
2605                         err = 0;
2606                 GOTO(out, err);
2607         case IOC_OSC_SET_ACTIVE:
2608                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2609                                                data->ioc_offset);
2610                 GOTO(out, err);
2611         case OBD_IOC_PING_TARGET:
2612                 err = ptlrpc_obd_ping(obd);
2613                 GOTO(out, err);
2614         default:
2615                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2616                        cmd, current_comm());
2617                 GOTO(out, err = -ENOTTY);
2618         }
2619 out:
2620         module_put(THIS_MODULE);
2621         return err;
2622 }
2623
int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
                       u32 keylen, void *key, u32 vallen, void *val,
                       struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *req;
        struct obd_device     *obd = exp->exp_obd;
        struct obd_import     *imp = class_exp2cliimp(exp);
        char                  *tmp;
        int                    rc;
        ENTRY;

        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);

        if (KEY_IS(KEY_CHECKSUM)) {
                if (vallen != sizeof(int))
                        RETURN(-EINVAL);
                exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
                RETURN(0);
        }

        if (KEY_IS(KEY_SPTLRPC_CONF)) {
                sptlrpc_conf_client_adapt(obd);
                RETURN(0);
        }

        if (KEY_IS(KEY_FLUSH_CTX)) {
                sptlrpc_import_flush_my_ctx(imp);
                RETURN(0);
        }

        if (KEY_IS(KEY_CACHE_SET)) {
                struct client_obd *cli = &obd->u.cli;

                LASSERT(cli->cl_cache == NULL); /* only once */
                cli->cl_cache = (struct cl_client_cache *)val;
                cl_cache_incref(cli->cl_cache);
                cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;

                /* add this osc into entity list */
                LASSERT(list_empty(&cli->cl_lru_osc));
                spin_lock(&cli->cl_cache->ccc_lru_lock);
                list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
                spin_unlock(&cli->cl_cache->ccc_lru_lock);

                RETURN(0);
        }

        if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
                struct client_obd *cli = &obd->u.cli;
                long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
                long target = *(long *)val;

                nr = osc_lru_shrink(env, cli, min(nr, target), true);
                *(long *)val -= nr;
                RETURN(0);
        }

        if (!set && !KEY_IS(KEY_GRANT_SHRINK))
                RETURN(-EINVAL);

        /* We pass all other commands directly to OST. Since nobody calls osc
         * methods directly and everybody is supposed to go through LOV, we
         * assume LOV checked invalid values for us.
         * The only recognised values so far are evict_by_nid and mds_conn.
         * Even if something bad goes through, we'd get a -EINVAL from the OST
         * anyway. */

        req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
                                                &RQF_OST_SET_GRANT_INFO :
                                                &RQF_OBD_SET_INFO);
        if (req == NULL)
                RETURN(-ENOMEM);

        req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
                             RCL_CLIENT, keylen);
        if (!KEY_IS(KEY_GRANT_SHRINK))
                req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
                                     RCL_CLIENT, vallen);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
        memcpy(tmp, key, keylen);
        tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
                                                        &RMF_OST_BODY :
                                                        &RMF_SETINFO_VAL);
        memcpy(tmp, val, vallen);

        if (KEY_IS(KEY_GRANT_SHRINK)) {
                struct osc_grant_args *aa;
                struct obdo *oa;

                CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
                aa = ptlrpc_req_async_args(req);
                OBDO_ALLOC(oa);
                if (!oa) {
                        ptlrpc_req_finished(req);
                        RETURN(-ENOMEM);
                }
                *oa = ((struct ost_body *)val)->oa;
                aa->aa_oa = oa;
                req->rq_interpret_reply = osc_shrink_grant_interpret;
        }

        ptlrpc_request_set_replen(req);
        if (!KEY_IS(KEY_GRANT_SHRINK)) {
                LASSERT(set != NULL);
                ptlrpc_set_add_req(set, req);
                ptlrpc_check_set(NULL, set);
        } else {
                ptlrpcd_add_req(req);
        }

        RETURN(0);
}
EXPORT_SYMBOL(osc_set_info_async);

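/**
 * Recompute the amount of grant to request from the OST on reconnect.  The
 * client asks for at least what it is currently consuming (available,
 * reserved and dirty grant), or two full RPCs worth if it holds none, and
 * resets the lost-grant counter under cl_loi_list_lock.
 */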
int osc_reconnect(const struct lu_env *env, struct obd_export *exp,
                  struct obd_device *obd, struct obd_uuid *cluuid,
                  struct obd_connect_data *data, void *localdata)
{
        struct client_obd *cli = &obd->u.cli;

        if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
                long lost_grant;
                long grant;

                spin_lock(&cli->cl_loi_list_lock);
                grant = cli->cl_avail_grant + cli->cl_reserved_grant;
                if (data->ocd_connect_flags & OBD_CONNECT_GRANT_PARAM)
                        grant += cli->cl_dirty_grant;
                else
                        grant += cli->cl_dirty_pages << PAGE_SHIFT;
                data->ocd_grant = grant ? : 2 * cli_brw_size(obd);
                lost_grant = cli->cl_lost_grant;
                cli->cl_lost_grant = 0;
                spin_unlock(&cli->cl_loi_list_lock);

                CDEBUG(D_RPCTRACE, "ocd_connect_flags: %#llx ocd_version: %d"
                       " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
                       data->ocd_version, data->ocd_grant, lost_grant);
        }

        RETURN(0);
}
EXPORT_SYMBOL(osc_reconnect);

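/**
 * Disconnect the client export and, once the import is gone, take this OSC
 * off the grant shrink list.  See the ordering discussion below.
 */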
int osc_disconnect(struct obd_export *exp)
{
        struct obd_device *obd = class_exp2obd(exp);
        int rc;

        rc = client_disconnect_export(exp);
        /**
         * Initially we put del_shrink_grant before disconnect_export, but it
         * causes the following problem if setup (connect) and cleanup
         * (disconnect) are tangled together.
         *      connect p1                     disconnect p2
         *   ptlrpc_connect_import
         *     ...............               class_manual_cleanup
         *                                     osc_disconnect
         *                                     del_shrink_grant
         *   ptlrpc_connect_interrupt
         *     init_grant_shrink
         *   add this client to shrink list
         *                                      cleanup_osc
         * Bang! The pinger then triggers the shrink on a client that is
         * being cleaned up.
         * So the osc must be removed from the shrink list only after we are
         * sure the import has been destroyed.  BUG18662
         */
        if (obd->u.cli.cl_import == NULL)
                osc_del_shrink_grant(&obd->u.cli);
        return rc;
}
EXPORT_SYMBOL(osc_disconnect);

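/**
 * cfs_hash iterator callback: invalidate the OSC object attached to the
 * granted locks of an LDLM resource, and clear the LDLM_FL_CLEANED flag so
 * those locks can still be cancelled afterwards.
 *
 * \retval zero always, so that the hash iteration continues
 */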
int osc_ldlm_resource_invalidate(struct cfs_hash *hs, struct cfs_hash_bd *bd,
                                 struct hlist_node *hnode, void *arg)
{
        struct lu_env *env = arg;
        struct ldlm_resource *res = cfs_hash_object(hs, hnode);
        struct ldlm_lock *lock;
        struct osc_object *osc = NULL;
        ENTRY;

        lock_res(res);
        list_for_each_entry(lock, &res->lr_granted, l_res_link) {
                if (lock->l_ast_data != NULL && osc == NULL) {
                        osc = lock->l_ast_data;
                        cl_object_get(osc2cl(osc));
                }

                /* clear LDLM_FL_CLEANED flag to make sure it will be canceled
                 * by the 2nd round of ldlm_namespace_cleanup() call in
                 * osc_import_event(). */
                ldlm_clear_cleaned(lock);
        }
        unlock_res(res);

        if (osc != NULL) {
                osc_object_invalidate(env, osc);
                cl_object_put(env, osc2cl(osc));
        }

        RETURN(0);
}
EXPORT_SYMBOL(osc_ldlm_resource_invalidate);

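/**
 * Dispatch an import event to the OSC: drop grant on disconnect, flush
 * cached pages and locks on invalidation, (re)initialize grant when the
 * connect data arrives, and forward the remaining events to the observer.
 */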
static int osc_import_event(struct obd_device *obd,
                            struct obd_import *imp,
                            enum obd_import_event event)
{
        struct client_obd *cli;
        int rc = 0;

        ENTRY;
        LASSERT(imp->imp_obd == obd);

        switch (event) {
        case IMP_EVENT_DISCON: {
                cli = &obd->u.cli;
                spin_lock(&cli->cl_loi_list_lock);
                cli->cl_avail_grant = 0;
                cli->cl_lost_grant = 0;
                spin_unlock(&cli->cl_loi_list_lock);
                break;
        }
        case IMP_EVENT_INACTIVE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE);
                break;
        }
        case IMP_EVENT_INVALIDATE: {
                struct ldlm_namespace *ns = obd->obd_namespace;
                struct lu_env         *env;
                __u16                  refcheck;

                ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);

                env = cl_env_get(&refcheck);
                if (!IS_ERR(env)) {
                        osc_io_unplug(env, &obd->u.cli, NULL);

                        cfs_hash_for_each_nolock(ns->ns_rs_hash,
                                                 osc_ldlm_resource_invalidate,
                                                 env, 0);
                        cl_env_put(env, &refcheck);

                        ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
                } else {
                        rc = PTR_ERR(env);
                }
                break;
        }
        case IMP_EVENT_ACTIVE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE);
                break;
        }
        case IMP_EVENT_OCD: {
                struct obd_connect_data *ocd = &imp->imp_connect_data;

                if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
                        osc_init_grant(&obd->u.cli, ocd);

                /* See bug 7198 */
                if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
                        imp->imp_client->cli_request_portal =
                                OST_REQUEST_PORTAL;

                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD);
                break;
        }
        case IMP_EVENT_DEACTIVATE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE);
                break;
        }
        case IMP_EVENT_ACTIVATE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE);
                break;
        }
        default:
                CERROR("Unknown import event %d\n", event);
                LBUG();
        }
        RETURN(rc);
}

/**
 * Determine whether a lock can be canceled before replaying it during
 * recovery; see bug 16774 for details.
 *
 * \retval zero the lock can't be canceled
 * \retval other ok to cancel
 */
static int osc_cancel_weight(struct ldlm_lock *lock)
{
        /*
         * Cancel all unused and granted extent locks.
         */
        if (lock->l_resource->lr_type == LDLM_EXTENT &&
            lock->l_granted_mode == lock->l_req_mode &&
            osc_ldlm_weigh_ast(lock) == 0)
                RETURN(1);

        RETURN(0);
}

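/**
 * ptlrpcd work callback: flush any queued writeback for this client.
 */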
static int brw_queue_work(const struct lu_env *env, void *data)
{
        struct client_obd *cli = data;

        CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);

        osc_io_unplug(env, cli, NULL);
        RETURN(0);
}

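/**
 * Common OSC setup: take a ptlrpcd reference, set up the client obd,
 * allocate the writeback and LRU ptlrpcd work items, and initialize quota
 * and grant shrinking.  Everything is unwound on failure.
 */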
int osc_setup_common(struct obd_device *obd, struct lustre_cfg *lcfg)
{
        struct client_obd *cli = &obd->u.cli;
        void *handler;
        int rc;

        ENTRY;

        rc = ptlrpcd_addref();
        if (rc)
                RETURN(rc);

        rc = client_obd_setup(obd, lcfg);
        if (rc)
                GOTO(out_ptlrpcd, rc);

        handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
        if (IS_ERR(handler))
                GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
        cli->cl_writeback_work = handler;

        handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
        if (IS_ERR(handler))
                GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
        cli->cl_lru_work = handler;

        rc = osc_quota_setup(obd);
        if (rc)
                GOTO(out_ptlrpcd_work, rc);

        cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;

        INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
        RETURN(rc);

out_ptlrpcd_work:
        if (cli->cl_writeback_work != NULL) {
                ptlrpcd_destroy_work(cli->cl_writeback_work);
                cli->cl_writeback_work = NULL;
        }
        if (cli->cl_lru_work != NULL) {
                ptlrpcd_destroy_work(cli->cl_lru_work);
                cli->cl_lru_work = NULL;
        }
        client_obd_cleanup(obd);
out_ptlrpcd:
        ptlrpcd_decref();
        RETURN(rc);
}
EXPORT_SYMBOL(osc_setup_common);

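/**
 * Full OSC device setup on top of osc_setup_common(): register the procfs
 * tree (under the OSP entry when client and server share a node), grow the
 * shared request pool up to osc_reqpool_maxreqcount, register the lock
 * cancel weight callback and join the global grant shrink list.
 */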
int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
{
        struct client_obd *cli = &obd->u.cli;
        struct obd_type   *type;
        int                adding;
        int                added;
        int                req_count;
        int                rc;

        ENTRY;

        rc = osc_setup_common(obd, lcfg);
        if (rc < 0)
                RETURN(rc);

#ifdef CONFIG_PROC_FS
        obd->obd_vars = lprocfs_osc_obd_vars;
#endif
        /* If this is true then both client (osc) and server (osp) are on the
         * same node. The osp layer, if loaded first, will have registered the
         * osc proc directory. In that case this obd_device attaches its proc
         * tree to type->typ_procsym instead of obd->obd_type->typ_procroot.
         */
        type = class_search_type(LUSTRE_OSP_NAME);
        if (type && type->typ_procsym) {
                obd->obd_proc_entry = lprocfs_register(obd->obd_name,
                                                       type->typ_procsym,
                                                       obd->obd_vars, obd);
                if (IS_ERR(obd->obd_proc_entry)) {
                        rc = PTR_ERR(obd->obd_proc_entry);
                        CERROR("error %d setting up lprocfs for %s\n", rc,
                               obd->obd_name);
                        obd->obd_proc_entry = NULL;
                }
        }

        rc = lprocfs_obd_setup(obd, false);
        if (!rc) {
                /* If the basic OSC proc tree construction succeeded then
                 * let's do the rest.
                 */
                lproc_osc_attach_seqstat(obd);
                sptlrpc_lprocfs_cliobd_attach(obd);
                ptlrpc_lprocfs_register_obd(obd);
        }

        /*
         * We try to control the total number of requests with an upper limit
         * of osc_reqpool_maxreqcount. There might be some race which causes
         * an over-limit allocation, but that is fine.
         */
        req_count = atomic_read(&osc_pool_req_count);
        if (req_count < osc_reqpool_maxreqcount) {
                adding = cli->cl_max_rpcs_in_flight + 2;
                if (req_count + adding > osc_reqpool_maxreqcount)
                        adding = osc_reqpool_maxreqcount - req_count;

                added = ptlrpc_add_rqs_to_pool(osc_rq_pool, adding);
                atomic_add(added, &osc_pool_req_count);
        }

        INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
        ns_register_cancel(obd->obd_namespace, osc_cancel_weight);

        spin_lock(&osc_shrink_lock);
        list_add_tail(&cli->cl_shrink_list, &osc_shrink_list);
        spin_unlock(&osc_shrink_lock);

        RETURN(0);
}

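/**
 * Common pre-cleanup for OSC-like devices: wait for zombie exports, destroy
 * the ptlrpcd work items and clean up the client import.
 */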
int osc_precleanup_common(struct obd_device *obd)
{
        struct client_obd *cli = &obd->u.cli;
        ENTRY;

        /* LU-464
         * For the echo client, the export may be on the zombie list; wait
         * for the zombie thread to cull it, because cli.cl_import will be
         * cleared in client_disconnect_export():
         *   class_export_destroy() -> obd_cleanup() ->
         *   echo_device_free() -> echo_client_cleanup() ->
         *   obd_disconnect() -> osc_disconnect() ->
         *   client_disconnect_export()
         */
        obd_zombie_barrier();
        if (cli->cl_writeback_work) {
                ptlrpcd_destroy_work(cli->cl_writeback_work);
                cli->cl_writeback_work = NULL;
        }

        if (cli->cl_lru_work) {
                ptlrpcd_destroy_work(cli->cl_lru_work);
                cli->cl_lru_work = NULL;
        }

        obd_cleanup_client_import(obd);
        RETURN(0);
}
EXPORT_SYMBOL(osc_precleanup_common);

static int osc_precleanup(struct obd_device *obd)
{
        ENTRY;

        osc_precleanup_common(obd);

        ptlrpc_lprocfs_unregister_obd(obd);
        RETURN(0);
}

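/**
 * Common OSC cleanup: leave the grant shrink list, detach from the shared
 * client cache LRU, free the quota cache, tear down the client obd and drop
 * the ptlrpcd reference taken in osc_setup_common().
 */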
int osc_cleanup_common(struct obd_device *obd)
{
        struct client_obd *cli = &obd->u.cli;
        int rc;

        ENTRY;

        spin_lock(&osc_shrink_lock);
        list_del(&cli->cl_shrink_list);
        spin_unlock(&osc_shrink_lock);

        /* lru cleanup */
        if (cli->cl_cache != NULL) {
                LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
                spin_lock(&cli->cl_cache->ccc_lru_lock);
                list_del_init(&cli->cl_lru_osc);
                spin_unlock(&cli->cl_cache->ccc_lru_lock);
                cli->cl_lru_left = NULL;
                cl_cache_decref(cli->cl_cache);
                cli->cl_cache = NULL;
        }

        /* free memory of osc quota cache */
        osc_quota_cleanup(obd);

        rc = client_obd_cleanup(obd);

        ptlrpcd_decref();
        RETURN(rc);
}
EXPORT_SYMBOL(osc_cleanup_common);

int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
{
        int rc = class_process_proc_param(PARAM_OSC, obd->obd_vars, lcfg, obd);
        return rc > 0 ? 0 : rc;
}

static int osc_process_config(struct obd_device *obd, size_t len, void *buf)
{
        return osc_process_config_base(obd, buf);
}

static struct obd_ops osc_obd_ops = {
        .o_owner                = THIS_MODULE,
        .o_setup                = osc_setup,
        .o_precleanup           = osc_precleanup,
        .o_cleanup              = osc_cleanup_common,
        .o_add_conn             = client_import_add_conn,
        .o_del_conn             = client_import_del_conn,
        .o_connect              = client_connect_import,
        .o_reconnect            = osc_reconnect,
        .o_disconnect           = osc_disconnect,
        .o_statfs               = osc_statfs,
        .o_statfs_async         = osc_statfs_async,
        .o_create               = osc_create,
        .o_destroy              = osc_destroy,
        .o_getattr              = osc_getattr,
        .o_setattr              = osc_setattr,
        .o_iocontrol            = osc_iocontrol,
        .o_set_info_async       = osc_set_info_async,
        .o_import_event         = osc_import_event,
        .o_process_config       = osc_process_config,
        .o_quotactl             = osc_quotactl,
};

static struct shrinker *osc_cache_shrinker;
struct list_head osc_shrink_list = LIST_HEAD_INIT(osc_shrink_list);
DEFINE_SPINLOCK(osc_shrink_lock);

#ifndef HAVE_SHRINKER_COUNT
static int osc_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask))
{
        struct shrink_control scv = {
                .nr_to_scan = shrink_param(sc, nr_to_scan),
                .gfp_mask   = shrink_param(sc, gfp_mask)
        };
#if !defined(HAVE_SHRINKER_WANT_SHRINK_PTR) && !defined(HAVE_SHRINK_CONTROL)
        struct shrinker *shrinker = NULL;
#endif

        (void)osc_cache_shrink_scan(shrinker, &scv);

        return osc_cache_shrink_count(shrinker, &scv);
}
#endif

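/**
 * Module init: register the OSC device type, set up the cache shrinker and
 * size the shared request pool.  The pool holds at most osc_reqpool_mem_max
 * MiB of requests, with each slot rounded up to the first power of two at
 * or above OST_IO_MAXREQSIZE.
 */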
static int __init osc_init(void)
{
        bool enable_proc = true;
        struct obd_type *type;
        unsigned int reqpool_size;
        unsigned int reqsize;
        int rc;
        DEF_SHRINKER_VAR(osc_shvar, osc_cache_shrink,
                         osc_cache_shrink_count, osc_cache_shrink_scan);
        ENTRY;

        /* Print the address of _any_ initialized kernel symbol from this
         * module, to allow debugging with a gdb that doesn't support data
         * symbols from modules. */
        CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);

        rc = lu_kmem_init(osc_caches);
        if (rc)
                RETURN(rc);

        type = class_search_type(LUSTRE_OSP_NAME);
        if (type != NULL && type->typ_procsym != NULL)
                enable_proc = false;

        rc = class_register_type(&osc_obd_ops, NULL, enable_proc, NULL,
                                 LUSTRE_OSC_NAME, &osc_device_type);
        if (rc)
                GOTO(out_kmem, rc);

        osc_cache_shrinker = set_shrinker(DEFAULT_SEEKS, &osc_shvar);

        /* This is obviously too much memory; we only prevent overflow here */
        if (osc_reqpool_mem_max >= 1 << 12 || osc_reqpool_mem_max == 0)
                GOTO(out_type, rc = -EINVAL);

        reqpool_size = osc_reqpool_mem_max << 20;

        reqsize = 1;
        while (reqsize < OST_IO_MAXREQSIZE)
                reqsize = reqsize << 1;

        /*
         * We don't enlarge the request count in the OSC pool according to
         * cl_max_rpcs_in_flight. Allocation from the pool is only tried
         * after a normal allocation has failed, so a small OSC pool won't
         * cause much performance degradation in most cases.
         */
        osc_reqpool_maxreqcount = reqpool_size / reqsize;

        atomic_set(&osc_pool_req_count, 0);
        osc_rq_pool = ptlrpc_init_rq_pool(0, OST_IO_MAXREQSIZE,
                                          ptlrpc_add_rqs_to_pool);

        if (osc_rq_pool != NULL)
                GOTO(out, rc);
        rc = -ENOMEM;
out_type:
        class_unregister_type(LUSTRE_OSC_NAME);
out_kmem:
        lu_kmem_fini(osc_caches);
out:
        RETURN(rc);
}

static void __exit osc_exit(void)
{
        remove_shrinker(osc_cache_shrinker);
        class_unregister_type(LUSTRE_OSC_NAME);
        lu_kmem_fini(osc_caches);
        ptlrpc_free_rq_pool(osc_rq_pool);
}

MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
MODULE_VERSION(LUSTRE_VERSION_STRING);
MODULE_LICENSE("GPL");

module_init(osc_init);
module_exit(osc_exit);