/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.gnu.org/licenses/gpl-2.0.html
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, 2016, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 */

#define DEBUG_SUBSYSTEM S_OSC

#include <libcfs/libcfs.h>

#include <lprocfs_status.h>
#include <lustre_debug.h>
#include <lustre_dlm.h>
#include <lustre_fid.h>
#include <lustre_ha.h>
#include <uapi/linux/lustre/lustre_ioctl.h>
#include <lustre_net.h>
#include <lustre_obdo.h>
#include <uapi/linux/lustre/lustre_param.h>
#include <obd.h>
#include <obd_cksum.h>
#include <obd_class.h>
#include <lustre_osc.h>

#include "osc_internal.h"

atomic_t osc_pool_req_count;
unsigned int osc_reqpool_maxreqcount;
struct ptlrpc_request_pool *osc_rq_pool;

/* max memory used for request pool, unit is MB */
static unsigned int osc_reqpool_mem_max = 5;
module_param(osc_reqpool_mem_max, uint, 0444);

#define osc_grant_args osc_brw_async_args

struct osc_setattr_args {
	struct obdo		*sa_oa;
	obd_enqueue_update_f	 sa_upcall;
	void			*sa_cookie;
};

struct osc_fsync_args {
	struct osc_object	*fa_obj;
	struct obdo		*fa_oa;
	obd_enqueue_update_f	 fa_upcall;
	void			*fa_cookie;
};

struct osc_ladvise_args {
	struct obdo		*la_oa;
	obd_enqueue_update_f	 la_upcall;
	void			*la_cookie;
};

static void osc_release_ppga(struct brw_page **ppga, size_t count);
static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
			 void *data, int rc);

void osc_pack_req_body(struct ptlrpc_request *req, struct obdo *oa)
{
	struct ost_body *body;

	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
	LASSERT(body);

	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
}

static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
		       struct obdo *oa)
{
	struct ptlrpc_request	*req;
	struct ost_body		*body;
	int			 rc;

	ENTRY;
	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
	if (req == NULL)
		RETURN(-ENOMEM);

	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
	if (rc) {
		ptlrpc_request_free(req);
		RETURN(rc);
	}

	osc_pack_req_body(req, oa);

	ptlrpc_request_set_replen(req);

	rc = ptlrpc_queue_wait(req);
	if (rc)
		GOTO(out, rc);

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	if (body == NULL)
		GOTO(out, rc = -EPROTO);

	CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
	lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

	oa->o_blksize = cli_brw_size(exp->exp_obd);
	oa->o_valid |= OBD_MD_FLBLKSZ;

	EXIT;
out:
	ptlrpc_req_finished(req);

	return rc;
}

static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
		       struct obdo *oa)
{
	struct ptlrpc_request	*req;
	struct ost_body		*body;
	int			 rc;

	ENTRY;
	LASSERT(oa->o_valid & OBD_MD_FLGROUP);

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
	if (req == NULL)
		RETURN(-ENOMEM);

	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
	if (rc) {
		ptlrpc_request_free(req);
		RETURN(rc);
	}

	osc_pack_req_body(req, oa);

	ptlrpc_request_set_replen(req);

	rc = ptlrpc_queue_wait(req);
	if (rc)
		GOTO(out, rc);

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	if (body == NULL)
		GOTO(out, rc = -EPROTO);

	lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

	EXIT;
out:
	ptlrpc_req_finished(req);

	RETURN(rc);
}

static int osc_setattr_interpret(const struct lu_env *env,
				 struct ptlrpc_request *req,
				 struct osc_setattr_args *sa, int rc)
{
	struct ost_body *body;
	ENTRY;

	if (rc != 0)
		GOTO(out, rc);

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	if (body == NULL)
		GOTO(out, rc = -EPROTO);

	lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
			     &body->oa);
out:
	rc = sa->sa_upcall(sa->sa_cookie, rc);
	RETURN(rc);
}

int osc_setattr_async(struct obd_export *exp, struct obdo *oa,
		      obd_enqueue_update_f upcall, void *cookie,
		      struct ptlrpc_request_set *rqset)
{
	struct ptlrpc_request	*req;
	struct osc_setattr_args	*sa;
	int			 rc;

	ENTRY;

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
	if (req == NULL)
		RETURN(-ENOMEM);

	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
	if (rc) {
		ptlrpc_request_free(req);
		RETURN(rc);
	}

	osc_pack_req_body(req, oa);

	ptlrpc_request_set_replen(req);

	/* do mds to ost setattr asynchronously */
	if (!rqset) {
		/* Do not wait for response. */
		ptlrpcd_add_req(req);
	} else {
		req->rq_interpret_reply =
			(ptlrpc_interpterer_t)osc_setattr_interpret;

		CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
		sa = ptlrpc_req_async_args(req);
		sa->sa_oa = oa;
		sa->sa_upcall = upcall;
		sa->sa_cookie = cookie;

		if (rqset == PTLRPCD_SET)
			ptlrpcd_add_req(req);
		else
			ptlrpc_set_add_req(rqset, req);
	}

	RETURN(0);
}
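
/*
 * Dispatch sketch for osc_setattr_async() (descriptive of the code above,
 * not new behaviour): with rqset == NULL the request is handed to ptlrpcd
 * fire-and-forget style and no reply interpreter is installed, so upcall
 * and cookie are never invoked; with rqset == PTLRPCD_SET the reply runs
 * osc_setattr_interpret() from a ptlrpcd thread; with a private set the
 * caller must drive the set itself (e.g. via ptlrpc_set_wait()).
 */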

static int osc_ladvise_interpret(const struct lu_env *env,
				 struct ptlrpc_request *req,
				 void *arg, int rc)
{
	struct osc_ladvise_args *la = arg;
	struct ost_body *body;
	ENTRY;

	if (rc != 0)
		GOTO(out, rc);

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	if (body == NULL)
		GOTO(out, rc = -EPROTO);

	*la->la_oa = body->oa;
out:
	rc = la->la_upcall(la->la_cookie, rc);
	RETURN(rc);
}

/**
 * If rqset is NULL, do not wait for response. Upcall and cookie could also
 * be NULL in this case.
 */
int osc_ladvise_base(struct obd_export *exp, struct obdo *oa,
		     struct ladvise_hdr *ladvise_hdr,
		     obd_enqueue_update_f upcall, void *cookie,
		     struct ptlrpc_request_set *rqset)
{
	struct ptlrpc_request	*req;
	struct ost_body		*body;
	struct osc_ladvise_args	*la;
	int			 rc;
	struct lu_ladvise	*req_ladvise;
	struct lu_ladvise	*ladvise = ladvise_hdr->lah_advise;
	int			 num_advise = ladvise_hdr->lah_count;
	struct ladvise_hdr	*req_ladvise_hdr;
	ENTRY;

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_LADVISE);
	if (req == NULL)
		RETURN(-ENOMEM);

	req_capsule_set_size(&req->rq_pill, &RMF_OST_LADVISE, RCL_CLIENT,
			     num_advise * sizeof(*ladvise));
	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_LADVISE);
	if (rc != 0) {
		ptlrpc_request_free(req);
		RETURN(rc);
	}
	req->rq_request_portal = OST_IO_PORTAL;
	ptlrpc_at_set_req_timeout(req);

	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
	LASSERT(body);
	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
			     oa);

	req_ladvise_hdr = req_capsule_client_get(&req->rq_pill,
						 &RMF_OST_LADVISE_HDR);
	memcpy(req_ladvise_hdr, ladvise_hdr, sizeof(*ladvise_hdr));

	req_ladvise = req_capsule_client_get(&req->rq_pill, &RMF_OST_LADVISE);
	memcpy(req_ladvise, ladvise, sizeof(*ladvise) * num_advise);
	ptlrpc_request_set_replen(req);

	if (rqset == NULL) {
		/* Do not wait for response. */
		ptlrpcd_add_req(req);
		RETURN(0);
	}

	req->rq_interpret_reply = osc_ladvise_interpret;
	CLASSERT(sizeof(*la) <= sizeof(req->rq_async_args));
	la = ptlrpc_req_async_args(req);
	la->la_oa = oa;
	la->la_upcall = upcall;
	la->la_cookie = cookie;

	if (rqset == PTLRPCD_SET)
		ptlrpcd_add_req(req);
	else
		ptlrpc_set_add_req(rqset, req);

	RETURN(0);
}

static int osc_create(const struct lu_env *env, struct obd_export *exp,
		      struct obdo *oa)
{
	struct ptlrpc_request *req;
	struct ost_body       *body;
	int		       rc;
	ENTRY;

	LASSERT(oa != NULL);
	LASSERT(oa->o_valid & OBD_MD_FLGROUP);
	LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
	if (req == NULL)
		GOTO(out, rc = -ENOMEM);

	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
	if (rc) {
		ptlrpc_request_free(req);
		GOTO(out, rc);
	}

	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
	LASSERT(body);

	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

	ptlrpc_request_set_replen(req);

	rc = ptlrpc_queue_wait(req);
	if (rc)
		GOTO(out_req, rc);

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	if (body == NULL)
		GOTO(out_req, rc = -EPROTO);

	CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
	lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

	oa->o_blksize = cli_brw_size(exp->exp_obd);
	oa->o_valid |= OBD_MD_FLBLKSZ;

	CDEBUG(D_HA, "transno: %lld\n",
	       lustre_msg_get_transno(req->rq_repmsg));
out_req:
	ptlrpc_req_finished(req);
out:
	RETURN(rc);
}

int osc_punch_send(struct obd_export *exp, struct obdo *oa,
		   obd_enqueue_update_f upcall, void *cookie)
{
	struct ptlrpc_request *req;
	struct osc_setattr_args *sa;
	struct obd_import *imp = class_exp2cliimp(exp);
	struct ost_body *body;
	int rc;

	ENTRY;

	req = ptlrpc_request_alloc(imp, &RQF_OST_PUNCH);
	if (req == NULL)
		RETURN(-ENOMEM);

	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
	if (rc < 0) {
		ptlrpc_request_free(req);
		RETURN(rc);
	}

	osc_set_io_portal(req);

	ptlrpc_at_set_req_timeout(req);

	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);

	lustre_set_wire_obdo(&imp->imp_connect_data, &body->oa, oa);

	ptlrpc_request_set_replen(req);

	req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
	CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
	sa = ptlrpc_req_async_args(req);
	sa->sa_oa = oa;
	sa->sa_upcall = upcall;
	sa->sa_cookie = cookie;

	ptlrpcd_add_req(req);

	RETURN(0);
}
EXPORT_SYMBOL(osc_punch_send);
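
/*
 * Caller sketch for osc_punch_send() (an assumption for illustration; the
 * range encoding is set up by callers outside this file): the truncate and
 * punch paths are expected to pack the byte range into the obdo before
 * calling here, roughly
 *
 *	oa->o_size = start_of_range;
 *	oa->o_blocks = end_of_range_or_OBD_OBJECT_EOF;
 *	oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
 *	rc = osc_punch_send(exp, oa, upcall, cookie);
 *
 * The reply is handled by the shared osc_setattr_interpret() above.
 */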

static int osc_sync_interpret(const struct lu_env *env,
			      struct ptlrpc_request *req,
			      void *arg, int rc)
{
	struct osc_fsync_args	*fa = arg;
	struct ost_body		*body;
	struct cl_attr		*attr = &osc_env_info(env)->oti_attr;
	unsigned long		 valid = 0;
	struct cl_object	*obj;
	ENTRY;

	if (rc != 0)
		GOTO(out, rc);

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	if (body == NULL) {
		CERROR("can't unpack ost_body\n");
		GOTO(out, rc = -EPROTO);
	}

	*fa->fa_oa = body->oa;
	obj = osc2cl(fa->fa_obj);

	/* Update osc object's blocks attribute */
	cl_object_attr_lock(obj);
	if (body->oa.o_valid & OBD_MD_FLBLOCKS) {
		attr->cat_blocks = body->oa.o_blocks;
		valid |= CAT_BLOCKS;
	}

	if (valid != 0)
		cl_object_attr_update(env, obj, attr, valid);
	cl_object_attr_unlock(obj);

out:
	rc = fa->fa_upcall(fa->fa_cookie, rc);
	RETURN(rc);
}

int osc_sync_base(struct osc_object *obj, struct obdo *oa,
		  obd_enqueue_update_f upcall, void *cookie,
		  struct ptlrpc_request_set *rqset)
{
	struct obd_export     *exp = osc_export(obj);
	struct ptlrpc_request *req;
	struct ost_body       *body;
	struct osc_fsync_args *fa;
	int		       rc;
	ENTRY;

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
	if (req == NULL)
		RETURN(-ENOMEM);

	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
	if (rc) {
		ptlrpc_request_free(req);
		RETURN(rc);
	}

	/* overload the size and blocks fields in the oa with start/end */
	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
	LASSERT(body);
	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

	ptlrpc_request_set_replen(req);
	req->rq_interpret_reply = osc_sync_interpret;

	CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
	fa = ptlrpc_req_async_args(req);
	fa->fa_obj = obj;
	fa->fa_oa = oa;
	fa->fa_upcall = upcall;
	fa->fa_cookie = cookie;

	if (rqset == PTLRPCD_SET)
		ptlrpcd_add_req(req);
	else
		ptlrpc_set_add_req(rqset, req);

	RETURN(0);
}

/* Find and cancel locally the locks matched by @mode in the resource built
 * from @oa->o_oi. Found locks are added to the @cancels list. Returns the
 * number of locks added to @cancels. */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
				   struct list_head *cancels,
				   enum ldlm_mode mode, __u64 lock_flags)
{
	struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
	struct ldlm_res_id res_id;
	struct ldlm_resource *res;
	int count;
	ENTRY;

	/* Return, i.e. cancel nothing, only if ELC is supported (flag in
	 * export) but disabled through procfs (flag in NS).
	 *
	 * This is distinct from the case where ELC is not supported at all,
	 * in which case we still want to cancel locks in advance and just
	 * cancel them locally, without sending any RPC. */
	if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
		RETURN(0);

	ostid_build_res_name(&oa->o_oi, &res_id);
	res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
	if (IS_ERR(res))
		RETURN(0);

	LDLM_RESOURCE_ADDREF(res);
	count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
					   lock_flags, 0, NULL);
	LDLM_RESOURCE_DELREF(res);
	ldlm_resource_putref(res);
	RETURN(count);
}

static int osc_destroy_interpret(const struct lu_env *env,
				 struct ptlrpc_request *req, void *data,
				 int rc)
{
	struct client_obd *cli = &req->rq_import->imp_obd->u.cli;

	atomic_dec(&cli->cl_destroy_in_flight);
	wake_up(&cli->cl_destroy_waitq);
	return 0;
}

static int osc_can_send_destroy(struct client_obd *cli)
{
	if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
	    cli->cl_max_rpcs_in_flight) {
		/* The destroy request can be sent */
		return 1;
	}
	if (atomic_dec_return(&cli->cl_destroy_in_flight) <
	    cli->cl_max_rpcs_in_flight) {
		/*
		 * The counter has been modified between the two atomic
		 * operations.
		 */
		wake_up(&cli->cl_destroy_waitq);
	}
	return 0;
}
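
/*
 * Note on osc_can_send_destroy() above: the slot is claimed optimistically
 * with atomic_inc_return() and kept only while the count stays within
 * cl_max_rpcs_in_flight.  On failure the counter is dropped again; if the
 * post-decrement value shows a free slot, another thread raced in between
 * the two atomics, so the waitqueue is woken to avoid a lost wakeup for a
 * sleeper in osc_destroy() below.
 */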

static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
		       struct obdo *oa)
{
	struct client_obd     *cli = &exp->exp_obd->u.cli;
	struct ptlrpc_request *req;
	struct ost_body       *body;
	struct list_head       cancels = LIST_HEAD_INIT(cancels);
	int rc, count;
	ENTRY;

	if (!oa) {
		CDEBUG(D_INFO, "oa NULL\n");
		RETURN(-EINVAL);
	}

	count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
					LDLM_FL_DISCARD_DATA);

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
	if (req == NULL) {
		ldlm_lock_list_put(&cancels, l_bl_ast, count);
		RETURN(-ENOMEM);
	}

	rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
			       0, &cancels, count);
	if (rc) {
		ptlrpc_request_free(req);
		RETURN(rc);
	}

	req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
	ptlrpc_at_set_req_timeout(req);

	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
	LASSERT(body);
	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

	ptlrpc_request_set_replen(req);

	req->rq_interpret_reply = osc_destroy_interpret;
	if (!osc_can_send_destroy(cli)) {
		struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);

		/*
		 * Wait until the number of on-going destroy RPCs drops
		 * below cl_max_rpcs_in_flight.
		 */
		rc = l_wait_event_exclusive(cli->cl_destroy_waitq,
					    osc_can_send_destroy(cli), &lwi);
		if (rc) {
			ptlrpc_req_finished(req);
			RETURN(rc);
		}
	}

	/* Do not wait for response */
	ptlrpcd_add_req(req);
	RETURN(0);
}

static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
				long writing_bytes)
{
	u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;

	LASSERT(!(oa->o_valid & bits));

	oa->o_valid |= bits;
	spin_lock(&cli->cl_loi_list_lock);
	if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data, GRANT_PARAM))
		oa->o_dirty = cli->cl_dirty_grant;
	else
		oa->o_dirty = cli->cl_dirty_pages << PAGE_SHIFT;
	if (unlikely(cli->cl_dirty_pages - cli->cl_dirty_transit >
		     cli->cl_dirty_max_pages)) {
		CERROR("dirty %lu - %lu > dirty_max %lu\n",
		       cli->cl_dirty_pages, cli->cl_dirty_transit,
		       cli->cl_dirty_max_pages);
		oa->o_undirty = 0;
	} else if (unlikely(atomic_long_read(&obd_dirty_pages) -
			    atomic_long_read(&obd_dirty_transit_pages) >
			    (long)(obd_max_dirty_pages + 1))) {
		/* The atomic_read() and atomic_inc() are not covered by a
		 * lock, so they may safely race and trip this CERROR()
		 * unless we add in a small fudge factor (+1). */
		CERROR("%s: dirty %ld - %ld > system dirty_max %ld\n",
		       cli_name(cli), atomic_long_read(&obd_dirty_pages),
		       atomic_long_read(&obd_dirty_transit_pages),
		       obd_max_dirty_pages);
		oa->o_undirty = 0;
	} else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
			    0x7fffffff)) {
		CERROR("dirty %lu - dirty_max %lu too big???\n",
		       cli->cl_dirty_pages, cli->cl_dirty_max_pages);
		oa->o_undirty = 0;
	} else {
		unsigned long nrpages;

		nrpages = cli->cl_max_pages_per_rpc;
		nrpages *= cli->cl_max_rpcs_in_flight + 1;
		nrpages = max(nrpages, cli->cl_dirty_max_pages);
		oa->o_undirty = nrpages << PAGE_SHIFT;
		if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data,
				 GRANT_PARAM)) {
			int nrextents;

			/* take extent tax into account when asking for more
			 * grant space */
			nrextents = (nrpages + cli->cl_max_extent_pages - 1) /
				     cli->cl_max_extent_pages;
			oa->o_undirty += nrextents * cli->cl_grant_extent_tax;
		}
	}
	oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
	oa->o_dropped = cli->cl_lost_grant;
	cli->cl_lost_grant = 0;
	spin_unlock(&cli->cl_loi_list_lock);
	CDEBUG(D_CACHE, "dirty: %llu undirty: %u dropped %u grant: %llu\n",
	       oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
}
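
/*
 * Worked example for the o_undirty estimate above (illustrative numbers,
 * assuming cl_dirty_max_pages is not larger): with cl_max_pages_per_rpc =
 * 256, cl_max_rpcs_in_flight = 8 and 4 KiB pages, nrpages = 256 * (8 + 1)
 * = 2304, so the client asks to keep up to 2304 << PAGE_SHIFT = 9 MiB of
 * grant undirtied, plus one cl_grant_extent_tax per cl_max_extent_pages-
 * sized extent when GRANT_PARAM was negotiated.
 */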

void osc_update_next_shrink(struct client_obd *cli)
{
	cli->cl_next_shrink_grant = ktime_get_seconds() +
				    cli->cl_grant_shrink_interval;

	CDEBUG(D_CACHE, "next time %lld to shrink grant\n",
	       cli->cl_next_shrink_grant);
}

static void __osc_update_grant(struct client_obd *cli, u64 grant)
{
	spin_lock(&cli->cl_loi_list_lock);
	cli->cl_avail_grant += grant;
	spin_unlock(&cli->cl_loi_list_lock);
}

static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
{
	if (body->oa.o_valid & OBD_MD_FLGRANT) {
		CDEBUG(D_CACHE, "got %llu extra grant\n", body->oa.o_grant);
		__osc_update_grant(cli, body->oa.o_grant);
	}
}

static int osc_shrink_grant_interpret(const struct lu_env *env,
				      struct ptlrpc_request *req,
				      void *aa, int rc)
{
	struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
	struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
	struct ost_body *body;

	if (rc != 0) {
		__osc_update_grant(cli, oa->o_grant);
		GOTO(out, rc);
	}

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	LASSERT(body);
	osc_update_grant(cli, body);
out:
	OBDO_FREE(oa);
	return rc;
}

static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
{
	spin_lock(&cli->cl_loi_list_lock);
	oa->o_grant = cli->cl_avail_grant / 4;
	cli->cl_avail_grant -= oa->o_grant;
	spin_unlock(&cli->cl_loi_list_lock);
	if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
		oa->o_valid |= OBD_MD_FLFLAGS;
		oa->o_flags = 0;
	}
	oa->o_flags |= OBD_FL_SHRINK_GRANT;
	osc_update_next_shrink(cli);
}

/* Shrink the current grant, either from some large amount to enough for a
 * full set of in-flight RPCs, or if we have already shrunk to that limit
 * then to enough for a single RPC.  This avoids keeping more grant than
 * needed, and avoids shrinking the grant piecemeal. */
static int osc_shrink_grant(struct client_obd *cli)
{
	__u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
			     (cli->cl_max_pages_per_rpc << PAGE_SHIFT);

	spin_lock(&cli->cl_loi_list_lock);
	if (cli->cl_avail_grant <= target_bytes)
		target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
	spin_unlock(&cli->cl_loi_list_lock);

	return osc_shrink_grant_to_target(cli, target_bytes);
}
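
/*
 * Example of the two-step shrink above (illustrative numbers): with
 * cl_max_rpcs_in_flight = 8 and a 1 MiB RPC size, the first call shrinks
 * cl_avail_grant toward (8 + 1) * 1 MiB = 9 MiB; once avail_grant is at or
 * below 9 MiB, a later call shrinks toward a single 1 MiB RPC, which
 * osc_shrink_grant_to_target() below enforces as the floor.
 */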

int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
{
	int			rc = 0;
	struct ost_body	       *body;
	ENTRY;

	spin_lock(&cli->cl_loi_list_lock);
	/* Don't shrink if we are already above or below the desired limit.
	 * We don't want to shrink below a single RPC, as that will negatively
	 * impact block allocation and long-term performance. */
	if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_SHIFT)
		target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;

	if (target_bytes >= cli->cl_avail_grant) {
		spin_unlock(&cli->cl_loi_list_lock);
		RETURN(0);
	}
	spin_unlock(&cli->cl_loi_list_lock);

	OBD_ALLOC_PTR(body);
	if (!body)
		RETURN(-ENOMEM);

	osc_announce_cached(cli, &body->oa, 0);

	spin_lock(&cli->cl_loi_list_lock);
	body->oa.o_grant = cli->cl_avail_grant - target_bytes;
	cli->cl_avail_grant = target_bytes;
	spin_unlock(&cli->cl_loi_list_lock);
	if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
		body->oa.o_valid |= OBD_MD_FLFLAGS;
		body->oa.o_flags = 0;
	}
	body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
	osc_update_next_shrink(cli);

	rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
				sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
				sizeof(*body), body, NULL);
	if (rc != 0)
		__osc_update_grant(cli, body->oa.o_grant);
	OBD_FREE_PTR(body);
	RETURN(rc);
}

static int osc_should_shrink_grant(struct client_obd *client)
{
	time64_t next_shrink = client->cl_next_shrink_grant;

	if ((client->cl_import->imp_connect_data.ocd_connect_flags &
	     OBD_CONNECT_GRANT_SHRINK) == 0)
		return 0;

	if (ktime_get_seconds() >= next_shrink - 5) {
		/* Get the current RPC size directly, instead of going via:
		 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
		 * Keep comment here so that it can be found by searching. */
		int brw_size = client->cl_max_pages_per_rpc << PAGE_SHIFT;

		if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
		    client->cl_avail_grant > brw_size)
			return 1;
		else
			osc_update_next_shrink(client);
	}
	return 0;
}

static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
{
	struct client_obd *client;

	list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
		if (osc_should_shrink_grant(client))
			osc_shrink_grant(client);
	}
	return 0;
}

static int osc_add_shrink_grant(struct client_obd *client)
{
	int rc;

	rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
				       TIMEOUT_GRANT,
				       osc_grant_shrink_grant_cb, NULL,
				       &client->cl_grant_shrink_list);
	if (rc) {
		CERROR("add grant client %s error %d\n", cli_name(client), rc);
		return rc;
	}
	CDEBUG(D_CACHE, "add grant client %s\n", cli_name(client));
	osc_update_next_shrink(client);
	return 0;
}

static int osc_del_shrink_grant(struct client_obd *client)
{
	return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
					 TIMEOUT_GRANT);
}

void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
	/*
	 * ocd_grant is the total grant amount we expect to hold: if we have
	 * been evicted, it's the new avail_grant amount, and cl_dirty_pages
	 * will drop to 0 as in-flight RPCs fail out; otherwise, it's
	 * avail_grant + dirty.
	 *
	 * The race is tolerable here: if we're evicted but imp_state has
	 * already left EVICTED state, then cl_dirty_pages must already be 0.
	 */
	spin_lock(&cli->cl_loi_list_lock);
	cli->cl_avail_grant = ocd->ocd_grant;
	if (cli->cl_import->imp_state != LUSTRE_IMP_EVICTED) {
		cli->cl_avail_grant -= cli->cl_reserved_grant;
		if (OCD_HAS_FLAG(ocd, GRANT_PARAM))
			cli->cl_avail_grant -= cli->cl_dirty_grant;
		else
			cli->cl_avail_grant -=
					cli->cl_dirty_pages << PAGE_SHIFT;
	}

	if (OCD_HAS_FLAG(ocd, GRANT_PARAM)) {
		u64 size;
		int chunk_mask;

		/* overhead for each extent insertion */
		cli->cl_grant_extent_tax = ocd->ocd_grant_tax_kb << 10;
		/* determine the appropriate chunk size used by osc_extent. */
		cli->cl_chunkbits = max_t(int, PAGE_SHIFT,
					  ocd->ocd_grant_blkbits);
		/* max_pages_per_rpc must be chunk aligned */
		chunk_mask = ~((1 << (cli->cl_chunkbits - PAGE_SHIFT)) - 1);
		cli->cl_max_pages_per_rpc = (cli->cl_max_pages_per_rpc +
					     ~chunk_mask) & chunk_mask;
		/* determine maximum extent size, in #pages */
		size = (u64)ocd->ocd_grant_max_blks << ocd->ocd_grant_blkbits;
		cli->cl_max_extent_pages = size >> PAGE_SHIFT;
		if (cli->cl_max_extent_pages == 0)
			cli->cl_max_extent_pages = 1;
	} else {
		cli->cl_grant_extent_tax = 0;
		cli->cl_chunkbits = PAGE_SHIFT;
		cli->cl_max_extent_pages = DT_MAX_BRW_PAGES;
	}
	spin_unlock(&cli->cl_loi_list_lock);

	CDEBUG(D_CACHE,
	       "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld. chunk bits: %d cl_max_extent_pages: %d\n",
	       cli_name(cli),
	       cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits,
	       cli->cl_max_extent_pages);

	if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
	    list_empty(&cli->cl_grant_shrink_list))
		osc_add_shrink_grant(cli);
}
EXPORT_SYMBOL(osc_init_grant);
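
/*
 * Chunk-alignment example for the GRANT_PARAM path above (illustrative
 * numbers): with PAGE_SHIFT = 12 and ocd_grant_blkbits = 16, cl_chunkbits
 * becomes 16, so a chunk is 16 pages and chunk_mask = ~15.  A
 * cl_max_pages_per_rpc of 100 is rounded up as (100 + 15) & ~15 = 112
 * pages, a whole number of chunks.
 */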

/* We assume that the reason this OSC got a short read is because it read
 * beyond the end of a stripe file; i.e. Lustre is reading a sparse file
 * via the LOV, and it _knows_ it's reading inside the file; it's just that
 * this stripe never got written at or beyond this stripe offset yet. */
static void handle_short_read(int nob_read, size_t page_count,
			      struct brw_page **pga)
{
	char *ptr;
	int i = 0;

	/* skip bytes read OK */
	while (nob_read > 0) {
		LASSERT(page_count > 0);

		if (pga[i]->count > nob_read) {
			/* EOF inside this page */
			ptr = kmap(pga[i]->pg) +
				(pga[i]->off & ~PAGE_MASK);
			memset(ptr + nob_read, 0, pga[i]->count - nob_read);
			kunmap(pga[i]->pg);
			page_count--;
			i++;
			break;
		}

		nob_read -= pga[i]->count;
		page_count--;
		i++;
	}

	/* zero remaining pages */
	while (page_count-- > 0) {
		ptr = kmap(pga[i]->pg) + (pga[i]->off & ~PAGE_MASK);
		memset(ptr, 0, pga[i]->count);
		kunmap(pga[i]->pg);
		i++;
	}
}
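
/*
 * Example (illustrative): a 3-page read of 4096 bytes per page that came
 * back with nob_read = 5000 keeps page 0 intact (4096 bytes consumed),
 * zero-fills page 1 from offset 904 onward, and zero-fills page 2
 * entirely.
 */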

static int check_write_rcs(struct ptlrpc_request *req,
			   int requested_nob, int niocount,
			   size_t page_count, struct brw_page **pga)
{
	int	 i;
	__u32	*remote_rcs;

	remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
						  sizeof(*remote_rcs) *
						  niocount);
	if (remote_rcs == NULL) {
		CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
		return -EPROTO;
	}

	/* return error if any niobuf was in error */
	for (i = 0; i < niocount; i++) {
		if ((int)remote_rcs[i] < 0)
			return remote_rcs[i];

		if (remote_rcs[i] != 0) {
			CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
			       i, remote_rcs[i], req);
			return -EPROTO;
		}
	}
	if (req->rq_bulk != NULL &&
	    req->rq_bulk->bd_nob_transferred != requested_nob) {
		CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
		       req->rq_bulk->bd_nob_transferred, requested_nob);
		return -EPROTO;
	}

	return 0;
}

static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
{
	if (p1->flag != p2->flag) {
		unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
				  OBD_BRW_SYNC       | OBD_BRW_ASYNC   |
				  OBD_BRW_NOQUOTA    | OBD_BRW_SOFT_SYNC);

		/* warn if we try to combine flags that we don't know to be
		 * safe to combine */
		if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
			CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
			      "report this at https://jira.hpdd.intel.com/\n",
			      p1->flag, p2->flag);
		}
		return 0;
	}

	return p1->off + p1->count == p2->off;
}
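
/*
 * Example for can_merge_pages() (illustrative): two 4 KiB pages at file
 * offsets 0 and 4096 with identical flags merge into one niobuf covering
 * 8 KiB.  Any gap (say offsets 0 and 8192) or any flag difference keeps
 * them as separate niobufs; the CWARN above fires only when the differing
 * flags fall outside the known-safe-to-combine set.
 */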

static u32 osc_checksum_bulk(int nob, size_t pg_count,
			     struct brw_page **pga, int opc,
			     enum cksum_types cksum_type)
{
	u32				cksum;
	int				i = 0;
	struct cfs_crypto_hash_desc	*hdesc;
	unsigned int			bufsize;
	unsigned char			cfs_alg = cksum_obd2cfs(cksum_type);

	LASSERT(pg_count > 0);

	hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
	if (IS_ERR(hdesc)) {
		CERROR("Unable to initialize checksum hash %s\n",
		       cfs_crypto_hash_name(cfs_alg));
		return PTR_ERR(hdesc);
	}

	while (nob > 0 && pg_count > 0) {
		unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;

		/* corrupt the data before we compute the checksum, to
		 * simulate an OST->client data error */
		if (i == 0 && opc == OST_READ &&
		    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
			unsigned char *ptr = kmap(pga[i]->pg);
			int off = pga[i]->off & ~PAGE_MASK;

			memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
			kunmap(pga[i]->pg);
		}
		cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
					    pga[i]->off & ~PAGE_MASK,
					    count);
		LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
			       (int)(pga[i]->off & ~PAGE_MASK));

		nob -= pga[i]->count;
		pg_count--;
		i++;
	}

	bufsize = sizeof(cksum);
	cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);

	/* For sending we only compute the wrong checksum instead
	 * of corrupting the data so it is still correct on a redo */
	if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
		cksum++;

	return cksum;
}

static int
osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
		     u32 page_count, struct brw_page **pga,
		     struct ptlrpc_request **reqp, int resend)
{
	struct ptlrpc_request	*req;
	struct ptlrpc_bulk_desc	*desc;
	struct ost_body		*body;
	struct obd_ioobj	*ioobj;
	struct niobuf_remote	*niobuf;
	int niocount, i, requested_nob, opc, rc, short_io_size;
	struct osc_brw_async_args *aa;
	struct req_capsule	*pill;
	struct brw_page *pg_prev;
	void *short_io_buf;

	ENTRY;
	if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
		RETURN(-ENOMEM); /* Recoverable */
	if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
		RETURN(-EINVAL); /* Fatal */

	if ((cmd & OBD_BRW_WRITE) != 0) {
		opc = OST_WRITE;
		req = ptlrpc_request_alloc_pool(cli->cl_import,
						osc_rq_pool,
						&RQF_OST_BRW_WRITE);
	} else {
		opc = OST_READ;
		req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
	}
	if (req == NULL)
		RETURN(-ENOMEM);

	for (niocount = i = 1; i < page_count; i++) {
		if (!can_merge_pages(pga[i - 1], pga[i]))
			niocount++;
	}

	pill = &req->rq_pill;
	req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
			     sizeof(*ioobj));
	req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
			     niocount * sizeof(*niobuf));

	/* total transfer size, used to decide whether short I/O applies */
	for (short_io_size = 0, i = 0; i < page_count; i++)
		short_io_size += pga[i]->count;

	/* Check if we can do a short io. */
	if (!(short_io_size <= cli->cl_short_io_bytes && niocount == 1 &&
	    imp_connect_shortio(cli->cl_import)))
		short_io_size = 0;

	req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_CLIENT,
			     opc == OST_READ ? 0 : short_io_size);
	if (opc == OST_READ)
		req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_SERVER,
				     short_io_size);

	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
	if (rc) {
		ptlrpc_request_free(req);
		RETURN(rc);
	}
	osc_set_io_portal(req);

	ptlrpc_at_set_req_timeout(req);
	/* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
	 * retry logic */
	req->rq_no_retry_einprogress = 1;

	if (short_io_size != 0) {
		desc = NULL;
		short_io_buf = NULL;
		goto no_bulk;
	}

	desc = ptlrpc_prep_bulk_imp(req, page_count,
		cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
		(opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE :
			PTLRPC_BULK_PUT_SINK) |
			PTLRPC_BULK_BUF_KIOV,
		OST_BULK_PORTAL,
		&ptlrpc_bulk_kiov_pin_ops);

	if (desc == NULL)
		GOTO(out, rc = -ENOMEM);
	/* NB request now owns desc and will free it when it gets freed */
no_bulk:
	body = req_capsule_client_get(pill, &RMF_OST_BODY);
	ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
	niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
	LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);

	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

	obdo_to_ioobj(oa, ioobj);
	ioobj->ioo_bufcnt = niocount;
	/* The high bits of ioo_max_brw tell the server the _maximum_ number
	 * of bulks that might be sent for this request.  The actual number
	 * is decided when the RPC is finally sent in ptlrpc_register_bulk().
	 * It sends "max - 1" for old client compatibility sending "0", and
	 * also so that the actual maximum is a power-of-two number, not one
	 * less. LU-1431 */
	if (desc != NULL)
		ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
	else /* short io */
		ioobj_max_brw_set(ioobj, 0);

	if (short_io_size != 0) {
		if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
			body->oa.o_valid |= OBD_MD_FLFLAGS;
			body->oa.o_flags = 0;
		}
		body->oa.o_flags |= OBD_FL_SHORT_IO;
		CDEBUG(D_CACHE, "Using short io for data transfer, size = %d\n",
		       short_io_size);
		if (opc == OST_WRITE) {
			short_io_buf = req_capsule_client_get(pill,
							      &RMF_SHORT_IO);
			LASSERT(short_io_buf != NULL);
		}
	}

	LASSERT(page_count > 0);
	pg_prev = pga[0];
	for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
		struct brw_page *pg = pga[i];
		int poff = pg->off & ~PAGE_MASK;

		LASSERT(pg->count > 0);
		/* make sure there is no gap in the middle of page array */
		LASSERTF(page_count == 1 ||
			 (ergo(i == 0, poff + pg->count == PAGE_SIZE) &&
			  ergo(i > 0 && i < page_count - 1,
			       poff == 0 && pg->count == PAGE_SIZE)   &&
			  ergo(i == page_count - 1, poff == 0)),
			 "i: %d/%d pg: %p off: %llu, count: %u\n",
			 i, page_count, pg, pg->off, pg->count);
		LASSERTF(i == 0 || pg->off > pg_prev->off,
			 "i %d p_c %u pg %p [pri %lu ind %lu] off %llu"
			 " prev_pg %p [pri %lu ind %lu] off %llu\n",
			 i, page_count,
			 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
			 pg_prev->pg, page_private(pg_prev->pg),
			 pg_prev->pg->index, pg_prev->off);
		LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
			(pg->flag & OBD_BRW_SRVLOCK));
		if (short_io_size != 0 && opc == OST_WRITE) {
			unsigned char *ptr = ll_kmap_atomic(pg->pg, KM_USER0);

			LASSERT(short_io_size >= requested_nob + pg->count);
			memcpy(short_io_buf + requested_nob,
			       ptr + poff,
			       pg->count);
			ll_kunmap_atomic(ptr, KM_USER0);
		} else if (short_io_size == 0) {
			desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff,
							 pg->count);
		}
		requested_nob += pg->count;

		if (i > 0 && can_merge_pages(pg_prev, pg)) {
			niobuf--;
			niobuf->rnb_len += pg->count;
		} else {
			niobuf->rnb_offset = pg->off;
			niobuf->rnb_len    = pg->count;
			niobuf->rnb_flags  = pg->flag;
		}
		pg_prev = pg;
	}

	LASSERTF((void *)(niobuf - niocount) ==
		req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
		"want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
		&RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));

	osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob : 0);
	if (resend) {
		if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
			body->oa.o_valid |= OBD_MD_FLFLAGS;
			body->oa.o_flags = 0;
		}
		body->oa.o_flags |= OBD_FL_RECOV_RESEND;
	}

	if (osc_should_shrink_grant(cli))
		osc_shrink_grant_local(cli, &body->oa);

	/* size[REQ_REC_OFF] still sizeof (*body) */
	if (opc == OST_WRITE) {
		if (cli->cl_checksum &&
		    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
			/* store cl_cksum_type in a local variable since
			 * it can be changed via lprocfs */
			enum cksum_types cksum_type = cli->cl_cksum_type;

			if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
				body->oa.o_flags = 0;

			body->oa.o_flags |= cksum_type_pack(cksum_type);
			body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
			body->oa.o_cksum = osc_checksum_bulk(requested_nob,
							     page_count, pga,
							     OST_WRITE,
							     cksum_type);
			CDEBUG(D_PAGE, "checksum at write origin: %x\n",
			       body->oa.o_cksum);
			/* save this in 'oa', too, for later checking */
			oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
			oa->o_flags |= cksum_type_pack(cksum_type);
		} else {
			/* clear out the checksum flag, in case this is a
			 * resend but cl_checksum is no longer set. b=11238 */
			oa->o_valid &= ~OBD_MD_FLCKSUM;
		}
		oa->o_cksum = body->oa.o_cksum;
		/* 1 RC per niobuf */
		req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
				     sizeof(__u32) * niocount);
	} else {
		if (cli->cl_checksum &&
		    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
			if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
				body->oa.o_flags = 0;
			body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
			body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
		}

		/* The client cksum has already been copied to the wire obdo
		 * by the previous lustre_set_wire_obdo(); in case a bulk-read
		 * is resent due to a cksum error, this lets the server
		 * check+dump the pages on its side. */
	}
	ptlrpc_request_set_replen(req);

	CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
	aa = ptlrpc_req_async_args(req);
	aa->aa_oa = oa;
	aa->aa_requested_nob = requested_nob;
	aa->aa_nio_count = niocount;
	aa->aa_page_count = page_count;
	aa->aa_resends = 0;
	aa->aa_ppga = pga;
	aa->aa_cli = cli;
	INIT_LIST_HEAD(&aa->aa_oaps);

	*reqp = req;
	niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
	CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
	       req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
	       niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
	RETURN(0);

out:
	ptlrpc_req_finished(req);
	RETURN(rc);
}
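
/*
 * Dispatch summary for osc_brw_prep_request() above (descriptive): when the
 * summed transfer fits within cl_short_io_bytes, is a single contiguous
 * niobuf, and the import advertises short I/O (imp_connect_shortio()), the
 * data travels inline in the RMF_SHORT_IO buffer and no bulk descriptor is
 * set up.  Otherwise a kiov bulk is registered on OST_BULK_PORTAL, as
 * GET_SOURCE for writes or PUT_SINK for reads.
 */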

char dbgcksum_file_name[PATH_MAX];

static void dump_all_bulk_pages(struct obdo *oa, __u32 page_count,
				struct brw_page **pga, __u32 server_cksum,
				__u32 client_cksum)
{
	struct file *filp;
	int rc, i;
	unsigned int len;
	char *buf;
	mm_segment_t oldfs;

	/* will only keep dump of pages on first error for the same range in
	 * file/fid, not during the resends/retries. */
	snprintf(dbgcksum_file_name, sizeof(dbgcksum_file_name),
		 "%s-checksum_dump-osc-"DFID":[%llu-%llu]-%x-%x",
		 (strncmp(libcfs_debug_file_path_arr, "NONE", 4) != 0 ?
		  libcfs_debug_file_path_arr :
		  LIBCFS_DEBUG_FILE_PATH_DEFAULT),
		 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : 0ULL,
		 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
		 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
		 pga[0]->off,
		 pga[page_count-1]->off + pga[page_count-1]->count - 1,
		 client_cksum, server_cksum);
	filp = filp_open(dbgcksum_file_name,
			 O_CREAT | O_EXCL | O_WRONLY | O_LARGEFILE, 0600);
	if (IS_ERR(filp)) {
		rc = PTR_ERR(filp);
		if (rc == -EEXIST)
			CDEBUG(D_INFO, "%s: can't open to dump pages with "
			       "checksum error: rc = %d\n", dbgcksum_file_name,
			       rc);
		else
			CERROR("%s: can't open to dump pages with checksum "
			       "error: rc = %d\n", dbgcksum_file_name, rc);
		return;
	}

	oldfs = get_fs();
	set_fs(KERNEL_DS);
	for (i = 0; i < page_count; i++) {
		len = pga[i]->count;
		buf = kmap(pga[i]->pg);
		while (len != 0) {
			rc = vfs_write(filp, (__force const char __user *)buf,
				       len, &filp->f_pos);
			if (rc < 0) {
				CERROR("%s: wanted to write %u but got %d "
				       "error\n", dbgcksum_file_name, len, rc);
				break;
			}
			len -= rc;
			buf += rc;
			CDEBUG(D_INFO, "%s: wrote %d bytes\n",
			       dbgcksum_file_name, rc);
		}
		kunmap(pga[i]->pg);
	}
	set_fs(oldfs);

	rc = ll_vfs_fsync_range(filp, 0, LLONG_MAX, 1);
	if (rc)
		CERROR("%s: sync returns %d\n", dbgcksum_file_name, rc);
	filp_close(filp, NULL);
	return;
}
1412
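/* Compare the client- and server-side write checksums.  Returns 0 if they
 * match and 1 on mismatch, after logging the details (and optionally dumping
 * the pages); the caller then triggers a resend by returning -EAGAIN. */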
1413 static int
1414 check_write_checksum(struct obdo *oa, const struct lnet_process_id *peer,
1415                      __u32 client_cksum, __u32 server_cksum,
1416                      struct osc_brw_async_args *aa)
1417 {
1418         __u32 new_cksum;
1419         char *msg;
1420         enum cksum_types cksum_type;
1421
1422         if (server_cksum == client_cksum) {
1423                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1424                 return 0;
1425         }
1426
1427         if (aa->aa_cli->cl_checksum_dump)
1428                 dump_all_bulk_pages(oa, aa->aa_page_count, aa->aa_ppga,
1429                                     server_cksum, client_cksum);
1430
1431         cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1432                                        oa->o_flags : 0);
1433         new_cksum = osc_checksum_bulk(aa->aa_requested_nob, aa->aa_page_count,
1434                                       aa->aa_ppga, OST_WRITE, cksum_type);
1435
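             /* Re-checksum the pages as they are now and compare against both
              * sides to localize the corruption:
              *   new == server: the pages changed on the client after the
              *                  original checksum was computed (e.g. mmap IO);
              *   new == client: the pages are unchanged here, so they changed
              *                  in transit before arrival at the OST;
              *   neither:       changed locally AND doesn't match the original;
              * a cksum_type mismatch instead points to a protocol problem. */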
1436         if (cksum_type != cksum_type_unpack(aa->aa_oa->o_flags))
1437                 msg = "the server did not use the checksum type specified in "
1438                       "the original request - likely a protocol problem";
1439         else if (new_cksum == server_cksum)
1440                 msg = "changed on the client after we checksummed it - "
1441                       "likely false positive due to mmap IO (bug 11742)";
1442         else if (new_cksum == client_cksum)
1443                 msg = "changed in transit before arrival at OST";
1444         else
1445                 msg = "changed in transit AND doesn't match the original - "
1446                       "likely false positive due to mmap IO (bug 11742)";
1447
1448         LCONSOLE_ERROR_MSG(0x132, "%s: BAD WRITE CHECKSUM: %s: from %s inode "
1449                            DFID " object "DOSTID" extent [%llu-%llu], original "
1450                            "client csum %x (type %x), server csum %x (type %x),"
1451                            " client csum now %x\n",
1452                            aa->aa_cli->cl_import->imp_obd->obd_name,
1453                            msg, libcfs_nid2str(peer->nid),
1454                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1455                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1456                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1457                            POSTID(&oa->o_oi), aa->aa_ppga[0]->off,
1458                            aa->aa_ppga[aa->aa_page_count - 1]->off +
1459                                 aa->aa_ppga[aa->aa_page_count-1]->count - 1,
1460                            client_cksum, cksum_type_unpack(aa->aa_oa->o_flags),
1461                            server_cksum, cksum_type, new_cksum);
1462         return 1;
1463 }
1464
1465 /* Note rc enters this function as number of bytes transferred */
1466 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1467 {
1468         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1469         const struct lnet_process_id *peer =
1470                         &req->rq_import->imp_connection->c_peer;
1471         struct client_obd *cli = aa->aa_cli;
1472         struct ost_body *body;
1473         u32 client_cksum = 0;
1474         ENTRY;
1475
1476         if (rc < 0 && rc != -EDQUOT) {
1477                 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1478                 RETURN(rc);
1479         }
1480
1481         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1482         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1483         if (body == NULL) {
1484                 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1485                 RETURN(-EPROTO);
1486         }
1487
1488         /* set/clear over quota flag for a uid/gid/projid */
1489         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1490             body->oa.o_valid & (OBD_MD_FLALLQUOTA)) {
1491                 unsigned qid[LL_MAXQUOTAS] = {
1492                                          body->oa.o_uid, body->oa.o_gid,
1493                                          body->oa.o_projid };
1494                 CDEBUG(D_QUOTA, "setdq for [%u %u %u] with valid %#llx, flags %x\n",
1495                        body->oa.o_uid, body->oa.o_gid, body->oa.o_projid,
1496                        body->oa.o_valid, body->oa.o_flags);
1497                 osc_quota_setdq(cli, qid, body->oa.o_valid,
1498                                 body->oa.o_flags);
1499         }
1500
1501         osc_update_grant(cli, body);
1502
1503         if (rc < 0)
1504                 RETURN(rc);
1505
1506         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1507                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1508
1509         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1510                 if (rc > 0) {
1511                         CERROR("Unexpected positive rc %d\n", rc);
1512                         RETURN(-EPROTO);
1513                 }
1514
1515                 if (req->rq_bulk != NULL &&
1516                     sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1517                         RETURN(-EAGAIN);
1518
1519                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1520                     check_write_checksum(&body->oa, peer, client_cksum,
1521                                          body->oa.o_cksum, aa))
1522                         RETURN(-EAGAIN);
1523
1524                 rc = check_write_rcs(req, aa->aa_requested_nob, aa->aa_nio_count,
1525                                      aa->aa_page_count, aa->aa_ppga);
1526                 GOTO(out, rc);
1527         }
1528
1529         /* The rest of this function executes only for OST_READs */
1530
1531         if (req->rq_bulk == NULL) {
1532                 rc = req_capsule_get_size(&req->rq_pill, &RMF_SHORT_IO,
1533                                           RCL_SERVER);
1534                 LASSERT(rc == req->rq_status);
1535         } else {
1536                 /* if unwrap_bulk failed, return -EAGAIN to retry */
1537                 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1538         }
1539         if (rc < 0)
1540                 GOTO(out, rc = -EAGAIN);
1541
1542         if (rc > aa->aa_requested_nob) {
1543                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1544                        aa->aa_requested_nob);
1545                 RETURN(-EPROTO);
1546         }
1547
1548         if (req->rq_bulk != NULL && rc != req->rq_bulk->bd_nob_transferred) {
1549                 CERROR("Unexpected rc %d (%d transferred)\n",
1550                        rc, req->rq_bulk->bd_nob_transferred);
1551                 RETURN(-EPROTO);
1552         }
1553
1554         if (req->rq_bulk == NULL) {
1555                 /* short io: the reply carried the data inline, not via bulk */
1556                 int nob, pg_count, i = 0;
1557                 unsigned char *buf;
1558
1559                 CDEBUG(D_CACHE, "Using short io read, size %d\n", rc);
1560                 pg_count = aa->aa_page_count;
1561                 buf = req_capsule_server_sized_get(&req->rq_pill, &RMF_SHORT_IO,
1562                                                    rc);
1563                 nob = rc;
1564                 while (nob > 0 && pg_count > 0) {
1565                         unsigned char *ptr;
1566                         int count = aa->aa_ppga[i]->count > nob ?
1567                                     nob : aa->aa_ppga[i]->count;
1568
1569                         CDEBUG(D_CACHE, "page %p count %d\n",
1570                                aa->aa_ppga[i]->pg, count);
1571                         ptr = ll_kmap_atomic(aa->aa_ppga[i]->pg, KM_USER0);
1572                         memcpy(ptr + (aa->aa_ppga[i]->off & ~PAGE_MASK), buf,
1573                                count);
1574                         ll_kunmap_atomic((void *) ptr, KM_USER0);
1575
1576                         buf += count;
1577                         nob -= count;
1578                         i++;
1579                         pg_count--;
1580                 }
1581         }
1582
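             /* fewer bytes came back than requested; handle_short_read()
              * zeroes the remainder of the pages */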
1583         if (rc < aa->aa_requested_nob)
1584                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1585
1586         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1587                 static int cksum_counter;
1588                 u32        server_cksum = body->oa.o_cksum;
1589                 char      *via = "";
1590                 char      *router = "";
1591                 enum cksum_types cksum_type;
1592
1593                 cksum_type = cksum_type_unpack(body->oa.o_valid & OBD_MD_FLFLAGS ?
1594                                                body->oa.o_flags : 0);
1595                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1596                                                  aa->aa_ppga, OST_READ,
1597                                                  cksum_type);
1598
1599                 if (req->rq_bulk != NULL &&
1600                     peer->nid != req->rq_bulk->bd_sender) {
1601                         via = " via ";
1602                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1603                 }
1604
1605                 if (server_cksum != client_cksum) {
1606                         struct ost_body *clbody;
1607                         u32 page_count = aa->aa_page_count;
1608
1609                         clbody = req_capsule_client_get(&req->rq_pill,
1610                                                         &RMF_OST_BODY);
1611                         if (cli->cl_checksum_dump)
1612                                 dump_all_bulk_pages(&clbody->oa, page_count,
1613                                                     aa->aa_ppga, server_cksum,
1614                                                     client_cksum);
1615
1616                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1617                                            "%s%s%s inode "DFID" object "DOSTID
1618                                            " extent [%llu-%llu], client %x, "
1619                                            "server %x, cksum_type %x\n",
1620                                            req->rq_import->imp_obd->obd_name,
1621                                            libcfs_nid2str(peer->nid),
1622                                            via, router,
1623                                            clbody->oa.o_valid & OBD_MD_FLFID ?
1624                                                 clbody->oa.o_parent_seq : 0ULL,
1625                                            clbody->oa.o_valid & OBD_MD_FLFID ?
1626                                                 clbody->oa.o_parent_oid : 0,
1627                                            clbody->oa.o_valid & OBD_MD_FLFID ?
1628                                                 clbody->oa.o_parent_ver : 0,
1629                                            POSTID(&body->oa.o_oi),
1630                                            aa->aa_ppga[0]->off,
1631                                            aa->aa_ppga[page_count-1]->off +
1632                                            aa->aa_ppga[page_count-1]->count - 1,
1633                                            client_cksum, server_cksum,
1634                                            cksum_type);
1635                         cksum_counter = 0;
1636                         aa->aa_oa->o_cksum = client_cksum;
1637                         rc = -EAGAIN;
1638                 } else {
1639                         cksum_counter++;
1640                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1641                         rc = 0;
1642                 }
1643         } else if (unlikely(client_cksum)) {
1644                 static int cksum_missed;
1645
1646                 cksum_missed++;
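                     /* rate-limit the console noise: the test below is true
                      * only when cksum_missed is a power of two */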
1647                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1648                         CERROR("Checksum %u requested from %s but not sent\n",
1649                                cksum_missed, libcfs_nid2str(peer->nid));
1650         } else {
1651                 rc = 0;
1652         }
1653 out:
1654         if (rc >= 0)
1655                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1656                                      aa->aa_oa, &body->oa);
1657
1658         RETURN(rc);
1659 }
1660
1661 static int osc_brw_redo_request(struct ptlrpc_request *request,
1662                                 struct osc_brw_async_args *aa, int rc)
1663 {
1664         struct ptlrpc_request *new_req;
1665         struct osc_brw_async_args *new_aa;
1666         struct osc_async_page *oap;
1667         ENTRY;
1668
1669         DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1670                   "redo for recoverable error %d", rc);
1671
1672         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1673                                   OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
1674                                   aa->aa_cli, aa->aa_oa, aa->aa_page_count,
1675                                   aa->aa_ppga, &new_req, 1);
1676         if (rc)
1677                 RETURN(rc);
1678
1679         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1680                 if (oap->oap_request != NULL) {
1681                         LASSERTF(request == oap->oap_request,
1682                                  "request %p != oap_request %p\n",
1683                                  request, oap->oap_request);
1684                         if (oap->oap_interrupted) {
1685                                 ptlrpc_req_finished(new_req);
1686                                 RETURN(-EINTR);
1687                         }
1688                 }
1689         }
1690         /* The new request takes over pga and oaps from the old request.
1691          * Note that copying a list_head doesn't work; it has to be moved. */
1692         aa->aa_resends++;
1693         new_req->rq_interpret_reply = request->rq_interpret_reply;
1694         new_req->rq_async_args = request->rq_async_args;
1695         new_req->rq_commit_cb = request->rq_commit_cb;
1696         /* cap resend delay to the current request timeout, this is similar to
1697          * what ptlrpc does (see after_reply()) */
1698         if (aa->aa_resends > new_req->rq_timeout)
1699                 new_req->rq_sent = ktime_get_real_seconds() + new_req->rq_timeout;
1700         else
1701                 new_req->rq_sent = ktime_get_real_seconds() + aa->aa_resends;
1702         new_req->rq_generation_set = 1;
1703         new_req->rq_import_generation = request->rq_import_generation;
1704
1705         new_aa = ptlrpc_req_async_args(new_req);
1706
1707         INIT_LIST_HEAD(&new_aa->aa_oaps);
1708         list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1709         INIT_LIST_HEAD(&new_aa->aa_exts);
1710         list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1711         new_aa->aa_resends = aa->aa_resends;
1712
1713         list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1714                 if (oap->oap_request) {
1715                         ptlrpc_req_finished(oap->oap_request);
1716                         oap->oap_request = ptlrpc_request_addref(new_req);
1717                 }
1718         }
1719
1720         /* XXX: This code will run into problems if we ever support adding
1721          * a series of BRW RPCs to a self-defined ptlrpc_request_set and
1722          * waiting for all of them to finish. We should inherit the request
1723          * set from the old request. */
1724         ptlrpcd_add_req(new_req);
1725
1726         DEBUG_REQ(D_INFO, new_req, "new request");
1727         RETURN(0);
1728 }
1729
1730 /*
1731  * ugh, we want disk allocation on the target to happen in offset order.  We'll
1732  * follow Sedgewick's advice and stick to the dead simple shellsort -- it'll do
1733  * fine for our small page arrays and doesn't require allocation.  It's an
1734  * insertion sort that swaps elements that are strides apart, shrinking the
1735  * stride down until it's 1 and the array is sorted.
1736  */
1737 static void sort_brw_pages(struct brw_page **array, int num)
1738 {
1739         int stride, i, j;
1740         struct brw_page *tmp;
1741
1742         if (num == 1)
1743                 return;
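             /* grow the stride through the 3h+1 gap sequence: 1, 4, 13, 40, ... */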
1744         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1745                 ;
1746
1747         do {
1748                 stride /= 3;
1749                 for (i = stride ; i < num ; i++) {
1750                         tmp = array[i];
1751                         j = i;
1752                         while (j >= stride && array[j - stride]->off > tmp->off) {
1753                                 array[j] = array[j - stride];
1754                                 j -= stride;
1755                         }
1756                         array[j] = tmp;
1757                 }
1758         } while (stride > 1);
1759 }
1760
1761 static void osc_release_ppga(struct brw_page **ppga, size_t count)
1762 {
1763         LASSERT(ppga != NULL);
1764         OBD_FREE(ppga, sizeof(*ppga) * count);
1765 }
1766
1767 static int brw_interpret(const struct lu_env *env,
1768                          struct ptlrpc_request *req, void *data, int rc)
1769 {
1770         struct osc_brw_async_args *aa = data;
1771         struct osc_extent *ext;
1772         struct osc_extent *tmp;
1773         struct client_obd *cli = aa->aa_cli;
1774         unsigned long           transferred = 0;
1775         ENTRY;
1776
1777         rc = osc_brw_fini_request(req, rc);
1778         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1779         /* When the server returns -EINPROGRESS, the client should always
1780          * retry regardless of how many times the bulk was already resent. */
1781         if (osc_recoverable_error(rc) && !req->rq_no_delay) {
1782                 if (req->rq_import_generation !=
1783                     req->rq_import->imp_generation) {
1784                         CDEBUG(D_HA, "%s: resend across eviction for object: "
1785                                DOSTID", rc = %d.\n",
1786                                req->rq_import->imp_obd->obd_name,
1787                                POSTID(&aa->aa_oa->o_oi), rc);
1788                 } else if (rc == -EINPROGRESS ||
1789                     client_should_resend(aa->aa_resends, aa->aa_cli)) {
1790                         rc = osc_brw_redo_request(req, aa, rc);
1791                 } else {
1792                         CERROR("%s: too many resent retries for object: "
1793                                "%llu:%llu, rc = %d.\n",
1794                                req->rq_import->imp_obd->obd_name,
1795                                POSTID(&aa->aa_oa->o_oi), rc);
1796                 }
1797
1798                 if (rc == 0)
1799                         RETURN(0);
1800                 else if (rc == -EAGAIN || rc == -EINPROGRESS)
1801                         rc = -EIO;
1802         }
1803
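             /* on success, fold the attributes the OST returned (blocks and
              * timestamps, plus size/KMS for writes below) back into the
              * cl_object */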
1804         if (rc == 0) {
1805                 struct obdo *oa = aa->aa_oa;
1806                 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
1807                 unsigned long valid = 0;
1808                 struct cl_object *obj;
1809                 struct osc_async_page *last;
1810
1811                 last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
1812                 obj = osc2cl(last->oap_obj);
1813
1814                 cl_object_attr_lock(obj);
1815                 if (oa->o_valid & OBD_MD_FLBLOCKS) {
1816                         attr->cat_blocks = oa->o_blocks;
1817                         valid |= CAT_BLOCKS;
1818                 }
1819                 if (oa->o_valid & OBD_MD_FLMTIME) {
1820                         attr->cat_mtime = oa->o_mtime;
1821                         valid |= CAT_MTIME;
1822                 }
1823                 if (oa->o_valid & OBD_MD_FLATIME) {
1824                         attr->cat_atime = oa->o_atime;
1825                         valid |= CAT_ATIME;
1826                 }
1827                 if (oa->o_valid & OBD_MD_FLCTIME) {
1828                         attr->cat_ctime = oa->o_ctime;
1829                         valid |= CAT_CTIME;
1830                 }
1831
1832                 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1833                         struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
1834                         loff_t last_off = last->oap_count + last->oap_obj_off +
1835                                 last->oap_page_off;
1836
1837                         /* Change the file size if this is an out-of-quota
1838                          * or direct I/O write and it extends the file size */
1839                         if (loi->loi_lvb.lvb_size < last_off) {
1840                                 attr->cat_size = last_off;
1841                                 valid |= CAT_SIZE;
1842                         }
1843                         /* Extend KMS if it's not a lockless write */
1844                         if (loi->loi_kms < last_off &&
1845                             oap2osc_page(last)->ops_srvlock == 0) {
1846                                 attr->cat_kms = last_off;
1847                                 valid |= CAT_KMS;
1848                         }
1849                 }
1850
1851                 if (valid != 0)
1852                         cl_object_attr_update(env, obj, attr, valid);
1853                 cl_object_attr_unlock(obj);
1854         }
1855         OBDO_FREE(aa->aa_oa);
1856
1857         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
1858                 osc_inc_unstable_pages(req);
1859
1860         list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
1861                 list_del_init(&ext->oe_link);
1862                 osc_extent_finish(env, ext, 1,
1863                                   rc && req->rq_no_delay ? -EWOULDBLOCK : rc);
1864         }
1865         LASSERT(list_empty(&aa->aa_exts));
1866         LASSERT(list_empty(&aa->aa_oaps));
1867
1868         transferred = (req->rq_bulk == NULL ? /* short io */
1869                        aa->aa_requested_nob :
1870                        req->rq_bulk->bd_nob_transferred);
1871
1872         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1873         ptlrpc_lprocfs_brw(req, transferred);
1874
1875         spin_lock(&cli->cl_loi_list_lock);
1876         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1877          * is called so we know whether to go to sync BRWs or wait for more
1878          * RPCs to complete */
1879         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
1880                 cli->cl_w_in_flight--;
1881         else
1882                 cli->cl_r_in_flight--;
1883         osc_wake_cache_waiters(cli);
1884         spin_unlock(&cli->cl_loi_list_lock);
1885
1886         osc_io_unplug(env, cli, NULL);
1887         RETURN(rc);
1888 }
1889
1890 static void brw_commit(struct ptlrpc_request *req)
1891 {
1892         /* If osc_inc_unstable_pages (via osc_extent_finish) races with
1893          * this being called via the rq_commit_cb, we need to ensure that
1894          * osc_dec_unstable_pages is still called. Otherwise unstable
1895          * pages may be leaked. */
1896         spin_lock(&req->rq_lock);
1897         if (likely(req->rq_unstable)) {
1898                 req->rq_unstable = 0;
1899                 spin_unlock(&req->rq_lock);
1900
1901                 osc_dec_unstable_pages(req);
1902         } else {
1903                 req->rq_committed = 1;
1904                 spin_unlock(&req->rq_lock);
1905         }
1906 }
1907
1908 /**
1909  * Build an RPC from the list of extents @ext_list. The caller must ensure
1910  * that the total number of pages in this list is NOT over max pages per RPC.
1911  * Extents in the list must be in OES_RPC state.
1912  */
1913 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
1914                   struct list_head *ext_list, int cmd)
1915 {
1916         struct ptlrpc_request           *req = NULL;
1917         struct osc_extent               *ext;
1918         struct brw_page                 **pga = NULL;
1919         struct osc_brw_async_args       *aa = NULL;
1920         struct obdo                     *oa = NULL;
1921         struct osc_async_page           *oap;
1922         struct osc_object               *obj = NULL;
1923         struct cl_req_attr              *crattr = NULL;
1924         loff_t                          starting_offset = OBD_OBJECT_EOF;
1925         loff_t                          ending_offset = 0;
1926         int                             mpflag = 0;
1927         int                             mem_tight = 0;
1928         int                             page_count = 0;
1929         bool                            soft_sync = false;
1930         bool                            interrupted = false;
1931         bool                            ndelay = false;
1932         int                             i;
1933         int                             grant = 0;
1934         int                             rc;
1935         struct list_head                rpc_list = LIST_HEAD_INIT(rpc_list);
1936         struct ost_body                 *body;
1937         ENTRY;
1938         LASSERT(!list_empty(ext_list));
1939
1940         /* add pages into rpc_list to build BRW rpc */
1941         list_for_each_entry(ext, ext_list, oe_link) {
1942                 LASSERT(ext->oe_state == OES_RPC);
1943                 mem_tight |= ext->oe_memalloc;
1944                 grant += ext->oe_grants;
1945                 page_count += ext->oe_nr_pages;
1946                 if (obj == NULL)
1947                         obj = ext->oe_obj;
1948         }
1949
1950         soft_sync = osc_over_unstable_soft_limit(cli);
1951         if (mem_tight)
1952                 mpflag = cfs_memory_pressure_get_and_set();
1953
1954         OBD_ALLOC(pga, sizeof(*pga) * page_count);
1955         if (pga == NULL)
1956                 GOTO(out, rc = -ENOMEM);
1957
1958         OBDO_ALLOC(oa);
1959         if (oa == NULL)
1960                 GOTO(out, rc = -ENOMEM);
1961
1962         i = 0;
1963         list_for_each_entry(ext, ext_list, oe_link) {
1964                 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
1965                         if (mem_tight)
1966                                 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
1967                         if (soft_sync)
1968                                 oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
1969                         pga[i] = &oap->oap_brw_page;
1970                         pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
1971                         i++;
1972
1973                         list_add_tail(&oap->oap_rpc_item, &rpc_list);
1974                         if (starting_offset == OBD_OBJECT_EOF ||
1975                             starting_offset > oap->oap_obj_off)
1976                                 starting_offset = oap->oap_obj_off;
1977                         else
1978                                 LASSERT(oap->oap_page_off == 0);
1979                         if (ending_offset < oap->oap_obj_off + oap->oap_count)
1980                                 ending_offset = oap->oap_obj_off +
1981                                                 oap->oap_count;
1982                         else
1983                                 LASSERT(oap->oap_page_off + oap->oap_count ==
1984                                         PAGE_SIZE);
1985                         if (oap->oap_interrupted)
1986                                 interrupted = true;
1987                 }
1988                 if (ext->oe_ndelay)
1989                         ndelay = true;
1990         }
1991
1992         /* first page in the list */
1993         oap = list_entry(rpc_list.next, typeof(*oap), oap_rpc_item);
1994
1995         crattr = &osc_env_info(env)->oti_req_attr;
1996         memset(crattr, 0, sizeof(*crattr));
1997         crattr->cra_type = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
1998         crattr->cra_flags = ~0ULL;
1999         crattr->cra_page = oap2cl_page(oap);
2000         crattr->cra_oa = oa;
2001         cl_req_attr_set(env, osc2cl(obj), crattr);
2002
2003         if (cmd == OBD_BRW_WRITE)
2004                 oa->o_grant_used = grant;
2005
2006         sort_brw_pages(pga, page_count);
2007         rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req, 0);
2008         if (rc != 0) {
2009                 CERROR("prep_req failed: %d\n", rc);
2010                 GOTO(out, rc);
2011         }
2012
2013         req->rq_commit_cb = brw_commit;
2014         req->rq_interpret_reply = brw_interpret;
2015         req->rq_memalloc = mem_tight != 0;
2016         oap->oap_request = ptlrpc_request_addref(req);
2017         if (interrupted && !req->rq_intr)
2018                 ptlrpc_mark_interrupted(req);
2019         if (ndelay) {
2020                 req->rq_no_resend = req->rq_no_delay = 1;
2021                 /* we should probably set a shorter timeout value to handle
2022                  * ETIMEDOUT in brw_interpret() correctly. */
2023                 /* lustre_msg_set_timeout(req, req->rq_timeout / 2); */
2024         }
2025
2026         /* Need to update the timestamps after the request is built in case
2027          * we race with setattr (locally or in queue at the OST).  If the OST
2028          * gets the later setattr before the earlier BRW (as determined by
2029          * the request xid), the OST will not use the BRW timestamps.
2030          * Sadly, there is no obvious way to do this in a single call.  bug 10150 */
2031         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
2032         crattr->cra_oa = &body->oa;
2033         crattr->cra_flags = OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME;
2034         cl_req_attr_set(env, osc2cl(obj), crattr);
2035         lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
2036
2037         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2038         aa = ptlrpc_req_async_args(req);
2039         INIT_LIST_HEAD(&aa->aa_oaps);
2040         list_splice_init(&rpc_list, &aa->aa_oaps);
2041         INIT_LIST_HEAD(&aa->aa_exts);
2042         list_splice_init(ext_list, &aa->aa_exts);
2043
2044         spin_lock(&cli->cl_loi_list_lock);
2045         starting_offset >>= PAGE_SHIFT;
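             /* account per-RPC stats under the list lock: pages per RPC, RPCs
              * currently in flight, and starting offset (in pages, log2 buckets) */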
2046         if (cmd == OBD_BRW_READ) {
2047                 cli->cl_r_in_flight++;
2048                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2049                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2050                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2051                                       starting_offset + 1);
2052         } else {
2053                 cli->cl_w_in_flight++;
2054                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2055                 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
2056                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2057                                       starting_offset + 1);
2058         }
2059         spin_unlock(&cli->cl_loi_list_lock);
2060
2061         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %ur/%uw in flight",
2062                   page_count, aa, cli->cl_r_in_flight,
2063                   cli->cl_w_in_flight);
2064         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_IO, cfs_fail_val);
2065
2066         ptlrpcd_add_req(req);
2067         rc = 0;
2068         EXIT;
2069
2070 out:
2071         if (mem_tight != 0)
2072                 cfs_memory_pressure_restore(mpflag);
2073
2074         if (rc != 0) {
2075                 LASSERT(req == NULL);
2076
2077                 if (oa)
2078                         OBDO_FREE(oa);
2079                 if (pga)
2080                         OBD_FREE(pga, sizeof(*pga) * page_count);
2081                 /* this should happen rarely and is pretty bad, it makes the
2082                  * pending list not follow the dirty order */
2083                 while (!list_empty(ext_list)) {
2084                         ext = list_entry(ext_list->next, struct osc_extent,
2085                                          oe_link);
2086                         list_del_init(&ext->oe_link);
2087                         osc_extent_finish(env, ext, 0, rc);
2088                 }
2089         }
2090         RETURN(rc);
2091 }
2092
2093 static int osc_set_lock_data(struct ldlm_lock *lock, void *data)
2094 {
2095         int set = 0;
2096
2097         LASSERT(lock != NULL);
2098
2099         lock_res_and_lock(lock);
2100
2101         if (lock->l_ast_data == NULL)
2102                 lock->l_ast_data = data;
2103         if (lock->l_ast_data == data)
2104                 set = 1;
2105
2106         unlock_res_and_lock(lock);
2107
2108         return set;
2109 }
2110
2111 int osc_enqueue_fini(struct ptlrpc_request *req, osc_enqueue_upcall_f upcall,
2112                      void *cookie, struct lustre_handle *lockh,
2113                      enum ldlm_mode mode, __u64 *flags, bool speculative,
2114                      int errcode)
2115 {
2116         bool intent = *flags & LDLM_FL_HAS_INTENT;
2117         int rc;
2118         ENTRY;
2119
2120         /* The request was created before ldlm_cli_enqueue call. */
2121         if (intent && errcode == ELDLM_LOCK_ABORTED) {
2122                 struct ldlm_reply *rep;
2123
2124                 rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
2125                 LASSERT(rep != NULL);
2126
2127                 rep->lock_policy_res1 =
2128                         ptlrpc_status_ntoh(rep->lock_policy_res1);
2129                 if (rep->lock_policy_res1)
2130                         errcode = rep->lock_policy_res1;
2131                 if (!speculative)
2132                         *flags |= LDLM_FL_LVB_READY;
2133         } else if (errcode == ELDLM_OK) {
2134                 *flags |= LDLM_FL_LVB_READY;
2135         }
2136
2137         /* Call the update callback. */
2138         rc = (*upcall)(cookie, lockh, errcode);
2139
2140         /* release the reference taken in ldlm_cli_enqueue() */
2141         if (errcode == ELDLM_LOCK_MATCHED)
2142                 errcode = ELDLM_OK;
2143         if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
2144                 ldlm_lock_decref(lockh, mode);
2145
2146         RETURN(rc);
2147 }
2148
2149 int osc_enqueue_interpret(const struct lu_env *env, struct ptlrpc_request *req,
2150                           struct osc_enqueue_args *aa, int rc)
2151 {
2152         struct ldlm_lock *lock;
2153         struct lustre_handle *lockh = &aa->oa_lockh;
2154         enum ldlm_mode mode = aa->oa_mode;
2155         struct ost_lvb *lvb = aa->oa_lvb;
2156         __u32 lvb_len = sizeof(*lvb);
2157         __u64 flags = 0;
2158
2159         ENTRY;
2160
2161         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2162          * be valid. */
2163         lock = ldlm_handle2lock(lockh);
2164         LASSERTF(lock != NULL,
2165                  "lockh %#llx, req %p, aa %p - client evicted?\n",
2166                  lockh->cookie, req, aa);
2167
2168         /* Take an additional reference so that a blocking AST that
2169          * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2170          * to arrive after an upcall has been executed by
2171          * osc_enqueue_fini(). */
2172         ldlm_lock_addref(lockh, mode);
2173
2174         /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
2175         OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
2176
2177         /* Let CP AST to grant the lock first. */
2178         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2179
2180         if (aa->oa_speculative) {
2181                 LASSERT(aa->oa_lvb == NULL);
2182                 LASSERT(aa->oa_flags == NULL);
2183                 aa->oa_flags = &flags;
2184         }
2185
2186         /* Complete obtaining the lock procedure. */
2187         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
2188                                    aa->oa_mode, aa->oa_flags, lvb, lvb_len,
2189                                    lockh, rc);
2190         /* Complete osc stuff. */
2191         rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
2192                               aa->oa_flags, aa->oa_speculative, rc);
2193
2194         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2195
2196         ldlm_lock_decref(lockh, mode);
2197         LDLM_LOCK_PUT(lock);
2198         RETURN(rc);
2199 }
2200
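/* Sentinel request set: passing PTLRPCD_SET as @rqset asks osc_enqueue_base()
 * to hand the request to ptlrpcd instead of adding it to a caller-owned set. */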
2201 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2202
2203 /* When enqueuing asynchronously, locks are not ordered, so we can obtain a
2204  * lock from the 2nd OSC before a lock from the 1st one. This does not deadlock
2205  * with other synchronous requests, but holding some locks while trying to
2206  * obtain others may take a considerable amount of time in case of OST
2207  * failure; and when other sync requests cannot get a lock released by a
2208  * client, that client is evicted from the cluster -- such scenarios make
2209  * life difficult, so release locks just after they are obtained. */
2210 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2211                      __u64 *flags, union ldlm_policy_data *policy,
2212                      struct ost_lvb *lvb, int kms_valid,
2213                      osc_enqueue_upcall_f upcall, void *cookie,
2214                      struct ldlm_enqueue_info *einfo,
2215                      struct ptlrpc_request_set *rqset, int async,
2216                      bool speculative)
2217 {
2218         struct obd_device *obd = exp->exp_obd;
2219         struct lustre_handle lockh = { 0 };
2220         struct ptlrpc_request *req = NULL;
2221         int intent = *flags & LDLM_FL_HAS_INTENT;
2222         __u64 match_flags = *flags;
2223         enum ldlm_mode mode;
2224         int rc;
2225         ENTRY;
2226
2227         /* Filesystem lock extents are extended to page boundaries so that
2228          * dealing with the page cache is a little smoother.  */
2229         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2230         policy->l_extent.end |= ~PAGE_MASK;
2231
2232         /*
2233          * kms is not valid when either object is completely fresh (so that no
2234          * locks are cached), or object was evicted. In the latter case cached
2235          * lock cannot be used, because it would prime inode state with
2236          * potentially stale LVB.
2237          */
2238         if (!kms_valid)
2239                 goto no_match;
2240
2241         /* Next, search for already existing extent locks that will cover us */
2242         /* If we're trying to read, we also search for an existing PW lock.  The
2243          * VFS and page cache already protect us locally, so lots of readers/
2244          * writers can share a single PW lock.
2245          *
2246          * There are problems with conversion deadlocks, so instead of
2247          * converting a read lock to a write lock, we'll just enqueue a new
2248          * one.
2249          *
2250          * At some point we should cancel the read lock instead of making them
2251          * send us a blocking callback, but there are problems with canceling
2252          * locks out from other users right now, too. */
2253         mode = einfo->ei_mode;
2254         if (einfo->ei_mode == LCK_PR)
2255                 mode |= LCK_PW;
2256         /* Normal lock requests must wait for the LVB to be ready before
2257          * matching a lock; speculative lock requests do not need to,
2258          * because they will not actually use the lock. */
2259         if (!speculative)
2260                 match_flags |= LDLM_FL_LVB_READY;
2261         if (intent != 0)
2262                 match_flags |= LDLM_FL_BLOCK_GRANTED;
2263         mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id,
2264                                einfo->ei_type, policy, mode, &lockh, 0);
2265         if (mode) {
2266                 struct ldlm_lock *matched;
2267
2268                 if (*flags & LDLM_FL_TEST_LOCK)
2269                         RETURN(ELDLM_OK);
2270
2271                 matched = ldlm_handle2lock(&lockh);
2272                 if (speculative) {
2273                         /* This DLM lock request is speculative, and does not
2274                          * have an associated IO request. Therefore if there
2275                          * is already a DLM lock, it will just inform the
2276                          * caller to cancel the request for this stripe. */
2277                         lock_res_and_lock(matched);
2278                         if (ldlm_extent_equal(&policy->l_extent,
2279                             &matched->l_policy_data.l_extent))
2280                                 rc = -EEXIST;
2281                         else
2282                                 rc = -ECANCELED;
2283                         unlock_res_and_lock(matched);
2284
2285                         ldlm_lock_decref(&lockh, mode);
2286                         LDLM_LOCK_PUT(matched);
2287                         RETURN(rc);
2288                 } else if (osc_set_lock_data(matched, einfo->ei_cbdata)) {
2289                         *flags |= LDLM_FL_LVB_READY;
2290
2291                         /* We already have a lock, and it's referenced. */
2292                         (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);
2293
2294                         ldlm_lock_decref(&lockh, mode);
2295                         LDLM_LOCK_PUT(matched);
2296                         RETURN(ELDLM_OK);
2297                 } else {
2298                         ldlm_lock_decref(&lockh, mode);
2299                         LDLM_LOCK_PUT(matched);
2300                 }
2301         }
2302
2303 no_match:
2304         if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK))
2305                 RETURN(-ENOLCK);
2306
2307         if (intent) {
2308                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2309                                            &RQF_LDLM_ENQUEUE_LVB);
2310                 if (req == NULL)
2311                         RETURN(-ENOMEM);
2312
2313                 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
2314                 if (rc) {
2315                         ptlrpc_request_free(req);
2316                         RETURN(rc);
2317                 }
2318
2319                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2320                                      sizeof(*lvb));
2321                 ptlrpc_request_set_replen(req);
2322         }
2323
2324         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2325         *flags &= ~LDLM_FL_BLOCK_GRANTED;
2326
2327         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2328                               sizeof(*lvb), LVB_T_OST, &lockh, async);
2329         if (async) {
2330                 if (!rc) {
2331                         struct osc_enqueue_args *aa;
2332                         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2333                         aa = ptlrpc_req_async_args(req);
2334                         aa->oa_exp         = exp;
2335                         aa->oa_mode        = einfo->ei_mode;
2336                         aa->oa_type        = einfo->ei_type;
2337                         lustre_handle_copy(&aa->oa_lockh, &lockh);
2338                         aa->oa_upcall      = upcall;
2339                         aa->oa_cookie      = cookie;
2340                         aa->oa_speculative = speculative;
2341                         if (!speculative) {
2342                                 aa->oa_flags  = flags;
2343                                 aa->oa_lvb    = lvb;
2344                         } else {
2345                                 /* speculative locks essentially enqueue a
2346                                  * DLM lock in advance, so we don't care
2347                                  * about the result of the enqueue. */
2348                                 aa->oa_lvb    = NULL;
2349                                 aa->oa_flags  = NULL;
2350                         }
2351
2352                         req->rq_interpret_reply =
2353                                 (ptlrpc_interpterer_t)osc_enqueue_interpret;
2354                         if (rqset == PTLRPCD_SET)
2355                                 ptlrpcd_add_req(req);
2356                         else
2357                                 ptlrpc_set_add_req(rqset, req);
2358                 } else if (intent) {
2359                         ptlrpc_req_finished(req);
2360                 }
2361                 RETURN(rc);
2362         }
2363
2364         rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
2365                               flags, speculative, rc);
2366         if (intent)
2367                 ptlrpc_req_finished(req);
2368
2369         RETURN(rc);
2370 }
2371
2372 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2373                    enum ldlm_type type, union ldlm_policy_data *policy,
2374                    enum ldlm_mode mode, __u64 *flags, void *data,
2375                    struct lustre_handle *lockh, int unref)
2376 {
2377         struct obd_device *obd = exp->exp_obd;
2378         __u64 lflags = *flags;
2379         enum ldlm_mode rc;
2380         ENTRY;
2381
2382         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2383                 RETURN(-EIO);
2384
2385         /* Filesystem lock extents are extended to page boundaries so that
2386          * dealing with the page cache is a little smoother */
2387         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2388         policy->l_extent.end |= ~PAGE_MASK;
2389
2390         /* Next, search for already existing extent locks that will cover us */
2391         /* If we're trying to read, we also search for an existing PW lock.  The
2392          * VFS and page cache already protect us locally, so lots of readers/
2393          * writers can share a single PW lock. */
2394         rc = mode;
2395         if (mode == LCK_PR)
2396                 rc |= LCK_PW;
2397         rc = ldlm_lock_match(obd->obd_namespace, lflags,
2398                              res_id, type, policy, rc, lockh, unref);
2399         if (rc == 0 || lflags & LDLM_FL_TEST_LOCK)
2400                 RETURN(rc);
2401
2402         if (data != NULL) {
2403                 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2404
2405                 LASSERT(lock != NULL);
2406                 if (!osc_set_lock_data(lock, data)) {
2407                         ldlm_lock_decref(lockh, rc);
2408                         rc = 0;
2409                 }
2410                 LDLM_LOCK_PUT(lock);
2411         }
2412         RETURN(rc);
2413 }
2414
2415 static int osc_statfs_interpret(const struct lu_env *env,
2416                                 struct ptlrpc_request *req,
2417                                 struct osc_async_args *aa, int rc)
2418 {
2419         struct obd_statfs *msfs;
2420         ENTRY;
2421
2422         if (rc == -EBADR)
2423                 /* The request has in fact never been sent
2424                  * due to issues at a higher level (LOV).
2425                  * Exit immediately since the caller is
2426                  * aware of the problem and takes care
2427                  * of the cleanup. */
2428                 RETURN(rc);
2429
2430         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2431             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2432                 GOTO(out, rc = 0);
2433
2434         if (rc != 0)
2435                 GOTO(out, rc);
2436
2437         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2438         if (msfs == NULL) {
2439                 GOTO(out, rc = -EPROTO);
2440         }
2441
2442         *aa->aa_oi->oi_osfs = *msfs;
2443 out:
2444         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2445         RETURN(rc);
2446 }
2447
2448 static int osc_statfs_async(struct obd_export *exp,
2449                             struct obd_info *oinfo, __u64 max_age,
2450                             struct ptlrpc_request_set *rqset)
2451 {
2452         struct obd_device     *obd = class_exp2obd(exp);
2453         struct ptlrpc_request *req;
2454         struct osc_async_args *aa;
2455         int                    rc;
2456         ENTRY;
2457
2458         /* We could possibly pass max_age in the request (as an absolute
2459          * timestamp or a "seconds.usec ago") so the target can avoid doing
2460          * extra calls into the filesystem if that isn't necessary (e.g.
2461          * during mount that would help a bit).  Having relative timestamps
2462          * is not so great if request processing is slow, while absolute
2463          * timestamps are not ideal because they need time synchronization. */
2464         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2465         if (req == NULL)
2466                 RETURN(-ENOMEM);
2467
2468         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2469         if (rc) {
2470                 ptlrpc_request_free(req);
2471                 RETURN(rc);
2472         }
2473         ptlrpc_request_set_replen(req);
2474         req->rq_request_portal = OST_CREATE_PORTAL;
2475         ptlrpc_at_set_req_timeout(req);
2476
2477         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2478                 /* procfs requests must not resend or wait for recovery,
2479                  * to avoid deadlock */
2479                 req->rq_no_resend = 1;
2480                 req->rq_no_delay = 1;
2481         }
2482
2483         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2484         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2485         aa = ptlrpc_req_async_args(req);
2486         aa->aa_oi = oinfo;
2487
2488         ptlrpc_set_add_req(rqset, req);
2489         RETURN(0);
2490 }
2491
2492 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2493                       struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2494 {
2495         struct obd_device     *obd = class_exp2obd(exp);
2496         struct obd_statfs     *msfs;
2497         struct ptlrpc_request *req;
2498         struct obd_import     *imp = NULL;
2499         int rc;
2500         ENTRY;
2501
2502         /* Since the request might also come from lprocfs, we need to
2503          * sync this with client_disconnect_export() (bug 15684) */
2504         down_read(&obd->u.cli.cl_sem);
2505         if (obd->u.cli.cl_import)
2506                 imp = class_import_get(obd->u.cli.cl_import);
2507         up_read(&obd->u.cli.cl_sem);
2508         if (!imp)
2509                 RETURN(-ENODEV);
2510
2511         /* We could possibly pass max_age in the request (as an absolute
2512          * timestamp or a "seconds.usec ago") so the target can avoid doing
2513          * extra calls into the filesystem if that isn't necessary (e.g.
2514          * during mount that would help a bit).  Having relative timestamps
2515          * is not so great if request processing is slow, while absolute
2516          * timestamps are not ideal because they need time synchronization. */
2517         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2518
2519         class_import_put(imp);
2520
2521         if (req == NULL)
2522                 RETURN(-ENOMEM);
2523
2524         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2525         if (rc) {
2526                 ptlrpc_request_free(req);
2527                 RETURN(rc);
2528         }
2529         ptlrpc_request_set_replen(req);
2530         req->rq_request_portal = OST_CREATE_PORTAL;
2531         ptlrpc_at_set_req_timeout(req);
2532
2533         if (flags & OBD_STATFS_NODELAY) {
2534                 /* procfs requests must not resend or wait for recovery,
2535                  * to avoid deadlock */
2535                 req->rq_no_resend = 1;
2536                 req->rq_no_delay = 1;
2537         }
2538
2539         rc = ptlrpc_queue_wait(req);
2540         if (rc)
2541                 GOTO(out, rc);
2542
2543         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2544         if (msfs == NULL) {
2545                 GOTO(out, rc = -EPROTO);
2546         }
2547
2548         *osfs = *msfs;
2549
2550         EXIT;
2551  out:
2552         ptlrpc_req_finished(req);
2553         return rc;
2554 }
2555
2556 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2557                          void *karg, void __user *uarg)
2558 {
2559         struct obd_device *obd = exp->exp_obd;
2560         struct obd_ioctl_data *data = karg;
2561         int err = 0;
2562         ENTRY;
2563
2564         if (!try_module_get(THIS_MODULE)) {
2565                 CERROR("%s: cannot get module '%s'\n", obd->obd_name,
2566                        module_name(THIS_MODULE));
2567                 return -EINVAL;
2568         }
2569         switch (cmd) {
2570         case OBD_IOC_CLIENT_RECOVER:
2571                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2572                                             data->ioc_inlbuf1, 0);
2573                 if (err > 0)
2574                         err = 0;
2575                 GOTO(out, err);
2576         case IOC_OSC_SET_ACTIVE:
2577                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2578                                                data->ioc_offset);
2579                 GOTO(out, err);
2580         case OBD_IOC_PING_TARGET:
2581                 err = ptlrpc_obd_ping(obd);
2582                 GOTO(out, err);
2583         default:
2584                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2585                        cmd, current_comm());
2586                 GOTO(out, err = -ENOTTY);
2587         }
2588 out:
2589         module_put(THIS_MODULE);
2590         return err;
2591 }
2592
2593 int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
2594                        u32 keylen, void *key, u32 vallen, void *val,
2595                        struct ptlrpc_request_set *set)
2596 {
2597         struct ptlrpc_request *req;
2598         struct obd_device     *obd = exp->exp_obd;
2599         struct obd_import     *imp = class_exp2cliimp(exp);
2600         char                  *tmp;
2601         int                    rc;
2602         ENTRY;
2603
2604         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
2605
2606         if (KEY_IS(KEY_CHECKSUM)) {
2607                 if (vallen != sizeof(int))
2608                         RETURN(-EINVAL);
2609                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
2610                 RETURN(0);
2611         }
2612
2613         if (KEY_IS(KEY_SPTLRPC_CONF)) {
2614                 sptlrpc_conf_client_adapt(obd);
2615                 RETURN(0);
2616         }
2617
2618         if (KEY_IS(KEY_FLUSH_CTX)) {
2619                 sptlrpc_import_flush_my_ctx(imp);
2620                 RETURN(0);
2621         }
2622
2623         if (KEY_IS(KEY_CACHE_SET)) {
2624                 struct client_obd *cli = &obd->u.cli;
2625
2626                 LASSERT(cli->cl_cache == NULL); /* only once */
2627                 cli->cl_cache = (struct cl_client_cache *)val;
2628                 cl_cache_incref(cli->cl_cache);
2629                 cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
2630
2631                 /* add this osc into entity list */
2632                 LASSERT(list_empty(&cli->cl_lru_osc));
2633                 spin_lock(&cli->cl_cache->ccc_lru_lock);
2634                 list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
2635                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
2636
2637                 RETURN(0);
2638         }
2639
2640         if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
2641                 struct client_obd *cli = &obd->u.cli;
2642                 long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
2643                 long target = *(long *)val;
2644
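                /* free at most half of this OSC's LRU pages, capped by the
                 * caller's remaining target, and report back how many pages
                 * were actually reclaimed */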
2645                 nr = osc_lru_shrink(env, cli, min(nr, target), true);
2646                 *(long *)val -= nr;
2647                 RETURN(0);
2648         }
2649
2650         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
2651                 RETURN(-EINVAL);
2652
2653         /* We pass all other commands directly to OST. Since nobody calls
2654          * osc methods directly and everybody is supposed to go through LOV,
2655          * we assume LOV checked invalid values for us.
2656          * The only recognised values so far are evict_by_nid and mds_conn.
2657          * Even if something bad goes through, we'd get a -EINVAL from OST
2658          * anyway. */
2659
2660         req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
2661                                                 &RQF_OST_SET_GRANT_INFO :
2662                                                 &RQF_OBD_SET_INFO);
2663         if (req == NULL)
2664                 RETURN(-ENOMEM);
2665
2666         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2667                              RCL_CLIENT, keylen);
2668         if (!KEY_IS(KEY_GRANT_SHRINK))
2669                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
2670                                      RCL_CLIENT, vallen);
2671         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
2672         if (rc) {
2673                 ptlrpc_request_free(req);
2674                 RETURN(rc);
2675         }
2676
2677         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2678         memcpy(tmp, key, keylen);
2679         tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
2680                                                         &RMF_OST_BODY :
2681                                                         &RMF_SETINFO_VAL);
2682         memcpy(tmp, val, vallen);
2683
2684         if (KEY_IS(KEY_GRANT_SHRINK)) {
2685                 struct osc_grant_args *aa;
2686                 struct obdo *oa;
2687
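                /* stash a private copy of the obdo for
                 * osc_shrink_grant_interpret() to use once the OST replies */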
2688                 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2689                 aa = ptlrpc_req_async_args(req);
2690                 OBDO_ALLOC(oa);
2691                 if (!oa) {
2692                         ptlrpc_req_finished(req);
2693                         RETURN(-ENOMEM);
2694                 }
2695                 *oa = ((struct ost_body *)val)->oa;
2696                 aa->aa_oa = oa;
2697                 req->rq_interpret_reply = osc_shrink_grant_interpret;
2698         }
2699
2700         ptlrpc_request_set_replen(req);
2701         if (!KEY_IS(KEY_GRANT_SHRINK)) {
2702                 LASSERT(set != NULL);
2703                 ptlrpc_set_add_req(set, req);
2704                 ptlrpc_check_set(NULL, set);
2705         } else {
2706                 ptlrpcd_add_req(req);
2707         }
2708
2709         RETURN(0);
2710 }
2711 EXPORT_SYMBOL(osc_set_info_async);
2712
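/*
 * Recompute the grant to request from the server when the import
 * reconnects: the sum of the grant still available, reserved and dirty,
 * falling back to two full BRW chunks when nothing is held, and discard
 * any grant that was lost while disconnected.
 */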
2713 int osc_reconnect(const struct lu_env *env, struct obd_export *exp,
2714                   struct obd_device *obd, struct obd_uuid *cluuid,
2715                   struct obd_connect_data *data, void *localdata)
2716 {
2717         struct client_obd *cli = &obd->u.cli;
2718
2719         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
2720                 long lost_grant;
2721                 long grant;
2722
2723                 spin_lock(&cli->cl_loi_list_lock);
2724                 grant = cli->cl_avail_grant + cli->cl_reserved_grant;
2725                 if (data->ocd_connect_flags & OBD_CONNECT_GRANT_PARAM)
2726                         grant += cli->cl_dirty_grant;
2727                 else
2728                         grant += cli->cl_dirty_pages << PAGE_SHIFT;
2729                 data->ocd_grant = grant ? : 2 * cli_brw_size(obd);
2730                 lost_grant = cli->cl_lost_grant;
2731                 cli->cl_lost_grant = 0;
2732                 spin_unlock(&cli->cl_loi_list_lock);
2733
2734                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: %#llx ocd_version: %d"
2735                        " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
2736                        data->ocd_version, data->ocd_grant, lost_grant);
2737         }
2738
2739         RETURN(0);
2740 }
2741 EXPORT_SYMBOL(osc_reconnect);
2742
2743 int osc_disconnect(struct obd_export *exp)
2744 {
2745         struct obd_device *obd = class_exp2obd(exp);
2746         int rc;
2747
2748         rc = client_disconnect_export(exp);
2749         /*
2750          * Initially we put del_shrink_grant before disconnect_export, but it
2751          * causes the following problem if setup (connect) and cleanup
2752          * (disconnect) are tangled together.
2753          *      connect p1                     disconnect p2
2754          *   ptlrpc_connect_import
2755          *     ...............               class_manual_cleanup
2756          *                                     osc_disconnect
2757          *                                     del_shrink_grant
2758          *   ptlrpc_connect_interrupt
2759          *     init_grant_shrink
2760          *   add this client to shrink list
2761          *                                      cleanup_osc
2762          * Bang! The pinger triggers the shrink.
2763          * So the osc should only be removed from the shrink list after we
2764          * are sure the import has been destroyed. BUG18662
2765          */
2766         if (obd->u.cli.cl_import == NULL)
2767                 osc_del_shrink_grant(&obd->u.cli);
2768         return rc;
2769 }
2770 EXPORT_SYMBOL(osc_disconnect);
2771
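/*
 * cfs_hash iterator callback: invalidate the osc_object attached to the
 * granted locks of one LDLM resource, and clear LDLM_FL_CLEANED so that
 * a following namespace cleanup pass can cancel those locks.
 */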
2772 int osc_ldlm_resource_invalidate(struct cfs_hash *hs, struct cfs_hash_bd *bd,
2773                                  struct hlist_node *hnode, void *arg)
2774 {
2775         struct lu_env *env = arg;
2776         struct ldlm_resource *res = cfs_hash_object(hs, hnode);
2777         struct ldlm_lock *lock;
2778         struct osc_object *osc = NULL;
2779         ENTRY;
2780
2781         lock_res(res);
2782         list_for_each_entry(lock, &res->lr_granted, l_res_link) {
2783                 if (lock->l_ast_data != NULL && osc == NULL) {
2784                         osc = lock->l_ast_data;
2785                         cl_object_get(osc2cl(osc));
2786                 }
2787
2788                 /* clear the LDLM_FL_CLEANED flag to make sure the lock will
2789                  * be canceled by the second ldlm_namespace_cleanup() call in
2790                  * osc_import_event(). */
2791                 ldlm_clear_cleaned(lock);
2792         }
2793         unlock_res(res);
2794
2795         if (osc != NULL) {
2796                 osc_object_invalidate(env, osc);
2797                 cl_object_put(env, osc2cl(osc));
2798         }
2799
2800         RETURN(0);
2801 }
2802 EXPORT_SYMBOL(osc_ldlm_resource_invalidate);
2803
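/*
 * React to import state changes: drop the cached grant on disconnect,
 * flush dirty pages and invalidate cached DLM locks when the import is
 * invalidated, re-negotiate grant once connect data arrives (OCD), and
 * forward the remaining events to the obd observer (typically LOV).
 */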
2804 static int osc_import_event(struct obd_device *obd,
2805                             struct obd_import *imp,
2806                             enum obd_import_event event)
2807 {
2808         struct client_obd *cli;
2809         int rc = 0;
2810
2811         ENTRY;
2812         LASSERT(imp->imp_obd == obd);
2813
2814         switch (event) {
2815         case IMP_EVENT_DISCON: {
2816                 cli = &obd->u.cli;
2817                 spin_lock(&cli->cl_loi_list_lock);
2818                 cli->cl_avail_grant = 0;
2819                 cli->cl_lost_grant = 0;
2820                 spin_unlock(&cli->cl_loi_list_lock);
2821                 break;
2822         }
2823         case IMP_EVENT_INACTIVE: {
2824                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE);
2825                 break;
2826         }
2827         case IMP_EVENT_INVALIDATE: {
2828                 struct ldlm_namespace *ns = obd->obd_namespace;
2829                 struct lu_env         *env;
2830                 __u16                  refcheck;
2831
2832                 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
2833
2834                 env = cl_env_get(&refcheck);
2835                 if (!IS_ERR(env)) {
2836                         osc_io_unplug(env, &obd->u.cli, NULL);
2837
2838                         cfs_hash_for_each_nolock(ns->ns_rs_hash,
2839                                                  osc_ldlm_resource_invalidate,
2840                                                  env, 0);
2841                         cl_env_put(env, &refcheck);
2842
2843                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
2844                 } else
2845                         rc = PTR_ERR(env);
2846                 break;
2847         }
2848         case IMP_EVENT_ACTIVE: {
2849                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE);
2850                 break;
2851         }
2852         case IMP_EVENT_OCD: {
2853                 struct obd_connect_data *ocd = &imp->imp_connect_data;
2854
2855                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
2856                         osc_init_grant(&obd->u.cli, ocd);
2857
2858                 /* See bug 7198 */
2859                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
2860                         imp->imp_client->cli_request_portal = OST_REQUEST_PORTAL;
2861
2862                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD);
2863                 break;
2864         }
2865         case IMP_EVENT_DEACTIVATE: {
2866                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE);
2867                 break;
2868         }
2869         case IMP_EVENT_ACTIVATE: {
2870                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE);
2871                 break;
2872         }
2873         default:
2874                 CERROR("Unknown import event %d\n", event);
2875                 LBUG();
2876         }
2877         RETURN(rc);
2878 }
2879
2880 /**
2881  * Determine whether the lock can be canceled before it is replayed
2882  * during recovery; see bug 16774 for detailed information.
2883  *
2884  * \retval zero the lock can't be canceled
2885  * \retval other ok to cancel
2886  */
2887 static int osc_cancel_weight(struct ldlm_lock *lock)
2888 {
2889         /*
2890          * Cancel all unused, granted extent locks.
2891          */
2892         if (lock->l_resource->lr_type == LDLM_EXTENT &&
2893             lock->l_granted_mode == lock->l_req_mode &&
2894             osc_ldlm_weigh_ast(lock) == 0)
2895                 RETURN(1);
2896
2897         RETURN(0);
2898 }
2899
2900 static int brw_queue_work(const struct lu_env *env, void *data)
2901 {
2902         struct client_obd *cli = data;
2903
2904         CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
2905
2906         osc_io_unplug(env, cli, NULL);
2907         RETURN(0);
2908 }
2909
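/*
 * Common part of OSC device setup, exported so that OSC-like devices
 * can reuse it: take a ptlrpcd reference, initialize the client obd,
 * and allocate the ptlrpcd work items for asynchronous writeback and
 * LRU shrinking along with the quota state.
 */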
2910 int osc_setup_common(struct obd_device *obd, struct lustre_cfg *lcfg)
2911 {
2912         struct client_obd *cli = &obd->u.cli;
2913         void *handler;
2914         int rc;
2915
2916         ENTRY;
2917
2918         rc = ptlrpcd_addref();
2919         if (rc)
2920                 RETURN(rc);
2921
2922         rc = client_obd_setup(obd, lcfg);
2923         if (rc)
2924                 GOTO(out_ptlrpcd, rc);
2925
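        /* writeback and LRU shrinking run from dedicated ptlrpcd work
         * items rather than in the caller's context */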
2927         handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
2928         if (IS_ERR(handler))
2929                 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
2930         cli->cl_writeback_work = handler;
2931
2932         handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
2933         if (IS_ERR(handler))
2934                 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
2935         cli->cl_lru_work = handler;
2936
2937         rc = osc_quota_setup(obd);
2938         if (rc)
2939                 GOTO(out_ptlrpcd_work, rc);
2940
2941         cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
2942
2943         INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
2944         RETURN(rc);
2945
2946 out_ptlrpcd_work:
2947         if (cli->cl_writeback_work != NULL) {
2948                 ptlrpcd_destroy_work(cli->cl_writeback_work);
2949                 cli->cl_writeback_work = NULL;
2950         }
2951         if (cli->cl_lru_work != NULL) {
2952                 ptlrpcd_destroy_work(cli->cl_lru_work);
2953                 cli->cl_lru_work = NULL;
2954         }
2955         client_obd_cleanup(obd);
2956 out_ptlrpcd:
2957         ptlrpcd_decref();
2958         RETURN(rc);
2959 }
2960 EXPORT_SYMBOL(osc_setup_common);
2961
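/*
 * Full OSC device setup: the common setup above plus procfs
 * registration, pre-filling of the shared request pool (up to
 * cl_max_rpcs_in_flight + 2 requests, bounded by
 * osc_reqpool_maxreqcount), and registration on the grant shrink and
 * cache shrink lists.
 */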
2962 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
2963 {
2964         struct client_obd *cli = &obd->u.cli;
2965         struct obd_type   *type;
2966         int                adding;
2967         int                added;
2968         int                req_count;
2969         int                rc;
2970
2971         ENTRY;
2972
2973         rc = osc_setup_common(obd, lcfg);
2974         if (rc < 0)
2975                 RETURN(rc);
2976
2977 #ifdef CONFIG_PROC_FS
2978         obd->obd_vars = lprocfs_osc_obd_vars;
2979 #endif
2980         /* If this is true then both client (osc) and server (osp) are on
2981          * the same node. If the osp layer is loaded first, it will register
2982          * the osc proc directory. In that case this obd_device will attach
2983          * its proc tree to type->typ_procsym instead of
2984          * obd->obd_type->typ_procroot. */
2985         type = class_search_type(LUSTRE_OSP_NAME);
2986         if (type && type->typ_procsym) {
2987                 obd->obd_proc_entry = lprocfs_register(obd->obd_name,
2988                                                        type->typ_procsym,
2989                                                        obd->obd_vars, obd);
2990                 if (IS_ERR(obd->obd_proc_entry)) {
2991                         rc = PTR_ERR(obd->obd_proc_entry);
2992                         CERROR("error %d setting up lprocfs for %s\n", rc,
2993                                obd->obd_name);
2994                         obd->obd_proc_entry = NULL;
2995                 }
2996         }
2997
2998         rc = lprocfs_obd_setup(obd, false);
2999         if (!rc) {
3000                 /* If the basic OSC proc tree construction succeeded then
3001                  * let's do the rest.
3002                  */
3003                 lproc_osc_attach_seqstat(obd);
3004                 sptlrpc_lprocfs_cliobd_attach(obd);
3005                 ptlrpc_lprocfs_register_obd(obd);
3006         }
3007
3008         /*
3009          * We try to cap the total number of requests at an upper limit,
3010          * osc_reqpool_maxreqcount. A race may occasionally push the
3011          * allocation over that limit, but this is harmless.
3012          */
3013         req_count = atomic_read(&osc_pool_req_count);
3014         if (req_count < osc_reqpool_maxreqcount) {
3015                 adding = cli->cl_max_rpcs_in_flight + 2;
3016                 if (req_count + adding > osc_reqpool_maxreqcount)
3017                         adding = osc_reqpool_maxreqcount - req_count;
3018
3019                 added = ptlrpc_add_rqs_to_pool(osc_rq_pool, adding);
3020                 atomic_add(added, &osc_pool_req_count);
3021         }
3022
3023         INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
3024         ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
3025
3026         spin_lock(&osc_shrink_lock);
3027         list_add_tail(&cli->cl_shrink_list, &osc_shrink_list);
3028         spin_unlock(&osc_shrink_lock);
3029
3030         RETURN(0);
3031 }
3032
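/*
 * First phase of cleanup: wait out zombie exports, stop the
 * asynchronous work items and tear down the client import.
 */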
3033 int osc_precleanup_common(struct obd_device *obd)
3034 {
3035         struct client_obd *cli = &obd->u.cli;
3036         ENTRY;
3037
3038         /* LU-464
3039          * for the echo client, the export may be on the zombie list; wait
3040          * for the zombie thread to cull it, because cli.cl_import will be
3041          * cleared in client_disconnect_export():
3042          *   class_export_destroy() -> obd_cleanup() ->
3043          *   echo_device_free() -> echo_client_cleanup() ->
3044          *   obd_disconnect() -> osc_disconnect() ->
3045          *   client_disconnect_export()
3046          */
3047         obd_zombie_barrier();
3048         if (cli->cl_writeback_work) {
3049                 ptlrpcd_destroy_work(cli->cl_writeback_work);
3050                 cli->cl_writeback_work = NULL;
3051         }
3052
3053         if (cli->cl_lru_work) {
3054                 ptlrpcd_destroy_work(cli->cl_lru_work);
3055                 cli->cl_lru_work = NULL;
3056         }
3057
3058         obd_cleanup_client_import(obd);
3059         RETURN(0);
3060 }
3061 EXPORT_SYMBOL(osc_precleanup_common);
3062
3063 static int osc_precleanup(struct obd_device *obd)
3064 {
3065         ENTRY;
3066
3067         osc_precleanup_common(obd);
3068
3069         ptlrpc_lprocfs_unregister_obd(obd);
3070         lprocfs_obd_cleanup(obd);
3071         RETURN(0);
3072 }
3073
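/*
 * Final phase of cleanup: unhook this client from the global shrink
 * and LRU lists, release the shared page cache and the quota state,
 * then drop the ptlrpcd reference taken in osc_setup_common().
 */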
3074 int osc_cleanup_common(struct obd_device *obd)
3075 {
3076         struct client_obd *cli = &obd->u.cli;
3077         int rc;
3078
3079         ENTRY;
3080
3081         spin_lock(&osc_shrink_lock);
3082         list_del(&cli->cl_shrink_list);
3083         spin_unlock(&osc_shrink_lock);
3084
3085         /* lru cleanup */
3086         if (cli->cl_cache != NULL) {
3087                 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
3088                 spin_lock(&cli->cl_cache->ccc_lru_lock);
3089                 list_del_init(&cli->cl_lru_osc);
3090                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3091                 cli->cl_lru_left = NULL;
3092                 cl_cache_decref(cli->cl_cache);
3093                 cli->cl_cache = NULL;
3094         }
3095
3096         /* free memory of osc quota cache */
3097         osc_quota_cleanup(obd);
3098
3099         rc = client_obd_cleanup(obd);
3100
3101         ptlrpcd_decref();
3102         RETURN(rc);
3103 }
3104 EXPORT_SYMBOL(osc_cleanup_common);
3105
3106 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
3107 {
3108         int rc = class_process_proc_param(PARAM_OSC, obd->obd_vars, lcfg, obd);
3109         return rc > 0 ? 0 : rc;
3110 }
3111
3112 static int osc_process_config(struct obd_device *obd, size_t len, void *buf)
3113 {
3114         return osc_process_config_base(obd, buf);
3115 }
3116
3117 static struct obd_ops osc_obd_ops = {
3118         .o_owner                = THIS_MODULE,
3119         .o_setup                = osc_setup,
3120         .o_precleanup           = osc_precleanup,
3121         .o_cleanup              = osc_cleanup_common,
3122         .o_add_conn             = client_import_add_conn,
3123         .o_del_conn             = client_import_del_conn,
3124         .o_connect              = client_connect_import,
3125         .o_reconnect            = osc_reconnect,
3126         .o_disconnect           = osc_disconnect,
3127         .o_statfs               = osc_statfs,
3128         .o_statfs_async         = osc_statfs_async,
3129         .o_create               = osc_create,
3130         .o_destroy              = osc_destroy,
3131         .o_getattr              = osc_getattr,
3132         .o_setattr              = osc_setattr,
3133         .o_iocontrol            = osc_iocontrol,
3134         .o_set_info_async       = osc_set_info_async,
3135         .o_import_event         = osc_import_event,
3136         .o_process_config       = osc_process_config,
3137         .o_quotactl             = osc_quotactl,
3138 };
3139
3140 static struct shrinker *osc_cache_shrinker;
3141 struct list_head osc_shrink_list = LIST_HEAD_INIT(osc_shrink_list);
3142 DEFINE_SPINLOCK(osc_shrink_lock);
3143
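/*
 * Compatibility wrapper for older kernels whose shrinker API exposes a
 * single ->shrink() method instead of the separate ->count_objects()
 * and ->scan_objects() callbacks.
 */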
3144 #ifndef HAVE_SHRINKER_COUNT
3145 static int osc_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask))
3146 {
3147         struct shrink_control scv = {
3148                 .nr_to_scan = shrink_param(sc, nr_to_scan),
3149                 .gfp_mask   = shrink_param(sc, gfp_mask)
3150         };
3151 #if !defined(HAVE_SHRINKER_WANT_SHRINK_PTR) && !defined(HAVE_SHRINK_CONTROL)
3152         struct shrinker *shrinker = NULL;
3153 #endif
3154
3155         (void)osc_cache_shrink_scan(shrinker, &scv);
3156
3157         return osc_cache_shrink_count(shrinker, &scv);
3158 }
3159 #endif
3160
3161 static int __init osc_init(void)
3162 {
3163         bool enable_proc = true;
3164         struct obd_type *type;
3165         unsigned int reqpool_size;
3166         unsigned int reqsize;
3167         int rc;
3168         DEF_SHRINKER_VAR(osc_shvar, osc_cache_shrink,
3169                          osc_cache_shrink_count, osc_cache_shrink_scan);
3170         ENTRY;
3171
3172         /* print the address of _any_ initialized kernel symbol from this
3173          * module, to allow debugging with gdb that doesn't support data
3174          * symbols from modules. */
3175         CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3176
3177         rc = lu_kmem_init(osc_caches);
3178         if (rc)
3179                 RETURN(rc);
3180
3181         type = class_search_type(LUSTRE_OSP_NAME);
3182         if (type != NULL && type->typ_procsym != NULL)
3183                 enable_proc = false;
3184
3185         rc = class_register_type(&osc_obd_ops, NULL, enable_proc, NULL,
3186                                  LUSTRE_OSC_NAME, &osc_device_type);
3187         if (rc)
3188                 GOTO(out_kmem, rc);
3189
3190         osc_cache_shrinker = set_shrinker(DEFAULT_SEEKS, &osc_shvar);
3191
3192         /* This is obviously too much memory; only prevent overflow here. */
3193         if (osc_reqpool_mem_max >= 1 << 12 || osc_reqpool_mem_max == 0)
3194                 GOTO(out_type, rc = -EINVAL);
3195
3196         reqpool_size = osc_reqpool_mem_max << 20;
3197
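        /* round the request buffer size up to the smallest power of two
         * not below OST_IO_MAXREQSIZE, so the request count computed
         * below is a conservative estimate */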
3198         reqsize = 1;
3199         while (reqsize < OST_IO_MAXREQSIZE)
3200                 reqsize = reqsize << 1;
3201
3202         /*
3203          * We don't enlarge the request count in the OSC pool according to
3204          * cl_max_rpcs_in_flight. Allocation from the pool is only tried
3205          * after a normal allocation has failed, so a small OSC pool won't
3206          * cause much performance degradation in most cases.
3207          */
3208         osc_reqpool_maxreqcount = reqpool_size / reqsize;
3209
3210         atomic_set(&osc_pool_req_count, 0);
3211         osc_rq_pool = ptlrpc_init_rq_pool(0, OST_IO_MAXREQSIZE,
3212                                           ptlrpc_add_rqs_to_pool);
3213
3214         if (osc_rq_pool != NULL)
3215                 GOTO(out, rc);
3216         rc = -ENOMEM;
3217 out_type:
3218         class_unregister_type(LUSTRE_OSC_NAME);
3219 out_kmem:
3220         lu_kmem_fini(osc_caches);
3221 out:
3222         RETURN(rc);
3223 }
3224
3225 static void __exit osc_exit(void)
3226 {
3227         remove_shrinker(osc_cache_shrinker);
3228         class_unregister_type(LUSTRE_OSC_NAME);
3229         lu_kmem_fini(osc_caches);
3230         ptlrpc_free_rq_pool(osc_rq_pool);
3231 }
3232
3233 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
3234 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3235 MODULE_VERSION(LUSTRE_VERSION_STRING);
3236 MODULE_LICENSE("GPL");
3237
3238 module_init(osc_init);
3239 module_exit(osc_exit);