/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.gnu.org/licenses/gpl-2.0.html
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, 2016, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 */

#define DEBUG_SUBSYSTEM S_OSC

#include <libcfs/libcfs.h>

#include <lprocfs_status.h>
#include <lustre_debug.h>
#include <lustre_dlm.h>
#include <lustre_fid.h>
#include <lustre_ha.h>
#include <uapi/linux/lustre/lustre_ioctl.h>
#include <lustre_net.h>
#include <lustre_obdo.h>
#include <uapi/linux/lustre/lustre_param.h>
#include <obd.h>
#include <obd_cksum.h>
#include <obd_class.h>
#include <lustre_osc.h>

#include "osc_internal.h"

atomic_t osc_pool_req_count;
unsigned int osc_reqpool_maxreqcount;
struct ptlrpc_request_pool *osc_rq_pool;

/* max memory used for request pool, unit is MB */
static unsigned int osc_reqpool_mem_max = 5;
module_param(osc_reqpool_mem_max, uint, 0444);
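
/* These globals back a shared pool of pre-allocated BRW write requests so
 * that writeout can still make progress under memory pressure.  The number
 * of pooled requests is presumably derived from osc_reqpool_mem_max at
 * module init (that code is not shown here); osc_reqpool_maxreqcount
 * records the resulting limit. */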

#define osc_grant_args osc_brw_async_args

struct osc_setattr_args {
        struct obdo             *sa_oa;
        obd_enqueue_update_f     sa_upcall;
        void                    *sa_cookie;
};

struct osc_fsync_args {
        struct osc_object       *fa_obj;
        struct obdo             *fa_oa;
        obd_enqueue_update_f    fa_upcall;
        void                    *fa_cookie;
};

struct osc_ladvise_args {
        struct obdo             *la_oa;
        obd_enqueue_update_f     la_upcall;
        void                    *la_cookie;
};

static void osc_release_ppga(struct brw_page **ppga, size_t count);
static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
                         void *data, int rc);

void osc_pack_req_body(struct ptlrpc_request *req, struct obdo *oa)
{
        struct ost_body *body;

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
}

static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        int                      rc;

        ENTRY;
        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;

        EXIT;
out:
        ptlrpc_req_finished(req);

        return rc;
}

static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        int                      rc;

        ENTRY;
        LASSERT(oa->o_valid & OBD_MD_FLGROUP);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        EXIT;
out:
        ptlrpc_req_finished(req);

        RETURN(rc);
}

static int osc_setattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_setattr_args *sa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
                             &body->oa);
out:
        rc = sa->sa_upcall(sa->sa_cookie, rc);
        RETURN(rc);
}

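/* Send an OST_SETATTR without blocking the caller.  If @rqset is NULL the
 * request is simply handed to ptlrpcd and forgotten; otherwise the reply is
 * unpacked by osc_setattr_interpret(), which hands the final rc to @upcall
 * with @cookie, and the request is run either by ptlrpcd (PTLRPCD_SET) or
 * as part of the caller's set. */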
int osc_setattr_async(struct obd_export *exp, struct obdo *oa,
                      obd_enqueue_update_f upcall, void *cookie,
                      struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct osc_setattr_args *sa;
        int                      rc;

        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        /* do mds to ost setattr asynchronously */
        if (!rqset) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req);
        } else {
                req->rq_interpret_reply =
                        (ptlrpc_interpterer_t)osc_setattr_interpret;

                CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
                sa = ptlrpc_req_async_args(req);
                sa->sa_oa = oa;
                sa->sa_upcall = upcall;
                sa->sa_cookie = cookie;

                if (rqset == PTLRPCD_SET)
                        ptlrpcd_add_req(req);
                else
                        ptlrpc_set_add_req(rqset, req);
        }

        RETURN(0);
}

static int osc_ladvise_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 void *arg, int rc)
{
        struct osc_ladvise_args *la = arg;
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        *la->la_oa = body->oa;
out:
        rc = la->la_upcall(la->la_cookie, rc);
        RETURN(rc);
}

/**
 * If rqset is NULL, do not wait for response. Upcall and cookie may also
 * be NULL in this case.
 */
int osc_ladvise_base(struct obd_export *exp, struct obdo *oa,
                     struct ladvise_hdr *ladvise_hdr,
                     obd_enqueue_update_f upcall, void *cookie,
                     struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        struct osc_ladvise_args *la;
        int                      rc;
        struct lu_ladvise       *req_ladvise;
        struct lu_ladvise       *ladvise = ladvise_hdr->lah_advise;
        int                      num_advise = ladvise_hdr->lah_count;
        struct ladvise_hdr      *req_ladvise_hdr;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_LADVISE);
        if (req == NULL)
                RETURN(-ENOMEM);

        req_capsule_set_size(&req->rq_pill, &RMF_OST_LADVISE, RCL_CLIENT,
                             num_advise * sizeof(*ladvise));
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_LADVISE);
        if (rc != 0) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL;
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
                             oa);

        req_ladvise_hdr = req_capsule_client_get(&req->rq_pill,
                                                 &RMF_OST_LADVISE_HDR);
        memcpy(req_ladvise_hdr, ladvise_hdr, sizeof(*ladvise_hdr));

        req_ladvise = req_capsule_client_get(&req->rq_pill, &RMF_OST_LADVISE);
        memcpy(req_ladvise, ladvise, sizeof(*ladvise) * num_advise);
        ptlrpc_request_set_replen(req);

        if (rqset == NULL) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req);
                RETURN(0);
        }

        req->rq_interpret_reply = osc_ladvise_interpret;
        CLASSERT(sizeof(*la) <= sizeof(req->rq_async_args));
        la = ptlrpc_req_async_args(req);
        la->la_oa = oa;
        la->la_upcall = upcall;
        la->la_cookie = cookie;

        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}

static int osc_create(const struct lu_env *env, struct obd_export *exp,
                      struct obdo *oa)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        LASSERT(oa != NULL);
        LASSERT(oa->o_valid & OBD_MD_FLGROUP);
        LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
        if (req == NULL)
                GOTO(out, rc = -ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
        if (rc) {
                ptlrpc_request_free(req);
                GOTO(out, rc);
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out_req, rc = -EPROTO);

        CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;

        CDEBUG(D_HA, "transno: %lld\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        RETURN(rc);
}

int osc_punch_send(struct obd_export *exp, struct obdo *oa,
                   obd_enqueue_update_f upcall, void *cookie)
{
        struct ptlrpc_request *req;
        struct osc_setattr_args *sa;
        struct obd_import *imp = class_exp2cliimp(exp);
        struct ost_body *body;
        int rc;

        ENTRY;

        req = ptlrpc_request_alloc(imp, &RQF_OST_PUNCH);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
        if (rc < 0) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_set_io_portal(req);

        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);

        lustre_set_wire_obdo(&imp->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
        CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
        sa = ptlrpc_req_async_args(req);
        sa->sa_oa = oa;
        sa->sa_upcall = upcall;
        sa->sa_cookie = cookie;

        ptlrpcd_add_req(req);

        RETURN(0);
}
EXPORT_SYMBOL(osc_punch_send);

static int osc_sync_interpret(const struct lu_env *env,
                              struct ptlrpc_request *req,
                              void *arg, int rc)
{
        struct osc_fsync_args   *fa = arg;
        struct ost_body         *body;
        struct cl_attr          *attr = &osc_env_info(env)->oti_attr;
        unsigned long           valid = 0;
        struct cl_object        *obj;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL) {
                CERROR("can't unpack ost_body\n");
                GOTO(out, rc = -EPROTO);
        }

        *fa->fa_oa = body->oa;
        obj = osc2cl(fa->fa_obj);

        /* Update osc object's blocks attribute */
        cl_object_attr_lock(obj);
        if (body->oa.o_valid & OBD_MD_FLBLOCKS) {
                attr->cat_blocks = body->oa.o_blocks;
                valid |= CAT_BLOCKS;
        }

        if (valid != 0)
                cl_object_attr_update(env, obj, attr, valid);
        cl_object_attr_unlock(obj);

out:
        rc = fa->fa_upcall(fa->fa_cookie, rc);
        RETURN(rc);
}

int osc_sync_base(struct osc_object *obj, struct obdo *oa,
                  obd_enqueue_update_f upcall, void *cookie,
                  struct ptlrpc_request_set *rqset)
{
        struct obd_export     *exp = osc_export(obj);
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct osc_fsync_args *fa;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        /* overload the size and blocks fields in the oa with start/end */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = osc_sync_interpret;

        CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
        fa = ptlrpc_req_async_args(req);
        fa->fa_obj = obj;
        fa->fa_oa = oa;
        fa->fa_upcall = upcall;
        fa->fa_cookie = cookie;

        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}

/* Find and cancel locally the locks matched by @mode in the resource found
 * by @objid. Found locks are added into the @cancels list. Returns the
 * number of locks added to @cancels. */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
                                   struct list_head *cancels,
                                   enum ldlm_mode mode, __u64 lock_flags)
{
        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        struct ldlm_res_id res_id;
        struct ldlm_resource *res;
        int count;
        ENTRY;

        /* Return, i.e. cancel nothing, only if ELC is supported (flag in
         * export) but disabled through procfs (flag in NS).
         *
         * This is distinct from the case when ELC is not supported at all,
         * where we still want to cancel locks in advance, just cancel them
         * locally, without sending any RPC. */
        if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
                RETURN(0);

        ostid_build_res_name(&oa->o_oi, &res_id);
        res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
        if (IS_ERR(res))
                RETURN(0);

        LDLM_RESOURCE_ADDREF(res);
        count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
                                           lock_flags, 0, NULL);
        LDLM_RESOURCE_DELREF(res);
        ldlm_resource_putref(res);
        RETURN(count);
}

static int osc_destroy_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req, void *data,
                                 int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;

        atomic_dec(&cli->cl_destroy_in_flight);
        wake_up(&cli->cl_destroy_waitq);
        return 0;
}

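/* Throttle the number of destroy RPCs in flight to cl_max_rpcs_in_flight
 * without taking a lock: the counter is bumped optimistically and backed
 * out again if the limit was exceeded.  The second check catches a
 * concurrent decrement that slipped in between the two atomic operations,
 * so that no waiter on cl_destroy_waitq is left sleeping. */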
static int osc_can_send_destroy(struct client_obd *cli)
{
        if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
            cli->cl_max_rpcs_in_flight) {
                /* The destroy request can be sent */
                return 1;
        }
        if (atomic_dec_return(&cli->cl_destroy_in_flight) <
            cli->cl_max_rpcs_in_flight) {
                /*
                 * The counter has been modified between the two atomic
                 * operations.
                 */
                wake_up(&cli->cl_destroy_waitq);
        }
        return 0;
}

static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct client_obd     *cli = &exp->exp_obd->u.cli;
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct list_head       cancels = LIST_HEAD_INIT(cancels);
        int rc, count;
        ENTRY;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
        if (req == NULL) {
                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                RETURN(-ENOMEM);
        }

        rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
                               0, &cancels, count);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = osc_destroy_interpret;
        if (!osc_can_send_destroy(cli)) {
                struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);

                /*
                 * Wait until the number of on-going destroy RPCs drops
                 * below cl_max_rpcs_in_flight.
                 */
                rc = l_wait_event_exclusive(cli->cl_destroy_waitq,
                                            osc_can_send_destroy(cli), &lwi);
                if (rc) {
                        ptlrpc_req_finished(req);
                        RETURN(rc);
                }
        }

        /* Do not wait for response. */
        ptlrpcd_add_req(req);
        RETURN(0);
}
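/* Fill in the grant-related fields of @oa so that each BRW RPC also reports
 * the client's cache state: how much is currently dirty, how much more the
 * client would like to be allowed to dirty (o_undirty), and how much grant
 * was lost (o_dropped), letting the server adjust the grant it returns. */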
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;

        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        spin_lock(&cli->cl_loi_list_lock);
        if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data, GRANT_PARAM))
                oa->o_dirty = cli->cl_dirty_grant;
        else
                oa->o_dirty = cli->cl_dirty_pages << PAGE_SHIFT;
        if (unlikely(cli->cl_dirty_pages - cli->cl_dirty_transit >
                     cli->cl_dirty_max_pages)) {
                CERROR("dirty %lu - %lu > dirty_max %lu\n",
                       cli->cl_dirty_pages, cli->cl_dirty_transit,
                       cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else if (unlikely(atomic_long_read(&obd_dirty_pages) -
                            atomic_long_read(&obd_dirty_transit_pages) >
                            (long)(obd_max_dirty_pages + 1))) {
                /* The atomic_read() and atomic_inc() are not covered by a
                 * lock, so they may safely race and trip this CERROR()
                 * unless we add in a small fudge factor (+1). */
                CERROR("%s: dirty %ld - %ld > system dirty_max %ld\n",
                       cli_name(cli), atomic_long_read(&obd_dirty_pages),
                       atomic_long_read(&obd_dirty_transit_pages),
                       obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
                            0x7fffffff)) {
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty_pages, cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else {
                unsigned long nrpages;

                nrpages = cli->cl_max_pages_per_rpc;
                nrpages *= cli->cl_max_rpcs_in_flight + 1;
                nrpages = max(nrpages, cli->cl_dirty_max_pages);
                oa->o_undirty = nrpages << PAGE_SHIFT;
                if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data,
                                 GRANT_PARAM)) {
                        int nrextents;

                        /* take extent tax into account when asking for more
                         * grant space */
                        nrextents = (nrpages + cli->cl_max_extent_pages - 1) /
                                     cli->cl_max_extent_pages;
                        oa->o_undirty += nrextents * cli->cl_grant_extent_tax;
                }
        }
        oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
        oa->o_dropped = cli->cl_lost_grant;
        cli->cl_lost_grant = 0;
        spin_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "dirty: %llu undirty: %u dropped %u grant: %llu\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
}

void osc_update_next_shrink(struct client_obd *cli)
{
        cli->cl_next_shrink_grant = ktime_get_seconds() +
                                    cli->cl_grant_shrink_interval;

        CDEBUG(D_CACHE, "next time %lld to shrink grant\n",
               cli->cl_next_shrink_grant);
}

static void __osc_update_grant(struct client_obd *cli, u64 grant)
{
        spin_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant += grant;
        spin_unlock(&cli->cl_loi_list_lock);
}

static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
{
        if (body->oa.o_valid & OBD_MD_FLGRANT) {
                CDEBUG(D_CACHE, "got %llu extra grant\n", body->oa.o_grant);
                __osc_update_grant(cli, body->oa.o_grant);
        }
}

static int osc_shrink_grant_interpret(const struct lu_env *env,
                                      struct ptlrpc_request *req,
                                      void *aa, int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
        struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
        struct ost_body *body;

        if (rc != 0) {
                __osc_update_grant(cli, oa->o_grant);
                GOTO(out, rc);
        }

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        osc_update_grant(cli, body);
out:
        OBDO_FREE(oa);
        return rc;
}

static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
{
        spin_lock(&cli->cl_loi_list_lock);
        oa->o_grant = cli->cl_avail_grant / 4;
        cli->cl_avail_grant -= oa->o_grant;
        spin_unlock(&cli->cl_loi_list_lock);
        if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
                oa->o_valid |= OBD_MD_FLFLAGS;
                oa->o_flags = 0;
        }
        oa->o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);
}

/* Shrink the current grant, either from some large amount to enough for a
 * full set of in-flight RPCs, or if we have already shrunk to that limit
 * then to enough for a single RPC.  This avoids keeping more grant than
 * needed, and avoids shrinking the grant piecemeal. */
static int osc_shrink_grant(struct client_obd *cli)
{
        __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
                             (cli->cl_max_pages_per_rpc << PAGE_SHIFT);

        spin_lock(&cli->cl_loi_list_lock);
        if (cli->cl_avail_grant <= target_bytes)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
        spin_unlock(&cli->cl_loi_list_lock);

        return osc_shrink_grant_to_target(cli, target_bytes);
}
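
/* A worked example with hypothetical values: with cl_max_rpcs_in_flight = 8,
 * cl_max_pages_per_rpc = 256 and 4KiB pages, the first target above is
 * (8 + 1) * 1MiB = 9MiB; once cl_avail_grant is at or below that, the next
 * shrink aims for a single RPC's worth, i.e. 1MiB. */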

int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
{
        int                     rc = 0;
        struct ost_body        *body;
        ENTRY;

        spin_lock(&cli->cl_loi_list_lock);
        /* Don't shrink if we are already above or below the desired limit.
         * We don't want to shrink below a single RPC, as that would
         * negatively impact block allocation and long-term performance. */
        if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_SHIFT)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;

        if (target_bytes >= cli->cl_avail_grant) {
                spin_unlock(&cli->cl_loi_list_lock);
                RETURN(0);
        }
        spin_unlock(&cli->cl_loi_list_lock);

        OBD_ALLOC_PTR(body);
        if (!body)
                RETURN(-ENOMEM);

        osc_announce_cached(cli, &body->oa, 0);

        spin_lock(&cli->cl_loi_list_lock);
        body->oa.o_grant = cli->cl_avail_grant - target_bytes;
        cli->cl_avail_grant = target_bytes;
        spin_unlock(&cli->cl_loi_list_lock);
        if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
                body->oa.o_valid |= OBD_MD_FLFLAGS;
                body->oa.o_flags = 0;
        }
        body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);

        rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
                                sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
                                sizeof(*body), body, NULL);
        if (rc != 0)
                __osc_update_grant(cli, body->oa.o_grant);
        OBD_FREE_PTR(body);
        RETURN(rc);
}

static int osc_should_shrink_grant(struct client_obd *client)
{
        time64_t next_shrink = client->cl_next_shrink_grant;

        if ((client->cl_import->imp_connect_data.ocd_connect_flags &
             OBD_CONNECT_GRANT_SHRINK) == 0)
                return 0;

        if (ktime_get_seconds() >= next_shrink - 5) {
                /* Get the current RPC size directly, instead of going via:
                 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
                 * Keep comment here so that it can be found by searching. */
                int brw_size = client->cl_max_pages_per_rpc << PAGE_SHIFT;

                if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
                    client->cl_avail_grant > brw_size)
                        return 1;
                else
                        osc_update_next_shrink(client);
        }
        return 0;
}

static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
{
        struct client_obd *client;

        list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
                if (osc_should_shrink_grant(client))
                        osc_shrink_grant(client);
        }
        return 0;
}

static int osc_add_shrink_grant(struct client_obd *client)
{
        int rc;

        rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
                                       TIMEOUT_GRANT,
                                       osc_grant_shrink_grant_cb, NULL,
                                       &client->cl_grant_shrink_list);
        if (rc) {
                CERROR("add grant client %s error %d\n", cli_name(client), rc);
                return rc;
        }
        CDEBUG(D_CACHE, "add grant client %s\n", cli_name(client));
        osc_update_next_shrink(client);
        return 0;
}

static int osc_del_shrink_grant(struct client_obd *client)
{
        return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
                                         TIMEOUT_GRANT);
}

void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        /*
         * ocd_grant is the total grant amount we expect to hold: if we have
         * been evicted, it's the new avail_grant amount, and cl_dirty_pages
         * will drop to 0 as in-flight RPCs fail out; otherwise, it's
         * avail_grant + dirty.
         *
         * A race is tolerable here: if we're evicted, but imp_state already
         * left EVICTED state, then cl_dirty_pages must be 0 already.
         */
        spin_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant = ocd->ocd_grant;
        if (cli->cl_import->imp_state != LUSTRE_IMP_EVICTED) {
                cli->cl_avail_grant -= cli->cl_reserved_grant;
                if (OCD_HAS_FLAG(ocd, GRANT_PARAM))
                        cli->cl_avail_grant -= cli->cl_dirty_grant;
                else
                        cli->cl_avail_grant -=
                                        cli->cl_dirty_pages << PAGE_SHIFT;
        }

        if (OCD_HAS_FLAG(ocd, GRANT_PARAM)) {
                u64 size;
                int chunk_mask;

                /* overhead for each extent insertion */
                cli->cl_grant_extent_tax = ocd->ocd_grant_tax_kb << 10;
                /* determine the appropriate chunk size used by osc_extent. */
                cli->cl_chunkbits = max_t(int, PAGE_SHIFT,
                                          ocd->ocd_grant_blkbits);
                /* max_pages_per_rpc must be chunk aligned */
                chunk_mask = ~((1 << (cli->cl_chunkbits - PAGE_SHIFT)) - 1);
                cli->cl_max_pages_per_rpc = (cli->cl_max_pages_per_rpc +
                                             ~chunk_mask) & chunk_mask;
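                /* A hypothetical example of the rounding above: with
                 * PAGE_SHIFT = 12 (4KiB pages) and ocd_grant_blkbits = 16,
                 * cl_chunkbits is 16, so chunks are 16 pages and a
                 * cl_max_pages_per_rpc of 100 is rounded up to 112. */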
                /* determine maximum extent size, in #pages */
                size = (u64)ocd->ocd_grant_max_blks << ocd->ocd_grant_blkbits;
                cli->cl_max_extent_pages = size >> PAGE_SHIFT;
                if (cli->cl_max_extent_pages == 0)
                        cli->cl_max_extent_pages = 1;
        } else {
                cli->cl_grant_extent_tax = 0;
                cli->cl_chunkbits = PAGE_SHIFT;
                cli->cl_max_extent_pages = DT_MAX_BRW_PAGES;
        }
        spin_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld "
                "chunk bits: %d cl_max_extent_pages: %d\n",
                cli_name(cli),
                cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits,
                cli->cl_max_extent_pages);

        if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
            list_empty(&cli->cl_grant_shrink_list))
                osc_add_shrink_grant(cli);
}
EXPORT_SYMBOL(osc_init_grant);

/* We assume that the reason this OSC got a short read is that it read
 * beyond the end of a stripe file; i.e. Lustre is reading a sparse file
 * via the LOV, and it _knows_ it's reading inside the file, it's just that
 * this stripe never got written at or beyond this stripe offset yet. */
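/* For example (hypothetical sizes): with three 4096-byte pages and
 * nob_read = 6000, page 0 is left intact, page 1 keeps its first 1904
 * bytes and is zeroed beyond that, and page 2 is zeroed entirely. */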
static void handle_short_read(int nob_read, size_t page_count,
                              struct brw_page **pga)
{
        char *ptr;
        int i = 0;

        /* skip bytes read OK */
        while (nob_read > 0) {
                LASSERT(page_count > 0);

                if (pga[i]->count > nob_read) {
                        /* EOF inside this page */
                        ptr = kmap(pga[i]->pg) +
                                (pga[i]->off & ~PAGE_MASK);
                        memset(ptr + nob_read, 0, pga[i]->count - nob_read);
                        kunmap(pga[i]->pg);
                        page_count--;
                        i++;
                        break;
                }

                nob_read -= pga[i]->count;
                page_count--;
                i++;
        }

        /* zero remaining pages */
        while (page_count-- > 0) {
                ptr = kmap(pga[i]->pg) + (pga[i]->off & ~PAGE_MASK);
                memset(ptr, 0, pga[i]->count);
                kunmap(pga[i]->pg);
                i++;
        }
}

static int check_write_rcs(struct ptlrpc_request *req,
                           int requested_nob, int niocount,
                           size_t page_count, struct brw_page **pga)
{
        int     i;
        __u32   *remote_rcs;

        remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
                                                  sizeof(*remote_rcs) *
                                                  niocount);
        if (remote_rcs == NULL) {
                CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
                return -EPROTO;
        }

        /* return error if any niobuf was in error */
        for (i = 0; i < niocount; i++) {
                if ((int)remote_rcs[i] < 0)
                        return remote_rcs[i];

                if (remote_rcs[i] != 0) {
                        CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
                                i, remote_rcs[i], req);
                        return -EPROTO;
                }
        }
        if (req->rq_bulk != NULL &&
            req->rq_bulk->bd_nob_transferred != requested_nob) {
                CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
                       req->rq_bulk->bd_nob_transferred, requested_nob);
                return -EPROTO;
        }

        return 0;
}

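/* Two brw_pages can share one remote niobuf only if they are byte-contiguous
 * and carry identical flags; the mask below only decides whether a flag
 * mismatch is surprising enough to warn about.  E.g. pages covering
 * [0, 4096) and [4096, 8192) with equal flags merge into a single niobuf. */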
static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
{
        if (p1->flag != p2->flag) {
                unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
                                  OBD_BRW_SYNC       | OBD_BRW_ASYNC   |
                                  OBD_BRW_NOQUOTA    | OBD_BRW_SOFT_SYNC);

                /* warn if we try to combine flags that we don't know to be
                 * safe to combine */
                if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
                        CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
                              "report this at https://jira.hpdd.intel.com/\n",
                              p1->flag, p2->flag);
                }
                return 0;
        }

        return (p1->off + p1->count == p2->off);
}

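/* Checksum the first @nob bytes of the I/O described by @pga.  The same
 * routine serves both directions: for OST_WRITE the client stores the
 * result in o_cksum before sending; for OST_READ it is recomputed to verify
 * the reply.  The two OBD_FAIL hooks below deliberately create mismatches
 * for testing. */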
static u32 osc_checksum_bulk(int nob, size_t pg_count,
                             struct brw_page **pga, int opc,
                             enum cksum_types cksum_type)
{
        u32                             cksum;
        int                             i = 0;
        struct cfs_crypto_hash_desc     *hdesc;
        unsigned int                    bufsize;
        unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);

        LASSERT(pg_count > 0);

        hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
        if (IS_ERR(hdesc)) {
                CERROR("Unable to initialize checksum hash %s\n",
                       cfs_crypto_hash_name(cfs_alg));
                return PTR_ERR(hdesc);
        }

        while (nob > 0 && pg_count > 0) {
                unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 && opc == OST_READ &&
                    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
                        unsigned char *ptr = kmap(pga[i]->pg);
                        int off = pga[i]->off & ~PAGE_MASK;

                        memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
                        kunmap(pga[i]->pg);
                }
                cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
                                            pga[i]->off & ~PAGE_MASK,
                                            count);
                LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
                               (int)(pga[i]->off & ~PAGE_MASK));

                nob -= pga[i]->count;
                pg_count--;
                i++;
        }

        bufsize = sizeof(cksum);
        cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);

        /* For sending we only compute a wrong checksum instead of
         * corrupting the data, so the data is still correct on a resend */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                cksum++;

        return cksum;
}

static int
osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
                     u32 page_count, struct brw_page **pga,
                     struct ptlrpc_request **reqp, int resend)
{
        struct ptlrpc_request   *req;
        struct ptlrpc_bulk_desc *desc;
        struct ost_body         *body;
        struct obd_ioobj        *ioobj;
        struct niobuf_remote    *niobuf;
        int niocount, i, requested_nob, opc, rc, short_io_size = 0;
        struct osc_brw_async_args *aa;
        struct req_capsule      *pill;
        struct brw_page *pg_prev;
        void *short_io_buf;

        ENTRY;
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
                RETURN(-ENOMEM); /* Recoverable */
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
                RETURN(-EINVAL); /* Fatal */

        if ((cmd & OBD_BRW_WRITE) != 0) {
                opc = OST_WRITE;
                req = ptlrpc_request_alloc_pool(cli->cl_import,
                                                osc_rq_pool,
                                                &RQF_OST_BRW_WRITE);
        } else {
                opc = OST_READ;
                req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
        }
        if (req == NULL)
                RETURN(-ENOMEM);

        for (niocount = i = 1; i < page_count; i++) {
                if (!can_merge_pages(pga[i - 1], pga[i]))
                        niocount++;
        }

        pill = &req->rq_pill;
        req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
                             sizeof(*ioobj));
        req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
                             niocount * sizeof(*niobuf));

        for (i = 0; i < page_count; i++)
                short_io_size += pga[i]->count;

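        /* Short i/o (see LU-1757) carries the data inline in the request or
         * reply buffer instead of a separate bulk transfer.  It is only
         * attempted when the transfer is a single contiguous region
         * (niocount == 1) no larger than cl_short_io_bytes, and only if the
         * server advertised short i/o support at connect time. */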
        /* Check if we can do a short i/o. */
        if (!(short_io_size <= cli->cl_short_io_bytes && niocount == 1 &&
            imp_connect_shortio(cli->cl_import)))
                short_io_size = 0;

        req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_CLIENT,
                             opc == OST_READ ? 0 : short_io_size);
        if (opc == OST_READ)
                req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_SERVER,
                                     short_io_size);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        osc_set_io_portal(req);

        ptlrpc_at_set_req_timeout(req);
        /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
         * retry logic */
        req->rq_no_retry_einprogress = 1;

        if (short_io_size != 0) {
                desc = NULL;
                short_io_buf = NULL;
                goto no_bulk;
        }

        desc = ptlrpc_prep_bulk_imp(req, page_count,
                cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
                (opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE :
                        PTLRPC_BULK_PUT_SINK) |
                        PTLRPC_BULK_BUF_KIOV,
                OST_BULK_PORTAL,
                &ptlrpc_bulk_kiov_pin_ops);

        if (desc == NULL)
                GOTO(out, rc = -ENOMEM);
        /* NB request now owns desc and will free it when it gets freed */
no_bulk:
        body = req_capsule_client_get(pill, &RMF_OST_BODY);
        ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;
        /* The high bits of ioo_max_brw tell the server the _maximum_ number
         * of bulks that might be sent for this request.  The actual number
         * is decided when the RPC is finally sent in ptlrpc_register_bulk().
         * It sends "max - 1" for old client compatibility sending "0", and
         * also so that the actual maximum is a power-of-two number, not one
         * less. LU-1431 */
        if (desc != NULL)
                ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
        else /* short io */
                ioobj_max_brw_set(ioobj, 0);

        if (short_io_size != 0) {
                if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                        body->oa.o_valid |= OBD_MD_FLFLAGS;
                        body->oa.o_flags = 0;
                }
                body->oa.o_flags |= OBD_FL_SHORT_IO;
                CDEBUG(D_CACHE, "Using short io for data transfer, size = %d\n",
                       short_io_size);
                if (opc == OST_WRITE) {
                        short_io_buf = req_capsule_client_get(pill,
                                                              &RMF_SHORT_IO);
                        LASSERT(short_io_buf != NULL);
                }
        }

        LASSERT(page_count > 0);
        pg_prev = pga[0];
        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                struct brw_page *pg = pga[i];
                int poff = pg->off & ~PAGE_MASK;

                LASSERT(pg->count > 0);
                /* make sure there is no gap in the middle of page array */
                LASSERTF(page_count == 1 ||
                         (ergo(i == 0, poff + pg->count == PAGE_SIZE) &&
                          ergo(i > 0 && i < page_count - 1,
                               poff == 0 && pg->count == PAGE_SIZE)   &&
                          ergo(i == page_count - 1, poff == 0)),
                         "i: %d/%d pg: %p off: %llu, count: %u\n",
                         i, page_count, pg, pg->off, pg->count);
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u pg %p [pri %lu ind %lu] off %llu"
                         " prev_pg %p [pri %lu ind %lu] off %llu\n",
                         i, page_count,
                         pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
                         pg_prev->pg, page_private(pg_prev->pg),
                         pg_prev->pg->index, pg_prev->off);
                LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
                        (pg->flag & OBD_BRW_SRVLOCK));
                if (short_io_size != 0 && opc == OST_WRITE) {
                        unsigned char *ptr = ll_kmap_atomic(pg->pg, KM_USER0);

                        LASSERT(short_io_size >= requested_nob + pg->count);
                        memcpy(short_io_buf + requested_nob,
                               ptr + poff,
                               pg->count);
                        ll_kunmap_atomic(ptr, KM_USER0);
                } else if (short_io_size == 0) {
                        desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff,
                                                         pg->count);
                }
                requested_nob += pg->count;

                if (i > 0 && can_merge_pages(pg_prev, pg)) {
                        niobuf--;
                        niobuf->rnb_len += pg->count;
                } else {
                        niobuf->rnb_offset = pg->off;
                        niobuf->rnb_len    = pg->count;
                        niobuf->rnb_flags  = pg->flag;
                }
                pg_prev = pg;
        }

        LASSERTF((void *)(niobuf - niocount) ==
                req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
                "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
                &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));

        osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
        if (resend) {
                if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                        body->oa.o_valid |= OBD_MD_FLFLAGS;
                        body->oa.o_flags = 0;
                }
                body->oa.o_flags |= OBD_FL_RECOV_RESEND;
        }

        if (osc_should_shrink_grant(cli))
                osc_shrink_grant_local(cli, &body->oa);

        /* size[REQ_REC_OFF] still sizeof (*body) */
        if (opc == OST_WRITE) {
                if (cli->cl_checksum &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        /* store cl_cksum_type in a local variable since
                         * it can be changed via lprocfs */
                        enum cksum_types cksum_type = cli->cl_cksum_type;

                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                body->oa.o_flags = 0;

                        body->oa.o_flags |= cksum_type_pack(cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        body->oa.o_cksum = osc_checksum_bulk(requested_nob,
                                                             page_count, pga,
                                                             OST_WRITE,
                                                             cksum_type);
                        CDEBUG(D_PAGE, "checksum at write origin: %x\n",
                               body->oa.o_cksum);
                        /* save this in 'oa', too, for later checking */
                        oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        oa->o_flags |= cksum_type_pack(cksum_type);
                } else {
                        /* clear out the checksum flag, in case this is a
                         * resend but cl_checksum is no longer set. b=11238 */
                        oa->o_valid &= ~OBD_MD_FLCKSUM;
                }
                oa->o_cksum = body->oa.o_cksum;
                /* 1 RC per niobuf */
                req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
                                     sizeof(__u32) * niocount);
        } else {
                if (cli->cl_checksum &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                body->oa.o_flags = 0;
                        body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                }

                /* The client cksum has already been copied to the wire obdo
                 * in the previous lustre_set_wire_obdo(); if a bulk read is
                 * resent due to a cksum error, this lets the server check
                 * and dump the pages on its side. */
        }
        ptlrpc_request_set_replen(req);

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oa = oa;
        aa->aa_requested_nob = requested_nob;
        aa->aa_nio_count = niocount;
        aa->aa_page_count = page_count;
        aa->aa_resends = 0;
        aa->aa_ppga = pga;
        aa->aa_cli = cli;
        INIT_LIST_HEAD(&aa->aa_oaps);

        *reqp = req;
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
                req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
                niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
        RETURN(0);

 out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}

char dbgcksum_file_name[PATH_MAX];

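/* On a checksum mismatch, dump every page of the failed bulk to a debug
 * file named after the FID, byte range and both checksums, so that the data
 * can be inspected offline.  Called only when cl_checksum_dump is set. */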
static void dump_all_bulk_pages(struct obdo *oa, __u32 page_count,
                                struct brw_page **pga, __u32 server_cksum,
                                __u32 client_cksum)
{
        struct file *filp;
        int rc, i;
        unsigned int len;
        char *buf;
        mm_segment_t oldfs;

        /* only keep a dump of the pages on the first error for the same
         * range in the file/fid, not on the resends/retries. */
        snprintf(dbgcksum_file_name, sizeof(dbgcksum_file_name),
                 "%s-checksum_dump-osc-"DFID":[%llu-%llu]-%x-%x",
                 (strncmp(libcfs_debug_file_path_arr, "NONE", 4) != 0 ?
                  libcfs_debug_file_path_arr :
                  LIBCFS_DEBUG_FILE_PATH_DEFAULT),
                 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : 0ULL,
                 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
                 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
                 pga[0]->off,
                 pga[page_count-1]->off + pga[page_count-1]->count - 1,
                 client_cksum, server_cksum);
        filp = filp_open(dbgcksum_file_name,
                         O_CREAT | O_EXCL | O_WRONLY | O_LARGEFILE, 0600);
        if (IS_ERR(filp)) {
                rc = PTR_ERR(filp);
                if (rc == -EEXIST)
                        CDEBUG(D_INFO, "%s: can't open to dump pages with "
                               "checksum error: rc = %d\n", dbgcksum_file_name,
                               rc);
                else
                        CERROR("%s: can't open to dump pages with checksum "
                               "error: rc = %d\n", dbgcksum_file_name, rc);
                return;
        }

        oldfs = get_fs();
        set_fs(KERNEL_DS);
        for (i = 0; i < page_count; i++) {
                len = pga[i]->count;
                buf = kmap(pga[i]->pg);
                while (len != 0) {
                        rc = vfs_write(filp, (__force const char __user *)buf,
                                       len, &filp->f_pos);
                        if (rc < 0) {
                                CERROR("%s: wanted to write %u but got %d "
                                       "error\n", dbgcksum_file_name, len, rc);
                                break;
                        }
                        len -= rc;
                        buf += rc;
                        CDEBUG(D_INFO, "%s: wrote %d bytes\n",
                               dbgcksum_file_name, rc);
                }
                kunmap(pga[i]->pg);
        }
        set_fs(oldfs);

        rc = ll_vfs_fsync_range(filp, 0, LLONG_MAX, 1);
        if (rc)
                CERROR("%s: sync returns %d\n", dbgcksum_file_name, rc);
        filp_close(filp, NULL);
}
1412
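/* Compare the client-side write checksum with the one the server computed.
 * Returns 0 when they match.  On mismatch, recompute the checksum over the
 * local pages to narrow down where the data changed, log the diagnosis, and
 * return 1 so that the caller retries the write (a nonzero return is mapped
 * to -EAGAIN by the caller). */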
1413 static int
1414 check_write_checksum(struct obdo *oa, const struct lnet_process_id *peer,
1415                      __u32 client_cksum, __u32 server_cksum,
1416                      struct osc_brw_async_args *aa)
1417 {
1418         __u32 new_cksum;
1419         char *msg;
1420         enum cksum_types cksum_type;
1421
1422         if (server_cksum == client_cksum) {
1423                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1424                 return 0;
1425         }
1426
1427         if (aa->aa_cli->cl_checksum_dump)
1428                 dump_all_bulk_pages(oa, aa->aa_page_count, aa->aa_ppga,
1429                                     server_cksum, client_cksum);
1430
1431         cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1432                                        oa->o_flags : 0);
1433         new_cksum = osc_checksum_bulk(aa->aa_requested_nob, aa->aa_page_count,
1434                                       aa->aa_ppga, OST_WRITE, cksum_type);
1435
1436         if (cksum_type != cksum_type_unpack(aa->aa_oa->o_flags))
1437                 msg = "the server did not use the checksum type specified in "
1438                       "the original request - likely a protocol problem";
1439         else if (new_cksum == server_cksum)
1440                 msg = "changed on the client after we checksummed it - "
1441                       "likely false positive due to mmap IO (bug 11742)";
1442         else if (new_cksum == client_cksum)
1443                 msg = "changed in transit before arrival at OST";
1444         else
1445                 msg = "changed in transit AND doesn't match the original - "
1446                       "likely false positive due to mmap IO (bug 11742)";
1447
1448         LCONSOLE_ERROR_MSG(0x132, "%s: BAD WRITE CHECKSUM: %s: from %s inode "
1449                            DFID " object "DOSTID" extent [%llu-%llu], original "
1450                            "client csum %x (type %x), server csum %x (type %x),"
1451                            " client csum now %x\n",
1452                            aa->aa_cli->cl_import->imp_obd->obd_name,
1453                            msg, libcfs_nid2str(peer->nid),
1454                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1455                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1456                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1457                            POSTID(&oa->o_oi), aa->aa_ppga[0]->off,
1458                            aa->aa_ppga[aa->aa_page_count - 1]->off +
1459                                 aa->aa_ppga[aa->aa_page_count-1]->count - 1,
1460                            client_cksum, cksum_type_unpack(aa->aa_oa->o_flags),
1461                            server_cksum, cksum_type, new_cksum);
1462         return 1;
1463 }
1464
1465 /* Note: rc enters this function as the number of bytes transferred */
1466 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1467 {
1468         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1469         const struct lnet_process_id *peer =
1470                         &req->rq_import->imp_connection->c_peer;
1471         struct client_obd *cli = aa->aa_cli;
1472         struct ost_body *body;
1473         u32 client_cksum = 0;
1474         ENTRY;
1475
1476         if (rc < 0 && rc != -EDQUOT) {
1477                 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1478                 RETURN(rc);
1479         }
1480
1481         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1482         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1483         if (body == NULL) {
1484                 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1485                 RETURN(-EPROTO);
1486         }
1487
1488         /* set/clear over quota flag for a uid/gid/projid */
1489         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1490             body->oa.o_valid & (OBD_MD_FLALLQUOTA)) {
1491                 unsigned qid[LL_MAXQUOTAS] = {
1492                                          body->oa.o_uid, body->oa.o_gid,
1493                                          body->oa.o_projid };
1494                 CDEBUG(D_QUOTA, "setdq for [%u %u %u] with valid %#llx, flags %x\n",
1495                        body->oa.o_uid, body->oa.o_gid, body->oa.o_projid,
1496                        body->oa.o_valid, body->oa.o_flags);
1497                 osc_quota_setdq(cli, qid, body->oa.o_valid,
1498                                 body->oa.o_flags);
1499         }
1500
1501         osc_update_grant(cli, body);
1502
1503         if (rc < 0)
1504                 RETURN(rc);
1505
1506         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1507                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1508
1509         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1510                 if (rc > 0) {
1511                         CERROR("Unexpected +ve rc %d\n", rc);
1512                         RETURN(-EPROTO);
1513                 }
1514
1515                 if (req->rq_bulk != NULL &&
1516                     sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1517                         RETURN(-EAGAIN);
1518
1519                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1520                     check_write_checksum(&body->oa, peer, client_cksum,
1521                                          body->oa.o_cksum, aa))
1522                         RETURN(-EAGAIN);
1523
1524                 rc = check_write_rcs(req, aa->aa_requested_nob, aa->aa_nio_count,
1525                                      aa->aa_page_count, aa->aa_ppga);
1526                 GOTO(out, rc);
1527         }
1528
1529         /* The rest of this function executes only for OST_READs */
1530
1531         if (req->rq_bulk == NULL) {
1532                 rc = req_capsule_get_size(&req->rq_pill, &RMF_SHORT_IO,
1533                                           RCL_SERVER);
1534                 LASSERT(rc == req->rq_status);
1535         } else {
1536                 /* if unwrap_bulk failed, return -EAGAIN to retry */
1537                 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1538         }
1539         if (rc < 0)
1540                 GOTO(out, rc = -EAGAIN);
1541
1542         if (rc > aa->aa_requested_nob) {
1543                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1544                        aa->aa_requested_nob);
1545                 RETURN(-EPROTO);
1546         }
1547
1548         if (req->rq_bulk != NULL && rc != req->rq_bulk->bd_nob_transferred) {
1549                 CERROR("Unexpected rc %d (%d transferred)\n",
1550                        rc, req->rq_bulk->bd_nob_transferred);
1551                 RETURN(-EPROTO);
1552         }
1553
1554         if (req->rq_bulk == NULL) {
1555                 /* short i/o: the data came back inline in the reply, not via bulk */
1556                 int nob, pg_count, i = 0;
1557                 unsigned char *buf;
1558
1559                 CDEBUG(D_CACHE, "Using short io read, size %d\n", rc);
1560                 pg_count = aa->aa_page_count;
1561                 buf = req_capsule_server_sized_get(&req->rq_pill, &RMF_SHORT_IO,
1562                                                    rc);
1563                 nob = rc;
1564                 while (nob > 0 && pg_count > 0) {
1565                         unsigned char *ptr;
1566                         int count = aa->aa_ppga[i]->count > nob ?
1567                                     nob : aa->aa_ppga[i]->count;
1568
1569                         CDEBUG(D_CACHE, "page %p count %d\n",
1570                                aa->aa_ppga[i]->pg, count);
1571                         ptr = ll_kmap_atomic(aa->aa_ppga[i]->pg, KM_USER0);
1572                         memcpy(ptr + (aa->aa_ppga[i]->off & ~PAGE_MASK), buf,
1573                                count);
1574                         ll_kunmap_atomic((void *) ptr, KM_USER0);
1575
1576                         buf += count;
1577                         nob -= count;
1578                         i++;
1579                         pg_count--;
1580                 }
1581         }
1582
1583         if (rc < aa->aa_requested_nob)
1584                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1585
1586         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1587                 static int cksum_counter;
1588                 u32        server_cksum = body->oa.o_cksum;
1589                 char      *via = "";
1590                 char      *router = "";
1591                 enum cksum_types cksum_type;
1592
1593                 cksum_type = cksum_type_unpack(body->oa.o_valid & OBD_MD_FLFLAGS ?
1594                                                body->oa.o_flags : 0);
1595                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1596                                                  aa->aa_ppga, OST_READ,
1597                                                  cksum_type);
1598
1599                 if (req->rq_bulk != NULL &&
1600                     peer->nid != req->rq_bulk->bd_sender) {
1601                         via = " via ";
1602                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1603                 }
1604
1605                 if (server_cksum != client_cksum) {
1606                         struct ost_body *clbody;
1607                         u32 page_count = aa->aa_page_count;
1608
1609                         clbody = req_capsule_client_get(&req->rq_pill,
1610                                                         &RMF_OST_BODY);
1611                         if (cli->cl_checksum_dump)
1612                                 dump_all_bulk_pages(&clbody->oa, page_count,
1613                                                     aa->aa_ppga, server_cksum,
1614                                                     client_cksum);
1615
1616                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1617                                            "%s%s%s inode "DFID" object "DOSTID
1618                                            " extent [%llu-%llu], client %x, "
1619                                            "server %x, cksum_type %x\n",
1620                                            req->rq_import->imp_obd->obd_name,
1621                                            libcfs_nid2str(peer->nid),
1622                                            via, router,
1623                                            clbody->oa.o_valid & OBD_MD_FLFID ?
1624                                                 clbody->oa.o_parent_seq : 0ULL,
1625                                            clbody->oa.o_valid & OBD_MD_FLFID ?
1626                                                 clbody->oa.o_parent_oid : 0,
1627                                            clbody->oa.o_valid & OBD_MD_FLFID ?
1628                                                 clbody->oa.o_parent_ver : 0,
1629                                            POSTID(&body->oa.o_oi),
1630                                            aa->aa_ppga[0]->off,
1631                                            aa->aa_ppga[page_count-1]->off +
1632                                            aa->aa_ppga[page_count-1]->count - 1,
1633                                            client_cksum, server_cksum,
1634                                            cksum_type);
1635                         cksum_counter = 0;
1636                         aa->aa_oa->o_cksum = client_cksum;
1637                         rc = -EAGAIN;
1638                 } else {
1639                         cksum_counter++;
1640                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1641                         rc = 0;
1642                 }
1643         } else if (unlikely(client_cksum)) {
1644                 static int cksum_missed;
1645
1646                 cksum_missed++;
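                /* (x & -x) == x only when x is a power of two, so this
                 * rate-limits the message to the 1st, 2nd, 4th, 8th, ...
                 * missed checksum. */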
1647                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1648                         CERROR("Checksum %u requested from %s but not sent\n",
1649                                cksum_missed, libcfs_nid2str(peer->nid));
1650         } else {
1651                 rc = 0;
1652         }
1653 out:
1654         if (rc >= 0)
1655                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1656                                      aa->aa_oa, &body->oa);
1657
1658         RETURN(rc);
1659 }
1660
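/* Rebuild a failed BRW request and hand the new copy to ptlrpcd.  The new
 * request takes over the page array and async pages from the old one; the
 * resend is delayed by aa_resends seconds, capped at the request timeout,
 * mirroring what ptlrpc itself does in after_reply(). */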
1661 static int osc_brw_redo_request(struct ptlrpc_request *request,
1662                                 struct osc_brw_async_args *aa, int rc)
1663 {
1664         struct ptlrpc_request *new_req;
1665         struct osc_brw_async_args *new_aa;
1666         struct osc_async_page *oap;
1667         ENTRY;
1668
1669         DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1670                   "redo for recoverable error %d", rc);
1671
1672         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1673                                 OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
1674                                   aa->aa_cli, aa->aa_oa, aa->aa_page_count,
1675                                   aa->aa_ppga, &new_req, 1);
1676         if (rc)
1677                 RETURN(rc);
1678
1679         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1680                 if (oap->oap_request != NULL) {
1681                         LASSERTF(request == oap->oap_request,
1682                                  "request %p != oap_request %p\n",
1683                                  request, oap->oap_request);
1684                         if (oap->oap_interrupted) {
1685                                 ptlrpc_req_finished(new_req);
1686                                 RETURN(-EINTR);
1687                         }
1688                 }
1689         }
1690         /* New request takes over pga and oaps from old request.
1691          * Note that copying a list_head doesn't work, need to move it... */
1692         aa->aa_resends++;
1693         new_req->rq_interpret_reply = request->rq_interpret_reply;
1694         new_req->rq_async_args = request->rq_async_args;
1695         new_req->rq_commit_cb = request->rq_commit_cb;
1696         /* cap the resend delay to the current request timeout; this is
1697          * similar to what ptlrpc does (see after_reply()) */
1698         if (aa->aa_resends > new_req->rq_timeout)
1699                 new_req->rq_sent = ktime_get_real_seconds() + new_req->rq_timeout;
1700         else
1701                 new_req->rq_sent = ktime_get_real_seconds() + aa->aa_resends;
1702         new_req->rq_generation_set = 1;
1703         new_req->rq_import_generation = request->rq_import_generation;
1704
1705         new_aa = ptlrpc_req_async_args(new_req);
1706
1707         INIT_LIST_HEAD(&new_aa->aa_oaps);
1708         list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1709         INIT_LIST_HEAD(&new_aa->aa_exts);
1710         list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1711         new_aa->aa_resends = aa->aa_resends;
1712
1713         list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1714                 if (oap->oap_request) {
1715                         ptlrpc_req_finished(oap->oap_request);
1716                         oap->oap_request = ptlrpc_request_addref(new_req);
1717                 }
1718         }
1719
1720         /* XXX: This code will run into problems if we ever support adding
1721          * a series of BRW RPCs into a self-defined ptlrpc_request_set and
1722          * waiting for all of them to finish. We should inherit the request
1723          * set from the old request. */
1724         ptlrpcd_add_req(new_req);
1725
1726         DEBUG_REQ(D_INFO, new_req, "new request");
1727         RETURN(0);
1728 }
1729
1730 /*
1731  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1732  * follow Sedgewick's advice and stick to the dead simple shellsort -- it'll do
1733  * fine for our small page arrays and doesn't require allocation.  it's an
1734  * insertion sort that swaps elements that are strides apart, shrinking the
1735  * stride down until it's 1 and the array is sorted.
1736  */
1737 static void sort_brw_pages(struct brw_page **array, int num)
1738 {
1739         int stride, i, j;
1740         struct brw_page *tmp;
1741
1742         if (num == 1)
1743                 return;
1744         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1745                 ;
1746
1747         do {
1748                 stride /= 3;
1749                 for (i = stride ; i < num ; i++) {
1750                         tmp = array[i];
1751                         j = i;
1752                         while (j >= stride && array[j - stride]->off > tmp->off) {
1753                                 array[j] = array[j - stride];
1754                                 j -= stride;
1755                         }
1756                         array[j] = tmp;
1757                 }
1758         } while (stride > 1);
1759 }
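/* Illustration (not part of the code path): for num == 40 the seeding loop
 * above produces the gap sequence 1, 4, 13, 40, so the passes run with
 * strides 13, 4 and finally 1 -- the last pass is a plain insertion sort
 * over an almost-sorted array. */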
1760
1761 static void osc_release_ppga(struct brw_page **ppga, size_t count)
1762 {
1763         LASSERT(ppga != NULL);
1764         OBD_FREE(ppga, sizeof(*ppga) * count);
1765 }
1766
1767 static int brw_interpret(const struct lu_env *env,
1768                          struct ptlrpc_request *req, void *data, int rc)
1769 {
1770         struct osc_brw_async_args *aa = data;
1771         struct osc_extent *ext;
1772         struct osc_extent *tmp;
1773         struct client_obd *cli = aa->aa_cli;
1774         unsigned long           transferred = 0;
1775         ENTRY;
1776
1777         rc = osc_brw_fini_request(req, rc);
1778         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1779         /* When the server returns -EINPROGRESS, the client should always
1780          * retry, no matter how many times the bulk has already been resent. */
1781         if (osc_recoverable_error(rc) && !req->rq_no_delay) {
1782                 if (req->rq_import_generation !=
1783                     req->rq_import->imp_generation) {
1784                         CDEBUG(D_HA, "%s: resend crosses eviction for object "
1785                                DOSTID", rc = %d\n",
1786                                req->rq_import->imp_obd->obd_name,
1787                                POSTID(&aa->aa_oa->o_oi), rc);
1788                 } else if (rc == -EINPROGRESS ||
1789                     client_should_resend(aa->aa_resends, aa->aa_cli)) {
1790                         rc = osc_brw_redo_request(req, aa, rc);
1791                 } else {
1792                         CERROR("%s: too many resend retries for object "
1793                                DOSTID", rc = %d\n",
1794                                req->rq_import->imp_obd->obd_name,
1795                                POSTID(&aa->aa_oa->o_oi), rc);
1796                 }
1797
1798                 if (rc == 0)
1799                         RETURN(0);
1800                 else if (rc == -EAGAIN || rc == -EINPROGRESS)
1801                         rc = -EIO;
1802         }
1803
1804         if (rc == 0) {
1805                 struct obdo *oa = aa->aa_oa;
1806                 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
1807                 unsigned long valid = 0;
1808                 struct cl_object *obj;
1809                 struct osc_async_page *last;
1810
1811                 last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
1812                 obj = osc2cl(last->oap_obj);
1813
1814                 cl_object_attr_lock(obj);
1815                 if (oa->o_valid & OBD_MD_FLBLOCKS) {
1816                         attr->cat_blocks = oa->o_blocks;
1817                         valid |= CAT_BLOCKS;
1818                 }
1819                 if (oa->o_valid & OBD_MD_FLMTIME) {
1820                         attr->cat_mtime = oa->o_mtime;
1821                         valid |= CAT_MTIME;
1822                 }
1823                 if (oa->o_valid & OBD_MD_FLATIME) {
1824                         attr->cat_atime = oa->o_atime;
1825                         valid |= CAT_ATIME;
1826                 }
1827                 if (oa->o_valid & OBD_MD_FLCTIME) {
1828                         attr->cat_ctime = oa->o_ctime;
1829                         valid |= CAT_CTIME;
1830                 }
1831
1832                 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1833                         struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
1834                         loff_t last_off = last->oap_count + last->oap_obj_off +
1835                                 last->oap_page_off;
1836
1837                         /* Change file size if this is an out of quota or
1838                          * direct IO write and it extends the file size */
1839                         if (loi->loi_lvb.lvb_size < last_off) {
1840                                 attr->cat_size = last_off;
1841                                 valid |= CAT_SIZE;
1842                         }
1843                         /* Extend KMS if it's not a lockless write */
1844                         if (loi->loi_kms < last_off &&
1845                             oap2osc_page(last)->ops_srvlock == 0) {
1846                                 attr->cat_kms = last_off;
1847                                 valid |= CAT_KMS;
1848                         }
1849                 }
1850
1851                 if (valid != 0)
1852                         cl_object_attr_update(env, obj, attr, valid);
1853                 cl_object_attr_unlock(obj);
1854         }
1855         OBDO_FREE(aa->aa_oa);
1856
1857         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
1858                 osc_inc_unstable_pages(req);
1859
1860         list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
1861                 list_del_init(&ext->oe_link);
1862                 osc_extent_finish(env, ext, 1,
1863                                   rc && req->rq_no_delay ? -EWOULDBLOCK : rc);
1864         }
1865         LASSERT(list_empty(&aa->aa_exts));
1866         LASSERT(list_empty(&aa->aa_oaps));
1867
1868         transferred = (req->rq_bulk == NULL ? /* short io */
1869                        aa->aa_requested_nob :
1870                        req->rq_bulk->bd_nob_transferred);
1871
1872         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1873         ptlrpc_lprocfs_brw(req, transferred);
1874
1875         spin_lock(&cli->cl_loi_list_lock);
1876         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1877          * is called so we know whether to go to sync BRWs or wait for more
1878          * RPCs to complete */
1879         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
1880                 cli->cl_w_in_flight--;
1881         else
1882                 cli->cl_r_in_flight--;
1883         osc_wake_cache_waiters(cli);
1884         spin_unlock(&cli->cl_loi_list_lock);
1885
1886         osc_io_unplug(env, cli, NULL);
1887         RETURN(rc);
1888 }
1889
1890 static void brw_commit(struct ptlrpc_request *req)
1891 {
1892         /* If osc_inc_unstable_pages (via osc_extent_finish) races with
1893          * this function being called via rq_commit_cb, we need to ensure
1894          * osc_dec_unstable_pages is still called. Otherwise unstable
1895          * pages may be leaked. */
1896         spin_lock(&req->rq_lock);
1897         if (likely(req->rq_unstable)) {
1898                 req->rq_unstable = 0;
1899                 spin_unlock(&req->rq_lock);
1900
1901                 osc_dec_unstable_pages(req);
1902         } else {
1903                 req->rq_committed = 1;
1904                 spin_unlock(&req->rq_lock);
1905         }
1906 }
1907
1908 /**
1909  * Build an RPC from the list of extents @ext_list. The caller must ensure
1910  * that the total number of pages in this list does not exceed the maximum
1911  * number of pages per RPC. Extents in the list must be in OES_RPC state.
1912  */
1913 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
1914                   struct list_head *ext_list, int cmd)
1915 {
1916         struct ptlrpc_request           *req = NULL;
1917         struct osc_extent               *ext;
1918         struct brw_page                 **pga = NULL;
1919         struct osc_brw_async_args       *aa = NULL;
1920         struct obdo                     *oa = NULL;
1921         struct osc_async_page           *oap;
1922         struct osc_object               *obj = NULL;
1923         struct cl_req_attr              *crattr = NULL;
1924         loff_t                          starting_offset = OBD_OBJECT_EOF;
1925         loff_t                          ending_offset = 0;
1926         int                             mpflag = 0;
1927         int                             mem_tight = 0;
1928         int                             page_count = 0;
1929         bool                            soft_sync = false;
1930         bool                            interrupted = false;
1931         bool                            ndelay = false;
1932         int                             i;
1933         int                             grant = 0;
1934         int                             rc;
1935         __u32                           layout_version = 0;
1936         struct list_head                rpc_list = LIST_HEAD_INIT(rpc_list);
1937         struct ost_body                 *body;
1938         ENTRY;
1939         LASSERT(!list_empty(ext_list));
1940
1941         /* add pages into rpc_list to build BRW rpc */
1942         list_for_each_entry(ext, ext_list, oe_link) {
1943                 LASSERT(ext->oe_state == OES_RPC);
1944                 mem_tight |= ext->oe_memalloc;
1945                 grant += ext->oe_grants;
1946                 page_count += ext->oe_nr_pages;
1947                 layout_version = MAX(layout_version, ext->oe_layout_version);
1948                 if (obj == NULL)
1949                         obj = ext->oe_obj;
1950         }
1951
1952         soft_sync = osc_over_unstable_soft_limit(cli);
1953         if (mem_tight)
1954                 mpflag = cfs_memory_pressure_get_and_set();
1955
1956         OBD_ALLOC(pga, sizeof(*pga) * page_count);
1957         if (pga == NULL)
1958                 GOTO(out, rc = -ENOMEM);
1959
1960         OBDO_ALLOC(oa);
1961         if (oa == NULL)
1962                 GOTO(out, rc = -ENOMEM);
1963
1964         i = 0;
1965         list_for_each_entry(ext, ext_list, oe_link) {
1966                 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
1967                         if (mem_tight)
1968                                 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
1969                         if (soft_sync)
1970                                 oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
1971                         pga[i] = &oap->oap_brw_page;
1972                         pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
1973                         i++;
1974
1975                         list_add_tail(&oap->oap_rpc_item, &rpc_list);
1976                         if (starting_offset == OBD_OBJECT_EOF ||
1977                             starting_offset > oap->oap_obj_off)
1978                                 starting_offset = oap->oap_obj_off;
1979                         else
1980                                 LASSERT(oap->oap_page_off == 0);
1981                         if (ending_offset < oap->oap_obj_off + oap->oap_count)
1982                                 ending_offset = oap->oap_obj_off +
1983                                                 oap->oap_count;
1984                         else
1985                                 LASSERT(oap->oap_page_off + oap->oap_count ==
1986                                         PAGE_SIZE);
1987                         if (oap->oap_interrupted)
1988                                 interrupted = true;
1989                 }
1990                 if (ext->oe_ndelay)
1991                         ndelay = true;
1992         }
1993
1994         /* first page in the list */
1995         oap = list_entry(rpc_list.next, typeof(*oap), oap_rpc_item);
1996
1997         crattr = &osc_env_info(env)->oti_req_attr;
1998         memset(crattr, 0, sizeof(*crattr));
1999         crattr->cra_type = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2000         crattr->cra_flags = ~0ULL;
2001         crattr->cra_page = oap2cl_page(oap);
2002         crattr->cra_oa = oa;
2003         cl_req_attr_set(env, osc2cl(obj), crattr);
2004
2005         if (cmd == OBD_BRW_WRITE) {
2006                 oa->o_grant_used = grant;
2007                 if (layout_version > 0) {
2008                         CDEBUG(D_LAYOUT, DFID": write with layout version %u\n",
2009                                PFID(&oa->o_oi.oi_fid), layout_version);
2010
2011                         oa->o_layout_version = layout_version;
2012                         oa->o_valid |= OBD_MD_LAYOUT_VERSION;
2013                 }
2014         }
2015
2016         sort_brw_pages(pga, page_count);
2017         rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req, 0);
2018         if (rc != 0) {
2019                 CERROR("prep_req failed: %d\n", rc);
2020                 GOTO(out, rc);
2021         }
2022
2023         req->rq_commit_cb = brw_commit;
2024         req->rq_interpret_reply = brw_interpret;
2025         req->rq_memalloc = mem_tight != 0;
2026         oap->oap_request = ptlrpc_request_addref(req);
2027         if (interrupted && !req->rq_intr)
2028                 ptlrpc_mark_interrupted(req);
2029         if (ndelay) {
2030                 req->rq_no_resend = req->rq_no_delay = 1;
2031                 /* We should probably set a shorter timeout value here to
2032                  * handle ETIMEDOUT in brw_interpret() correctly. */
2033                 /* lustre_msg_set_timeout(req, req->rq_timeout / 2); */
2034         }
2035
2036         /* Need to update the timestamps after the request is built in case
2037          * we race with setattr (locally or in queue at the OST).  If the OST
2038          * gets the later setattr before the earlier BRW (as determined by the
2039          * request xid), it will not use the BRW timestamps.  Sadly, there is
2040          * no obvious way to do this in a single call.  bug 10150 */
2041         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
2042         crattr->cra_oa = &body->oa;
2043         crattr->cra_flags = OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME;
2044         cl_req_attr_set(env, osc2cl(obj), crattr);
2045         lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
2046
2047         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2048         aa = ptlrpc_req_async_args(req);
2049         INIT_LIST_HEAD(&aa->aa_oaps);
2050         list_splice_init(&rpc_list, &aa->aa_oaps);
2051         INIT_LIST_HEAD(&aa->aa_exts);
2052         list_splice_init(ext_list, &aa->aa_exts);
2053
2054         spin_lock(&cli->cl_loi_list_lock);
2055         starting_offset >>= PAGE_SHIFT;
2056         if (cmd == OBD_BRW_READ) {
2057                 cli->cl_r_in_flight++;
2058                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2059                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2060                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2061                                       starting_offset + 1);
2062         } else {
2063                 cli->cl_w_in_flight++;
2064                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2065                 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
2066                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2067                                       starting_offset + 1);
2068         }
2069         spin_unlock(&cli->cl_loi_list_lock);
2070
2071         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %ur/%uw in flight",
2072                   page_count, aa, cli->cl_r_in_flight,
2073                   cli->cl_w_in_flight);
2074         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_IO, cfs_fail_val);
2075
2076         ptlrpcd_add_req(req);
2077         rc = 0;
2078         EXIT;
2079
2080 out:
2081         if (mem_tight != 0)
2082                 cfs_memory_pressure_restore(mpflag);
2083
2084         if (rc != 0) {
2085                 LASSERT(req == NULL);
2086
2087                 if (oa)
2088                         OBDO_FREE(oa);
2089                 if (pga)
2090                         OBD_FREE(pga, sizeof(*pga) * page_count);
2091                 /* this should happen rarely and is pretty bad; it makes the
2092                  * pending list not follow the dirty order */
2093                 while (!list_empty(ext_list)) {
2094                         ext = list_entry(ext_list->next, struct osc_extent,
2095                                          oe_link);
2096                         list_del_init(&ext->oe_link);
2097                         osc_extent_finish(env, ext, 0, rc);
2098                 }
2099         }
2100         RETURN(rc);
2101 }
2102
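/* Claim @lock for @data: set l_ast_data if it is still unset.  Returns 1
 * when l_ast_data ends up pointing at @data (we just set it, or it already
 * did), and 0 when the lock is already claimed by another object. */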
2103 static int osc_set_lock_data(struct ldlm_lock *lock, void *data)
2104 {
2105         int set = 0;
2106
2107         LASSERT(lock != NULL);
2108
2109         lock_res_and_lock(lock);
2110
2111         if (lock->l_ast_data == NULL)
2112                 lock->l_ast_data = data;
2113         if (lock->l_ast_data == data)
2114                 set = 1;
2115
2116         unlock_res_and_lock(lock);
2117
2118         return set;
2119 }
2120
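/* Finish lock enqueue: for an aborted intent enqueue, pick the server's
 * policy result out of the reply as the error code, invoke the caller's
 * upcall, and drop the lock reference taken in ldlm_cli_enqueue() when the
 * lock was granted or matched. */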
2121 int osc_enqueue_fini(struct ptlrpc_request *req, osc_enqueue_upcall_f upcall,
2122                      void *cookie, struct lustre_handle *lockh,
2123                      enum ldlm_mode mode, __u64 *flags, bool speculative,
2124                      int errcode)
2125 {
2126         bool intent = *flags & LDLM_FL_HAS_INTENT;
2127         int rc;
2128         ENTRY;
2129
2130         /* The request was created before ldlm_cli_enqueue call. */
2131         if (intent && errcode == ELDLM_LOCK_ABORTED) {
2132                 struct ldlm_reply *rep;
2133
2134                 rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
2135                 LASSERT(rep != NULL);
2136
2137                 rep->lock_policy_res1 =
2138                         ptlrpc_status_ntoh(rep->lock_policy_res1);
2139                 if (rep->lock_policy_res1)
2140                         errcode = rep->lock_policy_res1;
2141                 if (!speculative)
2142                         *flags |= LDLM_FL_LVB_READY;
2143         } else if (errcode == ELDLM_OK) {
2144                 *flags |= LDLM_FL_LVB_READY;
2145         }
2146
2147         /* Call the update callback. */
2148         rc = (*upcall)(cookie, lockh, errcode);
2149
2150         /* release the reference taken in ldlm_cli_enqueue() */
2151         if (errcode == ELDLM_LOCK_MATCHED)
2152                 errcode = ELDLM_OK;
2153         if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
2154                 ldlm_lock_decref(lockh, mode);
2155
2156         RETURN(rc);
2157 }
2158
2159 int osc_enqueue_interpret(const struct lu_env *env, struct ptlrpc_request *req,
2160                           struct osc_enqueue_args *aa, int rc)
2161 {
2162         struct ldlm_lock *lock;
2163         struct lustre_handle *lockh = &aa->oa_lockh;
2164         enum ldlm_mode mode = aa->oa_mode;
2165         struct ost_lvb *lvb = aa->oa_lvb;
2166         __u32 lvb_len = sizeof(*lvb);
2167         __u64 flags = 0;
2168
2169         ENTRY;
2170
2171         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2172          * be valid. */
2173         lock = ldlm_handle2lock(lockh);
2174         LASSERTF(lock != NULL,
2175                  "lockh %#llx, req %p, aa %p - client evicted?\n",
2176                  lockh->cookie, req, aa);
2177
2178         /* Take an additional reference so that a blocking AST that
2179          * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2180          * to arrive after an upcall has been executed by
2181          * osc_enqueue_fini(). */
2182         ldlm_lock_addref(lockh, mode);
2183
2184         /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
2185         OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
2186
2187         /* Let CP AST to grant the lock first. */
2188         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2189
2190         if (aa->oa_speculative) {
2191                 LASSERT(aa->oa_lvb == NULL);
2192                 LASSERT(aa->oa_flags == NULL);
2193                 aa->oa_flags = &flags;
2194         }
2195
2196         /* Complete obtaining the lock procedure. */
2197         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
2198                                    aa->oa_mode, aa->oa_flags, lvb, lvb_len,
2199                                    lockh, rc);
2200         /* Complete osc stuff. */
2201         rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
2202                               aa->oa_flags, aa->oa_speculative, rc);
2203
2204         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2205
2206         ldlm_lock_decref(lockh, mode);
2207         LDLM_LOCK_PUT(lock);
2208         RETURN(rc);
2209 }
2210
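/* Sentinel: passing PTLRPCD_SET as the request set tells osc_enqueue_base()
 * to hand the request directly to ptlrpcd instead of adding it to a
 * caller-owned set (see the rqset == PTLRPCD_SET check below). */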
2211 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2212
2213 /* When enqueuing asynchronously, locks are not ordered, so we can obtain a
2214  * lock from the 2nd OSC before a lock from the 1st one. This does not deadlock
2215  * with other synchronous requests, but holding some locks while trying to
2216  * obtain others may take a considerable amount of time in the case of an OST
2217  * failure; and when a client does not release locks that other sync requests
2218  * are waiting on, it is evicted from the cluster -- such scenarios make life
2219  * difficult, so release locks just after they are obtained. */
2220 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2221                      __u64 *flags, union ldlm_policy_data *policy,
2222                      struct ost_lvb *lvb, int kms_valid,
2223                      osc_enqueue_upcall_f upcall, void *cookie,
2224                      struct ldlm_enqueue_info *einfo,
2225                      struct ptlrpc_request_set *rqset, int async,
2226                      bool speculative)
2227 {
2228         struct obd_device *obd = exp->exp_obd;
2229         struct lustre_handle lockh = { 0 };
2230         struct ptlrpc_request *req = NULL;
2231         int intent = *flags & LDLM_FL_HAS_INTENT;
2232         __u64 match_flags = *flags;
2233         enum ldlm_mode mode;
2234         int rc;
2235         ENTRY;
2236
2237         /* Filesystem lock extents are extended to page boundaries so that
2238          * dealing with the page cache is a little smoother.  */
2239         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2240         policy->l_extent.end |= ~PAGE_MASK;
2241
2242         /*
2243          * kms is not valid when either object is completely fresh (so that no
2244          * locks are cached), or object was evicted. In the latter case cached
2245          * lock cannot be used, because it would prime inode state with
2246          * potentially stale LVB.
2247          */
2248         if (!kms_valid)
2249                 goto no_match;
2250
2251         /* Next, search for already existing extent locks that will cover us */
2252         /* If we're trying to read, we also search for an existing PW lock.  The
2253          * VFS and page cache already protect us locally, so lots of readers/
2254          * writers can share a single PW lock.
2255          *
2256          * There are problems with conversion deadlocks, so instead of
2257          * converting a read lock to a write lock, we'll just enqueue a new
2258          * one.
2259          *
2260          * At some point we should cancel the read lock instead of making them
2261          * send us a blocking callback, but there are problems with canceling
2262          * locks out from other users right now, too. */
2263         mode = einfo->ei_mode;
2264         if (einfo->ei_mode == LCK_PR)
2265                 mode |= LCK_PW;
2266         /* Normal lock requests must wait for the LVB to be ready before
2267          * matching a lock; speculative lock requests do not need to,
2268          * because they will not actually use the lock. */
2269         if (!speculative)
2270                 match_flags |= LDLM_FL_LVB_READY;
2271         if (intent != 0)
2272                 match_flags |= LDLM_FL_BLOCK_GRANTED;
2273         mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id,
2274                                einfo->ei_type, policy, mode, &lockh, 0);
2275         if (mode) {
2276                 struct ldlm_lock *matched;
2277
2278                 if (*flags & LDLM_FL_TEST_LOCK)
2279                         RETURN(ELDLM_OK);
2280
2281                 matched = ldlm_handle2lock(&lockh);
2282                 if (speculative) {
2283                         /* This DLM lock request is speculative, and does not
2284                          * have an associated IO request. Therefore, if there
2285                          * is already a DLM lock, it will just inform the
2286                          * caller to cancel the request for this stripe. */
2287                         lock_res_and_lock(matched);
2288                         if (ldlm_extent_equal(&policy->l_extent,
2289                             &matched->l_policy_data.l_extent))
2290                                 rc = -EEXIST;
2291                         else
2292                                 rc = -ECANCELED;
2293                         unlock_res_and_lock(matched);
2294
2295                         ldlm_lock_decref(&lockh, mode);
2296                         LDLM_LOCK_PUT(matched);
2297                         RETURN(rc);
2298                 } else if (osc_set_lock_data(matched, einfo->ei_cbdata)) {
2299                         *flags |= LDLM_FL_LVB_READY;
2300
2301                         /* We already have a lock, and it's referenced. */
2302                         (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);
2303
2304                         ldlm_lock_decref(&lockh, mode);
2305                         LDLM_LOCK_PUT(matched);
2306                         RETURN(ELDLM_OK);
2307                 } else {
2308                         ldlm_lock_decref(&lockh, mode);
2309                         LDLM_LOCK_PUT(matched);
2310                 }
2311         }
2312
2313 no_match:
2314         if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK))
2315                 RETURN(-ENOLCK);
2316
2317         if (intent) {
2318                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2319                                            &RQF_LDLM_ENQUEUE_LVB);
2320                 if (req == NULL)
2321                         RETURN(-ENOMEM);
2322
2323                 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
2324                 if (rc) {
2325                         ptlrpc_request_free(req);
2326                         RETURN(rc);
2327                 }
2328
2329                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2330                                      sizeof(*lvb));
2331                 ptlrpc_request_set_replen(req);
2332         }
2333
2334         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2335         *flags &= ~LDLM_FL_BLOCK_GRANTED;
2336
2337         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2338                               sizeof(*lvb), LVB_T_OST, &lockh, async);
2339         if (async) {
2340                 if (!rc) {
2341                         struct osc_enqueue_args *aa;
2342                         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2343                         aa = ptlrpc_req_async_args(req);
2344                         aa->oa_exp         = exp;
2345                         aa->oa_mode        = einfo->ei_mode;
2346                         aa->oa_type        = einfo->ei_type;
2347                         lustre_handle_copy(&aa->oa_lockh, &lockh);
2348                         aa->oa_upcall      = upcall;
2349                         aa->oa_cookie      = cookie;
2350                         aa->oa_speculative = speculative;
2351                         if (!speculative) {
2352                                 aa->oa_flags  = flags;
2353                                 aa->oa_lvb    = lvb;
2354                         } else {
2355                                 /* speculative locks essentially enqueue a
2356                                  * DLM lock in advance, so we don't care
2357                                  * about the result of the enqueue. */
2358                                 aa->oa_lvb    = NULL;
2359                                 aa->oa_flags  = NULL;
2360                         }
2361
2362                         req->rq_interpret_reply =
2363                                 (ptlrpc_interpterer_t)osc_enqueue_interpret;
2364                         if (rqset == PTLRPCD_SET)
2365                                 ptlrpcd_add_req(req);
2366                         else
2367                                 ptlrpc_set_add_req(rqset, req);
2368                 } else if (intent) {
2369                         ptlrpc_req_finished(req);
2370                 }
2371                 RETURN(rc);
2372         }
2373
2374         rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
2375                               flags, speculative, rc);
2376         if (intent)
2377                 ptlrpc_req_finished(req);
2378
2379         RETURN(rc);
2380 }
2381
2382 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2383                    enum ldlm_type type, union ldlm_policy_data *policy,
2384                    enum ldlm_mode mode, __u64 *flags, void *data,
2385                    struct lustre_handle *lockh, int unref)
2386 {
2387         struct obd_device *obd = exp->exp_obd;
2388         __u64 lflags = *flags;
2389         enum ldlm_mode rc;
2390         ENTRY;
2391
2392         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2393                 RETURN(-EIO);
2394
2395         /* Filesystem lock extents are extended to page boundaries so that
2396          * dealing with the page cache is a little smoother */
2397         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2398         policy->l_extent.end |= ~PAGE_MASK;
2399
2400         /* Next, search for already existing extent locks that will cover us */
2401         /* If we're trying to read, we also search for an existing PW lock.  The
2402          * VFS and page cache already protect us locally, so lots of readers/
2403          * writers can share a single PW lock. */
2404         rc = mode;
2405         if (mode == LCK_PR)
2406                 rc |= LCK_PW;
2407         rc = ldlm_lock_match(obd->obd_namespace, lflags,
2408                              res_id, type, policy, rc, lockh, unref);
2409         if (rc == 0 || lflags & LDLM_FL_TEST_LOCK)
2410                 RETURN(rc);
2411
2412         if (data != NULL) {
2413                 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2414
2415                 LASSERT(lock != NULL);
2416                 if (!osc_set_lock_data(lock, data)) {
2417                         ldlm_lock_decref(lockh, rc);
2418                         rc = 0;
2419                 }
2420                 LDLM_LOCK_PUT(lock);
2421         }
2422         RETURN(rc);
2423 }
2424
2425 static int osc_statfs_interpret(const struct lu_env *env,
2426                                 struct ptlrpc_request *req,
2427                                 struct osc_async_args *aa, int rc)
2428 {
2429         struct obd_statfs *msfs;
2430         ENTRY;
2431
2432         if (rc == -EBADR)
2433                 /* The request has in fact never been sent
2434                  * due to issues at a higher level (LOV).
2435                  * Exit immediately since the caller is
2436                  * aware of the problem and takes care
2437                  * of the cleanup */
2438                 RETURN(rc);
2439
2440         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2441             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2442                 GOTO(out, rc = 0);
2443
2444         if (rc != 0)
2445                 GOTO(out, rc);
2446
2447         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2448         if (msfs == NULL) {
2449                 GOTO(out, rc = -EPROTO);
2450         }
2451
2452         *aa->aa_oi->oi_osfs = *msfs;
2453 out:
2454         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2455         RETURN(rc);
2456 }
2457
2458 static int osc_statfs_async(struct obd_export *exp,
2459                             struct obd_info *oinfo, __u64 max_age,
2460                             struct ptlrpc_request_set *rqset)
2461 {
2462         struct obd_device     *obd = class_exp2obd(exp);
2463         struct ptlrpc_request *req;
2464         struct osc_async_args *aa;
2465         int                    rc;
2466         ENTRY;
2467
2468         /* We could possibly pass max_age in the request (as an absolute
2469          * timestamp or a "seconds.usec ago") so the target can avoid doing
2470          * extra calls into the filesystem if that isn't necessary (e.g.
2471          * during mount that would help a bit).  Having relative timestamps
2472          * is not so great if request processing is slow, while absolute
2473          * timestamps are not ideal because they need time synchronization. */
2474         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2475         if (req == NULL)
2476                 RETURN(-ENOMEM);
2477
2478         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2479         if (rc) {
2480                 ptlrpc_request_free(req);
2481                 RETURN(rc);
2482         }
2483         ptlrpc_request_set_replen(req);
2484         req->rq_request_portal = OST_CREATE_PORTAL;
2485         ptlrpc_at_set_req_timeout(req);
2486
2487         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2488                 /* procfs requests must not block on the target, to avoid deadlock */
2489                 req->rq_no_resend = 1;
2490                 req->rq_no_delay = 1;
2491         }
2492
2493         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2494         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2495         aa = ptlrpc_req_async_args(req);
2496         aa->aa_oi = oinfo;
2497
2498         ptlrpc_set_add_req(rqset, req);
2499         RETURN(0);
2500 }
2501
2502 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2503                       struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2504 {
2505         struct obd_device     *obd = class_exp2obd(exp);
2506         struct obd_statfs     *msfs;
2507         struct ptlrpc_request *req;
2508         struct obd_import     *imp = NULL;
2509         int rc;
2510         ENTRY;
2511
2512         /* Since the request might also come from lprocfs, we need to
2513          * sync this with client_disconnect_export() (Bug 15684) */
2514         down_read(&obd->u.cli.cl_sem);
2515         if (obd->u.cli.cl_import)
2516                 imp = class_import_get(obd->u.cli.cl_import);
2517         up_read(&obd->u.cli.cl_sem);
2518         if (!imp)
2519                 RETURN(-ENODEV);
2520
2521         /* We could possibly pass max_age in the request (as an absolute
2522          * timestamp or a "seconds.usec ago") so the target can avoid doing
2523          * extra calls into the filesystem if that isn't necessary (e.g.
2524          * during mount that would help a bit).  Having relative timestamps
2525          * is not so great if request processing is slow, while absolute
2526          * timestamps are not ideal because they need time synchronization. */
2527         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2528
2529         class_import_put(imp);
2530
2531         if (req == NULL)
2532                 RETURN(-ENOMEM);
2533
2534         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2535         if (rc) {
2536                 ptlrpc_request_free(req);
2537                 RETURN(rc);
2538         }
2539         ptlrpc_request_set_replen(req);
2540         req->rq_request_portal = OST_CREATE_PORTAL;
2541         ptlrpc_at_set_req_timeout(req);
2542
2543         if (flags & OBD_STATFS_NODELAY) {
2544                 /* procfs requests must not block on the target, to avoid deadlock */
2545                 req->rq_no_resend = 1;
2546                 req->rq_no_delay = 1;
2547         }
2548
2549         rc = ptlrpc_queue_wait(req);
2550         if (rc)
2551                 GOTO(out, rc);
2552
2553         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2554         if (msfs == NULL) {
2555                 GOTO(out, rc = -EPROTO);
2556         }
2557
2558         *osfs = *msfs;
2559
2560         EXIT;
2561  out:
2562         ptlrpc_req_finished(req);
2563         return rc;
2564 }
2565
2566 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2567                          void *karg, void __user *uarg)
2568 {
2569         struct obd_device *obd = exp->exp_obd;
2570         struct obd_ioctl_data *data = karg;
2571         int err = 0;
2572         ENTRY;
2573
2574         if (!try_module_get(THIS_MODULE)) {
2575                 CERROR("%s: cannot get module '%s'\n", obd->obd_name,
2576                        module_name(THIS_MODULE));
2577                 return -EINVAL;
2578         }
2579         switch (cmd) {
2580         case OBD_IOC_CLIENT_RECOVER:
2581                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2582                                             data->ioc_inlbuf1, 0);
2583                 if (err > 0)
2584                         err = 0;
2585                 GOTO(out, err);
2586         case IOC_OSC_SET_ACTIVE:
2587                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2588                                                data->ioc_offset);
2589                 GOTO(out, err);
2590         case OBD_IOC_PING_TARGET:
2591                 err = ptlrpc_obd_ping(obd);
2592                 GOTO(out, err);
2593         default:
2594                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2595                        cmd, current_comm());
2596                 GOTO(out, err = -ENOTTY);
2597         }
2598 out:
2599         module_put(THIS_MODULE);
2600         return err;
2601 }
2602
2603 int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
2604                        u32 keylen, void *key, u32 vallen, void *val,
2605                        struct ptlrpc_request_set *set)
2606 {
2607         struct ptlrpc_request *req;
2608         struct obd_device     *obd = exp->exp_obd;
2609         struct obd_import     *imp = class_exp2cliimp(exp);
2610         char                  *tmp;
2611         int                    rc;
2612         ENTRY;
2613
2614         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
2615
2616         if (KEY_IS(KEY_CHECKSUM)) {
2617                 if (vallen != sizeof(int))
2618                         RETURN(-EINVAL);
2619                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
2620                 RETURN(0);
2621         }
2622
2623         if (KEY_IS(KEY_SPTLRPC_CONF)) {
2624                 sptlrpc_conf_client_adapt(obd);
2625                 RETURN(0);
2626         }
2627
2628         if (KEY_IS(KEY_FLUSH_CTX)) {
2629                 sptlrpc_import_flush_my_ctx(imp);
2630                 RETURN(0);
2631         }
2632
2633         if (KEY_IS(KEY_CACHE_SET)) {
2634                 struct client_obd *cli = &obd->u.cli;
2635
2636                 LASSERT(cli->cl_cache == NULL); /* only once */
2637                 cli->cl_cache = (struct cl_client_cache *)val;
2638                 cl_cache_incref(cli->cl_cache);
2639                 cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
2640
2641                 /* add this osc into entity list */
2642                 LASSERT(list_empty(&cli->cl_lru_osc));
2643                 spin_lock(&cli->cl_cache->ccc_lru_lock);
2644                 list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
2645                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
2646
2647                 RETURN(0);
2648         }
2649
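        /*
         * A cache shrink request passes in the number of LRU pages the
         * caller wants freed; we scan at most half of the pages currently
         * on this client's LRU, report back how many were actually
         * reclaimed, and leave the remainder for other OSCs to free.
         */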
2650         if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
2651                 struct client_obd *cli = &obd->u.cli;
2652                 long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
2653                 long target = *(long *)val;
2654
2655                 nr = osc_lru_shrink(env, cli, min(nr, target), true);
2656                 *(long *)val -= nr;
2657                 RETURN(0);
2658         }
2659
2660         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
2661                 RETURN(-EINVAL);
2662
2663         /* We pass all other commands directly to the OST. Since nobody calls
2664          * OSC methods directly and everybody is supposed to go through LOV,
2665          * we assume LOV checked invalid values for us.
2666          * The only recognised values so far are evict_by_nid and mds_conn.
2667          * Even if something bad gets through, we'd get an -EINVAL from the
2668          * OST anyway. */
2669
2670         req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
2671                                                 &RQF_OST_SET_GRANT_INFO :
2672                                                 &RQF_OBD_SET_INFO);
2673         if (req == NULL)
2674                 RETURN(-ENOMEM);
2675
2676         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2677                              RCL_CLIENT, keylen);
2678         if (!KEY_IS(KEY_GRANT_SHRINK))
2679                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
2680                                      RCL_CLIENT, vallen);
2681         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
2682         if (rc) {
2683                 ptlrpc_request_free(req);
2684                 RETURN(rc);
2685         }
2686
2687         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2688         memcpy(tmp, key, keylen);
2689         tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
2690                                                         &RMF_OST_BODY :
2691                                                         &RMF_SETINFO_VAL);
2692         memcpy(tmp, val, vallen);
2693
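        /*
         * Grant shrink runs asynchronously under ptlrpcd: keep a private
         * copy of the obdo in the async args so the reply interpreter can
         * update the grant accounting once the OST answers.
         */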
2694         if (KEY_IS(KEY_GRANT_SHRINK)) {
2695                 struct osc_grant_args *aa;
2696                 struct obdo *oa;
2697
2698                 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2699                 aa = ptlrpc_req_async_args(req);
2700                 OBDO_ALLOC(oa);
2701                 if (!oa) {
2702                         ptlrpc_req_finished(req);
2703                         RETURN(-ENOMEM);
2704                 }
2705                 *oa = ((struct ost_body *)val)->oa;
2706                 aa->aa_oa = oa;
2707                 req->rq_interpret_reply = osc_shrink_grant_interpret;
2708         }
2709
2710         ptlrpc_request_set_replen(req);
2711         if (!KEY_IS(KEY_GRANT_SHRINK)) {
2712                 LASSERT(set != NULL);
2713                 ptlrpc_set_add_req(set, req);
2714                 ptlrpc_check_set(NULL, set);
2715         } else {
2716                 ptlrpcd_add_req(req);
2717         }
2718
2719         RETURN(0);
2720 }
2721 EXPORT_SYMBOL(osc_set_info_async);
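
/*
 * Example (a sketch, not from this file): a layer above could toggle wire
 * checksums through the generic entry point without building an RPC:
 *
 *      int on = 1;
 *
 *      rc = obd_set_info_async(env, exp, sizeof(KEY_CHECKSUM),
 *                              KEY_CHECKSUM, sizeof(on), &on, NULL);
 *
 * which is served entirely by the KEY_CHECKSUM branch above.
 */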
2722
2723 int osc_reconnect(const struct lu_env *env, struct obd_export *exp,
2724                   struct obd_device *obd, struct obd_uuid *cluuid,
2725                   struct obd_connect_data *data, void *localdata)
2726 {
2727         struct client_obd *cli = &obd->u.cli;
2728
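        /*
         * On reconnect, ask the server to re-establish the grant this
         * client still accounts for: available + reserved, plus whatever
         * is pinned under dirty data (counted in grant bytes with
         * GRANT_PARAM, in whole pages otherwise).  With no grant held,
         * fall back to two full BRW RPCs worth of space.
         */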
2729         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
2730                 long lost_grant;
2731                 long grant;
2732
2733                 spin_lock(&cli->cl_loi_list_lock);
2734                 grant = cli->cl_avail_grant + cli->cl_reserved_grant;
2735                 if (data->ocd_connect_flags & OBD_CONNECT_GRANT_PARAM)
2736                         grant += cli->cl_dirty_grant;
2737                 else
2738                         grant += cli->cl_dirty_pages << PAGE_SHIFT;
2739                 data->ocd_grant = grant ? : 2 * cli_brw_size(obd);
2740                 lost_grant = cli->cl_lost_grant;
2741                 cli->cl_lost_grant = 0;
2742                 spin_unlock(&cli->cl_loi_list_lock);
2743
2744                 CDEBUG(D_RPCTRACE,
2745                        "ocd_connect_flags: %#llx ocd_version: %d ocd_grant: %d, lost: %ld.\n",
2746                        data->ocd_connect_flags, data->ocd_version, data->ocd_grant, lost_grant);
2747         }
2748
2749         RETURN(0);
2750 }
2751 EXPORT_SYMBOL(osc_reconnect);
2752
2753 int osc_disconnect(struct obd_export *exp)
2754 {
2755         struct obd_device *obd = class_exp2obd(exp);
2756         int rc;
2757
2758         rc = client_disconnect_export(exp);
2759         /**
2760          * Initially we put del_shrink_grant before disconnect_export, but
2761          * that causes the following problem if setup (connect) and cleanup
2762          * (disconnect) are tangled together:
2763          *      connect p1                     disconnect p2
2764          *   ptlrpc_connect_import
2765          *     ...............               class_manual_cleanup
2766          *                                     osc_disconnect
2767          *                                     del_shrink_grant
2768          *   ptlrpc_connect_interpret
2769          *     init_grant_shrink
2770          *   add this client to shrink list
2771          *                                      cleanup_osc
2772          * Bang! the pinger triggers the shrink on a client that is already
2773          * cleaned up. So the osc should only be removed from the shrink
2774          * list once we are sure the import has been destroyed. BUG18662
2775          */
2776         if (obd->u.cli.cl_import == NULL)
2777                 osc_del_shrink_grant(&obd->u.cli);
2778         return rc;
2779 }
2780 EXPORT_SYMBOL(osc_disconnect);
2781
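/*
 * cfs_hash iterator: for every LDLM resource, pick up the osc_object
 * cached in a granted lock's l_ast_data (if any) and invalidate it,
 * clearing LDLM_FL_CLEANED on each granted lock along the way so the
 * second ldlm_namespace_cleanup() pass can still cancel them.
 */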
2782 int osc_ldlm_resource_invalidate(struct cfs_hash *hs, struct cfs_hash_bd *bd,
2783                                  struct hlist_node *hnode, void *arg)
2784 {
2785         struct lu_env *env = arg;
2786         struct ldlm_resource *res = cfs_hash_object(hs, hnode);
2787         struct ldlm_lock *lock;
2788         struct osc_object *osc = NULL;
2789         ENTRY;
2790
2791         lock_res(res);
2792         list_for_each_entry(lock, &res->lr_granted, l_res_link) {
2793                 if (lock->l_ast_data != NULL && osc == NULL) {
2794                         osc = lock->l_ast_data;
2795                         cl_object_get(osc2cl(osc));
2796                 }
2797
2798                 /* clear LDLM_FL_CLEANED flag to make sure it will be canceled
2799                  * by the 2nd round of ldlm_namespace_clean() call in
2800                  * osc_import_event(). */
2801                 ldlm_clear_cleaned(lock);
2802         }
2803         unlock_res(res);
2804
2805         if (osc != NULL) {
2806                 osc_object_invalidate(env, osc);
2807                 cl_object_put(env, osc2cl(osc));
2808         }
2809
2810         RETURN(0);
2811 }
2812 EXPORT_SYMBOL(osc_ldlm_resource_invalidate);
2813
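/*
 * Import state machine callback: zero the grant on disconnect, flush
 * cached pages and invalidate cached DLM locks when the import is
 * invalidated, and (re)initialize grant from the connect data once the
 * OCD arrives.  Most other events are simply forwarded to the observer.
 */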
2814 static int osc_import_event(struct obd_device *obd,
2815                             struct obd_import *imp,
2816                             enum obd_import_event event)
2817 {
2818         struct client_obd *cli;
2819         int rc = 0;
2820
2821         ENTRY;
2822         LASSERT(imp->imp_obd == obd);
2823
2824         switch (event) {
2825         case IMP_EVENT_DISCON: {
2826                 cli = &obd->u.cli;
2827                 spin_lock(&cli->cl_loi_list_lock);
2828                 cli->cl_avail_grant = 0;
2829                 cli->cl_lost_grant = 0;
2830                 spin_unlock(&cli->cl_loi_list_lock);
2831                 break;
2832         }
2833         case IMP_EVENT_INACTIVE: {
2834                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE);
2835                 break;
2836         }
2837         case IMP_EVENT_INVALIDATE: {
2838                 struct ldlm_namespace *ns = obd->obd_namespace;
2839                 struct lu_env         *env;
2840                 __u16                  refcheck;
2841
2842                 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
2843
2844                 env = cl_env_get(&refcheck);
2845                 if (!IS_ERR(env)) {
2846                         osc_io_unplug(env, &obd->u.cli, NULL);
2847
2848                         cfs_hash_for_each_nolock(ns->ns_rs_hash,
2849                                                  osc_ldlm_resource_invalidate,
2850                                                  env, 0);
2851                         cl_env_put(env, &refcheck);
2852
2853                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
2854                 } else
2855                         rc = PTR_ERR(env);
2856                 break;
2857         }
2858         case IMP_EVENT_ACTIVE: {
2859                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE);
2860                 break;
2861         }
2862         case IMP_EVENT_OCD: {
2863                 struct obd_connect_data *ocd = &imp->imp_connect_data;
2864
2865                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
2866                         osc_init_grant(&obd->u.cli, ocd);
2867
2868                 /* See bug 7198 */
2869                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
2870                         imp->imp_client->cli_request_portal = OST_REQUEST_PORTAL;
2871
2872                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD);
2873                 break;
2874         }
2875         case IMP_EVENT_DEACTIVATE: {
2876                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE);
2877                 break;
2878         }
2879         case IMP_EVENT_ACTIVATE: {
2880                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE);
2881                 break;
2882         }
2883         default:
2884                 CERROR("Unknown import event %d\n", event);
2885                 LBUG();
2886         }
2887         RETURN(rc);
2888 }
2889
2890 /**
2891  * Determine whether the lock can be canceled before replaying the lock
2892  * during recovery, see bug16774 for detailed information.
2893  *
2894  * \retval zero the lock can't be canceled
2895  * \retval other ok to cancel
2896  */
2897 static int osc_cancel_weight(struct ldlm_lock *lock)
2898 {
2899         /*
2900          * Cancel all unused and granted extent locks.
2901          */
2902         if (lock->l_resource->lr_type == LDLM_EXTENT &&
2903             lock->l_granted_mode == lock->l_req_mode &&
2904             osc_ldlm_weigh_ast(lock) == 0)
2905                 RETURN(1);
2906
2907         RETURN(0);
2908 }
2909
2910 static int brw_queue_work(const struct lu_env *env, void *data)
2911 {
2912         struct client_obd *cli = data;
2913
2914         CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
2915
2916         osc_io_unplug(env, cli, NULL);
2917         RETURN(0);
2918 }
2919
2920 int osc_setup_common(struct obd_device *obd, struct lustre_cfg *lcfg)
2921 {
2922         struct client_obd *cli = &obd->u.cli;
2923         void *handler;
2924         int rc;
2925
2926         ENTRY;
2927
2928         rc = ptlrpcd_addref();
2929         if (rc)
2930                 RETURN(rc);
2931
2932         rc = client_obd_setup(obd, lcfg);
2933         if (rc)
2934                 GOTO(out_ptlrpcd, rc);
2935
2936
2937         handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
2938         if (IS_ERR(handler))
2939                 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
2940         cli->cl_writeback_work = handler;
2941
2942         handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
2943         if (IS_ERR(handler))
2944                 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
2945         cli->cl_lru_work = handler;
2946
2947         rc = osc_quota_setup(obd);
2948         if (rc)
2949                 GOTO(out_ptlrpcd_work, rc);
2950
2951         cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
2952
2953         INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
2954         RETURN(rc);
2955
2956 out_ptlrpcd_work:
2957         if (cli->cl_writeback_work != NULL) {
2958                 ptlrpcd_destroy_work(cli->cl_writeback_work);
2959                 cli->cl_writeback_work = NULL;
2960         }
2961         if (cli->cl_lru_work != NULL) {
2962                 ptlrpcd_destroy_work(cli->cl_lru_work);
2963                 cli->cl_lru_work = NULL;
2964         }
2965         client_obd_cleanup(obd);
2966 out_ptlrpcd:
2967         ptlrpcd_decref();
2968         RETURN(rc);
2969 }
2970 EXPORT_SYMBOL(osc_setup_common);
2971
2972 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
2973 {
2974         struct client_obd *cli = &obd->u.cli;
2975         struct obd_type   *type;
2976         int                adding;
2977         int                added;
2978         int                req_count;
2979         int                rc;
2980
2981         ENTRY;
2982
2983         rc = osc_setup_common(obd, lcfg);
2984         if (rc < 0)
2985                 RETURN(rc);
2986
2987 #ifdef CONFIG_PROC_FS
2988         obd->obd_vars = lprocfs_osc_obd_vars;
2989 #endif
2990         /* If this is true then both client (osc) and server (osp) are on the
2991          * same node. If the osp layer is loaded first, it will register the
2992          * osc proc directory; in that case this obd_device will attach its
2993          * proc tree to type->typ_procsym instead of obd->obd_type->typ_procroot.
2994          */
2995         type = class_search_type(LUSTRE_OSP_NAME);
2996         if (type && type->typ_procsym) {
2997                 obd->obd_proc_entry = lprocfs_register(obd->obd_name,
2998                                                        type->typ_procsym,
2999                                                        obd->obd_vars, obd);
3000                 if (IS_ERR(obd->obd_proc_entry)) {
3001                         rc = PTR_ERR(obd->obd_proc_entry);
3002                         CERROR("error %d setting up lprocfs for %s\n", rc,
3003                                obd->obd_name);
3004                         obd->obd_proc_entry = NULL;
3005                 }
3006         }
3007
3008         rc = lprocfs_obd_setup(obd, false);
3009         if (!rc) {
3010                 /* If the basic OSC proc tree construction succeeded then
3011                  * let's do the rest.
3012                  */
3013                 lproc_osc_attach_seqstat(obd);
3014                 sptlrpc_lprocfs_cliobd_attach(obd);
3015                 ptlrpc_lprocfs_register_obd(obd);
3016         }
3017
3018         /*
3019          * We try to control the total number of requests with an upper
3020          * limit of osc_reqpool_maxreqcount. There might be some race that
3021          * causes over-limit allocation, but that is fine.
3022          */
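        /* Each OSC asks for room for its own max in-flight RPCs plus two
         * spares; presumably the slack covers requests built while the
         * pool is already drained. */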
3023         req_count = atomic_read(&osc_pool_req_count);
3024         if (req_count < osc_reqpool_maxreqcount) {
3025                 adding = cli->cl_max_rpcs_in_flight + 2;
3026                 if (req_count + adding > osc_reqpool_maxreqcount)
3027                         adding = osc_reqpool_maxreqcount - req_count;
3028
3029                 added = ptlrpc_add_rqs_to_pool(osc_rq_pool, adding);
3030                 atomic_add(added, &osc_pool_req_count);
3031         }
3032
3033         INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
3034         ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
3035
3036         spin_lock(&osc_shrink_lock);
3037         list_add_tail(&cli->cl_shrink_list, &osc_shrink_list);
3038         spin_unlock(&osc_shrink_lock);
3039
3040         RETURN(0);
3041 }
3042
3043 int osc_precleanup_common(struct obd_device *obd)
3044 {
3045         struct client_obd *cli = &obd->u.cli;
3046         ENTRY;
3047
3048         /* LU-464
3049          * for echo client, export may be on zombie list, wait for
3050          * zombie thread to cull it, because cli.cl_import will be
3051          * cleared in client_disconnect_export():
3052          *   class_export_destroy() -> obd_cleanup() ->
3053          *   echo_device_free() -> echo_client_cleanup() ->
3054          *   obd_disconnect() -> osc_disconnect() ->
3055          *   client_disconnect_export()
3056          */
3057         obd_zombie_barrier();
3058         if (cli->cl_writeback_work) {
3059                 ptlrpcd_destroy_work(cli->cl_writeback_work);
3060                 cli->cl_writeback_work = NULL;
3061         }
3062
3063         if (cli->cl_lru_work) {
3064                 ptlrpcd_destroy_work(cli->cl_lru_work);
3065                 cli->cl_lru_work = NULL;
3066         }
3067
3068         obd_cleanup_client_import(obd);
3069         RETURN(0);
3070 }
3071 EXPORT_SYMBOL(osc_precleanup_common);
3072
3073 static int osc_precleanup(struct obd_device *obd)
3074 {
3075         ENTRY;
3076
3077         osc_precleanup_common(obd);
3078
3079         ptlrpc_lprocfs_unregister_obd(obd);
3080         RETURN(0);
3081 }
3082
3083 int osc_cleanup_common(struct obd_device *obd)
3084 {
3085         struct client_obd *cli = &obd->u.cli;
3086         int rc;
3087
3088         ENTRY;
3089
3090         spin_lock(&osc_shrink_lock);
3091         list_del(&cli->cl_shrink_list);
3092         spin_unlock(&osc_shrink_lock);
3093
3094         /* lru cleanup */
3095         if (cli->cl_cache != NULL) {
3096                 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
3097                 spin_lock(&cli->cl_cache->ccc_lru_lock);
3098                 list_del_init(&cli->cl_lru_osc);
3099                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3100                 cli->cl_lru_left = NULL;
3101                 cl_cache_decref(cli->cl_cache);
3102                 cli->cl_cache = NULL;
3103         }
3104
3105         /* free memory of osc quota cache */
3106         osc_quota_cleanup(obd);
3107
3108         rc = client_obd_cleanup(obd);
3109
3110         ptlrpcd_decref();
3111         RETURN(rc);
3112 }
3113 EXPORT_SYMBOL(osc_cleanup_common);
3114
3115 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
3116 {
3117         int rc = class_process_proc_param(PARAM_OSC, obd->obd_vars, lcfg, obd);
3118         return rc > 0 ? 0 : rc;
3119 }
3120
3121 static int osc_process_config(struct obd_device *obd, size_t len, void *buf)
3122 {
3123         return osc_process_config_base(obd, buf);
3124 }
3125
3126 static struct obd_ops osc_obd_ops = {
3127         .o_owner                = THIS_MODULE,
3128         .o_setup                = osc_setup,
3129         .o_precleanup           = osc_precleanup,
3130         .o_cleanup              = osc_cleanup_common,
3131         .o_add_conn             = client_import_add_conn,
3132         .o_del_conn             = client_import_del_conn,
3133         .o_connect              = client_connect_import,
3134         .o_reconnect            = osc_reconnect,
3135         .o_disconnect           = osc_disconnect,
3136         .o_statfs               = osc_statfs,
3137         .o_statfs_async         = osc_statfs_async,
3138         .o_create               = osc_create,
3139         .o_destroy              = osc_destroy,
3140         .o_getattr              = osc_getattr,
3141         .o_setattr              = osc_setattr,
3142         .o_iocontrol            = osc_iocontrol,
3143         .o_set_info_async       = osc_set_info_async,
3144         .o_import_event         = osc_import_event,
3145         .o_process_config       = osc_process_config,
3146         .o_quotactl             = osc_quotactl,
3147 };
3148
3149 static struct shrinker *osc_cache_shrinker;
3150 struct list_head osc_shrink_list = LIST_HEAD_INIT(osc_shrink_list);
3151 DEFINE_SPINLOCK(osc_shrink_lock);
3152
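/*
 * Compatibility for kernels whose shrinker API is a single callback
 * rather than separate count/scan methods: emulate the old entry point
 * by performing a scan and then reporting how many objects remain.
 */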
3153 #ifndef HAVE_SHRINKER_COUNT
3154 static int osc_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask))
3155 {
3156         struct shrink_control scv = {
3157                 .nr_to_scan = shrink_param(sc, nr_to_scan),
3158                 .gfp_mask   = shrink_param(sc, gfp_mask)
3159         };
3160 #if !defined(HAVE_SHRINKER_WANT_SHRINK_PTR) && !defined(HAVE_SHRINK_CONTROL)
3161         struct shrinker *shrinker = NULL;
3162 #endif
3163
3164         (void)osc_cache_shrink_scan(shrinker, &scv);
3165
3166         return osc_cache_shrink_count(shrinker, &scv);
3167 }
3168 #endif
3169
3170 static int __init osc_init(void)
3171 {
3172         bool enable_proc = true;
3173         struct obd_type *type;
3174         unsigned int reqpool_size;
3175         unsigned int reqsize;
3176         int rc;
3177         DEF_SHRINKER_VAR(osc_shvar, osc_cache_shrink,
3178                          osc_cache_shrink_count, osc_cache_shrink_scan);
3179         ENTRY;
3180
3181         /* Print the address of _any_ initialized kernel symbol from this
3182          * module, to allow debugging with a gdb that doesn't support data
3183          * symbols from modules. */
3184         CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3185
3186         rc = lu_kmem_init(osc_caches);
3187         if (rc)
3188                 RETURN(rc);
3189
3190         type = class_search_type(LUSTRE_OSP_NAME);
3191         if (type != NULL && type->typ_procsym != NULL)
3192                 enable_proc = false;
3193
3194         rc = class_register_type(&osc_obd_ops, NULL, enable_proc, NULL,
3195                                  LUSTRE_OSC_NAME, &osc_device_type);
3196         if (rc)
3197                 GOTO(out_kmem, rc);
3198
3199         osc_cache_shrinker = set_shrinker(DEFAULT_SEEKS, &osc_shvar);
3200
3201         /* Such a value is obviously too much memory; only guard against shift overflow here */
3202         if (osc_reqpool_mem_max >= 1 << 12 || osc_reqpool_mem_max == 0)
3203                 GOTO(out_type, rc = -EINVAL);
3204
3205         reqpool_size = osc_reqpool_mem_max << 20;
3206
3207         reqsize = 1;
3208         while (reqsize < OST_IO_MAXREQSIZE)
3209                 reqsize = reqsize << 1;
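        /*
         * reqsize is now OST_IO_MAXREQSIZE rounded up to the next power of
         * two, so the memory budget divides into a whole number of slots;
         * e.g. assuming the rounded request size is 1 MB, the default
         * 5 MB budget gives osc_reqpool_maxreqcount = 5.
         */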
3210
3211         /*
3212          * We don't enlarge the request count in the OSC pool according to
3213          * cl_max_rpcs_in_flight. Allocation from the pool is only tried
3214          * after a normal allocation has failed, so a small OSC pool won't
3215          * cause much performance degradation in most cases.
3216          */
3217         osc_reqpool_maxreqcount = reqpool_size / reqsize;
3218
3219         atomic_set(&osc_pool_req_count, 0);
3220         osc_rq_pool = ptlrpc_init_rq_pool(0, OST_IO_MAXREQSIZE,
3221                                           ptlrpc_add_rqs_to_pool);
3222
3223         if (osc_rq_pool != NULL)
3224                 GOTO(out, rc);
3225         rc = -ENOMEM;
3226 out_type:
3227         class_unregister_type(LUSTRE_OSC_NAME);
3228 out_kmem:
3229         lu_kmem_fini(osc_caches);
3230 out:
3231         RETURN(rc);
3232 }
3233
3234 static void __exit osc_exit(void)
3235 {
3236         remove_shrinker(osc_cache_shrinker);
3237         class_unregister_type(LUSTRE_OSC_NAME);
3238         lu_kmem_fini(osc_caches);
3239         ptlrpc_free_rq_pool(osc_rq_pool);
3240 }
3241
3242 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
3243 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3244 MODULE_VERSION(LUSTRE_VERSION_STRING);
3245 MODULE_LICENSE("GPL");
3246
3247 module_init(osc_init);
3248 module_exit(osc_exit);