Whamcloud - gitweb
LU-6325 ptlrpc: make ptlrpcd threads cpt-aware
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2014, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_OSC
38
39 #include <libcfs/libcfs.h>
40
41 #include <lustre_dlm.h>
42 #include <lustre_net.h>
43 #include <lustre/lustre_user.h>
44 #include <obd_cksum.h>
45 #include <lustre_ha.h>
46 #include <lprocfs_status.h>
47 #include <lustre_ioctl.h>
48 #include <lustre_debug.h>
49 #include <lustre_param.h>
50 #include <lustre_fid.h>
51 #include <obd_class.h>
52 #include "osc_internal.h"
53 #include "osc_cl_internal.h"
54
55 struct osc_brw_async_args {
56         struct obdo              *aa_oa;
57         int                       aa_requested_nob;
58         int                       aa_nio_count;
59         u32                       aa_page_count;
60         int                       aa_resends;
61         struct brw_page **aa_ppga;
62         struct client_obd        *aa_cli;
63         struct list_head          aa_oaps;
64         struct list_head          aa_exts;
65         struct cl_req            *aa_clerq;
66 };
67
68 #define osc_grant_args osc_brw_async_args
69
70 struct osc_setattr_args {
71         struct obdo             *sa_oa;
72         obd_enqueue_update_f     sa_upcall;
73         void                    *sa_cookie;
74 };
75
76 struct osc_fsync_args {
77         struct osc_object       *fa_obj;
78         struct obdo             *fa_oa;
79         obd_enqueue_update_f    fa_upcall;
80         void                    *fa_cookie;
81 };
82
83 struct osc_enqueue_args {
84         struct obd_export       *oa_exp;
85         ldlm_type_t             oa_type;
86         ldlm_mode_t             oa_mode;
87         __u64                   *oa_flags;
88         osc_enqueue_upcall_f    oa_upcall;
89         void                    *oa_cookie;
90         struct ost_lvb          *oa_lvb;
91         struct lustre_handle    oa_lockh;
92         unsigned int            oa_agl:1;
93 };
94
95 static void osc_release_ppga(struct brw_page **ppga, size_t count);
96 static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
97                          void *data, int rc);
98
99 void osc_pack_req_body(struct ptlrpc_request *req, struct obdo *oa)
100 {
101         struct ost_body *body;
102
103         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
104         LASSERT(body);
105
106         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
107 }
108
109 static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
110                        struct obdo *oa)
111 {
112         struct ptlrpc_request   *req;
113         struct ost_body         *body;
114         int                      rc;
115
116         ENTRY;
117         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
118         if (req == NULL)
119                 RETURN(-ENOMEM);
120
121         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
122         if (rc) {
123                 ptlrpc_request_free(req);
124                 RETURN(rc);
125         }
126
127         osc_pack_req_body(req, oa);
128
129         ptlrpc_request_set_replen(req);
130
131         rc = ptlrpc_queue_wait(req);
132         if (rc)
133                 GOTO(out, rc);
134
135         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
136         if (body == NULL)
137                 GOTO(out, rc = -EPROTO);
138
139         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
140         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
141
142         oa->o_blksize = cli_brw_size(exp->exp_obd);
143         oa->o_valid |= OBD_MD_FLBLKSZ;
144
145         EXIT;
146 out:
147         ptlrpc_req_finished(req);
148
149         return rc;
150 }
151
152 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
153                        struct obdo *oa)
154 {
155         struct ptlrpc_request   *req;
156         struct ost_body         *body;
157         int                      rc;
158
159         ENTRY;
160         LASSERT(oa->o_valid & OBD_MD_FLGROUP);
161
162         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
163         if (req == NULL)
164                 RETURN(-ENOMEM);
165
166         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
167         if (rc) {
168                 ptlrpc_request_free(req);
169                 RETURN(rc);
170         }
171
172         osc_pack_req_body(req, oa);
173
174         ptlrpc_request_set_replen(req);
175
176         rc = ptlrpc_queue_wait(req);
177         if (rc)
178                 GOTO(out, rc);
179
180         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
181         if (body == NULL)
182                 GOTO(out, rc = -EPROTO);
183
184         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
185
186         EXIT;
187 out:
188         ptlrpc_req_finished(req);
189
190         RETURN(rc);
191 }
192
193 static int osc_setattr_interpret(const struct lu_env *env,
194                                  struct ptlrpc_request *req,
195                                  struct osc_setattr_args *sa, int rc)
196 {
197         struct ost_body *body;
198         ENTRY;
199
200         if (rc != 0)
201                 GOTO(out, rc);
202
203         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
204         if (body == NULL)
205                 GOTO(out, rc = -EPROTO);
206
207         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
208                              &body->oa);
209 out:
210         rc = sa->sa_upcall(sa->sa_cookie, rc);
211         RETURN(rc);
212 }
213
214 int osc_setattr_async(struct obd_export *exp, struct obdo *oa,
215                       obd_enqueue_update_f upcall, void *cookie,
216                       struct ptlrpc_request_set *rqset)
217 {
218         struct ptlrpc_request   *req;
219         struct osc_setattr_args *sa;
220         int                      rc;
221
222         ENTRY;
223
224         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
225         if (req == NULL)
226                 RETURN(-ENOMEM);
227
228         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
229         if (rc) {
230                 ptlrpc_request_free(req);
231                 RETURN(rc);
232         }
233
234         osc_pack_req_body(req, oa);
235
236         ptlrpc_request_set_replen(req);
237
238         /* do mds to ost setattr asynchronously */
239         if (!rqset) {
240                 /* Do not wait for response. */
241                 ptlrpcd_add_req(req);
242         } else {
243                 req->rq_interpret_reply =
244                         (ptlrpc_interpterer_t)osc_setattr_interpret;
245
246                 CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
247                 sa = ptlrpc_req_async_args(req);
248                 sa->sa_oa = oa;
249                 sa->sa_upcall = upcall;
250                 sa->sa_cookie = cookie;
251
252                 if (rqset == PTLRPCD_SET)
253                         ptlrpcd_add_req(req);
254                 else
255                         ptlrpc_set_add_req(rqset, req);
256         }
257
258         RETURN(0);
259 }
260
261 static int osc_create(const struct lu_env *env, struct obd_export *exp,
262                       struct obdo *oa)
263 {
264         struct ptlrpc_request *req;
265         struct ost_body       *body;
266         int                    rc;
267         ENTRY;
268
269         LASSERT(oa != NULL);
270         LASSERT(oa->o_valid & OBD_MD_FLGROUP);
271         LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));
272
273         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
274         if (req == NULL)
275                 GOTO(out, rc = -ENOMEM);
276
277         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
278         if (rc) {
279                 ptlrpc_request_free(req);
280                 GOTO(out, rc);
281         }
282
283         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
284         LASSERT(body);
285
286         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
287
288         ptlrpc_request_set_replen(req);
289
290         rc = ptlrpc_queue_wait(req);
291         if (rc)
292                 GOTO(out_req, rc);
293
294         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
295         if (body == NULL)
296                 GOTO(out_req, rc = -EPROTO);
297
298         CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
299         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
300
301         oa->o_blksize = cli_brw_size(exp->exp_obd);
302         oa->o_valid |= OBD_MD_FLBLKSZ;
303
304         CDEBUG(D_HA, "transno: "LPD64"\n",
305                lustre_msg_get_transno(req->rq_repmsg));
306 out_req:
307         ptlrpc_req_finished(req);
308 out:
309         RETURN(rc);
310 }
311
312 int osc_punch_base(struct obd_export *exp, struct obdo *oa,
313                    obd_enqueue_update_f upcall, void *cookie,
314                    struct ptlrpc_request_set *rqset)
315 {
316         struct ptlrpc_request   *req;
317         struct osc_setattr_args *sa;
318         struct ost_body         *body;
319         int                      rc;
320         ENTRY;
321
322         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
323         if (req == NULL)
324                 RETURN(-ENOMEM);
325
326         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
327         if (rc) {
328                 ptlrpc_request_free(req);
329                 RETURN(rc);
330         }
331         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
332         ptlrpc_at_set_req_timeout(req);
333
334         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
335         LASSERT(body);
336         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
337
338         ptlrpc_request_set_replen(req);
339
340         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
341         CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
342         sa = ptlrpc_req_async_args(req);
343         sa->sa_oa = oa;
344         sa->sa_upcall = upcall;
345         sa->sa_cookie = cookie;
346         if (rqset == PTLRPCD_SET)
347                 ptlrpcd_add_req(req);
348         else
349                 ptlrpc_set_add_req(rqset, req);
350
351         RETURN(0);
352 }
353
354 static int osc_sync_interpret(const struct lu_env *env,
355                               struct ptlrpc_request *req,
356                               void *arg, int rc)
357 {
358         struct osc_fsync_args   *fa = arg;
359         struct ost_body         *body;
360         struct cl_attr          *attr = &osc_env_info(env)->oti_attr;
361         unsigned long           valid = 0;
362         struct cl_object        *obj;
363         ENTRY;
364
365         if (rc != 0)
366                 GOTO(out, rc);
367
368         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
369         if (body == NULL) {
370                 CERROR("can't unpack ost_body\n");
371                 GOTO(out, rc = -EPROTO);
372         }
373
374         *fa->fa_oa = body->oa;
375         obj = osc2cl(fa->fa_obj);
376
377         /* Update osc object's blocks attribute */
378         cl_object_attr_lock(obj);
379         if (body->oa.o_valid & OBD_MD_FLBLOCKS) {
380                 attr->cat_blocks = body->oa.o_blocks;
381                 valid |= CAT_BLOCKS;
382         }
383
384         if (valid != 0)
385                 cl_object_attr_update(env, obj, attr, valid);
386         cl_object_attr_unlock(obj);
387
388 out:
389         rc = fa->fa_upcall(fa->fa_cookie, rc);
390         RETURN(rc);
391 }
392
393 int osc_sync_base(struct osc_object *obj, struct obdo *oa,
394                   obd_enqueue_update_f upcall, void *cookie,
395                   struct ptlrpc_request_set *rqset)
396 {
397         struct obd_export     *exp = osc_export(obj);
398         struct ptlrpc_request *req;
399         struct ost_body       *body;
400         struct osc_fsync_args *fa;
401         int                    rc;
402         ENTRY;
403
404         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
405         if (req == NULL)
406                 RETURN(-ENOMEM);
407
408         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
409         if (rc) {
410                 ptlrpc_request_free(req);
411                 RETURN(rc);
412         }
413
414         /* overload the size and blocks fields in the oa with start/end */
415         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
416         LASSERT(body);
417         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
418
419         ptlrpc_request_set_replen(req);
420         req->rq_interpret_reply = osc_sync_interpret;
421
422         CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
423         fa = ptlrpc_req_async_args(req);
424         fa->fa_obj = obj;
425         fa->fa_oa = oa;
426         fa->fa_upcall = upcall;
427         fa->fa_cookie = cookie;
428
429         if (rqset == PTLRPCD_SET)
430                 ptlrpcd_add_req(req);
431         else
432                 ptlrpc_set_add_req(rqset, req);
433
434         RETURN (0);
435 }
436
437 /* Find and cancel locally locks matched by @mode in the resource found by
438  * @objid. Found locks are added into @cancel list. Returns the amount of
439  * locks added to @cancels list. */
440 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
441                                    struct list_head *cancels,
442                                    ldlm_mode_t mode, __u64 lock_flags)
443 {
444         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
445         struct ldlm_res_id res_id;
446         struct ldlm_resource *res;
447         int count;
448         ENTRY;
449
450         /* Return, i.e. cancel nothing, only if ELC is supported (flag in
451          * export) but disabled through procfs (flag in NS).
452          *
453          * This distinguishes from a case when ELC is not supported originally,
454          * when we still want to cancel locks in advance and just cancel them
455          * locally, without sending any RPC. */
456         if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
457                 RETURN(0);
458
459         ostid_build_res_name(&oa->o_oi, &res_id);
460         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
461         if (IS_ERR(res))
462                 RETURN(0);
463
464         LDLM_RESOURCE_ADDREF(res);
465         count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
466                                            lock_flags, 0, NULL);
467         LDLM_RESOURCE_DELREF(res);
468         ldlm_resource_putref(res);
469         RETURN(count);
470 }
471
472 static int osc_destroy_interpret(const struct lu_env *env,
473                                  struct ptlrpc_request *req, void *data,
474                                  int rc)
475 {
476         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
477
478         atomic_dec(&cli->cl_destroy_in_flight);
479         wake_up(&cli->cl_destroy_waitq);
480         return 0;
481 }
482
483 static int osc_can_send_destroy(struct client_obd *cli)
484 {
485         if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
486             cli->cl_max_rpcs_in_flight) {
487                 /* The destroy request can be sent */
488                 return 1;
489         }
490         if (atomic_dec_return(&cli->cl_destroy_in_flight) <
491             cli->cl_max_rpcs_in_flight) {
492                 /*
493                  * The counter has been modified between the two atomic
494                  * operations.
495                  */
496                 wake_up(&cli->cl_destroy_waitq);
497         }
498         return 0;
499 }
500
501 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
502                        struct obdo *oa)
503 {
504         struct client_obd     *cli = &exp->exp_obd->u.cli;
505         struct ptlrpc_request *req;
506         struct ost_body       *body;
507         struct list_head       cancels = LIST_HEAD_INIT(cancels);
508         int rc, count;
509         ENTRY;
510
511         if (!oa) {
512                 CDEBUG(D_INFO, "oa NULL\n");
513                 RETURN(-EINVAL);
514         }
515
516         count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
517                                         LDLM_FL_DISCARD_DATA);
518
519         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
520         if (req == NULL) {
521                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
522                 RETURN(-ENOMEM);
523         }
524
525         rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
526                                0, &cancels, count);
527         if (rc) {
528                 ptlrpc_request_free(req);
529                 RETURN(rc);
530         }
531
532         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
533         ptlrpc_at_set_req_timeout(req);
534
535         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
536         LASSERT(body);
537         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
538
539         ptlrpc_request_set_replen(req);
540
541         req->rq_interpret_reply = osc_destroy_interpret;
542         if (!osc_can_send_destroy(cli)) {
543                 struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
544
545                 /*
546                  * Wait until the number of on-going destroy RPCs drops
547                  * under max_rpc_in_flight
548                  */
549                 l_wait_event_exclusive(cli->cl_destroy_waitq,
550                                        osc_can_send_destroy(cli), &lwi);
551         }
552
553         /* Do not wait for response */
554         ptlrpcd_add_req(req);
555         RETURN(0);
556 }
557
558 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
559                                 long writing_bytes)
560 {
561         u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;
562
563         LASSERT(!(oa->o_valid & bits));
564
565         oa->o_valid |= bits;
566         spin_lock(&cli->cl_loi_list_lock);
567         oa->o_dirty = cli->cl_dirty_pages << PAGE_CACHE_SHIFT;
568         if (unlikely(cli->cl_dirty_pages - cli->cl_dirty_transit >
569                      cli->cl_dirty_max_pages)) {
570                 CERROR("dirty %lu - %lu > dirty_max %lu\n",
571                        cli->cl_dirty_pages, cli->cl_dirty_transit,
572                        cli->cl_dirty_max_pages);
573                 oa->o_undirty = 0;
574         } else if (unlikely(atomic_long_read(&obd_dirty_pages) -
575                             atomic_long_read(&obd_dirty_transit_pages) >
576                             (obd_max_dirty_pages + 1))) {
577                 /* The atomic_read() allowing the atomic_inc() are
578                  * not covered by a lock thus they may safely race and trip
579                  * this CERROR() unless we add in a small fudge factor (+1). */
580                 CERROR("%s: dirty %ld - %ld > system dirty_max %lu\n",
581                        cli->cl_import->imp_obd->obd_name,
582                        atomic_long_read(&obd_dirty_pages),
583                        atomic_long_read(&obd_dirty_transit_pages),
584                        obd_max_dirty_pages);
585                 oa->o_undirty = 0;
586         } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
587                             0x7fffffff)) {
588                 CERROR("dirty %lu - dirty_max %lu too big???\n",
589                        cli->cl_dirty_pages, cli->cl_dirty_max_pages);
590                 oa->o_undirty = 0;
591         } else {
592                 unsigned long max_in_flight = (cli->cl_max_pages_per_rpc <<
593                                       PAGE_CACHE_SHIFT) *
594                                      (cli->cl_max_rpcs_in_flight + 1);
595                 oa->o_undirty = max(cli->cl_dirty_max_pages << PAGE_CACHE_SHIFT,
596                                     max_in_flight);
597         }
598         oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
599         oa->o_dropped = cli->cl_lost_grant;
600         cli->cl_lost_grant = 0;
601         spin_unlock(&cli->cl_loi_list_lock);
602         CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
603                oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
604
605 }
606
607 void osc_update_next_shrink(struct client_obd *cli)
608 {
609         cli->cl_next_shrink_grant =
610                 cfs_time_shift(cli->cl_grant_shrink_interval);
611         CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
612                cli->cl_next_shrink_grant);
613 }
614
615 static void __osc_update_grant(struct client_obd *cli, u64 grant)
616 {
617         spin_lock(&cli->cl_loi_list_lock);
618         cli->cl_avail_grant += grant;
619         spin_unlock(&cli->cl_loi_list_lock);
620 }
621
622 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
623 {
624         if (body->oa.o_valid & OBD_MD_FLGRANT) {
625                 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
626                 __osc_update_grant(cli, body->oa.o_grant);
627         }
628 }
629
630 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
631                               u32 keylen, void *key,
632                               u32 vallen, void *val,
633                               struct ptlrpc_request_set *set);
634
635 static int osc_shrink_grant_interpret(const struct lu_env *env,
636                                       struct ptlrpc_request *req,
637                                       void *aa, int rc)
638 {
639         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
640         struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
641         struct ost_body *body;
642
643         if (rc != 0) {
644                 __osc_update_grant(cli, oa->o_grant);
645                 GOTO(out, rc);
646         }
647
648         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
649         LASSERT(body);
650         osc_update_grant(cli, body);
651 out:
652         OBDO_FREE(oa);
653         return rc;
654 }
655
656 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
657 {
658         spin_lock(&cli->cl_loi_list_lock);
659         oa->o_grant = cli->cl_avail_grant / 4;
660         cli->cl_avail_grant -= oa->o_grant;
661         spin_unlock(&cli->cl_loi_list_lock);
662         if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
663                 oa->o_valid |= OBD_MD_FLFLAGS;
664                 oa->o_flags = 0;
665         }
666         oa->o_flags |= OBD_FL_SHRINK_GRANT;
667         osc_update_next_shrink(cli);
668 }
669
670 /* Shrink the current grant, either from some large amount to enough for a
671  * full set of in-flight RPCs, or if we have already shrunk to that limit
672  * then to enough for a single RPC.  This avoids keeping more grant than
673  * needed, and avoids shrinking the grant piecemeal. */
674 static int osc_shrink_grant(struct client_obd *cli)
675 {
676         __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
677                              (cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT);
678
679         spin_lock(&cli->cl_loi_list_lock);
680         if (cli->cl_avail_grant <= target_bytes)
681                 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
682         spin_unlock(&cli->cl_loi_list_lock);
683
684         return osc_shrink_grant_to_target(cli, target_bytes);
685 }
686
687 int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
688 {
689         int                     rc = 0;
690         struct ost_body        *body;
691         ENTRY;
692
693         spin_lock(&cli->cl_loi_list_lock);
694         /* Don't shrink if we are already above or below the desired limit
695          * We don't want to shrink below a single RPC, as that will negatively
696          * impact block allocation and long-term performance. */
697         if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT)
698                 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
699
700         if (target_bytes >= cli->cl_avail_grant) {
701                 spin_unlock(&cli->cl_loi_list_lock);
702                 RETURN(0);
703         }
704         spin_unlock(&cli->cl_loi_list_lock);
705
706         OBD_ALLOC_PTR(body);
707         if (!body)
708                 RETURN(-ENOMEM);
709
710         osc_announce_cached(cli, &body->oa, 0);
711
712         spin_lock(&cli->cl_loi_list_lock);
713         body->oa.o_grant = cli->cl_avail_grant - target_bytes;
714         cli->cl_avail_grant = target_bytes;
715         spin_unlock(&cli->cl_loi_list_lock);
716         if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
717                 body->oa.o_valid |= OBD_MD_FLFLAGS;
718                 body->oa.o_flags = 0;
719         }
720         body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
721         osc_update_next_shrink(cli);
722
723         rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
724                                 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
725                                 sizeof(*body), body, NULL);
726         if (rc != 0)
727                 __osc_update_grant(cli, body->oa.o_grant);
728         OBD_FREE_PTR(body);
729         RETURN(rc);
730 }
731
732 static int osc_should_shrink_grant(struct client_obd *client)
733 {
734         cfs_time_t time = cfs_time_current();
735         cfs_time_t next_shrink = client->cl_next_shrink_grant;
736
737         if ((client->cl_import->imp_connect_data.ocd_connect_flags &
738              OBD_CONNECT_GRANT_SHRINK) == 0)
739                 return 0;
740
741         if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
742                 /* Get the current RPC size directly, instead of going via:
743                  * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
744                  * Keep comment here so that it can be found by searching. */
745                 int brw_size = client->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
746
747                 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
748                     client->cl_avail_grant > brw_size)
749                         return 1;
750                 else
751                         osc_update_next_shrink(client);
752         }
753         return 0;
754 }
755
756 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
757 {
758         struct client_obd *client;
759
760         list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
761                 if (osc_should_shrink_grant(client))
762                         osc_shrink_grant(client);
763         }
764         return 0;
765 }
766
767 static int osc_add_shrink_grant(struct client_obd *client)
768 {
769         int rc;
770
771         rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
772                                        TIMEOUT_GRANT,
773                                        osc_grant_shrink_grant_cb, NULL,
774                                        &client->cl_grant_shrink_list);
775         if (rc) {
776                 CERROR("add grant client %s error %d\n",
777                         client->cl_import->imp_obd->obd_name, rc);
778                 return rc;
779         }
780         CDEBUG(D_CACHE, "add grant client %s \n",
781                client->cl_import->imp_obd->obd_name);
782         osc_update_next_shrink(client);
783         return 0;
784 }
785
786 static int osc_del_shrink_grant(struct client_obd *client)
787 {
788         return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
789                                          TIMEOUT_GRANT);
790 }
791
792 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
793 {
794         /*
795          * ocd_grant is the total grant amount we're expect to hold: if we've
796          * been evicted, it's the new avail_grant amount, cl_dirty_pages will
797          * drop to 0 as inflight RPCs fail out; otherwise, it's avail_grant +
798          * dirty.
799          *
800          * race is tolerable here: if we're evicted, but imp_state already
801          * left EVICTED state, then cl_dirty_pages must be 0 already.
802          */
803         spin_lock(&cli->cl_loi_list_lock);
804         if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
805                 cli->cl_avail_grant = ocd->ocd_grant;
806         else
807                 cli->cl_avail_grant = ocd->ocd_grant -
808                                       (cli->cl_dirty_pages << PAGE_CACHE_SHIFT);
809
810         if (cli->cl_avail_grant < 0) {
811                 CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
812                       cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant,
813                       ocd->ocd_grant, cli->cl_dirty_pages << PAGE_CACHE_SHIFT);
814                 /* workaround for servers which do not have the patch from
815                  * LU-2679 */
816                 cli->cl_avail_grant = ocd->ocd_grant;
817         }
818
819         /* determine the appropriate chunk size used by osc_extent. */
820         cli->cl_chunkbits = max_t(int, PAGE_CACHE_SHIFT, ocd->ocd_blocksize);
821         spin_unlock(&cli->cl_loi_list_lock);
822
823         CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
824                 "chunk bits: %d.\n", cli->cl_import->imp_obd->obd_name,
825                 cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits);
826
827         if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
828             list_empty(&cli->cl_grant_shrink_list))
829                 osc_add_shrink_grant(cli);
830 }
831
832 /* We assume that the reason this OSC got a short read is because it read
833  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
834  * via the LOV, and it _knows_ it's reading inside the file, it's just that
835  * this stripe never got written at or beyond this stripe offset yet. */
836 static void handle_short_read(int nob_read, size_t page_count,
837                               struct brw_page **pga)
838 {
839         char *ptr;
840         int i = 0;
841
842         /* skip bytes read OK */
843         while (nob_read > 0) {
844                 LASSERT (page_count > 0);
845
846                 if (pga[i]->count > nob_read) {
847                         /* EOF inside this page */
848                         ptr = kmap(pga[i]->pg) +
849                                 (pga[i]->off & ~PAGE_MASK);
850                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
851                         kunmap(pga[i]->pg);
852                         page_count--;
853                         i++;
854                         break;
855                 }
856
857                 nob_read -= pga[i]->count;
858                 page_count--;
859                 i++;
860         }
861
862         /* zero remaining pages */
863         while (page_count-- > 0) {
864                 ptr = kmap(pga[i]->pg) + (pga[i]->off & ~PAGE_MASK);
865                 memset(ptr, 0, pga[i]->count);
866                 kunmap(pga[i]->pg);
867                 i++;
868         }
869 }
870
871 static int check_write_rcs(struct ptlrpc_request *req,
872                            int requested_nob, int niocount,
873                            size_t page_count, struct brw_page **pga)
874 {
875         int     i;
876         __u32   *remote_rcs;
877
878         remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
879                                                   sizeof(*remote_rcs) *
880                                                   niocount);
881         if (remote_rcs == NULL) {
882                 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
883                 return(-EPROTO);
884         }
885
886         /* return error if any niobuf was in error */
887         for (i = 0; i < niocount; i++) {
888                 if ((int)remote_rcs[i] < 0)
889                         return(remote_rcs[i]);
890
891                 if (remote_rcs[i] != 0) {
892                         CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
893                                 i, remote_rcs[i], req);
894                         return(-EPROTO);
895                 }
896         }
897
898         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
899                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
900                        req->rq_bulk->bd_nob_transferred, requested_nob);
901                 return(-EPROTO);
902         }
903
904         return (0);
905 }
906
907 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
908 {
909         if (p1->flag != p2->flag) {
910                 unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
911                                   OBD_BRW_SYNC       | OBD_BRW_ASYNC   |
912                                   OBD_BRW_NOQUOTA    | OBD_BRW_SOFT_SYNC);
913
914                 /* warn if we try to combine flags that we don't know to be
915                  * safe to combine */
916                 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
917                         CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
918                               "report this at https://jira.hpdd.intel.com/\n",
919                               p1->flag, p2->flag);
920                 }
921                 return 0;
922         }
923
924         return (p1->off + p1->count == p2->off);
925 }
926
927 static u32 osc_checksum_bulk(int nob, size_t pg_count,
928                              struct brw_page **pga, int opc,
929                              cksum_type_t cksum_type)
930 {
931         u32                             cksum;
932         int                             i = 0;
933         struct cfs_crypto_hash_desc     *hdesc;
934         unsigned int                    bufsize;
935         int                             err;
936         unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);
937
938         LASSERT(pg_count > 0);
939
940         hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
941         if (IS_ERR(hdesc)) {
942                 CERROR("Unable to initialize checksum hash %s\n",
943                        cfs_crypto_hash_name(cfs_alg));
944                 return PTR_ERR(hdesc);
945         }
946
947         while (nob > 0 && pg_count > 0) {
948                 unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;
949
950                 /* corrupt the data before we compute the checksum, to
951                  * simulate an OST->client data error */
952                 if (i == 0 && opc == OST_READ &&
953                     OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
954                         unsigned char *ptr = kmap(pga[i]->pg);
955                         int off = pga[i]->off & ~PAGE_MASK;
956
957                         memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
958                         kunmap(pga[i]->pg);
959                 }
960                 cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
961                                             pga[i]->off & ~PAGE_MASK,
962                                             count);
963                 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
964                                (int)(pga[i]->off & ~PAGE_MASK));
965
966                 nob -= pga[i]->count;
967                 pg_count--;
968                 i++;
969         }
970
971         bufsize = sizeof(cksum);
972         err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);
973
974         /* For sending we only compute the wrong checksum instead
975          * of corrupting the data so it is still correct on a redo */
976         if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
977                 cksum++;
978
979         return cksum;
980 }
981
/*
 * Build (but do not send) a bulk read or write request for a page array.
 *
 * cmd:        OBD_BRW_WRITE selects OST_WRITE, anything else OST_READ
 * cli:        client obd the request is issued on
 * oa:         object attributes copied into the request body; its checksum
 *             fields are updated here and the pointer is kept in the
 *             request's async args
 * page_count: number of entries in pga, must be non-zero
 * pga:        pages to transfer; asserted to be in strictly increasing
 *             offset order with no gap between consecutive pages
 * reqp:       receives the prepared request on success
 * resend:     non-zero marks the request with OBD_FL_RECOV_RESEND
 *
 * Returns 0 on success.  Returns -ENOMEM or -EINVAL when the matching
 * fault-injection point fires, -ENOMEM when the request or the bulk
 * descriptor cannot be allocated, or the error from ptlrpc_request_pack().
 */
982 static int
983 osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
984                      u32 page_count, struct brw_page **pga,
985                      struct ptlrpc_request **reqp, int resend)
986 {
987         struct ptlrpc_request   *req;
988         struct ptlrpc_bulk_desc *desc;
989         struct ost_body         *body;
990         struct obd_ioobj        *ioobj;
991         struct niobuf_remote    *niobuf;
992         int niocount, i, requested_nob, opc, rc;
993         struct osc_brw_async_args *aa;
994         struct req_capsule      *pill;
995         struct brw_page *pg_prev;
996
997         ENTRY;
998         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
999                 RETURN(-ENOMEM); /* Recoverable */
1000         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1001                 RETURN(-EINVAL); /* Fatal */
1002
             /* only write requests are taken from the import's request pool */
1003         if ((cmd & OBD_BRW_WRITE) != 0) {
1004                 opc = OST_WRITE;
1005                 req = ptlrpc_request_alloc_pool(cli->cl_import,
1006                                                 cli->cl_import->imp_rq_pool,
1007                                                 &RQF_OST_BRW_WRITE);
1008         } else {
1009                 opc = OST_READ;
1010                 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
1011         }
1012         if (req == NULL)
1013                 RETURN(-ENOMEM);
1014
             /* count the remote niobufs: one per run of pages that
              * can_merge_pages() accepts as mergeable */
1015         for (niocount = i = 1; i < page_count; i++) {
1016                 if (!can_merge_pages(pga[i - 1], pga[i]))
1017                         niocount++;
1018         }
1019
1020         pill = &req->rq_pill;
1021         req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1022                              sizeof(*ioobj));
1023         req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1024                              niocount * sizeof(*niobuf));
1025
1026         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1027         if (rc) {
1028                 ptlrpc_request_free(req);
1029                 RETURN(rc);
1030         }
1031         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1032         ptlrpc_at_set_req_timeout(req);
1033         /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
1034          * retry logic */
1035         req->rq_no_retry_einprogress = 1;
1036
1037         desc = ptlrpc_prep_bulk_imp(req, page_count,
1038                 cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
1039                 (opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE :
1040                         PTLRPC_BULK_PUT_SINK) |
1041                         PTLRPC_BULK_BUF_KIOV,
1042                 OST_BULK_PORTAL,
1043                 &ptlrpc_bulk_kiov_pin_ops);
1044
1045         if (desc == NULL)
1046                 GOTO(out, rc = -ENOMEM);
1047         /* NB request now owns desc and will free it when it gets freed */
1048
1049         body = req_capsule_client_get(pill, &RMF_OST_BODY);
1050         ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1051         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1052         LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1053
1054         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
1055
1056         obdo_to_ioobj(oa, ioobj);
1057         ioobj->ioo_bufcnt = niocount;
1058         /* The high bits of ioo_max_brw tells server _maximum_ number of bulks
1059          * that might be send for this request.  The actual number is decided
1060          * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
1061          * "max - 1" for old client compatibility sending "0", and also so the
1062          * the actual maximum is a power-of-two number, not one less. LU-1431 */
1063         ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
1064         LASSERT(page_count > 0);
1065         pg_prev = pga[0];
             /* add every page to the bulk descriptor and fill the niobufs;
              * the niobuf cursor steps back one slot whenever a page is
              * merged into the previous niobuf */
1066         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1067                 struct brw_page *pg = pga[i];
1068                 int poff = pg->off & ~PAGE_MASK;
1069
1070                 LASSERT(pg->count > 0);
1071                 /* make sure there is no gap in the middle of page array */
1072                 LASSERTF(page_count == 1 ||
1073                          (ergo(i == 0, poff + pg->count == PAGE_CACHE_SIZE) &&
1074                           ergo(i > 0 && i < page_count - 1,
1075                                poff == 0 && pg->count == PAGE_CACHE_SIZE)   &&
1076                           ergo(i == page_count - 1, poff == 0)),
1077                          "i: %d/%d pg: %p off: "LPU64", count: %u\n",
1078                          i, page_count, pg, pg->off, pg->count);
1079                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1080                          "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1081                          " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1082                          i, page_count,
1083                          pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1084                          pg_prev->pg, page_private(pg_prev->pg),
1085                          pg_prev->pg->index, pg_prev->off);
1086                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1087                         (pg->flag & OBD_BRW_SRVLOCK));
1088
1089                 desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff, pg->count);
1090                 requested_nob += pg->count;
1091
1092                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1093                         niobuf--;
1094                         niobuf->rnb_len += pg->count;
1095                 } else {
1096                         niobuf->rnb_offset = pg->off;
1097                         niobuf->rnb_len    = pg->count;
1098                         niobuf->rnb_flags  = pg->flag;
1099                 }
1100                 pg_prev = pg;
1101         }
1102
             /* the cursor must have advanced by exactly niocount slots */
1103         LASSERTF((void *)(niobuf - niocount) ==
1104                 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1105                 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1106                 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1107
1108         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1109         if (resend) {
1110                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1111                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1112                         body->oa.o_flags = 0;
1113                 }
1114                 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1115         }
1116
1117         if (osc_should_shrink_grant(cli))
1118                 osc_shrink_grant_local(cli, &body->oa);
1119
1120         /* size[REQ_REC_OFF] still sizeof (*body) */
1121         if (opc == OST_WRITE) {
1122                 if (cli->cl_checksum &&
1123                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1124                         /* store cl_cksum_type in a local variable since
1125                          * it can be changed via lprocfs */
1126                         cksum_type_t cksum_type = cli->cl_cksum_type;
1127
1128                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1129                                 oa->o_flags &= OBD_FL_LOCAL_MASK;
1130                                 body->oa.o_flags = 0;
1131                         }
1132                         body->oa.o_flags |= cksum_type_pack(cksum_type);
1133                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1134                         body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1135                                                              page_count, pga,
1136                                                              OST_WRITE,
1137                                                              cksum_type);
1138                         CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1139                                body->oa.o_cksum);
1140                         /* save this in 'oa', too, for later checking */
1141                         oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1142                         oa->o_flags |= cksum_type_pack(cksum_type);
1143                 } else {
1144                         /* clear out the checksum flag, in case this is a
1145                          * resend but cl_checksum is no longer set. b=11238 */
1146                         oa->o_valid &= ~OBD_MD_FLCKSUM;
1147                 }
1148                 oa->o_cksum = body->oa.o_cksum;
1149                 /* 1 RC per niobuf */
1150                 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1151                                      sizeof(__u32) * niocount);
1152         } else {
                     /* for reads only the checksum type is requested here;
                      * no checksum is computed in this function */
1153                 if (cli->cl_checksum &&
1154                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1155                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1156                                 body->oa.o_flags = 0;
1157                         body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1158                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1159                 }
1160         }
1161         ptlrpc_request_set_replen(req);
1162
             /* record the transfer parameters in the request's async args */
1163         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1164         aa = ptlrpc_req_async_args(req);
1165         aa->aa_oa = oa;
1166         aa->aa_requested_nob = requested_nob;
1167         aa->aa_nio_count = niocount;
1168         aa->aa_page_count = page_count;
1169         aa->aa_resends = 0;
1170         aa->aa_ppga = pga;
1171         aa->aa_cli = cli;
1172         INIT_LIST_HEAD(&aa->aa_oaps);
1173
1174         *reqp = req;
1175         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1176         CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
1177                 req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
1178                 niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
1179         RETURN(0);
1180
1181  out:
1182         ptlrpc_req_finished(req);
1183         RETURN(rc);
1184 }
1185
1186 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1187                                 __u32 client_cksum, __u32 server_cksum, int nob,
1188                                 size_t page_count, struct brw_page **pga,
1189                                 cksum_type_t client_cksum_type)
1190 {
1191         __u32 new_cksum;
1192         char *msg;
1193         cksum_type_t cksum_type;
1194
1195         if (server_cksum == client_cksum) {
1196                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1197                 return 0;
1198         }
1199
1200         cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1201                                        oa->o_flags : 0);
1202         new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1203                                       cksum_type);
1204
1205         if (cksum_type != client_cksum_type)
1206                 msg = "the server did not use the checksum type specified in "
1207                       "the original request - likely a protocol problem";
1208         else if (new_cksum == server_cksum)
1209                 msg = "changed on the client after we checksummed it - "
1210                       "likely false positive due to mmap IO (bug 11742)";
1211         else if (new_cksum == client_cksum)
1212                 msg = "changed in transit before arrival at OST";
1213         else
1214                 msg = "changed in transit AND doesn't match the original - "
1215                       "likely false positive due to mmap IO (bug 11742)";
1216
1217         LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1218                            " object "DOSTID" extent ["LPU64"-"LPU64"]\n",
1219                            msg, libcfs_nid2str(peer->nid),
1220                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1221                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1222                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1223                            POSTID(&oa->o_oi), pga[0]->off,
1224                            pga[page_count-1]->off + pga[page_count-1]->count - 1);
1225         CERROR("original client csum %x (type %x), server csum %x (type %x), "
1226                "client csum now %x\n", client_cksum, client_cksum_type,
1227                server_cksum, cksum_type, new_cksum);
1228         return 1;
1229 }
1230
1231 /* Note rc enters this function as number of bytes transferred */
1232 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1233 {
1234         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1235         const lnet_process_id_t *peer =
1236                         &req->rq_import->imp_connection->c_peer;
1237         struct client_obd *cli = aa->aa_cli;
1238         struct ost_body *body;
1239         u32 client_cksum = 0;
1240         ENTRY;
1241
1242         if (rc < 0 && rc != -EDQUOT) {
1243                 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1244                 RETURN(rc);
1245         }
1246
1247         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1248         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1249         if (body == NULL) {
1250                 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1251                 RETURN(-EPROTO);
1252         }
1253
1254         /* set/clear over quota flag for a uid/gid */
1255         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1256             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1257                 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1258
1259                 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1260                        body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1261                        body->oa.o_flags);
1262                 osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
1263         }
1264
1265         osc_update_grant(cli, body);
1266
1267         if (rc < 0)
1268                 RETURN(rc);
1269
1270         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1271                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1272
1273         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1274                 if (rc > 0) {
1275                         CERROR("Unexpected +ve rc %d\n", rc);
1276                         RETURN(-EPROTO);
1277                 }
1278                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1279
1280                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1281                         RETURN(-EAGAIN);
1282
1283                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1284                     check_write_checksum(&body->oa, peer, client_cksum,
1285                                          body->oa.o_cksum, aa->aa_requested_nob,
1286                                          aa->aa_page_count, aa->aa_ppga,
1287                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1288                         RETURN(-EAGAIN);
1289
1290                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1291                                      aa->aa_page_count, aa->aa_ppga);
1292                 GOTO(out, rc);
1293         }
1294
1295         /* The rest of this function executes only for OST_READs */
1296
1297         /* if unwrap_bulk failed, return -EAGAIN to retry */
1298         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1299         if (rc < 0)
1300                 GOTO(out, rc = -EAGAIN);
1301
1302         if (rc > aa->aa_requested_nob) {
1303                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1304                        aa->aa_requested_nob);
1305                 RETURN(-EPROTO);
1306         }
1307
1308         if (rc != req->rq_bulk->bd_nob_transferred) {
1309                 CERROR ("Unexpected rc %d (%d transferred)\n",
1310                         rc, req->rq_bulk->bd_nob_transferred);
1311                 return (-EPROTO);
1312         }
1313
1314         if (rc < aa->aa_requested_nob)
1315                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1316
1317         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1318                 static int cksum_counter;
1319                 u32        server_cksum = body->oa.o_cksum;
1320                 char      *via = "";
1321                 char      *router = "";
1322                 cksum_type_t cksum_type;
1323
1324                 cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
1325                                                body->oa.o_flags : 0);
1326                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1327                                                  aa->aa_ppga, OST_READ,
1328                                                  cksum_type);
1329
1330                 if (peer->nid != req->rq_bulk->bd_sender) {
1331                         via = " via ";
1332                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1333                 }
1334
1335                 if (server_cksum != client_cksum) {
1336                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1337                                            "%s%s%s inode "DFID" object "DOSTID
1338                                            " extent ["LPU64"-"LPU64"]\n",
1339                                            req->rq_import->imp_obd->obd_name,
1340                                            libcfs_nid2str(peer->nid),
1341                                            via, router,
1342                                            body->oa.o_valid & OBD_MD_FLFID ?
1343                                                 body->oa.o_parent_seq : (__u64)0,
1344                                            body->oa.o_valid & OBD_MD_FLFID ?
1345                                                 body->oa.o_parent_oid : 0,
1346                                            body->oa.o_valid & OBD_MD_FLFID ?
1347                                                 body->oa.o_parent_ver : 0,
1348                                            POSTID(&body->oa.o_oi),
1349                                            aa->aa_ppga[0]->off,
1350                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1351                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1352                                                                         1);
1353                         CERROR("client %x, server %x, cksum_type %x\n",
1354                                client_cksum, server_cksum, cksum_type);
1355                         cksum_counter = 0;
1356                         aa->aa_oa->o_cksum = client_cksum;
1357                         rc = -EAGAIN;
1358                 } else {
1359                         cksum_counter++;
1360                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1361                         rc = 0;
1362                 }
1363         } else if (unlikely(client_cksum)) {
1364                 static int cksum_missed;
1365
1366                 cksum_missed++;
1367                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1368                         CERROR("Checksum %u requested from %s but not sent\n",
1369                                cksum_missed, libcfs_nid2str(peer->nid));
1370         } else {
1371                 rc = 0;
1372         }
1373 out:
1374         if (rc >= 0)
1375                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1376                                      aa->aa_oa, &body->oa);
1377
1378         RETURN(rc);
1379 }
1380
1381 static int osc_brw_redo_request(struct ptlrpc_request *request,
1382                                 struct osc_brw_async_args *aa, int rc)
1383 {
1384         struct ptlrpc_request *new_req;
1385         struct osc_brw_async_args *new_aa;
1386         struct osc_async_page *oap;
1387         ENTRY;
1388
1389         DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1390                   "redo for recoverable error %d", rc);
1391
1392         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1393                                 OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
1394                                   aa->aa_cli, aa->aa_oa, aa->aa_page_count,
1395                                   aa->aa_ppga, &new_req, 1);
1396         if (rc)
1397                 RETURN(rc);
1398
1399         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1400                 if (oap->oap_request != NULL) {
1401                         LASSERTF(request == oap->oap_request,
1402                                  "request %p != oap_request %p\n",
1403                                  request, oap->oap_request);
1404                         if (oap->oap_interrupted) {
1405                                 ptlrpc_req_finished(new_req);
1406                                 RETURN(-EINTR);
1407                         }
1408                 }
1409         }
1410         /* New request takes over pga and oaps from old request.
1411          * Note that copying a list_head doesn't work, need to move it... */
1412         aa->aa_resends++;
1413         new_req->rq_interpret_reply = request->rq_interpret_reply;
1414         new_req->rq_async_args = request->rq_async_args;
1415         new_req->rq_commit_cb = request->rq_commit_cb;
1416         /* cap resend delay to the current request timeout, this is similar to
1417          * what ptlrpc does (see after_reply()) */
1418         if (aa->aa_resends > new_req->rq_timeout)
1419                 new_req->rq_sent = cfs_time_current_sec() + new_req->rq_timeout;
1420         else
1421                 new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1422         new_req->rq_generation_set = 1;
1423         new_req->rq_import_generation = request->rq_import_generation;
1424
1425         new_aa = ptlrpc_req_async_args(new_req);
1426
1427         INIT_LIST_HEAD(&new_aa->aa_oaps);
1428         list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1429         INIT_LIST_HEAD(&new_aa->aa_exts);
1430         list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1431         new_aa->aa_resends = aa->aa_resends;
1432
1433         list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1434                 if (oap->oap_request) {
1435                         ptlrpc_req_finished(oap->oap_request);
1436                         oap->oap_request = ptlrpc_request_addref(new_req);
1437                 }
1438         }
1439
1440         /* XXX: This code will run into problem if we're going to support
1441          * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
1442          * and wait for all of them to be finished. We should inherit request
1443          * set from old request. */
1444         ptlrpcd_add_req(new_req);
1445
1446         DEBUG_REQ(D_INFO, new_req, "new request");
1447         RETURN(0);
1448 }
1449
1450 /*
1451  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1452  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1453  * fine for our small page arrays and doesn't require allocation.  its an
1454  * insertion sort that swaps elements that are strides apart, shrinking the
1455  * stride down until its '1' and the array is sorted.
1456  */
1457 static void sort_brw_pages(struct brw_page **array, int num)
1458 {
1459         int stride, i, j;
1460         struct brw_page *tmp;
1461
1462         if (num == 1)
1463                 return;
1464         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1465                 ;
1466
1467         do {
1468                 stride /= 3;
1469                 for (i = stride ; i < num ; i++) {
1470                         tmp = array[i];
1471                         j = i;
1472                         while (j >= stride && array[j - stride]->off > tmp->off) {
1473                                 array[j] = array[j - stride];
1474                                 j -= stride;
1475                         }
1476                         array[j] = tmp;
1477                 }
1478         } while (stride > 1);
1479 }
1480
/*
 * Free a page pointer array of @count entries.  Only the array itself is
 * released here; @ppga must not be NULL.
 */
static void osc_release_ppga(struct brw_page **ppga, size_t count)
{
        size_t bytes = sizeof(*ppga) * count;

        LASSERT(ppga != NULL);
        OBD_FREE(ppga, bytes);
}
1486
/*
 * Reply-interpret callback for bulk read/write (BRW) requests; it is
 * installed as rq_interpret_reply by osc_build_rpc().
 *
 * data is used as the request's struct osc_brw_async_args and rc is the
 * incoming request status.  In order, the function:
 *  - passes the status through osc_brw_fini_request();
 *  - on a recoverable error, resends through osc_brw_redo_request() when
 *    the import generation is unchanged and either rc is -EINPROGRESS or
 *    client_should_resend() allows it; if the status is 0 afterwards it
 *    returns 0 at once, otherwise -EAGAIN/-EINPROGRESS become -EIO;
 *  - on success, copies blocks/mtime/atime/ctime from aa_oa (when flagged
 *    valid in o_valid) into the cl_object attributes and, for OST_WRITE,
 *    may raise size and KMS to the end of the last page in aa_ppga;
 *  - frees aa_oa, finishes every extent on aa_exts with the final rc,
 *    completes the cl_req, frees the page-pointer array, decrements the
 *    read or write in-flight counter under cl_loi_list_lock, wakes cache
 *    waiters and calls osc_io_unplug().
 *
 * Returns 0 when the request was resent, otherwise the final status.
 */
1487 static int brw_interpret(const struct lu_env *env,
1488                          struct ptlrpc_request *req, void *data, int rc)
1489 {
1490         struct osc_brw_async_args *aa = data;
1491         struct osc_extent *ext;
1492         struct osc_extent *tmp;
1493         struct client_obd *cli = aa->aa_cli;
1494         ENTRY;
1495
1496         rc = osc_brw_fini_request(req, rc);
1497         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1498         /* When the server returns -EINPROGRESS, the client should always
1499          * retry regardless of how many times the bulk was resent already. */
1500         if (osc_recoverable_error(rc)) {
1501                 if (req->rq_import_generation !=
1502                     req->rq_import->imp_generation) {
                     /* Import generation changed since the request was
                      * created: do not resend, just log and fall through
                      * with the error. */
1503                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
1504                                ""DOSTID", rc = %d.\n",
1505                                req->rq_import->imp_obd->obd_name,
1506                                POSTID(&aa->aa_oa->o_oi), rc);
1507                 } else if (rc == -EINPROGRESS ||
1508                     client_should_resend(aa->aa_resends, aa->aa_cli)) {
1509                         rc = osc_brw_redo_request(req, aa, rc);
1510                 } else {
1511                         CERROR("%s: too many resent retries for object: "
1512                                ""LPU64":"LPU64", rc = %d.\n",
1513                                req->rq_import->imp_obd->obd_name,
1514                                POSTID(&aa->aa_oa->o_oi), rc);
1515                 }
1516
             /* rc == 0 here can only come from osc_brw_redo_request(); none
              * of the teardown below is run for this request in that case.
              * NOTE(review): presumably the resent request takes over aa's
              * resources -- confirm in osc_brw_redo_request(). */
1517                 if (rc == 0)
1518                         RETURN(0);
1519                 else if (rc == -EAGAIN || rc == -EINPROGRESS)
1520                         rc = -EIO;
1521         }
1522
1523         if (rc == 0) {
1524                 struct obdo *oa = aa->aa_oa;
1525                 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
1526                 unsigned long valid = 0;
1527                 struct cl_object *obj;
1528                 struct osc_async_page *last;
1529
             /* Final slot of the page array.  NOTE(review): osc_build_rpc()
              * sorts the array by offset before sending, so this is
              * presumably the highest-offset page -- confirm that aa_ppga
              * is that same array. */
1530                 last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
1531                 obj = osc2cl(last->oap_obj);
1532
             /* Collect the attributes to change in 'attr' and remember
              * which ones in 'valid'; they are applied in one update while
              * the object's attr lock is held. */
1533                 cl_object_attr_lock(obj);
1534                 if (oa->o_valid & OBD_MD_FLBLOCKS) {
1535                         attr->cat_blocks = oa->o_blocks;
1536                         valid |= CAT_BLOCKS;
1537                 }
1538                 if (oa->o_valid & OBD_MD_FLMTIME) {
1539                         attr->cat_mtime = oa->o_mtime;
1540                         valid |= CAT_MTIME;
1541                 }
1542                 if (oa->o_valid & OBD_MD_FLATIME) {
1543                         attr->cat_atime = oa->o_atime;
1544                         valid |= CAT_ATIME;
1545                 }
1546                 if (oa->o_valid & OBD_MD_FLCTIME) {
1547                         attr->cat_ctime = oa->o_ctime;
1548                         valid |= CAT_CTIME;
1549                 }
1550
1551                 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1552                         struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
                     /* Object offset just past the data of the last page. */
1553                         loff_t last_off = last->oap_count + last->oap_obj_off +
1554                                 last->oap_page_off;
1555
1556                         /* Change file size if this is an out of quota or
1557                          * direct IO write and it extends the file size */
1558                         if (loi->loi_lvb.lvb_size < last_off) {
1559                                 attr->cat_size = last_off;
1560                                 valid |= CAT_SIZE;
1561                         }
1562                         /* Extend KMS if it's not a lockless write */
1563                         if (loi->loi_kms < last_off &&
1564                             oap2osc_page(last)->ops_srvlock == 0) {
1565                                 attr->cat_kms = last_off;
1566                                 valid |= CAT_KMS;
1567                         }
1568                 }
1569
1570                 if (valid != 0)
1571                         cl_object_attr_update(env, obj, attr, valid);
1572                 cl_object_attr_unlock(obj);
1573         }
     /* aa_oa is not referenced again below. */
1574         OBDO_FREE(aa->aa_oa);
1575
1576         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
1577                 osc_inc_unstable_pages(req);
1578
     /* Unlink and finish every extent attached to this request; the _safe
      * iterator is required because entries are removed while walking. */
1579         list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
1580                 list_del_init(&ext->oe_link);
1581                 osc_extent_finish(env, ext, 1, rc);
1582         }
1583         LASSERT(list_empty(&aa->aa_exts));
1584         LASSERT(list_empty(&aa->aa_oaps));
1585
     /* Report the error, or the number of bytes transferred on success. */
1586         cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
1587                           req->rq_bulk->bd_nob_transferred);
1588         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1589         ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
1590
1591         spin_lock(&cli->cl_loi_list_lock);
1592         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1593          * is called so we know whether to go to sync BRWs or wait for more
1594          * RPCs to complete */
1595         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
1596                 cli->cl_w_in_flight--;
1597         else
1598                 cli->cl_r_in_flight--;
1599         osc_wake_cache_waiters(cli);
1600         spin_unlock(&cli->cl_loi_list_lock);
1601
1602         osc_io_unplug(env, cli, NULL);
1603         RETURN(rc);
1604 }
1605
1606 static void brw_commit(struct ptlrpc_request *req)
1607 {
1608         /* If osc_inc_unstable_pages (via osc_extent_finish) races with
1609          * this called via the rq_commit_cb, I need to ensure
1610          * osc_dec_unstable_pages is still called. Otherwise unstable
1611          * pages may be leaked. */
1612         spin_lock(&req->rq_lock);
1613         if (likely(req->rq_unstable)) {
1614                 req->rq_unstable = 0;
1615                 spin_unlock(&req->rq_lock);
1616
1617                 osc_dec_unstable_pages(req);
1618         } else {
1619                 req->rq_committed = 1;
1620                 spin_unlock(&req->rq_lock);
1621         }
1622 }
1623
1624 /**
1625  * Build an RPC by the list of extent @ext_list. The caller must ensure
1626  * that the total pages in this list are NOT over max pages per RPC.
1627  * Extents in the list must be in OES_RPC state.
      *
      * @cmd selects the direction: the OBD_BRW_WRITE bit picks CRT_WRITE,
      * otherwise CRT_READ.
      *
      * On success the request is queued with ptlrpcd_add_req(), @ext_list has
      * been spliced onto the request's async args (so it is left empty) and 0
      * is returned; completion is then handled by brw_interpret().  On failure
      * a non-zero error code is returned and every extent still on @ext_list
      * is finished with that error through osc_extent_finish().
1628  */
1629 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
1630                   struct list_head *ext_list, int cmd)
1631 {
1632         struct ptlrpc_request           *req = NULL;
1633         struct osc_extent               *ext;
1634         struct brw_page                 **pga = NULL;
1635         struct osc_brw_async_args       *aa = NULL;
1636         struct obdo                     *oa = NULL;
1637         struct osc_async_page           *oap;
1638         struct osc_async_page           *tmp;
1639         struct cl_req                   *clerq = NULL;
1640         enum cl_req_type                crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE :
1641                                                                       CRT_READ;
1642         struct cl_req_attr              *crattr = NULL;
1643         loff_t                          starting_offset = OBD_OBJECT_EOF;
1644         loff_t                          ending_offset = 0;
1645         int                             mpflag = 0;
1646         int                             mem_tight = 0;
1647         int                             page_count = 0;
1648         bool                            soft_sync = false;
1649         int                             i;
1650         int                             rc;
1651         struct list_head                rpc_list = LIST_HEAD_INIT(rpc_list);
1652         struct ost_body                 *body;
1653         ENTRY;
1654         LASSERT(!list_empty(ext_list));
1655
1656         /* add pages into rpc_list to build BRW rpc */
     /* The same pass counts the pages, ORs together the extents' memalloc
      * flags and tracks the lowest start / highest end object offset.  The
      * LASSERTs check that a page which does not move the start begins at
      * page offset 0, and that a page which does not move the end fills
      * its page up to PAGE_CACHE_SIZE. */
1657         list_for_each_entry(ext, ext_list, oe_link) {
1658                 LASSERT(ext->oe_state == OES_RPC);
1659                 mem_tight |= ext->oe_memalloc;
1660                 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
1661                         ++page_count;
1662                         list_add_tail(&oap->oap_rpc_item, &rpc_list);
1663                         if (starting_offset == OBD_OBJECT_EOF ||
1664                             starting_offset > oap->oap_obj_off)
1665                                 starting_offset = oap->oap_obj_off;
1666                         else
1667                                 LASSERT(oap->oap_page_off == 0);
1668                         if (ending_offset < oap->oap_obj_off + oap->oap_count)
1669                                 ending_offset = oap->oap_obj_off +
1670                                                 oap->oap_count;
1671                         else
1672                                 LASSERT(oap->oap_page_off + oap->oap_count ==
1673                                         PAGE_CACHE_SIZE);
1674                 }
1675         }
1676
1677         soft_sync = osc_over_unstable_soft_limit(cli);
     /* mpflag holds the previous memory-pressure state; it is restored at
      * the 'out' label on both the success and the error path. */
1678         if (mem_tight)
1679                 mpflag = cfs_memory_pressure_get_and_set();
1680
1681         OBD_ALLOC(crattr, sizeof(*crattr));
1682         if (crattr == NULL)
1683                 GOTO(out, rc = -ENOMEM);
1684
1685         OBD_ALLOC(pga, sizeof(*pga) * page_count);
1686         if (pga == NULL)
1687                 GOTO(out, rc = -ENOMEM);
1688
1689         OBDO_ALLOC(oa);
1690         if (oa == NULL)
1691                 GOTO(out, rc = -ENOMEM);
1692
     /* Fill the page-pointer array from rpc_list.  The cl_req is allocated
      * lazily from the first page and every page is then added to it. */
1693         i = 0;
1694         list_for_each_entry(oap, &rpc_list, oap_rpc_item) {
1695                 struct cl_page *page = oap2cl_page(oap);
1696                 if (clerq == NULL) {
1697                         clerq = cl_req_alloc(env, page, crt,
1698                                              1 /* only 1-object rpcs for now */);
1699                         if (IS_ERR(clerq))
1700                                 GOTO(out, rc = PTR_ERR(clerq));
1701                 }
1702                 if (mem_tight)
1703                         oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
1704                 if (soft_sync)
1705                         oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
1706                 pga[i] = &oap->oap_brw_page;
1707                 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
1708                 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
1709                        pga[i]->pg, page_index(oap->oap_page), oap,
1710                        pga[i]->flag);
1711                 i++;
1712                 cl_req_page_add(env, clerq, page);
1713         }
1714
1715         /* always get the data for the obdo for the rpc */
1716         LASSERT(clerq != NULL);
1717         crattr->cra_oa = oa;
1718         cl_req_attr_set(env, clerq, crattr, ~0ULL);
1719
1720         rc = cl_req_prep(env, clerq);
1721         if (rc != 0) {
1722                 CERROR("cl_req_prep failed: %d\n", rc);
1723                 GOTO(out, rc);
1724         }
1725
1726         sort_brw_pages(pga, page_count);
1727         rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req, 0);
1728         if (rc != 0) {
1729                 CERROR("prep_req failed: %d\n", rc);
1730                 GOTO(out, rc);
1731         }
1732
     /* No failure exit remains below this point.  NOTE(review): oa and pga
      * are presumably owned by the request's async args from here on
      * (brw_interpret() frees aa_oa and aa_ppga) -- confirm in
      * osc_brw_prep_request(). */
1733         req->rq_commit_cb = brw_commit;
1734         req->rq_interpret_reply = brw_interpret;
1735
1736         if (mem_tight != 0)
1737                 req->rq_memalloc = 1;
1738
1739         /* Need to update the timestamps after the request is built in case
1740          * we race with setattr (locally or in queue at OST).  If OST gets
1741          * later setattr before earlier BRW (as determined by the request xid),
1742          * the OST will not use BRW timestamps.  Sadly, there is no obvious
1743          * way to do this in a single call.  bug 10150 */
1744         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
1745         crattr->cra_oa = &body->oa;
1746         cl_req_attr_set(env, clerq, crattr,
1747                         OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
1748
1749         lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
1750
     /* Move the page list and the caller's extent list onto the request's
      * async args; both source lists are left empty. */
1751         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1752         aa = ptlrpc_req_async_args(req);
1753         INIT_LIST_HEAD(&aa->aa_oaps);
1754         list_splice_init(&rpc_list, &aa->aa_oaps);
1755         INIT_LIST_HEAD(&aa->aa_exts);
1756         list_splice_init(ext_list, &aa->aa_exts);
1757         aa->aa_clerq = clerq;
1758
1759         /* queued sync pages can be torn down while the pages
1760          * were between the pending list and the rpc */
1761         tmp = NULL;
1762         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1763                 /* only one oap gets a request reference */
     /* (tmp remembers the first oap on the list for that purpose) */
1764                 if (tmp == NULL)
1765                         tmp = oap;
1766                 if (oap->oap_interrupted && !req->rq_intr) {
1767                         CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
1768                                         oap, req);
1769                         ptlrpc_mark_interrupted(req);
1770                 }
1771         }
1772         if (tmp != NULL)
1773                 tmp->oap_request = ptlrpc_request_addref(req);
1774
     /* Account the RPC as in flight and update the histograms under
      * cl_loi_list_lock; starting_offset is converted from a byte offset
      * to a page index for the offset histogram. */
1775         spin_lock(&cli->cl_loi_list_lock);
1776         starting_offset >>= PAGE_CACHE_SHIFT;
1777         if (cmd == OBD_BRW_READ) {
1778                 cli->cl_r_in_flight++;
1779                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
1780                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
1781                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
1782                                       starting_offset + 1);
1783         } else {
1784                 cli->cl_w_in_flight++;
1785                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
1786                 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
1787                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
1788                                       starting_offset + 1);
1789         }
1790         spin_unlock(&cli->cl_loi_list_lock);
1791
1792         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %ur/%uw in flight",
1793                   page_count, aa, cli->cl_r_in_flight,
1794                   cli->cl_w_in_flight);
1795
1796         ptlrpcd_add_req(req);
1797         rc = 0;
1798         EXIT;
1799
     /* Reached on success as well as on failure: the memory-pressure state
      * and crattr are released on both paths; everything else below is
      * error-only cleanup. */
1800 out:
1801         if (mem_tight != 0)
1802                 cfs_memory_pressure_restore(mpflag);
1803
1804         if (crattr != NULL)
1805                 OBD_FREE(crattr, sizeof(*crattr));
1806
1807         if (rc != 0) {
1808                 LASSERT(req == NULL);
1809
1810                 if (oa)
1811                         OBDO_FREE(oa);
1812                 if (pga)
1813                         OBD_FREE(pga, sizeof(*pga) * page_count);
1814                 /* this should happen rarely and is pretty bad, it makes the
1815                  * pending list not follow the dirty order */
1816                 while (!list_empty(ext_list)) {
1817                         ext = list_entry(ext_list->next, struct osc_extent,
1818                                          oe_link);
1819                         list_del_init(&ext->oe_link);
1820                         osc_extent_finish(env, ext, 0, rc);
1821                 }
             /* clerq may be NULL (never allocated) or an ERR_PTR (allocation
              * failed); complete it only when it is a real request. */
1822                 if (clerq && !IS_ERR(clerq))
1823                         cl_req_completion(env, clerq, rc);
1824         }
1825         RETURN(rc);
1826 }
1827
1828 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
1829                                         struct ldlm_enqueue_info *einfo)
1830 {
1831         void *data = einfo->ei_cbdata;
1832         int set = 0;
1833
1834         LASSERT(lock != NULL);
1835         LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
1836         LASSERT(lock->l_resource->lr_type == einfo->ei_type);
1837         LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
1838         LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
1839
1840         lock_res_and_lock(lock);
1841
1842         if (lock->l_ast_data == NULL)
1843                 lock->l_ast_data = data;
1844         if (lock->l_ast_data == data)
1845                 set = 1;
1846
1847         unlock_res_and_lock(lock);
1848
1849         return set;
1850 }
1851
1852 static int osc_set_data_with_check(struct lustre_handle *lockh,
1853                                    struct ldlm_enqueue_info *einfo)
1854 {
1855         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
1856         int set = 0;
1857
1858         if (lock != NULL) {
1859                 set = osc_set_lock_data_with_check(lock, einfo);
1860                 LDLM_LOCK_PUT(lock);
1861         } else
1862                 CERROR("lockh %p, data %p - client evicted?\n",
1863                        lockh, einfo->ei_cbdata);
1864         return set;
1865 }
1866
1867 static int osc_enqueue_fini(struct ptlrpc_request *req,
1868                             osc_enqueue_upcall_f upcall, void *cookie,
1869                             struct lustre_handle *lockh, ldlm_mode_t mode,
1870                             __u64 *flags, int agl, int errcode)
1871 {
1872         bool intent = *flags & LDLM_FL_HAS_INTENT;
1873         int rc;
1874         ENTRY;
1875
1876         /* The request was created before ldlm_cli_enqueue call. */
1877         if (intent && errcode == ELDLM_LOCK_ABORTED) {
1878                 struct ldlm_reply *rep;
1879
1880                 rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1881                 LASSERT(rep != NULL);
1882
1883                 rep->lock_policy_res1 =
1884                         ptlrpc_status_ntoh(rep->lock_policy_res1);
1885                 if (rep->lock_policy_res1)
1886                         errcode = rep->lock_policy_res1;
1887                 if (!agl)
1888                         *flags |= LDLM_FL_LVB_READY;
1889         } else if (errcode == ELDLM_OK) {
1890                 *flags |= LDLM_FL_LVB_READY;
1891         }
1892
1893         /* Call the update callback. */
1894         rc = (*upcall)(cookie, lockh, errcode);
1895
1896         /* release the reference taken in ldlm_cli_enqueue() */
1897         if (errcode == ELDLM_LOCK_MATCHED)
1898                 errcode = ELDLM_OK;
1899         if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
1900                 ldlm_lock_decref(lockh, mode);
1901
1902         RETURN(rc);
1903 }
1904
1905 static int osc_enqueue_interpret(const struct lu_env *env,
1906                                  struct ptlrpc_request *req,
1907                                  struct osc_enqueue_args *aa, int rc)
1908 {
1909         struct ldlm_lock *lock;
1910         struct lustre_handle *lockh = &aa->oa_lockh;
1911         ldlm_mode_t mode = aa->oa_mode;
1912         struct ost_lvb *lvb = aa->oa_lvb;
1913         __u32 lvb_len = sizeof(*lvb);
1914         __u64 flags = 0;
1915
1916         ENTRY;
1917
1918         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
1919          * be valid. */
1920         lock = ldlm_handle2lock(lockh);
1921         LASSERTF(lock != NULL,
1922                  "lockh "LPX64", req %p, aa %p - client evicted?\n",
1923                  lockh->cookie, req, aa);
1924
1925         /* Take an additional reference so that a blocking AST that
1926          * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
1927          * to arrive after an upcall has been executed by
1928          * osc_enqueue_fini(). */
1929         ldlm_lock_addref(lockh, mode);
1930
1931         /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
1932         OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
1933
1934         /* Let CP AST to grant the lock first. */
1935         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
1936
1937         if (aa->oa_agl) {
1938                 LASSERT(aa->oa_lvb == NULL);
1939                 LASSERT(aa->oa_flags == NULL);
1940                 aa->oa_flags = &flags;
1941         }
1942
1943         /* Complete obtaining the lock procedure. */
1944         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
1945                                    aa->oa_mode, aa->oa_flags, lvb, lvb_len,
1946                                    lockh, rc);
1947         /* Complete osc stuff. */
1948         rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
1949                               aa->oa_flags, aa->oa_agl, rc);
1950
1951         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
1952
1953         ldlm_lock_decref(lockh, mode);
1954         LDLM_LOCK_PUT(lock);
1955         RETURN(rc);
1956 }
1957
/* Sentinel request-set pointer, initialised to the constant (void *)1 rather
 * than to an allocated set.  osc_enqueue_base() compares its rqset argument
 * against this value and, on a match, hands the request to ptlrpcd_add_req()
 * instead of calling ptlrpc_set_add_req() on it. */
1958 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
1959
1960 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
1961  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
1962  * other synchronous requests, however keeping some locks and trying to obtain
1963  * others may take a considerable amount of time in a case of ost failure; and
1964  * when other sync requests do not get released lock from a client, the client
1965  * is evicted from the cluster -- such scenarios make life difficult, so
1966  * release locks just after they are obtained.
      *
      * Flow of this function:
      *  - the extent in 'policy' is widened in place to page boundaries;
      *  - unless 'kms_valid' is zero, an existing lock is searched for with
      *    ldlm_lock_match() (a PR request also accepts a PW lock).  On a match:
      *    with LDLM_FL_TEST_LOCK, ELDLM_OK is returned at once; for AGL the
      *    reference is dropped and -ECANCELED is returned; otherwise, if the
      *    caller's data can be attached to the lock, the upcall is run with
      *    ELDLM_LOCK_MATCHED and ELDLM_OK is returned; if it cannot, the match
      *    is dropped and a new lock is enqueued;
      *  - without a usable match, LDLM_FL_TEST_LOCK yields -ENOLCK; otherwise
      *    a lock is enqueued with ldlm_cli_enqueue().  For intent enqueues the
      *    request is allocated and packed here first;
      *  - async: on success the request gets osc_enqueue_interpret() as its
      *    interpreter and is queued on ptlrpcd (rqset == PTLRPCD_SET) or on
      *    'rqset'; the ldlm_cli_enqueue() result is returned;
      *  - sync: osc_enqueue_fini() runs the upcall and its result is returned.
      */
1967 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
1968                      __u64 *flags, ldlm_policy_data_t *policy,
1969                      struct ost_lvb *lvb, int kms_valid,
1970                      osc_enqueue_upcall_f upcall, void *cookie,
1971                      struct ldlm_enqueue_info *einfo,
1972                      struct ptlrpc_request_set *rqset, int async, int agl)
1973 {
1974         struct obd_device *obd = exp->exp_obd;
1975         struct lustre_handle lockh = { 0 };
1976         struct ptlrpc_request *req = NULL;
1977         int intent = *flags & LDLM_FL_HAS_INTENT;
     /* AGL matches do not require the LVB to be ready. */
1978         __u64 match_lvb = agl ? 0 : LDLM_FL_LVB_READY;
1979         ldlm_mode_t mode;
1980         int rc;
1981         ENTRY;
1982
1983         /* Filesystem lock extents are extended to page boundaries so that
1984          * dealing with the page cache is a little smoother.  */
1985         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
1986         policy->l_extent.end |= ~PAGE_MASK;
1987
1988         /*
1989          * kms is not valid when either object is completely fresh (so that no
1990          * locks are cached), or object was evicted. In the latter case cached
1991          * lock cannot be used, because it would prime inode state with
1992          * potentially stale LVB.
1993          */
1994         if (!kms_valid)
1995                 goto no_match;
1996
1997         /* Next, search for already existing extent locks that will cover us */
1998         /* If we're trying to read, we also search for an existing PW lock.  The
1999          * VFS and page cache already protect us locally, so lots of readers/
2000          * writers can share a single PW lock.
2001          *
2002          * There are problems with conversion deadlocks, so instead of
2003          * converting a read lock to a write lock, we'll just enqueue a new
2004          * one.
2005          *
2006          * At some point we should cancel the read lock instead of making them
2007          * send us a blocking callback, but there are problems with canceling
2008          * locks out from other users right now, too. */
2009         mode = einfo->ei_mode;
2010         if (einfo->ei_mode == LCK_PR)
2011                 mode |= LCK_PW;
     /* From here 'mode' holds the mode of the matched lock (0 if none). */
2012         mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
2013                                einfo->ei_type, policy, mode, &lockh, 0);
2014         if (mode) {
2015                 struct ldlm_lock *matched;
2016
2017                 if (*flags & LDLM_FL_TEST_LOCK)
2018                         RETURN(ELDLM_OK);
2019
             /* Every branch below pairs this lookup with LDLM_LOCK_PUT()
              * and drops the match reference with ldlm_lock_decref(). */
2020                 matched = ldlm_handle2lock(&lockh);
2021                 if (agl) {
2022                         /* AGL enqueues DLM locks speculatively. Therefore if
2023                          * a DLM lock already exists, it will just inform the
2024                          * caller to cancel the AGL process for this stripe. */
2025                         ldlm_lock_decref(&lockh, mode);
2026                         LDLM_LOCK_PUT(matched);
2027                         RETURN(-ECANCELED);
2028                 } else if (osc_set_lock_data_with_check(matched, einfo)) {
2029                         *flags |= LDLM_FL_LVB_READY;
2030
2031                         /* We already have a lock, and it's referenced. */
2032                         (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);
2033
2034                         ldlm_lock_decref(&lockh, mode);
2035                         LDLM_LOCK_PUT(matched);
2036                         RETURN(ELDLM_OK);
2037                 } else {
                     /* The lock carries other AST data: give it up and
                      * fall through to enqueue a new lock. */
2038                         ldlm_lock_decref(&lockh, mode);
2039                         LDLM_LOCK_PUT(matched);
2040                 }
2041         }
2042
2043 no_match:
2044         if (*flags & LDLM_FL_TEST_LOCK)
2045                 RETURN(-ENOLCK);
2046
2047         if (intent) {
2048                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2049                                            &RQF_LDLM_ENQUEUE_LVB);
2050                 if (req == NULL)
2051                         RETURN(-ENOMEM);
2052
2053                 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
2054                 if (rc) {
2055                         ptlrpc_request_free(req);
2056                         RETURN(rc);
2057                 }
2058
2059                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2060                                      sizeof *lvb);
2061                 ptlrpc_request_set_replen(req);
2062         }
2063
2064         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2065         *flags &= ~LDLM_FL_BLOCK_GRANTED;
2066
     /* req is passed by address.  NOTE(review): in the non-intent case it is
      * NULL on entry and presumably filled in by ldlm_cli_enqueue(), since
      * the async branch below dereferences it -- confirm there. */
2067         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2068                               sizeof(*lvb), LVB_T_OST, &lockh, async);
2069         if (async) {
2070                 if (!rc) {
2071                         struct osc_enqueue_args *aa;
2072                         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2073                         aa = ptlrpc_req_async_args(req);
2074                         aa->oa_exp    = exp;
2075                         aa->oa_mode   = einfo->ei_mode;
2076                         aa->oa_type   = einfo->ei_type;
2077                         lustre_handle_copy(&aa->oa_lockh, &lockh);
2078                         aa->oa_upcall = upcall;
2079                         aa->oa_cookie = cookie;
2080                         aa->oa_agl    = !!agl;
2081                         if (!agl) {
2082                                 aa->oa_flags  = flags;
2083                                 aa->oa_lvb    = lvb;
2084                         } else {
2085                                 /* AGL is essentially to enqueue an DLM lock
2086                                  * in advance, so we don't care about the
2087                                  * result of AGL enqueue. */
2088                                 aa->oa_lvb    = NULL;
2089                                 aa->oa_flags  = NULL;
2090                         }
2091
2092                         req->rq_interpret_reply =
2093                                 (ptlrpc_interpterer_t)osc_enqueue_interpret;
2094                         if (rqset == PTLRPCD_SET)
2095                                 ptlrpcd_add_req(req);
2096                         else
2097                                 ptlrpc_set_add_req(rqset, req);
2098                 } else if (intent) {
                     /* Enqueue failed: release the request that was
                      * allocated above for the intent case. */
2099                         ptlrpc_req_finished(req);
2100                 }
2101                 RETURN(rc);
2102         }
2103
     /* Synchronous case: complete the enqueue right here. */
2104         rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
2105                               flags, agl, rc);
2106         if (intent)
2107                 ptlrpc_req_finished(req);
2108
2109         RETURN(rc);
2110 }
2111
2112 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2113                    __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2114                    __u64 *flags, void *data, struct lustre_handle *lockh,
2115                    int unref)
2116 {
2117         struct obd_device *obd = exp->exp_obd;
2118         __u64 lflags = *flags;
2119         ldlm_mode_t rc;
2120         ENTRY;
2121
2122         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2123                 RETURN(-EIO);
2124
2125         /* Filesystem lock extents are extended to page boundaries so that
2126          * dealing with the page cache is a little smoother */
2127         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2128         policy->l_extent.end |= ~PAGE_MASK;
2129
2130         /* Next, search for already existing extent locks that will cover us */
2131         /* If we're trying to read, we also search for an existing PW lock.  The
2132          * VFS and page cache already protect us locally, so lots of readers/
2133          * writers can share a single PW lock. */
2134         rc = mode;
2135         if (mode == LCK_PR)
2136                 rc |= LCK_PW;
2137         rc = ldlm_lock_match(obd->obd_namespace, lflags,
2138                              res_id, type, policy, rc, lockh, unref);
2139         if (rc) {
2140                 if (data != NULL) {
2141                         if (!osc_set_data_with_check(lockh, data)) {
2142                                 if (!(lflags & LDLM_FL_TEST_LOCK))
2143                                         ldlm_lock_decref(lockh, rc);
2144                                 RETURN(0);
2145                         }
2146                 }
2147                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
2148                         ldlm_lock_addref(lockh, LCK_PR);
2149                         ldlm_lock_decref(lockh, LCK_PW);
2150                 }
2151                 RETURN(rc);
2152         }
2153         RETURN(rc);
2154 }
2155
2156 static int osc_statfs_interpret(const struct lu_env *env,
2157                                 struct ptlrpc_request *req,
2158                                 struct osc_async_args *aa, int rc)
2159 {
2160         struct obd_statfs *msfs;
2161         ENTRY;
2162
2163         if (rc == -EBADR)
2164                 /* The request has in fact never been sent
2165                  * due to issues at a higher level (LOV).
2166                  * Exit immediately since the caller is
2167                  * aware of the problem and takes care
2168                  * of the clean up */
2169                  RETURN(rc);
2170
2171         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2172             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2173                 GOTO(out, rc = 0);
2174
2175         if (rc != 0)
2176                 GOTO(out, rc);
2177
2178         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2179         if (msfs == NULL) {
2180                 GOTO(out, rc = -EPROTO);
2181         }
2182
2183         *aa->aa_oi->oi_osfs = *msfs;
2184 out:
2185         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2186         RETURN(rc);
2187 }
2188
2189 static int osc_statfs_async(struct obd_export *exp,
2190                             struct obd_info *oinfo, __u64 max_age,
2191                             struct ptlrpc_request_set *rqset)
2192 {
2193         struct obd_device     *obd = class_exp2obd(exp);
2194         struct ptlrpc_request *req;
2195         struct osc_async_args *aa;
2196         int                    rc;
2197         ENTRY;
2198
2199         /* We could possibly pass max_age in the request (as an absolute
2200          * timestamp or a "seconds.usec ago") so the target can avoid doing
2201          * extra calls into the filesystem if that isn't necessary (e.g.
2202          * during mount that would help a bit).  Having relative timestamps
2203          * is not so great if request processing is slow, while absolute
2204          * timestamps are not ideal because they need time synchronization. */
2205         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2206         if (req == NULL)
2207                 RETURN(-ENOMEM);
2208
2209         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2210         if (rc) {
2211                 ptlrpc_request_free(req);
2212                 RETURN(rc);
2213         }
2214         ptlrpc_request_set_replen(req);
2215         req->rq_request_portal = OST_CREATE_PORTAL;
2216         ptlrpc_at_set_req_timeout(req);
2217
2218         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2219                 /* procfs requests not want stat in wait for avoid deadlock */
2220                 req->rq_no_resend = 1;
2221                 req->rq_no_delay = 1;
2222         }
2223
2224         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2225         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2226         aa = ptlrpc_req_async_args(req);
2227         aa->aa_oi = oinfo;
2228
2229         ptlrpc_set_add_req(rqset, req);
2230         RETURN(0);
2231 }
2232
2233 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2234                       struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2235 {
2236         struct obd_device     *obd = class_exp2obd(exp);
2237         struct obd_statfs     *msfs;
2238         struct ptlrpc_request *req;
2239         struct obd_import     *imp = NULL;
2240         int rc;
2241         ENTRY;
2242
2243         /*Since the request might also come from lprocfs, so we need
2244          *sync this with client_disconnect_export Bug15684*/
2245         down_read(&obd->u.cli.cl_sem);
2246         if (obd->u.cli.cl_import)
2247                 imp = class_import_get(obd->u.cli.cl_import);
2248         up_read(&obd->u.cli.cl_sem);
2249         if (!imp)
2250                 RETURN(-ENODEV);
2251
2252         /* We could possibly pass max_age in the request (as an absolute
2253          * timestamp or a "seconds.usec ago") so the target can avoid doing
2254          * extra calls into the filesystem if that isn't necessary (e.g.
2255          * during mount that would help a bit).  Having relative timestamps
2256          * is not so great if request processing is slow, while absolute
2257          * timestamps are not ideal because they need time synchronization. */
2258         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2259
2260         class_import_put(imp);
2261
2262         if (req == NULL)
2263                 RETURN(-ENOMEM);
2264
2265         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2266         if (rc) {
2267                 ptlrpc_request_free(req);
2268                 RETURN(rc);
2269         }
2270         ptlrpc_request_set_replen(req);
2271         req->rq_request_portal = OST_CREATE_PORTAL;
2272         ptlrpc_at_set_req_timeout(req);
2273
2274         if (flags & OBD_STATFS_NODELAY) {
2275                 /* procfs requests not want stat in wait for avoid deadlock */
2276                 req->rq_no_resend = 1;
2277                 req->rq_no_delay = 1;
2278         }
2279
2280         rc = ptlrpc_queue_wait(req);
2281         if (rc)
2282                 GOTO(out, rc);
2283
2284         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2285         if (msfs == NULL) {
2286                 GOTO(out, rc = -EPROTO);
2287         }
2288
2289         *osfs = *msfs;
2290
2291         EXIT;
2292  out:
2293         ptlrpc_req_finished(req);
2294         return rc;
2295 }
2296
2297 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2298                          void *karg, void *uarg)
2299 {
2300         struct obd_device *obd = exp->exp_obd;
2301         struct obd_ioctl_data *data = karg;
2302         int err = 0;
2303         ENTRY;
2304
2305         if (!try_module_get(THIS_MODULE)) {
2306                 CERROR("%s: cannot get module '%s'\n", obd->obd_name,
2307                        module_name(THIS_MODULE));
2308                 return -EINVAL;
2309         }
2310         switch (cmd) {
2311         case OBD_IOC_CLIENT_RECOVER:
2312                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2313                                             data->ioc_inlbuf1, 0);
2314                 if (err > 0)
2315                         err = 0;
2316                 GOTO(out, err);
2317         case IOC_OSC_SET_ACTIVE:
2318                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2319                                                data->ioc_offset);
2320                 GOTO(out, err);
2321         case OBD_IOC_PING_TARGET:
2322                 err = ptlrpc_obd_ping(obd);
2323                 GOTO(out, err);
2324         default:
2325                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2326                        cmd, current_comm());
2327                 GOTO(out, err = -ENOTTY);
2328         }
2329 out:
2330         module_put(THIS_MODULE);
2331         return err;
2332 }
2333
2334 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
2335                               u32 keylen, void *key,
2336                               u32 vallen, void *val,
2337                               struct ptlrpc_request_set *set)
2338 {
2339         struct ptlrpc_request *req;
2340         struct obd_device     *obd = exp->exp_obd;
2341         struct obd_import     *imp = class_exp2cliimp(exp);
2342         char                  *tmp;
2343         int                    rc;
2344         ENTRY;
2345
2346         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
2347
2348         if (KEY_IS(KEY_CHECKSUM)) {
2349                 if (vallen != sizeof(int))
2350                         RETURN(-EINVAL);
2351                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
2352                 RETURN(0);
2353         }
2354
2355         if (KEY_IS(KEY_SPTLRPC_CONF)) {
2356                 sptlrpc_conf_client_adapt(obd);
2357                 RETURN(0);
2358         }
2359
2360         if (KEY_IS(KEY_FLUSH_CTX)) {
2361                 sptlrpc_import_flush_my_ctx(imp);
2362                 RETURN(0);
2363         }
2364
2365         if (KEY_IS(KEY_CACHE_SET)) {
2366                 struct client_obd *cli = &obd->u.cli;
2367
2368                 LASSERT(cli->cl_cache == NULL); /* only once */
2369                 cli->cl_cache = (struct cl_client_cache *)val;
2370                 cl_cache_incref(cli->cl_cache);
2371                 cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
2372
2373                 /* add this osc into entity list */
2374                 LASSERT(list_empty(&cli->cl_lru_osc));
2375                 spin_lock(&cli->cl_cache->ccc_lru_lock);
2376                 list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
2377                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
2378
2379                 RETURN(0);
2380         }
2381
2382         if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
2383                 struct client_obd *cli = &obd->u.cli;
2384                 long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
2385                 long target = *(long *)val;
2386
2387                 nr = osc_lru_shrink(env, cli, min(nr, target), true);
2388                 *(long *)val -= nr;
2389                 RETURN(0);
2390         }
2391
2392         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
2393                 RETURN(-EINVAL);
2394
2395         /* We pass all other commands directly to OST. Since nobody calls osc
2396            methods directly and everybody is supposed to go through LOV, we
2397            assume lov checked invalid values for us.
2398            The only recognised values so far are evict_by_nid and mds_conn.
2399            Even if something bad goes through, we'd get a -EINVAL from OST
2400            anyway. */
2401
2402         req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
2403                                                 &RQF_OST_SET_GRANT_INFO :
2404                                                 &RQF_OBD_SET_INFO);
2405         if (req == NULL)
2406                 RETURN(-ENOMEM);
2407
2408         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2409                              RCL_CLIENT, keylen);
2410         if (!KEY_IS(KEY_GRANT_SHRINK))
2411                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
2412                                      RCL_CLIENT, vallen);
2413         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
2414         if (rc) {
2415                 ptlrpc_request_free(req);
2416                 RETURN(rc);
2417         }
2418
2419         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2420         memcpy(tmp, key, keylen);
2421         tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
2422                                                         &RMF_OST_BODY :
2423                                                         &RMF_SETINFO_VAL);
2424         memcpy(tmp, val, vallen);
2425
2426         if (KEY_IS(KEY_GRANT_SHRINK)) {
2427                 struct osc_grant_args *aa;
2428                 struct obdo *oa;
2429
2430                 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2431                 aa = ptlrpc_req_async_args(req);
2432                 OBDO_ALLOC(oa);
2433                 if (!oa) {
2434                         ptlrpc_req_finished(req);
2435                         RETURN(-ENOMEM);
2436                 }
2437                 *oa = ((struct ost_body *)val)->oa;
2438                 aa->aa_oa = oa;
2439                 req->rq_interpret_reply = osc_shrink_grant_interpret;
2440         }
2441
2442         ptlrpc_request_set_replen(req);
2443         if (!KEY_IS(KEY_GRANT_SHRINK)) {
2444                 LASSERT(set != NULL);
2445                 ptlrpc_set_add_req(set, req);
2446                 ptlrpc_check_set(NULL, set);
2447         } else {
2448                 ptlrpcd_add_req(req);
2449         }
2450
2451         RETURN(0);
2452 }
2453
2454 static int osc_reconnect(const struct lu_env *env,
2455                          struct obd_export *exp, struct obd_device *obd,
2456                          struct obd_uuid *cluuid,
2457                          struct obd_connect_data *data,
2458                          void *localdata)
2459 {
2460         struct client_obd *cli = &obd->u.cli;
2461
2462         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
2463                 long lost_grant;
2464
2465                 spin_lock(&cli->cl_loi_list_lock);
2466                 data->ocd_grant = (cli->cl_avail_grant +
2467                                   (cli->cl_dirty_pages << PAGE_CACHE_SHIFT)) ?:
2468                                   2 * cli_brw_size(obd);
2469                 lost_grant = cli->cl_lost_grant;
2470                 cli->cl_lost_grant = 0;
2471                 spin_unlock(&cli->cl_loi_list_lock);
2472
2473                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
2474                        " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
2475                        data->ocd_version, data->ocd_grant, lost_grant);
2476         }
2477
2478         RETURN(0);
2479 }
2480
2481 static int osc_disconnect(struct obd_export *exp)
2482 {
2483         struct obd_device *obd = class_exp2obd(exp);
2484         int rc;
2485
2486         rc = client_disconnect_export(exp);
2487         /**
2488          * Initially we put del_shrink_grant before disconnect_export, but it
2489          * causes the following problem if setup (connect) and cleanup
2490          * (disconnect) are tangled together.
2491          *      connect p1                     disconnect p2
2492          *   ptlrpc_connect_import
2493          *     ...............               class_manual_cleanup
2494          *                                     osc_disconnect
2495          *                                     del_shrink_grant
2496          *   ptlrpc_connect_interrupt
2497          *     init_grant_shrink
2498          *   add this client to shrink list
2499          *                                      cleanup_osc
2500          * Bang! pinger trigger the shrink.
2501          * So the osc should be disconnected from the shrink list, after we
2502          * are sure the import has been destroyed. BUG18662
2503          */
2504         if (obd->u.cli.cl_import == NULL)
2505                 osc_del_shrink_grant(&obd->u.cli);
2506         return rc;
2507 }
2508
2509 static int osc_import_event(struct obd_device *obd,
2510                             struct obd_import *imp,
2511                             enum obd_import_event event)
2512 {
2513         struct client_obd *cli;
2514         int rc = 0;
2515
2516         ENTRY;
2517         LASSERT(imp->imp_obd == obd);
2518
2519         switch (event) {
2520         case IMP_EVENT_DISCON: {
2521                 cli = &obd->u.cli;
2522                 spin_lock(&cli->cl_loi_list_lock);
2523                 cli->cl_avail_grant = 0;
2524                 cli->cl_lost_grant = 0;
2525                 spin_unlock(&cli->cl_loi_list_lock);
2526                 break;
2527         }
2528         case IMP_EVENT_INACTIVE: {
2529                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
2530                 break;
2531         }
2532         case IMP_EVENT_INVALIDATE: {
2533                 struct ldlm_namespace *ns = obd->obd_namespace;
2534                 struct lu_env         *env;
2535                 int                    refcheck;
2536
2537                 env = cl_env_get(&refcheck);
2538                 if (!IS_ERR(env)) {
2539                         /* Reset grants */
2540                         cli = &obd->u.cli;
2541                         /* all pages go to failing rpcs due to the invalid
2542                          * import */
2543                         osc_io_unplug(env, cli, NULL);
2544
2545                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
2546                         cl_env_put(env, &refcheck);
2547                 } else
2548                         rc = PTR_ERR(env);
2549                 break;
2550         }
2551         case IMP_EVENT_ACTIVE: {
2552                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
2553                 break;
2554         }
2555         case IMP_EVENT_OCD: {
2556                 struct obd_connect_data *ocd = &imp->imp_connect_data;
2557
2558                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
2559                         osc_init_grant(&obd->u.cli, ocd);
2560
2561                 /* See bug 7198 */
2562                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
2563                         imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
2564
2565                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
2566                 break;
2567         }
2568         case IMP_EVENT_DEACTIVATE: {
2569                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
2570                 break;
2571         }
2572         case IMP_EVENT_ACTIVATE: {
2573                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
2574                 break;
2575         }
2576         default:
2577                 CERROR("Unknown import event %d\n", event);
2578                 LBUG();
2579         }
2580         RETURN(rc);
2581 }
2582
2583 /**
2584  * Determine whether the lock can be canceled before replaying the lock
2585  * during recovery, see bug16774 for detailed information.
2586  *
2587  * \retval zero the lock can't be canceled
2588  * \retval other ok to cancel
2589  */
2590 static int osc_cancel_weight(struct ldlm_lock *lock)
2591 {
2592         /*
2593          * Cancel all unused and granted extent lock.
2594          */
2595         if (lock->l_resource->lr_type == LDLM_EXTENT &&
2596             lock->l_granted_mode == lock->l_req_mode &&
2597             osc_ldlm_weigh_ast(lock) == 0)
2598                 RETURN(1);
2599
2600         RETURN(0);
2601 }
2602
2603 static int brw_queue_work(const struct lu_env *env, void *data)
2604 {
2605         struct client_obd *cli = data;
2606
2607         CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
2608
2609         osc_io_unplug(env, cli, NULL);
2610         RETURN(0);
2611 }
2612
2613 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
2614 {
2615         struct client_obd *cli = &obd->u.cli;
2616         struct obd_type   *type;
2617         void              *handler;
2618         int                rc;
2619         ENTRY;
2620
2621         rc = ptlrpcd_addref();
2622         if (rc)
2623                 RETURN(rc);
2624
2625         rc = client_obd_setup(obd, lcfg);
2626         if (rc)
2627                 GOTO(out_ptlrpcd, rc);
2628
2629         handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
2630         if (IS_ERR(handler))
2631                 GOTO(out_client_setup, rc = PTR_ERR(handler));
2632         cli->cl_writeback_work = handler;
2633
2634         handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
2635         if (IS_ERR(handler))
2636                 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
2637         cli->cl_lru_work = handler;
2638
2639         rc = osc_quota_setup(obd);
2640         if (rc)
2641                 GOTO(out_ptlrpcd_work, rc);
2642
2643         cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
2644
2645 #ifdef CONFIG_PROC_FS
2646         obd->obd_vars = lprocfs_osc_obd_vars;
2647 #endif
2648         /* If this is true then both client (osc) and server (osp) are on the
2649          * same node. The osp layer if loaded first will register the osc proc
2650          * directory. In that case this obd_device will be attached its proc
2651          * tree to type->typ_procsym instead of obd->obd_type->typ_procroot. */
2652         type = class_search_type(LUSTRE_OSP_NAME);
2653         if (type && type->typ_procsym) {
2654                 obd->obd_proc_entry = lprocfs_register(obd->obd_name,
2655                                                        type->typ_procsym,
2656                                                        obd->obd_vars, obd);
2657                 if (IS_ERR(obd->obd_proc_entry)) {
2658                         rc = PTR_ERR(obd->obd_proc_entry);
2659                         CERROR("error %d setting up lprocfs for %s\n", rc,
2660                                obd->obd_name);
2661                         obd->obd_proc_entry = NULL;
2662                 }
2663         } else {
2664                 rc = lprocfs_obd_setup(obd);
2665         }
2666
2667         /* If the basic OSC proc tree construction succeeded then
2668          * lets do the rest. */
2669         if (rc == 0) {
2670                 lproc_osc_attach_seqstat(obd);
2671                 sptlrpc_lprocfs_cliobd_attach(obd);
2672                 ptlrpc_lprocfs_register_obd(obd);
2673         }
2674
2675         /* We need to allocate a few requests more, because
2676          * brw_interpret tries to create new requests before freeing
2677          * previous ones, Ideally we want to have 2x max_rpcs_in_flight
2678          * reserved, but I'm afraid that might be too much wasted RAM
2679          * in fact, so 2 is just my guess and still should work. */
2680         cli->cl_import->imp_rq_pool =
2681                 ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
2682                                     OST_MAXREQSIZE,
2683                                     ptlrpc_add_rqs_to_pool);
2684
2685         INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
2686         ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
2687         RETURN(0);
2688
2689 out_ptlrpcd_work:
2690         if (cli->cl_writeback_work != NULL) {
2691                 ptlrpcd_destroy_work(cli->cl_writeback_work);
2692                 cli->cl_writeback_work = NULL;
2693         }
2694         if (cli->cl_lru_work != NULL) {
2695                 ptlrpcd_destroy_work(cli->cl_lru_work);
2696                 cli->cl_lru_work = NULL;
2697         }
2698 out_client_setup:
2699         client_obd_cleanup(obd);
2700 out_ptlrpcd:
2701         ptlrpcd_decref();
2702         RETURN(rc);
2703 }
2704
2705 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
2706 {
2707         int rc = 0;
2708         ENTRY;
2709
2710         switch (stage) {
2711         case OBD_CLEANUP_EARLY: {
2712                 struct obd_import *imp;
2713                 imp = obd->u.cli.cl_import;
2714                 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
2715                 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
2716                 ptlrpc_deactivate_import(imp);
2717                 spin_lock(&imp->imp_lock);
2718                 imp->imp_pingable = 0;
2719                 spin_unlock(&imp->imp_lock);
2720                 break;
2721         }
2722         case OBD_CLEANUP_EXPORTS: {
2723                 struct client_obd *cli = &obd->u.cli;
2724                 /* LU-464
2725                  * for echo client, export may be on zombie list, wait for
2726                  * zombie thread to cull it, because cli.cl_import will be
2727                  * cleared in client_disconnect_export():
2728                  *   class_export_destroy() -> obd_cleanup() ->
2729                  *   echo_device_free() -> echo_client_cleanup() ->
2730                  *   obd_disconnect() -> osc_disconnect() ->
2731                  *   client_disconnect_export()
2732                  */
2733                 obd_zombie_barrier();
2734                 if (cli->cl_writeback_work) {
2735                         ptlrpcd_destroy_work(cli->cl_writeback_work);
2736                         cli->cl_writeback_work = NULL;
2737                 }
2738                 if (cli->cl_lru_work) {
2739                         ptlrpcd_destroy_work(cli->cl_lru_work);
2740                         cli->cl_lru_work = NULL;
2741                 }
2742                 obd_cleanup_client_import(obd);
2743                 ptlrpc_lprocfs_unregister_obd(obd);
2744                 lprocfs_obd_cleanup(obd);
2745                 break;
2746                 }
2747         }
2748         RETURN(rc);
2749 }
2750
2751 int osc_cleanup(struct obd_device *obd)
2752 {
2753         struct client_obd *cli = &obd->u.cli;
2754         int rc;
2755
2756         ENTRY;
2757
2758         /* lru cleanup */
2759         if (cli->cl_cache != NULL) {
2760                 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
2761                 spin_lock(&cli->cl_cache->ccc_lru_lock);
2762                 list_del_init(&cli->cl_lru_osc);
2763                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
2764                 cli->cl_lru_left = NULL;
2765                 cl_cache_decref(cli->cl_cache);
2766                 cli->cl_cache = NULL;
2767         }
2768
2769         /* free memory of osc quota cache */
2770         osc_quota_cleanup(obd);
2771
2772         rc = client_obd_cleanup(obd);
2773
2774         ptlrpcd_decref();
2775         RETURN(rc);
2776 }
2777
2778 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
2779 {
2780         int rc = class_process_proc_param(PARAM_OSC, obd->obd_vars, lcfg, obd);
2781         return rc > 0 ? 0: rc;
2782 }
2783
2784 static int osc_process_config(struct obd_device *obd, size_t len, void *buf)
2785 {
2786         return osc_process_config_base(obd, buf);
2787 }
2788
2789 static struct obd_ops osc_obd_ops = {
2790         .o_owner                = THIS_MODULE,
2791         .o_setup                = osc_setup,
2792         .o_precleanup           = osc_precleanup,
2793         .o_cleanup              = osc_cleanup,
2794         .o_add_conn             = client_import_add_conn,
2795         .o_del_conn             = client_import_del_conn,
2796         .o_connect              = client_connect_import,
2797         .o_reconnect            = osc_reconnect,
2798         .o_disconnect           = osc_disconnect,
2799         .o_statfs               = osc_statfs,
2800         .o_statfs_async         = osc_statfs_async,
2801         .o_create               = osc_create,
2802         .o_destroy              = osc_destroy,
2803         .o_getattr              = osc_getattr,
2804         .o_setattr              = osc_setattr,
2805         .o_iocontrol            = osc_iocontrol,
2806         .o_set_info_async       = osc_set_info_async,
2807         .o_import_event         = osc_import_event,
2808         .o_process_config       = osc_process_config,
2809         .o_quotactl             = osc_quotactl,
2810 };
2811
2812 static int __init osc_init(void)
2813 {
2814         bool enable_proc = true;
2815         struct obd_type *type;
2816         int rc;
2817         ENTRY;
2818
2819         /* print an address of _any_ initialized kernel symbol from this
2820          * module, to allow debugging with gdb that doesn't support data
2821          * symbols from modules.*/
2822         CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
2823
2824         rc = lu_kmem_init(osc_caches);
2825         if (rc)
2826                 RETURN(rc);
2827
2828         type = class_search_type(LUSTRE_OSP_NAME);
2829         if (type != NULL && type->typ_procsym != NULL)
2830                 enable_proc = false;
2831
2832         rc = class_register_type(&osc_obd_ops, NULL, enable_proc, NULL,
2833                                  LUSTRE_OSC_NAME, &osc_device_type);
2834         if (rc) {
2835                 lu_kmem_fini(osc_caches);
2836                 RETURN(rc);
2837         }
2838
2839         RETURN(rc);
2840 }
2841
2842 static void /*__exit*/ osc_exit(void)
2843 {
2844         class_unregister_type(LUSTRE_OSC_NAME);
2845         lu_kmem_fini(osc_caches);
2846 }
2847
2848 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
2849 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
2850 MODULE_VERSION(LUSTRE_VERSION_STRING);
2851 MODULE_LICENSE("GPL");
2852
2853 module_init(osc_init);
2854 module_exit(osc_exit);