Whamcloud - gitweb
LU-5835 ptlrpc: Introduce iovec to bulk descriptor
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2014, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_OSC
38
39 #include <libcfs/libcfs.h>
40
41 #include <lustre_dlm.h>
42 #include <lustre_net.h>
43 #include <lustre/lustre_user.h>
44 #include <obd_cksum.h>
45 #include <lustre_ha.h>
46 #include <lprocfs_status.h>
47 #include <lustre_ioctl.h>
48 #include <lustre_debug.h>
49 #include <lustre_param.h>
50 #include <lustre_fid.h>
51 #include <obd_class.h>
52 #include "osc_internal.h"
53 #include "osc_cl_internal.h"
54
/* Per-RPC context for an asynchronous bulk read/write (BRW) request.
 * Stored in ptlrpc_request::rq_async_args and unpacked again in
 * brw_interpret() when the reply (or failure) is processed. */
struct osc_brw_async_args {
        struct obdo              *aa_oa;           /* attributes sent with the RPC */
        int                       aa_requested_nob; /* total bytes requested */
        int                       aa_nio_count;    /* number of remote niobufs */
        u32                       aa_page_count;   /* entries in aa_ppga */
        int                       aa_resends;      /* resend attempts so far */
        struct brw_page **aa_ppga;                 /* pages making up the bulk */
        struct client_obd        *aa_cli;          /* owning client obd */
        struct list_head          aa_oaps;         /* async pages in this RPC */
        struct list_head          aa_exts;         /* extents covered by this RPC */
        struct cl_req            *aa_clerq;        /* cl_req for the transfer */
};

/* Grant-shrink RPCs reuse the BRW async-args slot; only aa_oa is used
 * (see osc_shrink_grant_interpret()). */
#define osc_grant_args osc_brw_async_args
69
/* Callback context for asynchronous OST_SETATTR/OST_PUNCH requests;
 * consumed by osc_setattr_interpret(). */
struct osc_setattr_args {
        struct obdo             *sa_oa;     /* attrs; refreshed from the reply */
        obd_enqueue_update_f     sa_upcall; /* completion callback */
        void                    *sa_cookie; /* opaque argument for sa_upcall */
};
75
/* Callback context for asynchronous OST_SYNC requests; consumed by
 * osc_sync_interpret(). */
struct osc_fsync_args {
        struct obdo             *fa_oa;     /* attrs; overwritten from the reply */
        obd_enqueue_update_f     fa_upcall; /* completion callback */
        void                    *fa_cookie; /* opaque argument for fa_upcall */
};
81
/* Context for an asynchronous DLM lock enqueue issued by the OSC. */
struct osc_enqueue_args {
        struct obd_export       *oa_exp;    /* export the enqueue was sent on */
        ldlm_type_t             oa_type;    /* lock type */
        ldlm_mode_t             oa_mode;    /* requested lock mode */
        __u64                   *oa_flags;  /* in/out LDLM flags */
        osc_enqueue_upcall_f    oa_upcall;  /* completion callback */
        void                    *oa_cookie; /* opaque argument for oa_upcall */
        struct ost_lvb          *oa_lvb;    /* lock value block from the server */
        struct lustre_handle    oa_lockh;   /* handle of the granted lock */
        unsigned int            oa_agl:1;   /* AGL (async glimpse lock) request */
};
93
94 static void osc_release_ppga(struct brw_page **ppga, size_t count);
95 static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
96                          void *data, int rc);
97
98 void osc_pack_req_body(struct ptlrpc_request *req, struct obdo *oa)
99 {
100         struct ost_body *body;
101
102         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
103         LASSERT(body);
104
105         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
106 }
107
/* Fetch the attributes of the object identified by @oa from its OST
 * with a synchronous OST_GETATTR RPC.  On success @oa is refreshed from
 * the reply and the preferred client RPC size is reported as the
 * blocksize.
 *
 * \retval 0 on success, negative errno on allocation/RPC/unpack failure */
static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        int                      rc;

        ENTRY;
        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        /* Synchronous: block until the reply arrives. */
        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        /* Advertise the optimal BRW size as the object's blocksize. */
        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;

        EXIT;
out:
        ptlrpc_req_finished(req);

        return rc;
}
150
/* Update the attributes of the object identified by @oa on its OST with
 * a synchronous OST_SETATTR RPC.  The caller must have set the group in
 * o_valid (asserted below).  On success @oa is refreshed from the reply.
 *
 * \retval 0 on success, negative errno on allocation/RPC/unpack failure */
static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        int                      rc;

        ENTRY;
        LASSERT(oa->o_valid & OBD_MD_FLGROUP);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        EXIT;
out:
        ptlrpc_req_finished(req);

        /* NOTE(review): EXIT above plus RETURN here logs function exit
         * twice; osc_getattr() uses a plain return after EXIT instead. */
        RETURN(rc);
}
191
/* Reply interpreter shared by osc_setattr_async() and osc_punch_base():
 * on success copy the returned attributes back into sa->sa_oa, then
 * invoke the caller's upcall with the final status in every case. */
static int osc_setattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_setattr_args *sa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
                             &body->oa);
out:
        /* The upcall always runs, even on error, and may remap rc. */
        rc = sa->sa_upcall(sa->sa_cookie, rc);
        RETURN(rc);
}
212
/* Send an OST_SETATTR RPC asynchronously.
 *
 * If @rqset is NULL the request is fire-and-forget: it is handed to
 * ptlrpcd and no interpreter or upcall is installed.  Otherwise
 * osc_setattr_interpret() will deliver the result to @upcall(@cookie),
 * and the request goes either to ptlrpcd (@rqset == PTLRPCD_SET) or
 * onto the caller's set.
 *
 * \retval 0 on successful submission, negative errno on setup failure */
int osc_setattr_async(struct obd_export *exp, struct obdo *oa,
                      obd_enqueue_update_f upcall, void *cookie,
                      struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct osc_setattr_args *sa;
        int                      rc;

        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        /* do mds to ost setattr asynchronously */
        if (!rqset) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
        } else {
                req->rq_interpret_reply =
                        (ptlrpc_interpterer_t)osc_setattr_interpret;

                /* Async args live inline in the request; ensure they fit. */
                CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
                sa = ptlrpc_req_async_args(req);
                sa->sa_oa = oa;
                sa->sa_upcall = upcall;
                sa->sa_cookie = cookie;

                if (rqset == PTLRPCD_SET)
                        ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
                else
                        ptlrpc_set_add_req(rqset, req);
        }

        RETURN(0);
}
259
/* Create an object on the OST with a synchronous OST_CREATE RPC.
 * Only used for echo-client objects (asserted via the FID sequence);
 * regular OST object creation is driven by the MDS.  On success @oa is
 * refreshed from the reply and the blocksize is filled in.
 *
 * \retval 0 on success, negative errno otherwise */
static int osc_create(const struct lu_env *env, struct obd_export *exp,
                      struct obdo *oa)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        LASSERT(oa != NULL);
        LASSERT(oa->o_valid & OBD_MD_FLGROUP);
        LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
        if (req == NULL)
                GOTO(out, rc = -ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
        if (rc) {
                ptlrpc_request_free(req);
                GOTO(out, rc);
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out_req, rc = -EPROTO);

        /* NOTE(review): o_flags is logged without checking OBD_MD_FLFLAGS
         * in o_valid, so the value may be stale — debug output only. */
        CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;

        CDEBUG(D_HA, "transno: "LPD64"\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        RETURN(rc);
}
310
/* Send an OST_PUNCH (truncate/hole-punch) RPC asynchronously.  The
 * result is delivered through osc_setattr_interpret() to
 * @upcall(@cookie); the request goes to ptlrpcd when @rqset is
 * PTLRPCD_SET, otherwise onto the caller's set.
 *
 * \retval 0 on successful submission, negative errno on setup failure */
int osc_punch_base(struct obd_export *exp, struct obdo *oa,
                   obd_enqueue_update_f upcall, void *cookie,
                   struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct osc_setattr_args *sa;
        struct ost_body         *body;
        int                      rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
        /* Async args live inline in the request; ensure they fit. */
        CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
        sa = ptlrpc_req_async_args(req);
        sa->sa_oa = oa;
        sa->sa_upcall = upcall;
        sa->sa_cookie = cookie;
        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}
352
/* Reply interpreter for OST_SYNC: on success copy the returned obdo
 * back to the caller's buffer, then invoke the upcall with the final
 * status in every case. */
static int osc_sync_interpret(const struct lu_env *env,
                              struct ptlrpc_request *req,
                              void *arg, int rc)
{
        struct osc_fsync_args *fa = arg;
        struct ost_body *body;
        ENTRY;

        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL) {
                CERROR ("can't unpack ost_body\n");
                GOTO(out, rc = -EPROTO);
        }

        /* Struct copy of the whole obdo from the reply. */
        *fa->fa_oa = body->oa;
out:
        /* The upcall always runs, even on error, and may remap rc. */
        rc = fa->fa_upcall(fa->fa_cookie, rc);
        RETURN(rc);
}
375
/* Send an OST_SYNC RPC asynchronously.  The start/end of the range to
 * sync are carried in the size/blocks fields of @oa (see comment
 * below).  The result is delivered through osc_sync_interpret() to
 * @upcall(@cookie).
 *
 * \retval 0 on successful submission, negative errno on setup failure */
int osc_sync_base(struct obd_export *exp, struct obdo *oa,
                  obd_enqueue_update_f upcall, void *cookie,
                  struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct osc_fsync_args *fa;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        /* overload the size and blocks fields in the oa with start/end */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = osc_sync_interpret;

        /* Async args live inline in the request; ensure they fit. */
        CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
        fa = ptlrpc_req_async_args(req);
        fa->fa_oa = oa;
        fa->fa_upcall = upcall;
        fa->fa_cookie = cookie;

        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN (0);
}
417
/* Find and cancel locally the locks matched by @mode on the resource
 * named after @oa's object id.  Found locks are added to @cancels so
 * they can be piggy-backed on a subsequent RPC (early lock cancel).
 *
 * \retval number of locks added to @cancels (0 if ELC should not run) */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
                                   struct list_head *cancels,
                                   ldlm_mode_t mode, __u64 lock_flags)
{
        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        struct ldlm_res_id res_id;
        struct ldlm_resource *res;
        int count;
        ENTRY;

        /* Return, i.e. cancel nothing, only if ELC is supported (flag in
         * export) but disabled through procfs (flag in NS).
         *
         * This distinguishes from a case when ELC is not supported originally,
         * when we still want to cancel locks in advance and just cancel them
         * locally, without sending any RPC. */
        if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
                RETURN(0);

        ostid_build_res_name(&oa->o_oi, &res_id);
        res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
        if (IS_ERR(res))
                RETURN(0);

        LDLM_RESOURCE_ADDREF(res);
        count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
                                           lock_flags, 0, NULL);
        LDLM_RESOURCE_DELREF(res);
        ldlm_resource_putref(res);
        RETURN(count);
}
452
453 static int osc_destroy_interpret(const struct lu_env *env,
454                                  struct ptlrpc_request *req, void *data,
455                                  int rc)
456 {
457         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
458
459         atomic_dec(&cli->cl_destroy_in_flight);
460         wake_up(&cli->cl_destroy_waitq);
461         return 0;
462 }
463
/* Try to take an in-flight slot for a destroy RPC without a lock.
 * The increment optimistically claims a slot; if that pushed the count
 * over the limit the claim is rolled back.  Because another thread may
 * have released a slot between the two atomic ops, a waiter is woken in
 * that window to avoid a lost wakeup.
 *
 * \retval 1 a slot was claimed and the destroy RPC may be sent
 * \retval 0 no slot available; caller should wait and retry */
static int osc_can_send_destroy(struct client_obd *cli)
{
        if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
            cli->cl_max_rpcs_in_flight) {
                /* The destroy request can be sent */
                return 1;
        }
        if (atomic_dec_return(&cli->cl_destroy_in_flight) <
            cli->cl_max_rpcs_in_flight) {
                /*
                 * The counter has been modified between the two atomic
                 * operations.
                 */
                wake_up(&cli->cl_destroy_waitq);
        }
        return 0;
}
481
/* Destroy the object identified by @oa on its OST.  Conflicting local
 * PW locks are cancelled in advance (with data discarded) and their
 * cancels are piggy-backed on the OST_DESTROY request.  The number of
 * concurrent destroy RPCs is throttled to cl_max_rpcs_in_flight; once
 * submitted the RPC is handled asynchronously by ptlrpcd.
 *
 * \retval 0 on successful submission, negative errno on setup failure */
static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct client_obd     *cli = &exp->exp_obd->u.cli;
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct list_head       cancels = LIST_HEAD_INIT(cancels);
        int rc, count;
        ENTRY;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
        if (req == NULL) {
                /* Release the locks collected for ELC before bailing out. */
                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                RETURN(-ENOMEM);
        }

        rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
                               0, &cancels, count);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = osc_destroy_interpret;
        if (!osc_can_send_destroy(cli)) {
                struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);

                /*
                 * Wait until the number of on-going destroy RPCs drops
                 * under max_rpc_in_flight
                 */
                l_wait_event_exclusive(cli->cl_destroy_waitq,
                                       osc_can_send_destroy(cli), &lwi);
        }

        /* Do not wait for response */
        ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
        RETURN(0);
}
538
/* Fill the dirty/undirty/grant accounting fields of @oa so the server
 * learns how much cache the client holds and how much more grant it
 * could use.  o_undirty is forced to 0 whenever the counters look
 * inconsistent, so the server does not extend grant based on bad data.
 * Called with the fields' validity bits not yet set (asserted). */
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;

        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        spin_lock(&cli->cl_loi_list_lock);
        oa->o_dirty = cli->cl_dirty_pages << PAGE_CACHE_SHIFT;
        if (unlikely(cli->cl_dirty_pages - cli->cl_dirty_transit >
                     cli->cl_dirty_max_pages)) {
                CERROR("dirty %lu - %lu > dirty_max %lu\n",
                       cli->cl_dirty_pages, cli->cl_dirty_transit,
                       cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else if (unlikely(atomic_long_read(&obd_dirty_pages) -
                            atomic_long_read(&obd_dirty_transit_pages) >
                            (obd_max_dirty_pages + 1))) {
                /* The atomic_read() allowing the atomic_inc() are
                 * not covered by a lock thus they may safely race and trip
                 * this CERROR() unless we add in a small fudge factor (+1). */
                CERROR("%s: dirty %ld - %ld > system dirty_max %lu\n",
                       cli->cl_import->imp_obd->obd_name,
                       atomic_long_read(&obd_dirty_pages),
                       atomic_long_read(&obd_dirty_transit_pages),
                       obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
                            0x7fffffff)) {
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty_pages, cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else {
                /* Ask for enough grant to keep a full pipeline of RPCs
                 * in flight, but at least the configured dirty max. */
                unsigned long max_in_flight = (cli->cl_max_pages_per_rpc <<
                                      PAGE_CACHE_SHIFT) *
                                     (cli->cl_max_rpcs_in_flight + 1);
                oa->o_undirty = max(cli->cl_dirty_max_pages << PAGE_CACHE_SHIFT,
                                    max_in_flight);
        }
        oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
        oa->o_dropped = cli->cl_lost_grant;
        cli->cl_lost_grant = 0;
        spin_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);

}
587
588 void osc_update_next_shrink(struct client_obd *cli)
589 {
590         cli->cl_next_shrink_grant =
591                 cfs_time_shift(cli->cl_grant_shrink_interval);
592         CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
593                cli->cl_next_shrink_grant);
594 }
595
/* Add @grant bytes to the client's available grant, under the LOI
 * list lock that protects the grant counters. */
static void __osc_update_grant(struct client_obd *cli, u64 grant)
{
        spin_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant += grant;
        spin_unlock(&cli->cl_loi_list_lock);
}
602
603 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
604 {
605         if (body->oa.o_valid & OBD_MD_FLGRANT) {
606                 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
607                 __osc_update_grant(cli, body->oa.o_grant);
608         }
609 }
610
611 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
612                               u32 keylen, void *key,
613                               u32 vallen, void *val,
614                               struct ptlrpc_request_set *set);
615
/* Reply interpreter for a grant-shrink set_info RPC.  On failure the
 * grant we tried to give back is restored locally; on success the
 * server's reply (which may itself carry grant) is folded in.  The
 * obdo allocated by the sender is freed in both cases. */
static int osc_shrink_grant_interpret(const struct lu_env *env,
                                      struct ptlrpc_request *req,
                                      void *aa, int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
        struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
        struct ost_body *body;

        if (rc != 0) {
                /* Shrink failed: take back the grant we offered. */
                __osc_update_grant(cli, oa->o_grant);
                GOTO(out, rc);
        }

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        osc_update_grant(cli, body);
out:
        OBDO_FREE(oa);
        return rc;
}
636
/* Give back a quarter of the available grant without sending an RPC:
 * move it from cl_avail_grant into @oa->o_grant and mark the obdo with
 * OBD_FL_SHRINK_GRANT so the server reclaims it on the next request. */
static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
{
        spin_lock(&cli->cl_loi_list_lock);
        oa->o_grant = cli->cl_avail_grant / 4;
        cli->cl_avail_grant -= oa->o_grant;
        spin_unlock(&cli->cl_loi_list_lock);
        /* o_flags is only meaningful when OBD_MD_FLFLAGS is set; start
         * from a clean value if it was not. */
        if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
                oa->o_valid |= OBD_MD_FLFLAGS;
                oa->o_flags = 0;
        }
        oa->o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);
}
650
/* Shrink the current grant, either from some large amount to enough for a
 * full set of in-flight RPCs, or if we have already shrunk to that limit
 * then to enough for a single RPC.  This avoids keeping more grant than
 * needed, and avoids shrinking the grant piecemeal. */
static int osc_shrink_grant(struct client_obd *cli)
{
        /* Enough for a full pipeline of max-sized RPCs, plus one. */
        __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
                             (cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT);

        spin_lock(&cli->cl_loi_list_lock);
        /* Already at (or below) the pipeline target: aim for one RPC. */
        if (cli->cl_avail_grant <= target_bytes)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
        spin_unlock(&cli->cl_loi_list_lock);

        return osc_shrink_grant_to_target(cli, target_bytes);
}
667
/* Shrink the client's grant down to @target_bytes by returning the
 * excess to the server via a KEY_GRANT_SHRINK set_info RPC.  The target
 * is clamped to at least one full RPC's worth; nothing is done if we
 * already hold no more than the target.  If sending fails the grant is
 * restored locally.
 *
 * \retval 0 on success or when no shrink was needed, negative errno
 *         on allocation or send failure */
int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
{
        int                     rc = 0;
        struct ost_body        *body;
        ENTRY;

        spin_lock(&cli->cl_loi_list_lock);
        /* Don't shrink if we are already above or below the desired limit
         * We don't want to shrink below a single RPC, as that will negatively
         * impact block allocation and long-term performance. */
        if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;

        if (target_bytes >= cli->cl_avail_grant) {
                spin_unlock(&cli->cl_loi_list_lock);
                RETURN(0);
        }
        spin_unlock(&cli->cl_loi_list_lock);

        OBD_ALLOC_PTR(body);
        if (!body)
                RETURN(-ENOMEM);

        osc_announce_cached(cli, &body->oa, 0);

        /* Re-take the lock: cl_avail_grant may have changed since the
         * check above; shrink whatever excess remains now. */
        spin_lock(&cli->cl_loi_list_lock);
        body->oa.o_grant = cli->cl_avail_grant - target_bytes;
        cli->cl_avail_grant = target_bytes;
        spin_unlock(&cli->cl_loi_list_lock);
        if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
                body->oa.o_valid |= OBD_MD_FLFLAGS;
                body->oa.o_flags = 0;
        }
        body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);

        rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
                                sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
                                sizeof(*body), body, NULL);
        if (rc != 0)
                /* Send failed: put the grant back. */
                __osc_update_grant(cli, body->oa.o_grant);
        OBD_FREE_PTR(body);
        RETURN(rc);
}
712
/* Decide whether it is time to shrink this client's grant.  Requires
 * the server to support OBD_CONNECT_GRANT_SHRINK, the shrink deadline
 * to have (almost) passed, a fully-connected import, and more grant
 * than one RPC needs.  If the deadline passed but the other conditions
 * fail, the timer is simply re-armed.
 *
 * \retval 1 if a shrink should be attempted, 0 otherwise */
static int osc_should_shrink_grant(struct client_obd *client)
{
        cfs_time_t time = cfs_time_current();
        cfs_time_t next_shrink = client->cl_next_shrink_grant;

        if ((client->cl_import->imp_connect_data.ocd_connect_flags &
             OBD_CONNECT_GRANT_SHRINK) == 0)
                return 0;

        /* Allow a few ticks of slack so we don't narrowly miss the
         * deadline and wait a whole extra interval. */
        if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
                /* Get the current RPC size directly, instead of going via:
                 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
                 * Keep comment here so that it can be found by searching. */
                int brw_size = client->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;

                if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
                    client->cl_avail_grant > brw_size)
                        return 1;
                else
                        osc_update_next_shrink(client);
        }
        return 0;
}
736
/* Periodic timeout callback: walk all clients registered on this
 * timeout item and shrink the grant of each one that is due. */
static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
{
        struct client_obd *client;

        list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
                if (osc_should_shrink_grant(client))
                        osc_shrink_grant(client);
        }
        return 0;
}
747
/* Register @client with the periodic grant-shrink timeout machinery and
 * arm its first shrink deadline.
 *
 * \retval 0 on success, negative errno if registration fails */
static int osc_add_shrink_grant(struct client_obd *client)
{
        int rc;

        rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
                                       TIMEOUT_GRANT,
                                       osc_grant_shrink_grant_cb, NULL,
                                       &client->cl_grant_shrink_list);
        if (rc) {
                CERROR("add grant client %s error %d\n",
                        client->cl_import->imp_obd->obd_name, rc);
                return rc;
        }
        CDEBUG(D_CACHE, "add grant client %s \n",
               client->cl_import->imp_obd->obd_name);
        osc_update_next_shrink(client);
        return 0;
}
766
767 static int osc_del_shrink_grant(struct client_obd *client)
768 {
769         return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
770                                          TIMEOUT_GRANT);
771 }
772
/* Initialize the client's grant accounting from the connect data
 * returned by the server, derive the extent chunk size, and enable
 * periodic grant shrinking if the server supports it. */
static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        /*
         * ocd_grant is the total grant amount we're expect to hold: if we've
         * been evicted, it's the new avail_grant amount, cl_dirty_pages will
         * drop to 0 as inflight RPCs fail out; otherwise, it's avail_grant +
         * dirty.
         *
         * race is tolerable here: if we're evicted, but imp_state already
         * left EVICTED state, then cl_dirty_pages must be 0 already.
         */
        spin_lock(&cli->cl_loi_list_lock);
        if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
                cli->cl_avail_grant = ocd->ocd_grant;
        else
                cli->cl_avail_grant = ocd->ocd_grant -
                                      (cli->cl_dirty_pages << PAGE_CACHE_SHIFT);

        if (cli->cl_avail_grant < 0) {
                CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
                      cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant,
                      ocd->ocd_grant, cli->cl_dirty_pages << PAGE_CACHE_SHIFT);
                /* workaround for servers which do not have the patch from
                 * LU-2679 */
                cli->cl_avail_grant = ocd->ocd_grant;
        }

        /* determine the appropriate chunk size used by osc_extent. */
        cli->cl_chunkbits = max_t(int, PAGE_CACHE_SHIFT, ocd->ocd_blocksize);
        spin_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
                "chunk bits: %d.\n", cli->cl_import->imp_obd->obd_name,
                cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits);

        if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
            list_empty(&cli->cl_grant_shrink_list))
                osc_add_shrink_grant(cli);
}
812
/* We assume that the reason this OSC got a short read is because it read
 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
 * via the LOV, and it _knows_ it's reading inside the file, it's just that
 * this stripe never got written at or beyond this stripe offset yet.
 *
 * Zero-fill the portion of the page array beyond @nob_read: the tail of
 * the page containing the last byte actually read, then every remaining
 * page in full. */
static void handle_short_read(int nob_read, size_t page_count,
                              struct brw_page **pga)
{
        char *ptr;
        int i = 0;

        /* skip bytes read OK */
        while (nob_read > 0) {
                LASSERT (page_count > 0);

                if (pga[i]->count > nob_read) {
                        /* EOF inside this page */
                        ptr = kmap(pga[i]->pg) +
                                (pga[i]->off & ~PAGE_MASK);
                        memset(ptr + nob_read, 0, pga[i]->count - nob_read);
                        kunmap(pga[i]->pg);
                        page_count--;
                        i++;
                        break;
                }

                nob_read -= pga[i]->count;
                page_count--;
                i++;
        }

        /* zero remaining pages */
        while (page_count-- > 0) {
                ptr = kmap(pga[i]->pg) + (pga[i]->off & ~PAGE_MASK);
                memset(ptr, 0, pga[i]->count);
                kunmap(pga[i]->pg);
                i++;
        }
}
851
852 static int check_write_rcs(struct ptlrpc_request *req,
853                            int requested_nob, int niocount,
854                            size_t page_count, struct brw_page **pga)
855 {
856         int     i;
857         __u32   *remote_rcs;
858
859         remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
860                                                   sizeof(*remote_rcs) *
861                                                   niocount);
862         if (remote_rcs == NULL) {
863                 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
864                 return(-EPROTO);
865         }
866
867         /* return error if any niobuf was in error */
868         for (i = 0; i < niocount; i++) {
869                 if ((int)remote_rcs[i] < 0)
870                         return(remote_rcs[i]);
871
872                 if (remote_rcs[i] != 0) {
873                         CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
874                                 i, remote_rcs[i], req);
875                         return(-EPROTO);
876                 }
877         }
878
879         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
880                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
881                        req->rq_bulk->bd_nob_transferred, requested_nob);
882                 return(-EPROTO);
883         }
884
885         return (0);
886 }
887
888 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
889 {
890         if (p1->flag != p2->flag) {
891                 unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
892                                   OBD_BRW_SYNC       | OBD_BRW_ASYNC   |
893                                   OBD_BRW_NOQUOTA    | OBD_BRW_SOFT_SYNC);
894
895                 /* warn if we try to combine flags that we don't know to be
896                  * safe to combine */
897                 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
898                         CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
899                               "report this at https://jira.hpdd.intel.com/\n",
900                               p1->flag, p2->flag);
901                 }
902                 return 0;
903         }
904
905         return (p1->off + p1->count == p2->off);
906 }
907
/* Compute the bulk checksum over @nob bytes spread across @pg_count brw
 * pages using the algorithm selected by @cksum_type.  @opc (OST_READ or
 * OST_WRITE) selects which fault-injection hook may fire.  Returns the
 * checksum; on hash-init failure a negative errno is returned cast to u32. */
static u32 osc_checksum_bulk(int nob, size_t pg_count,
                             struct brw_page **pga, int opc,
                             cksum_type_t cksum_type)
{
        u32                             cksum;
        int                             i = 0;
        struct cfs_crypto_hash_desc     *hdesc;
        unsigned int                    bufsize;
        int                             err;
        unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);

        LASSERT(pg_count > 0);

        hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
        if (IS_ERR(hdesc)) {
                CERROR("Unable to initialize checksum hash %s\n",
                       cfs_crypto_hash_name(cfs_alg));
                return PTR_ERR(hdesc);
        }

        while (nob > 0 && pg_count > 0) {
                /* On the last page only the remaining @nob bytes are hashed. */
                unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 && opc == OST_READ &&
                    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
                        unsigned char *ptr = kmap(pga[i]->pg);
                        int off = pga[i]->off & ~PAGE_MASK;

                        memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
                        kunmap(pga[i]->pg);
                }
                cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
                                            pga[i]->off & ~PAGE_MASK,
                                            count);
                LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
                               (int)(pga[i]->off & ~PAGE_MASK));

                nob -= pga[i]->count;
                pg_count--;
                i++;
        }

        bufsize = sizeof(cksum);
        /* NOTE(review): err from hash_final is ignored here — presumably the
         * digest is always valid once init succeeded; confirm upstream. */
        err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);

        /* For sending we only compute the wrong checksum instead
         * of corrupting the data so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                cksum++;

        return cksum;
}
962
/* Build a BRW (bulk read/write) ptlrpc request for @page_count sorted,
 * contiguous-per-niobuf pages in @pga.  @cmd selects OBD_BRW_WRITE vs. read,
 * @resend marks a recovery resend.  On success *@reqp holds the prepared
 * request (not yet sent) and 0 is returned; on error a negative errno.
 * The request's async args are seeded so brw_interpret() can finish it. */
static int
osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
                     u32 page_count, struct brw_page **pga,
                     struct ptlrpc_request **reqp, int resend)
{
        struct ptlrpc_request   *req;
        struct ptlrpc_bulk_desc *desc;
        struct ost_body         *body;
        struct obd_ioobj        *ioobj;
        struct niobuf_remote    *niobuf;
        int niocount, i, requested_nob, opc, rc;
        struct osc_brw_async_args *aa;
        struct req_capsule      *pill;
        struct brw_page *pg_prev;

        ENTRY;
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
                RETURN(-ENOMEM); /* Recoverable */
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
                RETURN(-EINVAL); /* Fatal */

        /* Writes draw from the pre-allocated pool so they can proceed even
         * under memory pressure; reads allocate normally. */
        if ((cmd & OBD_BRW_WRITE) != 0) {
                opc = OST_WRITE;
                req = ptlrpc_request_alloc_pool(cli->cl_import,
                                                cli->cl_import->imp_rq_pool,
                                                &RQF_OST_BRW_WRITE);
        } else {
                opc = OST_READ;
                req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
        }
        if (req == NULL)
                RETURN(-ENOMEM);

        /* Count remote niobufs: adjacent mergeable pages share one. */
        for (niocount = i = 1; i < page_count; i++) {
                if (!can_merge_pages(pga[i - 1], pga[i]))
                        niocount++;
        }

        pill = &req->rq_pill;
        req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
                             sizeof(*ioobj));
        req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
                             niocount * sizeof(*niobuf));

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);
        /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
         * retry logic */
        req->rq_no_retry_einprogress = 1;

        desc = ptlrpc_prep_bulk_imp(req, page_count,
                cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
                (opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE :
                        PTLRPC_BULK_PUT_SINK) |
                        PTLRPC_BULK_BUF_KIOV,
                OST_BULK_PORTAL,
                &ptlrpc_bulk_kiov_pin_ops);

        if (desc == NULL)
                GOTO(out, rc = -ENOMEM);
        /* NB request now owns desc and will free it when it gets freed */

        body = req_capsule_client_get(pill, &RMF_OST_BODY);
        ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;
        /* The high bits of ioo_max_brw tells server _maximum_ number of bulks
         * that might be send for this request.  The actual number is decided
         * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
         * "max - 1" for old client compatibility sending "0", and also so the
         * the actual maximum is a power-of-two number, not one less. LU-1431 */
        ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
        LASSERT(page_count > 0);
        pg_prev = pga[0];
        /* Attach each page to the bulk descriptor and fill the niobuf array,
         * merging contiguous same-flag pages into a single niobuf. */
        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                struct brw_page *pg = pga[i];
                int poff = pg->off & ~PAGE_MASK;

                LASSERT(pg->count > 0);
                /* make sure there is no gap in the middle of page array */
                LASSERTF(page_count == 1 ||
                         (ergo(i == 0, poff + pg->count == PAGE_CACHE_SIZE) &&
                          ergo(i > 0 && i < page_count - 1,
                               poff == 0 && pg->count == PAGE_CACHE_SIZE)   &&
                          ergo(i == page_count - 1, poff == 0)),
                         "i: %d/%d pg: %p off: "LPU64", count: %u\n",
                         i, page_count, pg, pg->off, pg->count);
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
                         " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
                         i, page_count,
                         pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
                         pg_prev->pg, page_private(pg_prev->pg),
                         pg_prev->pg->index, pg_prev->off);
                LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
                        (pg->flag & OBD_BRW_SRVLOCK));

                desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff, pg->count);
                requested_nob += pg->count;

                if (i > 0 && can_merge_pages(pg_prev, pg)) {
                        niobuf--;
                        niobuf->rnb_len += pg->count;
                } else {
                        niobuf->rnb_offset = pg->off;
                        niobuf->rnb_len    = pg->count;
                        niobuf->rnb_flags  = pg->flag;
                }
                pg_prev = pg;
        }

        LASSERTF((void *)(niobuf - niocount) ==
                req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
                "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
                &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));

        osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
        if (resend) {
                if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                        body->oa.o_valid |= OBD_MD_FLFLAGS;
                        body->oa.o_flags = 0;
                }
                body->oa.o_flags |= OBD_FL_RECOV_RESEND;
        }

        if (osc_should_shrink_grant(cli))
                osc_shrink_grant_local(cli, &body->oa);

        /* size[REQ_REC_OFF] still sizeof (*body) */
        if (opc == OST_WRITE) {
                if (cli->cl_checksum &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        /* store cl_cksum_type in a local variable since
                         * it can be changed via lprocfs */
                        cksum_type_t cksum_type = cli->cl_cksum_type;

                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                                oa->o_flags &= OBD_FL_LOCAL_MASK;
                                body->oa.o_flags = 0;
                        }
                        body->oa.o_flags |= cksum_type_pack(cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        body->oa.o_cksum = osc_checksum_bulk(requested_nob,
                                                             page_count, pga,
                                                             OST_WRITE,
                                                             cksum_type);
                        CDEBUG(D_PAGE, "checksum at write origin: %x\n",
                               body->oa.o_cksum);
                        /* save this in 'oa', too, for later checking */
                        oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        oa->o_flags |= cksum_type_pack(cksum_type);
                } else {
                        /* clear out the checksum flag, in case this is a
                         * resend but cl_checksum is no longer set. b=11238 */
                        oa->o_valid &= ~OBD_MD_FLCKSUM;
                }
                oa->o_cksum = body->oa.o_cksum;
                /* 1 RC per niobuf */
                req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
                                     sizeof(__u32) * niocount);
        } else {
                /* Reads: just ask the server to checksum what it sends back. */
                if (cli->cl_checksum &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                body->oa.o_flags = 0;
                        body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                }
        }
        ptlrpc_request_set_replen(req);

        /* Seed the async args consumed later by brw_interpret(). */
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oa = oa;
        aa->aa_requested_nob = requested_nob;
        aa->aa_nio_count = niocount;
        aa->aa_page_count = page_count;
        aa->aa_resends = 0;
        aa->aa_ppga = pga;
        aa->aa_cli = cli;
        INIT_LIST_HEAD(&aa->aa_oaps);

        *reqp = req;
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
                req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
                niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
        RETURN(0);

 out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}
1166
/* Verify a bulk-write checksum mismatch reported against the server value.
 * Recomputes the checksum locally with the server's algorithm to classify
 * where the data changed (client-side after checksum, in transit, or a
 * checksum-type protocol mismatch) and logs the diagnosis.
 * Returns 0 if the checksums actually agree, 1 on a confirmed mismatch. */
static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
                                __u32 client_cksum, __u32 server_cksum, int nob,
                                size_t page_count, struct brw_page **pga,
                                cksum_type_t client_cksum_type)
{
        __u32 new_cksum;
        char *msg;
        cksum_type_t cksum_type;

        if (server_cksum == client_cksum) {
                CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
                return 0;
        }

        /* Recompute with the algorithm the server reported using. */
        cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
                                       oa->o_flags : 0);
        new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
                                      cksum_type);

        if (cksum_type != client_cksum_type)
                msg = "the server did not use the checksum type specified in "
                      "the original request - likely a protocol problem";
        else if (new_cksum == server_cksum)
                msg = "changed on the client after we checksummed it - "
                      "likely false positive due to mmap IO (bug 11742)";
        else if (new_cksum == client_cksum)
                msg = "changed in transit before arrival at OST";
        else
                msg = "changed in transit AND doesn't match the original - "
                      "likely false positive due to mmap IO (bug 11742)";

        LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
                           " object "DOSTID" extent ["LPU64"-"LPU64"]\n",
                           msg, libcfs_nid2str(peer->nid),
                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
                           POSTID(&oa->o_oi), pga[0]->off,
                           pga[page_count-1]->off + pga[page_count-1]->count - 1);
        CERROR("original client csum %x (type %x), server csum %x (type %x), "
               "client csum now %x\n", client_cksum, client_cksum_type,
               server_cksum, cksum_type, new_cksum);
        return 1;
}
1211
/* Note rc enters this function as number of bytes transferred */
/* Finish a completed BRW request: unpack the reply body, update quota and
 * grant state, unwrap/verify the bulk, and for reads handle short reads and
 * verify the server checksum.  Returns 0 on success, -EAGAIN to trigger a
 * resend (checksum mismatch, bulk unwrap failure), or another negative
 * errno on protocol/transport errors. */
static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
{
        struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
        const lnet_process_id_t *peer =
                        &req->rq_import->imp_connection->c_peer;
        struct client_obd *cli = aa->aa_cli;
        struct ost_body *body;
        u32 client_cksum = 0;
        ENTRY;

        /* -EDQUOT still carries a usable reply (quota flags, grant); any
         * other failure is returned as-is. */
        if (rc < 0 && rc != -EDQUOT) {
                DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
                RETURN(rc);
        }

        LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL) {
                DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
                RETURN(-EPROTO);
        }

        /* set/clear over quota flag for a uid/gid */
        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
            body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
                unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };

                CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
                       body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
                       body->oa.o_flags);
                osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
        }

        osc_update_grant(cli, body);

        if (rc < 0)
                RETURN(rc);

        if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
                client_cksum = aa->aa_oa->o_cksum; /* save for later */

        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
                /* Writes report success via per-niobuf rcs, not a byte count. */
                if (rc > 0) {
                        CERROR("Unexpected +ve rc %d\n", rc);
                        RETURN(-EPROTO);
                }
                LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);

                if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
                        RETURN(-EAGAIN);

                if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
                    check_write_checksum(&body->oa, peer, client_cksum,
                                         body->oa.o_cksum, aa->aa_requested_nob,
                                         aa->aa_page_count, aa->aa_ppga,
                                         cksum_type_unpack(aa->aa_oa->o_flags)))
                        RETURN(-EAGAIN);

                rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
                                     aa->aa_page_count, aa->aa_ppga);
                GOTO(out, rc);
        }

        /* The rest of this function executes only for OST_READs */

        /* if unwrap_bulk failed, return -EAGAIN to retry */
        rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
        if (rc < 0)
                GOTO(out, rc = -EAGAIN);

        if (rc > aa->aa_requested_nob) {
                CERROR("Unexpected rc %d (%d requested)\n", rc,
                       aa->aa_requested_nob);
                RETURN(-EPROTO);
        }

        if (rc != req->rq_bulk->bd_nob_transferred) {
                CERROR ("Unexpected rc %d (%d transferred)\n",
                        rc, req->rq_bulk->bd_nob_transferred);
                return (-EPROTO);
        }

        /* A short read means we hit EOF on this stripe: zero the remainder. */
        if (rc < aa->aa_requested_nob)
                handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);

        if (body->oa.o_valid & OBD_MD_FLCKSUM) {
                static int cksum_counter;
                u32        server_cksum = body->oa.o_cksum;
                char      *via = "";
                char      *router = "";
                cksum_type_t cksum_type;

                cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
                                               body->oa.o_flags : 0);
                /* Only checksum the bytes actually transferred (rc). */
                client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
                                                 aa->aa_ppga, OST_READ,
                                                 cksum_type);

                if (peer->nid != req->rq_bulk->bd_sender) {
                        via = " via ";
                        router = libcfs_nid2str(req->rq_bulk->bd_sender);
                }

                if (server_cksum != client_cksum) {
                        LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
                                           "%s%s%s inode "DFID" object "DOSTID
                                           " extent ["LPU64"-"LPU64"]\n",
                                           req->rq_import->imp_obd->obd_name,
                                           libcfs_nid2str(peer->nid),
                                           via, router,
                                           body->oa.o_valid & OBD_MD_FLFID ?
                                                body->oa.o_parent_seq : (__u64)0,
                                           body->oa.o_valid & OBD_MD_FLFID ?
                                                body->oa.o_parent_oid : 0,
                                           body->oa.o_valid & OBD_MD_FLFID ?
                                                body->oa.o_parent_ver : 0,
                                           POSTID(&body->oa.o_oi),
                                           aa->aa_ppga[0]->off,
                                           aa->aa_ppga[aa->aa_page_count-1]->off +
                                           aa->aa_ppga[aa->aa_page_count-1]->count -
                                                                        1);
                        CERROR("client %x, server %x, cksum_type %x\n",
                               client_cksum, server_cksum, cksum_type);
                        cksum_counter = 0;
                        aa->aa_oa->o_cksum = client_cksum;
                        rc = -EAGAIN;
                } else {
                        cksum_counter++;
                        CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
                        rc = 0;
                }
        } else if (unlikely(client_cksum)) {
                static int cksum_missed;

                /* Rate-limit to power-of-two occurrences of the miss. */
                cksum_missed++;
                if ((cksum_missed & (-cksum_missed)) == cksum_missed)
                        CERROR("Checksum %u requested from %s but not sent\n",
                               cksum_missed, libcfs_nid2str(peer->nid));
        } else {
                rc = 0;
        }
out:
        /* On success, copy server-updated attributes back into the obdo. */
        if (rc >= 0)
                lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
                                     aa->aa_oa, &body->oa);

        RETURN(rc);
}
1361
/* Rebuild and resubmit a BRW request that failed with a recoverable error.
 * A fresh request is prepared from the same pages/obdo (resend=1), the oaps
 * and extents are moved from the old async args to the new ones, and the
 * new request is queued on ptlrpcd.  Returns 0 on success, -EINTR if an
 * oap was interrupted, or the prep error. */
static int osc_brw_redo_request(struct ptlrpc_request *request,
                                struct osc_brw_async_args *aa, int rc)
{
        struct ptlrpc_request *new_req;
        struct osc_brw_async_args *new_aa;
        struct osc_async_page *oap;
        ENTRY;

        DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
                  "redo for recoverable error %d", rc);

        rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
                                OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
                                  aa->aa_cli, aa->aa_oa, aa->aa_page_count,
                                  aa->aa_ppga, &new_req, 1);
        if (rc)
                RETURN(rc);

        /* Abort the resend if any page's owner was interrupted. */
        list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request != NULL) {
                        LASSERTF(request == oap->oap_request,
                                 "request %p != oap_request %p\n",
                                 request, oap->oap_request);
                        if (oap->oap_interrupted) {
                                ptlrpc_req_finished(new_req);
                                RETURN(-EINTR);
                        }
                }
        }
        /* New request takes over pga and oaps from old request.
         * Note that copying a list_head doesn't work, need to move it... */
        aa->aa_resends++;
        new_req->rq_interpret_reply = request->rq_interpret_reply;
        new_req->rq_async_args = request->rq_async_args;
        new_req->rq_commit_cb = request->rq_commit_cb;
        /* cap resend delay to the current request timeout, this is similar to
         * what ptlrpc does (see after_reply()) */
        if (aa->aa_resends > new_req->rq_timeout)
                new_req->rq_sent = cfs_time_current_sec() + new_req->rq_timeout;
        else
                new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
        new_req->rq_generation_set = 1;
        new_req->rq_import_generation = request->rq_import_generation;

        new_aa = ptlrpc_req_async_args(new_req);

        INIT_LIST_HEAD(&new_aa->aa_oaps);
        list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
        INIT_LIST_HEAD(&new_aa->aa_exts);
        list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
        new_aa->aa_resends = aa->aa_resends;

        /* Re-point each oap at the new request, dropping the old ref. */
        list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request) {
                        ptlrpc_req_finished(oap->oap_request);
                        oap->oap_request = ptlrpc_request_addref(new_req);
                }
        }

        /* XXX: This code will run into problem if we're going to support
         * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
         * and wait for all of them to be finished. We should inherit request
         * set from old request. */
        ptlrpcd_add_req(new_req, PDL_POLICY_SAME, -1);

        DEBUG_REQ(D_INFO, new_req, "new request");
        RETURN(0);
}
1430
1431 /*
1432  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1433  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1434  * fine for our small page arrays and doesn't require allocation.  its an
1435  * insertion sort that swaps elements that are strides apart, shrinking the
1436  * stride down until its '1' and the array is sorted.
1437  */
1438 static void sort_brw_pages(struct brw_page **array, int num)
1439 {
1440         int stride, i, j;
1441         struct brw_page *tmp;
1442
1443         if (num == 1)
1444                 return;
1445         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1446                 ;
1447
1448         do {
1449                 stride /= 3;
1450                 for (i = stride ; i < num ; i++) {
1451                         tmp = array[i];
1452                         j = i;
1453                         while (j >= stride && array[j - stride]->off > tmp->off) {
1454                                 array[j] = array[j - stride];
1455                                 j -= stride;
1456                         }
1457                         array[j] = tmp;
1458                 }
1459         } while (stride > 1);
1460 }
1461
/* Free the brw_page pointer array itself; the brw_page structures and pages
 * it references are owned elsewhere and are not freed here. */
static void osc_release_ppga(struct brw_page **ppga, size_t count)
{
        LASSERT(ppga != NULL);
        OBD_FREE(ppga, sizeof(*ppga) * count);
}
1467
/**
 * Interpret callback for a BRW (bulk read/write) RPC.
 *
 * Runs in ptlrpcd context after the bulk transfer completed or failed.
 * Resends recoverable failures, mirrors the attributes returned by the
 * OST into the client-side cl_object, finishes every extent covered by
 * the RPC, and updates the per-client in-flight RPC accounting.
 *
 * \param env  lu environment of the interpreting thread
 * \param req  the completed BRW request
 * \param data struct osc_brw_async_args stored in req->rq_async_args
 * \param rc   RPC result as seen by the ptlrpc layer
 *
 * \retval 0 on success, or when the request was successfully re-queued
 * \retval negative errno on unrecoverable failure
 */
static int brw_interpret(const struct lu_env *env,
                         struct ptlrpc_request *req, void *data, int rc)
{
        struct osc_brw_async_args *aa = data;
        struct osc_extent *ext;
        struct osc_extent *tmp;
        struct client_obd *cli = aa->aa_cli;
        ENTRY;

        rc = osc_brw_fini_request(req, rc);
        CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
        /* When server return -EINPROGRESS, client should always retry
         * regardless of the number of times the bulk was resent already. */
        if (osc_recoverable_error(rc)) {
                if (req->rq_import_generation !=
                    req->rq_import->imp_generation) {
                        /* Import was evicted and reconnected since the
                         * request was sent; it cannot simply be resent. */
                        CDEBUG(D_HA, "%s: resend cross eviction for object: "
                               ""DOSTID", rc = %d.\n",
                               req->rq_import->imp_obd->obd_name,
                               POSTID(&aa->aa_oa->o_oi), rc);
                } else if (rc == -EINPROGRESS ||
                    client_should_resend(aa->aa_resends, aa->aa_cli)) {
                        rc = osc_brw_redo_request(req, aa, rc);
                } else {
                        CERROR("%s: too many resent retries for object: "
                               ""LPU64":"LPU64", rc = %d.\n",
                               req->rq_import->imp_obd->obd_name,
                               POSTID(&aa->aa_oa->o_oi), rc);
                }

                /* rc == 0 means a new request was queued; nothing more
                 * to do here. */
                if (rc == 0)
                        RETURN(0);
                else if (rc == -EAGAIN || rc == -EINPROGRESS)
                        rc = -EIO;
        }

        if (rc == 0) {
                /* Propagate attributes returned by the OST into the
                 * cached client-side object attributes. */
                struct obdo *oa = aa->aa_oa;
                struct cl_attr *attr = &osc_env_info(env)->oti_attr;
                unsigned long valid = 0;
                struct cl_object *obj;
                struct osc_async_page *last;

                /* All pages in one BRW RPC belong to a single object;
                 * the last page determines the end offset of the I/O. */
                last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
                obj = osc2cl(last->oap_obj);

                cl_object_attr_lock(obj);
                if (oa->o_valid & OBD_MD_FLBLOCKS) {
                        attr->cat_blocks = oa->o_blocks;
                        valid |= CAT_BLOCKS;
                }
                if (oa->o_valid & OBD_MD_FLMTIME) {
                        attr->cat_mtime = oa->o_mtime;
                        valid |= CAT_MTIME;
                }
                if (oa->o_valid & OBD_MD_FLATIME) {
                        attr->cat_atime = oa->o_atime;
                        valid |= CAT_ATIME;
                }
                if (oa->o_valid & OBD_MD_FLCTIME) {
                        attr->cat_ctime = oa->o_ctime;
                        valid |= CAT_CTIME;
                }

                if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
                        struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
                        loff_t last_off = last->oap_count + last->oap_obj_off +
                                last->oap_page_off;

                        /* Change file size if this is an out of quota or
                         * direct IO write and it extends the file size */
                        if (loi->loi_lvb.lvb_size < last_off) {
                                attr->cat_size = last_off;
                                valid |= CAT_SIZE;
                        }
                        /* Extend KMS if it's not a lockless write */
                        if (loi->loi_kms < last_off &&
                            oap2osc_page(last)->ops_srvlock == 0) {
                                attr->cat_kms = last_off;
                                valid |= CAT_KMS;
                        }
                }

                if (valid != 0)
                        cl_object_attr_update(env, obj, attr, valid);
                cl_object_attr_unlock(obj);
        }
        OBDO_FREE(aa->aa_oa);

        /* Successfully written pages remain "unstable" until the OST
         * commits its transaction; account them (see brw_commit()). */
        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
                osc_inc_unstable_pages(req);

        list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
                list_del_init(&ext->oe_link);
                osc_extent_finish(env, ext, 1, rc);
        }
        LASSERT(list_empty(&aa->aa_exts));
        LASSERT(list_empty(&aa->aa_oaps));

        cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
                          req->rq_bulk->bd_nob_transferred);
        osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
        ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);

        spin_lock(&cli->cl_loi_list_lock);
        /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
         * is called so we know whether to go to sync BRWs or wait for more
         * RPCs to complete */
        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
                cli->cl_w_in_flight--;
        else
                cli->cl_r_in_flight--;
        osc_wake_cache_waiters(cli);
        spin_unlock(&cli->cl_loi_list_lock);

        /* An RPC slot was freed; try to kick off more queued I/O. */
        osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
        RETURN(rc);
}
1586
1587 static void brw_commit(struct ptlrpc_request *req)
1588 {
1589         /* If osc_inc_unstable_pages (via osc_extent_finish) races with
1590          * this called via the rq_commit_cb, I need to ensure
1591          * osc_dec_unstable_pages is still called. Otherwise unstable
1592          * pages may be leaked. */
1593         spin_lock(&req->rq_lock);
1594         if (likely(req->rq_unstable)) {
1595                 req->rq_unstable = 0;
1596                 spin_unlock(&req->rq_lock);
1597
1598                 osc_dec_unstable_pages(req);
1599         } else {
1600                 req->rq_committed = 1;
1601                 spin_unlock(&req->rq_lock);
1602         }
1603 }
1604
1605 /**
1606  * Build an RPC by the list of extent @ext_list. The caller must ensure
1607  * that the total pages in this list are NOT over max pages per RPC.
1608  * Extents in the list must be in OES_RPC state.
1609  */
int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
                  struct list_head *ext_list, int cmd, pdl_policy_t pol)
{
        struct ptlrpc_request           *req = NULL;
        struct osc_extent               *ext;
        struct brw_page                 **pga = NULL;
        struct osc_brw_async_args       *aa = NULL;
        struct obdo                     *oa = NULL;
        struct osc_async_page           *oap;
        struct osc_async_page           *tmp;
        struct cl_req                   *clerq = NULL;
        enum cl_req_type                crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE :
                                                                      CRT_READ;
        struct cl_req_attr              *crattr = NULL;
        loff_t                          starting_offset = OBD_OBJECT_EOF;
        loff_t                          ending_offset = 0;
        int                             mpflag = 0;
        int                             mem_tight = 0;
        int                             page_count = 0;
        bool                            soft_sync = false;
        int                             i;
        int                             rc;
        struct list_head                rpc_list = LIST_HEAD_INIT(rpc_list);
        struct ost_body                 *body;
        ENTRY;
        LASSERT(!list_empty(ext_list));

        /* add pages into rpc_list to build BRW rpc */
        list_for_each_entry(ext, ext_list, oe_link) {
                LASSERT(ext->oe_state == OES_RPC);
                mem_tight |= ext->oe_memalloc;
                list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
                        ++page_count;
                        list_add_tail(&oap->oap_rpc_item, &rpc_list);
                        /* track the covered [start, end) range; interior
                         * pages must be full pages (asserted below). */
                        if (starting_offset == OBD_OBJECT_EOF ||
                            starting_offset > oap->oap_obj_off)
                                starting_offset = oap->oap_obj_off;
                        else
                                LASSERT(oap->oap_page_off == 0);
                        if (ending_offset < oap->oap_obj_off + oap->oap_count)
                                ending_offset = oap->oap_obj_off +
                                                oap->oap_count;
                        else
                                LASSERT(oap->oap_page_off + oap->oap_count ==
                                        PAGE_CACHE_SIZE);
                }
        }

        soft_sync = osc_over_unstable_soft_limit(cli);
        if (mem_tight)
                /* allow allocations below to dip into reserves so a
                 * writeback RPC can make progress under memory pressure */
                mpflag = cfs_memory_pressure_get_and_set();

        OBD_ALLOC(crattr, sizeof(*crattr));
        if (crattr == NULL)
                GOTO(out, rc = -ENOMEM);

        OBD_ALLOC(pga, sizeof(*pga) * page_count);
        if (pga == NULL)
                GOTO(out, rc = -ENOMEM);

        OBDO_ALLOC(oa);
        if (oa == NULL)
                GOTO(out, rc = -ENOMEM);

        /* fill the brw_page array and attach each page to the cl_req */
        i = 0;
        list_for_each_entry(oap, &rpc_list, oap_rpc_item) {
                struct cl_page *page = oap2cl_page(oap);
                if (clerq == NULL) {
                        clerq = cl_req_alloc(env, page, crt,
                                             1 /* only 1-object rpcs for now */);
                        if (IS_ERR(clerq))
                                GOTO(out, rc = PTR_ERR(clerq));
                }
                if (mem_tight)
                        oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
                if (soft_sync)
                        oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
                pga[i] = &oap->oap_brw_page;
                pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
                CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
                       pga[i]->pg, page_index(oap->oap_page), oap,
                       pga[i]->flag);
                i++;
                cl_req_page_add(env, clerq, page);
        }

        /* always get the data for the obdo for the rpc */
        LASSERT(clerq != NULL);
        crattr->cra_oa = oa;
        cl_req_attr_set(env, clerq, crattr, ~0ULL);

        rc = cl_req_prep(env, clerq);
        if (rc != 0) {
                CERROR("cl_req_prep failed: %d\n", rc);
                GOTO(out, rc);
        }

        sort_brw_pages(pga, page_count);
        rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req, 0);
        if (rc != 0) {
                CERROR("prep_req failed: %d\n", rc);
                GOTO(out, rc);
        }

        req->rq_commit_cb = brw_commit;
        req->rq_interpret_reply = brw_interpret;

        if (mem_tight != 0)
                req->rq_memalloc = 1;

        /* Need to update the timestamps after the request is built in case
         * we race with setattr (locally or in queue at OST).  If OST gets
         * later setattr before earlier BRW (as determined by the request xid),
         * the OST will not use BRW timestamps.  Sadly, there is no obvious
         * way to do this in a single call.  bug 10150 */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        crattr->cra_oa = &body->oa;
        cl_req_attr_set(env, clerq, crattr,
                        OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);

        lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);

        /* hand ownership of the page/extent lists over to the request's
         * async args; brw_interpret() will release them */
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        INIT_LIST_HEAD(&aa->aa_oaps);
        list_splice_init(&rpc_list, &aa->aa_oaps);
        INIT_LIST_HEAD(&aa->aa_exts);
        list_splice_init(ext_list, &aa->aa_exts);
        aa->aa_clerq = clerq;

        /* queued sync pages can be torn down while the pages
         * were between the pending list and the rpc */
        tmp = NULL;
        list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
                /* only one oap gets a request reference */
                if (tmp == NULL)
                        tmp = oap;
                if (oap->oap_interrupted && !req->rq_intr) {
                        CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
                                        oap, req);
                        ptlrpc_mark_interrupted(req);
                }
        }
        if (tmp != NULL)
                tmp->oap_request = ptlrpc_request_addref(req);

        /* account the new in-flight RPC and record stats */
        spin_lock(&cli->cl_loi_list_lock);
        starting_offset >>= PAGE_CACHE_SHIFT;
        if (cmd == OBD_BRW_READ) {
                cli->cl_r_in_flight++;
                lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
                lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
                lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
                                      starting_offset + 1);
        } else {
                cli->cl_w_in_flight++;
                lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
                lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
                lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
                                      starting_offset + 1);
        }
        spin_unlock(&cli->cl_loi_list_lock);

        DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %ur/%uw in flight",
                  page_count, aa, cli->cl_r_in_flight,
                  cli->cl_w_in_flight);

        /* XXX: Maybe the caller can check the RPC bulk descriptor to
         * see which CPU/NUMA node the majority of pages were allocated
         * on, and try to assign the async RPC to the CPU core
         * (PDL_POLICY_PREFERRED) to reduce cross-CPU memory traffic.
         *
         * But on the other hand, we expect that multiple ptlrpcd
         * threads and the initial write sponsor can run in parallel,
         * especially when data checksum is enabled, which is CPU-bound
         * operation and single ptlrpcd thread cannot process in time.
         * So more ptlrpcd threads sharing BRW load
         * (with PDL_POLICY_ROUND) seems better.
         */
        ptlrpcd_add_req(req, pol, -1);
        rc = 0;
        EXIT;

out:
        if (mem_tight != 0)
                cfs_memory_pressure_restore(mpflag);

        if (crattr != NULL)
                OBD_FREE(crattr, sizeof(*crattr));

        if (rc != 0) {
                LASSERT(req == NULL);

                if (oa)
                        OBDO_FREE(oa);
                if (pga)
                        OBD_FREE(pga, sizeof(*pga) * page_count);
                /* this should happen rarely and is pretty bad, it makes the
                 * pending list not follow the dirty order */
                while (!list_empty(ext_list)) {
                        ext = list_entry(ext_list->next, struct osc_extent,
                                         oe_link);
                        list_del_init(&ext->oe_link);
                        osc_extent_finish(env, ext, 0, rc);
                }
                if (clerq && !IS_ERR(clerq))
                        cl_req_completion(env, clerq, rc);
        }
        RETURN(rc);
}
1820
1821 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
1822                                         struct ldlm_enqueue_info *einfo)
1823 {
1824         void *data = einfo->ei_cbdata;
1825         int set = 0;
1826
1827         LASSERT(lock != NULL);
1828         LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
1829         LASSERT(lock->l_resource->lr_type == einfo->ei_type);
1830         LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
1831         LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
1832
1833         lock_res_and_lock(lock);
1834
1835         if (lock->l_ast_data == NULL)
1836                 lock->l_ast_data = data;
1837         if (lock->l_ast_data == data)
1838                 set = 1;
1839
1840         unlock_res_and_lock(lock);
1841
1842         return set;
1843 }
1844
1845 static int osc_set_data_with_check(struct lustre_handle *lockh,
1846                                    struct ldlm_enqueue_info *einfo)
1847 {
1848         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
1849         int set = 0;
1850
1851         if (lock != NULL) {
1852                 set = osc_set_lock_data_with_check(lock, einfo);
1853                 LDLM_LOCK_PUT(lock);
1854         } else
1855                 CERROR("lockh %p, data %p - client evicted?\n",
1856                        lockh, einfo->ei_cbdata);
1857         return set;
1858 }
1859
1860 static int osc_enqueue_fini(struct ptlrpc_request *req,
1861                             osc_enqueue_upcall_f upcall, void *cookie,
1862                             struct lustre_handle *lockh, ldlm_mode_t mode,
1863                             __u64 *flags, int agl, int errcode)
1864 {
1865         bool intent = *flags & LDLM_FL_HAS_INTENT;
1866         int rc;
1867         ENTRY;
1868
1869         /* The request was created before ldlm_cli_enqueue call. */
1870         if (intent && errcode == ELDLM_LOCK_ABORTED) {
1871                 struct ldlm_reply *rep;
1872
1873                 rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1874                 LASSERT(rep != NULL);
1875
1876                 rep->lock_policy_res1 =
1877                         ptlrpc_status_ntoh(rep->lock_policy_res1);
1878                 if (rep->lock_policy_res1)
1879                         errcode = rep->lock_policy_res1;
1880                 if (!agl)
1881                         *flags |= LDLM_FL_LVB_READY;
1882         } else if (errcode == ELDLM_OK) {
1883                 *flags |= LDLM_FL_LVB_READY;
1884         }
1885
1886         /* Call the update callback. */
1887         rc = (*upcall)(cookie, lockh, errcode);
1888
1889         /* release the reference taken in ldlm_cli_enqueue() */
1890         if (errcode == ELDLM_LOCK_MATCHED)
1891                 errcode = ELDLM_OK;
1892         if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
1893                 ldlm_lock_decref(lockh, mode);
1894
1895         RETURN(rc);
1896 }
1897
/**
 * Interpret callback for an asynchronous OSC lock enqueue.
 *
 * Completes the ldlm side of the enqueue, then runs the osc upcall via
 * osc_enqueue_fini().  Takes an extra lock reference up front so any
 * blocking AST posted for a failed lock cannot run before the upcall
 * has executed.
 */
static int osc_enqueue_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_enqueue_args *aa, int rc)
{
        struct ldlm_lock *lock;
        struct lustre_handle *lockh = &aa->oa_lockh;
        ldlm_mode_t mode = aa->oa_mode;
        struct ost_lvb *lvb = aa->oa_lvb;
        __u32 lvb_len = sizeof(*lvb);
        __u64 flags = 0;

        ENTRY;

        /* ldlm_cli_enqueue is holding a reference on the lock, so it must
         * be valid. */
        lock = ldlm_handle2lock(lockh);
        LASSERTF(lock != NULL,
                 "lockh "LPX64", req %p, aa %p - client evicted?\n",
                 lockh->cookie, req, aa);

        /* Take an additional reference so that a blocking AST that
         * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
         * to arrive after an upcall has been executed by
         * osc_enqueue_fini(). */
        ldlm_lock_addref(lockh, mode);

        /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
        OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);

        /* Let CP AST to grant the lock first. */
        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);

        if (aa->oa_agl) {
                /* AGL enqueues carry no lvb/flags pointers; borrow a
                 * local flags word for ldlm_cli_enqueue_fini(). */
                LASSERT(aa->oa_lvb == NULL);
                LASSERT(aa->oa_flags == NULL);
                aa->oa_flags = &flags;
        }

        /* Complete obtaining the lock procedure. */
        rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
                                   aa->oa_mode, aa->oa_flags, lvb, lvb_len,
                                   lockh, rc);
        /* Complete osc stuff. */
        rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
                              aa->oa_flags, aa->oa_agl, rc);

        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);

        /* drop the reference taken above and the one from
         * ldlm_handle2lock() */
        ldlm_lock_decref(lockh, mode);
        LDLM_LOCK_PUT(lock);
        RETURN(rc);
}
1950
1951 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
1952
/* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
 * other synchronous requests, however keeping some locks and trying to obtain
 * others may take a considerable amount of time in a case of ost failure; and
 * when other sync requests do not get released lock from a client, the client
 * is evicted from the cluster -- such scenarios make the life difficult, so
 * release locks just after they are obtained. */
int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
                     __u64 *flags, ldlm_policy_data_t *policy,
                     struct ost_lvb *lvb, int kms_valid,
                     osc_enqueue_upcall_f upcall, void *cookie,
                     struct ldlm_enqueue_info *einfo,
                     struct ptlrpc_request_set *rqset, int async, int agl)
{
        struct obd_device *obd = exp->exp_obd;
        struct lustre_handle lockh = { 0 };
        struct ptlrpc_request *req = NULL;
        int intent = *flags & LDLM_FL_HAS_INTENT;
        __u64 match_lvb = agl ? 0 : LDLM_FL_LVB_READY;
        ldlm_mode_t mode;
        int rc;
        ENTRY;

        /* Filesystem lock extents are extended to page boundaries so that
         * dealing with the page cache is a little smoother.  */
        policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
        policy->l_extent.end |= ~PAGE_MASK;

        /*
         * kms is not valid when either object is completely fresh (so that no
         * locks are cached), or object was evicted. In the latter case cached
         * lock cannot be used, because it would prime inode state with
         * potentially stale LVB.
         */
        if (!kms_valid)
                goto no_match;

        /* Next, search for already existing extent locks that will cover us */
        /* If we're trying to read, we also search for an existing PW lock.  The
         * VFS and page cache already protect us locally, so lots of readers/
         * writers can share a single PW lock.
         *
         * There are problems with conversion deadlocks, so instead of
         * converting a read lock to a write lock, we'll just enqueue a new
         * one.
         *
         * At some point we should cancel the read lock instead of making them
         * send us a blocking callback, but there are problems with canceling
         * locks out from other users right now, too. */
        mode = einfo->ei_mode;
        if (einfo->ei_mode == LCK_PR)
                mode |= LCK_PW;
        mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
                               einfo->ei_type, policy, mode, &lockh, 0);
        if (mode) {
                struct ldlm_lock *matched;

                if (*flags & LDLM_FL_TEST_LOCK)
                        RETURN(ELDLM_OK);

                matched = ldlm_handle2lock(&lockh);
                if (agl) {
                        /* AGL enqueues DLM locks speculatively. Therefore if
                         * it already exists a DLM lock, it will just inform
                         * the caller to cancel the AGL process for this
                         * stripe. */
                        ldlm_lock_decref(&lockh, mode);
                        LDLM_LOCK_PUT(matched);
                        RETURN(-ECANCELED);
                } else if (osc_set_lock_data_with_check(matched, einfo)) {
                        *flags |= LDLM_FL_LVB_READY;

                        /* We already have a lock, and it's referenced. */
                        (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);

                        ldlm_lock_decref(&lockh, mode);
                        LDLM_LOCK_PUT(matched);
                        RETURN(ELDLM_OK);
                } else {
                        /* the matched lock belongs to another object;
                         * fall through and enqueue a new one */
                        ldlm_lock_decref(&lockh, mode);
                        LDLM_LOCK_PUT(matched);
                }
        }

no_match:
        if (*flags & LDLM_FL_TEST_LOCK)
                RETURN(-ENOLCK);

        if (intent) {
                /* intent enqueues need a reply buffer for the LVB */
                req = ptlrpc_request_alloc(class_exp2cliimp(exp),
                                           &RQF_LDLM_ENQUEUE_LVB);
                if (req == NULL)
                        RETURN(-ENOMEM);

                rc = ptlrpc_request_pack(req, LUSTRE_DLM_VERSION, LDLM_ENQUEUE);
                if (rc < 0) {
                        ptlrpc_request_free(req);
                        RETURN(rc);
                }

                req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
                                     sizeof *lvb);
                ptlrpc_request_set_replen(req);
        }

        /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
        *flags &= ~LDLM_FL_BLOCK_GRANTED;

        rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
                              sizeof(*lvb), LVB_T_OST, &lockh, async);
        if (async) {
                if (!rc) {
                        /* stash completion state for osc_enqueue_interpret */
                        struct osc_enqueue_args *aa;
                        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
                        aa = ptlrpc_req_async_args(req);
                        aa->oa_exp    = exp;
                        aa->oa_mode   = einfo->ei_mode;
                        aa->oa_type   = einfo->ei_type;
                        lustre_handle_copy(&aa->oa_lockh, &lockh);
                        aa->oa_upcall = upcall;
                        aa->oa_cookie = cookie;
                        aa->oa_agl    = !!agl;
                        if (!agl) {
                                aa->oa_flags  = flags;
                                aa->oa_lvb    = lvb;
                        } else {
                                /* AGL is essentially to enqueue an DLM lock
                                 * in advance, so we don't care about the
                                 * result of AGL enqueue. */
                                aa->oa_lvb    = NULL;
                                aa->oa_flags  = NULL;
                        }

                        req->rq_interpret_reply =
                                (ptlrpc_interpterer_t)osc_enqueue_interpret;
                        if (rqset == PTLRPCD_SET)
                                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
                        else
                                ptlrpc_set_add_req(rqset, req);
                } else if (intent) {
                        ptlrpc_req_finished(req);
                }
                RETURN(rc);
        }

        rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
                              flags, agl, rc);
        if (intent)
                ptlrpc_req_finished(req);

        RETURN(rc);
}
2104
2105 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2106                    __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2107                    __u64 *flags, void *data, struct lustre_handle *lockh,
2108                    int unref)
2109 {
2110         struct obd_device *obd = exp->exp_obd;
2111         __u64 lflags = *flags;
2112         ldlm_mode_t rc;
2113         ENTRY;
2114
2115         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2116                 RETURN(-EIO);
2117
2118         /* Filesystem lock extents are extended to page boundaries so that
2119          * dealing with the page cache is a little smoother */
2120         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2121         policy->l_extent.end |= ~PAGE_MASK;
2122
2123         /* Next, search for already existing extent locks that will cover us */
2124         /* If we're trying to read, we also search for an existing PW lock.  The
2125          * VFS and page cache already protect us locally, so lots of readers/
2126          * writers can share a single PW lock. */
2127         rc = mode;
2128         if (mode == LCK_PR)
2129                 rc |= LCK_PW;
2130         rc = ldlm_lock_match(obd->obd_namespace, lflags,
2131                              res_id, type, policy, rc, lockh, unref);
2132         if (rc) {
2133                 if (data != NULL) {
2134                         if (!osc_set_data_with_check(lockh, data)) {
2135                                 if (!(lflags & LDLM_FL_TEST_LOCK))
2136                                         ldlm_lock_decref(lockh, rc);
2137                                 RETURN(0);
2138                         }
2139                 }
2140                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
2141                         ldlm_lock_addref(lockh, LCK_PR);
2142                         ldlm_lock_decref(lockh, LCK_PW);
2143                 }
2144                 RETURN(rc);
2145         }
2146         RETURN(rc);
2147 }
2148
2149 static int osc_statfs_interpret(const struct lu_env *env,
2150                                 struct ptlrpc_request *req,
2151                                 struct osc_async_args *aa, int rc)
2152 {
2153         struct obd_statfs *msfs;
2154         ENTRY;
2155
2156         if (rc == -EBADR)
2157                 /* The request has in fact never been sent
2158                  * due to issues at a higher level (LOV).
2159                  * Exit immediately since the caller is
2160                  * aware of the problem and takes care
2161                  * of the clean up */
2162                  RETURN(rc);
2163
2164         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2165             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2166                 GOTO(out, rc = 0);
2167
2168         if (rc != 0)
2169                 GOTO(out, rc);
2170
2171         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2172         if (msfs == NULL) {
2173                 GOTO(out, rc = -EPROTO);
2174         }
2175
2176         *aa->aa_oi->oi_osfs = *msfs;
2177 out:
2178         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2179         RETURN(rc);
2180 }
2181
/**
 * Send an OST_STATFS request asynchronously via \a rqset.
 *
 * The reply is handled by osc_statfs_interpret(), which copies the
 * returned statistics into oinfo->oi_osfs and invokes oinfo->oi_cb_up.
 *
 * \param[in] exp     export to send the request on
 * \param[in] oinfo   carries the statfs buffer, flags and up-callback
 * \param[in] max_age currently unused on the wire (see comment below)
 * \param[in] rqset   request set the new RPC is added to
 *
 * \retval 0          request queued successfully
 * \retval -ENOMEM    request allocation failed
 * \retval negative   other errors from request packing
 */
static int osc_statfs_async(struct obd_export *exp,
                            struct obd_info *oinfo, __u64 max_age,
                            struct ptlrpc_request_set *rqset)
{
        struct obd_device     *obd = class_exp2obd(exp);
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        int                    rc;
        ENTRY;

        /* We could possibly pass max_age in the request (as an absolute
         * timestamp or a "seconds.usec ago") so the target can avoid doing
         * extra calls into the filesystem if that isn't necessary (e.g.
         * during mount that would help a bit).  Having relative timestamps
         * is not so great if request processing is slow, while absolute
         * timestamps are not ideal because they need time synchronization. */
        req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        ptlrpc_request_set_replen(req);
        /* statfs is served from the create portal, not the I/O portal */
        req->rq_request_portal = OST_CREATE_PORTAL;
        ptlrpc_at_set_req_timeout(req);

        if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
                /* procfs-originated requests must not wait for recovery,
                 * otherwise they could deadlock against reconnection. */
                req->rq_no_resend = 1;
                req->rq_no_delay = 1;
        }

        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
        CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oi = oinfo;

        ptlrpc_set_add_req(rqset, req);
        RETURN(0);
}
2225
/**
 * Send a synchronous OST_STATFS request and copy the result into \a osfs.
 *
 * \param[in]  env     execution environment (unused here)
 * \param[in]  exp     export to query
 * \param[out] osfs    filled with the server's statfs reply
 * \param[in]  max_age currently unused on the wire (see comment below)
 * \param[in]  flags   OBD_STATFS_* flags; NODELAY makes the RPC non-blocking
 *                     with respect to recovery
 *
 * \retval 0          success, *osfs is valid
 * \retval -ENODEV    the import is already gone
 * \retval -ENOMEM    request allocation failed
 * \retval -EPROTO    malformed reply
 * \retval negative   other RPC errors
 */
static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
                      struct obd_statfs *osfs, __u64 max_age, __u32 flags)
{
        struct obd_device     *obd = class_exp2obd(exp);
        struct obd_statfs     *msfs;
        struct ptlrpc_request *req;
        struct obd_import     *imp = NULL;
        int rc;
        ENTRY;

        /* The request may also come from lprocfs, so take a reference on
         * the import under cl_sem to synchronize against
         * client_disconnect_export() tearing it down (bug 15684). */
        down_read(&obd->u.cli.cl_sem);
        if (obd->u.cli.cl_import)
                imp = class_import_get(obd->u.cli.cl_import);
        up_read(&obd->u.cli.cl_sem);
        if (!imp)
                RETURN(-ENODEV);

        /* We could possibly pass max_age in the request (as an absolute
         * timestamp or a "seconds.usec ago") so the target can avoid doing
         * extra calls into the filesystem if that isn't necessary (e.g.
         * during mount that would help a bit).  Having relative timestamps
         * is not so great if request processing is slow, while absolute
         * timestamps are not ideal because they need time synchronization. */
        req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);

        /* the request holds its own reference now */
        class_import_put(imp);

        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        ptlrpc_request_set_replen(req);
        /* statfs is served from the create portal, not the I/O portal */
        req->rq_request_portal = OST_CREATE_PORTAL;
        ptlrpc_at_set_req_timeout(req);

        if (flags & OBD_STATFS_NODELAY) {
                /* procfs-originated requests must not wait for recovery,
                 * otherwise they could deadlock against reconnection. */
                req->rq_no_resend = 1;
                req->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
        if (msfs == NULL) {
                GOTO(out, rc = -EPROTO);
        }

        *osfs = *msfs;

        EXIT;
 out:
        ptlrpc_req_finished(req);
        return rc;
}
2289
/**
 * Handle OSC-specific ioctls.
 *
 * Supported commands: OBD_IOC_CLIENT_RECOVER (force import recovery),
 * IOC_OSC_SET_ACTIVE (activate/deactivate the import) and
 * OBD_IOC_PING_TARGET (ping the OST).  Anything else returns -ENOTTY.
 *
 * \param[in] cmd   ioctl command number
 * \param[in] exp   export the ioctl was issued on
 * \param[in] len   length of \a karg (unused here)
 * \param[in] karg  kernel-space argument (struct obd_ioctl_data)
 * \param[in] uarg  user-space argument (unused here)
 *
 * \retval 0         success
 * \retval -EINVAL   module reference could not be taken
 * \retval -ENOTTY   unrecognized command
 * \retval negative  other errors from the underlying operation
 */
static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                         void *karg, void *uarg)
{
        struct obd_device *obd = exp->exp_obd;
        struct obd_ioctl_data *data = karg;
        int err = 0;
        ENTRY;

        /* pin the module so it cannot be unloaded mid-ioctl */
        if (!try_module_get(THIS_MODULE)) {
                CERROR("%s: cannot get module '%s'\n", obd->obd_name,
                       module_name(THIS_MODULE));
                return -EINVAL;
        }
        switch (cmd) {
        case OBD_IOC_CLIENT_RECOVER:
                err = ptlrpc_recover_import(obd->u.cli.cl_import,
                                            data->ioc_inlbuf1, 0);
                /* positive values are informational, not errors */
                if (err > 0)
                        err = 0;
                GOTO(out, err);
        case IOC_OSC_SET_ACTIVE:
                err = ptlrpc_set_import_active(obd->u.cli.cl_import,
                                               data->ioc_offset);
                GOTO(out, err);
        case OBD_IOC_PING_TARGET:
                err = ptlrpc_obd_ping(obd);
                GOTO(out, err);
        default:
                CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
                       cmd, current_comm());
                GOTO(out, err = -ENOTTY);
        }
out:
        module_put(THIS_MODULE);
        return err;
}
2326
/**
 * Handle "set info" requests addressed to this OSC.
 *
 * Several keys are handled purely locally (checksum toggle, sptlrpc
 * configuration, security-context flush, LRU cache registration and
 * shrinking); anything else is forwarded to the OST as an OST_SET_INFO
 * RPC.  KEY_GRANT_SHRINK requests are sent via ptlrpcd with a dedicated
 * interpret callback; all other forwarded keys require a caller-supplied
 * request \a set.
 *
 * \param[in] env     execution environment
 * \param[in] exp     export the request applies to
 * \param[in] keylen  length of \a key
 * \param[in] key     key name (see KEY_* definitions)
 * \param[in] vallen  length of \a val
 * \param[in] val     value associated with \a key
 * \param[in] set     request set for asynchronous dispatch; may be NULL
 *                    only for KEY_GRANT_SHRINK
 *
 * \retval 0          success
 * \retval -EINVAL    bad value size, or missing \a set
 * \retval -ENOMEM    allocation failure
 * \retval negative   other errors from request packing
 */
static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
                              u32 keylen, void *key,
                              u32 vallen, void *val,
                              struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *req;
        struct obd_device     *obd = exp->exp_obd;
        struct obd_import     *imp = class_exp2cliimp(exp);
        char                  *tmp;
        int                    rc;
        ENTRY;

        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);

        /* toggle client-side checksumming of bulk data */
        if (KEY_IS(KEY_CHECKSUM)) {
                if (vallen != sizeof(int))
                        RETURN(-EINVAL);
                exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
                RETURN(0);
        }

        /* security flavor configuration changed; adapt the client */
        if (KEY_IS(KEY_SPTLRPC_CONF)) {
                sptlrpc_conf_client_adapt(obd);
                RETURN(0);
        }

        /* drop this client's security contexts on the import */
        if (KEY_IS(KEY_FLUSH_CTX)) {
                sptlrpc_import_flush_my_ctx(imp);
                RETURN(0);
        }

        /* attach this OSC to a shared client page cache (LRU accounting) */
        if (KEY_IS(KEY_CACHE_SET)) {
                struct client_obd *cli = &obd->u.cli;

                LASSERT(cli->cl_cache == NULL); /* only once */
                cli->cl_cache = (struct cl_client_cache *)val;
                cl_cache_incref(cli->cl_cache);
                cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;

                /* add this osc into entity list */
                LASSERT(list_empty(&cli->cl_lru_osc));
                spin_lock(&cli->cl_cache->ccc_lru_lock);
                list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
                spin_unlock(&cli->cl_cache->ccc_lru_lock);

                RETURN(0);
        }

        /* shrink this OSC's LRU by up to half its pages, capped at the
         * caller's target; report how many were reclaimed via *val */
        if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
                struct client_obd *cli = &obd->u.cli;
                long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
                long target = *(long *)val;

                nr = osc_lru_shrink(env, cli, min(nr, target), true);
                *(long *)val -= nr;
                RETURN(0);
        }

        /* every forwarded key except grant shrink needs a request set */
        if (!set && !KEY_IS(KEY_GRANT_SHRINK))
                RETURN(-EINVAL);

        /* We pass all other commands directly to OST. Since nobody calls osc
           methods directly and everybody is supposed to go through LOV, we
           assume lov checked invalid values for us.
           The only recognised values so far are evict_by_nid and mds_conn.
           Even if something bad goes through, we'd get a -EINVAL from OST
           anyway. */

        req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
                                                &RQF_OST_SET_GRANT_INFO :
                                                &RQF_OBD_SET_INFO);
        if (req == NULL)
                RETURN(-ENOMEM);

        req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
                             RCL_CLIENT, keylen);
        if (!KEY_IS(KEY_GRANT_SHRINK))
                req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
                                     RCL_CLIENT, vallen);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        /* copy key and value into the request buffers; grant shrink
         * carries an ost_body instead of an opaque value */
        tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
        memcpy(tmp, key, keylen);
        tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
                                                        &RMF_OST_BODY :
                                                        &RMF_SETINFO_VAL);
        memcpy(tmp, val, vallen);

        if (KEY_IS(KEY_GRANT_SHRINK)) {
                struct osc_grant_args *aa;
                struct obdo *oa;

                CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
                aa = ptlrpc_req_async_args(req);
                OBDO_ALLOC(oa);
                if (!oa) {
                        ptlrpc_req_finished(req);
                        RETURN(-ENOMEM);
                }
                /* the interpret callback owns and frees this obdo copy */
                *oa = ((struct ost_body *)val)->oa;
                aa->aa_oa = oa;
                req->rq_interpret_reply = osc_shrink_grant_interpret;
        }

        ptlrpc_request_set_replen(req);
        if (!KEY_IS(KEY_GRANT_SHRINK)) {
                LASSERT(set != NULL);
                ptlrpc_set_add_req(set, req);
                ptlrpc_check_set(NULL, set);
        } else
                /* grant shrink is fire-and-forget via ptlrpcd */
                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);

        RETURN(0);
}
2445
2446 static int osc_reconnect(const struct lu_env *env,
2447                          struct obd_export *exp, struct obd_device *obd,
2448                          struct obd_uuid *cluuid,
2449                          struct obd_connect_data *data,
2450                          void *localdata)
2451 {
2452         struct client_obd *cli = &obd->u.cli;
2453
2454         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
2455                 long lost_grant;
2456
2457                 spin_lock(&cli->cl_loi_list_lock);
2458                 data->ocd_grant = (cli->cl_avail_grant +
2459                                   (cli->cl_dirty_pages << PAGE_CACHE_SHIFT)) ?:
2460                                   2 * cli_brw_size(obd);
2461                 lost_grant = cli->cl_lost_grant;
2462                 cli->cl_lost_grant = 0;
2463                 spin_unlock(&cli->cl_loi_list_lock);
2464
2465                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
2466                        " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
2467                        data->ocd_version, data->ocd_grant, lost_grant);
2468         }
2469
2470         RETURN(0);
2471 }
2472
/**
 * Disconnect from the OST and, once the import is fully gone, remove
 * this client from the grant-shrink list.
 *
 * \param[in] exp  export to disconnect
 *
 * \retval 0         success
 * \retval negative  error from client_disconnect_export()
 */
static int osc_disconnect(struct obd_export *exp)
{
        struct obd_device *obd = class_exp2obd(exp);
        int rc;

        rc = client_disconnect_export(exp);
        /**
         * Initially we put del_shrink_grant before disconnect_export, but it
         * causes the following problem if setup (connect) and cleanup
         * (disconnect) are tangled together.
         *      connect p1                     disconnect p2
         *   ptlrpc_connect_import
         *     ...............               class_manual_cleanup
         *                                     osc_disconnect
         *                                     del_shrink_grant
         *   ptlrpc_connect_interrupt
         *     init_grant_shrink
         *   add this client to shrink list
         *                                      cleanup_osc
         * Bang! pinger trigger the shrink.
         * So the osc should be disconnected from the shrink list, after we
         * are sure the import has been destroyed. BUG18662
         */
        if (obd->u.cli.cl_import == NULL)
                osc_del_shrink_grant(&obd->u.cli);
        return rc;
}
2500
/**
 * React to import state-machine events for this OSC.
 *
 * Resets grant accounting on disconnect, flushes cached pages and DLM
 * locks on invalidation, (re)initializes grant on connect-data arrival,
 * and forwards activation/deactivation notifications to the observer
 * (typically LOV).
 *
 * \param[in] obd    this OSC device
 * \param[in] imp    the import the event happened on
 * \param[in] event  which IMP_EVENT_* occurred
 *
 * \retval 0         success
 * \retval negative  error from the observer notification or env setup
 */
static int osc_import_event(struct obd_device *obd,
                            struct obd_import *imp,
                            enum obd_import_event event)
{
        struct client_obd *cli;
        int rc = 0;

        ENTRY;
        LASSERT(imp->imp_obd == obd);

        switch (event) {
        case IMP_EVENT_DISCON: {
                /* connection lost: any outstanding grant is now meaningless */
                cli = &obd->u.cli;
                spin_lock(&cli->cl_loi_list_lock);
                cli->cl_avail_grant = 0;
                cli->cl_lost_grant = 0;
                spin_unlock(&cli->cl_loi_list_lock);
                break;
        }
        case IMP_EVENT_INACTIVE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
                break;
        }
        case IMP_EVENT_INVALIDATE: {
                struct ldlm_namespace *ns = obd->obd_namespace;
                struct lu_env         *env;
                int                    refcheck;

                env = cl_env_get(&refcheck);
                if (!IS_ERR(env)) {
                        /* Reset grants */
                        cli = &obd->u.cli;
                        /* all pages go to failing rpcs due to the invalid
                         * import */
                        osc_io_unplug(env, cli, NULL, PDL_POLICY_ROUND);

                        ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
                        cl_env_put(env, &refcheck);
                } else
                        rc = PTR_ERR(env);
                break;
        }
        case IMP_EVENT_ACTIVE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
                break;
        }
        case IMP_EVENT_OCD: {
                /* connect data arrived: pick up grant and portal settings */
                struct obd_connect_data *ocd = &imp->imp_connect_data;

                if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
                        osc_init_grant(&obd->u.cli, ocd);

                /* See bug 7198 */
                if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
                        imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;

                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
                break;
        }
        case IMP_EVENT_DEACTIVATE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
                break;
        }
        case IMP_EVENT_ACTIVATE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
                break;
        }
        default:
                CERROR("Unknown import event %d\n", event);
                LBUG();
        }
        RETURN(rc);
}
2574
2575 /**
2576  * Determine whether the lock can be canceled before replaying the lock
2577  * during recovery, see bug16774 for detailed information.
2578  *
2579  * \retval zero the lock can't be canceled
2580  * \retval other ok to cancel
2581  */
2582 static int osc_cancel_weight(struct ldlm_lock *lock)
2583 {
2584         /*
2585          * Cancel all unused and granted extent lock.
2586          */
2587         if (lock->l_resource->lr_type == LDLM_EXTENT &&
2588             lock->l_granted_mode == lock->l_req_mode &&
2589             osc_ldlm_weigh_ast(lock) == 0)
2590                 RETURN(1);
2591
2592         RETURN(0);
2593 }
2594
2595 static int brw_queue_work(const struct lu_env *env, void *data)
2596 {
2597         struct client_obd *cli = data;
2598
2599         CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
2600
2601         osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
2602         RETURN(0);
2603 }
2604
/**
 * Set up an OSC device: ptlrpcd workers, quota, procfs and the
 * emergency request pool.
 *
 * Resources are acquired in order (ptlrpcd ref, client obd, writeback
 * work, LRU work, quota) and unwound in reverse via the goto labels on
 * failure.  Procfs failures are deliberately non-fatal.
 *
 * \param[in] obd   device being set up
 * \param[in] lcfg  configuration record
 *
 * \retval 0         success
 * \retval negative  error from any mandatory setup step
 */
int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
{
        struct client_obd *cli = &obd->u.cli;
        struct obd_type   *type;
        void              *handler;
        int                rc;
        ENTRY;

        rc = ptlrpcd_addref();
        if (rc)
                RETURN(rc);

        rc = client_obd_setup(obd, lcfg);
        if (rc)
                GOTO(out_ptlrpcd, rc);

        /* dedicated ptlrpcd work item that flushes dirty pages */
        handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
        if (IS_ERR(handler))
                GOTO(out_client_setup, rc = PTR_ERR(handler));
        cli->cl_writeback_work = handler;

        /* dedicated ptlrpcd work item that shrinks the page LRU */
        handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
        if (IS_ERR(handler))
                GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
        cli->cl_lru_work = handler;

        rc = osc_quota_setup(obd);
        if (rc)
                GOTO(out_ptlrpcd_work, rc);

        cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;

#ifdef CONFIG_PROC_FS
        obd->obd_vars = lprocfs_osc_obd_vars;
#endif
        /* If this is true then both client (osc) and server (osp) are on the
         * same node. The osp layer if loaded first will register the osc proc
         * directory. In that case this obd_device will be attached its proc
         * tree to type->typ_procsym instead of obd->obd_type->typ_procroot. */
        type = class_search_type(LUSTRE_OSP_NAME);
        if (type && type->typ_procsym) {
                obd->obd_proc_entry = lprocfs_register(obd->obd_name,
                                                       type->typ_procsym,
                                                       obd->obd_vars, obd);
                if (IS_ERR(obd->obd_proc_entry)) {
                        rc = PTR_ERR(obd->obd_proc_entry);
                        CERROR("error %d setting up lprocfs for %s\n", rc,
                               obd->obd_name);
                        /* proc setup failure is not fatal to the device */
                        obd->obd_proc_entry = NULL;
                }
        } else {
                rc = lprocfs_obd_setup(obd);
        }

        /* If the basic OSC proc tree construction succeeded then
         * lets do the rest. */
        if (rc == 0) {
                lproc_osc_attach_seqstat(obd);
                sptlrpc_lprocfs_cliobd_attach(obd);
                ptlrpc_lprocfs_register_obd(obd);
        }

        /* We need to allocate a few requests more, because
         * brw_interpret tries to create new requests before freeing
         * previous ones, Ideally we want to have 2x max_rpcs_in_flight
         * reserved, but I'm afraid that might be too much wasted RAM
         * in fact, so 2 is just my guess and still should work. */
        cli->cl_import->imp_rq_pool =
                ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
                                    OST_MAXREQSIZE,
                                    ptlrpc_add_rqs_to_pool);

        INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
        /* let the DLM decide cancellation weight through our callback */
        ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
        RETURN(0);

out_ptlrpcd_work:
        if (cli->cl_writeback_work != NULL) {
                ptlrpcd_destroy_work(cli->cl_writeback_work);
                cli->cl_writeback_work = NULL;
        }
        if (cli->cl_lru_work != NULL) {
                ptlrpcd_destroy_work(cli->cl_lru_work);
                cli->cl_lru_work = NULL;
        }
out_client_setup:
        client_obd_cleanup(obd);
out_ptlrpcd:
        ptlrpcd_decref();
        RETURN(rc);
}
2696
/**
 * Pre-cleanup hook, invoked per cleanup stage before osc_cleanup().
 *
 * OBD_CLEANUP_EARLY deactivates the import and stops pinging it;
 * OBD_CLEANUP_EXPORTS tears down the ptlrpcd work items, the client
 * import and the procfs entries.
 *
 * \param[in] obd    device being cleaned up
 * \param[in] stage  which OBD_CLEANUP_* stage we are in
 *
 * \retval 0 always
 */
static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
{
        int rc = 0;
        ENTRY;

        switch (stage) {
        case OBD_CLEANUP_EARLY: {
                struct obd_import *imp;
                imp = obd->u.cli.cl_import;
                CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
                /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
                ptlrpc_deactivate_import(imp);
                spin_lock(&imp->imp_lock);
                imp->imp_pingable = 0;
                spin_unlock(&imp->imp_lock);
                break;
        }
        case OBD_CLEANUP_EXPORTS: {
                struct client_obd *cli = &obd->u.cli;
                /* LU-464
                 * for echo client, export may be on zombie list, wait for
                 * zombie thread to cull it, because cli.cl_import will be
                 * cleared in client_disconnect_export():
                 *   class_export_destroy() -> obd_cleanup() ->
                 *   echo_device_free() -> echo_client_cleanup() ->
                 *   obd_disconnect() -> osc_disconnect() ->
                 *   client_disconnect_export()
                 */
                obd_zombie_barrier();
                if (cli->cl_writeback_work) {
                        ptlrpcd_destroy_work(cli->cl_writeback_work);
                        cli->cl_writeback_work = NULL;
                }
                if (cli->cl_lru_work) {
                        ptlrpcd_destroy_work(cli->cl_lru_work);
                        cli->cl_lru_work = NULL;
                }
                obd_cleanup_client_import(obd);
                ptlrpc_lprocfs_unregister_obd(obd);
                lprocfs_obd_cleanup(obd);
                break;
                }
        }
        RETURN(rc);
}
2742
/**
 * Final cleanup of an OSC device: detach from the shared LRU cache,
 * release quota state, tear down the client OBD and drop the ptlrpcd
 * reference taken in osc_setup().
 *
 * \param[in] obd  device being destroyed
 *
 * \retval 0         success
 * \retval negative  error from client_obd_cleanup()
 */
int osc_cleanup(struct obd_device *obd)
{
        struct client_obd *cli = &obd->u.cli;
        int rc;

        ENTRY;

        /* lru cleanup */
        if (cli->cl_cache != NULL) {
                LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
                spin_lock(&cli->cl_cache->ccc_lru_lock);
                list_del_init(&cli->cl_lru_osc);
                spin_unlock(&cli->cl_cache->ccc_lru_lock);
                cli->cl_lru_left = NULL;
                cl_cache_decref(cli->cl_cache);
                cli->cl_cache = NULL;
        }

        /* free memory of osc quota cache */
        osc_quota_cleanup(obd);

        rc = client_obd_cleanup(obd);

        ptlrpcd_decref();
        RETURN(rc);
}
2769
2770 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
2771 {
2772         int rc = class_process_proc_param(PARAM_OSC, obd->obd_vars, lcfg, obd);
2773         return rc > 0 ? 0: rc;
2774 }
2775
/* obd_ops wrapper: forward config records to osc_process_config_base();
 * the buffer length is implicit in the lustre_cfg record itself. */
static int osc_process_config(struct obd_device *obd, size_t len, void *buf)
{
        return osc_process_config_base(obd, buf);
}
2780
/* Method table registered with the OBD class layer for the OSC device
 * type; connection handling is delegated to the generic client code,
 * everything else to the osc_* implementations above/elsewhere in this
 * module. */
static struct obd_ops osc_obd_ops = {
        .o_owner                = THIS_MODULE,
        .o_setup                = osc_setup,
        .o_precleanup           = osc_precleanup,
        .o_cleanup              = osc_cleanup,
        .o_add_conn             = client_import_add_conn,
        .o_del_conn             = client_import_del_conn,
        .o_connect              = client_connect_import,
        .o_reconnect            = osc_reconnect,
        .o_disconnect           = osc_disconnect,
        .o_statfs               = osc_statfs,
        .o_statfs_async         = osc_statfs_async,
        .o_create               = osc_create,
        .o_destroy              = osc_destroy,
        .o_getattr              = osc_getattr,
        .o_setattr              = osc_setattr,
        .o_iocontrol            = osc_iocontrol,
        .o_set_info_async       = osc_set_info_async,
        .o_import_event         = osc_import_event,
        .o_process_config       = osc_process_config,
        .o_quotactl             = osc_quotactl,
};
2803
2804 static int __init osc_init(void)
2805 {
2806         bool enable_proc = true;
2807         struct obd_type *type;
2808         int rc;
2809         ENTRY;
2810
2811         /* print an address of _any_ initialized kernel symbol from this
2812          * module, to allow debugging with gdb that doesn't support data
2813          * symbols from modules.*/
2814         CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
2815
2816         rc = lu_kmem_init(osc_caches);
2817         if (rc)
2818                 RETURN(rc);
2819
2820         type = class_search_type(LUSTRE_OSP_NAME);
2821         if (type != NULL && type->typ_procsym != NULL)
2822                 enable_proc = false;
2823
2824         rc = class_register_type(&osc_obd_ops, NULL, enable_proc, NULL,
2825                                  LUSTRE_OSC_NAME, &osc_device_type);
2826         if (rc) {
2827                 lu_kmem_fini(osc_caches);
2828                 RETURN(rc);
2829         }
2830
2831         RETURN(rc);
2832 }
2833
/* Module exit: unregister the device type, then release the slab caches
 * initialized in osc_init(). */
static void /*__exit*/ osc_exit(void)
{
        class_unregister_type(LUSTRE_OSC_NAME);
        lu_kmem_fini(osc_caches);
}
2839
/* Kernel module metadata and init/exit hook registration. */
MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
MODULE_VERSION(LUSTRE_VERSION_STRING);
MODULE_LICENSE("GPL");

module_init(osc_init);
module_exit(osc_exit);