/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.gnu.org/licenses/gpl-2.0.html
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, 2017, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 */

#define DEBUG_SUBSYSTEM S_OSC

#include <linux/workqueue.h>
#include <lprocfs_status.h>
#include <lustre_debug.h>
#include <lustre_dlm.h>
#include <lustre_fid.h>
#include <lustre_ha.h>
#include <uapi/linux/lustre/lustre_ioctl.h>
#include <lustre_net.h>
#include <lustre_obdo.h>
#include <uapi/linux/lustre/lustre_param.h>
#include <obd.h>
#include <obd_cksum.h>
#include <obd_class.h>
#include <lustre_osc.h>

#include "osc_internal.h"

atomic_t osc_pool_req_count;
unsigned int osc_reqpool_maxreqcount;
struct ptlrpc_request_pool *osc_rq_pool;

/* max memory used for request pool, unit is MB */
static unsigned int osc_reqpool_mem_max = 5;
module_param(osc_reqpool_mem_max, uint, 0444);

static int osc_idle_timeout = 20;
module_param(osc_idle_timeout, uint, 0644);

#define osc_grant_args osc_brw_async_args

struct osc_setattr_args {
        struct obdo             *sa_oa;
        obd_enqueue_update_f     sa_upcall;
        void                    *sa_cookie;
};

struct osc_fsync_args {
        struct osc_object       *fa_obj;
        struct obdo             *fa_oa;
        obd_enqueue_update_f    fa_upcall;
        void                    *fa_cookie;
};

struct osc_ladvise_args {
        struct obdo             *la_oa;
        obd_enqueue_update_f     la_upcall;
        void                    *la_cookie;
};

static void osc_release_ppga(struct brw_page **ppga, size_t count);
static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
                         void *data, int rc);

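/* Pack @oa into the request body of @req, converting the local obdo to its
 * wire format according to the connect data negotiated on the import. */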
void osc_pack_req_body(struct ptlrpc_request *req, struct obdo *oa)
{
        struct ost_body *body;

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
}

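/* Synchronously fetch the attributes of the object named by @oa from the
 * OST and unpack the reply back into @oa. */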
static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        int                      rc;

        ENTRY;
        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;

        EXIT;
out:
        ptlrpc_req_finished(req);

        return rc;
}

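/* Synchronously apply the attributes in @oa to the object on the OST.
 * The caller must have set the object group (OBD_MD_FLGROUP) in @oa. */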
static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        int                      rc;

        ENTRY;
        LASSERT(oa->o_valid & OBD_MD_FLGROUP);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        EXIT;
out:
        ptlrpc_req_finished(req);

        RETURN(rc);
}

static int osc_setattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_setattr_args *sa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
                             &body->oa);
out:
        rc = sa->sa_upcall(sa->sa_cookie, rc);
        RETURN(rc);
}

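/* Queue an OST_SETATTR request without blocking.  With a NULL @rqset the
 * request is handed straight to ptlrpcd and no reply is waited for;
 * otherwise @upcall is invoked with @cookie once the reply is interpreted.
 *
 * A sketch of a caller (hypothetical, not taken from this file):
 *
 *	rc = osc_setattr_async(exp, oa, my_upcall, my_cookie, PTLRPCD_SET);
 *	if (rc)
 *		CERROR("setattr not queued: rc = %d\n", rc);
 */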
int osc_setattr_async(struct obd_export *exp, struct obdo *oa,
                      obd_enqueue_update_f upcall, void *cookie,
                      struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct osc_setattr_args *sa;
        int                      rc;

        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        /* Do the MDS-to-OST setattr asynchronously. */
        if (!rqset) {
                /* Do not wait for the response. */
                ptlrpcd_add_req(req);
        } else {
                req->rq_interpret_reply =
                        (ptlrpc_interpterer_t)osc_setattr_interpret;

                CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
                sa = ptlrpc_req_async_args(req);
                sa->sa_oa = oa;
                sa->sa_upcall = upcall;
                sa->sa_cookie = cookie;

                if (rqset == PTLRPCD_SET)
                        ptlrpcd_add_req(req);
                else
                        ptlrpc_set_add_req(rqset, req);
        }

        RETURN(0);
}

static int osc_ladvise_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 void *arg, int rc)
{
        struct osc_ladvise_args *la = arg;
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        *la->la_oa = body->oa;
out:
        rc = la->la_upcall(la->la_cookie, rc);
        RETURN(rc);
}

/**
 * If rqset is NULL, do not wait for the response; upcall and cookie may
 * also be NULL in that case.
 */
int osc_ladvise_base(struct obd_export *exp, struct obdo *oa,
                     struct ladvise_hdr *ladvise_hdr,
                     obd_enqueue_update_f upcall, void *cookie,
                     struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        struct osc_ladvise_args *la;
        int                      rc;
        struct lu_ladvise       *req_ladvise;
        struct lu_ladvise       *ladvise = ladvise_hdr->lah_advise;
        int                      num_advise = ladvise_hdr->lah_count;
        struct ladvise_hdr      *req_ladvise_hdr;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_LADVISE);
        if (req == NULL)
                RETURN(-ENOMEM);

        req_capsule_set_size(&req->rq_pill, &RMF_OST_LADVISE, RCL_CLIENT,
                             num_advise * sizeof(*ladvise));
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_LADVISE);
        if (rc != 0) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL;
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
                             oa);

        req_ladvise_hdr = req_capsule_client_get(&req->rq_pill,
                                                 &RMF_OST_LADVISE_HDR);
        memcpy(req_ladvise_hdr, ladvise_hdr, sizeof(*ladvise_hdr));

        req_ladvise = req_capsule_client_get(&req->rq_pill, &RMF_OST_LADVISE);
        memcpy(req_ladvise, ladvise, sizeof(*ladvise) * num_advise);
        ptlrpc_request_set_replen(req);

        if (rqset == NULL) {
                /* Do not wait for the response. */
                ptlrpcd_add_req(req);
                RETURN(0);
        }

        req->rq_interpret_reply = osc_ladvise_interpret;
        CLASSERT(sizeof(*la) <= sizeof(req->rq_async_args));
        la = ptlrpc_req_async_args(req);
        la->la_oa = oa;
        la->la_upcall = upcall;
        la->la_cookie = cookie;

        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}

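/* Synchronously create an object on the OST.  Only echo objects are
 * created this way: the object sequence in @oa must be an echo sequence. */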
static int osc_create(const struct lu_env *env, struct obd_export *exp,
                      struct obdo *oa)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        LASSERT(oa != NULL);
        LASSERT(oa->o_valid & OBD_MD_FLGROUP);
        LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
        if (req == NULL)
                GOTO(out, rc = -ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
        if (rc) {
                ptlrpc_request_free(req);
                GOTO(out, rc);
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out_req, rc = -EPROTO);

        CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;

        CDEBUG(D_HA, "transno: %lld\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        RETURN(rc);
}

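/* Send an OST_PUNCH request through ptlrpcd without waiting for the reply;
 * @upcall is invoked with @cookie once the reply is interpreted. */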
int osc_punch_send(struct obd_export *exp, struct obdo *oa,
                   obd_enqueue_update_f upcall, void *cookie)
{
        struct ptlrpc_request *req;
        struct osc_setattr_args *sa;
        struct obd_import *imp = class_exp2cliimp(exp);
        struct ost_body *body;
        int rc;

        ENTRY;

        req = ptlrpc_request_alloc(imp, &RQF_OST_PUNCH);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
        if (rc < 0) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_set_io_portal(req);

        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);

        lustre_set_wire_obdo(&imp->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
        CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
        sa = ptlrpc_req_async_args(req);
        sa->sa_oa = oa;
        sa->sa_upcall = upcall;
        sa->sa_cookie = cookie;

        ptlrpcd_add_req(req);

        RETURN(0);
}
EXPORT_SYMBOL(osc_punch_send);

static int osc_sync_interpret(const struct lu_env *env,
                              struct ptlrpc_request *req,
                              void *arg, int rc)
{
        struct osc_fsync_args   *fa = arg;
        struct ost_body         *body;
        struct cl_attr          *attr = &osc_env_info(env)->oti_attr;
        unsigned long           valid = 0;
        struct cl_object        *obj;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL) {
                CERROR("can't unpack ost_body\n");
                GOTO(out, rc = -EPROTO);
        }

        *fa->fa_oa = body->oa;
        obj = osc2cl(fa->fa_obj);

        /* Update osc object's blocks attribute */
        cl_object_attr_lock(obj);
        if (body->oa.o_valid & OBD_MD_FLBLOCKS) {
                attr->cat_blocks = body->oa.o_blocks;
                valid |= CAT_BLOCKS;
        }

        if (valid != 0)
                cl_object_attr_update(env, obj, attr, valid);
        cl_object_attr_unlock(obj);

out:
        rc = fa->fa_upcall(fa->fa_cookie, rc);
        RETURN(rc);
}

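/* Send an OST_SYNC request for @obj; the range to flush is carried in the
 * size/blocks fields of @oa (see the packing comment below). */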
int osc_sync_base(struct osc_object *obj, struct obdo *oa,
                  obd_enqueue_update_f upcall, void *cookie,
                  struct ptlrpc_request_set *rqset)
{
        struct obd_export     *exp = osc_export(obj);
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct osc_fsync_args *fa;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        /* overload the size and blocks fields in the oa with start/end */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = osc_sync_interpret;

        CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
        fa = ptlrpc_req_async_args(req);
        fa->fa_obj = obj;
        fa->fa_oa = oa;
        fa->fa_upcall = upcall;
        fa->fa_cookie = cookie;

        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}

/* Find and cancel locally any locks matched by @mode in the resource found
 * by @objid. Found locks are added to the @cancels list. Returns the number
 * of locks added to the list. */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
                                   struct list_head *cancels,
                                   enum ldlm_mode mode, __u64 lock_flags)
{
        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        struct ldlm_res_id res_id;
        struct ldlm_resource *res;
        int count;
        ENTRY;

        /* Return, i.e. cancel nothing, only if ELC is supported (flag in
         * export) but disabled through procfs (flag in NS).
         *
         * This distinguishes it from the case when ELC is not supported at
         * all, where we still want to cancel locks in advance and just
         * cancel them locally, without sending any RPC. */
        if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
                RETURN(0);

        ostid_build_res_name(&oa->o_oi, &res_id);
        res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
        if (IS_ERR(res))
                RETURN(0);

        LDLM_RESOURCE_ADDREF(res);
        count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
                                           lock_flags, 0, NULL);
        LDLM_RESOURCE_DELREF(res);
        ldlm_resource_putref(res);
        RETURN(count);
}

static int osc_destroy_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req, void *data,
                                 int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;

        atomic_dec(&cli->cl_destroy_in_flight);
        wake_up(&cli->cl_destroy_waitq);
        return 0;
}

static int osc_can_send_destroy(struct client_obd *cli)
{
        if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
            cli->cl_max_rpcs_in_flight) {
                /* The destroy request can be sent */
                return 1;
        }
        if (atomic_dec_return(&cli->cl_destroy_in_flight) <
            cli->cl_max_rpcs_in_flight) {
                /*
                 * The counter has been modified between the two atomic
                 * operations.
                 */
                wake_up(&cli->cl_destroy_waitq);
        }
        return 0;
}

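/* Destroy the object named by @oa, first cancelling local PW locks on the
 * resource so their dirty data is discarded.  The RPC itself goes out
 * asynchronously, throttled to cl_max_rpcs_in_flight destroys in flight. */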
static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct client_obd     *cli = &exp->exp_obd->u.cli;
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct list_head       cancels = LIST_HEAD_INIT(cancels);
        int rc, count;
        ENTRY;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
        if (req == NULL) {
                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                RETURN(-ENOMEM);
        }

        rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
                               0, &cancels, count);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = osc_destroy_interpret;
        if (!osc_can_send_destroy(cli)) {
                struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);

                /*
                 * Wait until the number of on-going destroy RPCs drops
                 * below cl_max_rpcs_in_flight.
                 */
                rc = l_wait_event_exclusive(cli->cl_destroy_waitq,
                                            osc_can_send_destroy(cli), &lwi);
                if (rc) {
                        ptlrpc_req_finished(req);
                        RETURN(rc);
                }
        }

        /* Do not wait for the response. */
        ptlrpcd_add_req(req);
        RETURN(0);
}

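/* Fill in the caching fields of @oa (o_dirty, o_undirty, o_grant and
 * o_dropped) so each BRW tells the server how much cache the client holds
 * and how much additional grant it would like. */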
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;

        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        spin_lock(&cli->cl_loi_list_lock);
        if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data, GRANT_PARAM))
                oa->o_dirty = cli->cl_dirty_grant;
        else
                oa->o_dirty = cli->cl_dirty_pages << PAGE_SHIFT;
        if (unlikely(cli->cl_dirty_pages > cli->cl_dirty_max_pages)) {
                CERROR("dirty %lu > dirty_max %lu\n",
                       cli->cl_dirty_pages,
                       cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else if (unlikely(atomic_long_read(&obd_dirty_pages) >
                            (long)(obd_max_dirty_pages + 1))) {
                /* The atomic_read() and atomic_inc() are not covered by a
                 * lock, thus they may safely race and trip this CERROR()
                 * unless we add in a small fudge factor (+1). */
                CERROR("%s: dirty %ld > system dirty_max %ld\n",
                       cli_name(cli), atomic_long_read(&obd_dirty_pages),
                       obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
                            0x7fffffff)) {
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty_pages, cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else {
                unsigned long nrpages;
                unsigned long undirty;

                nrpages = cli->cl_max_pages_per_rpc;
                nrpages *= cli->cl_max_rpcs_in_flight + 1;
                nrpages = max(nrpages, cli->cl_dirty_max_pages);
                undirty = nrpages << PAGE_SHIFT;
                if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data,
                                 GRANT_PARAM)) {
                        int nrextents;

                        /* take the extent tax into account when asking for
                         * more grant space */
                        nrextents = (nrpages + cli->cl_max_extent_pages - 1) /
                                     cli->cl_max_extent_pages;
                        undirty += nrextents * cli->cl_grant_extent_tax;
                }
                /* Do not ask for more than OBD_MAX_GRANT - a margin for server
                 * to add extent tax, etc.
                 */
                oa->o_undirty = min(undirty, OBD_MAX_GRANT &
                                    ~(PTLRPC_MAX_BRW_SIZE * 4UL));
        }
        oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
        oa->o_dropped = cli->cl_lost_grant;
        cli->cl_lost_grant = 0;
        spin_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "dirty: %llu undirty: %u dropped %u grant: %llu\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
}

void osc_update_next_shrink(struct client_obd *cli)
{
        cli->cl_next_shrink_grant = ktime_get_seconds() +
                                    cli->cl_grant_shrink_interval;

        CDEBUG(D_CACHE, "next time %lld to shrink grant\n",
               cli->cl_next_shrink_grant);
}

static void __osc_update_grant(struct client_obd *cli, u64 grant)
{
        spin_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant += grant;
        spin_unlock(&cli->cl_loi_list_lock);
}

static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
{
        if (body->oa.o_valid & OBD_MD_FLGRANT) {
                CDEBUG(D_CACHE, "got %llu extra grant\n", body->oa.o_grant);
                __osc_update_grant(cli, body->oa.o_grant);
        }
}

/**
 * grant thread data for shrinking space.
 */
struct grant_thread_data {
        struct list_head        gtd_clients;
        struct mutex            gtd_mutex;
        unsigned long           gtd_stopped:1;
};
static struct grant_thread_data client_gtd;

static int osc_shrink_grant_interpret(const struct lu_env *env,
                                      struct ptlrpc_request *req,
                                      void *aa, int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
        struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
        struct ost_body *body;

        if (rc != 0) {
                __osc_update_grant(cli, oa->o_grant);
                GOTO(out, rc);
        }

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        osc_update_grant(cli, body);
out:
        OBD_SLAB_FREE_PTR(oa, osc_obdo_kmem);
        oa = NULL;
        return rc;
}

static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
{
        spin_lock(&cli->cl_loi_list_lock);
        oa->o_grant = cli->cl_avail_grant / 4;
        cli->cl_avail_grant -= oa->o_grant;
        spin_unlock(&cli->cl_loi_list_lock);
        if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
                oa->o_valid |= OBD_MD_FLFLAGS;
                oa->o_flags = 0;
        }
        oa->o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);
}

/* Shrink the current grant, either from some large amount to enough for a
 * full set of in-flight RPCs, or if we have already shrunk to that limit
 * then to enough for a single RPC.  This avoids keeping more grant than
 * needed, and avoids shrinking the grant piecemeal. */
static int osc_shrink_grant(struct client_obd *cli)
{
        __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
                             (cli->cl_max_pages_per_rpc << PAGE_SHIFT);

        spin_lock(&cli->cl_loi_list_lock);
        if (cli->cl_avail_grant <= target_bytes)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
        spin_unlock(&cli->cl_loi_list_lock);

        return osc_shrink_grant_to_target(cli, target_bytes);
}

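/* Return grant above @target_bytes to the server via a KEY_GRANT_SHRINK
 * set_info RPC, never shrinking below what a single RPC needs; on failure
 * the shrunken amount is restored to cl_avail_grant. */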
int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
{
        int                     rc = 0;
        struct ost_body        *body;
        ENTRY;

        spin_lock(&cli->cl_loi_list_lock);
        /* Don't shrink if we are already above or below the desired limit.
         * We don't want to shrink below a single RPC, as that will negatively
         * impact block allocation and long-term performance. */
        if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_SHIFT)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;

        if (target_bytes >= cli->cl_avail_grant) {
                spin_unlock(&cli->cl_loi_list_lock);
                RETURN(0);
        }
        spin_unlock(&cli->cl_loi_list_lock);

        OBD_ALLOC_PTR(body);
        if (!body)
                RETURN(-ENOMEM);

        osc_announce_cached(cli, &body->oa, 0);

        spin_lock(&cli->cl_loi_list_lock);
        if (target_bytes >= cli->cl_avail_grant) {
                /* available grant has changed since target calculation */
                spin_unlock(&cli->cl_loi_list_lock);
                GOTO(out_free, rc = 0);
        }
        body->oa.o_grant = cli->cl_avail_grant - target_bytes;
        cli->cl_avail_grant = target_bytes;
        spin_unlock(&cli->cl_loi_list_lock);
        if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
                body->oa.o_valid |= OBD_MD_FLFLAGS;
                body->oa.o_flags = 0;
        }
        body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);

        rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
                                sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
                                sizeof(*body), body, NULL);
        if (rc != 0)
                __osc_update_grant(cli, body->oa.o_grant);
out_free:
        OBD_FREE_PTR(body);
        RETURN(rc);
}

static int osc_should_shrink_grant(struct client_obd *client)
{
        time64_t next_shrink = client->cl_next_shrink_grant;

        if (client->cl_import == NULL)
                return 0;

        if (!OCD_HAS_FLAG(&client->cl_import->imp_connect_data, GRANT_SHRINK) ||
            client->cl_import->imp_grant_shrink_disabled) {
                osc_update_next_shrink(client);
                return 0;
        }

        if (ktime_get_seconds() >= next_shrink - 5) {
                /* Get the current RPC size directly, instead of going via:
                 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
                 * Keep comment here so that it can be found by searching. */
                int brw_size = client->cl_max_pages_per_rpc << PAGE_SHIFT;

                if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
                    client->cl_avail_grant > brw_size)
                        return 1;
                else
                        osc_update_next_shrink(client);
        }
        return 0;
}

#define GRANT_SHRINK_RPC_BATCH  100

static struct delayed_work work;

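/* Delayed work that walks the client list, sends up to
 * GRANT_SHRINK_RPC_BATCH grant-shrink RPCs per pass, and then re-arms
 * itself for the earliest upcoming shrink deadline. */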
static void osc_grant_work_handler(struct work_struct *data)
{
        struct client_obd *cli;
        int rpc_sent;
        bool init_next_shrink = true;
        time64_t next_shrink = ktime_get_seconds() + GRANT_SHRINK_INTERVAL;

        rpc_sent = 0;
        mutex_lock(&client_gtd.gtd_mutex);
        list_for_each_entry(cli, &client_gtd.gtd_clients,
                            cl_grant_chain) {
                if (rpc_sent < GRANT_SHRINK_RPC_BATCH &&
                    osc_should_shrink_grant(cli)) {
                        osc_shrink_grant(cli);
                        rpc_sent++;
                }

                if (!init_next_shrink) {
                        if (cli->cl_next_shrink_grant < next_shrink &&
                            cli->cl_next_shrink_grant > ktime_get_seconds())
                                next_shrink = cli->cl_next_shrink_grant;
                } else {
                        init_next_shrink = false;
                        next_shrink = cli->cl_next_shrink_grant;
                }
        }
        mutex_unlock(&client_gtd.gtd_mutex);

        if (client_gtd.gtd_stopped == 1)
                return;

        if (next_shrink > ktime_get_seconds())
                schedule_delayed_work(&work, msecs_to_jiffies(
                                        (next_shrink - ktime_get_seconds()) *
                                        MSEC_PER_SEC));
        else
                schedule_work(&work.work);
}

/**
 * Start the grant work for returning grant to the server for idle clients.
 */
static int osc_start_grant_work(void)
{
        client_gtd.gtd_stopped = 0;
        mutex_init(&client_gtd.gtd_mutex);
        INIT_LIST_HEAD(&client_gtd.gtd_clients);

        INIT_DELAYED_WORK(&work, osc_grant_work_handler);
        schedule_work(&work.work);

        return 0;
}

static void osc_stop_grant_work(void)
{
        client_gtd.gtd_stopped = 1;
        cancel_delayed_work_sync(&work);
}

static void osc_add_grant_list(struct client_obd *client)
{
        mutex_lock(&client_gtd.gtd_mutex);
        list_add(&client->cl_grant_chain, &client_gtd.gtd_clients);
        mutex_unlock(&client_gtd.gtd_mutex);
}

static void osc_del_grant_list(struct client_obd *client)
{
        if (list_empty(&client->cl_grant_chain))
                return;

        mutex_lock(&client_gtd.gtd_mutex);
        list_del_init(&client->cl_grant_chain);
        mutex_unlock(&client_gtd.gtd_mutex);
}

void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        /*
         * ocd_grant is the total grant amount we expect to hold: if we have
         * been evicted, it's the new avail_grant amount, and cl_dirty_pages
         * will drop to 0 as in-flight RPCs fail out; otherwise, it's
         * avail_grant + dirty.
         *
         * A race is tolerable here: if we're evicted, but imp_state has
         * already left the EVICTED state, then cl_dirty_pages must be 0
         * already.
         */
        spin_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant = ocd->ocd_grant;
        if (cli->cl_import->imp_state != LUSTRE_IMP_EVICTED) {
                unsigned long consumed = cli->cl_reserved_grant;

                if (OCD_HAS_FLAG(ocd, GRANT_PARAM))
                        consumed += cli->cl_dirty_grant;
                else
                        consumed += cli->cl_dirty_pages << PAGE_SHIFT;
                if (cli->cl_avail_grant < consumed) {
                        CERROR("%s: granted %ld but already consumed %ld\n",
                               cli_name(cli), cli->cl_avail_grant, consumed);
                        cli->cl_avail_grant = 0;
                } else {
                        cli->cl_avail_grant -= consumed;
                }
        }

        if (OCD_HAS_FLAG(ocd, GRANT_PARAM)) {
                u64 size;
                int chunk_mask;

                /* overhead for each extent insertion */
                cli->cl_grant_extent_tax = ocd->ocd_grant_tax_kb << 10;
                /* determine the appropriate chunk size used by osc_extent. */
                cli->cl_chunkbits = max_t(int, PAGE_SHIFT,
                                          ocd->ocd_grant_blkbits);
                /* max_pages_per_rpc must be chunk aligned */
                chunk_mask = ~((1 << (cli->cl_chunkbits - PAGE_SHIFT)) - 1);
                cli->cl_max_pages_per_rpc = (cli->cl_max_pages_per_rpc +
                                             ~chunk_mask) & chunk_mask;
                /* determine maximum extent size, in #pages */
                size = (u64)ocd->ocd_grant_max_blks << ocd->ocd_grant_blkbits;
                cli->cl_max_extent_pages = size >> PAGE_SHIFT;
                if (cli->cl_max_extent_pages == 0)
                        cli->cl_max_extent_pages = 1;
        } else {
                cli->cl_grant_extent_tax = 0;
                cli->cl_chunkbits = PAGE_SHIFT;
                cli->cl_max_extent_pages = DT_MAX_BRW_PAGES;
        }
        spin_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld. "
                "chunk bits: %d cl_max_extent_pages: %d\n",
                cli_name(cli),
                cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits,
                cli->cl_max_extent_pages);

        if (OCD_HAS_FLAG(ocd, GRANT_SHRINK) && list_empty(&cli->cl_grant_chain))
                osc_add_grant_list(cli);
}
EXPORT_SYMBOL(osc_init_grant);

/* We assume that the reason this OSC got a short read is because it read
 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
 * via the LOV, and it _knows_ it's reading inside the file, it's just that
 * this stripe never got written at or beyond this stripe offset yet. */
static void handle_short_read(int nob_read, size_t page_count,
                              struct brw_page **pga)
{
        char *ptr;
        int i = 0;

        /* skip bytes read OK */
        while (nob_read > 0) {
                LASSERT(page_count > 0);

                if (pga[i]->count > nob_read) {
                        /* EOF inside this page */
                        ptr = kmap(pga[i]->pg) +
                                (pga[i]->off & ~PAGE_MASK);
                        memset(ptr + nob_read, 0, pga[i]->count - nob_read);
                        kunmap(pga[i]->pg);
                        page_count--;
                        i++;
                        break;
                }

                nob_read -= pga[i]->count;
                page_count--;
                i++;
        }

        /* zero remaining pages */
        while (page_count-- > 0) {
                ptr = kmap(pga[i]->pg) + (pga[i]->off & ~PAGE_MASK);
                memset(ptr, 0, pga[i]->count);
                kunmap(pga[i]->pg);
                i++;
        }
}

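/* Check the per-niobuf return codes in a BRW_WRITE reply and verify that
 * the bulk transfer moved exactly the number of bytes requested. */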
static int check_write_rcs(struct ptlrpc_request *req,
                           int requested_nob, int niocount,
                           size_t page_count, struct brw_page **pga)
{
        int     i;
        __u32   *remote_rcs;

        remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
                                                  sizeof(*remote_rcs) *
                                                  niocount);
        if (remote_rcs == NULL) {
                CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
                return(-EPROTO);
        }

        /* return error if any niobuf was in error */
        for (i = 0; i < niocount; i++) {
                if ((int)remote_rcs[i] < 0)
                        return(remote_rcs[i]);

                if (remote_rcs[i] != 0) {
                        CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
                                i, remote_rcs[i], req);
                        return(-EPROTO);
                }
        }
        if (req->rq_bulk != NULL &&
            req->rq_bulk->bd_nob_transferred != requested_nob) {
                CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
                       req->rq_bulk->bd_nob_transferred, requested_nob);
                return(-EPROTO);
        }

        return (0);
}

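/* Two brw_pages may share one niobuf only if they are contiguous in the
 * file and carry identical flags. */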
static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
{
        if (p1->flag != p2->flag) {
                unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_SYNC |
                                  OBD_BRW_ASYNC | OBD_BRW_NOQUOTA |
                                  OBD_BRW_SOFT_SYNC);

                /* warn if we try to combine flags that we don't know to be
                 * safe to combine */
                if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
                        CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
                              "report this at https://jira.whamcloud.com/\n",
                              p1->flag, p2->flag);
                }
                return 0;
        }

        return (p1->off + p1->count == p2->off);
}

#if IS_ENABLED(CONFIG_CRC_T10DIF)
static int osc_checksum_bulk_t10pi(const char *obd_name, int nob,
                                   size_t pg_count, struct brw_page **pga,
                                   int opc, obd_dif_csum_fn *fn,
                                   int sector_size,
                                   u32 *check_sum)
{
        struct ahash_request *req;
        /* Use Adler as the default checksum type on top of DIF tags */
        unsigned char cfs_alg = cksum_obd2cfs(OBD_CKSUM_T10_TOP);
        struct page *__page;
        unsigned char *buffer;
        __u16 *guard_start;
        unsigned int bufsize;
        int guard_number;
        int used_number = 0;
        int used;
        u32 cksum;
        int rc = 0;
        int i = 0;

        LASSERT(pg_count > 0);

        __page = alloc_page(GFP_KERNEL);
        if (__page == NULL)
                return -ENOMEM;

        req = cfs_crypto_hash_init(cfs_alg, NULL, 0);
        if (IS_ERR(req)) {
                rc = PTR_ERR(req);
                CERROR("%s: unable to initialize checksum hash %s: rc = %d\n",
                       obd_name, cfs_crypto_hash_name(cfs_alg), rc);
                GOTO(out, rc);
        }

        buffer = kmap(__page);
        guard_start = (__u16 *)buffer;
        guard_number = PAGE_SIZE / sizeof(*guard_start);
        while (nob > 0 && pg_count > 0) {
                unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (unlikely(i == 0 && opc == OST_READ &&
                             OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))) {
                        unsigned char *ptr = kmap(pga[i]->pg);
                        int off = pga[i]->off & ~PAGE_MASK;

                        memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
                        kunmap(pga[i]->pg);
                }

                /*
                 * The remaining guard slots should be able to hold the
                 * checksums of a whole page
                 */
                rc = obd_page_dif_generate_buffer(obd_name, pga[i]->pg,
                                                  pga[i]->off & ~PAGE_MASK,
                                                  count,
                                                  guard_start + used_number,
                                                  guard_number - used_number,
                                                  &used, sector_size,
                                                  fn);
                if (rc)
                        break;

                used_number += used;
                if (used_number == guard_number) {
                        cfs_crypto_hash_update_page(req, __page, 0,
                                used_number * sizeof(*guard_start));
                        used_number = 0;
                }

                nob -= pga[i]->count;
                pg_count--;
                i++;
        }
        kunmap(__page);
        if (rc)
                GOTO(out, rc);

        if (used_number != 0)
                cfs_crypto_hash_update_page(req, __page, 0,
                        used_number * sizeof(*guard_start));

        bufsize = sizeof(cksum);
        cfs_crypto_hash_final(req, (unsigned char *)&cksum, &bufsize);

        /* For sending we only compute a wrong checksum instead of corrupting
         * the data, so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                cksum++;

        *check_sum = cksum;
out:
        __free_page(__page);
        return rc;
}
#else /* !CONFIG_CRC_T10DIF */
#define obd_dif_ip_fn NULL
#define obd_dif_crc_fn NULL
#define osc_checksum_bulk_t10pi(name, nob, pgc, pga, opc, fn, ssize, csum)  \
        -EOPNOTSUPP
#endif /* CONFIG_CRC_T10DIF */

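/* Compute a single checksum of type @cksum_type over all pages of the
 * transfer (the non-T10-PI path). */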
static int osc_checksum_bulk(int nob, size_t pg_count,
                             struct brw_page **pga, int opc,
                             enum cksum_types cksum_type,
                             u32 *cksum)
{
        int                             i = 0;
        struct ahash_request           *req;
        unsigned int                    bufsize;
        unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);

        LASSERT(pg_count > 0);

        req = cfs_crypto_hash_init(cfs_alg, NULL, 0);
        if (IS_ERR(req)) {
                CERROR("Unable to initialize checksum hash %s\n",
                       cfs_crypto_hash_name(cfs_alg));
                return PTR_ERR(req);
        }

        while (nob > 0 && pg_count > 0) {
                unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 && opc == OST_READ &&
                    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
                        unsigned char *ptr = kmap(pga[i]->pg);
                        int off = pga[i]->off & ~PAGE_MASK;

                        memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
                        kunmap(pga[i]->pg);
                }
                cfs_crypto_hash_update_page(req, pga[i]->pg,
                                            pga[i]->off & ~PAGE_MASK,
                                            count);
                LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
                               (int)(pga[i]->off & ~PAGE_MASK));

                nob -= pga[i]->count;
                pg_count--;
                i++;
        }

        bufsize = sizeof(*cksum);
        cfs_crypto_hash_final(req, (unsigned char *)cksum, &bufsize);

        /* For sending we only compute a wrong checksum instead of corrupting
         * the data, so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                (*cksum)++;

        return 0;
}

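/* Pick the checksum method for a bulk transfer: use the T10-PI path when
 * @cksum_type maps to a DIF function, else the plain bulk checksum. */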
static int osc_checksum_bulk_rw(const char *obd_name,
                                enum cksum_types cksum_type,
                                int nob, size_t pg_count,
                                struct brw_page **pga, int opc,
                                u32 *check_sum)
{
        obd_dif_csum_fn *fn = NULL;
        int sector_size = 0;
        int rc;

        ENTRY;
        obd_t10_cksum2dif(cksum_type, &fn, &sector_size);

        if (fn)
                rc = osc_checksum_bulk_t10pi(obd_name, nob, pg_count, pga,
                                             opc, fn, sector_size, check_sum);
        else
                rc = osc_checksum_bulk(nob, pg_count, pga, opc, cksum_type,
                                       check_sum);

        RETURN(rc);
}

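/* Build an OST_READ or OST_WRITE BRW request for @page_count pages in
 * @pga: merge contiguous pages into niobufs, set up either a bulk
 * descriptor or a short-io buffer, announce the client's cache state, and
 * checksum the data for writes. */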
1294 static int
1295 osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
1296                      u32 page_count, struct brw_page **pga,
1297                      struct ptlrpc_request **reqp, int resend)
1298 {
1299         struct ptlrpc_request   *req;
1300         struct ptlrpc_bulk_desc *desc;
1301         struct ost_body         *body;
1302         struct obd_ioobj        *ioobj;
1303         struct niobuf_remote    *niobuf;
1304         int niocount, i, requested_nob, opc, rc, short_io_size = 0;
1305         struct osc_brw_async_args *aa;
1306         struct req_capsule      *pill;
1307         struct brw_page *pg_prev;
1308         void *short_io_buf;
1309         const char *obd_name = cli->cl_import->imp_obd->obd_name;
1310
1311         ENTRY;
1312         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1313                 RETURN(-ENOMEM); /* Recoverable */
1314         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1315                 RETURN(-EINVAL); /* Fatal */
1316
1317         if ((cmd & OBD_BRW_WRITE) != 0) {
1318                 opc = OST_WRITE;
1319                 req = ptlrpc_request_alloc_pool(cli->cl_import,
1320                                                 osc_rq_pool,
1321                                                 &RQF_OST_BRW_WRITE);
1322         } else {
1323                 opc = OST_READ;
1324                 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
1325         }
1326         if (req == NULL)
1327                 RETURN(-ENOMEM);
1328
1329         for (niocount = i = 1; i < page_count; i++) {
1330                 if (!can_merge_pages(pga[i - 1], pga[i]))
1331                         niocount++;
1332         }
1333
1334         pill = &req->rq_pill;
1335         req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1336                              sizeof(*ioobj));
1337         req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1338                              niocount * sizeof(*niobuf));
1339
1340         for (i = 0; i < page_count; i++)
1341                 short_io_size += pga[i]->count;
1342
1343         /* Check if read/write is small enough to be a short io. */
1344         if (short_io_size > cli->cl_max_short_io_bytes || niocount > 1 ||
1345             !imp_connect_shortio(cli->cl_import))
1346                 short_io_size = 0;
1347
1348         req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_CLIENT,
1349                              opc == OST_READ ? 0 : short_io_size);
1350         if (opc == OST_READ)
1351                 req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_SERVER,
1352                                      short_io_size);
1353
1354         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1355         if (rc) {
1356                 ptlrpc_request_free(req);
1357                 RETURN(rc);
1358         }
1359         osc_set_io_portal(req);
1360
1361         ptlrpc_at_set_req_timeout(req);
1362         /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
1363          * retry logic */
1364         req->rq_no_retry_einprogress = 1;
1365
1366         if (short_io_size != 0) {
1367                 desc = NULL;
1368                 short_io_buf = NULL;
1369                 goto no_bulk;
1370         }
1371
1372         desc = ptlrpc_prep_bulk_imp(req, page_count,
1373                 cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
1374                 (opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE :
1375                         PTLRPC_BULK_PUT_SINK) |
1376                         PTLRPC_BULK_BUF_KIOV,
1377                 OST_BULK_PORTAL,
1378                 &ptlrpc_bulk_kiov_pin_ops);
1379
1380         if (desc == NULL)
1381                 GOTO(out, rc = -ENOMEM);
1382         /* NB request now owns desc and will free it when it gets freed */
1383 no_bulk:
1384         body = req_capsule_client_get(pill, &RMF_OST_BODY);
1385         ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1386         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1387         LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1388
1389         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
1390
1391         /* For READ and WRITE, we can't fill o_uid and o_gid using from_kuid()
1392          * and from_kgid(), because they are asynchronous. Fortunately, variable
1393          * oa contains valid o_uid and o_gid in these two operations.
1394          * Besides, filling o_uid and o_gid is enough for nrs-tbf, see LU-9658.
1395          * OBD_MD_FLUID and OBD_MD_FLUID is not set in order to avoid breaking
1396          * other process logic */
1397         body->oa.o_uid = oa->o_uid;
1398         body->oa.o_gid = oa->o_gid;
1399
1400         obdo_to_ioobj(oa, ioobj);
1401         ioobj->ioo_bufcnt = niocount;
1402         /* The high bits of ioo_max_brw tells server _maximum_ number of bulks
1403          * that might be send for this request.  The actual number is decided
1404          * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
1405          * "max - 1" for old client compatibility sending "0", and also so the
1406          * the actual maximum is a power-of-two number, not one less. LU-1431 */
1407         if (desc != NULL)
1408                 ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
1409         else /* short io */
1410                 ioobj_max_brw_set(ioobj, 0);
1411
1412         if (short_io_size != 0) {
1413                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1414                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1415                         body->oa.o_flags = 0;
1416                 }
1417                 body->oa.o_flags |= OBD_FL_SHORT_IO;
1418                 CDEBUG(D_CACHE, "Using short io for data transfer, size = %d\n",
1419                        short_io_size);
1420                 if (opc == OST_WRITE) {
1421                         short_io_buf = req_capsule_client_get(pill,
1422                                                               &RMF_SHORT_IO);
1423                         LASSERT(short_io_buf != NULL);
1424                 }
1425         }
1426
1427         LASSERT(page_count > 0);
1428         pg_prev = pga[0];
1429         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1430                 struct brw_page *pg = pga[i];
1431                 int poff = pg->off & ~PAGE_MASK;
1432
1433                 LASSERT(pg->count > 0);
1434                 /* make sure there is no gap in the middle of page array */
1435                 LASSERTF(page_count == 1 ||
1436                          (ergo(i == 0, poff + pg->count == PAGE_SIZE) &&
1437                           ergo(i > 0 && i < page_count - 1,
1438                                poff == 0 && pg->count == PAGE_SIZE)   &&
1439                           ergo(i == page_count - 1, poff == 0)),
1440                          "i: %d/%d pg: %p off: %llu, count: %u\n",
1441                          i, page_count, pg, pg->off, pg->count);
1442                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1443                          "i %d p_c %u pg %p [pri %lu ind %lu] off %llu"
1444                          " prev_pg %p [pri %lu ind %lu] off %llu\n",
1445                          i, page_count,
1446                          pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1447                          pg_prev->pg, page_private(pg_prev->pg),
1448                          pg_prev->pg->index, pg_prev->off);
1449                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1450                         (pg->flag & OBD_BRW_SRVLOCK));
1451                 if (short_io_size != 0 && opc == OST_WRITE) {
1452                         unsigned char *ptr = ll_kmap_atomic(pg->pg, KM_USER0);
1453
1454                         LASSERT(short_io_size >= requested_nob + pg->count);
1455                         memcpy(short_io_buf + requested_nob,
1456                                ptr + poff,
1457                                pg->count);
1458                         ll_kunmap_atomic(ptr, KM_USER0);
1459                 } else if (short_io_size == 0) {
1460                         desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff,
1461                                                          pg->count);
1462                 }
1463                 requested_nob += pg->count;
1464
1465                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1466                         niobuf--;
1467                         niobuf->rnb_len += pg->count;
1468                 } else {
1469                         niobuf->rnb_offset = pg->off;
1470                         niobuf->rnb_len    = pg->count;
1471                         niobuf->rnb_flags  = pg->flag;
1472                 }
1473                 pg_prev = pg;
1474         }
1475
1476         LASSERTF((void *)(niobuf - niocount) ==
1477                 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1478                 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1479                 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1480
1481         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob : 0);
1482         if (resend) {
1483                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1484                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1485                         body->oa.o_flags = 0;
1486                 }
1487                 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1488         }
1489
1490         if (osc_should_shrink_grant(cli))
1491                 osc_shrink_grant_local(cli, &body->oa);
1492
1493         /* size[REQ_REC_OFF] is still sizeof(*body) */
1494         if (opc == OST_WRITE) {
1495                 if (cli->cl_checksum &&
1496                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1497                         /* store cl_cksum_type in a local variable since
1498                          * it can be changed via lprocfs */
1499                         enum cksum_types cksum_type = cli->cl_cksum_type;
1500
1501                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1502                                 body->oa.o_flags = 0;
1503
1504                         body->oa.o_flags |= obd_cksum_type_pack(obd_name,
1505                                                                 cksum_type);
1506                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1507
1508                         rc = osc_checksum_bulk_rw(obd_name, cksum_type,
1509                                                   requested_nob, page_count,
1510                                                   pga, OST_WRITE,
1511                                                   &body->oa.o_cksum);
1512                         if (rc < 0) {
1513                                 CDEBUG(D_PAGE, "failed to checksum, rc = %d\n",
1514                                        rc);
1515                                 GOTO(out, rc);
1516                         }
1517                         CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1518                                body->oa.o_cksum);
1519
1520                         /* save this in 'oa', too, for later checking */
1521                         oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1522                         oa->o_flags |= obd_cksum_type_pack(obd_name,
1523                                                            cksum_type);
1524                 } else {
1525                         /* clear out the checksum flag, in case this is a
1526                          * resend but cl_checksum is no longer set. b=11238 */
1527                         oa->o_valid &= ~OBD_MD_FLCKSUM;
1528                 }
1529                 oa->o_cksum = body->oa.o_cksum;
1530                 /* 1 RC per niobuf */
1531                 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1532                                      sizeof(__u32) * niocount);
1533         } else {
1534                 if (cli->cl_checksum &&
1535                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1536                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1537                                 body->oa.o_flags = 0;
1538                         body->oa.o_flags |= obd_cksum_type_pack(obd_name,
1539                                 cli->cl_cksum_type);
1540                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1541                 }
1542
1543                 /* The client cksum has already been copied to the wire obdo in
1544                  * the previous lustre_set_wire_obdo(), and in case a bulk-read
1545                  * is being resent due to a cksum error, this will allow the
1546                  * server to check+dump the pages on its side */
1547         }
1548         ptlrpc_request_set_replen(req);
1549
1550         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1551         aa = ptlrpc_req_async_args(req);
1552         aa->aa_oa = oa;
1553         aa->aa_requested_nob = requested_nob;
1554         aa->aa_nio_count = niocount;
1555         aa->aa_page_count = page_count;
1556         aa->aa_resends = 0;
1557         aa->aa_ppga = pga;
1558         aa->aa_cli = cli;
1559         INIT_LIST_HEAD(&aa->aa_oaps);
1560
1561         *reqp = req;
1562         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1563         CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
1564                 req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
1565                 niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
1566         RETURN(0);
1567
1568  out:
1569         ptlrpc_req_finished(req);
1570         RETURN(rc);
1571 }
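
/*
 * Illustrative sketch (not driver code): how the niobuf loop in
 * osc_brw_prep_request() coalesces adjacent pages into one remote niobuf.
 * Two fragments merge when the first ends exactly where the second begins
 * and both carry the same flags, which is what can_merge_pages() checks.
 * The struct and function names here are ours, for illustration only.
 */
struct nb_sketch {
        unsigned long long off;         /* byte offset in the object */
        unsigned int       len;         /* fragment length in bytes */
        unsigned int       flags;       /* OBD_BRW_* style flags */
};

static int nb_sketch_merge(struct nb_sketch *prev,
                           const struct nb_sketch *next)
{
        if (prev->flags == next->flags &&
            prev->off + prev->len == next->off) {
                prev->len += next->len; /* extend in place, emit no entry */
                return 1;
        }
        return 0;                       /* caller starts a new niobuf */
}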
1572
1573 char dbgcksum_file_name[PATH_MAX];
1574
1575 static void dump_all_bulk_pages(struct obdo *oa, __u32 page_count,
1576                                 struct brw_page **pga, __u32 server_cksum,
1577                                 __u32 client_cksum)
1578 {
1579         struct file *filp;
1580         int rc, i;
1581         unsigned int len;
1582         char *buf;
1583
1584         /* Only keep a dump of pages on the first error for the same range in
1585          * the file/fid, not during the resends/retries. */
1586         snprintf(dbgcksum_file_name, sizeof(dbgcksum_file_name),
1587                  "%s-checksum_dump-osc-"DFID":[%llu-%llu]-%x-%x",
1588                  (strncmp(libcfs_debug_file_path_arr, "NONE", 4) != 0 ?
1589                   libcfs_debug_file_path_arr :
1590                   LIBCFS_DEBUG_FILE_PATH_DEFAULT),
1591                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : 0ULL,
1592                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1593                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1594                  pga[0]->off,
1595                  pga[page_count-1]->off + pga[page_count-1]->count - 1,
1596                  client_cksum, server_cksum);
1597         filp = filp_open(dbgcksum_file_name,
1598                          O_CREAT | O_EXCL | O_WRONLY | O_LARGEFILE, 0600);
1599         if (IS_ERR(filp)) {
1600                 rc = PTR_ERR(filp);
1601                 if (rc == -EEXIST)
1602                         CDEBUG(D_INFO, "%s: can't open to dump pages with "
1603                                "checksum error: rc = %d\n", dbgcksum_file_name,
1604                                rc);
1605                 else
1606                         CERROR("%s: can't open to dump pages with checksum "
1607                                "error: rc = %d\n", dbgcksum_file_name, rc);
1608                 return;
1609         }
1610
1611         for (i = 0; i < page_count; i++) {
1612                 len = pga[i]->count;
1613                 buf = kmap(pga[i]->pg);
1614                 while (len != 0) {
1615                         rc = cfs_kernel_write(filp, buf, len, &filp->f_pos);
1616                         if (rc < 0) {
1617                                 CERROR("%s: wanted to write %u bytes but got "
1618                                        "error %d\n", dbgcksum_file_name, len, rc);
1619                                 break;
1620                         }
1621                         len -= rc;
1622                         buf += rc;
1623                         CDEBUG(D_INFO, "%s: wrote %d bytes\n",
1624                                dbgcksum_file_name, rc);
1625                 }
1626                 kunmap(pga[i]->pg);
1627         }
1628
1629         rc = ll_vfs_fsync_range(filp, 0, LLONG_MAX, 1);
1630         if (rc)
1631                 CERROR("%s: sync returned: rc = %d\n", dbgcksum_file_name, rc);
1632         filp_close(filp, NULL);
1633         return;
1634 }
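
/*
 * Illustrative sketch of the short-write handling in the dump loop above:
 * advance the buffer and shrink the remaining length by however many bytes
 * actually went out.  This is a standalone userspace analog -- it assumes
 * <unistd.h> and write(2) rather than cfs_kernel_write(), and the name is
 * ours, for illustration only.
 */
static int write_all_sketch(int fd, const char *buf, size_t len)
{
        while (len > 0) {
                ssize_t rc = write(fd, buf, len);

                if (rc < 0)
                        return -1;      /* caller can inspect errno */
                buf += rc;              /* skip what was written... */
                len -= rc;              /* ...and retry the remainder */
        }
        return 0;
}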
1635
1636 static int
1637 check_write_checksum(struct obdo *oa, const struct lnet_process_id *peer,
1638                      __u32 client_cksum, __u32 server_cksum,
1639                      struct osc_brw_async_args *aa)
1640 {
1641         const char *obd_name = aa->aa_cli->cl_import->imp_obd->obd_name;
1642         enum cksum_types cksum_type;
1643         obd_dif_csum_fn *fn = NULL;
1644         int sector_size = 0;
1645         __u32 new_cksum;
1646         char *msg;
1647         int rc;
1648
1649         if (server_cksum == client_cksum) {
1650                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1651                 return 0;
1652         }
1653
1654         if (aa->aa_cli->cl_checksum_dump)
1655                 dump_all_bulk_pages(oa, aa->aa_page_count, aa->aa_ppga,
1656                                     server_cksum, client_cksum);
1657
1658         cksum_type = obd_cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1659                                            oa->o_flags : 0);
1660
1661         switch (cksum_type) {
1662         case OBD_CKSUM_T10IP512:
1663                 fn = obd_dif_ip_fn;
1664                 sector_size = 512;
1665                 break;
1666         case OBD_CKSUM_T10IP4K:
1667                 fn = obd_dif_ip_fn;
1668                 sector_size = 4096;
1669                 break;
1670         case OBD_CKSUM_T10CRC512:
1671                 fn = obd_dif_crc_fn;
1672                 sector_size = 512;
1673                 break;
1674         case OBD_CKSUM_T10CRC4K:
1675                 fn = obd_dif_crc_fn;
1676                 sector_size = 4096;
1677                 break;
1678         default:
1679                 break;
1680         }
1681
1682         if (fn)
1683                 rc = osc_checksum_bulk_t10pi(obd_name, aa->aa_requested_nob,
1684                                              aa->aa_page_count, aa->aa_ppga,
1685                                              OST_WRITE, fn, sector_size,
1686                                              &new_cksum);
1687         else
1688                 rc = osc_checksum_bulk(aa->aa_requested_nob, aa->aa_page_count,
1689                                        aa->aa_ppga, OST_WRITE, cksum_type,
1690                                        &new_cksum);
1691
1692         if (rc < 0)
1693                 msg = "failed to calculate the client write checksum";
1694         else if (cksum_type != obd_cksum_type_unpack(aa->aa_oa->o_flags))
1695                 msg = "the server did not use the checksum type specified in "
1696                       "the original request - likely a protocol problem";
1697         else if (new_cksum == server_cksum)
1698                 msg = "changed on the client after we checksummed it - "
1699                       "likely false positive due to mmap IO (bug 11742)";
1700         else if (new_cksum == client_cksum)
1701                 msg = "changed in transit before arrival at OST";
1702         else
1703                 msg = "changed in transit AND doesn't match the original - "
1704                       "likely false positive due to mmap IO (bug 11742)";
1705
1706         LCONSOLE_ERROR_MSG(0x132, "%s: BAD WRITE CHECKSUM: %s: from %s inode "
1707                            DFID " object "DOSTID" extent [%llu-%llu], original "
1708                            "client csum %x (type %x), server csum %x (type %x),"
1709                            " client csum now %x\n",
1710                            obd_name, msg, libcfs_nid2str(peer->nid),
1711                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1712                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1713                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1714                            POSTID(&oa->o_oi), aa->aa_ppga[0]->off,
1715                            aa->aa_ppga[aa->aa_page_count - 1]->off +
1716                                 aa->aa_ppga[aa->aa_page_count-1]->count - 1,
1717                            client_cksum,
1718                            obd_cksum_type_unpack(aa->aa_oa->o_flags),
1719                            server_cksum, cksum_type, new_cksum);
1720         return 1;
1721 }
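
/*
 * Illustrative sketch of the diagnosis above: recomputing the checksum
 * locally is what lets check_write_checksum() attribute a mismatch.  If
 * the fresh client checksum now matches the server's, the pages changed
 * under us after we originally checksummed them (e.g. mmap I/O); if it
 * still matches the original client value, the data changed in transit;
 * otherwise both happened.  The enum and function names are ours.
 */
enum cksum_verdict_sketch {
        CKSUM_CHANGED_ON_CLIENT,
        CKSUM_CHANGED_IN_TRANSIT,
        CKSUM_CHANGED_BOTH,
};

static enum cksum_verdict_sketch
cksum_diagnose_sketch(__u32 orig_client, __u32 server, __u32 fresh_client)
{
        if (fresh_client == server)
                return CKSUM_CHANGED_ON_CLIENT;
        if (fresh_client == orig_client)
                return CKSUM_CHANGED_IN_TRANSIT;
        return CKSUM_CHANGED_BOTH;
}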
1722
1723 /* Note rc enters this function as the number of bytes transferred */
1724 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1725 {
1726         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1727         struct client_obd *cli = aa->aa_cli;
1728         const char *obd_name = cli->cl_import->imp_obd->obd_name;
1729         const struct lnet_process_id *peer =
1730                 &req->rq_import->imp_connection->c_peer;
1731         struct ost_body *body;
1732         u32 client_cksum = 0;
1733         ENTRY;
1734
1735         if (rc < 0 && rc != -EDQUOT) {
1736                 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1737                 RETURN(rc);
1738         }
1739
1740         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1741         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1742         if (body == NULL) {
1743                 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1744                 RETURN(-EPROTO);
1745         }
1746
1747         /* set/clear the over-quota flag for a uid/gid/projid */
1748         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1749             body->oa.o_valid & (OBD_MD_FLALLQUOTA)) {
1750                 unsigned qid[LL_MAXQUOTAS] = {
1751                                          body->oa.o_uid, body->oa.o_gid,
1752                                          body->oa.o_projid };
1753                 CDEBUG(D_QUOTA, "setdq for [%u %u %u] with valid %#llx, flags %x\n",
1754                        body->oa.o_uid, body->oa.o_gid, body->oa.o_projid,
1755                        body->oa.o_valid, body->oa.o_flags);
1756                 osc_quota_setdq(cli, req->rq_xid, qid, body->oa.o_valid,
1757                                 body->oa.o_flags);
1758         }
1759
1760         osc_update_grant(cli, body);
1761
1762         if (rc < 0)
1763                 RETURN(rc);
1764
1765         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1766                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1767
1768         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1769                 if (rc > 0) {
1770                         CERROR("Unexpected positive rc %d\n", rc);
1771                         RETURN(-EPROTO);
1772                 }
1773
1774                 if (req->rq_bulk != NULL &&
1775                     sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1776                         RETURN(-EAGAIN);
1777
1778                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1779                     check_write_checksum(&body->oa, peer, client_cksum,
1780                                          body->oa.o_cksum, aa))
1781                         RETURN(-EAGAIN);
1782
1783                 rc = check_write_rcs(req, aa->aa_requested_nob, aa->aa_nio_count,
1784                                      aa->aa_page_count, aa->aa_ppga);
1785                 GOTO(out, rc);
1786         }
1787
1788         /* The rest of this function executes only for OST_READs */
1789
1790         if (req->rq_bulk == NULL) {
1791                 rc = req_capsule_get_size(&req->rq_pill, &RMF_SHORT_IO,
1792                                           RCL_SERVER);
1793                 LASSERT(rc == req->rq_status);
1794         } else {
1795                 /* if unwrap_bulk failed, return -EAGAIN to retry */
1796                 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1797         }
1798         if (rc < 0)
1799                 GOTO(out, rc = -EAGAIN);
1800
1801         if (rc > aa->aa_requested_nob) {
1802                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1803                        aa->aa_requested_nob);
1804                 RETURN(-EPROTO);
1805         }
1806
1807         if (req->rq_bulk != NULL && rc != req->rq_bulk->bd_nob_transferred) {
1808                 CERROR("Unexpected rc %d (%d transferred)\n",
1809                        rc, req->rq_bulk->bd_nob_transferred);
1810                 RETURN(-EPROTO);
1811         }
1812
1813         if (req->rq_bulk == NULL) {
1814                 /* short io */
1815                 int nob, pg_count, i = 0;
1816                 unsigned char *buf;
1817
1818                 CDEBUG(D_CACHE, "Using short io read, size %d\n", rc);
1819                 pg_count = aa->aa_page_count;
1820                 buf = req_capsule_server_sized_get(&req->rq_pill, &RMF_SHORT_IO,
1821                                                    rc);
1822                 nob = rc;
1823                 while (nob > 0 && pg_count > 0) {
1824                         unsigned char *ptr;
1825                         int count = aa->aa_ppga[i]->count > nob ?
1826                                     nob : aa->aa_ppga[i]->count;
1827
1828                         CDEBUG(D_CACHE, "page %p count %d\n",
1829                                aa->aa_ppga[i]->pg, count);
1830                         ptr = ll_kmap_atomic(aa->aa_ppga[i]->pg, KM_USER0);
1831                         memcpy(ptr + (aa->aa_ppga[i]->off & ~PAGE_MASK), buf,
1832                                count);
1833                         ll_kunmap_atomic((void *) ptr, KM_USER0);
1834
1835                         buf += count;
1836                         nob -= count;
1837                         i++;
1838                         pg_count--;
1839                 }
1840         }
1841
1842         if (rc < aa->aa_requested_nob)
1843                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1844
1845         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1846                 static int cksum_counter;
1847                 u32        server_cksum = body->oa.o_cksum;
1848                 char      *via = "";
1849                 char      *router = "";
1850                 enum cksum_types cksum_type;
1851                 u32 o_flags = body->oa.o_valid & OBD_MD_FLFLAGS ?
1852                         body->oa.o_flags : 0;
1853
1854                 cksum_type = obd_cksum_type_unpack(o_flags);
1855                 rc = osc_checksum_bulk_rw(obd_name, cksum_type, rc,
1856                                           aa->aa_page_count, aa->aa_ppga,
1857                                           OST_READ, &client_cksum);
1858                 if (rc < 0)
1859                         GOTO(out, rc);
1860
1861                 if (req->rq_bulk != NULL &&
1862                     peer->nid != req->rq_bulk->bd_sender) {
1863                         via = " via ";
1864                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1865                 }
1866
1867                 if (server_cksum != client_cksum) {
1868                         struct ost_body *clbody;
1869                         u32 page_count = aa->aa_page_count;
1870
1871                         clbody = req_capsule_client_get(&req->rq_pill,
1872                                                         &RMF_OST_BODY);
1873                         if (cli->cl_checksum_dump)
1874                                 dump_all_bulk_pages(&clbody->oa, page_count,
1875                                                     aa->aa_ppga, server_cksum,
1876                                                     client_cksum);
1877
1878                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1879                                            "%s%s%s inode "DFID" object "DOSTID
1880                                            " extent [%llu-%llu], client %x, "
1881                                            "server %x, cksum_type %x\n",
1882                                            obd_name,
1883                                            libcfs_nid2str(peer->nid),
1884                                            via, router,
1885                                            clbody->oa.o_valid & OBD_MD_FLFID ?
1886                                                 clbody->oa.o_parent_seq : 0ULL,
1887                                            clbody->oa.o_valid & OBD_MD_FLFID ?
1888                                                 clbody->oa.o_parent_oid : 0,
1889                                            clbody->oa.o_valid & OBD_MD_FLFID ?
1890                                                 clbody->oa.o_parent_ver : 0,
1891                                            POSTID(&body->oa.o_oi),
1892                                            aa->aa_ppga[0]->off,
1893                                            aa->aa_ppga[page_count-1]->off +
1894                                            aa->aa_ppga[page_count-1]->count - 1,
1895                                            client_cksum, server_cksum,
1896                                            cksum_type);
1897                         cksum_counter = 0;
1898                         aa->aa_oa->o_cksum = client_cksum;
1899                         rc = -EAGAIN;
1900                 } else {
1901                         cksum_counter++;
1902                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1903                         rc = 0;
1904                 }
1905         } else if (unlikely(client_cksum)) {
1906                 static int cksum_missed;
1907
1908                 cksum_missed++;
1909                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1910                         CERROR("Checksum %u requested from %s but not sent\n",
1911                                cksum_missed, libcfs_nid2str(peer->nid));
1912         } else {
1913                 rc = 0;
1914         }
1915 out:
1916         if (rc >= 0)
1917                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1918                                      aa->aa_oa, &body->oa);
1919
1920         RETURN(rc);
1921 }
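
/*
 * Illustrative sketch: the "(n & -n) == n" test in osc_brw_fini_request()
 * above is true exactly when n is a power of two (or zero), so the guarded
 * CERROR fires on the 1st, 2nd, 4th, 8th, ... miss -- an allocation-free
 * way to rate-limit a log line exponentially.  The name is ours.
 */
static inline int log_every_pow2_sketch(unsigned int n)
{
        return (n & -n) == n;   /* true for n = 0, 1, 2, 4, 8, ... */
}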
1922
1923 static int osc_brw_redo_request(struct ptlrpc_request *request,
1924                                 struct osc_brw_async_args *aa, int rc)
1925 {
1926         struct ptlrpc_request *new_req;
1927         struct osc_brw_async_args *new_aa;
1928         struct osc_async_page *oap;
1929         ENTRY;
1930
1931         DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1932                   "redo for recoverable error %d", rc);
1933
1934         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1935                                 OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
1936                                   aa->aa_cli, aa->aa_oa, aa->aa_page_count,
1937                                   aa->aa_ppga, &new_req, 1);
1938         if (rc)
1939                 RETURN(rc);
1940
1941         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1942                 if (oap->oap_request != NULL) {
1943                         LASSERTF(request == oap->oap_request,
1944                                  "request %p != oap_request %p\n",
1945                                  request, oap->oap_request);
1946                         if (oap->oap_interrupted) {
1947                                 ptlrpc_req_finished(new_req);
1948                                 RETURN(-EINTR);
1949                         }
1950                 }
1951         }
1952         /* The new request takes over pga and oaps from the old request.
1953          * Note that copying a list_head doesn't work; we need to move it... */
1954         aa->aa_resends++;
1955         new_req->rq_interpret_reply = request->rq_interpret_reply;
1956         new_req->rq_async_args = request->rq_async_args;
1957         new_req->rq_commit_cb = request->rq_commit_cb;
1958         /* cap resend delay to the current request timeout, this is similar to
1959          * what ptlrpc does (see after_reply()) */
1960         if (aa->aa_resends > new_req->rq_timeout)
1961                 new_req->rq_sent = ktime_get_real_seconds() + new_req->rq_timeout;
1962         else
1963                 new_req->rq_sent = ktime_get_real_seconds() + aa->aa_resends;
1964         new_req->rq_generation_set = 1;
1965         new_req->rq_import_generation = request->rq_import_generation;
1966
1967         new_aa = ptlrpc_req_async_args(new_req);
1968
1969         INIT_LIST_HEAD(&new_aa->aa_oaps);
1970         list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1971         INIT_LIST_HEAD(&new_aa->aa_exts);
1972         list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1973         new_aa->aa_resends = aa->aa_resends;
1974
1975         list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1976                 if (oap->oap_request) {
1977                         ptlrpc_req_finished(oap->oap_request);
1978                         oap->oap_request = ptlrpc_request_addref(new_req);
1979                 }
1980         }
1981
1982         /* XXX: This code will run into problems if we ever want to add a
1983          * series of BRW RPCs into a self-defined ptlrpc_request_set and wait
1984          * for all of them to finish. We should inherit the request set from
1985          * the old request. */
1986         ptlrpcd_add_req(new_req);
1987
1988         DEBUG_REQ(D_INFO, new_req, "new request");
1989         RETURN(0);
1990 }
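
/*
 * Illustrative sketch of the resend backoff in osc_brw_redo_request():
 * the delay grows linearly with the resend count but is capped at the
 * request timeout, mirroring what ptlrpc itself does in after_reply().
 * Standalone C; the function name is ours, for illustration only.
 */
static inline long resend_delay_sketch(long resends, long rq_timeout)
{
        return resends > rq_timeout ? rq_timeout : resends;
}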
1991
1992 /*
1993  * Ugh, we want disk allocation on the target to happen in offset order.  We'll
1994  * follow Sedgewick's advice and stick to the dead simple shellsort -- it'll do
1995  * fine for our small page arrays and doesn't require allocation.  It's an
1996  * insertion sort that swaps elements that are strides apart, shrinking the
1997  * stride down until it's 1 and the array is sorted.
1998  */
1999 static void sort_brw_pages(struct brw_page **array, int num)
2000 {
2001         int stride, i, j;
2002         struct brw_page *tmp;
2003
2004         if (num == 1)
2005                 return;
2006         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
2007                 ;
2008
2009         do {
2010                 stride /= 3;
2011                 for (i = stride ; i < num ; i++) {
2012                         tmp = array[i];
2013                         j = i;
2014                         while (j >= stride && array[j - stride]->off > tmp->off) {
2015                                 array[j] = array[j - stride];
2016                                 j -= stride;
2017                         }
2018                         array[j] = tmp;
2019                 }
2020         } while (stride > 1);
2021 }
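
/*
 * Illustrative sketch (not driver code): the same 3x+1 gap sequence on a
 * plain int array, to make the stride mechanics above easier to follow.
 * The stride grows as 1, 4, 13, 40, ... past the array size, then shrinks
 * by thirds; the final stride-1 pass is an ordinary insertion sort on an
 * almost-sorted array.  Pure standard C; the name is ours.
 */
static void shellsort_ints_sketch(int *a, int n)
{
        int stride, i, j, tmp;

        if (n <= 1)
                return;
        for (stride = 1; stride < n; stride = stride * 3 + 1)
                ;
        do {
                stride /= 3;
                for (i = stride; i < n; i++) {
                        tmp = a[i];
                        for (j = i; j >= stride && a[j - stride] > tmp;
                             j -= stride)
                                a[j] = a[j - stride];
                        a[j] = tmp;
                }
        } while (stride > 1);
}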
2022
2023 static void osc_release_ppga(struct brw_page **ppga, size_t count)
2024 {
2025         LASSERT(ppga != NULL);
2026         OBD_FREE(ppga, sizeof(*ppga) * count);
2027 }
2028
2029 static int brw_interpret(const struct lu_env *env,
2030                          struct ptlrpc_request *req, void *data, int rc)
2031 {
2032         struct osc_brw_async_args *aa = data;
2033         struct osc_extent *ext;
2034         struct osc_extent *tmp;
2035         struct client_obd *cli = aa->aa_cli;
2036         unsigned long           transferred = 0;
2037         ENTRY;
2038
2039         rc = osc_brw_fini_request(req, rc);
2040         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
2041         /* When the server returns -EINPROGRESS, the client should always
2042          * retry regardless of how many times the bulk was already resent. */
2043         if (osc_recoverable_error(rc) && !req->rq_no_delay) {
2044                 if (req->rq_import_generation !=
2045                     req->rq_import->imp_generation) {
2046                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
2047                                DOSTID", rc = %d.\n",
2048                                req->rq_import->imp_obd->obd_name,
2049                                POSTID(&aa->aa_oa->o_oi), rc);
2050                 } else if (rc == -EINPROGRESS ||
2051                     client_should_resend(aa->aa_resends, aa->aa_cli)) {
2052                         rc = osc_brw_redo_request(req, aa, rc);
2053                 } else {
2054                         CERROR("%s: too many resend retries for object: "
2055                                "%llu:%llu, rc = %d.\n",
2056                                req->rq_import->imp_obd->obd_name,
2057                                POSTID(&aa->aa_oa->o_oi), rc);
2058                 }
2059
2060                 if (rc == 0)
2061                         RETURN(0);
2062                 else if (rc == -EAGAIN || rc == -EINPROGRESS)
2063                         rc = -EIO;
2064         }
2065
2066         if (rc == 0) {
2067                 struct obdo *oa = aa->aa_oa;
2068                 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
2069                 unsigned long valid = 0;
2070                 struct cl_object *obj;
2071                 struct osc_async_page *last;
2072
2073                 last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
2074                 obj = osc2cl(last->oap_obj);
2075
2076                 cl_object_attr_lock(obj);
2077                 if (oa->o_valid & OBD_MD_FLBLOCKS) {
2078                         attr->cat_blocks = oa->o_blocks;
2079                         valid |= CAT_BLOCKS;
2080                 }
2081                 if (oa->o_valid & OBD_MD_FLMTIME) {
2082                         attr->cat_mtime = oa->o_mtime;
2083                         valid |= CAT_MTIME;
2084                 }
2085                 if (oa->o_valid & OBD_MD_FLATIME) {
2086                         attr->cat_atime = oa->o_atime;
2087                         valid |= CAT_ATIME;
2088                 }
2089                 if (oa->o_valid & OBD_MD_FLCTIME) {
2090                         attr->cat_ctime = oa->o_ctime;
2091                         valid |= CAT_CTIME;
2092                 }
2093
2094                 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
2095                         struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
2096                         loff_t last_off = last->oap_count + last->oap_obj_off +
2097                                 last->oap_page_off;
2098
2099                         /* Change the file size if this is an out-of-quota or
2100                          * direct I/O write and it extends the file size */
2101                         if (loi->loi_lvb.lvb_size < last_off) {
2102                                 attr->cat_size = last_off;
2103                                 valid |= CAT_SIZE;
2104                         }
2105                         /* Extend KMS if it's not a lockless write */
2106                         if (loi->loi_kms < last_off &&
2107                             oap2osc_page(last)->ops_srvlock == 0) {
2108                                 attr->cat_kms = last_off;
2109                                 valid |= CAT_KMS;
2110                         }
2111                 }
2112
2113                 if (valid != 0)
2114                         cl_object_attr_update(env, obj, attr, valid);
2115                 cl_object_attr_unlock(obj);
2116         }
2117         OBD_SLAB_FREE_PTR(aa->aa_oa, osc_obdo_kmem);
2118         aa->aa_oa = NULL;
2119
2120         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
2121                 osc_inc_unstable_pages(req);
2122
2123         list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
2124                 list_del_init(&ext->oe_link);
2125                 osc_extent_finish(env, ext, 1,
2126                                   rc && req->rq_no_delay ? -EWOULDBLOCK : rc);
2127         }
2128         LASSERT(list_empty(&aa->aa_exts));
2129         LASSERT(list_empty(&aa->aa_oaps));
2130
2131         transferred = (req->rq_bulk == NULL ? /* short io */
2132                        aa->aa_requested_nob :
2133                        req->rq_bulk->bd_nob_transferred);
2134
2135         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2136         ptlrpc_lprocfs_brw(req, transferred);
2137
2138         spin_lock(&cli->cl_loi_list_lock);
2139         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2140          * is called so we know whether to go to sync BRWs or wait for more
2141          * RPCs to complete */
2142         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2143                 cli->cl_w_in_flight--;
2144         else
2145                 cli->cl_r_in_flight--;
2146         osc_wake_cache_waiters(cli);
2147         spin_unlock(&cli->cl_loi_list_lock);
2148
2149         osc_io_unplug(env, cli, NULL);
2150         RETURN(rc);
2151 }
2152
2153 static void brw_commit(struct ptlrpc_request *req)
2154 {
2155         /* If osc_inc_unstable_pages (via osc_extent_finish) races with this
2156          * function, which is called via rq_commit_cb, we need to ensure that
2157          * osc_dec_unstable_pages is still called. Otherwise unstable pages
2158          * may be leaked. */
2159         spin_lock(&req->rq_lock);
2160         if (likely(req->rq_unstable)) {
2161                 req->rq_unstable = 0;
2162                 spin_unlock(&req->rq_lock);
2163
2164                 osc_dec_unstable_pages(req);
2165         } else {
2166                 req->rq_committed = 1;
2167                 spin_unlock(&req->rq_lock);
2168         }
2169 }
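
/*
 * Illustrative sketch of the rq_unstable/rq_committed handoff above: the
 * "inc" side and the commit callback can run in either order, and the
 * lock-protected flags guarantee the balancing "dec" happens exactly once.
 * The struct, field and function names here are ours, not driver API.
 */
struct handoff_sketch {
        spinlock_t lock;
        bool       unstable;    /* inc ran; a dec is still owed */
        bool       committed;   /* commit ran first; inc must skip */
};

static void handoff_commit_sketch(struct handoff_sketch *h,
                                  void (*dec)(void))
{
        bool do_dec;

        spin_lock(&h->lock);
        do_dec = h->unstable;
        if (do_dec)
                h->unstable = false;    /* we own the dec now */
        else
                h->committed = true;    /* tell the inc side to skip it */
        spin_unlock(&h->lock);

        if (do_dec)
                dec();                  /* called outside the lock */
}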
2170
2171 /**
2172  * Build an RPC from the list of extents @ext_list. The caller must ensure
2173  * that the total number of pages in this list does not exceed the max pages per RPC.
2174  * Extents in the list must be in OES_RPC state.
2175  */
2176 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
2177                   struct list_head *ext_list, int cmd)
2178 {
2179         struct ptlrpc_request           *req = NULL;
2180         struct osc_extent               *ext;
2181         struct brw_page                 **pga = NULL;
2182         struct osc_brw_async_args       *aa = NULL;
2183         struct obdo                     *oa = NULL;
2184         struct osc_async_page           *oap;
2185         struct osc_object               *obj = NULL;
2186         struct cl_req_attr              *crattr = NULL;
2187         loff_t                          starting_offset = OBD_OBJECT_EOF;
2188         loff_t                          ending_offset = 0;
2189         int                             mpflag = 0;
2190         int                             mem_tight = 0;
2191         int                             page_count = 0;
2192         bool                            soft_sync = false;
2193         bool                            interrupted = false;
2194         bool                            ndelay = false;
2195         int                             i;
2196         int                             grant = 0;
2197         int                             rc;
2198         __u32                           layout_version = 0;
2199         struct list_head                rpc_list = LIST_HEAD_INIT(rpc_list);
2200         struct ost_body                 *body;
2201         ENTRY;
2202         LASSERT(!list_empty(ext_list));
2203
2204         /* add pages into rpc_list to build BRW rpc */
2205         list_for_each_entry(ext, ext_list, oe_link) {
2206                 LASSERT(ext->oe_state == OES_RPC);
2207                 mem_tight |= ext->oe_memalloc;
2208                 grant += ext->oe_grants;
2209                 page_count += ext->oe_nr_pages;
2210                 layout_version = MAX(layout_version, ext->oe_layout_version);
2211                 if (obj == NULL)
2212                         obj = ext->oe_obj;
2213         }
2214
2215         soft_sync = osc_over_unstable_soft_limit(cli);
2216         if (mem_tight)
2217                 mpflag = cfs_memory_pressure_get_and_set();
2218
2219         OBD_ALLOC(pga, sizeof(*pga) * page_count);
2220         if (pga == NULL)
2221                 GOTO(out, rc = -ENOMEM);
2222
2223         OBD_SLAB_ALLOC_PTR_GFP(oa, osc_obdo_kmem, GFP_NOFS);
2224         if (oa == NULL)
2225                 GOTO(out, rc = -ENOMEM);
2226
2227         i = 0;
2228         list_for_each_entry(ext, ext_list, oe_link) {
2229                 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
2230                         if (mem_tight)
2231                                 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
2232                         if (soft_sync)
2233                                 oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
2234                         pga[i] = &oap->oap_brw_page;
2235                         pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2236                         i++;
2237
2238                         list_add_tail(&oap->oap_rpc_item, &rpc_list);
2239                         if (starting_offset == OBD_OBJECT_EOF ||
2240                             starting_offset > oap->oap_obj_off)
2241                                 starting_offset = oap->oap_obj_off;
2242                         else
2243                                 LASSERT(oap->oap_page_off == 0);
2244                         if (ending_offset < oap->oap_obj_off + oap->oap_count)
2245                                 ending_offset = oap->oap_obj_off +
2246                                                 oap->oap_count;
2247                         else
2248                                 LASSERT(oap->oap_page_off + oap->oap_count ==
2249                                         PAGE_SIZE);
2250                         if (oap->oap_interrupted)
2251                                 interrupted = true;
2252                 }
2253                 if (ext->oe_ndelay)
2254                         ndelay = true;
2255         }
2256
2257         /* first page in the list */
2258         oap = list_entry(rpc_list.next, typeof(*oap), oap_rpc_item);
2259
2260         crattr = &osc_env_info(env)->oti_req_attr;
2261         memset(crattr, 0, sizeof(*crattr));
2262         crattr->cra_type = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2263         crattr->cra_flags = ~0ULL;
2264         crattr->cra_page = oap2cl_page(oap);
2265         crattr->cra_oa = oa;
2266         cl_req_attr_set(env, osc2cl(obj), crattr);
2267
2268         if (cmd == OBD_BRW_WRITE) {
2269                 oa->o_grant_used = grant;
2270                 if (layout_version > 0) {
2271                         CDEBUG(D_LAYOUT, DFID": write with layout version %u\n",
2272                                PFID(&oa->o_oi.oi_fid), layout_version);
2273
2274                         oa->o_layout_version = layout_version;
2275                         oa->o_valid |= OBD_MD_LAYOUT_VERSION;
2276                 }
2277         }
2278
2279         sort_brw_pages(pga, page_count);
2280         rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req, 0);
2281         if (rc != 0) {
2282                 CERROR("prep_req failed: %d\n", rc);
2283                 GOTO(out, rc);
2284         }
2285
2286         req->rq_commit_cb = brw_commit;
2287         req->rq_interpret_reply = brw_interpret;
2288         req->rq_memalloc = mem_tight != 0;
2289         oap->oap_request = ptlrpc_request_addref(req);
2290         if (interrupted && !req->rq_intr)
2291                 ptlrpc_mark_interrupted(req);
2292         if (ndelay) {
2293                 req->rq_no_resend = req->rq_no_delay = 1;
2294                 /* We should probably set a shorter timeout value here, to
2295                  * handle ETIMEDOUT in brw_interpret() correctly. */
2296                 /* lustre_msg_set_timeout(req, req->rq_timeout / 2); */
2297         }
2298
2299         /* We need to update the timestamps after the request is built in case
2300          * we race with setattr (locally or in the queue at the OST).  If the
2301          * OST gets the later setattr before the earlier BRW (as determined by
2302          * the request xid), the OST will not use the BRW timestamps.  Sadly,
2303          * there is no obvious way to do this in a single call.  bug 10150 */
2304         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
2305         crattr->cra_oa = &body->oa;
2306         crattr->cra_flags = OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME;
2307         cl_req_attr_set(env, osc2cl(obj), crattr);
2308         lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
2309
2310         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2311         aa = ptlrpc_req_async_args(req);
2312         INIT_LIST_HEAD(&aa->aa_oaps);
2313         list_splice_init(&rpc_list, &aa->aa_oaps);
2314         INIT_LIST_HEAD(&aa->aa_exts);
2315         list_splice_init(ext_list, &aa->aa_exts);
2316
2317         spin_lock(&cli->cl_loi_list_lock);
2318         starting_offset >>= PAGE_SHIFT;
2319         if (cmd == OBD_BRW_READ) {
2320                 cli->cl_r_in_flight++;
2321                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2322                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2323                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2324                                       starting_offset + 1);
2325         } else {
2326                 cli->cl_w_in_flight++;
2327                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2328                 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
2329                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2330                                       starting_offset + 1);
2331         }
2332         spin_unlock(&cli->cl_loi_list_lock);
2333
2334         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %ur/%uw in flight",
2335                   page_count, aa, cli->cl_r_in_flight,
2336                   cli->cl_w_in_flight);
2337         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_IO, cfs_fail_val);
2338
2339         ptlrpcd_add_req(req);
2340         rc = 0;
2341         EXIT;
2342
2343 out:
2344         if (mem_tight != 0)
2345                 cfs_memory_pressure_restore(mpflag);
2346
2347         if (rc != 0) {
2348                 LASSERT(req == NULL);
2349
2350                 if (oa)
2351                         OBD_SLAB_FREE_PTR(oa, osc_obdo_kmem);
2352                 if (pga)
2353                         OBD_FREE(pga, sizeof(*pga) * page_count);
2354                 /* This should happen rarely and is pretty bad; it makes the
2355                  * pending list not follow the dirty order */
2356                 while (!list_empty(ext_list)) {
2357                         ext = list_entry(ext_list->next, struct osc_extent,
2358                                          oe_link);
2359                         list_del_init(&ext->oe_link);
2360                         osc_extent_finish(env, ext, 0, rc);
2361                 }
2362         }
2363         RETURN(rc);
2364 }
2365
2366 static int osc_set_lock_data(struct ldlm_lock *lock, void *data)
2367 {
2368         int set = 0;
2369
2370         LASSERT(lock != NULL);
2371
2372         lock_res_and_lock(lock);
2373
2374         if (lock->l_ast_data == NULL)
2375                 lock->l_ast_data = data;
2376         if (lock->l_ast_data == data)
2377                 set = 1;
2378
2379         unlock_res_and_lock(lock);
2380
2381         return set;
2382 }
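
/*
 * Illustrative sketch of the claim-or-verify pattern above: set the slot
 * if it is unset, then report whether it holds our value, all under the
 * lock, so two racing callers agree on a single owner.  Standalone; the
 * names are ours, for illustration only.
 */
static int claim_slot_sketch(void **slot, void *mine, spinlock_t *lock)
{
        int owned;

        spin_lock(lock);
        if (*slot == NULL)
                *slot = mine;           /* first caller claims the slot */
        owned = (*slot == mine);        /* later callers just compare */
        spin_unlock(lock);

        return owned;
}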
2383
2384 int osc_enqueue_fini(struct ptlrpc_request *req, osc_enqueue_upcall_f upcall,
2385                      void *cookie, struct lustre_handle *lockh,
2386                      enum ldlm_mode mode, __u64 *flags, bool speculative,
2387                      int errcode)
2388 {
2389         bool intent = *flags & LDLM_FL_HAS_INTENT;
2390         int rc;
2391         ENTRY;
2392
2393         /* The request was created before the ldlm_cli_enqueue() call. */
2394         if (intent && errcode == ELDLM_LOCK_ABORTED) {
2395                 struct ldlm_reply *rep;
2396
2397                 rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
2398                 LASSERT(rep != NULL);
2399
2400                 rep->lock_policy_res1 =
2401                         ptlrpc_status_ntoh(rep->lock_policy_res1);
2402                 if (rep->lock_policy_res1)
2403                         errcode = rep->lock_policy_res1;
2404                 if (!speculative)
2405                         *flags |= LDLM_FL_LVB_READY;
2406         } else if (errcode == ELDLM_OK) {
2407                 *flags |= LDLM_FL_LVB_READY;
2408         }
2409
2410         /* Call the update callback. */
2411         rc = (*upcall)(cookie, lockh, errcode);
2412
2413         /* release the reference taken in ldlm_cli_enqueue() */
2414         if (errcode == ELDLM_LOCK_MATCHED)
2415                 errcode = ELDLM_OK;
2416         if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
2417                 ldlm_lock_decref(lockh, mode);
2418
2419         RETURN(rc);
2420 }
2421
2422 int osc_enqueue_interpret(const struct lu_env *env, struct ptlrpc_request *req,
2423                           struct osc_enqueue_args *aa, int rc)
2424 {
2425         struct ldlm_lock *lock;
2426         struct lustre_handle *lockh = &aa->oa_lockh;
2427         enum ldlm_mode mode = aa->oa_mode;
2428         struct ost_lvb *lvb = aa->oa_lvb;
2429         __u32 lvb_len = sizeof(*lvb);
2430         __u64 flags = 0;
2431
2432         ENTRY;
2433
2434         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2435          * be valid. */
2436         lock = ldlm_handle2lock(lockh);
2437         LASSERTF(lock != NULL,
2438                  "lockh %#llx, req %p, aa %p - client evicted?\n",
2439                  lockh->cookie, req, aa);
2440
2441         /* Take an additional reference so that a blocking AST that
2442          * ldlm_cli_enqueue_fini() might post for a failed lock is guaranteed
2443          * to arrive after an upcall has been executed by
2444          * osc_enqueue_fini(). */
2445         ldlm_lock_addref(lockh, mode);
2446
2447         /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
2448         OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
2449
2450         /* Let the CP AST grant the lock first. */
2451         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2452
2453         if (aa->oa_speculative) {
2454                 LASSERT(aa->oa_lvb == NULL);
2455                 LASSERT(aa->oa_flags == NULL);
2456                 aa->oa_flags = &flags;
2457         }
2458
2459         /* Complete the lock acquisition procedure. */
2460         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
2461                                    aa->oa_mode, aa->oa_flags, lvb, lvb_len,
2462                                    lockh, rc);
2463         /* Complete osc stuff. */
2464         rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
2465                               aa->oa_flags, aa->oa_speculative, rc);
2466
2467         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2468
2469         ldlm_lock_decref(lockh, mode);
2470         LDLM_LOCK_PUT(lock);
2471         RETURN(rc);
2472 }
2473
2474 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2475
2476 /* When enqueueing asynchronously, locks are not ordered: we can obtain a lock
2477  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2478  * other synchronous requests; however, keeping some locks while trying to
2479  * obtain others may take a considerable amount of time in the case of an OST
2480  * failure, and a client that does not release a lock other sync requests are
2481  * waiting on is evicted from the cluster -- such scenarios make life
2482  * difficult, so release locks just after they are obtained. */
2483 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2484                      __u64 *flags, union ldlm_policy_data *policy,
2485                      struct ost_lvb *lvb, osc_enqueue_upcall_f upcall,
2486                      void *cookie, struct ldlm_enqueue_info *einfo,
2487                      struct ptlrpc_request_set *rqset, int async,
2488                      bool speculative)
2489 {
2490         struct obd_device *obd = exp->exp_obd;
2491         struct lustre_handle lockh = { 0 };
2492         struct ptlrpc_request *req = NULL;
2493         int intent = *flags & LDLM_FL_HAS_INTENT;
2494         __u64 match_flags = *flags;
2495         enum ldlm_mode mode;
2496         int rc;
2497         ENTRY;
2498
2499         /* Filesystem lock extents are extended to page boundaries so that
2500          * dealing with the page cache is a little smoother.  */
2501         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2502         policy->l_extent.end |= ~PAGE_MASK;
2503
2504         /* Next, search for already existing extent locks that will cover us */
2505         /* If we're trying to read, we also search for an existing PW lock.  The
2506          * VFS and page cache already protect us locally, so lots of readers/
2507          * writers can share a single PW lock.
2508          *
2509          * There are problems with conversion deadlocks, so instead of
2510          * converting a read lock to a write lock, we'll just enqueue a new
2511          * one.
2512          *
2513          * At some point we should cancel the read lock instead of making them
2514          * send us a blocking callback, but there are problems with canceling
2515          * locks out from other users right now, too. */
2516         mode = einfo->ei_mode;
2517         if (einfo->ei_mode == LCK_PR)
2518                 mode |= LCK_PW;
2519         /* Normal lock requests must wait for the LVB to be ready before
2520          * matching a lock; speculative lock requests do not need to,
2521          * because they will not actually use the lock. */
2522         if (!speculative)
2523                 match_flags |= LDLM_FL_LVB_READY;
2524         if (intent != 0)
2525                 match_flags |= LDLM_FL_BLOCK_GRANTED;
2526         mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id,
2527                                einfo->ei_type, policy, mode, &lockh, 0);
2528         if (mode) {
2529                 struct ldlm_lock *matched;
2530
2531                 if (*flags & LDLM_FL_TEST_LOCK)
2532                         RETURN(ELDLM_OK);
2533
2534                 matched = ldlm_handle2lock(&lockh);
2535                 if (speculative) {
2536                         /* This DLM lock request is speculative, and does not
2537                          * have an associated IO request. Therefore, if there
2538                          * is already a DLM lock, it will just inform the
2539                          * caller to cancel the request for this stripe. */
2540                         lock_res_and_lock(matched);
2541                         if (ldlm_extent_equal(&policy->l_extent,
2542                             &matched->l_policy_data.l_extent))
2543                                 rc = -EEXIST;
2544                         else
2545                                 rc = -ECANCELED;
2546                         unlock_res_and_lock(matched);
2547
2548                         ldlm_lock_decref(&lockh, mode);
2549                         LDLM_LOCK_PUT(matched);
2550                         RETURN(rc);
2551                 } else if (osc_set_lock_data(matched, einfo->ei_cbdata)) {
2552                         *flags |= LDLM_FL_LVB_READY;
2553
2554                         /* We already have a lock, and it's referenced. */
2555                         (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);
2556
2557                         ldlm_lock_decref(&lockh, mode);
2558                         LDLM_LOCK_PUT(matched);
2559                         RETURN(ELDLM_OK);
2560                 } else {
2561                         ldlm_lock_decref(&lockh, mode);
2562                         LDLM_LOCK_PUT(matched);
2563                 }
2564         }
2565
2566         if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK))
2567                 RETURN(-ENOLCK);
2568
2569         if (intent) {
2570                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2571                                            &RQF_LDLM_ENQUEUE_LVB);
2572                 if (req == NULL)
2573                         RETURN(-ENOMEM);
2574
2575                 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
2576                 if (rc) {
2577                         ptlrpc_request_free(req);
2578                         RETURN(rc);
2579                 }
2580
2581                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2582                                      sizeof *lvb);
2583                 ptlrpc_request_set_replen(req);
2584         }
2585
2586         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2587         *flags &= ~LDLM_FL_BLOCK_GRANTED;
2588
2589         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2590                               sizeof(*lvb), LVB_T_OST, &lockh, async);
2591         if (async) {
2592                 if (!rc) {
2593                         struct osc_enqueue_args *aa;
2594                         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2595                         aa = ptlrpc_req_async_args(req);
2596                         aa->oa_exp         = exp;
2597                         aa->oa_mode        = einfo->ei_mode;
2598                         aa->oa_type        = einfo->ei_type;
2599                         lustre_handle_copy(&aa->oa_lockh, &lockh);
2600                         aa->oa_upcall      = upcall;
2601                         aa->oa_cookie      = cookie;
2602                         aa->oa_speculative = speculative;
2603                         if (!speculative) {
2604                                 aa->oa_flags  = flags;
2605                                 aa->oa_lvb    = lvb;
2606                         } else {
2607                                 /* Speculative locks essentially enqueue a
2608                                  * DLM lock in advance, so we don't care
2609                                  * about the result of the enqueue. */
2610                                 aa->oa_lvb    = NULL;
2611                                 aa->oa_flags  = NULL;
2612                         }
2613
2614                         req->rq_interpret_reply =
2615                                 (ptlrpc_interpterer_t)osc_enqueue_interpret;
2616                         if (rqset == PTLRPCD_SET)
2617                                 ptlrpcd_add_req(req);
2618                         else
2619                                 ptlrpc_set_add_req(rqset, req);
2620                 } else if (intent) {
2621                         ptlrpc_req_finished(req);
2622                 }
2623                 RETURN(rc);
2624         }
2625
2626         rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
2627                               flags, speculative, rc);
2628         if (intent)
2629                 ptlrpc_req_finished(req);
2630
2631         RETURN(rc);
2632 }
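
/*
 * Illustrative sketch: LCK_* modes are single bits, so the widening in
 * osc_enqueue_base() above (and osc_match_base() below) lets a single
 * ldlm_lock_match() call accept either a PR or a PW lock for a reader --
 * an existing PW lock already covers local readers.  The name is ours.
 */
static inline enum ldlm_mode widen_read_mode_sketch(enum ldlm_mode mode)
{
        return mode == LCK_PR ? mode | LCK_PW : mode;
}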
2633
2634 int osc_match_base(const struct lu_env *env, struct obd_export *exp,
2635                    struct ldlm_res_id *res_id, enum ldlm_type type,
2636                    union ldlm_policy_data *policy, enum ldlm_mode mode,
2637                    __u64 *flags, struct osc_object *obj,
2638                    struct lustre_handle *lockh, int unref)
2639 {
2640         struct obd_device *obd = exp->exp_obd;
2641         __u64 lflags = *flags;
2642         enum ldlm_mode rc;
2643         ENTRY;
2644
2645         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2646                 RETURN(-EIO);
2647
2648         /* Filesystem lock extents are extended to page boundaries so that
2649          * dealing with the page cache is a little smoother */
2650         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2651         policy->l_extent.end |= ~PAGE_MASK;
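        /*
         * A worked example of the rounding above (illustrative only,
         * assuming 4 KiB pages, i.e. PAGE_MASK == ~0xfffULL): a byte
         * range [5000, 6000) becomes
         *
         *      start: 5000 - (5000 & 0xfff) = 4096   (page-aligned down)
         *      end:   6000 | 0xfff          = 8191   (last byte of page)
         *
         * so the lock covers the whole pages spanning [4096, 8191].
         */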
2652
2653         /* Next, search for already existing extent locks that will cover us */
2654         /* If we're trying to read, we also search for an existing PW lock.  The
2655          * VFS and page cache already protect us locally, so lots of readers/
2656          * writers can share a single PW lock. */
2657         rc = mode;
2658         if (mode == LCK_PR)
2659                 rc |= LCK_PW;
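        /*
         * Note: ldlm lock modes are bit flags, so "rc" is used as a mode
         * mask here; LCK_PR|LCK_PW lets ldlm_lock_match() return a lock
         * of either mode for a read, per the comment above.
         */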
2660         rc = ldlm_lock_match(obd->obd_namespace, lflags,
2661                              res_id, type, policy, rc, lockh, unref);
2662         if (rc == 0 || lflags & LDLM_FL_TEST_LOCK)
2663                 RETURN(rc);
2664
2665         if (obj != NULL) {
2666                 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2667
2668                 LASSERT(lock != NULL);
2669                 if (osc_set_lock_data(lock, obj)) {
2670                         lock_res_and_lock(lock);
2671                         if (!ldlm_is_lvb_cached(lock)) {
2672                                 LASSERT(lock->l_ast_data == obj);
2673                                 osc_lock_lvb_update(env, obj, lock, NULL);
2674                                 ldlm_set_lvb_cached(lock);
2675                         }
2676                         unlock_res_and_lock(lock);
2677                 } else {
2678                         ldlm_lock_decref(lockh, rc);
2679                         rc = 0;
2680                 }
2681                 LDLM_LOCK_PUT(lock);
2682         }
2683         RETURN(rc);
2684 }
2685
2686 static int osc_statfs_interpret(const struct lu_env *env,
2687                                 struct ptlrpc_request *req,
2688                                 struct osc_async_args *aa, int rc)
2689 {
2690         struct obd_statfs *msfs;
2691         ENTRY;
2692
2693         if (rc == -EBADR)
2694                 /* The request has in fact never been sent
2695                  * due to issues at a higher level (LOV).
2696                  * Exit immediately since the caller is
2697                  * aware of the problem and takes care
2698                  * of the cleanup. */
2699                 RETURN(rc);
2700
2701         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2702             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2703                 GOTO(out, rc = 0);
2704
2705         if (rc != 0)
2706                 GOTO(out, rc);
2707
2708         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2709         if (msfs == NULL) {
2710                 GOTO(out, rc = -EPROTO);
2711         }
2712
2713         *aa->aa_oi->oi_osfs = *msfs;
2714 out:
2715         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2716         RETURN(rc);
2717 }
2718
2719 static int osc_statfs_async(struct obd_export *exp,
2720                             struct obd_info *oinfo, time64_t max_age,
2721                             struct ptlrpc_request_set *rqset)
2722 {
2723         struct obd_device     *obd = class_exp2obd(exp);
2724         struct ptlrpc_request *req;
2725         struct osc_async_args *aa;
2726         int rc;
2727         ENTRY;
2728
2729         /* We could possibly pass max_age in the request (as an absolute
2730          * timestamp or a "seconds.usec ago") so the target can avoid doing
2731          * extra calls into the filesystem if that isn't necessary (e.g.
2732          * during mount that would help a bit).  Having relative timestamps
2733          * is not so great if request processing is slow, while absolute
2734          * timestamps are not ideal because they need time synchronization. */
2735         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2736         if (req == NULL)
2737                 RETURN(-ENOMEM);
2738
2739         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2740         if (rc) {
2741                 ptlrpc_request_free(req);
2742                 RETURN(rc);
2743         }
2744         ptlrpc_request_set_replen(req);
2745         req->rq_request_portal = OST_CREATE_PORTAL;
2746         ptlrpc_at_set_req_timeout(req);
2747
2748         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2749                 /* procfs requests must not block waiting for the OST,
2750                  * to avoid deadlock */
2750                 req->rq_no_resend = 1;
2751                 req->rq_no_delay = 1;
2752         }
2753
2754         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2755         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2756         aa = ptlrpc_req_async_args(req);
2757         aa->aa_oi = oinfo;
2758
2759         ptlrpc_set_add_req(rqset, req);
2760         RETURN(0);
2761 }
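/*
 * A usage sketch for osc_statfs_async() (hypothetical caller, not code
 * from this file; my_osfs and my_statfs_upcall are made-up names):
 *
 *      struct ptlrpc_request_set *set = ptlrpc_prep_set();
 *      struct obd_info oinfo = {
 *              .oi_osfs  = &my_osfs,          // caller-provided buffer
 *              .oi_cb_up = my_statfs_upcall,  // completion callback
 *      };
 *
 *      rc = osc_statfs_async(exp, &oinfo, max_age, set);
 *      if (rc == 0)
 *              rc = ptlrpc_set_wait(set);  // exact signature varies
 *                                          // across Lustre versions
 *
 * osc_statfs_interpret() above copies the reply into *oi_osfs and then
 * invokes oi_cb_up() with the final status.
 */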
2762
2763 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2764                       struct obd_statfs *osfs, time64_t max_age, __u32 flags)
2765 {
2766         struct obd_device     *obd = class_exp2obd(exp);
2767         struct obd_statfs     *msfs;
2768         struct ptlrpc_request *req;
2769         struct obd_import     *imp = NULL;
2770         int rc;
2771         ENTRY;
2772
2773
2774         /* Since the request might also come from lprocfs, we need to
2775          * synchronize this with client_disconnect_export() (bug 15684). */
2776         down_read(&obd->u.cli.cl_sem);
2777         if (obd->u.cli.cl_import)
2778                 imp = class_import_get(obd->u.cli.cl_import);
2779         up_read(&obd->u.cli.cl_sem);
2780         if (!imp)
2781                 RETURN(-ENODEV);
2782
2783         /* We could possibly pass max_age in the request (as an absolute
2784          * timestamp or a "seconds.usec ago") so the target can avoid doing
2785          * extra calls into the filesystem if that isn't necessary (e.g.
2786          * during mount that would help a bit).  Having relative timestamps
2787          * is not so great if request processing is slow, while absolute
2788          * timestamps are not ideal because they need time synchronization. */
2789         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2790
2791         class_import_put(imp);
2792
2793         if (req == NULL)
2794                 RETURN(-ENOMEM);
2795
2796         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2797         if (rc) {
2798                 ptlrpc_request_free(req);
2799                 RETURN(rc);
2800         }
2801         ptlrpc_request_set_replen(req);
2802         req->rq_request_portal = OST_CREATE_PORTAL;
2803         ptlrpc_at_set_req_timeout(req);
2804
2805         if (flags & OBD_STATFS_NODELAY) {
2806                 /* procfs requests must not block waiting for the OST,
2807                  * to avoid deadlock */
2807                 req->rq_no_resend = 1;
2808                 req->rq_no_delay = 1;
2809         }
2810
2811         rc = ptlrpc_queue_wait(req);
2812         if (rc)
2813                 GOTO(out, rc);
2814
2815         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2816         if (msfs == NULL)
2817                 GOTO(out, rc = -EPROTO);
2818
2819         *osfs = *msfs;
2820
2821         EXIT;
2822 out:
2823         ptlrpc_req_finished(req);
2824         return rc;
2825 }
2826
2827 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2828                          void *karg, void __user *uarg)
2829 {
2830         struct obd_device *obd = exp->exp_obd;
2831         struct obd_ioctl_data *data = karg;
2832         int err = 0;
2833         ENTRY;
2834
2835         if (!try_module_get(THIS_MODULE)) {
2836                 CERROR("%s: cannot get module '%s'\n", obd->obd_name,
2837                        module_name(THIS_MODULE));
2838                 return -EINVAL;
2839         }
2840         switch (cmd) {
2841         case OBD_IOC_CLIENT_RECOVER:
2842                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2843                                             data->ioc_inlbuf1, 0);
2844                 if (err > 0)
2845                         err = 0;
2846                 GOTO(out, err);
2847         case IOC_OSC_SET_ACTIVE:
2848                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2849                                                data->ioc_offset);
2850                 GOTO(out, err);
2851         case OBD_IOC_PING_TARGET:
2852                 err = ptlrpc_obd_ping(obd);
2853                 GOTO(out, err);
2854         default:
2855                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2856                        cmd, current_comm());
2857                 GOTO(out, err = -ENOTTY);
2858         }
2859 out:
2860         module_put(THIS_MODULE);
2861         return err;
2862 }
2863
2864 int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
2865                        u32 keylen, void *key, u32 vallen, void *val,
2866                        struct ptlrpc_request_set *set)
2867 {
2868         struct ptlrpc_request *req;
2869         struct obd_device     *obd = exp->exp_obd;
2870         struct obd_import     *imp = class_exp2cliimp(exp);
2871         char                  *tmp;
2872         int                    rc;
2873         ENTRY;
2874
2875         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
2876
2877         if (KEY_IS(KEY_CHECKSUM)) {
2878                 if (vallen != sizeof(int))
2879                         RETURN(-EINVAL);
2880                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
2881                 RETURN(0);
2882         }
2883
2884         if (KEY_IS(KEY_SPTLRPC_CONF)) {
2885                 sptlrpc_conf_client_adapt(obd);
2886                 RETURN(0);
2887         }
2888
2889         if (KEY_IS(KEY_FLUSH_CTX)) {
2890                 sptlrpc_import_flush_my_ctx(imp);
2891                 RETURN(0);
2892         }
2893
2894         if (KEY_IS(KEY_CACHE_SET)) {
2895                 struct client_obd *cli = &obd->u.cli;
2896
2897                 LASSERT(cli->cl_cache == NULL); /* only once */
2898                 cli->cl_cache = (struct cl_client_cache *)val;
2899                 cl_cache_incref(cli->cl_cache);
2900                 cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
2901
2902                 /* add this osc into entity list */
2903                 LASSERT(list_empty(&cli->cl_lru_osc));
2904                 spin_lock(&cli->cl_cache->ccc_lru_lock);
2905                 list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
2906                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
2907
2908                 RETURN(0);
2909         }
2910
2911         if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
2912                 struct client_obd *cli = &obd->u.cli;
2913                 long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
2914                 long target = *(long *)val;
2915
2916                 nr = osc_lru_shrink(env, cli, min(nr, target), true);
2917                 *(long *)val -= nr;
2918                 RETURN(0);
2919         }
2920
2921         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
2922                 RETURN(-EINVAL);
2923
2924         /* We pass all other commands directly to OST.  Since nobody calls
2925          * osc methods directly and everybody is supposed to go through LOV,
2926          * we assume LOV has validated the values for us.
2927          * The only recognised values so far are evict_by_nid and mds_conn.
2928          * Even if something bad gets through, we'd get a -EINVAL from OST
2929          * anyway. */
2930
2931         req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
2932                                                 &RQF_OST_SET_GRANT_INFO :
2933                                                 &RQF_OBD_SET_INFO);
2934         if (req == NULL)
2935                 RETURN(-ENOMEM);
2936
2937         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2938                              RCL_CLIENT, keylen);
2939         if (!KEY_IS(KEY_GRANT_SHRINK))
2940                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
2941                                      RCL_CLIENT, vallen);
2942         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
2943         if (rc) {
2944                 ptlrpc_request_free(req);
2945                 RETURN(rc);
2946         }
2947
2948         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2949         memcpy(tmp, key, keylen);
2950         tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
2951                                                         &RMF_OST_BODY :
2952                                                         &RMF_SETINFO_VAL);
2953         memcpy(tmp, val, vallen);
2954
2955         if (KEY_IS(KEY_GRANT_SHRINK)) {
2956                 struct osc_grant_args *aa;
2957                 struct obdo *oa;
2958
2959                 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2960                 aa = ptlrpc_req_async_args(req);
2961                 OBD_SLAB_ALLOC_PTR_GFP(oa, osc_obdo_kmem, GFP_NOFS);
2962                 if (!oa) {
2963                         ptlrpc_req_finished(req);
2964                         RETURN(-ENOMEM);
2965                 }
2966                 *oa = ((struct ost_body *)val)->oa;
2967                 aa->aa_oa = oa;
2968                 req->rq_interpret_reply = osc_shrink_grant_interpret;
2969         }
2970
2971         ptlrpc_request_set_replen(req);
2972         if (!KEY_IS(KEY_GRANT_SHRINK)) {
2973                 LASSERT(set != NULL);
2974                 ptlrpc_set_add_req(set, req);
2975                 ptlrpc_check_set(NULL, set);
2976         } else {
2977                 ptlrpcd_add_req(req);
2978         }
2979
2980         RETURN(0);
2981 }
2982 EXPORT_SYMBOL(osc_set_info_async);
2983
2984 int osc_reconnect(const struct lu_env *env, struct obd_export *exp,
2985                   struct obd_device *obd, struct obd_uuid *cluuid,
2986                   struct obd_connect_data *data, void *localdata)
2987 {
2988         struct client_obd *cli = &obd->u.cli;
2989
2990         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
2991                 long lost_grant;
2992                 long grant;
2993
2994                 spin_lock(&cli->cl_loi_list_lock);
2995                 grant = cli->cl_avail_grant + cli->cl_reserved_grant;
2996                 if (data->ocd_connect_flags & OBD_CONNECT_GRANT_PARAM) {
2997                         /* restore ocd_grant_blkbits as client page bits */
2998                         data->ocd_grant_blkbits = PAGE_SHIFT;
2999                         grant += cli->cl_dirty_grant;
3000                 } else {
3001                         grant += cli->cl_dirty_pages << PAGE_SHIFT;
3002                 }
3003                 data->ocd_grant = grant ? : 2 * cli_brw_size(obd);
3004                 lost_grant = cli->cl_lost_grant;
3005                 cli->cl_lost_grant = 0;
3006                 spin_unlock(&cli->cl_loi_list_lock);
3007
3008                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: %#llx ocd_version: %d"
3009                        " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
3010                        data->ocd_version, data->ocd_grant, lost_grant);
3011         }
3012
3013         RETURN(0);
3014 }
3015 EXPORT_SYMBOL(osc_reconnect);
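/*
 * Worked example of the grant computation in osc_reconnect() (purely
 * illustrative numbers): with 4 KiB pages, cl_avail_grant = 1 MiB,
 * cl_reserved_grant = 0 and 256 dirty pages, the client requests
 *
 *      grant = 1 MiB + 0 + (256 << PAGE_SHIFT) = 2 MiB
 *
 * back from the server; if the total were zero, it falls back to
 * 2 * cli_brw_size(obd), presumably enough for two full-sized BRW RPCs.
 */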
3016
3017 int osc_disconnect(struct obd_export *exp)
3018 {
3019         struct obd_device *obd = class_exp2obd(exp);
3020         int rc;
3021
3022         rc = client_disconnect_export(exp);
3023         /**
3024          * Initially we put del_shrink_grant before disconnect_export, but it
3025          * causes the following problem if setup (connect) and cleanup
3026          * (disconnect) are tangled together.
3027          *      connect p1                     disconnect p2
3028          *   ptlrpc_connect_import
3029          *     ...............               class_manual_cleanup
3030          *                                     osc_disconnect
3031          *                                     del_shrink_grant
3032          *   ptlrpc_connect_interpret
3033          *     osc_init_grant
3034          *   add this client to shrink list
3035          *                                      cleanup_osc
3036          * Bang! the grant shrink thread triggers the shrink. BUG18662
3037          */
3038         osc_del_grant_list(&obd->u.cli);
3039         return rc;
3040 }
3041 EXPORT_SYMBOL(osc_disconnect);
3042
3043 int osc_ldlm_resource_invalidate(struct cfs_hash *hs, struct cfs_hash_bd *bd,
3044                                  struct hlist_node *hnode, void *arg)
3045 {
3046         struct lu_env *env = arg;
3047         struct ldlm_resource *res = cfs_hash_object(hs, hnode);
3048         struct ldlm_lock *lock;
3049         struct osc_object *osc = NULL;
3050         ENTRY;
3051
3052         lock_res(res);
3053         list_for_each_entry(lock, &res->lr_granted, l_res_link) {
3054                 if (lock->l_ast_data != NULL && osc == NULL) {
3055                         osc = lock->l_ast_data;
3056                         cl_object_get(osc2cl(osc));
3057                 }
3058
3059                 /* clear LDLM_FL_CLEANED flag to make sure it will be canceled
3060                  * by the 2nd round of ldlm_namespace_clean() call in
3061                  * osc_import_event(). */
3062                 ldlm_clear_cleaned(lock);
3063         }
3064         unlock_res(res);
3065
3066         if (osc != NULL) {
3067                 osc_object_invalidate(env, osc);
3068                 cl_object_put(env, osc2cl(osc));
3069         }
3070
3071         RETURN(0);
3072 }
3073 EXPORT_SYMBOL(osc_ldlm_resource_invalidate);
3074
3075 static int osc_import_event(struct obd_device *obd,
3076                             struct obd_import *imp,
3077                             enum obd_import_event event)
3078 {
3079         struct client_obd *cli;
3080         int rc = 0;
3081
3082         ENTRY;
3083         LASSERT(imp->imp_obd == obd);
3084
3085         switch (event) {
3086         case IMP_EVENT_DISCON: {
3087                 cli = &obd->u.cli;
3088                 spin_lock(&cli->cl_loi_list_lock);
3089                 cli->cl_avail_grant = 0;
3090                 cli->cl_lost_grant = 0;
3091                 spin_unlock(&cli->cl_loi_list_lock);
3092                 break;
3093         }
3094         case IMP_EVENT_INACTIVE: {
3095                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE);
3096                 break;
3097         }
3098         case IMP_EVENT_INVALIDATE: {
3099                 struct ldlm_namespace *ns = obd->obd_namespace;
3100                 struct lu_env         *env;
3101                 __u16                  refcheck;
3102
3103                 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
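                /*
                 * The first cleanup pass cancels locks that are already
                 * unused; the hash walk below clears LDLM_FL_CLEANED so
                 * that the second pass, after I/O is unplugged, can
                 * cancel the rest (see osc_ldlm_resource_invalidate()).
                 */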
3104
3105                 env = cl_env_get(&refcheck);
3106                 if (!IS_ERR(env)) {
3107                         osc_io_unplug(env, &obd->u.cli, NULL);
3108
3109                         cfs_hash_for_each_nolock(ns->ns_rs_hash,
3110                                                  osc_ldlm_resource_invalidate,
3111                                                  env, 0);
3112                         cl_env_put(env, &refcheck);
3113
3114                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3115                 } else
3116                         rc = PTR_ERR(env);
3117                 break;
3118         }
3119         case IMP_EVENT_ACTIVE: {
3120                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE);
3121                 break;
3122         }
3123         case IMP_EVENT_OCD: {
3124                 struct obd_connect_data *ocd = &imp->imp_connect_data;
3125
3126                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3127                         osc_init_grant(&obd->u.cli, ocd);
3128
3129                 /* See bug 7198 */
3130                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3131                         imp->imp_client->cli_request_portal = OST_REQUEST_PORTAL;
3132
3133                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD);
3134                 break;
3135         }
3136         case IMP_EVENT_DEACTIVATE: {
3137                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE);
3138                 break;
3139         }
3140         case IMP_EVENT_ACTIVATE: {
3141                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE);
3142                 break;
3143         }
3144         default:
3145                 CERROR("Unknown import event %d\n", event);
3146                 LBUG();
3147         }
3148         RETURN(rc);
3149 }
3150
3151 /**
3152  * Determine whether the lock can be canceled before replaying the lock
3153  * during recovery, see bug16774 for detailed information.
3154  *
3155  * \retval zero the lock can't be canceled
3156  * \retval other ok to cancel
3157  */
3158 static int osc_cancel_weight(struct ldlm_lock *lock)
3159 {
3160         /*
3161          * Cancel all unused and granted extent lock.
3162          */
3163         if (lock->l_resource->lr_type == LDLM_EXTENT &&
3164             ldlm_is_granted(lock) &&
3165             osc_ldlm_weigh_ast(lock) == 0)
3166                 RETURN(1);
3167
3168         RETURN(0);
3169 }
3170
3171 static int brw_queue_work(const struct lu_env *env, void *data)
3172 {
3173         struct client_obd *cli = data;
3174
3175         CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
3176
3177         osc_io_unplug(env, cli, NULL);
3178         RETURN(0);
3179 }
3180
3181 int osc_setup_common(struct obd_device *obd, struct lustre_cfg *lcfg)
3182 {
3183         struct client_obd *cli = &obd->u.cli;
3184         void *handler;
3185         int rc;
3186
3187         ENTRY;
3188
3189         rc = ptlrpcd_addref();
3190         if (rc)
3191                 RETURN(rc);
3192
3193         rc = client_obd_setup(obd, lcfg);
3194         if (rc)
3195                 GOTO(out_ptlrpcd, rc);
3196
3197
3198         handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
3199         if (IS_ERR(handler))
3200                 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
3201         cli->cl_writeback_work = handler;
3202
3203         handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
3204         if (IS_ERR(handler))
3205                 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
3206         cli->cl_lru_work = handler;
3207
3208         rc = osc_quota_setup(obd);
3209         if (rc)
3210                 GOTO(out_ptlrpcd_work, rc);
3211
3212         cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
3213         osc_update_next_shrink(cli);
3214
3215         RETURN(rc);
3216
3217 out_ptlrpcd_work:
3218         if (cli->cl_writeback_work != NULL) {
3219                 ptlrpcd_destroy_work(cli->cl_writeback_work);
3220                 cli->cl_writeback_work = NULL;
3221         }
3222         if (cli->cl_lru_work != NULL) {
3223                 ptlrpcd_destroy_work(cli->cl_lru_work);
3224                 cli->cl_lru_work = NULL;
3225         }
3226         client_obd_cleanup(obd);
3227 out_ptlrpcd:
3228         ptlrpcd_decref();
3229         RETURN(rc);
3230 }
3231 EXPORT_SYMBOL(osc_setup_common);
3232
3233 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3234 {
3235         struct client_obd *cli = &obd->u.cli;
3236         int                adding;
3237         int                added;
3238         int                req_count;
3239         int                rc;
3240
3241         ENTRY;
3242
3243         rc = osc_setup_common(obd, lcfg);
3244         if (rc < 0)
3245                 RETURN(rc);
3246
3247         rc = osc_tunables_init(obd);
3248         if (rc)
3249                 RETURN(rc);
3250
3251         /*
3252          * We try to control the total number of requests with an upper
3253          * limit, osc_reqpool_maxreqcount.  A race may cause an over-limit
3254          * allocation, but that is fine.
3255          */
3256         req_count = atomic_read(&osc_pool_req_count);
3257         if (req_count < osc_reqpool_maxreqcount) {
3258                 adding = cli->cl_max_rpcs_in_flight + 2;
3259                 if (req_count + adding > osc_reqpool_maxreqcount)
3260                         adding = osc_reqpool_maxreqcount - req_count;
3261
3262                 added = ptlrpc_add_rqs_to_pool(osc_rq_pool, adding);
3263                 atomic_add(added, &osc_pool_req_count);
3264         }
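        /*
         * Clamp arithmetic, illustrated with made-up numbers: if
         * osc_reqpool_maxreqcount is 100 and 95 requests are already
         * pooled, a client wanting cl_max_rpcs_in_flight + 2 == 10 more
         * only adds 100 - 95 == 5, keeping the shared pool at (or, given
         * the race noted above, only slightly past) its limit.
         */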
3265
3266         ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
3267
3268         spin_lock(&osc_shrink_lock);
3269         list_add_tail(&cli->cl_shrink_list, &osc_shrink_list);
3270         spin_unlock(&osc_shrink_lock);
3271         cli->cl_import->imp_idle_timeout = osc_idle_timeout;
3272         cli->cl_import->imp_idle_debug = D_HA;
3273
3274         RETURN(0);
3275 }
3276
3277 int osc_precleanup_common(struct obd_device *obd)
3278 {
3279         struct client_obd *cli = &obd->u.cli;
3280         ENTRY;
3281
3282         /* LU-464
3283          * for echo client, export may be on zombie list, wait for
3284          * zombie thread to cull it, because cli.cl_import will be
3285          * cleared in client_disconnect_export():
3286          *   class_export_destroy() -> obd_cleanup() ->
3287          *   echo_device_free() -> echo_client_cleanup() ->
3288          *   obd_disconnect() -> osc_disconnect() ->
3289          *   client_disconnect_export()
3290          */
3291         obd_zombie_barrier();
3292         if (cli->cl_writeback_work) {
3293                 ptlrpcd_destroy_work(cli->cl_writeback_work);
3294                 cli->cl_writeback_work = NULL;
3295         }
3296
3297         if (cli->cl_lru_work) {
3298                 ptlrpcd_destroy_work(cli->cl_lru_work);
3299                 cli->cl_lru_work = NULL;
3300         }
3301
3302         obd_cleanup_client_import(obd);
3303         RETURN(0);
3304 }
3305 EXPORT_SYMBOL(osc_precleanup_common);
3306
3307 static int osc_precleanup(struct obd_device *obd)
3308 {
3309         ENTRY;
3310
3311         osc_precleanup_common(obd);
3312
3313         ptlrpc_lprocfs_unregister_obd(obd);
3314         RETURN(0);
3315 }
3316
3317 int osc_cleanup_common(struct obd_device *obd)
3318 {
3319         struct client_obd *cli = &obd->u.cli;
3320         int rc;
3321
3322         ENTRY;
3323
3324         spin_lock(&osc_shrink_lock);
3325         list_del(&cli->cl_shrink_list);
3326         spin_unlock(&osc_shrink_lock);
3327
3328         /* lru cleanup */
3329         if (cli->cl_cache != NULL) {
3330                 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
3331                 spin_lock(&cli->cl_cache->ccc_lru_lock);
3332                 list_del_init(&cli->cl_lru_osc);
3333                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3334                 cli->cl_lru_left = NULL;
3335                 cl_cache_decref(cli->cl_cache);
3336                 cli->cl_cache = NULL;
3337         }
3338
3339         /* free memory of osc quota cache */
3340         osc_quota_cleanup(obd);
3341
3342         rc = client_obd_cleanup(obd);
3343
3344         ptlrpcd_decref();
3345         RETURN(rc);
3346 }
3347 EXPORT_SYMBOL(osc_cleanup_common);
3348
3349 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
3350 {
3351         ssize_t count  = class_modify_config(lcfg, PARAM_OSC,
3352                                              &obd->obd_kset.kobj);
3353         return count > 0 ? 0 : count;
3354 }
3355
3356 static int osc_process_config(struct obd_device *obd, size_t len, void *buf)
3357 {
3358         return osc_process_config_base(obd, buf);
3359 }
3360
3361 static struct obd_ops osc_obd_ops = {
3362         .o_owner                = THIS_MODULE,
3363         .o_setup                = osc_setup,
3364         .o_precleanup           = osc_precleanup,
3365         .o_cleanup              = osc_cleanup_common,
3366         .o_add_conn             = client_import_add_conn,
3367         .o_del_conn             = client_import_del_conn,
3368         .o_connect              = client_connect_import,
3369         .o_reconnect            = osc_reconnect,
3370         .o_disconnect           = osc_disconnect,
3371         .o_statfs               = osc_statfs,
3372         .o_statfs_async         = osc_statfs_async,
3373         .o_create               = osc_create,
3374         .o_destroy              = osc_destroy,
3375         .o_getattr              = osc_getattr,
3376         .o_setattr              = osc_setattr,
3377         .o_iocontrol            = osc_iocontrol,
3378         .o_set_info_async       = osc_set_info_async,
3379         .o_import_event         = osc_import_event,
3380         .o_process_config       = osc_process_config,
3381         .o_quotactl             = osc_quotactl,
3382 };
3383
3384 static struct shrinker *osc_cache_shrinker;
3385 struct list_head osc_shrink_list = LIST_HEAD_INIT(osc_shrink_list);
3386 DEFINE_SPINLOCK(osc_shrink_lock);
3387
3388 #ifndef HAVE_SHRINKER_COUNT
3389 static int osc_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask))
3390 {
3391         struct shrink_control scv = {
3392                 .nr_to_scan = shrink_param(sc, nr_to_scan),
3393                 .gfp_mask   = shrink_param(sc, gfp_mask)
3394         };
3395 #if !defined(HAVE_SHRINKER_WANT_SHRINK_PTR) && !defined(HAVE_SHRINK_CONTROL)
3396         struct shrinker *shrinker = NULL;
3397 #endif
3398
3399         (void)osc_cache_shrink_scan(shrinker, &scv);
3400
3401         return osc_cache_shrink_count(shrinker, &scv);
3402 }
3403 #endif
3404
3405 static int __init osc_init(void)
3406 {
3407         bool enable_proc = true;
3408         struct obd_type *type;
3409         unsigned int reqpool_size;
3410         unsigned int reqsize;
3411         int rc;
3412         DEF_SHRINKER_VAR(osc_shvar, osc_cache_shrink,
3413                          osc_cache_shrink_count, osc_cache_shrink_scan);
3414         ENTRY;
3415
3416         /* print an address of _any_ initialized kernel symbol from this
3417          * module, to allow debugging with a gdb that doesn't support data
3418          * symbols from modules. */
3419         CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3420
3421         rc = lu_kmem_init(osc_caches);
3422         if (rc)
3423                 RETURN(rc);
3424
3425         type = class_search_type(LUSTRE_OSP_NAME);
3426         if (type != NULL && type->typ_procsym != NULL)
3427                 enable_proc = false;
3428
3429         rc = class_register_type(&osc_obd_ops, NULL, enable_proc, NULL,
3430                                  LUSTRE_OSC_NAME, &osc_device_type);
3431         if (rc)
3432                 GOTO(out_kmem, rc);
3433
3434         osc_cache_shrinker = set_shrinker(DEFAULT_SEEKS, &osc_shvar);
3435
3436         /* This is obviously too much memory; we only guard against overflow here */
3437         if (osc_reqpool_mem_max >= 1 << 12 || osc_reqpool_mem_max == 0)
3438                 GOTO(out_type, rc = -EINVAL);
3439
3440         reqpool_size = osc_reqpool_mem_max << 20;
3441
3442         reqsize = 1;
3443         while (reqsize < OST_IO_MAXREQSIZE)
3444                 reqsize = reqsize << 1;
3445
3446         /*
3447          * We don't enlarge the request count in OSC pool according to
3448          * cl_max_rpcs_in_flight. The allocation from the pool will only be
3449          * tried after normal allocation failed. So a small OSC pool won't
3450          * cause much performance degradation in most cases.
3451          */
3452         osc_reqpool_maxreqcount = reqpool_size / reqsize;
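        /*
         * Sizing example with the default osc_reqpool_mem_max of 5 (i.e.
         * reqpool_size = 5 MiB): the loop above rounds OST_IO_MAXREQSIZE
         * up to the next power of two, so if OST_IO_MAXREQSIZE were just
         * over 1 MiB (the real value depends on the build), reqsize would
         * be 2 MiB and osc_reqpool_maxreqcount = 5 MiB / 2 MiB = 2.
         */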
3453
3454         atomic_set(&osc_pool_req_count, 0);
3455         osc_rq_pool = ptlrpc_init_rq_pool(0, OST_IO_MAXREQSIZE,
3456                                           ptlrpc_add_rqs_to_pool);
3457
3458         if (osc_rq_pool == NULL)
3459                 GOTO(out_type, rc = -ENOMEM);
3460
3461         rc = osc_start_grant_work();
3462         if (rc != 0)
3463                 GOTO(out_req_pool, rc);
3464
3465         RETURN(rc);
3466
3467 out_req_pool:
3468         ptlrpc_free_rq_pool(osc_rq_pool);
3469 out_type:
3470         class_unregister_type(LUSTRE_OSC_NAME);
3471 out_kmem:
3472         lu_kmem_fini(osc_caches);
3473
3474         RETURN(rc);
3475 }
3476
3477 static void __exit osc_exit(void)
3478 {
3479         osc_stop_grant_work();
3480         remove_shrinker(osc_cache_shrinker);
3481         class_unregister_type(LUSTRE_OSC_NAME);
3482         lu_kmem_fini(osc_caches);
3483         ptlrpc_free_rq_pool(osc_rq_pool);
3484 }
3485
3486 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
3487 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3488 MODULE_VERSION(LUSTRE_VERSION_STRING);
3489 MODULE_LICENSE("GPL");
3490
3491 module_init(osc_init);
3492 module_exit(osc_exit);