lustre/osc/osc_request.c
/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.gnu.org/licenses/gpl-2.0.html
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, 2017, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 */

#define DEBUG_SUBSYSTEM S_OSC

#include <lprocfs_status.h>
#include <lustre_debug.h>
#include <lustre_dlm.h>
#include <lustre_fid.h>
#include <lustre_ha.h>
#include <uapi/linux/lustre/lustre_ioctl.h>
#include <lustre_net.h>
#include <lustre_obdo.h>
#include <uapi/linux/lustre/lustre_param.h>
#include <obd.h>
#include <obd_cksum.h>
#include <obd_class.h>
#include <lustre_osc.h>

#include "osc_internal.h"

atomic_t osc_pool_req_count;
unsigned int osc_reqpool_maxreqcount;
struct ptlrpc_request_pool *osc_rq_pool;

/* max memory used for request pool, unit is MB */
static unsigned int osc_reqpool_mem_max = 5;
module_param(osc_reqpool_mem_max, uint, 0444);

#define osc_grant_args osc_brw_async_args

struct osc_setattr_args {
        struct obdo             *sa_oa;
        obd_enqueue_update_f     sa_upcall;
        void                    *sa_cookie;
};

struct osc_fsync_args {
        struct osc_object       *fa_obj;
        struct obdo             *fa_oa;
        obd_enqueue_update_f    fa_upcall;
        void                    *fa_cookie;
};

struct osc_ladvise_args {
        struct obdo             *la_oa;
        obd_enqueue_update_f     la_upcall;
        void                    *la_cookie;
};

static void osc_release_ppga(struct brw_page **ppga, size_t count);
static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
                         void *data, int rc);

void osc_pack_req_body(struct ptlrpc_request *req, struct obdo *oa)
{
        struct ost_body *body;

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
}

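/* Issue a synchronous OST_GETATTR RPC and unpack the returned attributes
 * into @oa; the object blocksize is filled in from the client's BRW size. */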
static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        int                      rc;

        ENTRY;
        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;

        EXIT;
out:
        ptlrpc_req_finished(req);

        return rc;
}

static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        int                      rc;

        ENTRY;
        LASSERT(oa->o_valid & OBD_MD_FLGROUP);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        EXIT;
out:
        ptlrpc_req_finished(req);

        RETURN(rc);
}

static int osc_setattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_setattr_args *sa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
                             &body->oa);
out:
        rc = sa->sa_upcall(sa->sa_cookie, rc);
        RETURN(rc);
}

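/* Send an OST_SETATTR RPC without blocking.  With a NULL @rqset the request
 * is handed straight to ptlrpcd and the reply is ignored; otherwise @upcall
 * is invoked with @cookie once the reply has been interpreted. */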
int osc_setattr_async(struct obd_export *exp, struct obdo *oa,
                      obd_enqueue_update_f upcall, void *cookie,
                      struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct osc_setattr_args *sa;
        int                      rc;

        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        /* do mds to ost setattr asynchronously */
        if (!rqset) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req);
        } else {
                req->rq_interpret_reply =
                        (ptlrpc_interpterer_t)osc_setattr_interpret;

                CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
                sa = ptlrpc_req_async_args(req);
                sa->sa_oa = oa;
                sa->sa_upcall = upcall;
                sa->sa_cookie = cookie;

                if (rqset == PTLRPCD_SET)
                        ptlrpcd_add_req(req);
                else
                        ptlrpc_set_add_req(rqset, req);
        }

        RETURN(0);
}

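/* Completion handler for OST_LADVISE: copy the returned obdo back to the
 * caller and forward the result to the upcall. */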
static int osc_ladvise_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 void *arg, int rc)
{
        struct osc_ladvise_args *la = arg;
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        *la->la_oa = body->oa;
out:
        rc = la->la_upcall(la->la_cookie, rc);
        RETURN(rc);
}

/**
 * If rqset is NULL, do not wait for a response. The upcall and cookie may
 * also be NULL in that case.
 */
int osc_ladvise_base(struct obd_export *exp, struct obdo *oa,
                     struct ladvise_hdr *ladvise_hdr,
                     obd_enqueue_update_f upcall, void *cookie,
                     struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        struct osc_ladvise_args *la;
        int                      rc;
        struct lu_ladvise       *req_ladvise;
        struct lu_ladvise       *ladvise = ladvise_hdr->lah_advise;
        int                      num_advise = ladvise_hdr->lah_count;
        struct ladvise_hdr      *req_ladvise_hdr;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_LADVISE);
        if (req == NULL)
                RETURN(-ENOMEM);

        req_capsule_set_size(&req->rq_pill, &RMF_OST_LADVISE, RCL_CLIENT,
                             num_advise * sizeof(*ladvise));
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_LADVISE);
        if (rc != 0) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL;
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
                             oa);

        req_ladvise_hdr = req_capsule_client_get(&req->rq_pill,
                                                 &RMF_OST_LADVISE_HDR);
        memcpy(req_ladvise_hdr, ladvise_hdr, sizeof(*ladvise_hdr));

        req_ladvise = req_capsule_client_get(&req->rq_pill, &RMF_OST_LADVISE);
        memcpy(req_ladvise, ladvise, sizeof(*ladvise) * num_advise);
        ptlrpc_request_set_replen(req);

        if (rqset == NULL) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req);
                RETURN(0);
        }

        req->rq_interpret_reply = osc_ladvise_interpret;
        CLASSERT(sizeof(*la) <= sizeof(req->rq_async_args));
        la = ptlrpc_req_async_args(req);
        la->la_oa = oa;
        la->la_upcall = upcall;
        la->la_cookie = cookie;

        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}

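/* Synchronously create an object on the OST.  This path is only used for
 * echo objects (note the fid_seq_is_echo() assertion below). */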
static int osc_create(const struct lu_env *env, struct obd_export *exp,
                      struct obdo *oa)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        LASSERT(oa != NULL);
        LASSERT(oa->o_valid & OBD_MD_FLGROUP);
        LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
        if (req == NULL)
                GOTO(out, rc = -ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
        if (rc) {
                ptlrpc_request_free(req);
                GOTO(out, rc);
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out_req, rc = -EPROTO);

        CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;

        CDEBUG(D_HA, "transno: %lld\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        RETURN(rc);
}

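/* Send an OST_PUNCH request via ptlrpcd; the reply is handled by
 * osc_setattr_interpret() and the result reported through @upcall. */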
int osc_punch_send(struct obd_export *exp, struct obdo *oa,
                   obd_enqueue_update_f upcall, void *cookie)
{
        struct ptlrpc_request *req;
        struct osc_setattr_args *sa;
        struct obd_import *imp = class_exp2cliimp(exp);
        struct ost_body *body;
        int rc;

        ENTRY;

        req = ptlrpc_request_alloc(imp, &RQF_OST_PUNCH);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
        if (rc < 0) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_set_io_portal(req);

        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);

        lustre_set_wire_obdo(&imp->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
        CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
        sa = ptlrpc_req_async_args(req);
        sa->sa_oa = oa;
        sa->sa_upcall = upcall;
        sa->sa_cookie = cookie;

        ptlrpcd_add_req(req);

        RETURN(0);
}
EXPORT_SYMBOL(osc_punch_send);

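/* Completion handler for OST_SYNC: copy the returned obdo to the caller and
 * refresh the osc object's cached blocks attribute from the reply before
 * invoking the upcall. */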
static int osc_sync_interpret(const struct lu_env *env,
                              struct ptlrpc_request *req,
                              void *arg, int rc)
{
        struct osc_fsync_args   *fa = arg;
        struct ost_body         *body;
        struct cl_attr          *attr = &osc_env_info(env)->oti_attr;
        unsigned long           valid = 0;
        struct cl_object        *obj;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL) {
                CERROR("can't unpack ost_body\n");
                GOTO(out, rc = -EPROTO);
        }

        *fa->fa_oa = body->oa;
        obj = osc2cl(fa->fa_obj);

        /* Update osc object's blocks attribute */
        cl_object_attr_lock(obj);
        if (body->oa.o_valid & OBD_MD_FLBLOCKS) {
                attr->cat_blocks = body->oa.o_blocks;
                valid |= CAT_BLOCKS;
        }

        if (valid != 0)
                cl_object_attr_update(env, obj, attr, valid);
        cl_object_attr_unlock(obj);

out:
        rc = fa->fa_upcall(fa->fa_cookie, rc);
        RETURN(rc);
}

int osc_sync_base(struct osc_object *obj, struct obdo *oa,
                  obd_enqueue_update_f upcall, void *cookie,
                  struct ptlrpc_request_set *rqset)
{
        struct obd_export     *exp = osc_export(obj);
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct osc_fsync_args *fa;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        /* overload the size and blocks fields in the oa with start/end */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = osc_sync_interpret;

        CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
        fa = ptlrpc_req_async_args(req);
        fa->fa_obj = obj;
        fa->fa_oa = oa;
        fa->fa_upcall = upcall;
        fa->fa_cookie = cookie;

        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}

/* Find and locally cancel the locks matched by @mode on the resource derived
 * from @oa. Found locks are added to the @cancels list. Returns the number
 * of locks added. */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
                                   struct list_head *cancels,
                                   enum ldlm_mode mode, __u64 lock_flags)
{
        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        struct ldlm_res_id res_id;
        struct ldlm_resource *res;
        int count;
        ENTRY;

        /* Return, i.e. cancel nothing, only if ELC is supported (flag in
         * export) but disabled through procfs (flag in NS).
         *
         * This is distinct from the case when ELC is not supported at all,
         * where we still want to cancel locks in advance and just cancel
         * them locally, without sending any RPC. */
        if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
                RETURN(0);

        ostid_build_res_name(&oa->o_oi, &res_id);
        res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
        if (IS_ERR(res))
                RETURN(0);

        LDLM_RESOURCE_ADDREF(res);
        count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
                                           lock_flags, 0, NULL);
        LDLM_RESOURCE_DELREF(res);
        ldlm_resource_putref(res);
        RETURN(count);
}

static int osc_destroy_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req, void *data,
                                 int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;

        atomic_dec(&cli->cl_destroy_in_flight);
        wake_up(&cli->cl_destroy_waitq);
        return 0;
}

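/* Try to reserve a slot for a destroy RPC.  The counter is bumped first and
 * checked against cl_max_rpcs_in_flight; on failure the increment is undone,
 * and a waiter is woken if the counter dropped below the limit between the
 * two atomic operations. */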
static int osc_can_send_destroy(struct client_obd *cli)
{
        if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
            cli->cl_max_rpcs_in_flight) {
                /* The destroy request can be sent */
                return 1;
        }
        if (atomic_dec_return(&cli->cl_destroy_in_flight) <
            cli->cl_max_rpcs_in_flight) {
                /*
                 * The counter has been modified between the two atomic
                 * operations.
                 */
                wake_up(&cli->cl_destroy_waitq);
        }
        return 0;
}

static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct client_obd     *cli = &exp->exp_obd->u.cli;
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct list_head       cancels = LIST_HEAD_INIT(cancels);
        int rc, count;
        ENTRY;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
        if (req == NULL) {
                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                RETURN(-ENOMEM);
        }

        rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
                               0, &cancels, count);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = osc_destroy_interpret;
        if (!osc_can_send_destroy(cli)) {
                struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);

                /*
                 * Wait until the number of on-going destroy RPCs drops
                 * below cl_max_rpcs_in_flight
                 */
                rc = l_wait_event_exclusive(cli->cl_destroy_waitq,
                                            osc_can_send_destroy(cli), &lwi);
                if (rc) {
                        ptlrpc_req_finished(req);
                        RETURN(rc);
                }
        }

        /* Do not wait for response */
        ptlrpcd_add_req(req);
        RETURN(0);
}

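/* Fill in the dirty accounting and grant fields of @oa so that outgoing
 * RPCs can piggyback the client's current cache state to the OST. */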
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;

        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        spin_lock(&cli->cl_loi_list_lock);
        if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data, GRANT_PARAM))
                oa->o_dirty = cli->cl_dirty_grant;
        else
                oa->o_dirty = cli->cl_dirty_pages << PAGE_SHIFT;
        if (unlikely(cli->cl_dirty_pages - cli->cl_dirty_transit >
                     cli->cl_dirty_max_pages)) {
                CERROR("dirty %lu - %lu > dirty_max %lu\n",
                       cli->cl_dirty_pages, cli->cl_dirty_transit,
                       cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else if (unlikely(atomic_long_read(&obd_dirty_pages) -
                            atomic_long_read(&obd_dirty_transit_pages) >
                            (long)(obd_max_dirty_pages + 1))) {
                /* The atomic_read() and atomic_inc() are not covered by a
                 * lock, so they may race and trip this CERROR() unless we
                 * add in a small fudge factor (+1). */
                CERROR("%s: dirty %ld - %ld > system dirty_max %ld\n",
                       cli_name(cli), atomic_long_read(&obd_dirty_pages),
                       atomic_long_read(&obd_dirty_transit_pages),
                       obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
                            0x7fffffff)) {
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty_pages, cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else {
                unsigned long nrpages;

                nrpages = cli->cl_max_pages_per_rpc;
                nrpages *= cli->cl_max_rpcs_in_flight + 1;
                nrpages = max(nrpages, cli->cl_dirty_max_pages);
                oa->o_undirty = nrpages << PAGE_SHIFT;
                if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data,
                                 GRANT_PARAM)) {
                        int nrextents;

                        /* take extent tax into account when asking for more
                         * grant space */
                        nrextents = (nrpages + cli->cl_max_extent_pages - 1) /
                                     cli->cl_max_extent_pages;
                        oa->o_undirty += nrextents * cli->cl_grant_extent_tax;
                }
        }
        oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
        oa->o_dropped = cli->cl_lost_grant;
        cli->cl_lost_grant = 0;
        spin_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "dirty: %llu undirty: %u dropped %u grant: %llu\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
}

void osc_update_next_shrink(struct client_obd *cli)
{
        cli->cl_next_shrink_grant = ktime_get_seconds() +
                                    cli->cl_grant_shrink_interval;

        CDEBUG(D_CACHE, "next time %lld to shrink grant\n",
               cli->cl_next_shrink_grant);
}

static void __osc_update_grant(struct client_obd *cli, u64 grant)
{
        spin_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant += grant;
        spin_unlock(&cli->cl_loi_list_lock);
}

static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
{
        if (body->oa.o_valid & OBD_MD_FLGRANT) {
                CDEBUG(D_CACHE, "got %llu extra grant\n", body->oa.o_grant);
                __osc_update_grant(cli, body->oa.o_grant);
        }
}

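/* Completion handler for a grant shrink request: on failure the grant given
 * back to the server is restored locally; otherwise any grant returned in
 * the reply is added back to cl_avail_grant. */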
static int osc_shrink_grant_interpret(const struct lu_env *env,
                                      struct ptlrpc_request *req,
                                      void *aa, int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
        struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
        struct ost_body *body;

        if (rc != 0) {
                __osc_update_grant(cli, oa->o_grant);
                GOTO(out, rc);
        }

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        osc_update_grant(cli, body);
out:
        OBDO_FREE(oa);
        return rc;
}

static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
{
        spin_lock(&cli->cl_loi_list_lock);
        oa->o_grant = cli->cl_avail_grant / 4;
        cli->cl_avail_grant -= oa->o_grant;
        spin_unlock(&cli->cl_loi_list_lock);
        if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
                oa->o_valid |= OBD_MD_FLFLAGS;
                oa->o_flags = 0;
        }
        oa->o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);
}

/* Shrink the current grant, either from some large amount to enough for a
 * full set of in-flight RPCs, or if we have already shrunk to that limit
 * then to enough for a single RPC.  This avoids keeping more grant than
 * needed, and avoids shrinking the grant piecemeal. */
static int osc_shrink_grant(struct client_obd *cli)
{
        __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
                             (cli->cl_max_pages_per_rpc << PAGE_SHIFT);

        spin_lock(&cli->cl_loi_list_lock);
        if (cli->cl_avail_grant <= target_bytes)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
        spin_unlock(&cli->cl_loi_list_lock);

        return osc_shrink_grant_to_target(cli, target_bytes);
}

int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
{
        int                     rc = 0;
        struct ost_body        *body;
        ENTRY;

        spin_lock(&cli->cl_loi_list_lock);
        /* Don't shrink if we are already above or below the desired limit.
         * We don't want to shrink below a single RPC, as that will negatively
         * impact block allocation and long-term performance. */
        if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_SHIFT)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;

        if (target_bytes >= cli->cl_avail_grant) {
                spin_unlock(&cli->cl_loi_list_lock);
                RETURN(0);
        }
        spin_unlock(&cli->cl_loi_list_lock);

        OBD_ALLOC_PTR(body);
        if (!body)
                RETURN(-ENOMEM);

        osc_announce_cached(cli, &body->oa, 0);

        spin_lock(&cli->cl_loi_list_lock);
        body->oa.o_grant = cli->cl_avail_grant - target_bytes;
        cli->cl_avail_grant = target_bytes;
        spin_unlock(&cli->cl_loi_list_lock);
        if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
                body->oa.o_valid |= OBD_MD_FLFLAGS;
                body->oa.o_flags = 0;
        }
        body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);

        rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
                                sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
                                sizeof(*body), body, NULL);
        if (rc != 0)
                __osc_update_grant(cli, body->oa.o_grant);
        OBD_FREE_PTR(body);
        RETURN(rc);
}

static int osc_should_shrink_grant(struct client_obd *client)
{
        time64_t next_shrink = client->cl_next_shrink_grant;

        if ((client->cl_import->imp_connect_data.ocd_connect_flags &
             OBD_CONNECT_GRANT_SHRINK) == 0)
                return 0;

        if (ktime_get_seconds() >= next_shrink - 5) {
                /* Get the current RPC size directly, instead of going via:
                 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
                 * Keep comment here so that it can be found by searching. */
                int brw_size = client->cl_max_pages_per_rpc << PAGE_SHIFT;

                if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
                    client->cl_avail_grant > brw_size)
                        return 1;
                else
                        osc_update_next_shrink(client);
        }
        return 0;
}

static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
{
        struct client_obd *client;

        list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
                if (osc_should_shrink_grant(client))
                        osc_shrink_grant(client);
        }
        return 0;
}

static int osc_add_shrink_grant(struct client_obd *client)
{
        int rc;

        rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
                                       TIMEOUT_GRANT,
                                       osc_grant_shrink_grant_cb, NULL,
                                       &client->cl_grant_shrink_list);
        if (rc) {
                CERROR("add grant client %s error %d\n", cli_name(client), rc);
                return rc;
        }
        CDEBUG(D_CACHE, "add grant client %s\n", cli_name(client));
        osc_update_next_shrink(client);
        return 0;
}

static int osc_del_shrink_grant(struct client_obd *client)
{
        return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
                                         TIMEOUT_GRANT);
}

void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        /*
         * ocd_grant is the total grant amount we expect to hold: if we have
         * been evicted, it's the new avail_grant amount, and cl_dirty_pages
         * will drop to 0 as in-flight RPCs fail out; otherwise, it's
         * avail_grant + dirty.
         *
         * The race is tolerable here: if we're evicted, but imp_state has
         * already left EVICTED state, then cl_dirty_pages must be 0 already.
         */
        spin_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant = ocd->ocd_grant;
        if (cli->cl_import->imp_state != LUSTRE_IMP_EVICTED) {
                cli->cl_avail_grant -= cli->cl_reserved_grant;
                if (OCD_HAS_FLAG(ocd, GRANT_PARAM))
                        cli->cl_avail_grant -= cli->cl_dirty_grant;
                else
                        cli->cl_avail_grant -=
                                        cli->cl_dirty_pages << PAGE_SHIFT;
        }

        if (OCD_HAS_FLAG(ocd, GRANT_PARAM)) {
                u64 size;
                int chunk_mask;

                /* overhead for each extent insertion */
                cli->cl_grant_extent_tax = ocd->ocd_grant_tax_kb << 10;
                /* determine the appropriate chunk size used by osc_extent. */
                cli->cl_chunkbits = max_t(int, PAGE_SHIFT,
                                          ocd->ocd_grant_blkbits);
                /* max_pages_per_rpc must be chunk aligned */
                chunk_mask = ~((1 << (cli->cl_chunkbits - PAGE_SHIFT)) - 1);
                cli->cl_max_pages_per_rpc = (cli->cl_max_pages_per_rpc +
                                             ~chunk_mask) & chunk_mask;
                /* determine maximum extent size, in #pages */
                size = (u64)ocd->ocd_grant_max_blks << ocd->ocd_grant_blkbits;
                cli->cl_max_extent_pages = size >> PAGE_SHIFT;
                if (cli->cl_max_extent_pages == 0)
                        cli->cl_max_extent_pages = 1;
        } else {
                cli->cl_grant_extent_tax = 0;
                cli->cl_chunkbits = PAGE_SHIFT;
                cli->cl_max_extent_pages = DT_MAX_BRW_PAGES;
        }
        spin_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld, "
                "chunk bits: %d cl_max_extent_pages: %d\n",
                cli_name(cli),
                cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits,
                cli->cl_max_extent_pages);

        if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
            list_empty(&cli->cl_grant_shrink_list))
                osc_add_shrink_grant(cli);
}
EXPORT_SYMBOL(osc_init_grant);

/* We assume that the reason this OSC got a short read is because it read
 * beyond the end of a stripe file; i.e. Lustre is reading a sparse file
 * via the LOV, and it _knows_ it's reading inside the file, it's just that
 * this stripe never got written at or beyond this stripe offset yet. */
static void handle_short_read(int nob_read, size_t page_count,
                              struct brw_page **pga)
{
        char *ptr;
        int i = 0;

        /* skip bytes read OK */
        while (nob_read > 0) {
                LASSERT(page_count > 0);

                if (pga[i]->count > nob_read) {
                        /* EOF inside this page */
                        ptr = kmap(pga[i]->pg) +
                                (pga[i]->off & ~PAGE_MASK);
                        memset(ptr + nob_read, 0, pga[i]->count - nob_read);
                        kunmap(pga[i]->pg);
                        page_count--;
                        i++;
                        break;
                }

                nob_read -= pga[i]->count;
                page_count--;
                i++;
        }

        /* zero remaining pages */
        while (page_count-- > 0) {
                ptr = kmap(pga[i]->pg) + (pga[i]->off & ~PAGE_MASK);
                memset(ptr, 0, pga[i]->count);
                kunmap(pga[i]->pg);
                i++;
        }
}

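/* Verify the per-niobuf return codes in a BRW_WRITE reply and make sure the
 * bulk transferred exactly the number of bytes requested. */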
static int check_write_rcs(struct ptlrpc_request *req,
                           int requested_nob, int niocount,
                           size_t page_count, struct brw_page **pga)
{
        int     i;
        __u32   *remote_rcs;

        remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
                                                  sizeof(*remote_rcs) *
                                                  niocount);
        if (remote_rcs == NULL) {
                CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
                return -EPROTO;
        }

        /* return error if any niobuf was in error */
        for (i = 0; i < niocount; i++) {
                if ((int)remote_rcs[i] < 0)
                        return remote_rcs[i];

                if (remote_rcs[i] != 0) {
                        CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
                                i, remote_rcs[i], req);
                        return -EPROTO;
                }
        }
        if (req->rq_bulk != NULL &&
            req->rq_bulk->bd_nob_transferred != requested_nob) {
                CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
                       req->rq_bulk->bd_nob_transferred, requested_nob);
                return -EPROTO;
        }

        return 0;
}

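/* Two brw_pages may share one niobuf only when they are contiguous in the
 * file and their flags match (modulo a few known-harmless bits). */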
static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
{
        if (p1->flag != p2->flag) {
                unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
                                  OBD_BRW_SYNC       | OBD_BRW_ASYNC   |
                                  OBD_BRW_NOQUOTA    | OBD_BRW_SOFT_SYNC);

                /* warn if we try to combine flags that we don't know to be
                 * safe to combine */
                if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
                        CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
                              "report this at https://jira.hpdd.intel.com/\n",
                              p1->flag, p2->flag);
                }
                return 0;
        }

        return (p1->off + p1->count == p2->off);
}

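/* Compute a checksum of type @cksum_type over the first @nob bytes of the
 * page array.  Fault-injection hooks can corrupt the data (reads) or the
 * checksum itself (writes) to exercise the resend paths. */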
static u32 osc_checksum_bulk(int nob, size_t pg_count,
                             struct brw_page **pga, int opc,
                             enum cksum_types cksum_type)
{
        u32                             cksum;
        int                             i = 0;
        struct cfs_crypto_hash_desc     *hdesc;
        unsigned int                    bufsize;
        unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);

        LASSERT(pg_count > 0);

        hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
        if (IS_ERR(hdesc)) {
                CERROR("Unable to initialize checksum hash %s\n",
                       cfs_crypto_hash_name(cfs_alg));
                return PTR_ERR(hdesc);
        }

        while (nob > 0 && pg_count > 0) {
                unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 && opc == OST_READ &&
                    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
                        unsigned char *ptr = kmap(pga[i]->pg);
                        int off = pga[i]->off & ~PAGE_MASK;

                        memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
                        kunmap(pga[i]->pg);
                }
                cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
                                            pga[i]->off & ~PAGE_MASK,
                                            count);
                LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
                               (int)(pga[i]->off & ~PAGE_MASK));

                nob -= pga[i]->count;
                pg_count--;
                i++;
        }

        bufsize = sizeof(cksum);
        cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);

        /* For sending we only compute the wrong checksum instead
         * of corrupting the data so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                cksum++;

        return cksum;
}

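/* Build an OST_READ/OST_WRITE request for the given page array: merge
 * contiguous pages into niobufs, attach either a bulk descriptor or an
 * inline short-io buffer, and add checksum and grant information as
 * required. */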
static int
osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
                     u32 page_count, struct brw_page **pga,
                     struct ptlrpc_request **reqp, int resend)
{
        struct ptlrpc_request   *req;
        struct ptlrpc_bulk_desc *desc;
        struct ost_body         *body;
        struct obd_ioobj        *ioobj;
        struct niobuf_remote    *niobuf;
        int niocount, i, requested_nob, opc, rc, short_io_size = 0;
        struct osc_brw_async_args *aa;
        struct req_capsule      *pill;
        struct brw_page *pg_prev;
        void *short_io_buf;

        ENTRY;
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
                RETURN(-ENOMEM); /* Recoverable */
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
                RETURN(-EINVAL); /* Fatal */

        if ((cmd & OBD_BRW_WRITE) != 0) {
                opc = OST_WRITE;
                req = ptlrpc_request_alloc_pool(cli->cl_import,
                                                osc_rq_pool,
                                                &RQF_OST_BRW_WRITE);
        } else {
                opc = OST_READ;
                req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
        }
        if (req == NULL)
                RETURN(-ENOMEM);

        for (niocount = i = 1; i < page_count; i++) {
                if (!can_merge_pages(pga[i - 1], pga[i]))
                        niocount++;
        }

        pill = &req->rq_pill;
        req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
                             sizeof(*ioobj));
        req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
                             niocount * sizeof(*niobuf));

        for (i = 0; i < page_count; i++)
                short_io_size += pga[i]->count;

        /* Check if we can do a short io. */
        if (!(short_io_size <= cli->cl_short_io_bytes && niocount == 1 &&
            imp_connect_shortio(cli->cl_import)))
                short_io_size = 0;

        req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_CLIENT,
                             opc == OST_READ ? 0 : short_io_size);
        if (opc == OST_READ)
                req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_SERVER,
                                     short_io_size);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        osc_set_io_portal(req);

        ptlrpc_at_set_req_timeout(req);
        /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
         * retry logic */
        req->rq_no_retry_einprogress = 1;

        if (short_io_size != 0) {
                desc = NULL;
                short_io_buf = NULL;
                goto no_bulk;
        }

        desc = ptlrpc_prep_bulk_imp(req, page_count,
                cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
                (opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE :
                        PTLRPC_BULK_PUT_SINK) |
                        PTLRPC_BULK_BUF_KIOV,
                OST_BULK_PORTAL,
                &ptlrpc_bulk_kiov_pin_ops);

        if (desc == NULL)
                GOTO(out, rc = -ENOMEM);
        /* NB request now owns desc and will free it when it gets freed */
no_bulk:
        body = req_capsule_client_get(pill, &RMF_OST_BODY);
        ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;
        /* The high bits of ioo_max_brw tell the server the _maximum_ number
         * of bulks that might be sent for this request.  The actual number
         * is decided when the RPC is finally sent in ptlrpc_register_bulk().
         * It sends "max - 1" for compatibility with old clients sending "0",
         * and also so that the actual maximum is a power-of-two number, not
         * one less. LU-1431 */
        if (desc != NULL)
                ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
        else /* short io */
                ioobj_max_brw_set(ioobj, 0);

        if (short_io_size != 0) {
                if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                        body->oa.o_valid |= OBD_MD_FLFLAGS;
                        body->oa.o_flags = 0;
                }
                body->oa.o_flags |= OBD_FL_SHORT_IO;
                CDEBUG(D_CACHE, "Using short io for data transfer, size = %d\n",
                       short_io_size);
                if (opc == OST_WRITE) {
                        short_io_buf = req_capsule_client_get(pill,
                                                              &RMF_SHORT_IO);
                        LASSERT(short_io_buf != NULL);
                }
        }

        LASSERT(page_count > 0);
        pg_prev = pga[0];
        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                struct brw_page *pg = pga[i];
                int poff = pg->off & ~PAGE_MASK;

                LASSERT(pg->count > 0);
                /* make sure there is no gap in the middle of page array */
                LASSERTF(page_count == 1 ||
                         (ergo(i == 0, poff + pg->count == PAGE_SIZE) &&
                          ergo(i > 0 && i < page_count - 1,
                               poff == 0 && pg->count == PAGE_SIZE)   &&
                          ergo(i == page_count - 1, poff == 0)),
                         "i: %d/%d pg: %p off: %llu, count: %u\n",
                         i, page_count, pg, pg->off, pg->count);
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u pg %p [pri %lu ind %lu] off %llu"
                         " prev_pg %p [pri %lu ind %lu] off %llu\n",
                         i, page_count,
                         pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
                         pg_prev->pg, page_private(pg_prev->pg),
                         pg_prev->pg->index, pg_prev->off);
                LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
                        (pg->flag & OBD_BRW_SRVLOCK));
                if (short_io_size != 0 && opc == OST_WRITE) {
                        unsigned char *ptr = ll_kmap_atomic(pg->pg, KM_USER0);

                        LASSERT(short_io_size >= requested_nob + pg->count);
                        memcpy(short_io_buf + requested_nob,
                               ptr + poff,
                               pg->count);
                        ll_kunmap_atomic(ptr, KM_USER0);
                } else if (short_io_size == 0) {
                        desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff,
                                                         pg->count);
                }
                requested_nob += pg->count;

                if (i > 0 && can_merge_pages(pg_prev, pg)) {
                        niobuf--;
                        niobuf->rnb_len += pg->count;
                } else {
                        niobuf->rnb_offset = pg->off;
                        niobuf->rnb_len    = pg->count;
                        niobuf->rnb_flags  = pg->flag;
                }
                pg_prev = pg;
        }

        LASSERTF((void *)(niobuf - niocount) ==
                req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
                "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
                &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));

        osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob : 0);
        if (resend) {
                if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                        body->oa.o_valid |= OBD_MD_FLFLAGS;
                        body->oa.o_flags = 0;
                }
                body->oa.o_flags |= OBD_FL_RECOV_RESEND;
        }

        if (osc_should_shrink_grant(cli))
                osc_shrink_grant_local(cli, &body->oa);

        /* size[REQ_REC_OFF] still sizeof (*body) */
        if (opc == OST_WRITE) {
                if (cli->cl_checksum &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        /* store cl_cksum_type in a local variable since
                         * it can be changed via lprocfs */
                        enum cksum_types cksum_type = cli->cl_cksum_type;

                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                body->oa.o_flags = 0;

                        body->oa.o_flags |= cksum_type_pack(cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        body->oa.o_cksum = osc_checksum_bulk(requested_nob,
                                                             page_count, pga,
                                                             OST_WRITE,
                                                             cksum_type);
                        CDEBUG(D_PAGE, "checksum at write origin: %x\n",
                               body->oa.o_cksum);
                        /* save this in 'oa', too, for later checking */
                        oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        oa->o_flags |= cksum_type_pack(cksum_type);
                } else {
                        /* clear out the checksum flag, in case this is a
                         * resend but cl_checksum is no longer set. b=11238 */
                        oa->o_valid &= ~OBD_MD_FLCKSUM;
                }
                oa->o_cksum = body->oa.o_cksum;
                /* 1 RC per niobuf */
                req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
                                     sizeof(__u32) * niocount);
        } else {
                if (cli->cl_checksum &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                body->oa.o_flags = 0;
                        body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                }

                /* The client cksum has already been copied to the wire obdo
                 * in the previous lustre_set_wire_obdo(); in case a bulk read
                 * is resent due to a cksum error, this allows the server to
                 * check and dump the pages on its side. */
        }
        ptlrpc_request_set_replen(req);

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oa = oa;
        aa->aa_requested_nob = requested_nob;
        aa->aa_nio_count = niocount;
        aa->aa_page_count = page_count;
        aa->aa_resends = 0;
        aa->aa_ppga = pga;
        aa->aa_cli = cli;
        INIT_LIST_HEAD(&aa->aa_oaps);

        *reqp = req;
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
                req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
                niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
        RETURN(0);

 out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}

char dbgcksum_file_name[PATH_MAX];

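/* Write the pages of a checksum-mismatched bulk to a debug file so the
 * corrupted data can be inspected after the fact. */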
1345 static void dump_all_bulk_pages(struct obdo *oa, __u32 page_count,
1346                                 struct brw_page **pga, __u32 server_cksum,
1347                                 __u32 client_cksum)
1348 {
1349         struct file *filp;
1350         int rc, i;
1351         unsigned int len;
1352         char *buf;
1353         mm_segment_t oldfs;
1354
1355         /* will only keep dump of pages on first error for the same range in
1356          * file/fid, not during the resends/retries. */
1357         snprintf(dbgcksum_file_name, sizeof(dbgcksum_file_name),
1358                  "%s-checksum_dump-osc-"DFID":[%llu-%llu]-%x-%x",
1359                  (strncmp(libcfs_debug_file_path_arr, "NONE", 4) != 0 ?
1360                   libcfs_debug_file_path_arr :
1361                   LIBCFS_DEBUG_FILE_PATH_DEFAULT),
1362                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : 0ULL,
1363                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1364                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1365                  pga[0]->off,
1366                  pga[page_count-1]->off + pga[page_count-1]->count - 1,
1367                  client_cksum, server_cksum);
1368         filp = filp_open(dbgcksum_file_name,
1369                          O_CREAT | O_EXCL | O_WRONLY | O_LARGEFILE, 0600);
1370         if (IS_ERR(filp)) {
1371                 rc = PTR_ERR(filp);
1372                 if (rc == -EEXIST)
1373                         CDEBUG(D_INFO, "%s: can't open to dump pages with "
1374                                "checksum error: rc = %d\n", dbgcksum_file_name,
1375                                rc);
1376                 else
1377                         CERROR("%s: can't open to dump pages with checksum "
1378                                "error: rc = %d\n", dbgcksum_file_name, rc);
1379                 return;
1380         }
1381
1382         oldfs = get_fs();
1383         set_fs(KERNEL_DS);
1384         for (i = 0; i < page_count; i++) {
1385                 len = pga[i]->count;
1386                 buf = kmap(pga[i]->pg);
1387                 while (len != 0) {
1388                         rc = vfs_write(filp, (__force const char __user *)buf,
1389                                        len, &filp->f_pos);
1390                         if (rc < 0) {
1391                                 CERROR("%s: wanted to write %u but got "
1392                                        "error %d\n", dbgcksum_file_name, len, rc);
1393                                 break;
1394                         }
1395                         len -= rc;
1396                         buf += rc;
1397                         CDEBUG(D_INFO, "%s: wrote %d bytes\n",
1398                                dbgcksum_file_name, rc);
1399                 }
1400                 kunmap(pga[i]->pg);
1401         }
1402         set_fs(oldfs);
1403
1404         rc = ll_vfs_fsync_range(filp, 0, LLONG_MAX, 1);
1405         if (rc)
1406                 CERROR("%s: sync returns %d\n", dbgcksum_file_name, rc);
1407         filp_close(filp, NULL);
1408         return;
1409 }
1410
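     /* Compare client and server write checksums.  On a mismatch, optionally
      * dump the bulk pages, recompute the checksum locally to guess where the
      * corruption happened, and log a console error.  Returns 0 if the
      * checksums match and 1 otherwise. */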
1411 static int
1412 check_write_checksum(struct obdo *oa, const struct lnet_process_id *peer,
1413                      __u32 client_cksum, __u32 server_cksum,
1414                      struct osc_brw_async_args *aa)
1415 {
1416         __u32 new_cksum;
1417         char *msg;
1418         enum cksum_types cksum_type;
1419
1420         if (server_cksum == client_cksum) {
1421                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1422                 return 0;
1423         }
1424
1425         if (aa->aa_cli->cl_checksum_dump)
1426                 dump_all_bulk_pages(oa, aa->aa_page_count, aa->aa_ppga,
1427                                     server_cksum, client_cksum);
1428
1429         cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1430                                        oa->o_flags : 0);
1431         new_cksum = osc_checksum_bulk(aa->aa_requested_nob, aa->aa_page_count,
1432                                       aa->aa_ppga, OST_WRITE, cksum_type);
1433
1434         if (cksum_type != cksum_type_unpack(aa->aa_oa->o_flags))
1435                 msg = "the server did not use the checksum type specified in "
1436                       "the original request - likely a protocol problem";
1437         else if (new_cksum == server_cksum)
1438                 msg = "changed on the client after we checksummed it - "
1439                       "likely false positive due to mmap IO (bug 11742)";
1440         else if (new_cksum == client_cksum)
1441                 msg = "changed in transit before arrival at OST";
1442         else
1443                 msg = "changed in transit AND doesn't match the original - "
1444                       "likely false positive due to mmap IO (bug 11742)";
1445
1446         LCONSOLE_ERROR_MSG(0x132, "%s: BAD WRITE CHECKSUM: %s: from %s inode "
1447                            DFID " object "DOSTID" extent [%llu-%llu], original "
1448                            "client csum %x (type %x), server csum %x (type %x),"
1449                            " client csum now %x\n",
1450                            aa->aa_cli->cl_import->imp_obd->obd_name,
1451                            msg, libcfs_nid2str(peer->nid),
1452                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1453                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1454                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1455                            POSTID(&oa->o_oi), aa->aa_ppga[0]->off,
1456                            aa->aa_ppga[aa->aa_page_count - 1]->off +
1457                                 aa->aa_ppga[aa->aa_page_count-1]->count - 1,
1458                            client_cksum, cksum_type_unpack(aa->aa_oa->o_flags),
1459                            server_cksum, cksum_type, new_cksum);
1460         return 1;
1461 }
1462
1463 /* Note: rc enters this function as the number of bytes transferred */
1464 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1465 {
1466         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1467         const struct lnet_process_id *peer =
1468                         &req->rq_import->imp_connection->c_peer;
1469         struct client_obd *cli = aa->aa_cli;
1470         struct ost_body *body;
1471         u32 client_cksum = 0;
1472         ENTRY;
1473
1474         if (rc < 0 && rc != -EDQUOT) {
1475                 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1476                 RETURN(rc);
1477         }
1478
1479         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1480         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1481         if (body == NULL) {
1482                 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1483                 RETURN(-EPROTO);
1484         }
1485
1486         /* set/clear over quota flag for a uid/gid/projid */
1487         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1488             body->oa.o_valid & (OBD_MD_FLALLQUOTA)) {
1489                 unsigned qid[LL_MAXQUOTAS] = {
1490                                          body->oa.o_uid, body->oa.o_gid,
1491                                          body->oa.o_projid };
1492                 CDEBUG(D_QUOTA, "setdq for [%u %u %u] with valid %#llx, flags %x\n",
1493                        body->oa.o_uid, body->oa.o_gid, body->oa.o_projid,
1494                        body->oa.o_valid, body->oa.o_flags);
1495                 osc_quota_setdq(cli, qid, body->oa.o_valid,
1496                                 body->oa.o_flags);
1497         }
1498
1499         osc_update_grant(cli, body);
1500
1501         if (rc < 0)
1502                 RETURN(rc);
1503
1504         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1505                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1506
1507         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1508                 if (rc > 0) {
1509                         CERROR("Unexpected positive rc %d\n", rc);
1510                         RETURN(-EPROTO);
1511                 }
1512
1513                 if (req->rq_bulk != NULL &&
1514                     sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1515                         RETURN(-EAGAIN);
1516
1517                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1518                     check_write_checksum(&body->oa, peer, client_cksum,
1519                                          body->oa.o_cksum, aa))
1520                         RETURN(-EAGAIN);
1521
1522                 rc = check_write_rcs(req, aa->aa_requested_nob, aa->aa_nio_count,
1523                                      aa->aa_page_count, aa->aa_ppga);
1524                 GOTO(out, rc);
1525         }
1526
1527         /* The rest of this function executes only for OST_READs */
1528
1529         if (req->rq_bulk == NULL) {
1530                 rc = req_capsule_get_size(&req->rq_pill, &RMF_SHORT_IO,
1531                                           RCL_SERVER);
1532                 LASSERT(rc == req->rq_status);
1533         } else {
1534                 /* if unwrap_bulk failed, return -EAGAIN to retry */
1535                 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1536         }
1537         if (rc < 0)
1538                 GOTO(out, rc = -EAGAIN);
1539
1540         if (rc > aa->aa_requested_nob) {
1541                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1542                        aa->aa_requested_nob);
1543                 RETURN(-EPROTO);
1544         }
1545
1546         if (req->rq_bulk != NULL && rc != req->rq_bulk->bd_nob_transferred) {
1547                 CERROR("Unexpected rc %d (%d transferred)\n",
1548                        rc, req->rq_bulk->bd_nob_transferred);
1549                 RETURN(-EPROTO);
1550         }
1551
1552         if (req->rq_bulk == NULL) {
1553                 /* short io */
1554                 int nob, pg_count, i = 0;
1555                 unsigned char *buf;
1556
1557                 CDEBUG(D_CACHE, "Using short io read, size %d\n", rc);
1558                 pg_count = aa->aa_page_count;
1559                 buf = req_capsule_server_sized_get(&req->rq_pill, &RMF_SHORT_IO,
1560                                                    rc);
1561                 nob = rc;
1562                 while (nob > 0 && pg_count > 0) {
1563                         unsigned char *ptr;
1564                         int count = aa->aa_ppga[i]->count > nob ?
1565                                     nob : aa->aa_ppga[i]->count;
1566
1567                         CDEBUG(D_CACHE, "page %p count %d\n",
1568                                aa->aa_ppga[i]->pg, count);
1569                         ptr = ll_kmap_atomic(aa->aa_ppga[i]->pg, KM_USER0);
1570                         memcpy(ptr + (aa->aa_ppga[i]->off & ~PAGE_MASK), buf,
1571                                count);
1572                         ll_kunmap_atomic((void *) ptr, KM_USER0);
1573
1574                         buf += count;
1575                         nob -= count;
1576                         i++;
1577                         pg_count--;
1578                 }
1579         }
1580
1581         if (rc < aa->aa_requested_nob)
1582                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1583
1584         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1585                 static int cksum_counter;
1586                 u32        server_cksum = body->oa.o_cksum;
1587                 char      *via = "";
1588                 char      *router = "";
1589                 enum cksum_types cksum_type;
1590
1591                 cksum_type = cksum_type_unpack(body->oa.o_valid & OBD_MD_FLFLAGS ?
1592                                                body->oa.o_flags : 0);
1593                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1594                                                  aa->aa_ppga, OST_READ,
1595                                                  cksum_type);
1596
1597                 if (req->rq_bulk != NULL &&
1598                     peer->nid != req->rq_bulk->bd_sender) {
1599                         via = " via ";
1600                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1601                 }
1602
1603                 if (server_cksum != client_cksum) {
1604                         struct ost_body *clbody;
1605                         u32 page_count = aa->aa_page_count;
1606
1607                         clbody = req_capsule_client_get(&req->rq_pill,
1608                                                         &RMF_OST_BODY);
1609                         if (cli->cl_checksum_dump)
1610                                 dump_all_bulk_pages(&clbody->oa, page_count,
1611                                                     aa->aa_ppga, server_cksum,
1612                                                     client_cksum);
1613
1614                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1615                                            "%s%s%s inode "DFID" object "DOSTID
1616                                            " extent [%llu-%llu], client %x, "
1617                                            "server %x, cksum_type %x\n",
1618                                            req->rq_import->imp_obd->obd_name,
1619                                            libcfs_nid2str(peer->nid),
1620                                            via, router,
1621                                            clbody->oa.o_valid & OBD_MD_FLFID ?
1622                                                 clbody->oa.o_parent_seq : 0ULL,
1623                                            clbody->oa.o_valid & OBD_MD_FLFID ?
1624                                                 clbody->oa.o_parent_oid : 0,
1625                                            clbody->oa.o_valid & OBD_MD_FLFID ?
1626                                                 clbody->oa.o_parent_ver : 0,
1627                                            POSTID(&body->oa.o_oi),
1628                                            aa->aa_ppga[0]->off,
1629                                            aa->aa_ppga[page_count-1]->off +
1630                                            aa->aa_ppga[page_count-1]->count - 1,
1631                                            client_cksum, server_cksum,
1632                                            cksum_type);
1633                         cksum_counter = 0;
1634                         aa->aa_oa->o_cksum = client_cksum;
1635                         rc = -EAGAIN;
1636                 } else {
1637                         cksum_counter++;
1638                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1639                         rc = 0;
1640                 }
1641         } else if (unlikely(client_cksum)) {
1642                 static int cksum_missed;
1643
1644                 cksum_missed++;
1645                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1646                         CERROR("Checksum %u requested from %s but not sent\n",
1647                                cksum_missed, libcfs_nid2str(peer->nid));
1648         } else {
1649                 rc = 0;
1650         }
1651 out:
1652         if (rc >= 0)
1653                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1654                                      aa->aa_oa, &body->oa);
1655
1656         RETURN(rc);
1657 }
1658
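     /* Rebuild and resend a BRW request after a recoverable error.  The new
      * request takes over the page array and async pages from the old one;
      * the resend is delayed by aa_resends seconds, capped at the request
      * timeout. */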
1659 static int osc_brw_redo_request(struct ptlrpc_request *request,
1660                                 struct osc_brw_async_args *aa, int rc)
1661 {
1662         struct ptlrpc_request *new_req;
1663         struct osc_brw_async_args *new_aa;
1664         struct osc_async_page *oap;
1665         ENTRY;
1666
1667         DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1668                   "redo for recoverable error %d", rc);
1669
1670         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1671                                 OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
1672                                   aa->aa_cli, aa->aa_oa, aa->aa_page_count,
1673                                   aa->aa_ppga, &new_req, 1);
1674         if (rc)
1675                 RETURN(rc);
1676
1677         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1678                 if (oap->oap_request != NULL) {
1679                         LASSERTF(request == oap->oap_request,
1680                                  "request %p != oap_request %p\n",
1681                                  request, oap->oap_request);
1682                         if (oap->oap_interrupted) {
1683                                 ptlrpc_req_finished(new_req);
1684                                 RETURN(-EINTR);
1685                         }
1686                 }
1687         }
1688         /* New request takes over pga and oaps from old request.
1689          * Note that copying a list_head doesn't work, need to move it... */
1690         aa->aa_resends++;
1691         new_req->rq_interpret_reply = request->rq_interpret_reply;
1692         new_req->rq_async_args = request->rq_async_args;
1693         new_req->rq_commit_cb = request->rq_commit_cb;
1694         /* cap resend delay to the current request timeout, this is similar to
1695          * what ptlrpc does (see after_reply()) */
1696         if (aa->aa_resends > new_req->rq_timeout)
1697                 new_req->rq_sent = ktime_get_real_seconds() + new_req->rq_timeout;
1698         else
1699                 new_req->rq_sent = ktime_get_real_seconds() + aa->aa_resends;
1700         new_req->rq_generation_set = 1;
1701         new_req->rq_import_generation = request->rq_import_generation;
1702
1703         new_aa = ptlrpc_req_async_args(new_req);
1704
1705         INIT_LIST_HEAD(&new_aa->aa_oaps);
1706         list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1707         INIT_LIST_HEAD(&new_aa->aa_exts);
1708         list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1709         new_aa->aa_resends = aa->aa_resends;
1710
1711         list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1712                 if (oap->oap_request) {
1713                         ptlrpc_req_finished(oap->oap_request);
1714                         oap->oap_request = ptlrpc_request_addref(new_req);
1715                 }
1716         }
1717
1718         /* XXX: this code will run into problems if we are going to support
1719          * adding a series of BRW RPCs into a self-defined ptlrpc_request_set
1720          * and waiting for all of them to finish. We should inherit the
1721          * request set from the old request. */
1722         ptlrpcd_add_req(new_req);
1723
1724         DEBUG_REQ(D_INFO, new_req, "new request");
1725         RETURN(0);
1726 }
1727
1728 /*
1729  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1730  * follow Sedgewick's advice and stick to the dead simple shellsort -- it'll do
1731  * fine for our small page arrays and doesn't require allocation.  it's an
1732  * insertion sort that swaps elements that are strides apart, shrinking the
1733  * stride down until it's 1 and the array is sorted.
1734  */
1735 static void sort_brw_pages(struct brw_page **array, int num)
1736 {
1737         int stride, i, j;
1738         struct brw_page *tmp;
1739
1740         if (num == 1)
1741                 return;
1742         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1743                 ;
1744
1745         do {
1746                 stride /= 3;
1747                 for (i = stride ; i < num ; i++) {
1748                         tmp = array[i];
1749                         j = i;
1750                         while (j >= stride && array[j - stride]->off > tmp->off) {
1751                                 array[j] = array[j - stride];
1752                                 j -= stride;
1753                         }
1754                         array[j] = tmp;
1755                 }
1756         } while (stride > 1);
1757 }
1758
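     /* Free the brw_page pointer array built for a BRW request. */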
1759 static void osc_release_ppga(struct brw_page **ppga, size_t count)
1760 {
1761         LASSERT(ppga != NULL);
1762         OBD_FREE(ppga, sizeof(*ppga) * count);
1763 }
1764
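     /* Interpret callback for a BRW RPC: finish the request, retry on
      * recoverable errors, update cached object attributes (size, KMS,
      * timestamps) on success, and release the extents and pages that were
      * part of this RPC. */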
1765 static int brw_interpret(const struct lu_env *env,
1766                          struct ptlrpc_request *req, void *data, int rc)
1767 {
1768         struct osc_brw_async_args *aa = data;
1769         struct osc_extent *ext;
1770         struct osc_extent *tmp;
1771         struct client_obd *cli = aa->aa_cli;
1772         unsigned long           transferred = 0;
1773         ENTRY;
1774
1775         rc = osc_brw_fini_request(req, rc);
1776         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1777         /* When the server returns -EINPROGRESS, the client should always
1778          * retry regardless of how many times the bulk was already resent. */
1779         if (osc_recoverable_error(rc) && !req->rq_no_delay) {
1780                 if (req->rq_import_generation !=
1781                     req->rq_import->imp_generation) {
1782                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
1783                                ""DOSTID", rc = %d.\n",
1784                                req->rq_import->imp_obd->obd_name,
1785                                POSTID(&aa->aa_oa->o_oi), rc);
1786                 } else if (rc == -EINPROGRESS ||
1787                     client_should_resend(aa->aa_resends, aa->aa_cli)) {
1788                         rc = osc_brw_redo_request(req, aa, rc);
1789                 } else {
1790                         CERROR("%s: too many resent retries for object: "
1791                                "%llu:%llu, rc = %d.\n",
1792                                req->rq_import->imp_obd->obd_name,
1793                                POSTID(&aa->aa_oa->o_oi), rc);
1794                 }
1795
1796                 if (rc == 0)
1797                         RETURN(0);
1798                 else if (rc == -EAGAIN || rc == -EINPROGRESS)
1799                         rc = -EIO;
1800         }
1801
1802         if (rc == 0) {
1803                 struct obdo *oa = aa->aa_oa;
1804                 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
1805                 unsigned long valid = 0;
1806                 struct cl_object *obj;
1807                 struct osc_async_page *last;
1808
1809                 last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
1810                 obj = osc2cl(last->oap_obj);
1811
1812                 cl_object_attr_lock(obj);
1813                 if (oa->o_valid & OBD_MD_FLBLOCKS) {
1814                         attr->cat_blocks = oa->o_blocks;
1815                         valid |= CAT_BLOCKS;
1816                 }
1817                 if (oa->o_valid & OBD_MD_FLMTIME) {
1818                         attr->cat_mtime = oa->o_mtime;
1819                         valid |= CAT_MTIME;
1820                 }
1821                 if (oa->o_valid & OBD_MD_FLATIME) {
1822                         attr->cat_atime = oa->o_atime;
1823                         valid |= CAT_ATIME;
1824                 }
1825                 if (oa->o_valid & OBD_MD_FLCTIME) {
1826                         attr->cat_ctime = oa->o_ctime;
1827                         valid |= CAT_CTIME;
1828                 }
1829
1830                 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1831                         struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
1832                         loff_t last_off = last->oap_count + last->oap_obj_off +
1833                                 last->oap_page_off;
1834
1835                         /* Change the file size if this is an out-of-quota
1836                          * or direct IO write and it extends the file size */
1837                         if (loi->loi_lvb.lvb_size < last_off) {
1838                                 attr->cat_size = last_off;
1839                                 valid |= CAT_SIZE;
1840                         }
1841                         /* Extend KMS if it's not a lockless write */
1842                         if (loi->loi_kms < last_off &&
1843                             oap2osc_page(last)->ops_srvlock == 0) {
1844                                 attr->cat_kms = last_off;
1845                                 valid |= CAT_KMS;
1846                         }
1847                 }
1848
1849                 if (valid != 0)
1850                         cl_object_attr_update(env, obj, attr, valid);
1851                 cl_object_attr_unlock(obj);
1852         }
1853         OBDO_FREE(aa->aa_oa);
1854
1855         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
1856                 osc_inc_unstable_pages(req);
1857
1858         list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
1859                 list_del_init(&ext->oe_link);
1860                 osc_extent_finish(env, ext, 1,
1861                                   rc && req->rq_no_delay ? -EWOULDBLOCK : rc);
1862         }
1863         LASSERT(list_empty(&aa->aa_exts));
1864         LASSERT(list_empty(&aa->aa_oaps));
1865
1866         transferred = (req->rq_bulk == NULL ? /* short io */
1867                        aa->aa_requested_nob :
1868                        req->rq_bulk->bd_nob_transferred);
1869
1870         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1871         ptlrpc_lprocfs_brw(req, transferred);
1872
1873         spin_lock(&cli->cl_loi_list_lock);
1874         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1875          * is called so we know whether to go to sync BRWs or wait for more
1876          * RPCs to complete */
1877         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
1878                 cli->cl_w_in_flight--;
1879         else
1880                 cli->cl_r_in_flight--;
1881         osc_wake_cache_waiters(cli);
1882         spin_unlock(&cli->cl_loi_list_lock);
1883
1884         osc_io_unplug(env, cli, NULL);
1885         RETURN(rc);
1886 }
1887
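     /* Commit callback for a BRW request, invoked via rq_commit_cb once the
      * server has committed the transaction. */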
1888 static void brw_commit(struct ptlrpc_request *req)
1889 {
1890         /* If osc_inc_unstable_pages (via osc_extent_finish) races with
1891          * this function being called via rq_commit_cb, we need to ensure
1892          * osc_dec_unstable_pages is still called. Otherwise unstable
1893          * pages may be leaked. */
1894         spin_lock(&req->rq_lock);
1895         if (likely(req->rq_unstable)) {
1896                 req->rq_unstable = 0;
1897                 spin_unlock(&req->rq_lock);
1898
1899                 osc_dec_unstable_pages(req);
1900         } else {
1901                 req->rq_committed = 1;
1902                 spin_unlock(&req->rq_lock);
1903         }
1904 }
1905
1906 /**
1907  * Build an RPC by the list of extent @ext_list. The caller must ensure
1908  * that the total pages in this list are NOT over max pages per RPC.
1909  * Extents in the list must be in OES_RPC state.
1910  */
1911 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
1912                   struct list_head *ext_list, int cmd)
1913 {
1914         struct ptlrpc_request           *req = NULL;
1915         struct osc_extent               *ext;
1916         struct brw_page                 **pga = NULL;
1917         struct osc_brw_async_args       *aa = NULL;
1918         struct obdo                     *oa = NULL;
1919         struct osc_async_page           *oap;
1920         struct osc_object               *obj = NULL;
1921         struct cl_req_attr              *crattr = NULL;
1922         loff_t                          starting_offset = OBD_OBJECT_EOF;
1923         loff_t                          ending_offset = 0;
1924         int                             mpflag = 0;
1925         int                             mem_tight = 0;
1926         int                             page_count = 0;
1927         bool                            soft_sync = false;
1928         bool                            interrupted = false;
1929         bool                            ndelay = false;
1930         int                             i;
1931         int                             grant = 0;
1932         int                             rc;
1933         __u32                           layout_version = 0;
1934         struct list_head                rpc_list = LIST_HEAD_INIT(rpc_list);
1935         struct ost_body                 *body;
1936         ENTRY;
1937         LASSERT(!list_empty(ext_list));
1938
1939         /* add pages into rpc_list to build BRW rpc */
1940         list_for_each_entry(ext, ext_list, oe_link) {
1941                 LASSERT(ext->oe_state == OES_RPC);
1942                 mem_tight |= ext->oe_memalloc;
1943                 grant += ext->oe_grants;
1944                 page_count += ext->oe_nr_pages;
1945                 layout_version = MAX(layout_version, ext->oe_layout_version);
1946                 if (obj == NULL)
1947                         obj = ext->oe_obj;
1948         }
1949
1950         soft_sync = osc_over_unstable_soft_limit(cli);
1951         if (mem_tight)
1952                 mpflag = cfs_memory_pressure_get_and_set();
1953
1954         OBD_ALLOC(pga, sizeof(*pga) * page_count);
1955         if (pga == NULL)
1956                 GOTO(out, rc = -ENOMEM);
1957
1958         OBDO_ALLOC(oa);
1959         if (oa == NULL)
1960                 GOTO(out, rc = -ENOMEM);
1961
1962         i = 0;
1963         list_for_each_entry(ext, ext_list, oe_link) {
1964                 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
1965                         if (mem_tight)
1966                                 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
1967                         if (soft_sync)
1968                                 oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
1969                         pga[i] = &oap->oap_brw_page;
1970                         pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
1971                         i++;
1972
1973                         list_add_tail(&oap->oap_rpc_item, &rpc_list);
1974                         if (starting_offset == OBD_OBJECT_EOF ||
1975                             starting_offset > oap->oap_obj_off)
1976                                 starting_offset = oap->oap_obj_off;
1977                         else
1978                                 LASSERT(oap->oap_page_off == 0);
1979                         if (ending_offset < oap->oap_obj_off + oap->oap_count)
1980                                 ending_offset = oap->oap_obj_off +
1981                                                 oap->oap_count;
1982                         else
1983                                 LASSERT(oap->oap_page_off + oap->oap_count ==
1984                                         PAGE_SIZE);
1985                         if (oap->oap_interrupted)
1986                                 interrupted = true;
1987                 }
1988                 if (ext->oe_ndelay)
1989                         ndelay = true;
1990         }
1991
1992         /* first page in the list */
1993         oap = list_entry(rpc_list.next, typeof(*oap), oap_rpc_item);
1994
1995         crattr = &osc_env_info(env)->oti_req_attr;
1996         memset(crattr, 0, sizeof(*crattr));
1997         crattr->cra_type = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
1998         crattr->cra_flags = ~0ULL;
1999         crattr->cra_page = oap2cl_page(oap);
2000         crattr->cra_oa = oa;
2001         cl_req_attr_set(env, osc2cl(obj), crattr);
2002
2003         if (cmd == OBD_BRW_WRITE) {
2004                 oa->o_grant_used = grant;
2005                 if (layout_version > 0) {
2006                         CDEBUG(D_LAYOUT, DFID": write with layout version %u\n",
2007                                PFID(&oa->o_oi.oi_fid), layout_version);
2008
2009                         oa->o_layout_version = layout_version;
2010                         oa->o_valid |= OBD_MD_LAYOUT_VERSION;
2011                 }
2012         }
2013
2014         sort_brw_pages(pga, page_count);
2015         rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req, 0);
2016         if (rc != 0) {
2017                 CERROR("prep_req failed: %d\n", rc);
2018                 GOTO(out, rc);
2019         }
2020
2021         req->rq_commit_cb = brw_commit;
2022         req->rq_interpret_reply = brw_interpret;
2023         req->rq_memalloc = mem_tight != 0;
2024         oap->oap_request = ptlrpc_request_addref(req);
2025         if (interrupted && !req->rq_intr)
2026                 ptlrpc_mark_interrupted(req);
2027         if (ndelay) {
2028                 req->rq_no_resend = req->rq_no_delay = 1;
2029                 /* We should probably set a shorter timeout value here to
2030                  * handle ETIMEDOUT in brw_interpret() correctly. */
2031                 /* lustre_msg_set_timeout(req, req->rq_timeout / 2); */
2032         }
2033
2034         /* Need to update the timestamps after the request is built in case
2035          * we race with setattr (locally or in queue at OST).  If OST gets
2036          * later setattr before earlier BRW (as determined by the request xid),
2037          * the OST will not use BRW timestamps.  Sadly, there is no obvious
2038          * way to do this in a single call.  bug 10150 */
2039         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
2040         crattr->cra_oa = &body->oa;
2041         crattr->cra_flags = OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME;
2042         cl_req_attr_set(env, osc2cl(obj), crattr);
2043         lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
2044
2045         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2046         aa = ptlrpc_req_async_args(req);
2047         INIT_LIST_HEAD(&aa->aa_oaps);
2048         list_splice_init(&rpc_list, &aa->aa_oaps);
2049         INIT_LIST_HEAD(&aa->aa_exts);
2050         list_splice_init(ext_list, &aa->aa_exts);
2051
2052         spin_lock(&cli->cl_loi_list_lock);
2053         starting_offset >>= PAGE_SHIFT;
2054         if (cmd == OBD_BRW_READ) {
2055                 cli->cl_r_in_flight++;
2056                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2057                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2058                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2059                                       starting_offset + 1);
2060         } else {
2061                 cli->cl_w_in_flight++;
2062                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2063                 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
2064                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2065                                       starting_offset + 1);
2066         }
2067         spin_unlock(&cli->cl_loi_list_lock);
2068
2069         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %ur/%uw in flight",
2070                   page_count, aa, cli->cl_r_in_flight,
2071                   cli->cl_w_in_flight);
2072         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_IO, cfs_fail_val);
2073
2074         ptlrpcd_add_req(req);
2075         rc = 0;
2076         EXIT;
2077
2078 out:
2079         if (mem_tight != 0)
2080                 cfs_memory_pressure_restore(mpflag);
2081
2082         if (rc != 0) {
2083                 LASSERT(req == NULL);
2084
2085                 if (oa)
2086                         OBDO_FREE(oa);
2087                 if (pga)
2088                         OBD_FREE(pga, sizeof(*pga) * page_count);
2089                 /* this should happen rarely and is pretty bad; it makes the
2090                  * pending list not follow the dirty order */
2091                 while (!list_empty(ext_list)) {
2092                         ext = list_entry(ext_list->next, struct osc_extent,
2093                                          oe_link);
2094                         list_del_init(&ext->oe_link);
2095                         osc_extent_finish(env, ext, 0, rc);
2096                 }
2097         }
2098         RETURN(rc);
2099 }
2100
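     /* Attach @data to the lock's l_ast_data if it is unset or already set to
      * the same value; return 1 on success and 0 if the lock already belongs
      * to another object. */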
2101 static int osc_set_lock_data(struct ldlm_lock *lock, void *data)
2102 {
2103         int set = 0;
2104
2105         LASSERT(lock != NULL);
2106
2107         lock_res_and_lock(lock);
2108
2109         if (lock->l_ast_data == NULL)
2110                 lock->l_ast_data = data;
2111         if (lock->l_ast_data == data)
2112                 set = 1;
2113
2114         unlock_res_and_lock(lock);
2115
2116         return set;
2117 }
2118
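     /* Finish a lock enqueue: unpack the intent reply if there is one, invoke
      * the caller's upcall with the final error code, and drop the lock
      * reference taken in ldlm_cli_enqueue(). */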
2119 int osc_enqueue_fini(struct ptlrpc_request *req, osc_enqueue_upcall_f upcall,
2120                      void *cookie, struct lustre_handle *lockh,
2121                      enum ldlm_mode mode, __u64 *flags, bool speculative,
2122                      int errcode)
2123 {
2124         bool intent = *flags & LDLM_FL_HAS_INTENT;
2125         int rc;
2126         ENTRY;
2127
2128         /* The request was created before ldlm_cli_enqueue call. */
2129         if (intent && errcode == ELDLM_LOCK_ABORTED) {
2130                 struct ldlm_reply *rep;
2131
2132                 rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
2133                 LASSERT(rep != NULL);
2134
2135                 rep->lock_policy_res1 =
2136                         ptlrpc_status_ntoh(rep->lock_policy_res1);
2137                 if (rep->lock_policy_res1)
2138                         errcode = rep->lock_policy_res1;
2139                 if (!speculative)
2140                         *flags |= LDLM_FL_LVB_READY;
2141         } else if (errcode == ELDLM_OK) {
2142                 *flags |= LDLM_FL_LVB_READY;
2143         }
2144
2145         /* Call the update callback. */
2146         rc = (*upcall)(cookie, lockh, errcode);
2147
2148         /* release the reference taken in ldlm_cli_enqueue() */
2149         if (errcode == ELDLM_LOCK_MATCHED)
2150                 errcode = ELDLM_OK;
2151         if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
2152                 ldlm_lock_decref(lockh, mode);
2153
2154         RETURN(rc);
2155 }
2156
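     /* Interpret callback for an asynchronous lock enqueue: complete the
      * enqueue via ldlm_cli_enqueue_fini() and run the osc upcall. */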
2157 int osc_enqueue_interpret(const struct lu_env *env, struct ptlrpc_request *req,
2158                           struct osc_enqueue_args *aa, int rc)
2159 {
2160         struct ldlm_lock *lock;
2161         struct lustre_handle *lockh = &aa->oa_lockh;
2162         enum ldlm_mode mode = aa->oa_mode;
2163         struct ost_lvb *lvb = aa->oa_lvb;
2164         __u32 lvb_len = sizeof(*lvb);
2165         __u64 flags = 0;
2166
2167         ENTRY;
2168
2169         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2170          * be valid. */
2171         lock = ldlm_handle2lock(lockh);
2172         LASSERTF(lock != NULL,
2173                  "lockh %#llx, req %p, aa %p - client evicted?\n",
2174                  lockh->cookie, req, aa);
2175
2176         /* Take an additional reference so that a blocking AST that
2177          * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2178          * to arrive after an upcall has been executed by
2179          * osc_enqueue_fini(). */
2180         ldlm_lock_addref(lockh, mode);
2181
2182         /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
2183         OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
2184
2185         /* Let the CP AST grant the lock first. */
2186         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2187
2188         if (aa->oa_speculative) {
2189                 LASSERT(aa->oa_lvb == NULL);
2190                 LASSERT(aa->oa_flags == NULL);
2191                 aa->oa_flags = &flags;
2192         }
2193
2194         /* Complete obtaining the lock procedure. */
2195         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
2196                                    aa->oa_mode, aa->oa_flags, lvb, lvb_len,
2197                                    lockh, rc);
2198         /* Complete osc stuff. */
2199         rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
2200                               aa->oa_flags, aa->oa_speculative, rc);
2201
2202         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2203
2204         ldlm_lock_decref(lockh, mode);
2205         LDLM_LOCK_PUT(lock);
2206         RETURN(rc);
2207 }
2208
2209 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2210
2211 /* When enqueuing asynchronously, locks are not ordered, so we can obtain a
2212  * lock from the 2nd OSC before a lock from the 1st one. This does not deadlock
2213  * with other synchronous requests, but keeping some locks while trying to
2214  * obtain others may take a considerable amount of time in the case of an OST
2215  * failure; and when a client holding locks that other sync requests need does
2216  * not release them, the client is evicted from the cluster -- such scenarios
2217  * make life difficult, so release locks just after they are obtained. */
2218 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2219                      __u64 *flags, union ldlm_policy_data *policy,
2220                      struct ost_lvb *lvb, int kms_valid,
2221                      osc_enqueue_upcall_f upcall, void *cookie,
2222                      struct ldlm_enqueue_info *einfo,
2223                      struct ptlrpc_request_set *rqset, int async,
2224                      bool speculative)
2225 {
2226         struct obd_device *obd = exp->exp_obd;
2227         struct lustre_handle lockh = { 0 };
2228         struct ptlrpc_request *req = NULL;
2229         int intent = *flags & LDLM_FL_HAS_INTENT;
2230         __u64 match_flags = *flags;
2231         enum ldlm_mode mode;
2232         int rc;
2233         ENTRY;
2234
2235         /* Filesystem lock extents are extended to page boundaries so that
2236          * dealing with the page cache is a little smoother.  */
2237         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2238         policy->l_extent.end |= ~PAGE_MASK;
2239
2240         /*
2241          * kms is not valid when either object is completely fresh (so that no
2242          * locks are cached), or object was evicted. In the latter case cached
2243          * lock cannot be used, because it would prime inode state with
2244          * potentially stale LVB.
2245          */
2246         if (!kms_valid)
2247                 goto no_match;
2248
2249         /* Next, search for already existing extent locks that will cover us */
2250         /* If we're trying to read, we also search for an existing PW lock.  The
2251          * VFS and page cache already protect us locally, so lots of readers/
2252          * writers can share a single PW lock.
2253          *
2254          * There are problems with conversion deadlocks, so instead of
2255          * converting a read lock to a write lock, we'll just enqueue a new
2256          * one.
2257          *
2258          * At some point we should cancel the read lock instead of making them
2259          * send us a blocking callback, but there are problems with canceling
2260          * locks out from other users right now, too. */
2261         mode = einfo->ei_mode;
2262         if (einfo->ei_mode == LCK_PR)
2263                 mode |= LCK_PW;
2264         /* Normal lock requests must wait for the LVB to be ready before
2265          * matching a lock; speculative lock requests do not need to,
2266          * because they will not actually use the lock. */
2267         if (!speculative)
2268                 match_flags |= LDLM_FL_LVB_READY;
2269         if (intent != 0)
2270                 match_flags |= LDLM_FL_BLOCK_GRANTED;
2271         mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id,
2272                                einfo->ei_type, policy, mode, &lockh, 0);
2273         if (mode) {
2274                 struct ldlm_lock *matched;
2275
2276                 if (*flags & LDLM_FL_TEST_LOCK)
2277                         RETURN(ELDLM_OK);
2278
2279                 matched = ldlm_handle2lock(&lockh);
2280                 if (speculative) {
2281                         /* This DLM lock request is speculative, and does not
2282                          * have an associated IO request. Therefore if there
2283                          * is already a DLM lock, it will just inform the
2284                          * caller to cancel the request for this stripe. */
2285                         lock_res_and_lock(matched);
2286                         if (ldlm_extent_equal(&policy->l_extent,
2287                             &matched->l_policy_data.l_extent))
2288                                 rc = -EEXIST;
2289                         else
2290                                 rc = -ECANCELED;
2291                         unlock_res_and_lock(matched);
2292
2293                         ldlm_lock_decref(&lockh, mode);
2294                         LDLM_LOCK_PUT(matched);
2295                         RETURN(rc);
2296                 } else if (osc_set_lock_data(matched, einfo->ei_cbdata)) {
2297                         *flags |= LDLM_FL_LVB_READY;
2298
2299                         /* We already have a lock, and it's referenced. */
2300                         (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);
2301
2302                         ldlm_lock_decref(&lockh, mode);
2303                         LDLM_LOCK_PUT(matched);
2304                         RETURN(ELDLM_OK);
2305                 } else {
2306                         ldlm_lock_decref(&lockh, mode);
2307                         LDLM_LOCK_PUT(matched);
2308                 }
2309         }
2310
2311 no_match:
2312         if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK))
2313                 RETURN(-ENOLCK);
2314
2315         if (intent) {
2316                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2317                                            &RQF_LDLM_ENQUEUE_LVB);
2318                 if (req == NULL)
2319                         RETURN(-ENOMEM);
2320
2321                 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
2322                 if (rc) {
2323                         ptlrpc_request_free(req);
2324                         RETURN(rc);
2325                 }
2326
2327                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2328                                      sizeof(*lvb));
2329                 ptlrpc_request_set_replen(req);
2330         }
2331
2332         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2333         *flags &= ~LDLM_FL_BLOCK_GRANTED;
2334
2335         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2336                               sizeof(*lvb), LVB_T_OST, &lockh, async);
2337         if (async) {
2338                 if (!rc) {
2339                         struct osc_enqueue_args *aa;
2340                         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2341                         aa = ptlrpc_req_async_args(req);
2342                         aa->oa_exp         = exp;
2343                         aa->oa_mode        = einfo->ei_mode;
2344                         aa->oa_type        = einfo->ei_type;
2345                         lustre_handle_copy(&aa->oa_lockh, &lockh);
2346                         aa->oa_upcall      = upcall;
2347                         aa->oa_cookie      = cookie;
2348                         aa->oa_speculative = speculative;
2349                         if (!speculative) {
2350                                 aa->oa_flags  = flags;
2351                                 aa->oa_lvb    = lvb;
2352                         } else {
2353                                 /* speculative locks essentially enqueue a
2354                                  * DLM lock in advance, so we don't care
2355                                  * about the result of the enqueue. */
2356                                 aa->oa_lvb    = NULL;
2357                                 aa->oa_flags  = NULL;
2358                         }
2359
2360                         req->rq_interpret_reply =
2361                                 (ptlrpc_interpterer_t)osc_enqueue_interpret;
2362                         if (rqset == PTLRPCD_SET)
2363                                 ptlrpcd_add_req(req);
2364                         else
2365                                 ptlrpc_set_add_req(rqset, req);
2366                 } else if (intent) {
2367                         ptlrpc_req_finished(req);
2368                 }
2369                 RETURN(rc);
2370         }
2371
2372         rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
2373                               flags, speculative, rc);
2374         if (intent)
2375                 ptlrpc_req_finished(req);
2376
2377         RETURN(rc);
2378 }
2379
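     /* Match an existing extent lock covering the given range without
      * enqueuing a new one.  As with enqueue, a PR request may be satisfied
      * by a cached PW lock; if @data is given, it is attached to the matched
      * lock. */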
2380 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2381                    enum ldlm_type type, union ldlm_policy_data *policy,
2382                    enum ldlm_mode mode, __u64 *flags, void *data,
2383                    struct lustre_handle *lockh, int unref)
2384 {
2385         struct obd_device *obd = exp->exp_obd;
2386         __u64 lflags = *flags;
2387         enum ldlm_mode rc;
2388         ENTRY;
2389
2390         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2391                 RETURN(-EIO);
2392
2393         /* Filesystem lock extents are extended to page boundaries so that
2394          * dealing with the page cache is a little smoother */
2395         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2396         policy->l_extent.end |= ~PAGE_MASK;
2397
2398         /* Next, search for already existing extent locks that will cover us */
2399         /* If we're trying to read, we also search for an existing PW lock.  The
2400          * VFS and page cache already protect us locally, so lots of readers/
2401          * writers can share a single PW lock. */
2402         rc = mode;
2403         if (mode == LCK_PR)
2404                 rc |= LCK_PW;
2405         rc = ldlm_lock_match(obd->obd_namespace, lflags,
2406                              res_id, type, policy, rc, lockh, unref);
2407         if (rc == 0 || lflags & LDLM_FL_TEST_LOCK)
2408                 RETURN(rc);
2409
2410         if (data != NULL) {
2411                 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2412
2413                 LASSERT(lock != NULL);
2414                 if (!osc_set_lock_data(lock, data)) {
2415                         ldlm_lock_decref(lockh, rc);
2416                         rc = 0;
2417                 }
2418                 LDLM_LOCK_PUT(lock);
2419         }
2420         RETURN(rc);
2421 }
2422
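     /* Interpret callback for an asynchronous OST_STATFS request: copy the
      * reply into the caller's obd_statfs (unless the request failed or was
      * skipped) and invoke the completion callback. */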
2423 static int osc_statfs_interpret(const struct lu_env *env,
2424                                 struct ptlrpc_request *req,
2425                                 struct osc_async_args *aa, int rc)
2426 {
2427         struct obd_statfs *msfs;
2428         ENTRY;
2429
2430         if (rc == -EBADR)
2431                 /* The request has in fact never been sent
2432                  * due to issues at a higher level (LOV).
2433                  * Exit immediately since the caller is
2434                  * aware of the problem and takes care
2435                  * of the cleanup. */
2436                 RETURN(rc);
2437
2438         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2439             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2440                 GOTO(out, rc = 0);
2441
2442         if (rc != 0)
2443                 GOTO(out, rc);
2444
2445         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2446         if (msfs == NULL) {
2447                 GOTO(out, rc = -EPROTO);
2448         }
2449
2450         *aa->aa_oi->oi_osfs = *msfs;
2451 out:
2452         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2453         RETURN(rc);
2454 }
2455
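     /* Send an OST_STATFS request without waiting for the reply; the result
      * is delivered through osc_statfs_interpret(). */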
2456 static int osc_statfs_async(struct obd_export *exp,
2457                             struct obd_info *oinfo, __u64 max_age,
2458                             struct ptlrpc_request_set *rqset)
2459 {
2460         struct obd_device     *obd = class_exp2obd(exp);
2461         struct ptlrpc_request *req;
2462         struct osc_async_args *aa;
2463         int                    rc;
2464         ENTRY;
2465
2466         /* We could possibly pass max_age in the request (as an absolute
2467          * timestamp or a "seconds.usec ago") so the target can avoid doing
2468          * extra calls into the filesystem if that isn't necessary (e.g.
2469          * during mount that would help a bit).  Having relative timestamps
2470          * is not so great if request processing is slow, while absolute
2471          * timestamps are not ideal because they need time synchronization. */
2472         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2473         if (req == NULL)
2474                 RETURN(-ENOMEM);
2475
2476         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2477         if (rc) {
2478                 ptlrpc_request_free(req);
2479                 RETURN(rc);
2480         }
2481         ptlrpc_request_set_replen(req);
2482         req->rq_request_portal = OST_CREATE_PORTAL;
2483         ptlrpc_at_set_req_timeout(req);
2484
2485         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2486                 /* don't delay/resend procfs requests, to avoid deadlock */
2487                 req->rq_no_resend = 1;
2488                 req->rq_no_delay = 1;
2489         }
2490
2491         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2492         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2493         aa = ptlrpc_req_async_args(req);
2494         aa->aa_oi = oinfo;
2495
2496         ptlrpc_set_add_req(rqset, req);
2497         RETURN(0);
2498 }
2499
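     /* Synchronous statfs: send an OST_STATFS request and wait for the
      * reply. */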
2500 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2501                       struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2502 {
2503         struct obd_device     *obd = class_exp2obd(exp);
2504         struct obd_statfs     *msfs;
2505         struct ptlrpc_request *req;
2506         struct obd_import     *imp = NULL;
2507         int rc;
2508         ENTRY;
2509
2510         /* Since the request might also come from lprocfs, we need to sync
2511          * this with client_disconnect_export() (bug 15684) */
2512         down_read(&obd->u.cli.cl_sem);
2513         if (obd->u.cli.cl_import)
2514                 imp = class_import_get(obd->u.cli.cl_import);
2515         up_read(&obd->u.cli.cl_sem);
2516         if (!imp)
2517                 RETURN(-ENODEV);
2518
2519         /* We could possibly pass max_age in the request (as an absolute
2520          * timestamp or a "seconds.usec ago") so the target can avoid doing
2521          * extra calls into the filesystem if that isn't necessary (e.g.
2522          * during mount that would help a bit).  Having relative timestamps
2523          * is not so great if request processing is slow, while absolute
2524          * timestamps are not ideal because they need time synchronization. */
2525         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2526
2527         class_import_put(imp);
2528
2529         if (req == NULL)
2530                 RETURN(-ENOMEM);
2531
2532         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2533         if (rc) {
2534                 ptlrpc_request_free(req);
2535                 RETURN(rc);
2536         }
2537         ptlrpc_request_set_replen(req);
2538         req->rq_request_portal = OST_CREATE_PORTAL;
2539         ptlrpc_at_set_req_timeout(req);
2540
2541         if (flags & OBD_STATFS_NODELAY) {
2542                 /* don't delay/resend procfs requests, to avoid deadlock */
2543                 req->rq_no_resend = 1;
2544                 req->rq_no_delay = 1;
2545         }
2546
2547         rc = ptlrpc_queue_wait(req);
2548         if (rc)
2549                 GOTO(out, rc);
2550
2551         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2552         if (msfs == NULL) {
2553                 GOTO(out, rc = -EPROTO);
2554         }
2555
2556         *osfs = *msfs;
2557
2558         EXIT;
2559  out:
2560         ptlrpc_req_finished(req);
2561         return rc;
2562 }
2563
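     /* Handle the few ioctls implemented by the OSC: client recovery,
      * (de)activating the import, and pinging the target. */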
2564 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2565                          void *karg, void __user *uarg)
2566 {
2567         struct obd_device *obd = exp->exp_obd;
2568         struct obd_ioctl_data *data = karg;
2569         int err = 0;
2570         ENTRY;
2571
2572         if (!try_module_get(THIS_MODULE)) {
2573                 CERROR("%s: cannot get module '%s'\n", obd->obd_name,
2574                        module_name(THIS_MODULE));
2575                 return -EINVAL;
2576         }
2577         switch (cmd) {
2578         case OBD_IOC_CLIENT_RECOVER:
2579                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2580                                             data->ioc_inlbuf1, 0);
2581                 if (err > 0)
2582                         err = 0;
2583                 GOTO(out, err);
2584         case IOC_OSC_SET_ACTIVE:
2585                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2586                                                data->ioc_offset);
2587                 GOTO(out, err);
2588         case OBD_IOC_PING_TARGET:
2589                 err = ptlrpc_obd_ping(obd);
2590                 GOTO(out, err);
2591         default:
2592                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2593                        cmd, current_comm());
2594                 GOTO(out, err = -ENOTTY);
2595         }
2596 out:
2597         module_put(THIS_MODULE);
2598         return err;
2599 }
2600
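     /* Set a named parameter on this OSC.  A few keys (checksum, sptlrpc
      * config, LRU cache) are handled locally; everything else is packed
      * into an OST_SET_INFO (or grant shrink) RPC and sent to the OST. */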
2601 int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
2602                        u32 keylen, void *key, u32 vallen, void *val,
2603                        struct ptlrpc_request_set *set)
2604 {
2605         struct ptlrpc_request *req;
2606         struct obd_device     *obd = exp->exp_obd;
2607         struct obd_import     *imp = class_exp2cliimp(exp);
2608         char                  *tmp;
2609         int                    rc;
2610         ENTRY;
2611
2612         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
2613
2614         if (KEY_IS(KEY_CHECKSUM)) {
2615                 if (vallen != sizeof(int))
2616                         RETURN(-EINVAL);
2617                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
2618                 RETURN(0);
2619         }
2620
2621         if (KEY_IS(KEY_SPTLRPC_CONF)) {
2622                 sptlrpc_conf_client_adapt(obd);
2623                 RETURN(0);
2624         }
2625
2626         if (KEY_IS(KEY_FLUSH_CTX)) {
2627                 sptlrpc_import_flush_my_ctx(imp);
2628                 RETURN(0);
2629         }
2630
2631         if (KEY_IS(KEY_CACHE_SET)) {
2632                 struct client_obd *cli = &obd->u.cli;
2633
2634                 LASSERT(cli->cl_cache == NULL); /* only once */
2635                 cli->cl_cache = (struct cl_client_cache *)val;
2636                 cl_cache_incref(cli->cl_cache);
2637                 cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
2638
2639                 /* add this osc into entity list */
2640                 LASSERT(list_empty(&cli->cl_lru_osc));
2641                 spin_lock(&cli->cl_cache->ccc_lru_lock);
2642                 list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
2643                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
2644
2645                 RETURN(0);
2646         }
2647
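        /* The caller passes the number of LRU pages it wants released in
         * *val; shrink at most half of the pages currently on this client's
         * LRU list and report back how many are still wanted. */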
2648         if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
2649                 struct client_obd *cli = &obd->u.cli;
2650                 long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
2651                 long target = *(long *)val;
2652
2653                 nr = osc_lru_shrink(env, cli, min(nr, target), true);
2654                 *(long *)val -= nr;
2655                 RETURN(0);
2656         }
2657
2658         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
2659                 RETURN(-EINVAL);
2660
2661         /* We pass all other commands directly to the OST. Since nobody calls
2662            osc methods directly and everybody is supposed to go through LOV, we
2663            assume LOV has checked invalid values for us.
2664            The only recognised values so far are evict_by_nid and mds_conn.
2665            Even if something bad slips through, we'd get -EINVAL from the OST
2666            anyway. */
2667
2668         req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
2669                                                 &RQF_OST_SET_GRANT_INFO :
2670                                                 &RQF_OBD_SET_INFO);
2671         if (req == NULL)
2672                 RETURN(-ENOMEM);
2673
2674         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2675                              RCL_CLIENT, keylen);
2676         if (!KEY_IS(KEY_GRANT_SHRINK))
2677                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
2678                                      RCL_CLIENT, vallen);
2679         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
2680         if (rc) {
2681                 ptlrpc_request_free(req);
2682                 RETURN(rc);
2683         }
2684
2685         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2686         memcpy(tmp, key, keylen);
2687         tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
2688                                                         &RMF_OST_BODY :
2689                                                         &RMF_SETINFO_VAL);
2690         memcpy(tmp, val, vallen);
2691
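        /* Grant shrink requests carry an ost_body: stash a private copy of
         * its obdo in the async args so osc_shrink_grant_interpret() can
         * process the OST reply; such requests are handed to ptlrpcd below
         * instead of the caller's request set. */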
2692         if (KEY_IS(KEY_GRANT_SHRINK)) {
2693                 struct osc_grant_args *aa;
2694                 struct obdo *oa;
2695
2696                 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2697                 aa = ptlrpc_req_async_args(req);
2698                 OBDO_ALLOC(oa);
2699                 if (!oa) {
2700                         ptlrpc_req_finished(req);
2701                         RETURN(-ENOMEM);
2702                 }
2703                 *oa = ((struct ost_body *)val)->oa;
2704                 aa->aa_oa = oa;
2705                 req->rq_interpret_reply = osc_shrink_grant_interpret;
2706         }
2707
2708         ptlrpc_request_set_replen(req);
2709         if (!KEY_IS(KEY_GRANT_SHRINK)) {
2710                 LASSERT(set != NULL);
2711                 ptlrpc_set_add_req(set, req);
2712                 ptlrpc_check_set(NULL, set);
2713         } else {
2714                 ptlrpcd_add_req(req);
2715         }
2716
2717         RETURN(0);
2718 }
2719 EXPORT_SYMBOL(osc_set_info_async);
2720
2721 int osc_reconnect(const struct lu_env *env, struct obd_export *exp,
2722                   struct obd_device *obd, struct obd_uuid *cluuid,
2723                   struct obd_connect_data *data, void *localdata)
2724 {
2725         struct client_obd *cli = &obd->u.cli;
2726
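        /* Tell the OST how much grant this client thinks it still holds:
         * available plus reserved grant, plus whatever is consumed by dirty
         * data (byte-accurate when OBD_CONNECT_GRANT_PARAM is negotiated,
         * otherwise counted in whole pages), falling back to two full BRW
         * RPCs worth of grant when the total is zero. */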
2727         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
2728                 long lost_grant;
2729                 long grant;
2730
2731                 spin_lock(&cli->cl_loi_list_lock);
2732                 grant = cli->cl_avail_grant + cli->cl_reserved_grant;
2733                 if (data->ocd_connect_flags & OBD_CONNECT_GRANT_PARAM)
2734                         grant += cli->cl_dirty_grant;
2735                 else
2736                         grant += cli->cl_dirty_pages << PAGE_SHIFT;
2737                 data->ocd_grant = grant ? : 2 * cli_brw_size(obd);
2738                 lost_grant = cli->cl_lost_grant;
2739                 cli->cl_lost_grant = 0;
2740                 spin_unlock(&cli->cl_loi_list_lock);
2741
2742                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: %#llx ocd_version: %d ocd_grant: %d, lost: %ld.\n",
2743                        data->ocd_connect_flags,
2744                        data->ocd_version, data->ocd_grant, lost_grant);
2745         }
2746
2747         RETURN(0);
2748 }
2749 EXPORT_SYMBOL(osc_reconnect);
2750
2751 int osc_disconnect(struct obd_export *exp)
2752 {
2753         struct obd_device *obd = class_exp2obd(exp);
2754         int rc;
2755
2756         rc = client_disconnect_export(exp);
2757         /*
2758          * Initially we put del_shrink_grant before disconnect_export, but it
2759          * causes the following problem if setup (connect) and cleanup
2760          * (disconnect) are tangled together.
2761          *      connect p1                     disconnect p2
2762          *   ptlrpc_connect_import
2763          *     ...............               class_manual_cleanup
2764          *                                     osc_disconnect
2765          *                                     del_shrink_grant
2766          *   ptlrpc_connect_interpret
2767          *     init_grant_shrink
2768          *   add this client to shrink list
2769          *                                      cleanup_osc
2770          * Bang! The pinger triggers the shrink.
2771          * So the osc should only be removed from the shrink list after we
2772          * are sure the import has been destroyed. BUG18662
2773          */
2774         if (obd->u.cli.cl_import == NULL)
2775                 osc_del_shrink_grant(&obd->u.cli);
2776         return rc;
2777 }
2778 EXPORT_SYMBOL(osc_disconnect);
2779
2780 int osc_ldlm_resource_invalidate(struct cfs_hash *hs, struct cfs_hash_bd *bd,
2781                                  struct hlist_node *hnode, void *arg)
2782 {
2783         struct lu_env *env = arg;
2784         struct ldlm_resource *res = cfs_hash_object(hs, hnode);
2785         struct ldlm_lock *lock;
2786         struct osc_object *osc = NULL;
2787         ENTRY;
2788
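        /* Find the osc object backing this resource via the first granted
         * lock's l_ast_data and hold a reference so the object can be
         * invalidated after the resource lock is dropped. */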
2789         lock_res(res);
2790         list_for_each_entry(lock, &res->lr_granted, l_res_link) {
2791                 if (lock->l_ast_data != NULL && osc == NULL) {
2792                         osc = lock->l_ast_data;
2793                         cl_object_get(osc2cl(osc));
2794                 }
2795
2796                 /* clear LDLM_FL_CLEANED flag to make sure it will be canceled
2797                  * by the 2nd round of ldlm_namespace_clean() call in
2798                  * osc_import_event(). */
2799                 ldlm_clear_cleaned(lock);
2800         }
2801         unlock_res(res);
2802
2803         if (osc != NULL) {
2804                 osc_object_invalidate(env, osc);
2805                 cl_object_put(env, osc2cl(osc));
2806         }
2807
2808         RETURN(0);
2809 }
2810 EXPORT_SYMBOL(osc_ldlm_resource_invalidate);
2811
2812 static int osc_import_event(struct obd_device *obd,
2813                             struct obd_import *imp,
2814                             enum obd_import_event event)
2815 {
2816         struct client_obd *cli;
2817         int rc = 0;
2818
2819         ENTRY;
2820         LASSERT(imp->imp_obd == obd);
2821
2822         switch (event) {
2823         case IMP_EVENT_DISCON: {
2824                 cli = &obd->u.cli;
2825                 spin_lock(&cli->cl_loi_list_lock);
2826                 cli->cl_avail_grant = 0;
2827                 cli->cl_lost_grant = 0;
2828                 spin_unlock(&cli->cl_loi_list_lock);
2829                 break;
2830         }
2831         case IMP_EVENT_INACTIVE: {
2832                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE);
2833                 break;
2834         }
2835         case IMP_EVENT_INVALIDATE: {
2836                 struct ldlm_namespace *ns = obd->obd_namespace;
2837                 struct lu_env         *env;
2838                 __u16                  refcheck;
2839
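                /* The first cleanup pass cancels what it can immediately;
                 * osc_ldlm_resource_invalidate() then clears LDLM_FL_CLEANED
                 * so the second pass below catches the remaining locks. */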
2840                 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
2841
2842                 env = cl_env_get(&refcheck);
2843                 if (!IS_ERR(env)) {
2844                         osc_io_unplug(env, &obd->u.cli, NULL);
2845
2846                         cfs_hash_for_each_nolock(ns->ns_rs_hash,
2847                                                  osc_ldlm_resource_invalidate,
2848                                                  env, 0);
2849                         cl_env_put(env, &refcheck);
2850
2851                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
2852                 } else
2853                         rc = PTR_ERR(env);
2854                 break;
2855         }
2856         case IMP_EVENT_ACTIVE: {
2857                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE);
2858                 break;
2859         }
2860         case IMP_EVENT_OCD: {
2861                 struct obd_connect_data *ocd = &imp->imp_connect_data;
2862
2863                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
2864                         osc_init_grant(&obd->u.cli, ocd);
2865
2866                 /* See bug 7198 */
2867                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
2868                         imp->imp_client->cli_request_portal = OST_REQUEST_PORTAL;
2869
2870                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD);
2871                 break;
2872         }
2873         case IMP_EVENT_DEACTIVATE: {
2874                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE);
2875                 break;
2876         }
2877         case IMP_EVENT_ACTIVATE: {
2878                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE);
2879                 break;
2880         }
2881         default:
2882                 CERROR("Unknown import event %d\n", event);
2883                 LBUG();
2884         }
2885         RETURN(rc);
2886 }
2887
2888 /**
2889  * Determine whether the lock can be canceled before replaying the lock
2890  * during recovery, see bug16774 for detailed information.
2891  *
2892  * \retval zero the lock can't be canceled
2893  * \retval other ok to cancel
2894  */
2895 static int osc_cancel_weight(struct ldlm_lock *lock)
2896 {
2897         /*
2898          * Cancel all unused, granted extent locks.
2899          */
2900         if (lock->l_resource->lr_type == LDLM_EXTENT &&
2901             lock->l_granted_mode == lock->l_req_mode &&
2902             osc_ldlm_weigh_ast(lock) == 0)
2903                 RETURN(1);
2904
2905         RETURN(0);
2906 }
2907
2908 static int brw_queue_work(const struct lu_env *env, void *data)
2909 {
2910         struct client_obd *cli = data;
2911
2912         CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
2913
2914         osc_io_unplug(env, cli, NULL);
2915         RETURN(0);
2916 }
2917
2918 int osc_setup_common(struct obd_device *obd, struct lustre_cfg *lcfg)
2919 {
2920         struct client_obd *cli = &obd->u.cli;
2921         void *handler;
2922         int rc;
2923
2924         ENTRY;
2925
2926         rc = ptlrpcd_addref();
2927         if (rc)
2928                 RETURN(rc);
2929
2930         rc = client_obd_setup(obd, lcfg);
2931         if (rc)
2932                 GOTO(out_ptlrpcd, rc);
2933
2935         handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
2936         if (IS_ERR(handler))
2937                 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
2938         cli->cl_writeback_work = handler;
2939
2940         handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
2941         if (IS_ERR(handler))
2942                 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
2943         cli->cl_lru_work = handler;
2944
2945         rc = osc_quota_setup(obd);
2946         if (rc)
2947                 GOTO(out_ptlrpcd_work, rc);
2948
2949         cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
2950
2951         INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
2952         RETURN(rc);
2953
2954 out_ptlrpcd_work:
2955         if (cli->cl_writeback_work != NULL) {
2956                 ptlrpcd_destroy_work(cli->cl_writeback_work);
2957                 cli->cl_writeback_work = NULL;
2958         }
2959         if (cli->cl_lru_work != NULL) {
2960                 ptlrpcd_destroy_work(cli->cl_lru_work);
2961                 cli->cl_lru_work = NULL;
2962         }
2963         client_obd_cleanup(obd);
2964 out_ptlrpcd:
2965         ptlrpcd_decref();
2966         RETURN(rc);
2967 }
2968 EXPORT_SYMBOL(osc_setup_common);
2969
2970 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
2971 {
2972         struct client_obd *cli = &obd->u.cli;
2973         struct obd_type   *type;
2974         int                adding;
2975         int                added;
2976         int                req_count;
2977         int                rc;
2978
2979         ENTRY;
2980
2981         rc = osc_setup_common(obd, lcfg);
2982         if (rc < 0)
2983                 RETURN(rc);
2984
2985 #ifdef CONFIG_PROC_FS
2986         obd->obd_vars = lprocfs_osc_obd_vars;
2987 #endif
2988         /* If this is true then both the client (osc) and server (osp) are on
2989          * the same node. If the osp layer is loaded first it will register the
2990          * osc proc directory; in that case this obd_device attaches its proc
2991          * tree to type->typ_procsym instead of obd->obd_type->typ_procroot.
2992          */
2993         type = class_search_type(LUSTRE_OSP_NAME);
2994         if (type && type->typ_procsym) {
2995                 obd->obd_proc_entry = lprocfs_register(obd->obd_name,
2996                                                        type->typ_procsym,
2997                                                        obd->obd_vars, obd);
2998                 if (IS_ERR(obd->obd_proc_entry)) {
2999                         rc = PTR_ERR(obd->obd_proc_entry);
3000                         CERROR("error %d setting up lprocfs for %s\n", rc,
3001                                obd->obd_name);
3002                         obd->obd_proc_entry = NULL;
3003                 }
3004         }
3005
3006         rc = lprocfs_obd_setup(obd, false);
3007         if (!rc) {
3008                 /* If the basic OSC proc tree construction succeeded then
3009                  * let's do the rest.
3010                  */
3011                 lproc_osc_attach_seqstat(obd);
3012                 sptlrpc_lprocfs_cliobd_attach(obd);
3013                 ptlrpc_lprocfs_register_obd(obd);
3014         }
3015
3016         /*
3017          * We try to control the total number of requests with an upper limit,
3018          * osc_reqpool_maxreqcount. There might be a race that causes an
3019          * over-limit allocation, but that is fine.
3020          */
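        /* Each OSC tries to add cl_max_rpcs_in_flight + 2 requests to the
         * pool, clamped so the global count stays at or below
         * osc_reqpool_maxreqcount. */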
3021         req_count = atomic_read(&osc_pool_req_count);
3022         if (req_count < osc_reqpool_maxreqcount) {
3023                 adding = cli->cl_max_rpcs_in_flight + 2;
3024                 if (req_count + adding > osc_reqpool_maxreqcount)
3025                         adding = osc_reqpool_maxreqcount - req_count;
3026
3027                 added = ptlrpc_add_rqs_to_pool(osc_rq_pool, adding);
3028                 atomic_add(added, &osc_pool_req_count);
3029         }
3030
3031         INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
3032         ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
3033
3034         spin_lock(&osc_shrink_lock);
3035         list_add_tail(&cli->cl_shrink_list, &osc_shrink_list);
3036         spin_unlock(&osc_shrink_lock);
3037
3038         RETURN(0);
3039 }
3040
3041 int osc_precleanup_common(struct obd_device *obd)
3042 {
3043         struct client_obd *cli = &obd->u.cli;
3044         ENTRY;
3045
3046         /* LU-464
3047          * for echo client, export may be on zombie list, wait for
3048          * zombie thread to cull it, because cli.cl_import will be
3049          * cleared in client_disconnect_export():
3050          *   class_export_destroy() -> obd_cleanup() ->
3051          *   echo_device_free() -> echo_client_cleanup() ->
3052          *   obd_disconnect() -> osc_disconnect() ->
3053          *   client_disconnect_export()
3054          */
3055         obd_zombie_barrier();
3056         if (cli->cl_writeback_work) {
3057                 ptlrpcd_destroy_work(cli->cl_writeback_work);
3058                 cli->cl_writeback_work = NULL;
3059         }
3060
3061         if (cli->cl_lru_work) {
3062                 ptlrpcd_destroy_work(cli->cl_lru_work);
3063                 cli->cl_lru_work = NULL;
3064         }
3065
3066         obd_cleanup_client_import(obd);
3067         RETURN(0);
3068 }
3069 EXPORT_SYMBOL(osc_precleanup_common);
3070
3071 static int osc_precleanup(struct obd_device *obd)
3072 {
3073         ENTRY;
3074
3075         osc_precleanup_common(obd);
3076
3077         ptlrpc_lprocfs_unregister_obd(obd);
3078         RETURN(0);
3079 }
3080
3081 int osc_cleanup_common(struct obd_device *obd)
3082 {
3083         struct client_obd *cli = &obd->u.cli;
3084         int rc;
3085
3086         ENTRY;
3087
3088         spin_lock(&osc_shrink_lock);
3089         list_del(&cli->cl_shrink_list);
3090         spin_unlock(&osc_shrink_lock);
3091
3092         /* lru cleanup */
3093         if (cli->cl_cache != NULL) {
3094                 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
3095                 spin_lock(&cli->cl_cache->ccc_lru_lock);
3096                 list_del_init(&cli->cl_lru_osc);
3097                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3098                 cli->cl_lru_left = NULL;
3099                 cl_cache_decref(cli->cl_cache);
3100                 cli->cl_cache = NULL;
3101         }
3102
3103         /* free memory of osc quota cache */
3104         osc_quota_cleanup(obd);
3105
3106         rc = client_obd_cleanup(obd);
3107
3108         ptlrpcd_decref();
3109         RETURN(rc);
3110 }
3111 EXPORT_SYMBOL(osc_cleanup_common);
3112
3113 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
3114 {
3115         int rc = class_process_proc_param(PARAM_OSC, obd->obd_vars, lcfg, obd);
3116         return rc > 0 ? 0 : rc;
3117 }
3118
3119 static int osc_process_config(struct obd_device *obd, size_t len, void *buf)
3120 {
3121         return osc_process_config_base(obd, buf);
3122 }
3123
3124 static struct obd_ops osc_obd_ops = {
3125         .o_owner                = THIS_MODULE,
3126         .o_setup                = osc_setup,
3127         .o_precleanup           = osc_precleanup,
3128         .o_cleanup              = osc_cleanup_common,
3129         .o_add_conn             = client_import_add_conn,
3130         .o_del_conn             = client_import_del_conn,
3131         .o_connect              = client_connect_import,
3132         .o_reconnect            = osc_reconnect,
3133         .o_disconnect           = osc_disconnect,
3134         .o_statfs               = osc_statfs,
3135         .o_statfs_async         = osc_statfs_async,
3136         .o_create               = osc_create,
3137         .o_destroy              = osc_destroy,
3138         .o_getattr              = osc_getattr,
3139         .o_setattr              = osc_setattr,
3140         .o_iocontrol            = osc_iocontrol,
3141         .o_set_info_async       = osc_set_info_async,
3142         .o_import_event         = osc_import_event,
3143         .o_process_config       = osc_process_config,
3144         .o_quotactl             = osc_quotactl,
3145 };
3146
3147 static struct shrinker *osc_cache_shrinker;
3148 struct list_head osc_shrink_list = LIST_HEAD_INIT(osc_shrink_list);
3149 DEFINE_SPINLOCK(osc_shrink_lock);
3150
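/* Compatibility wrapper for older kernels whose struct shrinker has a single
 * shrink() callback instead of separate count and scan methods: perform the
 * scan, then return the remaining object count. */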
3151 #ifndef HAVE_SHRINKER_COUNT
3152 static int osc_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask))
3153 {
3154         struct shrink_control scv = {
3155                 .nr_to_scan = shrink_param(sc, nr_to_scan),
3156                 .gfp_mask   = shrink_param(sc, gfp_mask)
3157         };
3158 #if !defined(HAVE_SHRINKER_WANT_SHRINK_PTR) && !defined(HAVE_SHRINK_CONTROL)
3159         struct shrinker *shrinker = NULL;
3160 #endif
3161
3162         (void)osc_cache_shrink_scan(shrinker, &scv);
3163
3164         return osc_cache_shrink_count(shrinker, &scv);
3165 }
3166 #endif
3167
3168 static int __init osc_init(void)
3169 {
3170         bool enable_proc = true;
3171         struct obd_type *type;
3172         unsigned int reqpool_size;
3173         unsigned int reqsize;
3174         int rc;
3175         DEF_SHRINKER_VAR(osc_shvar, osc_cache_shrink,
3176                          osc_cache_shrink_count, osc_cache_shrink_scan);
3177         ENTRY;
3178
3179         /* Print the address of _any_ initialized kernel symbol from this
3180          * module, to allow debugging with gdb versions that do not support
3181          * data symbols from modules. */
3182         CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3183
3184         rc = lu_kmem_init(osc_caches);
3185         if (rc)
3186                 RETURN(rc);
3187
3188         type = class_search_type(LUSTRE_OSP_NAME);
3189         if (type != NULL && type->typ_procsym != NULL)
3190                 enable_proc = false;
3191
3192         rc = class_register_type(&osc_obd_ops, NULL, enable_proc, NULL,
3193                                  LUSTRE_OSC_NAME, &osc_device_type);
3194         if (rc)
3195                 GOTO(out_kmem, rc);
3196
3197         osc_cache_shrinker = set_shrinker(DEFAULT_SEEKS, &osc_shvar);
3198
3199         /* This is obviously too much memory; the check only prevents overflow */
3200         if (osc_reqpool_mem_max >= 1 << 12 || osc_reqpool_mem_max == 0)
3201                 GOTO(out_type, rc = -EINVAL);
3202
3203         reqpool_size = osc_reqpool_mem_max << 20;
3204
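        /* Round the per-request buffer size up to the smallest power of two
         * that can hold OST_IO_MAXREQSIZE. */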
3205         reqsize = 1;
3206         while (reqsize < OST_IO_MAXREQSIZE)
3207                 reqsize = reqsize << 1;
3208
3209         /*
3210          * We don't enlarge the request count in the OSC pool according to
3211          * cl_max_rpcs_in_flight. Allocation from the pool is only tried after
3212          * a normal allocation has failed, so a small OSC pool won't cause much
3213          * performance degradation in most cases.
3214          */
3215         osc_reqpool_maxreqcount = reqpool_size / reqsize;
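        /* Worked example (hypothetical sizes): if OST_IO_MAXREQSIZE rounded
         * up to 64 KiB, the default 5 MB pool above would allow
         * (5 << 20) / (64 << 10) = 80 pooled requests in total. */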
3216
3217         atomic_set(&osc_pool_req_count, 0);
3218         osc_rq_pool = ptlrpc_init_rq_pool(0, OST_IO_MAXREQSIZE,
3219                                           ptlrpc_add_rqs_to_pool);
3220
3221         if (osc_rq_pool != NULL)
3222                 GOTO(out, rc);
3223         rc = -ENOMEM;
3224 out_type:
3225         class_unregister_type(LUSTRE_OSC_NAME);
3226 out_kmem:
3227         lu_kmem_fini(osc_caches);
3228 out:
3229         RETURN(rc);
3230 }
3231
3232 static void __exit osc_exit(void)
3233 {
3234         remove_shrinker(osc_cache_shrinker);
3235         class_unregister_type(LUSTRE_OSC_NAME);
3236         lu_kmem_fini(osc_caches);
3237         ptlrpc_free_rq_pool(osc_rq_pool);
3238 }
3239
3240 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
3241 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3242 MODULE_VERSION(LUSTRE_VERSION_STRING);
3243 MODULE_LICENSE("GPL");
3244
3245 module_init(osc_init);
3246 module_exit(osc_exit);