/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.gnu.org/licenses/gpl-2.0.html
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, 2016, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 */

#define DEBUG_SUBSYSTEM S_OSC

#include <libcfs/libcfs.h>

#include <lprocfs_status.h>
#include <lustre_debug.h>
#include <lustre_dlm.h>
#include <lustre_fid.h>
#include <lustre_ha.h>
#include <uapi/linux/lustre/lustre_ioctl.h>
#include <lustre_net.h>
#include <lustre_obdo.h>
#include <uapi/linux/lustre/lustre_param.h>
#include <obd.h>
#include <obd_cksum.h>
#include <obd_class.h>
#include <lustre_osc.h>

#include "osc_internal.h"

atomic_t osc_pool_req_count;
unsigned int osc_reqpool_maxreqcount;
struct ptlrpc_request_pool *osc_rq_pool;

/* max memory used for request pool, unit is MB */
static unsigned int osc_reqpool_mem_max = 5;
module_param(osc_reqpool_mem_max, uint, 0444);

#define osc_grant_args osc_brw_async_args

struct osc_setattr_args {
        struct obdo             *sa_oa;
        obd_enqueue_update_f     sa_upcall;
        void                    *sa_cookie;
};

struct osc_fsync_args {
        struct osc_object       *fa_obj;
        struct obdo             *fa_oa;
        obd_enqueue_update_f    fa_upcall;
        void                    *fa_cookie;
};

struct osc_ladvise_args {
        struct obdo             *la_oa;
        obd_enqueue_update_f     la_upcall;
        void                    *la_cookie;
};

static void osc_release_ppga(struct brw_page **ppga, size_t count);
static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
                         void *data, int rc);

void osc_pack_req_body(struct ptlrpc_request *req, struct obdo *oa)
{
        struct ost_body *body;

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
}

static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        int                      rc;

        ENTRY;
        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;

        EXIT;
out:
        ptlrpc_req_finished(req);

        return rc;
}
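
/*
 * Overview sketch (illustrative, not part of the build): osc_getattr()
 * above is the canonical synchronous OST RPC shape that the other
 * blocking handlers in this file repeat, varying only the request
 * format and opcode:
 *
 *	req = ptlrpc_request_alloc(imp, &RQF_OST_GETATTR);     alloc capsule
 *	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc); pack request
 *	osc_pack_req_body(req, oa);                            fill ost_body
 *	ptlrpc_request_set_replen(req);                        size the reply
 *	rc = ptlrpc_queue_wait(req);                           send and block
 *	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
 *	ptlrpc_req_finished(req);                              drop the ref
 */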

static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        int                      rc;

        ENTRY;
        LASSERT(oa->o_valid & OBD_MD_FLGROUP);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        EXIT;
out:
        ptlrpc_req_finished(req);

        RETURN(rc);
}

static int osc_setattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_setattr_args *sa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
                             &body->oa);
out:
        rc = sa->sa_upcall(sa->sa_cookie, rc);
        RETURN(rc);
}

int osc_setattr_async(struct obd_export *exp, struct obdo *oa,
                      obd_enqueue_update_f upcall, void *cookie,
                      struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct osc_setattr_args *sa;
        int                      rc;

        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        /* Do MDS-to-OST setattr asynchronously. */
        if (!rqset) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req);
        } else {
                req->rq_interpret_reply =
                        (ptlrpc_interpterer_t)osc_setattr_interpret;

                CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
                sa = ptlrpc_req_async_args(req);
                sa->sa_oa = oa;
                sa->sa_upcall = upcall;
                sa->sa_cookie = cookie;

                if (rqset == PTLRPCD_SET)
                        ptlrpcd_add_req(req);
                else
                        ptlrpc_set_add_req(rqset, req);
        }

        RETURN(0);
}
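
/*
 * Usage sketch (hypothetical caller; my_upcall/my_cookie are placeholder
 * names): a fire-and-forget setattr passes a NULL rqset, so no interpret
 * callback is registered and the upcall is never invoked:
 *
 *	rc = osc_setattr_async(exp, oa, NULL, NULL, NULL);
 *
 * A caller that needs completion notification passes PTLRPCD_SET (or its
 * own request set) together with an upcall and cookie, which
 * osc_setattr_interpret() invokes once the reply has been unpacked:
 *
 *	rc = osc_setattr_async(exp, oa, my_upcall, my_cookie, PTLRPCD_SET);
 */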

static int osc_ladvise_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 void *arg, int rc)
{
        struct osc_ladvise_args *la = arg;
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        *la->la_oa = body->oa;
out:
        rc = la->la_upcall(la->la_cookie, rc);
        RETURN(rc);
}

/**
 * If rqset is NULL, do not wait for the response. The upcall and cookie
 * may also be NULL in this case.
 */
int osc_ladvise_base(struct obd_export *exp, struct obdo *oa,
                     struct ladvise_hdr *ladvise_hdr,
                     obd_enqueue_update_f upcall, void *cookie,
                     struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        struct osc_ladvise_args *la;
        int                      rc;
        struct lu_ladvise       *req_ladvise;
        struct lu_ladvise       *ladvise = ladvise_hdr->lah_advise;
        int                      num_advise = ladvise_hdr->lah_count;
        struct ladvise_hdr      *req_ladvise_hdr;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_LADVISE);
        if (req == NULL)
                RETURN(-ENOMEM);

        req_capsule_set_size(&req->rq_pill, &RMF_OST_LADVISE, RCL_CLIENT,
                             num_advise * sizeof(*ladvise));
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_LADVISE);
        if (rc != 0) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL;
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
                             oa);

        req_ladvise_hdr = req_capsule_client_get(&req->rq_pill,
                                                 &RMF_OST_LADVISE_HDR);
        memcpy(req_ladvise_hdr, ladvise_hdr, sizeof(*ladvise_hdr));

        req_ladvise = req_capsule_client_get(&req->rq_pill, &RMF_OST_LADVISE);
        memcpy(req_ladvise, ladvise, sizeof(*ladvise) * num_advise);
        ptlrpc_request_set_replen(req);

        if (rqset == NULL) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req);
                RETURN(0);
        }

        req->rq_interpret_reply = osc_ladvise_interpret;
        CLASSERT(sizeof(*la) <= sizeof(req->rq_async_args));
        la = ptlrpc_req_async_args(req);
        la->la_oa = oa;
        la->la_upcall = upcall;
        la->la_cookie = cookie;

        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}
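
/*
 * Usage sketch (illustration only; the allocation and range setup of
 * 'hdr' are elided): asking the OST to prefetch a byte range could look
 * roughly like:
 *
 *	hdr->lah_count = 1;
 *	hdr->lah_advise[0].lla_advice = LU_LADVISE_WILLREAD;
 *	rc = osc_ladvise_base(exp, oa, hdr, NULL, NULL, NULL);
 *
 * With rqset == NULL the request is simply handed to ptlrpcd and, as the
 * comment above notes, upcall and cookie may be NULL as well.
 */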

static int osc_create(const struct lu_env *env, struct obd_export *exp,
                      struct obdo *oa)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        LASSERT(oa != NULL);
        LASSERT(oa->o_valid & OBD_MD_FLGROUP);
        LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
        if (req == NULL)
                GOTO(out, rc = -ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
        if (rc) {
                ptlrpc_request_free(req);
                GOTO(out, rc);
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out_req, rc = -EPROTO);

        CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;

        CDEBUG(D_HA, "transno: %lld\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        RETURN(rc);
}

int osc_punch_send(struct obd_export *exp, struct obdo *oa,
                   obd_enqueue_update_f upcall, void *cookie)
{
        struct ptlrpc_request *req;
        struct osc_setattr_args *sa;
        struct obd_import *imp = class_exp2cliimp(exp);
        struct ost_body *body;
        int rc;

        ENTRY;

        req = ptlrpc_request_alloc(imp, &RQF_OST_PUNCH);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
        if (rc < 0) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_set_io_portal(req);

        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);

        lustre_set_wire_obdo(&imp->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
        CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
        sa = ptlrpc_req_async_args(req);
        sa->sa_oa = oa;
        sa->sa_upcall = upcall;
        sa->sa_cookie = cookie;

        ptlrpcd_add_req(req);

        RETURN(0);
}
EXPORT_SYMBOL(osc_punch_send);

static int osc_sync_interpret(const struct lu_env *env,
                              struct ptlrpc_request *req,
                              void *arg, int rc)
{
        struct osc_fsync_args   *fa = arg;
        struct ost_body         *body;
        struct cl_attr          *attr = &osc_env_info(env)->oti_attr;
        unsigned long           valid = 0;
        struct cl_object        *obj;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL) {
                CERROR("can't unpack ost_body\n");
                GOTO(out, rc = -EPROTO);
        }

        *fa->fa_oa = body->oa;
        obj = osc2cl(fa->fa_obj);

        /* Update osc object's blocks attribute */
        cl_object_attr_lock(obj);
        if (body->oa.o_valid & OBD_MD_FLBLOCKS) {
                attr->cat_blocks = body->oa.o_blocks;
                valid |= CAT_BLOCKS;
        }

        if (valid != 0)
                cl_object_attr_update(env, obj, attr, valid);
        cl_object_attr_unlock(obj);

out:
        rc = fa->fa_upcall(fa->fa_cookie, rc);
        RETURN(rc);
}

int osc_sync_base(struct osc_object *obj, struct obdo *oa,
                  obd_enqueue_update_f upcall, void *cookie,
                  struct ptlrpc_request_set *rqset)
{
        struct obd_export     *exp = osc_export(obj);
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct osc_fsync_args *fa;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        /* overload the size and blocks fields in the oa with start/end */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = osc_sync_interpret;

        CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
        fa = ptlrpc_req_async_args(req);
        fa->fa_obj = obj;
        fa->fa_oa = oa;
        fa->fa_upcall = upcall;
        fa->fa_cookie = cookie;

        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}

/* Locally find and cancel the locks matched by @mode in the resource
 * identified by @oa. Found locks are added to the @cancels list. Returns
 * the number of locks added to @cancels. */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
                                   struct list_head *cancels,
                                   enum ldlm_mode mode, __u64 lock_flags)
{
        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        struct ldlm_res_id res_id;
        struct ldlm_resource *res;
        int count;
        ENTRY;

        /* Return, i.e. cancel nothing, only if ELC is supported (flag in
         * export) but disabled through procfs (flag in NS).
         *
         * This is distinct from the case where ELC was never supported, in
         * which we still want to cancel locks in advance and just cancel
         * them locally, without sending any RPC. */
        if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
                RETURN(0);

        ostid_build_res_name(&oa->o_oi, &res_id);
        res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
        if (IS_ERR(res))
                RETURN(0);

        LDLM_RESOURCE_ADDREF(res);
        count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
                                           lock_flags, 0, NULL);
        LDLM_RESOURCE_DELREF(res);
        ldlm_resource_putref(res);
        RETURN(count);
}

static int osc_destroy_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req, void *data,
                                 int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;

        atomic_dec(&cli->cl_destroy_in_flight);
        wake_up(&cli->cl_destroy_waitq);
        return 0;
}

static int osc_can_send_destroy(struct client_obd *cli)
{
        if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
            cli->cl_max_rpcs_in_flight) {
                /* The destroy request can be sent */
                return 1;
        }
        if (atomic_dec_return(&cli->cl_destroy_in_flight) <
            cli->cl_max_rpcs_in_flight) {
                /*
                 * The counter has been modified between the two atomic
                 * operations.
                 */
                wake_up(&cli->cl_destroy_waitq);
        }
        return 0;
}
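
/*
 * Race walk-through for the helper above (assumed values): with
 * cl_max_rpcs_in_flight = 8 and cl_destroy_in_flight already at 8, our
 * atomic_inc_return() observes 9 and we must back off.  If a destroy
 * completes in between (osc_destroy_interpret() drops the counter to 8
 * and calls wake_up() before we have gone to sleep), our
 * atomic_dec_return() yields 7, below the limit, so we repeat the
 * wake_up() ourselves.  A spurious wakeup is harmless: the waiter in
 * osc_destroy() re-evaluates osc_can_send_destroy() before sending.
 */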

static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct client_obd     *cli = &exp->exp_obd->u.cli;
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct list_head       cancels = LIST_HEAD_INIT(cancels);
        int rc, count;
        ENTRY;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
        if (req == NULL) {
                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                RETURN(-ENOMEM);
        }

        rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
                               0, &cancels, count);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = osc_destroy_interpret;
        if (!osc_can_send_destroy(cli)) {
                struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);

                /*
                 * Wait until the number of on-going destroy RPCs drops
                 * below cl_max_rpcs_in_flight.
                 */
                rc = l_wait_event_exclusive(cli->cl_destroy_waitq,
                                            osc_can_send_destroy(cli), &lwi);
                if (rc) {
                        ptlrpc_req_finished(req);
                        RETURN(rc);
                }
        }

        /* Do not wait for response */
        ptlrpcd_add_req(req);
        RETURN(0);
}

static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;

        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        spin_lock(&cli->cl_loi_list_lock);
        if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data, GRANT_PARAM))
                oa->o_dirty = cli->cl_dirty_grant;
        else
                oa->o_dirty = cli->cl_dirty_pages << PAGE_SHIFT;
        if (unlikely(cli->cl_dirty_pages - cli->cl_dirty_transit >
                     cli->cl_dirty_max_pages)) {
                CERROR("dirty %lu - %lu > dirty_max %lu\n",
                       cli->cl_dirty_pages, cli->cl_dirty_transit,
                       cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else if (unlikely(atomic_long_read(&obd_dirty_pages) -
                            atomic_long_read(&obd_dirty_transit_pages) >
                            (long)(obd_max_dirty_pages + 1))) {
                /* The atomic_long_read() and atomic_long_inc() calls are not
                 * covered by a lock, so they may safely race and trip this
                 * CERROR() unless we add in a small fudge factor (+1). */
                CERROR("%s: dirty %ld - %ld > system dirty_max %ld\n",
                       cli_name(cli), atomic_long_read(&obd_dirty_pages),
                       atomic_long_read(&obd_dirty_transit_pages),
                       obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
                            0x7fffffff)) {
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty_pages, cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else {
                unsigned long nrpages;

                nrpages = cli->cl_max_pages_per_rpc;
                nrpages *= cli->cl_max_rpcs_in_flight + 1;
                nrpages = max(nrpages, cli->cl_dirty_max_pages);
                oa->o_undirty = nrpages << PAGE_SHIFT;
                if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data,
                                 GRANT_PARAM)) {
                        int nrextents;

                        /* take extent tax into account when asking for more
                         * grant space */
                        nrextents = (nrpages + cli->cl_max_extent_pages - 1) /
                                     cli->cl_max_extent_pages;
                        oa->o_undirty += nrextents * cli->cl_grant_extent_tax;
                }
        }
        oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
        oa->o_dropped = cli->cl_lost_grant;
        cli->cl_lost_grant = 0;
        spin_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "dirty: %llu undirty: %u dropped %u grant: %llu\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
}
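
/*
 * Worked example for the o_undirty branch above (assumed values,
 * PAGE_SHIFT = 12): with cl_max_pages_per_rpc = 256 and
 * cl_max_rpcs_in_flight = 8,
 *
 *	nrpages = 256 * (8 + 1) = 2304 pages, i.e. 9 MiB,
 *
 * raised to cl_dirty_max_pages if that is larger, so the client asks for
 * enough grant to keep a full pipeline of RPCs (plus one being built)
 * dirty in cache.  Under GRANT_PARAM with, say, cl_max_extent_pages = 512
 * and cl_grant_extent_tax = 24576, the extent tax adds
 * (2304 + 511) / 512 = 5 extents * 24 KiB = 120 KiB to o_undirty.
 */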

void osc_update_next_shrink(struct client_obd *cli)
{
        cli->cl_next_shrink_grant = ktime_get_seconds() +
                                    cli->cl_grant_shrink_interval;

        CDEBUG(D_CACHE, "next time %lld to shrink grant\n",
               cli->cl_next_shrink_grant);
}

static void __osc_update_grant(struct client_obd *cli, u64 grant)
{
        spin_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant += grant;
        spin_unlock(&cli->cl_loi_list_lock);
}

static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
{
        if (body->oa.o_valid & OBD_MD_FLGRANT) {
                CDEBUG(D_CACHE, "got %llu extra grant\n", body->oa.o_grant);
                __osc_update_grant(cli, body->oa.o_grant);
        }
}

static int osc_shrink_grant_interpret(const struct lu_env *env,
                                      struct ptlrpc_request *req,
                                      void *aa, int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
        struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
        struct ost_body *body;

        if (rc != 0) {
                __osc_update_grant(cli, oa->o_grant);
                GOTO(out, rc);
        }

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        osc_update_grant(cli, body);
out:
        OBDO_FREE(oa);
        return rc;
}

static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
{
        spin_lock(&cli->cl_loi_list_lock);
        oa->o_grant = cli->cl_avail_grant / 4;
        cli->cl_avail_grant -= oa->o_grant;
        spin_unlock(&cli->cl_loi_list_lock);
        if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
                oa->o_valid |= OBD_MD_FLFLAGS;
                oa->o_flags = 0;
        }
        oa->o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);
}
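
/*
 * Worked example (assumed values): if cl_avail_grant is 64 MiB when this
 * runs, o_grant returns 16 MiB to the server with OBD_FL_SHRINK_GRANT
 * set and the client keeps 48 MiB, so an idle client bleeds off a
 * quarter of its surplus grant per shrink interval rather than
 * releasing it all at once.
 */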

/* Shrink the current grant, either from some large amount to enough for a
 * full set of in-flight RPCs, or if we have already shrunk to that limit
 * then to enough for a single RPC.  This avoids keeping more grant than
 * needed, and avoids shrinking the grant piecemeal. */
static int osc_shrink_grant(struct client_obd *cli)
{
        __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
                             (cli->cl_max_pages_per_rpc << PAGE_SHIFT);

        spin_lock(&cli->cl_loi_list_lock);
        if (cli->cl_avail_grant <= target_bytes)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
        spin_unlock(&cli->cl_loi_list_lock);

        return osc_shrink_grant_to_target(cli, target_bytes);
}
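
/*
 * Worked example (assumed values, PAGE_SHIFT = 12): with
 * cl_max_rpcs_in_flight = 8 and cl_max_pages_per_rpc = 256 (1 MiB per
 * RPC), the first target is (8 + 1) * 1 MiB = 9 MiB; once cl_avail_grant
 * is already at or below that, the target drops to a single RPC's worth,
 * 1 MiB, matching the two-step policy described above.
 */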

int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
{
        int                     rc = 0;
        struct ost_body        *body;
        ENTRY;

        spin_lock(&cli->cl_loi_list_lock);
        /* Don't shrink if we are already above or below the desired limit.
         * We don't want to shrink below a single RPC, as that will negatively
         * impact block allocation and long-term performance. */
        if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_SHIFT)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;

        if (target_bytes >= cli->cl_avail_grant) {
                spin_unlock(&cli->cl_loi_list_lock);
                RETURN(0);
        }
        spin_unlock(&cli->cl_loi_list_lock);

        OBD_ALLOC_PTR(body);
        if (!body)
                RETURN(-ENOMEM);

        osc_announce_cached(cli, &body->oa, 0);

        spin_lock(&cli->cl_loi_list_lock);
        body->oa.o_grant = cli->cl_avail_grant - target_bytes;
        cli->cl_avail_grant = target_bytes;
        spin_unlock(&cli->cl_loi_list_lock);
        if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
                body->oa.o_valid |= OBD_MD_FLFLAGS;
                body->oa.o_flags = 0;
        }
        body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);

        rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
                                sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
                                sizeof(*body), body, NULL);
        if (rc != 0)
                __osc_update_grant(cli, body->oa.o_grant);
        OBD_FREE_PTR(body);
        RETURN(rc);
}

static int osc_should_shrink_grant(struct client_obd *client)
{
        time64_t next_shrink = client->cl_next_shrink_grant;

        if ((client->cl_import->imp_connect_data.ocd_connect_flags &
             OBD_CONNECT_GRANT_SHRINK) == 0)
                return 0;

        if (ktime_get_seconds() >= next_shrink - 5) {
                /* Get the current RPC size directly, instead of going via:
                 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
                 * Keep comment here so that it can be found by searching. */
                int brw_size = client->cl_max_pages_per_rpc << PAGE_SHIFT;

                if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
                    client->cl_avail_grant > brw_size)
                        return 1;
                else
                        osc_update_next_shrink(client);
        }
        return 0;
}

static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
{
        struct client_obd *client;

        list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
                if (osc_should_shrink_grant(client))
                        osc_shrink_grant(client);
        }
        return 0;
}

static int osc_add_shrink_grant(struct client_obd *client)
{
        int rc;

        rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
                                       TIMEOUT_GRANT,
                                       osc_grant_shrink_grant_cb, NULL,
                                       &client->cl_grant_shrink_list);
        if (rc) {
                CERROR("add grant client %s error %d\n", cli_name(client), rc);
                return rc;
        }
        CDEBUG(D_CACHE, "add grant client %s\n", cli_name(client));
        osc_update_next_shrink(client);
        return 0;
}

static int osc_del_shrink_grant(struct client_obd *client)
{
        return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
                                         TIMEOUT_GRANT);
}

void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        /*
         * ocd_grant is the total grant amount we expect to hold: if we have
         * been evicted, it's the new avail_grant amount, and cl_dirty_pages
         * will drop to 0 as in-flight RPCs fail out; otherwise, it's
         * avail_grant + dirty.
         *
         * A race is tolerable here: if we're evicted, but imp_state has
         * already left EVICTED state, then cl_dirty_pages must be 0 already.
         */
        spin_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant = ocd->ocd_grant;
        if (cli->cl_import->imp_state != LUSTRE_IMP_EVICTED) {
                cli->cl_avail_grant -= cli->cl_reserved_grant;
                if (OCD_HAS_FLAG(ocd, GRANT_PARAM))
                        cli->cl_avail_grant -= cli->cl_dirty_grant;
                else
                        cli->cl_avail_grant -=
                                        cli->cl_dirty_pages << PAGE_SHIFT;
        }

        if (OCD_HAS_FLAG(ocd, GRANT_PARAM)) {
                u64 size;
                int chunk_mask;

                /* overhead for each extent insertion */
                cli->cl_grant_extent_tax = ocd->ocd_grant_tax_kb << 10;
                /* determine the appropriate chunk size used by osc_extent. */
                cli->cl_chunkbits = max_t(int, PAGE_SHIFT,
                                          ocd->ocd_grant_blkbits);
                /* max_pages_per_rpc must be chunk aligned */
                chunk_mask = ~((1 << (cli->cl_chunkbits - PAGE_SHIFT)) - 1);
                cli->cl_max_pages_per_rpc = (cli->cl_max_pages_per_rpc +
                                             ~chunk_mask) & chunk_mask;
                /* determine maximum extent size, in #pages */
                size = (u64)ocd->ocd_grant_max_blks << ocd->ocd_grant_blkbits;
                cli->cl_max_extent_pages = size >> PAGE_SHIFT;
                if (cli->cl_max_extent_pages == 0)
                        cli->cl_max_extent_pages = 1;
        } else {
                cli->cl_grant_extent_tax = 0;
                cli->cl_chunkbits = PAGE_SHIFT;
                cli->cl_max_extent_pages = DT_MAX_BRW_PAGES;
        }
        spin_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld "
                "chunk bits: %d cl_max_extent_pages: %d\n",
                cli_name(cli),
                cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits,
                cli->cl_max_extent_pages);

        if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
            list_empty(&cli->cl_grant_shrink_list))
                osc_add_shrink_grant(cli);
}
EXPORT_SYMBOL(osc_init_grant);
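
/*
 * Worked example for the GRANT_PARAM path above (assumed values,
 * PAGE_SHIFT = 12): ocd_grant_blkbits = 16 gives 64 KiB chunks, so
 * cl_chunkbits = 16 and there are 16 pages per chunk:
 *
 *	chunk_mask = ~((1 << (16 - 12)) - 1) = ~0xf
 *
 * A cl_max_pages_per_rpc of 100 is then rounded up to 112, the next
 * multiple of 16.  With ocd_grant_max_blks = 1024 the maximum extent is
 * 1024 << 16 bytes = 64 MiB, i.e. cl_max_extent_pages = 16384.
 */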

/* We assume that the reason this OSC got a short read is that it read
 * beyond the end of a stripe file; i.e. Lustre is reading a sparse file
 * via the LOV, and it _knows_ it's reading inside the file, it's just that
 * this stripe never got written at or beyond this stripe offset yet. */
static void handle_short_read(int nob_read, size_t page_count,
                              struct brw_page **pga)
{
        char *ptr;
        int i = 0;

        /* skip bytes read OK */
        while (nob_read > 0) {
                LASSERT(page_count > 0);

                if (pga[i]->count > nob_read) {
                        /* EOF inside this page */
                        ptr = kmap(pga[i]->pg) +
                                (pga[i]->off & ~PAGE_MASK);
                        memset(ptr + nob_read, 0, pga[i]->count - nob_read);
                        kunmap(pga[i]->pg);
                        page_count--;
                        i++;
                        break;
                }

                nob_read -= pga[i]->count;
                page_count--;
                i++;
        }

        /* zero remaining pages */
        while (page_count-- > 0) {
                ptr = kmap(pga[i]->pg) + (pga[i]->off & ~PAGE_MASK);
                memset(ptr, 0, pga[i]->count);
                kunmap(pga[i]->pg);
                i++;
        }
}
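
/*
 * Worked example (illustrative): for a 3-page, 12288-byte read where the
 * OST returned nob_read = 5000, page 0 (count 4096) is consumed whole,
 * page 1 holds EOF so its tail beyond the 904 bytes actually read is
 * zeroed (4096 - 904 = 3192 bytes), and page 2 is zeroed entirely by the
 * second loop.
 */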

static int check_write_rcs(struct ptlrpc_request *req,
                           int requested_nob, int niocount,
                           size_t page_count, struct brw_page **pga)
{
        int     i;
        __u32   *remote_rcs;

        remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
                                                  sizeof(*remote_rcs) *
                                                  niocount);
        if (remote_rcs == NULL) {
                CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
                return(-EPROTO);
        }

        /* return error if any niobuf was in error */
        for (i = 0; i < niocount; i++) {
                if ((int)remote_rcs[i] < 0)
                        return(remote_rcs[i]);

                if (remote_rcs[i] != 0) {
                        CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
                                i, remote_rcs[i], req);
                        return(-EPROTO);
                }
        }
        if (req->rq_bulk != NULL &&
            req->rq_bulk->bd_nob_transferred != requested_nob) {
                CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
                       req->rq_bulk->bd_nob_transferred, requested_nob);
                return(-EPROTO);
        }

        return (0);
}

static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
{
        if (p1->flag != p2->flag) {
                unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
                                  OBD_BRW_SYNC       | OBD_BRW_ASYNC   |
                                  OBD_BRW_NOQUOTA    | OBD_BRW_SOFT_SYNC);

                /* warn if we try to combine flags that we don't know to be
                 * safe to combine */
                if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
                        CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
                              "report this at https://jira.hpdd.intel.com/\n",
                              p1->flag, p2->flag);
                }
                return 0;
        }

        return (p1->off + p1->count == p2->off);
}
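
/*
 * Example (illustrative): two pages with identical flags covering
 * [off, off + 4096) and [off + 4096, off + 8192) merge into a single
 * niobuf in osc_brw_prep_request(), while a gap between pages or a flag
 * difference outside the ignorable mask above starts a new niobuf.
 */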

static u32 osc_checksum_bulk(int nob, size_t pg_count,
                             struct brw_page **pga, int opc,
                             enum cksum_types cksum_type)
{
        u32                             cksum;
        int                             i = 0;
        struct cfs_crypto_hash_desc     *hdesc;
        unsigned int                    bufsize;
        unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);

        LASSERT(pg_count > 0);

        hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
        if (IS_ERR(hdesc)) {
                CERROR("Unable to initialize checksum hash %s\n",
                       cfs_crypto_hash_name(cfs_alg));
                return PTR_ERR(hdesc);
        }

        while (nob > 0 && pg_count > 0) {
                unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 && opc == OST_READ &&
                    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
                        unsigned char *ptr = kmap(pga[i]->pg);
                        int off = pga[i]->off & ~PAGE_MASK;

                        memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
                        kunmap(pga[i]->pg);
                }
                cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
                                            pga[i]->off & ~PAGE_MASK,
                                            count);
                LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
                               (int)(pga[i]->off & ~PAGE_MASK));

                nob -= pga[i]->count;
                pg_count--;
                i++;
        }

        bufsize = sizeof(cksum);
        cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);

        /* When sending, we only compute a wrong checksum instead of
         * corrupting the data, so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                cksum++;

        return cksum;
}

static int
osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
                     u32 page_count, struct brw_page **pga,
                     struct ptlrpc_request **reqp, int resend)
{
        struct ptlrpc_request   *req;
        struct ptlrpc_bulk_desc *desc;
        struct ost_body         *body;
        struct obd_ioobj        *ioobj;
        struct niobuf_remote    *niobuf;
        int niocount, i, requested_nob, opc, rc, short_io_size = 0;
        struct osc_brw_async_args *aa;
        struct req_capsule      *pill;
        struct brw_page *pg_prev;
        void *short_io_buf;

        ENTRY;
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
                RETURN(-ENOMEM); /* Recoverable */
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
                RETURN(-EINVAL); /* Fatal */

        if ((cmd & OBD_BRW_WRITE) != 0) {
                opc = OST_WRITE;
                req = ptlrpc_request_alloc_pool(cli->cl_import,
                                                osc_rq_pool,
                                                &RQF_OST_BRW_WRITE);
        } else {
                opc = OST_READ;
                req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
        }
        if (req == NULL)
                RETURN(-ENOMEM);

        for (niocount = i = 1; i < page_count; i++) {
                if (!can_merge_pages(pga[i - 1], pga[i]))
                        niocount++;
        }

        pill = &req->rq_pill;
        req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
                             sizeof(*ioobj));
        req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
                             niocount * sizeof(*niobuf));

        for (i = 0; i < page_count; i++)
                short_io_size += pga[i]->count;

        /* Check if we can do a short io. */
        if (!(short_io_size <= cli->cl_short_io_bytes && niocount == 1 &&
            imp_connect_shortio(cli->cl_import)))
                short_io_size = 0;

        req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_CLIENT,
                             opc == OST_READ ? 0 : short_io_size);
        if (opc == OST_READ)
                req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_SERVER,
                                     short_io_size);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        osc_set_io_portal(req);

        ptlrpc_at_set_req_timeout(req);
        /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
         * retry logic */
        req->rq_no_retry_einprogress = 1;

        if (short_io_size != 0) {
                desc = NULL;
                short_io_buf = NULL;
                goto no_bulk;
        }

        desc = ptlrpc_prep_bulk_imp(req, page_count,
                cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
                (opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE :
                        PTLRPC_BULK_PUT_SINK) |
                        PTLRPC_BULK_BUF_KIOV,
                OST_BULK_PORTAL,
                &ptlrpc_bulk_kiov_pin_ops);

        if (desc == NULL)
                GOTO(out, rc = -ENOMEM);
        /* NB request now owns desc and will free it when it gets freed */
no_bulk:
        body = req_capsule_client_get(pill, &RMF_OST_BODY);
        ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;
        /* The high bits of ioo_max_brw tell the server the _maximum_ number
         * of bulks that might be sent for this request.  The actual number
         * is decided when the RPC is finally sent in ptlrpc_register_bulk().
         * It sends "max - 1" for compatibility with old clients that send
         * "0", and also so that the actual maximum is a power-of-two number,
         * not one less. LU-1431 */
        if (desc != NULL)
                ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
        else /* short io */
                ioobj_max_brw_set(ioobj, 0);

        if (short_io_size != 0) {
                if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                        body->oa.o_valid |= OBD_MD_FLFLAGS;
                        body->oa.o_flags = 0;
                }
                body->oa.o_flags |= OBD_FL_SHORT_IO;
                CDEBUG(D_CACHE, "Using short io for data transfer, size = %d\n",
                       short_io_size);
                if (opc == OST_WRITE) {
                        short_io_buf = req_capsule_client_get(pill,
                                                              &RMF_SHORT_IO);
                        LASSERT(short_io_buf != NULL);
                }
        }

        LASSERT(page_count > 0);
        pg_prev = pga[0];
        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                struct brw_page *pg = pga[i];
                int poff = pg->off & ~PAGE_MASK;

                LASSERT(pg->count > 0);
                /* make sure there is no gap in the middle of the page array */
                LASSERTF(page_count == 1 ||
                         (ergo(i == 0, poff + pg->count == PAGE_SIZE) &&
                          ergo(i > 0 && i < page_count - 1,
                               poff == 0 && pg->count == PAGE_SIZE)   &&
                          ergo(i == page_count - 1, poff == 0)),
                         "i: %d/%d pg: %p off: %llu, count: %u\n",
                         i, page_count, pg, pg->off, pg->count);
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u pg %p [pri %lu ind %lu] off %llu"
                         " prev_pg %p [pri %lu ind %lu] off %llu\n",
                         i, page_count,
                         pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
                         pg_prev->pg, page_private(pg_prev->pg),
                         pg_prev->pg->index, pg_prev->off);
                LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
                        (pg->flag & OBD_BRW_SRVLOCK));
                if (short_io_size != 0 && opc == OST_WRITE) {
                        unsigned char *ptr = ll_kmap_atomic(pg->pg, KM_USER0);

                        LASSERT(short_io_size >= requested_nob + pg->count);
                        memcpy(short_io_buf + requested_nob,
                               ptr + poff,
                               pg->count);
                        ll_kunmap_atomic(ptr, KM_USER0);
                } else if (short_io_size == 0) {
                        desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff,
                                                         pg->count);
                }
                requested_nob += pg->count;

                if (i > 0 && can_merge_pages(pg_prev, pg)) {
                        niobuf--;
                        niobuf->rnb_len += pg->count;
                } else {
                        niobuf->rnb_offset = pg->off;
                        niobuf->rnb_len    = pg->count;
                        niobuf->rnb_flags  = pg->flag;
                }
                pg_prev = pg;
        }

        LASSERTF((void *)(niobuf - niocount) ==
                req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
                "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
                &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));

        osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob : 0);
        if (resend) {
                if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                        body->oa.o_valid |= OBD_MD_FLFLAGS;
                        body->oa.o_flags = 0;
                }
                body->oa.o_flags |= OBD_FL_RECOV_RESEND;
        }

        if (osc_should_shrink_grant(cli))
                osc_shrink_grant_local(cli, &body->oa);

        /* size[REQ_REC_OFF] still sizeof (*body) */
        if (opc == OST_WRITE) {
                if (cli->cl_checksum &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        /* store cl_cksum_type in a local variable since
                         * it can be changed via lprocfs */
                        enum cksum_types cksum_type = cli->cl_cksum_type;

                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                body->oa.o_flags = 0;

                        body->oa.o_flags |= cksum_type_pack(cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        body->oa.o_cksum = osc_checksum_bulk(requested_nob,
                                                             page_count, pga,
                                                             OST_WRITE,
                                                             cksum_type);
                        CDEBUG(D_PAGE, "checksum at write origin: %x\n",
                               body->oa.o_cksum);
                        /* save this in 'oa', too, for later checking */
                        oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        oa->o_flags |= cksum_type_pack(cksum_type);
                } else {
                        /* clear out the checksum flag, in case this is a
                         * resend but cl_checksum is no longer set. b=11238 */
                        oa->o_valid &= ~OBD_MD_FLCKSUM;
                }
                oa->o_cksum = body->oa.o_cksum;
                /* 1 RC per niobuf */
                req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
                                     sizeof(__u32) * niocount);
        } else {
                if (cli->cl_checksum &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                body->oa.o_flags = 0;
                        body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                }

                /* The client cksum has already been copied to the wire obdo
                 * in the previous lustre_set_wire_obdo(); in case a bulk read
                 * is being resent due to a cksum error, this lets the server
                 * check+dump the pages on its side */
        }
        ptlrpc_request_set_replen(req);

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oa = oa;
        aa->aa_requested_nob = requested_nob;
        aa->aa_nio_count = niocount;
        aa->aa_page_count = page_count;
        aa->aa_resends = 0;
        aa->aa_ppga = pga;
        aa->aa_cli = cli;
        INIT_LIST_HEAD(&aa->aa_oaps);

        *reqp = req;
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
                req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
                niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
        RETURN(0);

out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}
1344
1345 char dbgcksum_file_name[PATH_MAX];
1346
1347 static void dump_all_bulk_pages(struct obdo *oa, __u32 page_count,
1348                                 struct brw_page **pga, __u32 server_cksum,
1349                                 __u32 client_cksum)
1350 {
1351         struct file *filp;
1352         int rc, i;
1353         unsigned int len;
1354         char *buf;
1355         mm_segment_t oldfs;
1356
1357         /* will only keep dump of pages on first error for the same range in
1358          * file/fid, not during the resends/retries. */
1359         snprintf(dbgcksum_file_name, sizeof(dbgcksum_file_name),
1360                  "%s-checksum_dump-osc-"DFID":[%llu-%llu]-%x-%x",
1361                  (strncmp(libcfs_debug_file_path_arr, "NONE", 4) != 0 ?
1362                   libcfs_debug_file_path_arr :
1363                   LIBCFS_DEBUG_FILE_PATH_DEFAULT),
1364                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : 0ULL,
1365                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1366                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1367                  pga[0]->off,
1368                  pga[page_count-1]->off + pga[page_count-1]->count - 1,
1369                  client_cksum, server_cksum);
1370         filp = filp_open(dbgcksum_file_name,
1371                          O_CREAT | O_EXCL | O_WRONLY | O_LARGEFILE, 0600);
1372         if (IS_ERR(filp)) {
1373                 rc = PTR_ERR(filp);
1374                 if (rc == -EEXIST)
1375                         CDEBUG(D_INFO, "%s: can't open to dump pages with "
1376                                "checksum error: rc = %d\n", dbgcksum_file_name,
1377                                rc);
1378                 else
1379                         CERROR("%s: can't open to dump pages with checksum "
1380                                "error: rc = %d\n", dbgcksum_file_name, rc);
1381                 return;
1382         }
1383
1384         oldfs = get_fs();
1385         set_fs(KERNEL_DS);
1386         for (i = 0; i < page_count; i++) {
1387                 len = pga[i]->count;
1388                 buf = kmap(pga[i]->pg);
1389                 while (len != 0) {
1390                         rc = vfs_write(filp, (__force const char __user *)buf,
1391                                        len, &filp->f_pos);
1392                         if (rc < 0) {
1393                                 CERROR("%s: wanted to write %u bytes but got "
1394                                        "error %d\n", dbgcksum_file_name, len, rc);
1395                                 break;
1396                         }
1397                         len -= rc;
1398                         buf += rc;
1399                         CDEBUG(D_INFO, "%s: wrote %d bytes\n",
1400                                dbgcksum_file_name, rc);
1401                 }
1402                 kunmap(pga[i]->pg);
1403         }
1404         set_fs(oldfs);
1405
1406         rc = ll_vfs_fsync_range(filp, 0, LLONG_MAX, 1);
1407         if (rc)
1408                 CERROR("%s: sync returns %d\n", dbgcksum_file_name, rc);
1409         filp_close(filp, NULL);
1410         return;
1411 }
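/*
 * For illustration (hypothetical values): with the default debug path, the
 * dump file created above might be named something like
 *   /tmp/lustre-log-checksum_dump-osc-[0x200000400:0x1:0x0]:[0-1048575]-a1b2c3d4-e5f6a7b8
 * i.e. <debug path>-checksum_dump-osc-<parent FID>:[<first>-<last offset>]-
 * <client cksum>-<server cksum>, per the snprintf() format above.
 */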
1412
1413 static int
1414 check_write_checksum(struct obdo *oa, const struct lnet_process_id *peer,
1415                      __u32 client_cksum, __u32 server_cksum,
1416                      struct osc_brw_async_args *aa)
1417 {
1418         __u32 new_cksum;
1419         char *msg;
1420         enum cksum_types cksum_type;
1421
1422         if (server_cksum == client_cksum) {
1423                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1424                 return 0;
1425         }
1426
1427         if (aa->aa_cli->cl_checksum_dump)
1428                 dump_all_bulk_pages(oa, aa->aa_page_count, aa->aa_ppga,
1429                                     server_cksum, client_cksum);
1430
1431         cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1432                                        oa->o_flags : 0);
1433         new_cksum = osc_checksum_bulk(aa->aa_requested_nob, aa->aa_page_count,
1434                                       aa->aa_ppga, OST_WRITE, cksum_type);
1435
1436         if (cksum_type != cksum_type_unpack(aa->aa_oa->o_flags))
1437                 msg = "the server did not use the checksum type specified in "
1438                       "the original request - likely a protocol problem";
1439         else if (new_cksum == server_cksum)
1440                 msg = "changed on the client after we checksummed it - "
1441                       "likely false positive due to mmap IO (bug 11742)";
1442         else if (new_cksum == client_cksum)
1443                 msg = "changed in transit before arrival at OST";
1444         else
1445                 msg = "changed in transit AND doesn't match the original - "
1446                       "likely false positive due to mmap IO (bug 11742)";
1447
1448         LCONSOLE_ERROR_MSG(0x132, "%s: BAD WRITE CHECKSUM: %s: from %s inode "
1449                            DFID " object "DOSTID" extent [%llu-%llu], original "
1450                            "client csum %x (type %x), server csum %x (type %x),"
1451                            " client csum now %x\n",
1452                            aa->aa_cli->cl_import->imp_obd->obd_name,
1453                            msg, libcfs_nid2str(peer->nid),
1454                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1455                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1456                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1457                            POSTID(&oa->o_oi), aa->aa_ppga[0]->off,
1458                            aa->aa_ppga[aa->aa_page_count - 1]->off +
1459                                 aa->aa_ppga[aa->aa_page_count-1]->count - 1,
1460                            client_cksum, cksum_type_unpack(aa->aa_oa->o_flags),
1461                            server_cksum, cksum_type, new_cksum);
1462         return 1;
1463 }
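/*
 * Triage logic above, in brief: the client recomputes the checksum over the
 * pages it still holds (new_cksum) and compares it with both the original
 * client checksum and the server's checksum:
 *   new == server  -> pages changed on the client after the original
 *                     checksum was taken (typically mmap IO);
 *   new == client  -> data was corrupted in transit to the OST;
 *   neither        -> pages changed AND don't match what was sent;
 * a checksum-type mismatch instead points at a protocol problem.
 */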
1464
1465 /* Note rc enters this function as number of bytes transferred */
1466 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1467 {
1468         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1469         const struct lnet_process_id *peer =
1470                         &req->rq_import->imp_connection->c_peer;
1471         struct client_obd *cli = aa->aa_cli;
1472         struct ost_body *body;
1473         u32 client_cksum = 0;
1474         ENTRY;
1475
1476         if (rc < 0 && rc != -EDQUOT) {
1477                 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1478                 RETURN(rc);
1479         }
1480
1481         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1482         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1483         if (body == NULL) {
1484                 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1485                 RETURN(-EPROTO);
1486         }
1487
1488         /* set/clear over quota flag for a uid/gid/projid */
1489         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1490             body->oa.o_valid & (OBD_MD_FLALLQUOTA)) {
1491                 unsigned qid[LL_MAXQUOTAS] = {
1492                                          body->oa.o_uid, body->oa.o_gid,
1493                                          body->oa.o_projid };
1494                 CDEBUG(D_QUOTA, "setdq for [%u %u %u] with valid %#llx, flags %x\n",
1495                        body->oa.o_uid, body->oa.o_gid, body->oa.o_projid,
1496                        body->oa.o_valid, body->oa.o_flags);
1497                 osc_quota_setdq(cli, qid, body->oa.o_valid,
1498                                 body->oa.o_flags);
1499         }
1500
1501         osc_update_grant(cli, body);
1502
1503         if (rc < 0)
1504                 RETURN(rc);
1505
1506         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1507                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1508
1509         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1510                 if (rc > 0) {
1511                         CERROR("Unexpected +ve rc %d\n", rc);
1512                         RETURN(-EPROTO);
1513                 }
1514
1515                 if (req->rq_bulk != NULL &&
1516                     sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1517                         RETURN(-EAGAIN);
1518
1519                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1520                     check_write_checksum(&body->oa, peer, client_cksum,
1521                                          body->oa.o_cksum, aa))
1522                         RETURN(-EAGAIN);
1523
1524                 rc = check_write_rcs(req, aa->aa_requested_nob, aa->aa_nio_count,
1525                                      aa->aa_page_count, aa->aa_ppga);
1526                 GOTO(out, rc);
1527         }
1528
1529         /* The rest of this function executes only for OST_READs */
1530
1531         if (req->rq_bulk == NULL) {
1532                 rc = req_capsule_get_size(&req->rq_pill, &RMF_SHORT_IO,
1533                                           RCL_SERVER);
1534                 LASSERT(rc == req->rq_status);
1535         } else {
1536                 /* if unwrap_bulk failed, return -EAGAIN to retry */
1537                 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1538         }
1539         if (rc < 0)
1540                 GOTO(out, rc = -EAGAIN);
1541
1542         if (rc > aa->aa_requested_nob) {
1543                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1544                        aa->aa_requested_nob);
1545                 RETURN(-EPROTO);
1546         }
1547
1548         if (req->rq_bulk != NULL && rc != req->rq_bulk->bd_nob_transferred) {
1549                 CERROR("Unexpected rc %d (%d transferred)\n",
1550                        rc, req->rq_bulk->bd_nob_transferred);
1551                 RETURN(-EPROTO);
1552         }
1553
1554         if (req->rq_bulk == NULL) {
1555                 /* short io */
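                /* (i.e. the OST returned the data inline in the reply's
                 * RMF_SHORT_IO buffer instead of via a bulk transfer; the
                 * loop below scatters it into the target pages) */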
1556                 int nob, pg_count, i = 0;
1557                 unsigned char *buf;
1558
1559                 CDEBUG(D_CACHE, "Using short io read, size %d\n", rc);
1560                 pg_count = aa->aa_page_count;
1561                 buf = req_capsule_server_sized_get(&req->rq_pill, &RMF_SHORT_IO,
1562                                                    rc);
1563                 nob = rc;
1564                 while (nob > 0 && pg_count > 0) {
1565                         unsigned char *ptr;
1566                         int count = aa->aa_ppga[i]->count > nob ?
1567                                     nob : aa->aa_ppga[i]->count;
1568
1569                         CDEBUG(D_CACHE, "page %p count %d\n",
1570                                aa->aa_ppga[i]->pg, count);
1571                         ptr = ll_kmap_atomic(aa->aa_ppga[i]->pg, KM_USER0);
1572                         memcpy(ptr + (aa->aa_ppga[i]->off & ~PAGE_MASK), buf,
1573                                count);
1574                         ll_kunmap_atomic((void *) ptr, KM_USER0);
1575
1576                         buf += count;
1577                         nob -= count;
1578                         i++;
1579                         pg_count--;
1580                 }
1581         }
1582
1583         if (rc < aa->aa_requested_nob)
1584                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1585
1586         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1587                 static int cksum_counter;
1588                 u32        server_cksum = body->oa.o_cksum;
1589                 char      *via = "";
1590                 char      *router = "";
1591                 enum cksum_types cksum_type;
1592
1593                 cksum_type = cksum_type_unpack(body->oa.o_valid & OBD_MD_FLFLAGS ?
1594                                                body->oa.o_flags : 0);
1595                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1596                                                  aa->aa_ppga, OST_READ,
1597                                                  cksum_type);
1598
1599                 if (req->rq_bulk != NULL &&
1600                     peer->nid != req->rq_bulk->bd_sender) {
1601                         via = " via ";
1602                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1603                 }
1604
1605                 if (server_cksum != client_cksum) {
1606                         struct ost_body *clbody;
1607                         u32 page_count = aa->aa_page_count;
1608
1609                         clbody = req_capsule_client_get(&req->rq_pill,
1610                                                         &RMF_OST_BODY);
1611                         if (cli->cl_checksum_dump)
1612                                 dump_all_bulk_pages(&clbody->oa, page_count,
1613                                                     aa->aa_ppga, server_cksum,
1614                                                     client_cksum);
1615
1616                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1617                                            "%s%s%s inode "DFID" object "DOSTID
1618                                            " extent [%llu-%llu], client %x, "
1619                                            "server %x, cksum_type %x\n",
1620                                            req->rq_import->imp_obd->obd_name,
1621                                            libcfs_nid2str(peer->nid),
1622                                            via, router,
1623                                            clbody->oa.o_valid & OBD_MD_FLFID ?
1624                                                 clbody->oa.o_parent_seq : 0ULL,
1625                                            clbody->oa.o_valid & OBD_MD_FLFID ?
1626                                                 clbody->oa.o_parent_oid : 0,
1627                                            clbody->oa.o_valid & OBD_MD_FLFID ?
1628                                                 clbody->oa.o_parent_ver : 0,
1629                                            POSTID(&body->oa.o_oi),
1630                                            aa->aa_ppga[0]->off,
1631                                            aa->aa_ppga[page_count-1]->off +
1632                                            aa->aa_ppga[page_count-1]->count - 1,
1633                                            client_cksum, server_cksum,
1634                                            cksum_type);
1635                         cksum_counter = 0;
1636                         aa->aa_oa->o_cksum = client_cksum;
1637                         rc = -EAGAIN;
1638                 } else {
1639                         cksum_counter++;
1640                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1641                         rc = 0;
1642                 }
1643         } else if (unlikely(client_cksum)) {
1644                 static int cksum_missed;
1645
1646                 cksum_missed++;
1647                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1648                         CERROR("Checksum %u requested from %s but not sent\n",
1649                                cksum_missed, libcfs_nid2str(peer->nid));
1650         } else {
1651                 rc = 0;
1652         }
1653 out:
1654         if (rc >= 0)
1655                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1656                                      aa->aa_oa, &body->oa);
1657
1658         RETURN(rc);
1659 }
1660
1661 static int osc_brw_redo_request(struct ptlrpc_request *request,
1662                                 struct osc_brw_async_args *aa, int rc)
1663 {
1664         struct ptlrpc_request *new_req;
1665         struct osc_brw_async_args *new_aa;
1666         struct osc_async_page *oap;
1667         ENTRY;
1668
1669         DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1670                   "redo for recoverable error %d", rc);
1671
1672         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1673                                 OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
1674                                   aa->aa_cli, aa->aa_oa, aa->aa_page_count,
1675                                   aa->aa_ppga, &new_req, 1);
1676         if (rc)
1677                 RETURN(rc);
1678
1679         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1680                 if (oap->oap_request != NULL) {
1681                         LASSERTF(request == oap->oap_request,
1682                                  "request %p != oap_request %p\n",
1683                                  request, oap->oap_request);
1684                         if (oap->oap_interrupted) {
1685                                 ptlrpc_req_finished(new_req);
1686                                 RETURN(-EINTR);
1687                         }
1688                 }
1689         }
1690         /* New request takes over pga and oaps from old request.
1691          * Note that copying a list_head doesn't work, need to move it... */
1692         aa->aa_resends++;
1693         new_req->rq_interpret_reply = request->rq_interpret_reply;
1694         new_req->rq_async_args = request->rq_async_args;
1695         new_req->rq_commit_cb = request->rq_commit_cb;
1696         /* cap resend delay to the current request timeout, this is similar to
1697          * what ptlrpc does (see after_reply()) */
1698         if (aa->aa_resends > new_req->rq_timeout)
1699                 new_req->rq_sent = ktime_get_real_seconds() + new_req->rq_timeout;
1700         else
1701                 new_req->rq_sent = ktime_get_real_seconds() + aa->aa_resends;
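        /* e.g. with rq_timeout = 30s: resend #3 is delayed 3 seconds, while
         * resend #100 would be capped at the full 30-second timeout. */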
1702         new_req->rq_generation_set = 1;
1703         new_req->rq_import_generation = request->rq_import_generation;
1704
1705         new_aa = ptlrpc_req_async_args(new_req);
1706
1707         INIT_LIST_HEAD(&new_aa->aa_oaps);
1708         list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1709         INIT_LIST_HEAD(&new_aa->aa_exts);
1710         list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1711         new_aa->aa_resends = aa->aa_resends;
1712
1713         list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1714                 if (oap->oap_request) {
1715                         ptlrpc_req_finished(oap->oap_request);
1716                         oap->oap_request = ptlrpc_request_addref(new_req);
1717                 }
1718         }
1719
1720         /* XXX: This code will run into a problem if we're going to support
1721          * adding a series of BRW RPCs into a self-defined ptlrpc_request_set
1722          * and waiting for all of them to finish. We should inherit the
1723          * request set from the old request. */
1724         ptlrpcd_add_req(new_req);
1725
1726         DEBUG_REQ(D_INFO, new_req, "new request");
1727         RETURN(0);
1728 }
1729
1730 /*
1731  * Ugh, we want disk allocation on the target to happen in offset order.  We'll
1732  * follow Sedgewick's advice and stick to the dead-simple shellsort -- it'll do
1733  * fine for our small page arrays and doesn't require allocation.  It's an
1734  * insertion sort that swaps elements that are strides apart, shrinking the
1735  * stride down until it's 1 and the array is sorted.
1736  */
1737 static void sort_brw_pages(struct brw_page **array, int num)
1738 {
1739         int stride, i, j;
1740         struct brw_page *tmp;
1741
1742         if (num == 1)
1743                 return;
1744         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1745                 ;
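        /* The loop above generates strides 1, 4, 13, 40, 121, ...
         * (stride = 3 * stride + 1) until stride >= num; e.g. for num = 100
         * the passes below then sort with strides 40, 13, 4, 1. */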
1746
1747         do {
1748                 stride /= 3;
1749                 for (i = stride ; i < num ; i++) {
1750                         tmp = array[i];
1751                         j = i;
1752                         while (j >= stride && array[j - stride]->off > tmp->off) {
1753                                 array[j] = array[j - stride];
1754                                 j -= stride;
1755                         }
1756                         array[j] = tmp;
1757                 }
1758         } while (stride > 1);
1759 }
1760
1761 static void osc_release_ppga(struct brw_page **ppga, size_t count)
1762 {
1763         LASSERT(ppga != NULL);
1764         OBD_FREE(ppga, sizeof(*ppga) * count);
1765 }
1766
1767 static int brw_interpret(const struct lu_env *env,
1768                          struct ptlrpc_request *req, void *data, int rc)
1769 {
1770         struct osc_brw_async_args *aa = data;
1771         struct osc_extent *ext;
1772         struct osc_extent *tmp;
1773         struct client_obd *cli = aa->aa_cli;
1774         unsigned long           transferred = 0;
1775         ENTRY;
1776
1777         rc = osc_brw_fini_request(req, rc);
1778         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1779         /* When the server returns -EINPROGRESS, the client should always
1780          * retry regardless of how many times the bulk was already resent. */
1781         if (osc_recoverable_error(rc)) {
1782                 if (req->rq_import_generation !=
1783                     req->rq_import->imp_generation) {
1784                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
1785                                ""DOSTID", rc = %d.\n",
1786                                req->rq_import->imp_obd->obd_name,
1787                                POSTID(&aa->aa_oa->o_oi), rc);
1788                 } else if (rc == -EINPROGRESS ||
1789                     client_should_resend(aa->aa_resends, aa->aa_cli)) {
1790                         rc = osc_brw_redo_request(req, aa, rc);
1791                 } else {
1792                         CERROR("%s: too many resend retries for object: "
1793                                "%llu:%llu, rc = %d.\n",
1794                                req->rq_import->imp_obd->obd_name,
1795                                POSTID(&aa->aa_oa->o_oi), rc);
1796                 }
1797
1798                 if (rc == 0)
1799                         RETURN(0);
1800                 else if (rc == -EAGAIN || rc == -EINPROGRESS)
1801                         rc = -EIO;
1802         }
1803
1804         if (rc == 0) {
1805                 struct obdo *oa = aa->aa_oa;
1806                 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
1807                 unsigned long valid = 0;
1808                 struct cl_object *obj;
1809                 struct osc_async_page *last;
1810
1811                 last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
1812                 obj = osc2cl(last->oap_obj);
1813
1814                 cl_object_attr_lock(obj);
1815                 if (oa->o_valid & OBD_MD_FLBLOCKS) {
1816                         attr->cat_blocks = oa->o_blocks;
1817                         valid |= CAT_BLOCKS;
1818                 }
1819                 if (oa->o_valid & OBD_MD_FLMTIME) {
1820                         attr->cat_mtime = oa->o_mtime;
1821                         valid |= CAT_MTIME;
1822                 }
1823                 if (oa->o_valid & OBD_MD_FLATIME) {
1824                         attr->cat_atime = oa->o_atime;
1825                         valid |= CAT_ATIME;
1826                 }
1827                 if (oa->o_valid & OBD_MD_FLCTIME) {
1828                         attr->cat_ctime = oa->o_ctime;
1829                         valid |= CAT_CTIME;
1830                 }
1831
1832                 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1833                         struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
1834                         loff_t last_off = last->oap_count + last->oap_obj_off +
1835                                 last->oap_page_off;
1836
1837                         /* Change the file size if this is an out-of-quota
1838                          * or direct I/O write and it extends the file size */
1839                         if (loi->loi_lvb.lvb_size < last_off) {
1840                                 attr->cat_size = last_off;
1841                                 valid |= CAT_SIZE;
1842                         }
1843                         /* Extend KMS if it's not a lockless write */
1844                         if (loi->loi_kms < last_off &&
1845                             oap2osc_page(last)->ops_srvlock == 0) {
1846                                 attr->cat_kms = last_off;
1847                                 valid |= CAT_KMS;
1848                         }
1849                 }
1850
1851                 if (valid != 0)
1852                         cl_object_attr_update(env, obj, attr, valid);
1853                 cl_object_attr_unlock(obj);
1854         }
1855         OBDO_FREE(aa->aa_oa);
1856
1857         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
1858                 osc_inc_unstable_pages(req);
1859
1860         list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
1861                 list_del_init(&ext->oe_link);
1862                 osc_extent_finish(env, ext, 1, rc);
1863         }
1864         LASSERT(list_empty(&aa->aa_exts));
1865         LASSERT(list_empty(&aa->aa_oaps));
1866
1867         transferred = (req->rq_bulk == NULL ? /* short io */
1868                        aa->aa_requested_nob :
1869                        req->rq_bulk->bd_nob_transferred);
1870
1871         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1872         ptlrpc_lprocfs_brw(req, transferred);
1873
1874         spin_lock(&cli->cl_loi_list_lock);
1875         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1876          * is called so we know whether to go to sync BRWs or wait for more
1877          * RPCs to complete */
1878         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
1879                 cli->cl_w_in_flight--;
1880         else
1881                 cli->cl_r_in_flight--;
1882         osc_wake_cache_waiters(cli);
1883         spin_unlock(&cli->cl_loi_list_lock);
1884
1885         osc_io_unplug(env, cli, NULL);
1886         RETURN(rc);
1887 }
1888
1889 static void brw_commit(struct ptlrpc_request *req)
1890 {
1891         /* If osc_inc_unstable_pages (via osc_extent_finish) races with
1892          * this function being called via rq_commit_cb, we need to ensure
1893          * osc_dec_unstable_pages is still called. Otherwise unstable
1894          * pages may be leaked. */
1895         spin_lock(&req->rq_lock);
1896         if (likely(req->rq_unstable)) {
1897                 req->rq_unstable = 0;
1898                 spin_unlock(&req->rq_lock);
1899
1900                 osc_dec_unstable_pages(req);
1901         } else {
1902                 req->rq_committed = 1;
1903                 spin_unlock(&req->rq_lock);
1904         }
1905 }
1906
1907 /**
1908  * Build an RPC from the list of extents @ext_list. The caller must ensure
1909  * that the total number of pages in this list does not exceed the max
1910  * pages per RPC. Extents in the list must be in OES_RPC state.
1911  */
1912 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
1913                   struct list_head *ext_list, int cmd)
1914 {
1915         struct ptlrpc_request           *req = NULL;
1916         struct osc_extent               *ext;
1917         struct brw_page                 **pga = NULL;
1918         struct osc_brw_async_args       *aa = NULL;
1919         struct obdo                     *oa = NULL;
1920         struct osc_async_page           *oap;
1921         struct osc_object               *obj = NULL;
1922         struct cl_req_attr              *crattr = NULL;
1923         loff_t                          starting_offset = OBD_OBJECT_EOF;
1924         loff_t                          ending_offset = 0;
1925         int                             mpflag = 0;
1926         int                             mem_tight = 0;
1927         int                             page_count = 0;
1928         bool                            soft_sync = false;
1929         bool                            interrupted = false;
1930         int                             i;
1931         int                             grant = 0;
1932         int                             rc;
1933         struct list_head                rpc_list = LIST_HEAD_INIT(rpc_list);
1934         struct ost_body                 *body;
1935         ENTRY;
1936         LASSERT(!list_empty(ext_list));
1937
1938         /* add pages into rpc_list to build BRW rpc */
1939         list_for_each_entry(ext, ext_list, oe_link) {
1940                 LASSERT(ext->oe_state == OES_RPC);
1941                 mem_tight |= ext->oe_memalloc;
1942                 grant += ext->oe_grants;
1943                 page_count += ext->oe_nr_pages;
1944                 if (obj == NULL)
1945                         obj = ext->oe_obj;
1946         }
1947
1948         soft_sync = osc_over_unstable_soft_limit(cli);
1949         if (mem_tight)
1950                 mpflag = cfs_memory_pressure_get_and_set();
1951
1952         OBD_ALLOC(pga, sizeof(*pga) * page_count);
1953         if (pga == NULL)
1954                 GOTO(out, rc = -ENOMEM);
1955
1956         OBDO_ALLOC(oa);
1957         if (oa == NULL)
1958                 GOTO(out, rc = -ENOMEM);
1959
1960         i = 0;
1961         list_for_each_entry(ext, ext_list, oe_link) {
1962                 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
1963                         if (mem_tight)
1964                                 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
1965                         if (soft_sync)
1966                                 oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
1967                         pga[i] = &oap->oap_brw_page;
1968                         pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
1969                         i++;
1970
1971                         list_add_tail(&oap->oap_rpc_item, &rpc_list);
1972                         if (starting_offset == OBD_OBJECT_EOF ||
1973                             starting_offset > oap->oap_obj_off)
1974                                 starting_offset = oap->oap_obj_off;
1975                         else
1976                                 LASSERT(oap->oap_page_off == 0);
1977                         if (ending_offset < oap->oap_obj_off + oap->oap_count)
1978                                 ending_offset = oap->oap_obj_off +
1979                                                 oap->oap_count;
1980                         else
1981                                 LASSERT(oap->oap_page_off + oap->oap_count ==
1982                                         PAGE_SIZE);
1983                         if (oap->oap_interrupted)
1984                                 interrupted = true;
1985                 }
1986         }
1987
1988         /* first page in the list */
1989         oap = list_entry(rpc_list.next, typeof(*oap), oap_rpc_item);
1990
1991         crattr = &osc_env_info(env)->oti_req_attr;
1992         memset(crattr, 0, sizeof(*crattr));
1993         crattr->cra_type = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
1994         crattr->cra_flags = ~0ULL;
1995         crattr->cra_page = oap2cl_page(oap);
1996         crattr->cra_oa = oa;
1997         cl_req_attr_set(env, osc2cl(obj), crattr);
1998
1999         if (cmd == OBD_BRW_WRITE)
2000                 oa->o_grant_used = grant;
2001
2002         sort_brw_pages(pga, page_count);
2003         rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req, 0);
2004         if (rc != 0) {
2005                 CERROR("prep_req failed: %d\n", rc);
2006                 GOTO(out, rc);
2007         }
2008
2009         req->rq_commit_cb = brw_commit;
2010         req->rq_interpret_reply = brw_interpret;
2011         req->rq_memalloc = mem_tight != 0;
2012         oap->oap_request = ptlrpc_request_addref(req);
2013         if (interrupted && !req->rq_intr)
2014                 ptlrpc_mark_interrupted(req);
2015
2016         /* Need to update the timestamps after the request is built in case
2017          * we race with setattr (locally or in queue at OST).  If OST gets
2018          * later setattr before earlier BRW (as determined by the request xid),
2019          * the OST will not use BRW timestamps.  Sadly, there is no obvious
2020          * way to do this in a single call.  bug 10150 */
2021         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
2022         crattr->cra_oa = &body->oa;
2023         crattr->cra_flags = OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME;
2024         cl_req_attr_set(env, osc2cl(obj), crattr);
2025         lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
2026
2027         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2028         aa = ptlrpc_req_async_args(req);
2029         INIT_LIST_HEAD(&aa->aa_oaps);
2030         list_splice_init(&rpc_list, &aa->aa_oaps);
2031         INIT_LIST_HEAD(&aa->aa_exts);
2032         list_splice_init(ext_list, &aa->aa_exts);
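        /* From this point on the request owns the extents and pages: they
         * are handed back via osc_extent_finish()/osc_release_ppga() in
         * brw_interpret() once the RPC completes. */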
2033
2034         spin_lock(&cli->cl_loi_list_lock);
2035         starting_offset >>= PAGE_SHIFT;
2036         if (cmd == OBD_BRW_READ) {
2037                 cli->cl_r_in_flight++;
2038                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2039                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2040                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2041                                       starting_offset + 1);
2042         } else {
2043                 cli->cl_w_in_flight++;
2044                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2045                 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
2046                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2047                                       starting_offset + 1);
2048         }
2049         spin_unlock(&cli->cl_loi_list_lock);
2050
2051         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %ur/%uw in flight",
2052                   page_count, aa, cli->cl_r_in_flight,
2053                   cli->cl_w_in_flight);
2054         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_IO, cfs_fail_val);
2055
2056         ptlrpcd_add_req(req);
2057         rc = 0;
2058         EXIT;
2059
2060 out:
2061         if (mem_tight != 0)
2062                 cfs_memory_pressure_restore(mpflag);
2063
2064         if (rc != 0) {
2065                 LASSERT(req == NULL);
2066
2067                 if (oa)
2068                         OBDO_FREE(oa);
2069                 if (pga)
2070                         OBD_FREE(pga, sizeof(*pga) * page_count);
2071                 /* this should happen rarely and is pretty bad; it makes the
2072                  * pending list not follow the dirty order */
2073                 while (!list_empty(ext_list)) {
2074                         ext = list_entry(ext_list->next, struct osc_extent,
2075                                          oe_link);
2076                         list_del_init(&ext->oe_link);
2077                         osc_extent_finish(env, ext, 0, rc);
2078                 }
2079         }
2080         RETURN(rc);
2081 }
2082
2083 static int osc_set_lock_data(struct ldlm_lock *lock, void *data)
2084 {
2085         int set = 0;
2086
2087         LASSERT(lock != NULL);
2088
2089         lock_res_and_lock(lock);
2090
2091         if (lock->l_ast_data == NULL)
2092                 lock->l_ast_data = data;
2093         if (lock->l_ast_data == data)
2094                 set = 1;
2095
2096         unlock_res_and_lock(lock);
2097
2098         return set;
2099 }
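/*
 * osc_set_lock_data() is effectively a compare-and-set under the lock's
 * resource lock: it returns 1 if l_ast_data was unset (and is now set to
 * @data) or already pointed at @data, and 0 if the lock already belongs to
 * a different object.
 */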
2100
2101 int osc_enqueue_fini(struct ptlrpc_request *req, osc_enqueue_upcall_f upcall,
2102                      void *cookie, struct lustre_handle *lockh,
2103                      enum ldlm_mode mode, __u64 *flags, bool speculative,
2104                      int errcode)
2105 {
2106         bool intent = *flags & LDLM_FL_HAS_INTENT;
2107         int rc;
2108         ENTRY;
2109
2110         /* The request was created before ldlm_cli_enqueue call. */
2111         if (intent && errcode == ELDLM_LOCK_ABORTED) {
2112                 struct ldlm_reply *rep;
2113
2114                 rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
2115                 LASSERT(rep != NULL);
2116
2117                 rep->lock_policy_res1 =
2118                         ptlrpc_status_ntoh(rep->lock_policy_res1);
2119                 if (rep->lock_policy_res1)
2120                         errcode = rep->lock_policy_res1;
2121                 if (!speculative)
2122                         *flags |= LDLM_FL_LVB_READY;
2123         } else if (errcode == ELDLM_OK) {
2124                 *flags |= LDLM_FL_LVB_READY;
2125         }
2126
2127         /* Call the update callback. */
2128         rc = (*upcall)(cookie, lockh, errcode);
2129
2130         /* release the reference taken in ldlm_cli_enqueue() */
2131         if (errcode == ELDLM_LOCK_MATCHED)
2132                 errcode = ELDLM_OK;
2133         if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
2134                 ldlm_lock_decref(lockh, mode);
2135
2136         RETURN(rc);
2137 }
2138
2139 int osc_enqueue_interpret(const struct lu_env *env, struct ptlrpc_request *req,
2140                           struct osc_enqueue_args *aa, int rc)
2141 {
2142         struct ldlm_lock *lock;
2143         struct lustre_handle *lockh = &aa->oa_lockh;
2144         enum ldlm_mode mode = aa->oa_mode;
2145         struct ost_lvb *lvb = aa->oa_lvb;
2146         __u32 lvb_len = sizeof(*lvb);
2147         __u64 flags = 0;
2148
2149         ENTRY;
2150
2151         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2152          * be valid. */
2153         lock = ldlm_handle2lock(lockh);
2154         LASSERTF(lock != NULL,
2155                  "lockh %#llx, req %p, aa %p - client evicted?\n",
2156                  lockh->cookie, req, aa);
2157
2158         /* Take an additional reference so that a blocking AST that
2159          * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2160          * to arrive after an upcall has been executed by
2161          * osc_enqueue_fini(). */
2162         ldlm_lock_addref(lockh, mode);
2163
2164         /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
2165         OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
2166
2167         /* Let CP AST to grant the lock first. */
2168         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2169
2170         if (aa->oa_speculative) {
2171                 LASSERT(aa->oa_lvb == NULL);
2172                 LASSERT(aa->oa_flags == NULL);
2173                 aa->oa_flags = &flags;
2174         }
2175
2176         /* Complete obtaining the lock procedure. */
2177         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
2178                                    aa->oa_mode, aa->oa_flags, lvb, lvb_len,
2179                                    lockh, rc);
2180         /* Complete osc stuff. */
2181         rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
2182                               aa->oa_flags, aa->oa_speculative, rc);
2183
2184         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2185
2186         ldlm_lock_decref(lockh, mode);
2187         LDLM_LOCK_PUT(lock);
2188         RETURN(rc);
2189 }
2190
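/* PTLRPCD_SET is a sentinel value, not a real set: callers pass it to mean
 * "queue the request on the ptlrpcd daemon" rather than on a private
 * ptlrpc_request_set (see the rqset == PTLRPCD_SET check below). */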
2191 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2192
2193 /* When enqueuing asynchronously, locks are not ordered: we can obtain a lock
2194  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2195  * other synchronous requests, but holding some locks while trying to obtain
2196  * others may take a considerable amount of time in case of OST failure; and
2197  * when other sync requests cannot get a lock released by the client, the
2198  * client is evicted from the cluster -- such scenarios make life difficult,
2199  * so release locks just after they are obtained. */
2200 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2201                      __u64 *flags, union ldlm_policy_data *policy,
2202                      struct ost_lvb *lvb, int kms_valid,
2203                      osc_enqueue_upcall_f upcall, void *cookie,
2204                      struct ldlm_enqueue_info *einfo,
2205                      struct ptlrpc_request_set *rqset, int async,
2206                      bool speculative)
2207 {
2208         struct obd_device *obd = exp->exp_obd;
2209         struct lustre_handle lockh = { 0 };
2210         struct ptlrpc_request *req = NULL;
2211         int intent = *flags & LDLM_FL_HAS_INTENT;
2212         __u64 match_flags = *flags;
2213         enum ldlm_mode mode;
2214         int rc;
2215         ENTRY;
2216
2217         /* Filesystem lock extents are extended to page boundaries so that
2218          * dealing with the page cache is a little smoother.  */
2219         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2220         policy->l_extent.end |= ~PAGE_MASK;
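        /* e.g. with 4KB pages (PAGE_MASK = ~0xfff): an extent [5000, 5000]
         * is widened to [4096, 8191], i.e. start is rounded down to a page
         * boundary and end up to the last byte of its page. */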
2221
2222         /*
2223          * kms is not valid when either object is completely fresh (so that no
2224          * locks are cached), or object was evicted. In the latter case cached
2225          * lock cannot be used, because it would prime inode state with
2226          * potentially stale LVB.
2227          */
2228         if (!kms_valid)
2229                 goto no_match;
2230
2231         /* Next, search for already existing extent locks that will cover us */
2232         /* If we're trying to read, we also search for an existing PW lock.  The
2233          * VFS and page cache already protect us locally, so lots of readers/
2234          * writers can share a single PW lock.
2235          *
2236          * There are problems with conversion deadlocks, so instead of
2237          * converting a read lock to a write lock, we'll just enqueue a new
2238          * one.
2239          *
2240          * At some point we should cancel the read lock instead of making them
2241          * send us a blocking callback, but there are problems with canceling
2242          * locks out from other users right now, too. */
2243         mode = einfo->ei_mode;
2244         if (einfo->ei_mode == LCK_PR)
2245                 mode |= LCK_PW;
2246         /* Normal lock requests must wait for the LVB to be ready before
2247          * matching a lock; speculative lock requests do not need to,
2248          * because they will not actually use the lock. */
2249         if (!speculative)
2250                 match_flags |= LDLM_FL_LVB_READY;
2251         if (intent != 0)
2252                 match_flags |= LDLM_FL_BLOCK_GRANTED;
2253         mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id,
2254                                einfo->ei_type, policy, mode, &lockh, 0);
2255         if (mode) {
2256                 struct ldlm_lock *matched;
2257
2258                 if (*flags & LDLM_FL_TEST_LOCK)
2259                         RETURN(ELDLM_OK);
2260
2261                 matched = ldlm_handle2lock(&lockh);
2262                 if (speculative) {
2263                         /* This DLM lock request is speculative, and does not
2264                          * have an associated IO request. Therefore if there
2265                          * is already a DLM lock, it will just inform the
2266                          * caller to cancel the request for this stripe. */
2267                         lock_res_and_lock(matched);
2268                         if (ldlm_extent_equal(&policy->l_extent,
2269                             &matched->l_policy_data.l_extent))
2270                                 rc = -EEXIST;
2271                         else
2272                                 rc = -ECANCELED;
2273                         unlock_res_and_lock(matched);
2274
2275                         ldlm_lock_decref(&lockh, mode);
2276                         LDLM_LOCK_PUT(matched);
2277                         RETURN(rc);
2278                 } else if (osc_set_lock_data(matched, einfo->ei_cbdata)) {
2279                         *flags |= LDLM_FL_LVB_READY;
2280
2281                         /* We already have a lock, and it's referenced. */
2282                         (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);
2283
2284                         ldlm_lock_decref(&lockh, mode);
2285                         LDLM_LOCK_PUT(matched);
2286                         RETURN(ELDLM_OK);
2287                 } else {
2288                         ldlm_lock_decref(&lockh, mode);
2289                         LDLM_LOCK_PUT(matched);
2290                 }
2291         }
2292
2293 no_match:
2294         if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK))
2295                 RETURN(-ENOLCK);
2296
2297         if (intent) {
2298                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2299                                            &RQF_LDLM_ENQUEUE_LVB);
2300                 if (req == NULL)
2301                         RETURN(-ENOMEM);
2302
2303                 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
2304                 if (rc) {
2305                         ptlrpc_request_free(req);
2306                         RETURN(rc);
2307                 }
2308
2309                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2310                                      sizeof(*lvb));
2311                 ptlrpc_request_set_replen(req);
2312         }
2313
2314         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2315         *flags &= ~LDLM_FL_BLOCK_GRANTED;
2316
2317         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2318                               sizeof(*lvb), LVB_T_OST, &lockh, async);
2319         if (async) {
2320                 if (!rc) {
2321                         struct osc_enqueue_args *aa;
2322                         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2323                         aa = ptlrpc_req_async_args(req);
2324                         aa->oa_exp         = exp;
2325                         aa->oa_mode        = einfo->ei_mode;
2326                         aa->oa_type        = einfo->ei_type;
2327                         lustre_handle_copy(&aa->oa_lockh, &lockh);
2328                         aa->oa_upcall      = upcall;
2329                         aa->oa_cookie      = cookie;
2330                         aa->oa_speculative = speculative;
2331                         if (!speculative) {
2332                                 aa->oa_flags  = flags;
2333                                 aa->oa_lvb    = lvb;
2334                         } else {
2335                                 /* speculative locks essentially enqueue a
2336                                  * DLM lock in advance, so we don't care
2337                                  * about the result of the enqueue. */
2338                                 aa->oa_lvb    = NULL;
2339                                 aa->oa_flags  = NULL;
2340                         }
2341
2342                         req->rq_interpret_reply =
2343                                 (ptlrpc_interpterer_t)osc_enqueue_interpret;
2344                         if (rqset == PTLRPCD_SET)
2345                                 ptlrpcd_add_req(req);
2346                         else
2347                                 ptlrpc_set_add_req(rqset, req);
2348                 } else if (intent) {
2349                         ptlrpc_req_finished(req);
2350                 }
2351                 RETURN(rc);
2352         }
2353
2354         rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
2355                               flags, speculative, rc);
2356         if (intent)
2357                 ptlrpc_req_finished(req);
2358
2359         RETURN(rc);
2360 }
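/*
 * In short, osc_enqueue_base() first tries to match an already-cached DLM
 * lock (ldlm_lock_match() above) and only falls back to a new enqueue on a
 * miss; with async != 0 the enqueue completes through
 * osc_enqueue_interpret(), otherwise osc_enqueue_fini() runs the upcall
 * synchronously.
 */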
2361
2362 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2363                    enum ldlm_type type, union ldlm_policy_data *policy,
2364                    enum ldlm_mode mode, __u64 *flags, void *data,
2365                    struct lustre_handle *lockh, int unref)
2366 {
2367         struct obd_device *obd = exp->exp_obd;
2368         __u64 lflags = *flags;
2369         enum ldlm_mode rc;
2370         ENTRY;
2371
2372         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2373                 RETURN(-EIO);
2374
2375         /* Filesystem lock extents are extended to page boundaries so that
2376          * dealing with the page cache is a little smoother */
2377         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2378         policy->l_extent.end |= ~PAGE_MASK;
2379
2380         /* Next, search for already existing extent locks that will cover us */
2381         /* If we're trying to read, we also search for an existing PW lock.  The
2382          * VFS and page cache already protect us locally, so lots of readers/
2383          * writers can share a single PW lock. */
2384         rc = mode;
2385         if (mode == LCK_PR)
2386                 rc |= LCK_PW;
2387         rc = ldlm_lock_match(obd->obd_namespace, lflags,
2388                              res_id, type, policy, rc, lockh, unref);
2389         if (rc == 0 || lflags & LDLM_FL_TEST_LOCK)
2390                 RETURN(rc);
2391
2392         if (data != NULL) {
2393                 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2394
2395                 LASSERT(lock != NULL);
2396                 if (!osc_set_lock_data(lock, data)) {
2397                         ldlm_lock_decref(lockh, rc);
2398                         rc = 0;
2399                 }
2400                 LDLM_LOCK_PUT(lock);
2401         }
2402         RETURN(rc);
2403 }
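/*
 * Note that osc_match_base() returns the mode of the matched lock, which may
 * be LCK_PW even for an LCK_PR request (a PW lock also covers readers), so
 * callers must ldlm_lock_decref() with the returned mode, not the requested
 * one.
 */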
2404
2405 static int osc_statfs_interpret(const struct lu_env *env,
2406                                 struct ptlrpc_request *req,
2407                                 struct osc_async_args *aa, int rc)
2408 {
2409         struct obd_statfs *msfs;
2410         ENTRY;
2411
2412         if (rc == -EBADR)
2413                 /* The request has in fact never been sent
2414                  * due to issues at a higher level (LOV).
2415                  * Exit immediately since the caller is
2416                  * aware of the problem and takes care
2417                  * of the cleanup */
2418                 RETURN(rc);
2419
2420         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2421             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2422                 GOTO(out, rc = 0);
2423
2424         if (rc != 0)
2425                 GOTO(out, rc);
2426
2427         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2428         if (msfs == NULL) {
2429                 GOTO(out, rc = -EPROTO);
2430         }
2431
2432         *aa->aa_oi->oi_osfs = *msfs;
2433 out:
2434         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2435         RETURN(rc);
2436 }
2437
2438 static int osc_statfs_async(struct obd_export *exp,
2439                             struct obd_info *oinfo, __u64 max_age,
2440                             struct ptlrpc_request_set *rqset)
2441 {
2442         struct obd_device     *obd = class_exp2obd(exp);
2443         struct ptlrpc_request *req;
2444         struct osc_async_args *aa;
2445         int                    rc;
2446         ENTRY;
2447
2448         /* We could possibly pass max_age in the request (as an absolute
2449          * timestamp or a "seconds.usec ago") so the target can avoid doing
2450          * extra calls into the filesystem if that isn't necessary (e.g.
2451          * during mount that would help a bit).  Having relative timestamps
2452          * is not so great if request processing is slow, while absolute
2453          * timestamps are not ideal because they need time synchronization. */
2454         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2455         if (req == NULL)
2456                 RETURN(-ENOMEM);
2457
2458         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2459         if (rc) {
2460                 ptlrpc_request_free(req);
2461                 RETURN(rc);
2462         }
2463         ptlrpc_request_set_replen(req);
2464         req->rq_request_portal = OST_CREATE_PORTAL;
2465         ptlrpc_at_set_req_timeout(req);
2466
2467         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2468                 /* procfs requests should not wait on statfs, to avoid deadlock */
2469                 req->rq_no_resend = 1;
2470                 req->rq_no_delay = 1;
2471         }
2472
2473         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2474         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2475         aa = ptlrpc_req_async_args(req);
2476         aa->aa_oi = oinfo;
2477
2478         ptlrpc_set_add_req(rqset, req);
2479         RETURN(0);
2480 }
2481
2482 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2483                       struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2484 {
2485         struct obd_device     *obd = class_exp2obd(exp);
2486         struct obd_statfs     *msfs;
2487         struct ptlrpc_request *req;
2488         struct obd_import     *imp = NULL;
2489         int rc;
2490         ENTRY;
2491
2492         /* Since the request might also come from lprocfs, we need to
2493          * sync this with client_disconnect_export() (bug 15684) */
2494         down_read(&obd->u.cli.cl_sem);
2495         if (obd->u.cli.cl_import)
2496                 imp = class_import_get(obd->u.cli.cl_import);
2497         up_read(&obd->u.cli.cl_sem);
2498         if (!imp)
2499                 RETURN(-ENODEV);
2500
2501         /* We could possibly pass max_age in the request (as an absolute
2502          * timestamp or a "seconds.usec ago") so the target can avoid doing
2503          * extra calls into the filesystem if that isn't necessary (e.g.
2504          * during mount that would help a bit).  Having relative timestamps
2505          * is not so great if request processing is slow, while absolute
2506          * timestamps are not ideal because they need time synchronization. */
2507         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2508
2509         class_import_put(imp);
2510
2511         if (req == NULL)
2512                 RETURN(-ENOMEM);
2513
2514         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2515         if (rc) {
2516                 ptlrpc_request_free(req);
2517                 RETURN(rc);
2518         }
2519         ptlrpc_request_set_replen(req);
2520         req->rq_request_portal = OST_CREATE_PORTAL;
2521         ptlrpc_at_set_req_timeout(req);
2522
2523         if (flags & OBD_STATFS_NODELAY) {
2524                 /* procfs requests should not wait on statfs, to avoid deadlock */
2525                 req->rq_no_resend = 1;
2526                 req->rq_no_delay = 1;
2527         }
2528
2529         rc = ptlrpc_queue_wait(req);
2530         if (rc)
2531                 GOTO(out, rc);
2532
2533         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2534         if (msfs == NULL) {
2535                 GOTO(out, rc = -EPROTO);
2536         }
2537
2538         *osfs = *msfs;
2539
2540         EXIT;
2541  out:
2542         ptlrpc_req_finished(req);
2543         return rc;
2544 }
2545
2546 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2547                          void *karg, void __user *uarg)
2548 {
2549         struct obd_device *obd = exp->exp_obd;
2550         struct obd_ioctl_data *data = karg;
2551         int err = 0;
2552         ENTRY;
2553
2554         if (!try_module_get(THIS_MODULE)) {
2555                 CERROR("%s: cannot get module '%s'\n", obd->obd_name,
2556                        module_name(THIS_MODULE));
2557                 return -EINVAL;
2558         }
2559         switch (cmd) {
2560         case OBD_IOC_CLIENT_RECOVER:
2561                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2562                                             data->ioc_inlbuf1, 0);
2563                 if (err > 0)
2564                         err = 0;
2565                 GOTO(out, err);
2566         case IOC_OSC_SET_ACTIVE:
2567                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2568                                                data->ioc_offset);
2569                 GOTO(out, err);
2570         case OBD_IOC_PING_TARGET:
2571                 err = ptlrpc_obd_ping(obd);
2572                 GOTO(out, err);
2573         default:
2574                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2575                        cmd, current_comm());
2576                 GOTO(out, err = -ENOTTY);
2577         }
2578 out:
2579         module_put(THIS_MODULE);
2580         return err;
2581 }
2582
2583 int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
2584                        u32 keylen, void *key, u32 vallen, void *val,
2585                        struct ptlrpc_request_set *set)
2586 {
2587         struct ptlrpc_request *req;
2588         struct obd_device     *obd = exp->exp_obd;
2589         struct obd_import     *imp = class_exp2cliimp(exp);
2590         char                  *tmp;
2591         int                    rc;
2592         ENTRY;
2593
2594         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
2595
2596         if (KEY_IS(KEY_CHECKSUM)) {
2597                 if (vallen != sizeof(int))
2598                         RETURN(-EINVAL);
2599                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
2600                 RETURN(0);
2601         }
2602
2603         if (KEY_IS(KEY_SPTLRPC_CONF)) {
2604                 sptlrpc_conf_client_adapt(obd);
2605                 RETURN(0);
2606         }
2607
2608         if (KEY_IS(KEY_FLUSH_CTX)) {
2609                 sptlrpc_import_flush_my_ctx(imp);
2610                 RETURN(0);
2611         }
2612
2613         if (KEY_IS(KEY_CACHE_SET)) {
2614                 struct client_obd *cli = &obd->u.cli;
2615
2616                 LASSERT(cli->cl_cache == NULL); /* only once */
2617                 cli->cl_cache = (struct cl_client_cache *)val;
2618                 cl_cache_incref(cli->cl_cache);
2619                 cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
2620
2621                 /* add this osc into entity list */
2622                 LASSERT(list_empty(&cli->cl_lru_osc));
2623                 spin_lock(&cli->cl_cache->ccc_lru_lock);
2624                 list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
2625                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
2626
2627                 RETURN(0);
2628         }
2629
2630         if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
2631                 struct client_obd *cli = &obd->u.cli;
2632                 long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
2633                 long target = *(long *)val;
2634
2635                 nr = osc_lru_shrink(env, cli, min(nr, target), true);
2636                 *(long *)val -= nr;
2637                 RETURN(0);
2638         }
2639
2640         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
2641                 RETURN(-EINVAL);
2642
2643         /* We pass all other commands directly to OST. Since nobody calls osc
2644          * methods directly and everybody is supposed to go through LOV, we
2645          * assume LOV checked invalid values for us.
2646          * The only recognised values so far are evict_by_nid and mds_conn.
2647          * Even if something bad goes through, we'd get a -EINVAL from OST
2648          * anyway. */
2649
2650         req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
2651                                                 &RQF_OST_SET_GRANT_INFO :
2652                                                 &RQF_OBD_SET_INFO);
2653         if (req == NULL)
2654                 RETURN(-ENOMEM);
2655
2656         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2657                              RCL_CLIENT, keylen);
2658         if (!KEY_IS(KEY_GRANT_SHRINK))
2659                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
2660                                      RCL_CLIENT, vallen);
2661         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
2662         if (rc) {
2663                 ptlrpc_request_free(req);
2664                 RETURN(rc);
2665         }
2666
2667         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2668         memcpy(tmp, key, keylen);
2669         tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
2670                                                         &RMF_OST_BODY :
2671                                                         &RMF_SETINFO_VAL);
2672         memcpy(tmp, val, vallen);
2673
2674         if (KEY_IS(KEY_GRANT_SHRINK)) {
2675                 struct osc_grant_args *aa;
2676                 struct obdo *oa;
2677
2678                 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2679                 aa = ptlrpc_req_async_args(req);
2680                 OBDO_ALLOC(oa);
2681                 if (!oa) {
2682                         ptlrpc_req_finished(req);
2683                         RETURN(-ENOMEM);
2684                 }
2685                 *oa = ((struct ost_body *)val)->oa;
2686                 aa->aa_oa = oa;
2687                 req->rq_interpret_reply = osc_shrink_grant_interpret;
2688         }
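
        /*
         * Note on the pattern above: per-request state is carried in the
         * request itself. rq_async_args is scratch space (the CLASSERT
         * guards its size) and the interpret callback receives it once the
         * reply, or an error, arrives. A minimal sketch of such a callback
         * using the standard ptlrpc interpreter signature; the name and
         * body are illustrative, not the actual
         * osc_shrink_grant_interpret(), which also releases the obdo
         * allocated above:
         *
         *      static int example_interpret(const struct lu_env *env,
         *                                   struct ptlrpc_request *req,
         *                                   void *args, int rc)
         *      {
         *              struct osc_grant_args *aa = args;
         *
         *              OBDO_FREE(aa->aa_oa);
         *              return rc;
         *      }
         */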
2689
2690         ptlrpc_request_set_replen(req);
2691         if (!KEY_IS(KEY_GRANT_SHRINK)) {
2692                 LASSERT(set != NULL);
2693                 ptlrpc_set_add_req(set, req);
2694                 ptlrpc_check_set(NULL, set);
2695         } else {
2696                 ptlrpcd_add_req(req);
2697         }
2698
2699         RETURN(0);
2700 }
2701 EXPORT_SYMBOL(osc_set_info_async);
2702
2703 int osc_reconnect(const struct lu_env *env, struct obd_export *exp,
2704                   struct obd_device *obd, struct obd_uuid *cluuid,
2705                   struct obd_connect_data *data, void *localdata)
2706 {
2707         struct client_obd *cli = &obd->u.cli;
2708
2709         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
2710                 long lost_grant;
2711                 long grant;
2712
2713                 spin_lock(&cli->cl_loi_list_lock);
2714                 grant = cli->cl_avail_grant + cli->cl_reserved_grant;
2715                 if (data->ocd_connect_flags & OBD_CONNECT_GRANT_PARAM)
2716                         grant += cli->cl_dirty_grant;
2717                 else
2718                         grant += cli->cl_dirty_pages << PAGE_SHIFT;
2719                 data->ocd_grant = grant ? : 2 * cli_brw_size(obd);
2720                 lost_grant = cli->cl_lost_grant;
2721                 cli->cl_lost_grant = 0;
2722                 spin_unlock(&cli->cl_loi_list_lock);
2723
2724                 CDEBUG(D_RPCTRACE,
2725                        "ocd_connect_flags: %#llx ocd_version: %d ocd_grant: %d, lost: %ld.\n",
2726                        data->ocd_connect_flags, data->ocd_version, data->ocd_grant, lost_grant);
2727         }
2728
2729         RETURN(0);
2730 }
2731 EXPORT_SYMBOL(osc_reconnect);
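
/*
 * Worked example for the grant arithmetic above, with made-up numbers:
 * given cl_avail_grant = 4 MiB, cl_reserved_grant = 0 and 256 dirty pages
 * on a 4 KiB-page client without OBD_CONNECT_GRANT_PARAM, the client asks
 * for ocd_grant = 4 MiB + (256 << 12) = 5 MiB on reconnect. Only when the
 * sum is zero does it fall back to requesting 2 * cli_brw_size(obd).
 */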
2732
2733 int osc_disconnect(struct obd_export *exp)
2734 {
2735         struct obd_device *obd = class_exp2obd(exp);
2736         int rc;
2737
2738         rc = client_disconnect_export(exp);
2739         /**
2740          * Initially we put del_shrink_grant before disconnect_export, but it
2741          * causes the following problem if setup (connect) and cleanup
2742          * (disconnect) are tangled together.
2743          *      connect p1                     disconnect p2
2744          *   ptlrpc_connect_import
2745          *     ...............               class_manual_cleanup
2746          *                                     osc_disconnect
2747          *                                     del_shrink_grant
2748          *   ptlrpc_connect_interpret
2749          *     init_grant_shrink
2750          *   add this client to shrink list
2751          *                                      cleanup_osc
2752          * Bang! The pinger triggers the shrink.
2753          * So the osc should be removed from the shrink list only after we
2754          * are sure the import has been destroyed. See bug 18662.
2755          */
2756         if (obd->u.cli.cl_import == NULL)
2757                 osc_del_shrink_grant(&obd->u.cli);
2758         return rc;
2759 }
2760 EXPORT_SYMBOL(osc_disconnect);
2761
2762 int osc_ldlm_resource_invalidate(struct cfs_hash *hs, struct cfs_hash_bd *bd,
2763                                  struct hlist_node *hnode, void *arg)
2764 {
2765         struct lu_env *env = arg;
2766         struct ldlm_resource *res = cfs_hash_object(hs, hnode);
2767         struct ldlm_lock *lock;
2768         struct osc_object *osc = NULL;
2769         ENTRY;
2770
2771         lock_res(res);
2772         list_for_each_entry(lock, &res->lr_granted, l_res_link) {
2773                 if (lock->l_ast_data != NULL && osc == NULL) {
2774                         osc = lock->l_ast_data;
2775                         cl_object_get(osc2cl(osc));
2776                 }
2777
2778                 /* clear LDLM_FL_CLEANED flag to make sure it will be canceled
2779                  * by the 2nd round of ldlm_namespace_clean() call in
2780                  * osc_import_event(). */
2781                 ldlm_clear_cleaned(lock);
2782         }
2783         unlock_res(res);
2784
2785         if (osc != NULL) {
2786                 osc_object_invalidate(env, osc);
2787                 cl_object_put(env, osc2cl(osc));
2788         }
2789
2790         RETURN(0);
2791 }
2792 EXPORT_SYMBOL(osc_ldlm_resource_invalidate);
2793
2794 static int osc_import_event(struct obd_device *obd,
2795                             struct obd_import *imp,
2796                             enum obd_import_event event)
2797 {
2798         struct client_obd *cli;
2799         int rc = 0;
2800
2801         ENTRY;
2802         LASSERT(imp->imp_obd == obd);
2803
2804         switch (event) {
2805         case IMP_EVENT_DISCON: {
2806                 cli = &obd->u.cli;
2807                 spin_lock(&cli->cl_loi_list_lock);
2808                 cli->cl_avail_grant = 0;
2809                 cli->cl_lost_grant = 0;
2810                 spin_unlock(&cli->cl_loi_list_lock);
2811                 break;
2812         }
2813         case IMP_EVENT_INACTIVE: {
2814                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE);
2815                 break;
2816         }
2817         case IMP_EVENT_INVALIDATE: {
2818                 struct ldlm_namespace *ns = obd->obd_namespace;
2819                 struct lu_env         *env;
2820                 __u16                  refcheck;
2821
2822                 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
2823
2824                 env = cl_env_get(&refcheck);
2825                 if (!IS_ERR(env)) {
2826                         osc_io_unplug(env, &obd->u.cli, NULL);
2827
2828                         cfs_hash_for_each_nolock(ns->ns_rs_hash,
2829                                                  osc_ldlm_resource_invalidate,
2830                                                  env, 0);
2831                         cl_env_put(env, &refcheck);
2832
2833                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
2834                 } else
2835                         rc = PTR_ERR(env);
2836                 break;
2837         }
2838         case IMP_EVENT_ACTIVE: {
2839                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE);
2840                 break;
2841         }
2842         case IMP_EVENT_OCD: {
2843                 struct obd_connect_data *ocd = &imp->imp_connect_data;
2844
2845                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
2846                         osc_init_grant(&obd->u.cli, ocd);
2847
2848                 /* See bug 7198 */
2849                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
2850                         imp->imp_client->cli_request_portal = OST_REQUEST_PORTAL;
2851
2852                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD);
2853                 break;
2854         }
2855         case IMP_EVENT_DEACTIVATE: {
2856                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE);
2857                 break;
2858         }
2859         case IMP_EVENT_ACTIVATE: {
2860                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE);
2861                 break;
2862         }
2863         default:
2864                 CERROR("Unknown import event %d\n", event);
2865                 LBUG();
2866         }
2867         RETURN(rc);
2868 }
2869
2870 /**
2871  * Determine whether a lock can be canceled instead of being replayed
2872  * during recovery; see bug 16774 for details.
2873  *
2874  * \retval zero the lock can't be canceled
2875  * \retval other ok to cancel
2876  */
2877 static int osc_cancel_weight(struct ldlm_lock *lock)
2878 {
2879         /* Cancel granted extent locks that are no longer in use: a zero
2880          * weight from osc_ldlm_weigh_ast() means no cached pages depend
2881          * on the lock. */
2882         if (lock->l_resource->lr_type == LDLM_EXTENT &&
2883             lock->l_granted_mode == lock->l_req_mode &&
2884             osc_ldlm_weigh_ast(lock) == 0)
2885                 RETURN(1);
2886
2887         RETURN(0);
2888 }
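
/*
 * This weight callback is registered per-namespace in osc_setup() below via
 * ns_register_cancel(), so the LDLM cancels cheap locks instead of
 * replaying them during recovery.
 */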
2889
2890 static int brw_queue_work(const struct lu_env *env, void *data)
2891 {
2892         struct client_obd *cli = data;
2893
2894         CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
2895
2896         osc_io_unplug(env, cli, NULL);
2897         RETURN(0);
2898 }
2899
2900 int osc_setup_common(struct obd_device *obd, struct lustre_cfg *lcfg)
2901 {
2902         struct client_obd *cli = &obd->u.cli;
2903         void *handler;
2904         int rc;
2905
2906         ENTRY;
2907
2908         rc = ptlrpcd_addref();
2909         if (rc)
2910                 RETURN(rc);
2911
2912         rc = client_obd_setup(obd, lcfg);
2913         if (rc)
2914                 GOTO(out_ptlrpcd, rc);
2915
2917         handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
2918         if (IS_ERR(handler))
2919                 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
2920         cli->cl_writeback_work = handler;
2921
2922         handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
2923         if (IS_ERR(handler))
2924                 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
2925         cli->cl_lru_work = handler;
2926
2927         rc = osc_quota_setup(obd);
2928         if (rc)
2929                 GOTO(out_ptlrpcd_work, rc);
2930
2931         cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
2932
2933         INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
2934         RETURN(rc);
2935
2936 out_ptlrpcd_work:
2937         if (cli->cl_writeback_work != NULL) {
2938                 ptlrpcd_destroy_work(cli->cl_writeback_work);
2939                 cli->cl_writeback_work = NULL;
2940         }
2941         if (cli->cl_lru_work != NULL) {
2942                 ptlrpcd_destroy_work(cli->cl_lru_work);
2943                 cli->cl_lru_work = NULL;
2944         }
2945         client_obd_cleanup(obd);
2946 out_ptlrpcd:
2947         ptlrpcd_decref();
2948         RETURN(rc);
2949 }
2950 EXPORT_SYMBOL(osc_setup_common);
2951
2952 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
2953 {
2954         struct client_obd *cli = &obd->u.cli;
2955         struct obd_type   *type;
2956         int                adding;
2957         int                added;
2958         int                req_count;
2959         int                rc;
2960
2961         ENTRY;
2962
2963         rc = osc_setup_common(obd, lcfg);
2964         if (rc < 0)
2965                 RETURN(rc);
2966
2967 #ifdef CONFIG_PROC_FS
2968         obd->obd_vars = lprocfs_osc_obd_vars;
2969 #endif
2970         /* If this is true then both the client (osc) and server (osp) are
2971          * on the same node. If the osp layer is loaded first, it registers
2972          * the osc proc directory; in that case this obd_device attaches its
2973          * proc tree to type->typ_procsym instead of obd->obd_type->typ_procroot.
2974          */
2975         type = class_search_type(LUSTRE_OSP_NAME);
2976         if (type && type->typ_procsym) {
2977                 obd->obd_proc_entry = lprocfs_register(obd->obd_name,
2978                                                        type->typ_procsym,
2979                                                        obd->obd_vars, obd);
2980                 if (IS_ERR(obd->obd_proc_entry)) {
2981                         rc = PTR_ERR(obd->obd_proc_entry);
2982                         CERROR("error %d setting up lprocfs for %s\n", rc,
2983                                obd->obd_name);
2984                         obd->obd_proc_entry = NULL;
2985                 }
2986         }
2987
2988         rc = lprocfs_obd_setup(obd, false);
2989         if (!rc) {
2990                 /* If the basic OSC proc tree construction succeeded then
2991                  * set up the remaining stats and sptlrpc entries.
2992                  */
2993                 lproc_osc_attach_seqstat(obd);
2994                 sptlrpc_lprocfs_cliobd_attach(obd);
2995                 ptlrpc_lprocfs_register_obd(obd);
2996         }
2997
2998         /*
2999          * We cap the total number of pooled requests with the upper limit
3000          * osc_reqpool_maxreqcount. A race may push the allocation slightly
3001          * over that limit, which is harmless.
3002          */
3003         req_count = atomic_read(&osc_pool_req_count);
3004         if (req_count < osc_reqpool_maxreqcount) {
3005                 adding = cli->cl_max_rpcs_in_flight + 2;
3006                 if (req_count + adding > osc_reqpool_maxreqcount)
3007                         adding = osc_reqpool_maxreqcount - req_count;
3008
3009                 added = ptlrpc_add_rqs_to_pool(osc_rq_pool, adding);
3010                 atomic_add(added, &osc_pool_req_count);
3011         }
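
        /*
         * For illustration: if osc_reqpool_maxreqcount is 40 and 36 requests
         * are already pooled, an OSC with cl_max_rpcs_in_flight = 8 asks for
         * 8 + 2 = 10 more slots but is clipped to the 4 remaining below the
         * cap.
         */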
3012
3013         INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
3014         ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
3015
3016         spin_lock(&osc_shrink_lock);
3017         list_add_tail(&cli->cl_shrink_list, &osc_shrink_list);
3018         spin_unlock(&osc_shrink_lock);
3019
3020         RETURN(0);
3021 }
3022
3023 int osc_precleanup_common(struct obd_device *obd)
3024 {
3025         struct client_obd *cli = &obd->u.cli;
3026         ENTRY;
3027
3028         /* LU-464
3029          * for echo client, export may be on zombie list, wait for
3030          * zombie thread to cull it, because cli.cl_import will be
3031          * cleared in client_disconnect_export():
3032          *   class_export_destroy() -> obd_cleanup() ->
3033          *   echo_device_free() -> echo_client_cleanup() ->
3034          *   obd_disconnect() -> osc_disconnect() ->
3035          *   client_disconnect_export()
3036          */
3037         obd_zombie_barrier();
3038         if (cli->cl_writeback_work) {
3039                 ptlrpcd_destroy_work(cli->cl_writeback_work);
3040                 cli->cl_writeback_work = NULL;
3041         }
3042
3043         if (cli->cl_lru_work) {
3044                 ptlrpcd_destroy_work(cli->cl_lru_work);
3045                 cli->cl_lru_work = NULL;
3046         }
3047
3048         obd_cleanup_client_import(obd);
3049         RETURN(0);
3050 }
3051 EXPORT_SYMBOL(osc_precleanup_common);
3052
3053 static int osc_precleanup(struct obd_device *obd)
3054 {
3055         ENTRY;
3056
3057         osc_precleanup_common(obd);
3058
3059         ptlrpc_lprocfs_unregister_obd(obd);
3060         lprocfs_obd_cleanup(obd);
3061         RETURN(0);
3062 }
3063
3064 int osc_cleanup_common(struct obd_device *obd)
3065 {
3066         struct client_obd *cli = &obd->u.cli;
3067         int rc;
3068
3069         ENTRY;
3070
3071         spin_lock(&osc_shrink_lock);
3072         list_del(&cli->cl_shrink_list);
3073         spin_unlock(&osc_shrink_lock);
3074
3075         /* lru cleanup */
3076         if (cli->cl_cache != NULL) {
3077                 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
3078                 spin_lock(&cli->cl_cache->ccc_lru_lock);
3079                 list_del_init(&cli->cl_lru_osc);
3080                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3081                 cli->cl_lru_left = NULL;
3082                 cl_cache_decref(cli->cl_cache);
3083                 cli->cl_cache = NULL;
3084         }
3085
3086         /* free memory of osc quota cache */
3087         osc_quota_cleanup(obd);
3088
3089         rc = client_obd_cleanup(obd);
3090
3091         ptlrpcd_decref();
3092         RETURN(rc);
3093 }
3094 EXPORT_SYMBOL(osc_cleanup_common);
3095
3096 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
3097 {
3098         int rc = class_process_proc_param(PARAM_OSC, obd->obd_vars, lcfg, obd);
3099         return rc > 0 ? 0 : rc;
3100 }
3101
3102 static int osc_process_config(struct obd_device *obd, size_t len, void *buf)
3103 {
3104         return osc_process_config_base(obd, buf);
3105 }
3106
3107 static struct obd_ops osc_obd_ops = {
3108         .o_owner                = THIS_MODULE,
3109         .o_setup                = osc_setup,
3110         .o_precleanup           = osc_precleanup,
3111         .o_cleanup              = osc_cleanup_common,
3112         .o_add_conn             = client_import_add_conn,
3113         .o_del_conn             = client_import_del_conn,
3114         .o_connect              = client_connect_import,
3115         .o_reconnect            = osc_reconnect,
3116         .o_disconnect           = osc_disconnect,
3117         .o_statfs               = osc_statfs,
3118         .o_statfs_async         = osc_statfs_async,
3119         .o_create               = osc_create,
3120         .o_destroy              = osc_destroy,
3121         .o_getattr              = osc_getattr,
3122         .o_setattr              = osc_setattr,
3123         .o_iocontrol            = osc_iocontrol,
3124         .o_set_info_async       = osc_set_info_async,
3125         .o_import_event         = osc_import_event,
3126         .o_process_config       = osc_process_config,
3127         .o_quotactl             = osc_quotactl,
3128 };
3129
3130 static struct shrinker *osc_cache_shrinker;
3131 struct list_head osc_shrink_list = LIST_HEAD_INIT(osc_shrink_list);
3132 DEFINE_SPINLOCK(osc_shrink_lock);
3133
3134 #ifndef HAVE_SHRINKER_COUNT
3135 static int osc_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask))
3136 {
3137         struct shrink_control scv = {
3138                 .nr_to_scan = shrink_param(sc, nr_to_scan),
3139                 .gfp_mask   = shrink_param(sc, gfp_mask)
3140         };
3141 #if !defined(HAVE_SHRINKER_WANT_SHRINK_PTR) && !defined(HAVE_SHRINK_CONTROL)
3142         struct shrinker *shrinker = NULL;
3143 #endif
3144
3145         (void)osc_cache_shrink_scan(shrinker, &scv);
3146
3147         return osc_cache_shrink_count(shrinker, &scv);
3148 }
3149 #endif
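
/*
 * On kernels predating the split count/scan shrinker API, the wrapper above
 * bridges the single legacy callback to the new pair: it performs the
 * requested scan via osc_cache_shrink_scan() and then reports the remaining
 * object count via osc_cache_shrink_count().
 */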
3150
3151 static int __init osc_init(void)
3152 {
3153         bool enable_proc = true;
3154         struct obd_type *type;
3155         unsigned int reqpool_size;
3156         unsigned int reqsize;
3157         int rc;
3158         DEF_SHRINKER_VAR(osc_shvar, osc_cache_shrink,
3159                          osc_cache_shrink_count, osc_cache_shrink_scan);
3160         ENTRY;
3161
3162         /* Print the address of _any_ initialized kernel symbol from this
3163          * module, to allow debugging with gdb builds that don't support
3164          * data symbols from modules. */
3165         CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3166
3167         rc = lu_kmem_init(osc_caches);
3168         if (rc)
3169                 RETURN(rc);
3170
3171         type = class_search_type(LUSTRE_OSP_NAME);
3172         if (type != NULL && type->typ_procsym != NULL)
3173                 enable_proc = false;
3174
3175         rc = class_register_type(&osc_obd_ops, NULL, enable_proc, NULL,
3176                                  LUSTRE_OSC_NAME, &osc_device_type);
3177         if (rc)
3178                 GOTO(out_kmem, rc);
3179
3180         osc_cache_shrinker = set_shrinker(DEFAULT_SEEKS, &osc_shvar);
3181
3182         /* Anything this large is clearly bogus; only guard against overflow here. */
3183         if (osc_reqpool_mem_max >= 1 << 12 || osc_reqpool_mem_max == 0)
3184                 GOTO(out_type, rc = -EINVAL);
3185
3186         reqpool_size = osc_reqpool_mem_max << 20;
3187
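        /* Round reqsize up to the smallest power of two at or above
         * OST_IO_MAXREQSIZE, presumably to match the allocator rounding
         * request buffers up to a power of two; this is the per-request
         * footprint budgeted against reqpool_size. */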
3188         reqsize = 1;
3189         while (reqsize < OST_IO_MAXREQSIZE)
3190                 reqsize = reqsize << 1;
3191
3192         /*
3193          * We don't enlarge the request count in the OSC pool according to
3194          * cl_max_rpcs_in_flight: allocation from the pool is only tried
3195          * after normal allocation fails, so a small OSC pool won't cause
3196          * much performance degradation in most cases.
3197          */
3198         osc_reqpool_maxreqcount = reqpool_size / reqsize;
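
        /*
         * For example, assuming OST_IO_MAXREQSIZE comes to just under 1 MiB:
         * reqsize rounds up to 1 MiB, so the default osc_reqpool_mem_max of
         * 5 MB yields osc_reqpool_maxreqcount = (5 << 20) / (1 << 20) = 5
         * pooled requests shared by all OSCs.
         */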
3199
3200         atomic_set(&osc_pool_req_count, 0);
3201         osc_rq_pool = ptlrpc_init_rq_pool(0, OST_IO_MAXREQSIZE,
3202                                           ptlrpc_add_rqs_to_pool);
3203
3204         if (osc_rq_pool != NULL)
3205                 GOTO(out, rc);
3206         rc = -ENOMEM;
3207 out_type:
3208         class_unregister_type(LUSTRE_OSC_NAME);
3209 out_kmem:
3210         lu_kmem_fini(osc_caches);
3211 out:
3212         RETURN(rc);
3213 }
3214
3215 static void __exit osc_exit(void)
3216 {
3217         remove_shrinker(osc_cache_shrinker);
3218         class_unregister_type(LUSTRE_OSC_NAME);
3219         lu_kmem_fini(osc_caches);
3220         ptlrpc_free_rq_pool(osc_rq_pool);
3221 }
3222
3223 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
3224 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3225 MODULE_VERSION(LUSTRE_VERSION_STRING);
3226 MODULE_LICENSE("GPL");
3227
3228 module_init(osc_init);
3229 module_exit(osc_exit);