LU-3606 fallocate: Implement fallocate preallocate operation
lustre/osc/osc_request.c
/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.gnu.org/licenses/gpl-2.0.html
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, 2017, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 */

#define DEBUG_SUBSYSTEM S_OSC

#include <linux/workqueue.h>
#include <libcfs/libcfs.h>
#include <linux/falloc.h>
#include <lprocfs_status.h>
#include <lustre_debug.h>
#include <lustre_dlm.h>
#include <lustre_fid.h>
#include <lustre_ha.h>
#include <uapi/linux/lustre/lustre_ioctl.h>
#include <lustre_net.h>
#include <lustre_obdo.h>
#include <obd.h>
#include <obd_cksum.h>
#include <obd_class.h>
#include <lustre_osc.h>

#include "osc_internal.h"

atomic_t osc_pool_req_count;
unsigned int osc_reqpool_maxreqcount;
struct ptlrpc_request_pool *osc_rq_pool;

/* max memory used for request pool, unit is MB */
static unsigned int osc_reqpool_mem_max = 5;
module_param(osc_reqpool_mem_max, uint, 0444);

static int osc_idle_timeout = 20;
module_param(osc_idle_timeout, uint, 0644);

#define osc_grant_args osc_brw_async_args

struct osc_setattr_args {
        struct obdo             *sa_oa;
        obd_enqueue_update_f     sa_upcall;
        void                    *sa_cookie;
};

struct osc_fsync_args {
        struct osc_object       *fa_obj;
        struct obdo             *fa_oa;
        obd_enqueue_update_f    fa_upcall;
        void                    *fa_cookie;
};

struct osc_ladvise_args {
        struct obdo             *la_oa;
        obd_enqueue_update_f     la_upcall;
        void                    *la_cookie;
};

static void osc_release_ppga(struct brw_page **ppga, size_t count);
static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
                         void *data, int rc);

void osc_pack_req_body(struct ptlrpc_request *req, struct obdo *oa)
{
        struct ost_body *body;

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
}

static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        int                      rc;

        ENTRY;
        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;

        EXIT;
out:
        ptlrpc_req_finished(req);

        return rc;
}

static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        int                      rc;

        ENTRY;
        LASSERT(oa->o_valid & OBD_MD_FLGROUP);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        EXIT;
out:
        ptlrpc_req_finished(req);

        RETURN(rc);
}

static int osc_setattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req, void *args, int rc)
{
        struct osc_setattr_args *sa = args;
        struct ost_body *body;

        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
                             &body->oa);
out:
        rc = sa->sa_upcall(sa->sa_cookie, rc);
        RETURN(rc);
}

int osc_setattr_async(struct obd_export *exp, struct obdo *oa,
                      obd_enqueue_update_f upcall, void *cookie,
                      struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct osc_setattr_args *sa;
        int                      rc;

        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        /* do mds to ost setattr asynchronously */
        if (!rqset) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req);
        } else {
                req->rq_interpret_reply = osc_setattr_interpret;

                sa = ptlrpc_req_async_args(sa, req);
                sa->sa_oa = oa;
                sa->sa_upcall = upcall;
                sa->sa_cookie = cookie;

                ptlrpc_set_add_req(rqset, req);
        }

        RETURN(0);
}

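/*
 * Example (illustrative sketch, not part of the original file): a caller
 * passes a completion upcall plus a cookie that the upcall uses to recover
 * its context.  The names my_setattr_ctx, my_setattr_done and my_rqset
 * below are hypothetical.
 *
 *	static int my_setattr_done(void *cookie, int rc)
 *	{
 *		struct my_setattr_ctx *ctx = cookie;
 *
 *		// invoked from osc_setattr_interpret() with the RPC result
 *		complete(&ctx->done);
 *		return rc;
 *	}
 *
 *	rc = osc_setattr_async(exp, oa, my_setattr_done, ctx, my_rqset);
 *
 * Passing rqset == NULL instead hands the request straight to ptlrpcd and
 * returns without waiting for the reply.
 */
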
static int osc_ladvise_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 void *arg, int rc)
{
        struct osc_ladvise_args *la = arg;
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        *la->la_oa = body->oa;
out:
        rc = la->la_upcall(la->la_cookie, rc);
        RETURN(rc);
}

/**
 * If rqset is NULL, do not wait for response. Upcall and cookie could also
 * be NULL in this case.
 */
int osc_ladvise_base(struct obd_export *exp, struct obdo *oa,
                     struct ladvise_hdr *ladvise_hdr,
                     obd_enqueue_update_f upcall, void *cookie,
                     struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct ost_body         *body;
        struct osc_ladvise_args *la;
        int                      rc;
        struct lu_ladvise       *req_ladvise;
        struct lu_ladvise       *ladvise = ladvise_hdr->lah_advise;
        int                      num_advise = ladvise_hdr->lah_count;
        struct ladvise_hdr      *req_ladvise_hdr;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_LADVISE);
        if (req == NULL)
                RETURN(-ENOMEM);

        req_capsule_set_size(&req->rq_pill, &RMF_OST_LADVISE, RCL_CLIENT,
                             num_advise * sizeof(*ladvise));
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_LADVISE);
        if (rc != 0) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL;
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
                             oa);

        req_ladvise_hdr = req_capsule_client_get(&req->rq_pill,
                                                 &RMF_OST_LADVISE_HDR);
        memcpy(req_ladvise_hdr, ladvise_hdr, sizeof(*ladvise_hdr));

        req_ladvise = req_capsule_client_get(&req->rq_pill, &RMF_OST_LADVISE);
        memcpy(req_ladvise, ladvise, sizeof(*ladvise) * num_advise);
        ptlrpc_request_set_replen(req);

        if (rqset == NULL) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req);
                RETURN(0);
        }

        req->rq_interpret_reply = osc_ladvise_interpret;
        la = ptlrpc_req_async_args(la, req);
        la->la_oa = oa;
        la->la_upcall = upcall;
        la->la_cookie = cookie;

        ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}

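/*
 * Example (illustrative sketch, not part of the original file): building a
 * single-advice header for osc_ladvise_base().  lah_advise and lah_count are
 * the fields consumed above; the lla_* field names and LU_LADVISE_WILLREAD
 * constant are assumed from the Lustre UAPI headers.
 *
 *	struct ladvise_hdr *hdr;
 *
 *	OBD_ALLOC(hdr, offsetof(struct ladvise_hdr, lah_advise[1]));
 *	hdr->lah_count = 1;
 *	hdr->lah_advise[0].lla_advice = LU_LADVISE_WILLREAD;
 *	hdr->lah_advise[0].lla_start = 0;
 *	hdr->lah_advise[0].lla_end = 1024 * 1024;
 *	rc = osc_ladvise_base(exp, oa, hdr, NULL, NULL, NULL);
 */
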
static int osc_create(const struct lu_env *env, struct obd_export *exp,
                      struct obdo *oa)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        LASSERT(oa != NULL);
        LASSERT(oa->o_valid & OBD_MD_FLGROUP);
        LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
        if (req == NULL)
                GOTO(out, rc = -ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
        if (rc) {
                ptlrpc_request_free(req);
                GOTO(out, rc);
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out_req, rc = -EPROTO);

        CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;

        CDEBUG(D_HA, "transno: %lld\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        RETURN(rc);
}

int osc_punch_send(struct obd_export *exp, struct obdo *oa,
                   obd_enqueue_update_f upcall, void *cookie)
{
        struct ptlrpc_request *req;
        struct osc_setattr_args *sa;
        struct obd_import *imp = class_exp2cliimp(exp);
        struct ost_body *body;
        int rc;

        ENTRY;

        req = ptlrpc_request_alloc(imp, &RQF_OST_PUNCH);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
        if (rc < 0) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_set_io_portal(req);

        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);

        lustre_set_wire_obdo(&imp->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = osc_setattr_interpret;
        sa = ptlrpc_req_async_args(sa, req);
        sa->sa_oa = oa;
        sa->sa_upcall = upcall;
        sa->sa_cookie = cookie;

        ptlrpcd_add_req(req);

        RETURN(0);
}
EXPORT_SYMBOL(osc_punch_send);

/**
 * osc_fallocate_base() - Handles fallocate request.
 *
 * @exp:	Export structure
 * @oa:		Attributes passed to OSS from client (obdo structure)
 * @upcall:	Completion callback invoked when the RPC finishes
 * @cookie:	Opaque caller data passed to @upcall
 * @mode:	Operation done on given range.
 *
 * Handles fallocate requests only. Only block allocation (the standard
 * preallocate operation) is supported currently. Other mode flags are not
 * supported yet. ftruncate(2) and truncate(2) are supported via the
 * SETATTR request instead.
 *
 * Return: Non-zero on failure and 0 on success.
 */
int osc_fallocate_base(struct obd_export *exp, struct obdo *oa,
                       obd_enqueue_update_f upcall, void *cookie, int mode)
{
        struct ptlrpc_request *req;
        struct osc_setattr_args *sa;
        struct ost_body *body;
        struct obd_import *imp = class_exp2cliimp(exp);
        int rc;
        ENTRY;

        /*
         * Only mode == 0 (which is standard prealloc) is supported now.
         * Punch is not supported yet.
         */
        if (mode & ~FALLOC_FL_KEEP_SIZE)
                RETURN(-EOPNOTSUPP);
        oa->o_falloc_mode = mode;

        req = ptlrpc_request_alloc(imp, &RQF_OST_FALLOCATE);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_FALLOCATE);
        if (rc != 0) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&imp->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = osc_setattr_interpret;
        BUILD_BUG_ON(sizeof(*sa) > sizeof(req->rq_async_args));
        sa = ptlrpc_req_async_args(sa, req);
        sa->sa_oa = oa;
        sa->sa_upcall = upcall;
        sa->sa_cookie = cookie;

        ptlrpcd_add_req(req);

        RETURN(0);
}

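/*
 * Example (illustrative sketch, not part of the original file): the only
 * request shapes that pass the mode check in osc_fallocate_base() correspond
 * to these fallocate(2) calls from userspace:
 *
 *	#include <fcntl.h>
 *	#include <linux/falloc.h>
 *
 *	// preallocate 1 MiB, extending i_size if needed (mode == 0)
 *	rc = fallocate(fd, 0, 0, 1024 * 1024);
 *
 *	// preallocate without changing the visible file size
 *	rc = fallocate(fd, FALLOC_FL_KEEP_SIZE, 0, 1024 * 1024);
 *
 * Any other mode bit (e.g. FALLOC_FL_PUNCH_HOLE) fails with -EOPNOTSUPP
 * before an RPC is built.
 */
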
static int osc_sync_interpret(const struct lu_env *env,
                              struct ptlrpc_request *req, void *args, int rc)
{
        struct osc_fsync_args *fa = args;
        struct ost_body *body;
        struct cl_attr *attr = &osc_env_info(env)->oti_attr;
        unsigned long valid = 0;
        struct cl_object *obj;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL) {
                CERROR("can't unpack ost_body\n");
                GOTO(out, rc = -EPROTO);
        }

        *fa->fa_oa = body->oa;
        obj = osc2cl(fa->fa_obj);

        /* Update osc object's blocks attribute */
        cl_object_attr_lock(obj);
        if (body->oa.o_valid & OBD_MD_FLBLOCKS) {
                attr->cat_blocks = body->oa.o_blocks;
                valid |= CAT_BLOCKS;
        }

        if (valid != 0)
                cl_object_attr_update(env, obj, attr, valid);
        cl_object_attr_unlock(obj);

out:
        rc = fa->fa_upcall(fa->fa_cookie, rc);
        RETURN(rc);
}

int osc_sync_base(struct osc_object *obj, struct obdo *oa,
                  obd_enqueue_update_f upcall, void *cookie,
                  struct ptlrpc_request_set *rqset)
{
        struct obd_export     *exp = osc_export(obj);
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct osc_fsync_args *fa;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        /* overload the size and blocks fields in the oa with start/end */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = osc_sync_interpret;

        fa = ptlrpc_req_async_args(fa, req);
        fa->fa_obj = obj;
        fa->fa_oa = oa;
        fa->fa_upcall = upcall;
        fa->fa_cookie = cookie;

        ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}

/* Find and cancel locally the locks matched by @mode in the resource found
 * by @objid. Found locks are added into the @cancels list. Returns the
 * number of locks added to the @cancels list. */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
                                   struct list_head *cancels,
                                   enum ldlm_mode mode, __u64 lock_flags)
{
        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        struct ldlm_res_id res_id;
        struct ldlm_resource *res;
        int count;
        ENTRY;

        /* Return, i.e. cancel nothing, only if ELC is supported (flag in
         * export) but disabled through procfs (flag in NS).
         *
         * This distinguishes it from the case when ELC is not supported
         * originally, in which case we still want to cancel locks in advance
         * and just cancel them locally, without sending any RPC. */
        if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
                RETURN(0);

        ostid_build_res_name(&oa->o_oi, &res_id);
        res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
        if (IS_ERR(res))
                RETURN(0);

        LDLM_RESOURCE_ADDREF(res);
        count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
                                           lock_flags, 0, NULL);
        LDLM_RESOURCE_DELREF(res);
        ldlm_resource_putref(res);
        RETURN(count);
}

static int osc_destroy_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req, void *args, int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;

        atomic_dec(&cli->cl_destroy_in_flight);
        wake_up(&cli->cl_destroy_waitq);

        return 0;
}

static int osc_can_send_destroy(struct client_obd *cli)
{
        if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
            cli->cl_max_rpcs_in_flight) {
                /* The destroy request can be sent */
                return 1;
        }
        if (atomic_dec_return(&cli->cl_destroy_in_flight) <
            cli->cl_max_rpcs_in_flight) {
                /*
                 * The counter has been modified between the two atomic
                 * operations.
                 */
                wake_up(&cli->cl_destroy_waitq);
        }
        return 0;
}

static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct client_obd     *cli = &exp->exp_obd->u.cli;
        struct ptlrpc_request *req;
        struct ost_body       *body;
        LIST_HEAD(cancels);
        int rc, count;
        ENTRY;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
        if (req == NULL) {
                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                RETURN(-ENOMEM);
        }

        rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
                               0, &cancels, count);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = osc_destroy_interpret;
        if (!osc_can_send_destroy(cli)) {
                /*
                 * Wait until the number of on-going destroy RPCs drops
                 * under cl_max_rpcs_in_flight.
                 */
                rc = l_wait_event_abortable_exclusive(
                        cli->cl_destroy_waitq,
                        osc_can_send_destroy(cli));
                if (rc) {
                        ptlrpc_req_finished(req);
                        RETURN(-EINTR);
                }
        }

        /* Do not wait for response */
        ptlrpcd_add_req(req);
        RETURN(0);
}

static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;

        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        spin_lock(&cli->cl_loi_list_lock);
        if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data, GRANT_PARAM))
                oa->o_dirty = cli->cl_dirty_grant;
        else
                oa->o_dirty = cli->cl_dirty_pages << PAGE_SHIFT;
        if (unlikely(cli->cl_dirty_pages > cli->cl_dirty_max_pages)) {
                CERROR("dirty %lu > dirty_max %lu\n",
                       cli->cl_dirty_pages,
                       cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else if (unlikely(atomic_long_read(&obd_dirty_pages) >
                            (long)(obd_max_dirty_pages + 1))) {
                /* The atomic_read() and the atomic_inc() are not covered by
                 * a lock, thus they may safely race and trip this CERROR()
                 * unless we add in a small fudge factor (+1). */
                CERROR("%s: dirty %ld > system dirty_max %ld\n",
                       cli_name(cli), atomic_long_read(&obd_dirty_pages),
                       obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
                            0x7fffffff)) {
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty_pages, cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else {
                unsigned long nrpages;
                unsigned long undirty;

                nrpages = cli->cl_max_pages_per_rpc;
                nrpages *= cli->cl_max_rpcs_in_flight + 1;
                nrpages = max(nrpages, cli->cl_dirty_max_pages);
                undirty = nrpages << PAGE_SHIFT;
                if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data,
                                 GRANT_PARAM)) {
                        int nrextents;

                        /* take extent tax into account when asking for more
                         * grant space */
                        nrextents = (nrpages + cli->cl_max_extent_pages - 1) /
                                     cli->cl_max_extent_pages;
                        undirty += nrextents * cli->cl_grant_extent_tax;
                }
                /* Do not ask for more than OBD_MAX_GRANT - a margin for server
                 * to add extent tax, etc.
                 */
                oa->o_undirty = min(undirty, OBD_MAX_GRANT &
                                    ~(PTLRPC_MAX_BRW_SIZE * 4UL));
        }
        oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
        oa->o_dropped = cli->cl_lost_grant;
        cli->cl_lost_grant = 0;
        spin_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "dirty: %llu undirty: %u dropped %u grant: %llu\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
}

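/*
 * Worked example (illustrative, not part of the original file): with 4 KiB
 * pages, cl_max_pages_per_rpc = 256 (1 MiB RPCs) and
 * cl_max_rpcs_in_flight = 8, the o_undirty computation above gives
 *
 *	nrpages = 256 * (8 + 1) = 2304 pages
 *	undirty = 2304 << 12   = 9 MiB
 *
 * (assuming cl_dirty_max_pages is not larger), plus one extent-tax charge
 * per cl_max_extent_pages chunk when GRANT_PARAM was negotiated, all capped
 * by the OBD_MAX_GRANT margin.
 */
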
void osc_update_next_shrink(struct client_obd *cli)
{
        cli->cl_next_shrink_grant = ktime_get_seconds() +
                                    cli->cl_grant_shrink_interval;

        CDEBUG(D_CACHE, "next time %lld to shrink grant\n",
               cli->cl_next_shrink_grant);
}

static void __osc_update_grant(struct client_obd *cli, u64 grant)
{
        spin_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant += grant;
        spin_unlock(&cli->cl_loi_list_lock);
}

static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
{
        if (body->oa.o_valid & OBD_MD_FLGRANT) {
                CDEBUG(D_CACHE, "got %llu extra grant\n", body->oa.o_grant);
                __osc_update_grant(cli, body->oa.o_grant);
        }
}

/**
 * grant thread data for shrinking space.
 */
struct grant_thread_data {
        struct list_head        gtd_clients;
        struct mutex            gtd_mutex;
        unsigned long           gtd_stopped:1;
};
static struct grant_thread_data client_gtd;

static int osc_shrink_grant_interpret(const struct lu_env *env,
                                      struct ptlrpc_request *req,
                                      void *args, int rc)
{
        struct osc_grant_args *aa = args;
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
        struct ost_body *body;

        if (rc != 0) {
                __osc_update_grant(cli, aa->aa_oa->o_grant);
                GOTO(out, rc);
        }

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        osc_update_grant(cli, body);
out:
        OBD_SLAB_FREE_PTR(aa->aa_oa, osc_obdo_kmem);
        aa->aa_oa = NULL;

        return rc;
}

static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
{
        spin_lock(&cli->cl_loi_list_lock);
        oa->o_grant = cli->cl_avail_grant / 4;
        cli->cl_avail_grant -= oa->o_grant;
        spin_unlock(&cli->cl_loi_list_lock);
        if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
                oa->o_valid |= OBD_MD_FLFLAGS;
                oa->o_flags = 0;
        }
        oa->o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);
}

/* Shrink the current grant, either from some large amount to enough for a
 * full set of in-flight RPCs, or if we have already shrunk to that limit
 * then to enough for a single RPC.  This avoids keeping more grant than
 * needed, and avoids shrinking the grant piecemeal. */
static int osc_shrink_grant(struct client_obd *cli)
{
        __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
                             (cli->cl_max_pages_per_rpc << PAGE_SHIFT);

        spin_lock(&cli->cl_loi_list_lock);
        if (cli->cl_avail_grant <= target_bytes)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
        spin_unlock(&cli->cl_loi_list_lock);

        return osc_shrink_grant_to_target(cli, target_bytes);
}

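/*
 * Worked example (illustrative, not part of the original file): with
 * cl_max_rpcs_in_flight = 8 and cl_max_pages_per_rpc = 256 on 4 KiB pages,
 * the first-stage target is (8 + 1) * 1 MiB = 9 MiB.  If cl_avail_grant is
 * already at or below 9 MiB, the target drops to a single RPC's worth,
 * 1 MiB, matching the two-step policy described in the comment above.
 */
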
int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
{
        int                     rc = 0;
        struct ost_body        *body;
        ENTRY;

        spin_lock(&cli->cl_loi_list_lock);
        /* Don't shrink if we are already above or below the desired limit.
         * We don't want to shrink below a single RPC, as that will negatively
         * impact block allocation and long-term performance. */
        if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_SHIFT)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;

        if (target_bytes >= cli->cl_avail_grant) {
                spin_unlock(&cli->cl_loi_list_lock);
                RETURN(0);
        }
        spin_unlock(&cli->cl_loi_list_lock);

        OBD_ALLOC_PTR(body);
        if (!body)
                RETURN(-ENOMEM);

        osc_announce_cached(cli, &body->oa, 0);

        spin_lock(&cli->cl_loi_list_lock);
        if (target_bytes >= cli->cl_avail_grant) {
                /* available grant has changed since target calculation */
                spin_unlock(&cli->cl_loi_list_lock);
                GOTO(out_free, rc = 0);
        }
        body->oa.o_grant = cli->cl_avail_grant - target_bytes;
        cli->cl_avail_grant = target_bytes;
        spin_unlock(&cli->cl_loi_list_lock);
        if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
                body->oa.o_valid |= OBD_MD_FLFLAGS;
                body->oa.o_flags = 0;
        }
        body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);

        rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
                                sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
                                sizeof(*body), body, NULL);
        if (rc != 0)
                __osc_update_grant(cli, body->oa.o_grant);
out_free:
        OBD_FREE_PTR(body);
        RETURN(rc);
}

static int osc_should_shrink_grant(struct client_obd *client)
{
        time64_t next_shrink = client->cl_next_shrink_grant;

        if (client->cl_import == NULL)
                return 0;

        if (!OCD_HAS_FLAG(&client->cl_import->imp_connect_data, GRANT_SHRINK) ||
            client->cl_import->imp_grant_shrink_disabled) {
                osc_update_next_shrink(client);
                return 0;
        }

        if (ktime_get_seconds() >= next_shrink - 5) {
                /* Get the current RPC size directly, instead of going via:
                 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
                 * Keep comment here so that it can be found by searching. */
                int brw_size = client->cl_max_pages_per_rpc << PAGE_SHIFT;

                if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
                    client->cl_avail_grant > brw_size)
                        return 1;
                else
                        osc_update_next_shrink(client);
        }
        return 0;
}

#define GRANT_SHRINK_RPC_BATCH  100

static struct delayed_work work;

static void osc_grant_work_handler(struct work_struct *data)
{
        struct client_obd *cli;
        int rpc_sent;
        bool init_next_shrink = true;
        time64_t next_shrink = ktime_get_seconds() + GRANT_SHRINK_INTERVAL;

        rpc_sent = 0;
        mutex_lock(&client_gtd.gtd_mutex);
        list_for_each_entry(cli, &client_gtd.gtd_clients,
                            cl_grant_chain) {
                if (rpc_sent < GRANT_SHRINK_RPC_BATCH &&
                    osc_should_shrink_grant(cli)) {
                        osc_shrink_grant(cli);
                        rpc_sent++;
                }

                if (!init_next_shrink) {
                        if (cli->cl_next_shrink_grant < next_shrink &&
                            cli->cl_next_shrink_grant > ktime_get_seconds())
                                next_shrink = cli->cl_next_shrink_grant;
                } else {
                        init_next_shrink = false;
                        next_shrink = cli->cl_next_shrink_grant;
                }
        }
        mutex_unlock(&client_gtd.gtd_mutex);

        if (client_gtd.gtd_stopped == 1)
                return;

        if (next_shrink > ktime_get_seconds()) {
                time64_t delay = next_shrink - ktime_get_seconds();

                schedule_delayed_work(&work, cfs_time_seconds(delay));
        } else {
                schedule_work(&work.work);
        }
}

void osc_schedule_grant_work(void)
{
        cancel_delayed_work_sync(&work);
        schedule_work(&work.work);
}

/**
 * Start grant work for returning grants to the server for idle clients.
 */
static int osc_start_grant_work(void)
{
        client_gtd.gtd_stopped = 0;
        mutex_init(&client_gtd.gtd_mutex);
        INIT_LIST_HEAD(&client_gtd.gtd_clients);

        INIT_DELAYED_WORK(&work, osc_grant_work_handler);
        schedule_work(&work.work);

        return 0;
}

static void osc_stop_grant_work(void)
{
        client_gtd.gtd_stopped = 1;
        cancel_delayed_work_sync(&work);
}

static void osc_add_grant_list(struct client_obd *client)
{
        mutex_lock(&client_gtd.gtd_mutex);
        list_add(&client->cl_grant_chain, &client_gtd.gtd_clients);
        mutex_unlock(&client_gtd.gtd_mutex);
}

static void osc_del_grant_list(struct client_obd *client)
{
        if (list_empty(&client->cl_grant_chain))
                return;

        mutex_lock(&client_gtd.gtd_mutex);
        list_del_init(&client->cl_grant_chain);
        mutex_unlock(&client_gtd.gtd_mutex);
}

void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        /*
         * ocd_grant is the total grant amount we expect to hold: if we have
         * been evicted, it's the new avail_grant amount, and cl_dirty_pages
         * will drop to 0 as in-flight RPCs fail out; otherwise, it's
         * avail_grant + dirty.
         *
         * race is tolerable here: if we're evicted, but imp_state already
         * left EVICTED state, then cl_dirty_pages must be 0 already.
         */
        spin_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant = ocd->ocd_grant;
        if (cli->cl_import->imp_state != LUSTRE_IMP_EVICTED) {
                cli->cl_avail_grant -= cli->cl_reserved_grant;
                if (OCD_HAS_FLAG(ocd, GRANT_PARAM))
                        cli->cl_avail_grant -= cli->cl_dirty_grant;
                else
                        cli->cl_avail_grant -=
                                        cli->cl_dirty_pages << PAGE_SHIFT;
        }

        if (OCD_HAS_FLAG(ocd, GRANT_PARAM)) {
                u64 size;
                int chunk_mask;

                /* overhead for each extent insertion */
                cli->cl_grant_extent_tax = ocd->ocd_grant_tax_kb << 10;
                /* determine the appropriate chunk size used by osc_extent. */
                cli->cl_chunkbits = max_t(int, PAGE_SHIFT,
                                          ocd->ocd_grant_blkbits);
                /* max_pages_per_rpc must be chunk aligned */
                chunk_mask = ~((1 << (cli->cl_chunkbits - PAGE_SHIFT)) - 1);
                cli->cl_max_pages_per_rpc = (cli->cl_max_pages_per_rpc +
                                             ~chunk_mask) & chunk_mask;
                /* determine maximum extent size, in #pages */
                size = (u64)ocd->ocd_grant_max_blks << ocd->ocd_grant_blkbits;
                cli->cl_max_extent_pages = size >> PAGE_SHIFT;
                if (cli->cl_max_extent_pages == 0)
                        cli->cl_max_extent_pages = 1;
        } else {
                cli->cl_grant_extent_tax = 0;
                cli->cl_chunkbits = PAGE_SHIFT;
                cli->cl_max_extent_pages = DT_MAX_BRW_PAGES;
        }
        spin_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE,
               "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld. chunk bits: %d cl_max_extent_pages: %d\n",
               cli_name(cli),
               cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits,
               cli->cl_max_extent_pages);

        if (OCD_HAS_FLAG(ocd, GRANT_SHRINK) && list_empty(&cli->cl_grant_chain))
                osc_add_grant_list(cli);
}
EXPORT_SYMBOL(osc_init_grant);

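/*
 * Worked example (illustrative, not part of the original file): if the
 * server reports ocd_grant_blkbits = 16 on a 4 KiB-page client
 * (PAGE_SHIFT = 12), then cl_chunkbits = 16, a chunk spans
 * 1 << (16 - 12) = 16 pages, and chunk_mask = ~15.  The rounding
 *
 *	(cl_max_pages_per_rpc + ~chunk_mask) & chunk_mask
 *
 * is then (n + 15) & ~15, i.e. cl_max_pages_per_rpc is rounded up to the
 * next multiple of 16 pages so every RPC stays chunk aligned.
 */
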
/* We assume that the reason this OSC got a short read is that it read
 * beyond the end of a stripe file; i.e. Lustre is reading a sparse file
 * via the LOV, and it _knows_ it's reading inside the file, it's just that
 * this stripe never got written at or beyond this stripe offset yet. */
static void handle_short_read(int nob_read, size_t page_count,
                              struct brw_page **pga)
{
        char *ptr;
        int i = 0;

        /* skip bytes read OK */
        while (nob_read > 0) {
                LASSERT(page_count > 0);

                if (pga[i]->count > nob_read) {
                        /* EOF inside this page */
                        ptr = kmap(pga[i]->pg) +
                                (pga[i]->off & ~PAGE_MASK);
                        memset(ptr + nob_read, 0, pga[i]->count - nob_read);
                        kunmap(pga[i]->pg);
                        page_count--;
                        i++;
                        break;
                }

                nob_read -= pga[i]->count;
                page_count--;
                i++;
        }

        /* zero remaining pages */
        while (page_count-- > 0) {
                ptr = kmap(pga[i]->pg) + (pga[i]->off & ~PAGE_MASK);
                memset(ptr, 0, pga[i]->count);
                kunmap(pga[i]->pg);
                i++;
        }
}

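/*
 * Worked example (illustrative, not part of the original file): for a
 * three-page read of 4096 bytes per page where the OST returned
 * nob_read = 6000, the first loop consumes page 0 in full (nob_read becomes
 * 1904), detects EOF inside page 1 and zeroes its last 4096 - 1904 = 2192
 * bytes, and the second loop zeroes page 2 entirely.
 */
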
static int check_write_rcs(struct ptlrpc_request *req,
                           int requested_nob, int niocount,
                           size_t page_count, struct brw_page **pga)
{
        int     i;
        __u32   *remote_rcs;

        remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
                                                  sizeof(*remote_rcs) *
                                                  niocount);
        if (remote_rcs == NULL) {
                CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
                return -EPROTO;
        }

        /* return error if any niobuf was in error */
        for (i = 0; i < niocount; i++) {
                if ((int)remote_rcs[i] < 0) {
                        CDEBUG(D_INFO, "rc[%d]: %d req %p\n",
                               i, remote_rcs[i], req);
                        return remote_rcs[i];
                }

                if (remote_rcs[i] != 0) {
                        CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
                                i, remote_rcs[i], req);
                        return -EPROTO;
                }
        }
        if (req->rq_bulk != NULL &&
            req->rq_bulk->bd_nob_transferred != requested_nob) {
                CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
                       req->rq_bulk->bd_nob_transferred, requested_nob);
                return -EPROTO;
        }

        return 0;
}

static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
{
        if (p1->flag != p2->flag) {
                unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
                                  OBD_BRW_SYNC       | OBD_BRW_ASYNC   |
                                  OBD_BRW_NOQUOTA    | OBD_BRW_SOFT_SYNC);

                /* warn if we try to combine flags that we don't know to be
                 * safe to combine */
                if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
                        CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
                              "report this at https://jira.whamcloud.com/\n",
                              p1->flag, p2->flag);
                }
                return 0;
        }

        return (p1->off + p1->count == p2->off);
}

#if IS_ENABLED(CONFIG_CRC_T10DIF)
static int osc_checksum_bulk_t10pi(const char *obd_name, int nob,
                                   size_t pg_count, struct brw_page **pga,
                                   int opc, obd_dif_csum_fn *fn,
                                   int sector_size,
                                   u32 *check_sum)
{
        struct ahash_request *req;
        /* Use Adler as the default checksum type on top of DIF tags */
        unsigned char cfs_alg = cksum_obd2cfs(OBD_CKSUM_T10_TOP);
        struct page *__page;
        unsigned char *buffer;
        __u16 *guard_start;
        unsigned int bufsize;
        int guard_number;
        int used_number = 0;
        int used;
        u32 cksum;
        int rc = 0;
        int i = 0;

        LASSERT(pg_count > 0);

        __page = alloc_page(GFP_KERNEL);
        if (__page == NULL)
                return -ENOMEM;

        req = cfs_crypto_hash_init(cfs_alg, NULL, 0);
        if (IS_ERR(req)) {
                rc = PTR_ERR(req);
                CERROR("%s: unable to initialize checksum hash %s: rc = %d\n",
                       obd_name, cfs_crypto_hash_name(cfs_alg), rc);
                GOTO(out, rc);
        }

        buffer = kmap(__page);
        guard_start = (__u16 *)buffer;
        guard_number = PAGE_SIZE / sizeof(*guard_start);
        while (nob > 0 && pg_count > 0) {
                unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (unlikely(i == 0 && opc == OST_READ &&
                             OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))) {
                        unsigned char *ptr = kmap(pga[i]->pg);
                        int off = pga[i]->off & ~PAGE_MASK;

                        memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
                        kunmap(pga[i]->pg);
                }

                /*
                 * The number of guard slots remaining in the buffer must be
                 * able to hold the checksums of a whole page
                 */
                rc = obd_page_dif_generate_buffer(obd_name, pga[i]->pg,
                                                  pga[i]->off & ~PAGE_MASK,
                                                  count,
                                                  guard_start + used_number,
                                                  guard_number - used_number,
                                                  &used, sector_size,
                                                  fn);
                if (rc)
                        break;

                used_number += used;
                if (used_number == guard_number) {
                        cfs_crypto_hash_update_page(req, __page, 0,
                                used_number * sizeof(*guard_start));
                        used_number = 0;
                }

                nob -= pga[i]->count;
                pg_count--;
                i++;
        }
        kunmap(__page);
        if (rc)
                GOTO(out, rc);

        if (used_number != 0)
                cfs_crypto_hash_update_page(req, __page, 0,
                        used_number * sizeof(*guard_start));

        bufsize = sizeof(cksum);
        cfs_crypto_hash_final(req, (unsigned char *)&cksum, &bufsize);

        /* For sending we only compute the wrong checksum instead
         * of corrupting the data so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                cksum++;

        *check_sum = cksum;
out:
        __free_page(__page);
        return rc;
}
#else /* !CONFIG_CRC_T10DIF */
#define obd_dif_ip_fn NULL
#define obd_dif_crc_fn NULL
#define osc_checksum_bulk_t10pi(name, nob, pgc, pga, opc, fn, ssize, csum)  \
        -EOPNOTSUPP
#endif /* CONFIG_CRC_T10DIF */

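/*
 * Worked example (illustrative, not part of the original file): with 4 KiB
 * pages and sector_size = 512, each full page yields 4096 / 512 = 8 16-bit
 * guard tags.  The bounce page holds guard_number = 4096 / 2 = 2048 tags,
 * so the loop above hashes and resets the buffer once every
 * 2048 / 8 = 256 pages; any final partial buffer is hashed after the loop.
 */
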
static int osc_checksum_bulk(int nob, size_t pg_count,
                             struct brw_page **pga, int opc,
                             enum cksum_types cksum_type,
                             u32 *cksum)
{
        int                             i = 0;
        struct ahash_request           *req;
        unsigned int                    bufsize;
        unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);

        LASSERT(pg_count > 0);

        req = cfs_crypto_hash_init(cfs_alg, NULL, 0);
        if (IS_ERR(req)) {
                CERROR("Unable to initialize checksum hash %s\n",
                       cfs_crypto_hash_name(cfs_alg));
                return PTR_ERR(req);
        }

        while (nob > 0 && pg_count > 0) {
                unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 && opc == OST_READ &&
                    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
                        unsigned char *ptr = kmap(pga[i]->pg);
                        int off = pga[i]->off & ~PAGE_MASK;

                        memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
                        kunmap(pga[i]->pg);
                }
                cfs_crypto_hash_update_page(req, pga[i]->pg,
                                            pga[i]->off & ~PAGE_MASK,
                                            count);
                LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
                               (int)(pga[i]->off & ~PAGE_MASK));

                nob -= pga[i]->count;
                pg_count--;
                i++;
        }

        bufsize = sizeof(*cksum);
        cfs_crypto_hash_final(req, (unsigned char *)cksum, &bufsize);

        /* For sending we only compute the wrong checksum instead
         * of corrupting the data so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                (*cksum)++;

        return 0;
}

static int osc_checksum_bulk_rw(const char *obd_name,
                                enum cksum_types cksum_type,
                                int nob, size_t pg_count,
                                struct brw_page **pga, int opc,
                                u32 *check_sum)
{
        obd_dif_csum_fn *fn = NULL;
        int sector_size = 0;
        int rc;

        ENTRY;
        obd_t10_cksum2dif(cksum_type, &fn, &sector_size);

        if (fn)
                rc = osc_checksum_bulk_t10pi(obd_name, nob, pg_count, pga,
                                             opc, fn, sector_size, check_sum);
        else
                rc = osc_checksum_bulk(nob, pg_count, pga, opc, cksum_type,
                                       check_sum);

        RETURN(rc);
}

1350 static int
1351 osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
1352                      u32 page_count, struct brw_page **pga,
1353                      struct ptlrpc_request **reqp, int resend)
1354 {
1355         struct ptlrpc_request   *req;
1356         struct ptlrpc_bulk_desc *desc;
1357         struct ost_body         *body;
1358         struct obd_ioobj        *ioobj;
1359         struct niobuf_remote    *niobuf;
1360         int niocount, i, requested_nob, opc, rc, short_io_size = 0;
1361         struct osc_brw_async_args *aa;
1362         struct req_capsule      *pill;
1363         struct brw_page *pg_prev;
1364         void *short_io_buf;
1365         const char *obd_name = cli->cl_import->imp_obd->obd_name;
1366
1367         ENTRY;
1368         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1369                 RETURN(-ENOMEM); /* Recoverable */
1370         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1371                 RETURN(-EINVAL); /* Fatal */
1372
1373         if ((cmd & OBD_BRW_WRITE) != 0) {
1374                 opc = OST_WRITE;
1375                 req = ptlrpc_request_alloc_pool(cli->cl_import,
1376                                                 osc_rq_pool,
1377                                                 &RQF_OST_BRW_WRITE);
1378         } else {
1379                 opc = OST_READ;
1380                 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
1381         }
1382         if (req == NULL)
1383                 RETURN(-ENOMEM);
1384
1385         for (niocount = i = 1; i < page_count; i++) {
1386                 if (!can_merge_pages(pga[i - 1], pga[i]))
1387                         niocount++;
1388         }
1389
1390         pill = &req->rq_pill;
1391         req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1392                              sizeof(*ioobj));
1393         req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1394                              niocount * sizeof(*niobuf));
1395
1396         for (i = 0; i < page_count; i++)
1397                 short_io_size += pga[i]->count;
1398
1399         /* Check if read/write is small enough to be a short io. */
1400         if (short_io_size > cli->cl_max_short_io_bytes || niocount > 1 ||
1401             !imp_connect_shortio(cli->cl_import))
1402                 short_io_size = 0;
1403
1404         req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_CLIENT,
1405                              opc == OST_READ ? 0 : short_io_size);
1406         if (opc == OST_READ)
1407                 req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_SERVER,
1408                                      short_io_size);
1409
1410         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1411         if (rc) {
1412                 ptlrpc_request_free(req);
1413                 RETURN(rc);
1414         }
1415         osc_set_io_portal(req);
1416
1417         ptlrpc_at_set_req_timeout(req);
1418         /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
1419          * retry logic */
1420         req->rq_no_retry_einprogress = 1;
1421
1422         if (short_io_size != 0) {
1423                 desc = NULL;
1424                 short_io_buf = NULL;
1425                 goto no_bulk;
1426         }
1427
1428         desc = ptlrpc_prep_bulk_imp(req, page_count,
1429                 cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
1430                 (opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE :
1431                         PTLRPC_BULK_PUT_SINK),
1432                 OST_BULK_PORTAL,
1433                 &ptlrpc_bulk_kiov_pin_ops);
1434
1435         if (desc == NULL)
1436                 GOTO(out, rc = -ENOMEM);
1437         /* NB request now owns desc and will free it when it gets freed */
1438 no_bulk:
1439         body = req_capsule_client_get(pill, &RMF_OST_BODY);
1440         ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1441         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1442         LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1443
1444         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
1445
1446         /* For READ and WRITE, we can't fill o_uid and o_gid using from_kuid()
1447          * and from_kgid(), because they are asynchronous. Fortunately, variable
1448          * oa contains valid o_uid and o_gid in these two operations.
1449          * Besides, filling o_uid and o_gid is enough for nrs-tbf, see LU-9658.
1450          * OBD_MD_FLUID and OBD_MD_FLUID is not set in order to avoid breaking
1451          * other process logic */
1452         body->oa.o_uid = oa->o_uid;
1453         body->oa.o_gid = oa->o_gid;
1454
1455         obdo_to_ioobj(oa, ioobj);
1456         ioobj->ioo_bufcnt = niocount;
1457         /* The high bits of ioo_max_brw tells server _maximum_ number of bulks
1458          * that might be send for this request.  The actual number is decided
1459          * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
1460          * "max - 1" for old client compatibility sending "0", and also so the
1461          * the actual maximum is a power-of-two number, not one less. LU-1431 */
1462         if (desc != NULL)
1463                 ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
1464         else /* short io */
1465                 ioobj_max_brw_set(ioobj, 0);
1466
1467         if (short_io_size != 0) {
1468                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1469                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1470                         body->oa.o_flags = 0;
1471                 }
1472                 body->oa.o_flags |= OBD_FL_SHORT_IO;
1473                 CDEBUG(D_CACHE, "Using short io for data transfer, size = %d\n",
1474                        short_io_size);
1475                 if (opc == OST_WRITE) {
1476                         short_io_buf = req_capsule_client_get(pill,
1477                                                               &RMF_SHORT_IO);
1478                         LASSERT(short_io_buf != NULL);
1479                 }
1480         }
1481
1482         LASSERT(page_count > 0);
1483         pg_prev = pga[0];
1484         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1485                 struct brw_page *pg = pga[i];
1486                 int poff = pg->off & ~PAGE_MASK;
1487
1488                 LASSERT(pg->count > 0);
1489                 /* make sure there is no gap in the middle of the page array */
1490                 LASSERTF(page_count == 1 ||
1491                          (ergo(i == 0, poff + pg->count == PAGE_SIZE) &&
1492                           ergo(i > 0 && i < page_count - 1,
1493                                poff == 0 && pg->count == PAGE_SIZE)   &&
1494                           ergo(i == page_count - 1, poff == 0)),
1495                          "i: %d/%d pg: %p off: %llu, count: %u\n",
1496                          i, page_count, pg, pg->off, pg->count);
1497                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1498                          "i %d p_c %u pg %p [pri %lu ind %lu] off %llu"
1499                          " prev_pg %p [pri %lu ind %lu] off %llu\n",
1500                          i, page_count,
1501                          pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1502                          pg_prev->pg, page_private(pg_prev->pg),
1503                          pg_prev->pg->index, pg_prev->off);
1504                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1505                         (pg->flag & OBD_BRW_SRVLOCK));
1506                 if (short_io_size != 0 && opc == OST_WRITE) {
1507                         unsigned char *ptr = kmap_atomic(pg->pg);
1508
1509                         LASSERT(short_io_size >= requested_nob + pg->count);
1510                         memcpy(short_io_buf + requested_nob,
1511                                ptr + poff,
1512                                pg->count);
1513                         kunmap_atomic(ptr);
1514                 } else if (short_io_size == 0) {
1515                         desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff,
1516                                                          pg->count);
1517                 }
1518                 requested_nob += pg->count;
1519
1520                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1521                         niobuf--;
1522                         niobuf->rnb_len += pg->count;
1523                 } else {
1524                         niobuf->rnb_offset = pg->off;
1525                         niobuf->rnb_len    = pg->count;
1526                         niobuf->rnb_flags  = pg->flag;
1527                 }
1528                 pg_prev = pg;
1529         }
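        /*
         * Illustrative example of the merge above: assuming can_merge_pages()
         * accepts pages that are contiguous on disk with compatible brw flags,
         * three 4096-byte pages at offsets 0, 4096 and 8192 collapse into a
         * single remote niobuf {rnb_offset = 0, rnb_len = 12288}, which is why
         * the final niobuf count can be smaller than page_count.
         */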
1530
1531         LASSERTF((void *)(niobuf - niocount) ==
1532                 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1533                 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1534                 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1535
1536         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1537         if (resend) {
1538                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1539                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1540                         body->oa.o_flags = 0;
1541                 }
1542                 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1543         }
1544
1545         if (osc_should_shrink_grant(cli))
1546                 osc_shrink_grant_local(cli, &body->oa);
1547
1548         /* size[REQ_REC_OFF] is still sizeof(*body) */
1549         if (opc == OST_WRITE) {
1550                 if (cli->cl_checksum &&
1551                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1552                         /* store cl_cksum_type in a local variable since
1553                          * it can be changed via lprocfs */
1554                         enum cksum_types cksum_type = cli->cl_cksum_type;
1555
1556                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1557                                 body->oa.o_flags = 0;
1558
1559                         body->oa.o_flags |= obd_cksum_type_pack(obd_name,
1560                                                                 cksum_type);
1561                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1562
1563                         rc = osc_checksum_bulk_rw(obd_name, cksum_type,
1564                                                   requested_nob, page_count,
1565                                                   pga, OST_WRITE,
1566                                                   &body->oa.o_cksum);
1567                         if (rc < 0) {
1568                                 CDEBUG(D_PAGE, "failed to checksum, rc = %d\n",
1569                                        rc);
1570                                 GOTO(out, rc);
1571                         }
1572                         CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1573                                body->oa.o_cksum);
1574
1575                         /* save this in 'oa', too, for later checking */
1576                         oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1577                         oa->o_flags |= obd_cksum_type_pack(obd_name,
1578                                                            cksum_type);
1579                 } else {
1580                         /* clear out the checksum flag, in case this is a
1581                          * resend but cl_checksum is no longer set. b=11238 */
1582                         oa->o_valid &= ~OBD_MD_FLCKSUM;
1583                 }
1584                 oa->o_cksum = body->oa.o_cksum;
1585                 /* 1 RC per niobuf */
1586                 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1587                                      sizeof(__u32) * niocount);
1588         } else {
1589                 if (cli->cl_checksum &&
1590                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1591                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1592                                 body->oa.o_flags = 0;
1593                         body->oa.o_flags |= obd_cksum_type_pack(obd_name,
1594                                 cli->cl_cksum_type);
1595                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1596                 }
1597
1598                 /* The client cksum has already been copied to the wire obdo
1599                  * in the previous lustre_set_wire_obdo(), and in case a
1600                  * bulk-read is being resent due to a cksum error, this will
1601                  * allow the server to check+dump pages on its side */
1602         }
1603         ptlrpc_request_set_replen(req);
1604
1605         aa = ptlrpc_req_async_args(aa, req);
1606         aa->aa_oa = oa;
1607         aa->aa_requested_nob = requested_nob;
1608         aa->aa_nio_count = niocount;
1609         aa->aa_page_count = page_count;
1610         aa->aa_resends = 0;
1611         aa->aa_ppga = pga;
1612         aa->aa_cli = cli;
1613         INIT_LIST_HEAD(&aa->aa_oaps);
1614
1615         *reqp = req;
1616         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1617         CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
1618                 req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
1619                 niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
1620         RETURN(0);
1621
1622  out:
1623         ptlrpc_req_finished(req);
1624         RETURN(rc);
1625 }
1626
1627 char dbgcksum_file_name[PATH_MAX];
1628
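/* Dump the bulk pages of a checksum-mismatched BRW to a file for analysis.
 * Illustrative example (assuming the default debug file path): a mismatch on
 * extent [0-1048575] with client csum 0xa1b2c3d4 and server csum 0xe5f6a7b8
 * produces a dump file named roughly
 * /tmp/lustre-log-checksum_dump-osc-[0x200000401:0x1:0x0]:[0-1048575]-a1b2c3d4-e5f6a7b8 */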
1629 static void dump_all_bulk_pages(struct obdo *oa, __u32 page_count,
1630                                 struct brw_page **pga, __u32 server_cksum,
1631                                 __u32 client_cksum)
1632 {
1633         struct file *filp;
1634         int rc, i;
1635         unsigned int len;
1636         char *buf;
1637
1638         /* only keep a dump of pages on the first error for the same range
1639          * in the file/fid, not during the resends/retries. */
1640         snprintf(dbgcksum_file_name, sizeof(dbgcksum_file_name),
1641                  "%s-checksum_dump-osc-"DFID":[%llu-%llu]-%x-%x",
1642                  (strncmp(libcfs_debug_file_path_arr, "NONE", 4) != 0 ?
1643                   libcfs_debug_file_path_arr :
1644                   LIBCFS_DEBUG_FILE_PATH_DEFAULT),
1645                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : 0ULL,
1646                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1647                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1648                  pga[0]->off,
1649                  pga[page_count-1]->off + pga[page_count-1]->count - 1,
1650                  client_cksum, server_cksum);
1651         filp = filp_open(dbgcksum_file_name,
1652                          O_CREAT | O_EXCL | O_WRONLY | O_LARGEFILE, 0600);
1653         if (IS_ERR(filp)) {
1654                 rc = PTR_ERR(filp);
1655                 if (rc == -EEXIST)
1656                         CDEBUG(D_INFO, "%s: can't open to dump pages with "
1657                                "checksum error: rc = %d\n", dbgcksum_file_name,
1658                                rc);
1659                 else
1660                         CERROR("%s: can't open to dump pages with checksum "
1661                                "error: rc = %d\n", dbgcksum_file_name, rc);
1662                 return;
1663         }
1664
1665         for (i = 0; i < page_count; i++) {
1666                 len = pga[i]->count;
1667                 buf = kmap(pga[i]->pg);
1668                 while (len != 0) {
1669                         rc = cfs_kernel_write(filp, buf, len, &filp->f_pos);
1670                         if (rc < 0) {
1671                                 CERROR("%s: wanted to write %u but got %d "
1672                                        "error\n", dbgcksum_file_name, len, rc);
1673                                 break;
1674                         }
1675                         len -= rc;
1676                         buf += rc;
1677                         CDEBUG(D_INFO, "%s: wrote %d bytes\n",
1678                                dbgcksum_file_name, rc);
1679                 }
1680                 kunmap(pga[i]->pg);
1681         }
1682
1683         rc = vfs_fsync_range(filp, 0, LLONG_MAX, 1);
1684         if (rc)
1685                 CERROR("%s: sync returns %d\n", dbgcksum_file_name, rc);
1686         filp_close(filp, NULL);
1687 }
1688
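/* Diagnose a write checksum mismatch by recomputing the client-side checksum.
 * Summary of the cases below, where "new" is the recomputed client csum:
 *   recompute failed          -> client could not calculate the checksum
 *   server used another type  -> likely a protocol problem
 *   new == server_cksum       -> pages changed on the client after the
 *                                original checksum (likely mmap IO)
 *   new == client_cksum       -> data changed in transit to the OST
 *   otherwise                 -> changed both in memory and in transit
 * Returns 0 if the checksums agree, 1 if the mismatch stands. */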
1689 static int
1690 check_write_checksum(struct obdo *oa, const struct lnet_process_id *peer,
1691                      __u32 client_cksum, __u32 server_cksum,
1692                      struct osc_brw_async_args *aa)
1693 {
1694         const char *obd_name = aa->aa_cli->cl_import->imp_obd->obd_name;
1695         enum cksum_types cksum_type;
1696         obd_dif_csum_fn *fn = NULL;
1697         int sector_size = 0;
1698         __u32 new_cksum;
1699         char *msg;
1700         int rc;
1701
1702         if (server_cksum == client_cksum) {
1703                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1704                 return 0;
1705         }
1706
1707         if (aa->aa_cli->cl_checksum_dump)
1708                 dump_all_bulk_pages(oa, aa->aa_page_count, aa->aa_ppga,
1709                                     server_cksum, client_cksum);
1710
1711         cksum_type = obd_cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1712                                            oa->o_flags : 0);
1713
1714         switch (cksum_type) {
1715         case OBD_CKSUM_T10IP512:
1716                 fn = obd_dif_ip_fn;
1717                 sector_size = 512;
1718                 break;
1719         case OBD_CKSUM_T10IP4K:
1720                 fn = obd_dif_ip_fn;
1721                 sector_size = 4096;
1722                 break;
1723         case OBD_CKSUM_T10CRC512:
1724                 fn = obd_dif_crc_fn;
1725                 sector_size = 512;
1726                 break;
1727         case OBD_CKSUM_T10CRC4K:
1728                 fn = obd_dif_crc_fn;
1729                 sector_size = 4096;
1730                 break;
1731         default:
1732                 break;
1733         }
1734
1735         if (fn)
1736                 rc = osc_checksum_bulk_t10pi(obd_name, aa->aa_requested_nob,
1737                                              aa->aa_page_count, aa->aa_ppga,
1738                                              OST_WRITE, fn, sector_size,
1739                                              &new_cksum);
1740         else
1741                 rc = osc_checksum_bulk(aa->aa_requested_nob, aa->aa_page_count,
1742                                        aa->aa_ppga, OST_WRITE, cksum_type,
1743                                        &new_cksum);
1744
1745         if (rc < 0)
1746                 msg = "failed to calculate the client write checksum";
1747         else if (cksum_type != obd_cksum_type_unpack(aa->aa_oa->o_flags))
1748                 msg = "the server did not use the checksum type specified in "
1749                       "the original request - likely a protocol problem";
1750         else if (new_cksum == server_cksum)
1751                 msg = "changed on the client after we checksummed it - "
1752                       "likely false positive due to mmap IO (bug 11742)";
1753         else if (new_cksum == client_cksum)
1754                 msg = "changed in transit before arrival at OST";
1755         else
1756                 msg = "changed in transit AND doesn't match the original - "
1757                       "likely false positive due to mmap IO (bug 11742)";
1758
1759         LCONSOLE_ERROR_MSG(0x132, "%s: BAD WRITE CHECKSUM: %s: from %s inode "
1760                            DFID " object "DOSTID" extent [%llu-%llu], original "
1761                            "client csum %x (type %x), server csum %x (type %x),"
1762                            " client csum now %x\n",
1763                            obd_name, msg, libcfs_nid2str(peer->nid),
1764                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1765                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1766                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1767                            POSTID(&oa->o_oi), aa->aa_ppga[0]->off,
1768                            aa->aa_ppga[aa->aa_page_count - 1]->off +
1769                                 aa->aa_ppga[aa->aa_page_count-1]->count - 1,
1770                            client_cksum,
1771                            obd_cksum_type_unpack(aa->aa_oa->o_flags),
1772                            server_cksum, cksum_type, new_cksum);
1773         return 1;
1774 }
1775
1776 /* Note rc enters this function as the number of bytes transferred */
1777 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1778 {
1779         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1780         struct client_obd *cli = aa->aa_cli;
1781         const char *obd_name = cli->cl_import->imp_obd->obd_name;
1782         const struct lnet_process_id *peer =
1783                 &req->rq_import->imp_connection->c_peer;
1784         struct ost_body *body;
1785         u32 client_cksum = 0;
1786
1787         ENTRY;
1788
1789         if (rc < 0 && rc != -EDQUOT) {
1790                 DEBUG_REQ(D_INFO, req, "Failed request: rc = %d", rc);
1791                 RETURN(rc);
1792         }
1793
1794         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1795         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1796         if (body == NULL) {
1797                 DEBUG_REQ(D_INFO, req, "cannot unpack body");
1798                 RETURN(-EPROTO);
1799         }
1800
1801         /* set/clear over quota flag for a uid/gid/projid */
1802         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1803             body->oa.o_valid & (OBD_MD_FLALLQUOTA)) {
1804                 unsigned qid[LL_MAXQUOTAS] = {
1805                                          body->oa.o_uid, body->oa.o_gid,
1806                                          body->oa.o_projid };
1807                 CDEBUG(D_QUOTA,
1808                        "setdq for [%u %u %u] with valid %#llx, flags %x\n",
1809                        body->oa.o_uid, body->oa.o_gid, body->oa.o_projid,
1810                        body->oa.o_valid, body->oa.o_flags);
1811                 osc_quota_setdq(cli, req->rq_xid, qid, body->oa.o_valid,
1812                                 body->oa.o_flags);
1813         }
1814
1815         osc_update_grant(cli, body);
1816
1817         if (rc < 0)
1818                 RETURN(rc);
1819
1820         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1821                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1822
1823         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1824                 if (rc > 0) {
1825                         CERROR("%s: unexpected positive size %d\n",
1826                                obd_name, rc);
1827                         RETURN(-EPROTO);
1828                 }
1829
1830                 if (req->rq_bulk != NULL &&
1831                     sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1832                         RETURN(-EAGAIN);
1833
1834                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1835                     check_write_checksum(&body->oa, peer, client_cksum,
1836                                          body->oa.o_cksum, aa))
1837                         RETURN(-EAGAIN);
1838
1839                 rc = check_write_rcs(req, aa->aa_requested_nob,
1840                                      aa->aa_nio_count, aa->aa_page_count,
1841                                      aa->aa_ppga);
1842                 GOTO(out, rc);
1843         }
1844
1845         /* The rest of this function executes only for OST_READs */
1846
1847         if (req->rq_bulk == NULL) {
1848                 rc = req_capsule_get_size(&req->rq_pill, &RMF_SHORT_IO,
1849                                           RCL_SERVER);
1850                 LASSERT(rc == req->rq_status);
1851         } else {
1852                 /* if unwrap_bulk failed, return -EAGAIN to retry */
1853                 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1854         }
1855         if (rc < 0)
1856                 GOTO(out, rc = -EAGAIN);
1857
1858         if (rc > aa->aa_requested_nob) {
1859                 CERROR("%s: unexpected size %d, requested %d\n", obd_name,
1860                        rc, aa->aa_requested_nob);
1861                 RETURN(-EPROTO);
1862         }
1863
1864         if (req->rq_bulk != NULL && rc != req->rq_bulk->bd_nob_transferred) {
1865                 CERROR("%s: unexpected size %d, transferred %d\n", obd_name,
1866                        rc, req->rq_bulk->bd_nob_transferred);
1867                 RETURN(-EPROTO);
1868         }
1869
1870         if (req->rq_bulk == NULL) {
1871                 /* short io */
1872                 int nob, pg_count, i = 0;
1873                 unsigned char *buf;
1874
1875                 CDEBUG(D_CACHE, "Using short io read, size %d\n", rc);
1876                 pg_count = aa->aa_page_count;
1877                 buf = req_capsule_server_sized_get(&req->rq_pill, &RMF_SHORT_IO,
1878                                                    rc);
1879                 nob = rc;
1880                 while (nob > 0 && pg_count > 0) {
1881                         unsigned char *ptr;
1882                         int count = aa->aa_ppga[i]->count > nob ?
1883                                     nob : aa->aa_ppga[i]->count;
1884
1885                         CDEBUG(D_CACHE, "page %p count %d\n",
1886                                aa->aa_ppga[i]->pg, count);
1887                         ptr = kmap_atomic(aa->aa_ppga[i]->pg);
1888                         memcpy(ptr + (aa->aa_ppga[i]->off & ~PAGE_MASK), buf,
1889                                count);
1890                         kunmap_atomic((void *) ptr);
1891
1892                         buf += count;
1893                         nob -= count;
1894                         i++;
1895                         pg_count--;
1896                 }
1897         }
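        /* e.g. (illustrative): a 6000-byte short read spanning two 4096-byte
         * pages copies 4096 bytes into the first page and 1904 into the
         * second; any unread tail below aa_requested_nob is dealt with by
         * handle_short_read() below. */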
1898
1899         if (rc < aa->aa_requested_nob)
1900                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1901
1902         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1903                 static int cksum_counter;
1904                 u32        server_cksum = body->oa.o_cksum;
1905                 char      *via = "";
1906                 char      *router = "";
1907                 enum cksum_types cksum_type;
1908                 u32 o_flags = body->oa.o_valid & OBD_MD_FLFLAGS ?
1909                         body->oa.o_flags : 0;
1910
1911                 cksum_type = obd_cksum_type_unpack(o_flags);
1912                 rc = osc_checksum_bulk_rw(obd_name, cksum_type, rc,
1913                                           aa->aa_page_count, aa->aa_ppga,
1914                                           OST_READ, &client_cksum);
1915                 if (rc < 0)
1916                         GOTO(out, rc);
1917
1918                 if (req->rq_bulk != NULL &&
1919                     peer->nid != req->rq_bulk->bd_sender) {
1920                         via = " via ";
1921                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1922                 }
1923
1924                 if (server_cksum != client_cksum) {
1925                         struct ost_body *clbody;
1926                         u32 page_count = aa->aa_page_count;
1927
1928                         clbody = req_capsule_client_get(&req->rq_pill,
1929                                                         &RMF_OST_BODY);
1930                         if (cli->cl_checksum_dump)
1931                                 dump_all_bulk_pages(&clbody->oa, page_count,
1932                                                     aa->aa_ppga, server_cksum,
1933                                                     client_cksum);
1934
1935                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1936                                            "%s%s%s inode "DFID" object "DOSTID
1937                                            " extent [%llu-%llu], client %x, "
1938                                            "server %x, cksum_type %x\n",
1939                                            obd_name,
1940                                            libcfs_nid2str(peer->nid),
1941                                            via, router,
1942                                            clbody->oa.o_valid & OBD_MD_FLFID ?
1943                                                 clbody->oa.o_parent_seq : 0ULL,
1944                                            clbody->oa.o_valid & OBD_MD_FLFID ?
1945                                                 clbody->oa.o_parent_oid : 0,
1946                                            clbody->oa.o_valid & OBD_MD_FLFID ?
1947                                                 clbody->oa.o_parent_ver : 0,
1948                                            POSTID(&body->oa.o_oi),
1949                                            aa->aa_ppga[0]->off,
1950                                            aa->aa_ppga[page_count-1]->off +
1951                                            aa->aa_ppga[page_count-1]->count - 1,
1952                                            client_cksum, server_cksum,
1953                                            cksum_type);
1954                         cksum_counter = 0;
1955                         aa->aa_oa->o_cksum = client_cksum;
1956                         rc = -EAGAIN;
1957                 } else {
1958                         cksum_counter++;
1959                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1960                         rc = 0;
1961                 }
1962         } else if (unlikely(client_cksum)) {
1963                 static int cksum_missed;
1964
1965                 cksum_missed++;
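                /* log only when cksum_missed is a power of two (x & -x == x),
                 * i.e. on the 1st, 2nd, 4th, 8th, ... miss, so the console
                 * error is rate-limited */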
1966                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1967                         CERROR("%s: checksum %u requested from %s but not sent\n",
1968                                obd_name, cksum_missed,
1969                                libcfs_nid2str(peer->nid));
1970         } else {
1971                 rc = 0;
1972         }
1973 out:
1974         if (rc >= 0)
1975                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1976                                      aa->aa_oa, &body->oa);
1977
1978         RETURN(rc);
1979 }
1980
1981 static int osc_brw_redo_request(struct ptlrpc_request *request,
1982                                 struct osc_brw_async_args *aa, int rc)
1983 {
1984         struct ptlrpc_request *new_req;
1985         struct osc_brw_async_args *new_aa;
1986         struct osc_async_page *oap;
1987         ENTRY;
1988
1989         /* The message below is checked in replay-ost-single.sh test_8ae */
1990         DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1991                   "redo for recoverable error %d", rc);
1992
1993         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1994                                 OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
1995                                   aa->aa_cli, aa->aa_oa, aa->aa_page_count,
1996                                   aa->aa_ppga, &new_req, 1);
1997         if (rc)
1998                 RETURN(rc);
1999
2000         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2001                 if (oap->oap_request != NULL) {
2002                         LASSERTF(request == oap->oap_request,
2003                                  "request %p != oap_request %p\n",
2004                                  request, oap->oap_request);
2005                 }
2006         }
2007         /*
2008          * New request takes over pga and oaps from old request.
2009          * Note that copying a list_head doesn't work, need to move it...
2010          */
2011         aa->aa_resends++;
2012         new_req->rq_interpret_reply = request->rq_interpret_reply;
2013         new_req->rq_async_args = request->rq_async_args;
2014         new_req->rq_commit_cb = request->rq_commit_cb;
2015         /* cap resend delay to the current request timeout, this is similar to
2016          * what ptlrpc does (see after_reply()) */
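        /* e.g. (illustrative): with rq_timeout == 30s the 1st resend is
         * delayed 1s, the 5th 5s, and from the 30th on the delay stays
         * capped at 30s */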
2017         if (aa->aa_resends > new_req->rq_timeout)
2018                 new_req->rq_sent = ktime_get_real_seconds() + new_req->rq_timeout;
2019         else
2020                 new_req->rq_sent = ktime_get_real_seconds() + aa->aa_resends;
2021         new_req->rq_generation_set = 1;
2022         new_req->rq_import_generation = request->rq_import_generation;
2023
2024         new_aa = ptlrpc_req_async_args(new_aa, new_req);
2025
2026         INIT_LIST_HEAD(&new_aa->aa_oaps);
2027         list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
2028         INIT_LIST_HEAD(&new_aa->aa_exts);
2029         list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
2030         new_aa->aa_resends = aa->aa_resends;
2031
2032         list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
2033                 if (oap->oap_request) {
2034                         ptlrpc_req_finished(oap->oap_request);
2035                         oap->oap_request = ptlrpc_request_addref(new_req);
2036                 }
2037         }
2038
2039         /* XXX: This code will run into problems if we ever want to add a
2040          * series of BRW RPCs into a self-defined ptlrpc_request_set and
2041          * wait for all of them to finish. We should inherit the request
2042          * set from the old request. */
2043         ptlrpcd_add_req(new_req);
2044
2045         DEBUG_REQ(D_INFO, new_req, "new request");
2046         RETURN(0);
2047 }
2048
2049 /*
2050  * Ugh, we want disk allocation on the target to happen in offset order.  We'll
2051  * follow Sedgewick's advice and stick to the dead-simple shellsort -- it'll do
2052  * fine for our small page arrays and doesn't require allocation.  It's an
2053  * insertion sort that swaps elements that are strides apart, shrinking the
2054  * stride down until it's '1' and the array is sorted.
2055  */
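/* Illustrative stride sequence (the 3x+1 series): for num == 100 the loop
 * below grows the stride 1, 4, 13, 40, 121, then the do-while sorts with
 * strides 40, 13, 4 and finally 1. */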
2056 static void sort_brw_pages(struct brw_page **array, int num)
2057 {
2058         int stride, i, j;
2059         struct brw_page *tmp;
2060
2061         if (num == 1)
2062                 return;
2063         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
2064                 ;
2065
2066         do {
2067                 stride /= 3;
2068                 for (i = stride ; i < num ; i++) {
2069                         tmp = array[i];
2070                         j = i;
2071                         while (j >= stride && array[j - stride]->off > tmp->off) {
2072                                 array[j] = array[j - stride];
2073                                 j -= stride;
2074                         }
2075                         array[j] = tmp;
2076                 }
2077         } while (stride > 1);
2078 }
2079
2080 static void osc_release_ppga(struct brw_page **ppga, size_t count)
2081 {
2082         LASSERT(ppga != NULL);
2083         OBD_FREE_PTR_ARRAY(ppga, count);
2084 }
2085
2086 static int brw_interpret(const struct lu_env *env,
2087                          struct ptlrpc_request *req, void *args, int rc)
2088 {
2089         struct osc_brw_async_args *aa = args;
2090         struct osc_extent *ext;
2091         struct osc_extent *tmp;
2092         struct client_obd *cli = aa->aa_cli;
2093         unsigned long transferred = 0;
2094
2095         ENTRY;
2096
2097         rc = osc_brw_fini_request(req, rc);
2098         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
2099         /*
2100          * When server returns -EINPROGRESS, client should always retry
2101          * regardless of the number of times the bulk was resent already.
2102          */
2103         if (osc_recoverable_error(rc) && !req->rq_no_delay) {
2104                 if (req->rq_import_generation !=
2105                     req->rq_import->imp_generation) {
2106                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
2107                                DOSTID", rc = %d.\n",
2108                                req->rq_import->imp_obd->obd_name,
2109                                POSTID(&aa->aa_oa->o_oi), rc);
2110                 } else if (rc == -EINPROGRESS ||
2111                     client_should_resend(aa->aa_resends, aa->aa_cli)) {
2112                         rc = osc_brw_redo_request(req, aa, rc);
2113                 } else {
2114                         CERROR("%s: too many resent retries for object: "
2115                                "%llu:%llu, rc = %d.\n",
2116                                req->rq_import->imp_obd->obd_name,
2117                                POSTID(&aa->aa_oa->o_oi), rc);
2118                 }
2119
2120                 if (rc == 0)
2121                         RETURN(0);
2122                 else if (rc == -EAGAIN || rc == -EINPROGRESS)
2123                         rc = -EIO;
2124         }
2125
2126         if (rc == 0) {
2127                 struct obdo *oa = aa->aa_oa;
2128                 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
2129                 unsigned long valid = 0;
2130                 struct cl_object *obj;
2131                 struct osc_async_page *last;
2132
2133                 last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
2134                 obj = osc2cl(last->oap_obj);
2135
2136                 cl_object_attr_lock(obj);
2137                 if (oa->o_valid & OBD_MD_FLBLOCKS) {
2138                         attr->cat_blocks = oa->o_blocks;
2139                         valid |= CAT_BLOCKS;
2140                 }
2141                 if (oa->o_valid & OBD_MD_FLMTIME) {
2142                         attr->cat_mtime = oa->o_mtime;
2143                         valid |= CAT_MTIME;
2144                 }
2145                 if (oa->o_valid & OBD_MD_FLATIME) {
2146                         attr->cat_atime = oa->o_atime;
2147                         valid |= CAT_ATIME;
2148                 }
2149                 if (oa->o_valid & OBD_MD_FLCTIME) {
2150                         attr->cat_ctime = oa->o_ctime;
2151                         valid |= CAT_CTIME;
2152                 }
2153
2154                 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
2155                         struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
2156                         loff_t last_off = last->oap_count + last->oap_obj_off +
2157                                 last->oap_page_off;
2158
2159                         /* Change the file size if this is an out-of-quota or
2160                          * direct IO write and it extends the file size */
2161                         if (loi->loi_lvb.lvb_size < last_off) {
2162                                 attr->cat_size = last_off;
2163                                 valid |= CAT_SIZE;
2164                         }
2165                         /* Extend KMS if it's not a lockless write */
2166                         if (loi->loi_kms < last_off &&
2167                             oap2osc_page(last)->ops_srvlock == 0) {
2168                                 attr->cat_kms = last_off;
2169                                 valid |= CAT_KMS;
2170                         }
2171                 }
2172
2173                 if (valid != 0)
2174                         cl_object_attr_update(env, obj, attr, valid);
2175                 cl_object_attr_unlock(obj);
2176         }
2177         OBD_SLAB_FREE_PTR(aa->aa_oa, osc_obdo_kmem);
2178         aa->aa_oa = NULL;
2179
2180         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
2181                 osc_inc_unstable_pages(req);
2182
2183         list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
2184                 list_del_init(&ext->oe_link);
2185                 osc_extent_finish(env, ext, 1,
2186                                   rc && req->rq_no_delay ? -EWOULDBLOCK : rc);
2187         }
2188         LASSERT(list_empty(&aa->aa_exts));
2189         LASSERT(list_empty(&aa->aa_oaps));
2190
2191         transferred = (req->rq_bulk == NULL ? /* short io */
2192                        aa->aa_requested_nob :
2193                        req->rq_bulk->bd_nob_transferred);
2194
2195         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2196         ptlrpc_lprocfs_brw(req, transferred);
2197
2198         spin_lock(&cli->cl_loi_list_lock);
2199         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2200          * is called so we know whether to go to sync BRWs or wait for more
2201          * RPCs to complete */
2202         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2203                 cli->cl_w_in_flight--;
2204         else
2205                 cli->cl_r_in_flight--;
2206         osc_wake_cache_waiters(cli);
2207         spin_unlock(&cli->cl_loi_list_lock);
2208
2209         osc_io_unplug(env, cli, NULL);
2210         RETURN(rc);
2211 }
2212
2213 static void brw_commit(struct ptlrpc_request *req)
2214 {
2215         /* If osc_inc_unstable_pages (via osc_extent_finish) races with
2216          * this function being called via the rq_commit_cb, we need to ensure
2217          * osc_dec_unstable_pages is still called. Otherwise unstable
2218          * pages may be leaked. */
2219         spin_lock(&req->rq_lock);
2220         if (likely(req->rq_unstable)) {
2221                 req->rq_unstable = 0;
2222                 spin_unlock(&req->rq_lock);
2223
2224                 osc_dec_unstable_pages(req);
2225         } else {
2226                 req->rq_committed = 1;
2227                 spin_unlock(&req->rq_lock);
2228         }
2229 }
2230
2231 /**
2232  * Build an RPC from the list of extents @ext_list. The caller must ensure
2233  * that the total number of pages in this list is NOT over the max pages
2234  * per RPC. Extents in the list must be in the OES_RPC state.
2235  */
2236 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
2237                   struct list_head *ext_list, int cmd)
2238 {
2239         struct ptlrpc_request           *req = NULL;
2240         struct osc_extent               *ext;
2241         struct brw_page                 **pga = NULL;
2242         struct osc_brw_async_args       *aa = NULL;
2243         struct obdo                     *oa = NULL;
2244         struct osc_async_page           *oap;
2245         struct osc_object               *obj = NULL;
2246         struct cl_req_attr              *crattr = NULL;
2247         loff_t                          starting_offset = OBD_OBJECT_EOF;
2248         loff_t                          ending_offset = 0;
2249         /* '1' for consistency with code that checks !mpflag to restore */
2250         int mpflag = 1;
2251         int                             mem_tight = 0;
2252         int                             page_count = 0;
2253         bool                            soft_sync = false;
2254         bool                            ndelay = false;
2255         int                             i;
2256         int                             grant = 0;
2257         int                             rc;
2258         __u32                           layout_version = 0;
2259         LIST_HEAD(rpc_list);
2260         struct ost_body                 *body;
2261         ENTRY;
2262         LASSERT(!list_empty(ext_list));
2263
2264         /* add pages into rpc_list to build BRW rpc */
2265         list_for_each_entry(ext, ext_list, oe_link) {
2266                 LASSERT(ext->oe_state == OES_RPC);
2267                 mem_tight |= ext->oe_memalloc;
2268                 grant += ext->oe_grants;
2269                 page_count += ext->oe_nr_pages;
2270                 layout_version = max(layout_version, ext->oe_layout_version);
2271                 if (obj == NULL)
2272                         obj = ext->oe_obj;
2273         }
2274
2275         soft_sync = osc_over_unstable_soft_limit(cli);
2276         if (mem_tight)
2277                 mpflag = memalloc_noreclaim_save();
2278
2279         OBD_ALLOC_PTR_ARRAY(pga, page_count);
2280         if (pga == NULL)
2281                 GOTO(out, rc = -ENOMEM);
2282
2283         OBD_SLAB_ALLOC_PTR_GFP(oa, osc_obdo_kmem, GFP_NOFS);
2284         if (oa == NULL)
2285                 GOTO(out, rc = -ENOMEM);
2286
2287         i = 0;
2288         list_for_each_entry(ext, ext_list, oe_link) {
2289                 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
2290                         if (mem_tight)
2291                                 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
2292                         if (soft_sync)
2293                                 oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
2294                         pga[i] = &oap->oap_brw_page;
2295                         pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2296                         i++;
2297
2298                         list_add_tail(&oap->oap_rpc_item, &rpc_list);
2299                         if (starting_offset == OBD_OBJECT_EOF ||
2300                             starting_offset > oap->oap_obj_off)
2301                                 starting_offset = oap->oap_obj_off;
2302                         else
2303                                 LASSERT(oap->oap_page_off == 0);
2304                         if (ending_offset < oap->oap_obj_off + oap->oap_count)
2305                                 ending_offset = oap->oap_obj_off +
2306                                                 oap->oap_count;
2307                         else
2308                                 LASSERT(oap->oap_page_off + oap->oap_count ==
2309                                         PAGE_SIZE);
2310                 }
2311                 if (ext->oe_ndelay)
2312                         ndelay = true;
2313         }
2314
2315         /* first page in the list */
2316         oap = list_entry(rpc_list.next, typeof(*oap), oap_rpc_item);
2317
2318         crattr = &osc_env_info(env)->oti_req_attr;
2319         memset(crattr, 0, sizeof(*crattr));
2320         crattr->cra_type = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2321         crattr->cra_flags = ~0ULL;
2322         crattr->cra_page = oap2cl_page(oap);
2323         crattr->cra_oa = oa;
2324         cl_req_attr_set(env, osc2cl(obj), crattr);
2325
2326         if (cmd == OBD_BRW_WRITE) {
2327                 oa->o_grant_used = grant;
2328                 if (layout_version > 0) {
2329                         CDEBUG(D_LAYOUT, DFID": write with layout version %u\n",
2330                                PFID(&oa->o_oi.oi_fid), layout_version);
2331
2332                         oa->o_layout_version = layout_version;
2333                         oa->o_valid |= OBD_MD_LAYOUT_VERSION;
2334                 }
2335         }
2336
2337         sort_brw_pages(pga, page_count);
2338         rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req, 0);
2339         if (rc != 0) {
2340                 CERROR("prep_req failed: %d\n", rc);
2341                 GOTO(out, rc);
2342         }
2343
2344         req->rq_commit_cb = brw_commit;
2345         req->rq_interpret_reply = brw_interpret;
2346         req->rq_memalloc = mem_tight != 0;
2347         oap->oap_request = ptlrpc_request_addref(req);
2348         if (ndelay) {
2349                 req->rq_no_resend = req->rq_no_delay = 1;
2350                 /* We should probably set a shorter timeout value here to
2351                  * handle ETIMEDOUT in brw_interpret() correctly. */
2352                 /* lustre_msg_set_timeout(req, req->rq_timeout / 2); */
2353         }
2354
2355         /* Need to update the timestamps after the request is built in case
2356          * we race with setattr (locally or in queue at the OST).  If the OST
2357          * gets a later setattr before an earlier BRW (as determined by the
2358          * request xid), the OST will not use the BRW timestamps.  Sadly, there
2359          * is no obvious way to do this in a single call.  bug 10150 */
2360         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
2361         crattr->cra_oa = &body->oa;
2362         crattr->cra_flags = OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME;
2363         cl_req_attr_set(env, osc2cl(obj), crattr);
2364         lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
2365
2366         aa = ptlrpc_req_async_args(aa, req);
2367         INIT_LIST_HEAD(&aa->aa_oaps);
2368         list_splice_init(&rpc_list, &aa->aa_oaps);
2369         INIT_LIST_HEAD(&aa->aa_exts);
2370         list_splice_init(ext_list, &aa->aa_exts);
2371
2372         spin_lock(&cli->cl_loi_list_lock);
2373         starting_offset >>= PAGE_SHIFT;
2374         if (cmd == OBD_BRW_READ) {
2375                 cli->cl_r_in_flight++;
2376                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2377                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2378                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2379                                       starting_offset + 1);
2380         } else {
2381                 cli->cl_w_in_flight++;
2382                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2383                 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
2384                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2385                                       starting_offset + 1);
2386         }
2387         spin_unlock(&cli->cl_loi_list_lock);
2388
2389         DEBUG_REQ(D_INODE, req, "%d pages, aa %p, now %ur/%uw in flight",
2390                   page_count, aa, cli->cl_r_in_flight,
2391                   cli->cl_w_in_flight);
2392         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_IO, cfs_fail_val);
2393
2394         ptlrpcd_add_req(req);
2395         rc = 0;
2396         EXIT;
2397
2398 out:
2399         if (mem_tight)
2400                 memalloc_noreclaim_restore(mpflag);
2401
2402         if (rc != 0) {
2403                 LASSERT(req == NULL);
2404
2405                 if (oa)
2406                         OBD_SLAB_FREE_PTR(oa, osc_obdo_kmem);
2407                 if (pga)
2408                         OBD_FREE_PTR_ARRAY(pga, page_count);
2409                 /* this should happen rarely and is pretty bad; it makes the
2410                  * pending list not follow the dirty order */
2411                 while (!list_empty(ext_list)) {
2412                         ext = list_entry(ext_list->next, struct osc_extent,
2413                                          oe_link);
2414                         list_del_init(&ext->oe_link);
2415                         osc_extent_finish(env, ext, 0, rc);
2416                 }
2417         }
2418         RETURN(rc);
2419 }
2420
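/* Attach @data to the lock's l_ast_data in compare-and-set style: returns 1
 * if l_ast_data was unset (it is now set to @data) or already equals @data,
 * and 0 if the lock already belongs to different ast data. */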
2421 static int osc_set_lock_data(struct ldlm_lock *lock, void *data)
2422 {
2423         int set = 0;
2424
2425         LASSERT(lock != NULL);
2426
2427         lock_res_and_lock(lock);
2428
2429         if (lock->l_ast_data == NULL)
2430                 lock->l_ast_data = data;
2431         if (lock->l_ast_data == data)
2432                 set = 1;
2433
2434         unlock_res_and_lock(lock);
2435
2436         return set;
2437 }
2438
2439 int osc_enqueue_fini(struct ptlrpc_request *req, osc_enqueue_upcall_f upcall,
2440                      void *cookie, struct lustre_handle *lockh,
2441                      enum ldlm_mode mode, __u64 *flags, bool speculative,
2442                      int errcode)
2443 {
2444         bool intent = *flags & LDLM_FL_HAS_INTENT;
2445         int rc;
2446         ENTRY;
2447
2448         /* The request was created before ldlm_cli_enqueue call. */
2449         if (intent && errcode == ELDLM_LOCK_ABORTED) {
2450                 struct ldlm_reply *rep;
2451
2452                 rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
2453                 LASSERT(rep != NULL);
2454
2455                 rep->lock_policy_res1 =
2456                         ptlrpc_status_ntoh(rep->lock_policy_res1);
2457                 if (rep->lock_policy_res1)
2458                         errcode = rep->lock_policy_res1;
2459                 if (!speculative)
2460                         *flags |= LDLM_FL_LVB_READY;
2461         } else if (errcode == ELDLM_OK) {
2462                 *flags |= LDLM_FL_LVB_READY;
2463         }
2464
2465         /* Call the update callback. */
2466         rc = (*upcall)(cookie, lockh, errcode);
2467
2468         /* release the reference taken in ldlm_cli_enqueue() */
2469         if (errcode == ELDLM_LOCK_MATCHED)
2470                 errcode = ELDLM_OK;
2471         if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
2472                 ldlm_lock_decref(lockh, mode);
2473
2474         RETURN(rc);
2475 }
2476
2477 int osc_enqueue_interpret(const struct lu_env *env, struct ptlrpc_request *req,
2478                           void *args, int rc)
2479 {
2480         struct osc_enqueue_args *aa = args;
2481         struct ldlm_lock *lock;
2482         struct lustre_handle *lockh = &aa->oa_lockh;
2483         enum ldlm_mode mode = aa->oa_mode;
2484         struct ost_lvb *lvb = aa->oa_lvb;
2485         __u32 lvb_len = sizeof(*lvb);
2486         __u64 flags = 0;
2487
2488         ENTRY;
2489
2490         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2491          * be valid. */
2492         lock = ldlm_handle2lock(lockh);
2493         LASSERTF(lock != NULL,
2494                  "lockh %#llx, req %p, aa %p - client evicted?\n",
2495                  lockh->cookie, req, aa);
2496
2497         /* Take an additional reference so that a blocking AST that
2498          * ldlm_cli_enqueue_fini() might post for a failed lock is guaranteed
2499          * to arrive after an upcall has been executed by
2500          * osc_enqueue_fini(). */
2501         ldlm_lock_addref(lockh, mode);
2502
2503         /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
2504         OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
2505
2506         /* Let the CP AST grant the lock first. */
2507         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2508
2509         if (aa->oa_speculative) {
2510                 LASSERT(aa->oa_lvb == NULL);
2511                 LASSERT(aa->oa_flags == NULL);
2512                 aa->oa_flags = &flags;
2513         }
2514
2515         /* Complete obtaining the lock procedure. */
2516         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
2517                                    aa->oa_mode, aa->oa_flags, lvb, lvb_len,
2518                                    lockh, rc);
2519         /* Complete osc stuff. */
2520         rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
2521                               aa->oa_flags, aa->oa_speculative, rc);
2522
2523         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2524
2525         ldlm_lock_decref(lockh, mode);
2526         LDLM_LOCK_PUT(lock);
2527         RETURN(rc);
2528 }
2529
2530 /* When enqueuing asynchronously, locks are not ordered, so we can obtain a
2531  * lock from the 2nd OSC before a lock from the 1st one. This does not deadlock
2532  * with other synchronous requests; however, keeping some locks and trying to
2533  * obtain others may take a considerable amount of time in case of OST failure,
2534  * and when other sync requests do not get the released lock from a client, the
2535  * client is evicted from the cluster -- such scenarios make life difficult, so
2536  * release locks just after they are obtained. */
2537 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2538                      __u64 *flags, union ldlm_policy_data *policy,
2539                      struct ost_lvb *lvb, osc_enqueue_upcall_f upcall,
2540                      void *cookie, struct ldlm_enqueue_info *einfo,
2541                      struct ptlrpc_request_set *rqset, int async,
2542                      bool speculative)
2543 {
2544         struct obd_device *obd = exp->exp_obd;
2545         struct lustre_handle lockh = { 0 };
2546         struct ptlrpc_request *req = NULL;
2547         int intent = *flags & LDLM_FL_HAS_INTENT;
2548         __u64 match_flags = *flags;
2549         enum ldlm_mode mode;
2550         int rc;
2551         ENTRY;
2552
2553         /* Filesystem lock extents are extended to page boundaries so that
2554          * dealing with the page cache is a little smoother.  */
2555         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2556         policy->l_extent.end |= ~PAGE_MASK;
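        /* e.g. (illustrative) with 4096-byte pages: start 5000 rounds down
         * to 4096 and end 6000 rounds up to 8191, so the lock covers whole
         * pages */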
2557
2558         /* Next, search for already existing extent locks that will cover us */
2559         /* If we're trying to read, we also search for an existing PW lock.  The
2560          * VFS and page cache already protect us locally, so lots of readers/
2561          * writers can share a single PW lock.
2562          *
2563          * There are problems with conversion deadlocks, so instead of
2564          * converting a read lock to a write lock, we'll just enqueue a new
2565          * one.
2566          *
2567          * At some point we should cancel the read lock instead of making them
2568          * send us a blocking callback, but there are problems with canceling
2569          * locks out from other users right now, too. */
2570         mode = einfo->ei_mode;
2571         if (einfo->ei_mode == LCK_PR)
2572                 mode |= LCK_PW;
2573         /* Normal lock requests must wait for the LVB to be ready before
2574          * matching a lock; speculative lock requests do not need to,
2575          * because they will not actually use the lock. */
2576         if (!speculative)
2577                 match_flags |= LDLM_FL_LVB_READY;
2578         if (intent != 0)
2579                 match_flags |= LDLM_FL_BLOCK_GRANTED;
2580         mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id,
2581                                einfo->ei_type, policy, mode, &lockh, 0);
2582         if (mode) {
2583                 struct ldlm_lock *matched;
2584
2585                 if (*flags & LDLM_FL_TEST_LOCK)
2586                         RETURN(ELDLM_OK);
2587
2588                 matched = ldlm_handle2lock(&lockh);
2589                 if (speculative) {
2590                         /* This DLM lock request is speculative, and does not
2591                          * have an associated IO request. Therefore if there
2592                          * is already a DLM lock, it will just inform the
2593                          * caller to cancel the request for this stripe. */
2594                         lock_res_and_lock(matched);
2595                         if (ldlm_extent_equal(&policy->l_extent,
2596                             &matched->l_policy_data.l_extent))
2597                                 rc = -EEXIST;
2598                         else
2599                                 rc = -ECANCELED;
2600                         unlock_res_and_lock(matched);
2601
2602                         ldlm_lock_decref(&lockh, mode);
2603                         LDLM_LOCK_PUT(matched);
2604                         RETURN(rc);
2605                 } else if (osc_set_lock_data(matched, einfo->ei_cbdata)) {
2606                         *flags |= LDLM_FL_LVB_READY;
2607
2608                         /* We already have a lock, and it's referenced. */
2609                         (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);
2610
2611                         ldlm_lock_decref(&lockh, mode);
2612                         LDLM_LOCK_PUT(matched);
2613                         RETURN(ELDLM_OK);
2614                 } else {
2615                         ldlm_lock_decref(&lockh, mode);
2616                         LDLM_LOCK_PUT(matched);
2617                 }
2618         }
2619
2620         if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK))
2621                 RETURN(-ENOLCK);
2622
2623         if (intent) {
2624                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2625                                            &RQF_LDLM_ENQUEUE_LVB);
2626                 if (req == NULL)
2627                         RETURN(-ENOMEM);
2628
2629                 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
2630                 if (rc) {
2631                         ptlrpc_request_free(req);
2632                         RETURN(rc);
2633                 }
2634
2635                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2636                                      sizeof(*lvb));
2637                 ptlrpc_request_set_replen(req);
2638         }
2639
2640         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2641         *flags &= ~LDLM_FL_BLOCK_GRANTED;
2642
2643         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2644                               sizeof(*lvb), LVB_T_OST, &lockh, async);
2645         if (async) {
2646                 if (!rc) {
2647                         struct osc_enqueue_args *aa;
2648                         aa = ptlrpc_req_async_args(aa, req);
2649                         aa->oa_exp         = exp;
2650                         aa->oa_mode        = einfo->ei_mode;
2651                         aa->oa_type        = einfo->ei_type;
2652                         lustre_handle_copy(&aa->oa_lockh, &lockh);
2653                         aa->oa_upcall      = upcall;
2654                         aa->oa_cookie      = cookie;
2655                         aa->oa_speculative = speculative;
2656                         if (!speculative) {
2657                                 aa->oa_flags  = flags;
2658                                 aa->oa_lvb    = lvb;
2659                         } else {
2660                                 /* speculative locks essentially enqueue
2661                                  * a DLM lock in advance, so we don't care
2662                                  * about the result of the enqueue. */
2663                                 aa->oa_lvb    = NULL;
2664                                 aa->oa_flags  = NULL;
2665                         }
2666
2667                         req->rq_interpret_reply = osc_enqueue_interpret;
2668                         ptlrpc_set_add_req(rqset, req);
2669                 } else if (intent) {
2670                         ptlrpc_req_finished(req);
2671                 }
2672                 RETURN(rc);
2673         }
2674
2675         rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
2676                               flags, speculative, rc);
2677         if (intent)
2678                 ptlrpc_req_finished(req);
2679
2680         RETURN(rc);
2681 }
2682
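/*
 * Look up an already-granted DLM extent lock that covers @policy on @res_id.
 * The extent is widened to page boundaries first, and a PR request will also
 * accept a matching PW lock, since the page cache protects readers locally.
 * If @obj is given, attach it as the lock's l_ast_data and refresh the cached
 * LVB; if a different object is already attached, drop the match.
 */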
2683 int osc_match_base(const struct lu_env *env, struct obd_export *exp,
2684                    struct ldlm_res_id *res_id, enum ldlm_type type,
2685                    union ldlm_policy_data *policy, enum ldlm_mode mode,
2686                    __u64 *flags, struct osc_object *obj,
2687                    struct lustre_handle *lockh, int unref)
2688 {
2689         struct obd_device *obd = exp->exp_obd;
2690         __u64 lflags = *flags;
2691         enum ldlm_mode rc;
2692         ENTRY;
2693
2694         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2695                 RETURN(-EIO);
2696
2697         /* Filesystem lock extents are extended to page boundaries so that
2698          * dealing with the page cache is a little smoother */
2699         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2700         policy->l_extent.end |= ~PAGE_MASK;
2701
2702         /* Next, search for already existing extent locks that will cover us */
2703         /* If we're trying to read, we also search for an existing PW lock.  The
2704          * VFS and page cache already protect us locally, so lots of readers/
2705          * writers can share a single PW lock. */
2706         rc = mode;
2707         if (mode == LCK_PR)
2708                 rc |= LCK_PW;
2709         rc = ldlm_lock_match(obd->obd_namespace, lflags,
2710                              res_id, type, policy, rc, lockh, unref);
2711         if (rc == 0 || lflags & LDLM_FL_TEST_LOCK)
2712                 RETURN(rc);
2713
2714         if (obj != NULL) {
2715                 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2716
2717                 LASSERT(lock != NULL);
2718                 if (osc_set_lock_data(lock, obj)) {
2719                         lock_res_and_lock(lock);
2720                         if (!ldlm_is_lvb_cached(lock)) {
2721                                 LASSERT(lock->l_ast_data == obj);
2722                                 osc_lock_lvb_update(env, obj, lock, NULL);
2723                                 ldlm_set_lvb_cached(lock);
2724                         }
2725                         unlock_res_and_lock(lock);
2726                 } else {
2727                         ldlm_lock_decref(lockh, rc);
2728                         rc = 0;
2729                 }
2730                 LDLM_LOCK_PUT(lock);
2731         }
2732         RETURN(rc);
2733 }
2734
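/*
 * Completion callback for an asynchronous OST_STATFS request: copy the
 * statfs reply into the caller's buffer and run the oi_cb_up() upcall.
 * Connection failures are ignored when OBD_STATFS_NODELAY is set.
 */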
2735 static int osc_statfs_interpret(const struct lu_env *env,
2736                                 struct ptlrpc_request *req, void *args, int rc)
2737 {
2738         struct osc_async_args *aa = args;
2739         struct obd_statfs *msfs;
2740
2741         ENTRY;
2742         if (rc == -EBADR)
2743                 /*
2744                  * The request has in fact never been sent due to issues at
2745                  * a higher level (LOV).  Exit immediately since the caller
2746                  * is aware of the problem and takes care of the clean up.
2747                  */
2748                 RETURN(rc);
2749
2750         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2751             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2752                 GOTO(out, rc = 0);
2753
2754         if (rc != 0)
2755                 GOTO(out, rc);
2756
2757         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2758         if (msfs == NULL)
2759                 GOTO(out, rc = -EPROTO);
2760
2761         *aa->aa_oi->oi_osfs = *msfs;
2762 out:
2763         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2764
2765         RETURN(rc);
2766 }
2767
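/*
 * Fetch OST filesystem usage without blocking. If the cached obd_osfs data
 * is newer than @max_age it is returned directly; otherwise an OST_STATFS
 * request is added to @rqset and finished by osc_statfs_interpret().
 */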
2768 static int osc_statfs_async(struct obd_export *exp,
2769                             struct obd_info *oinfo, time64_t max_age,
2770                             struct ptlrpc_request_set *rqset)
2771 {
2772         struct obd_device     *obd = class_exp2obd(exp);
2773         struct ptlrpc_request *req;
2774         struct osc_async_args *aa;
2775         int rc;
2776         ENTRY;
2777
2778         if (obd->obd_osfs_age >= max_age) {
2779                 CDEBUG(D_SUPER,
2780                        "%s: use %p cache blocks %llu/%llu objects %llu/%llu\n",
2781                        obd->obd_name, &obd->obd_osfs,
2782                        obd->obd_osfs.os_bavail, obd->obd_osfs.os_blocks,
2783                        obd->obd_osfs.os_ffree, obd->obd_osfs.os_files);
2784                 spin_lock(&obd->obd_osfs_lock);
2785                 memcpy(oinfo->oi_osfs, &obd->obd_osfs, sizeof(*oinfo->oi_osfs));
2786                 spin_unlock(&obd->obd_osfs_lock);
2787                 oinfo->oi_flags |= OBD_STATFS_FROM_CACHE;
2788                 if (oinfo->oi_cb_up)
2789                         oinfo->oi_cb_up(oinfo, 0);
2790
2791                 RETURN(0);
2792         }
2793
2794         /* We could possibly pass max_age in the request (as an absolute
2795          * timestamp or a "seconds.usec ago") so the target can avoid doing
2796          * extra calls into the filesystem if that isn't necessary (e.g.
2797          * during mount that would help a bit).  Having relative timestamps
2798          * is not so great if request processing is slow, while absolute
2799          * timestamps are not ideal because they need time synchronization. */
2800         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2801         if (req == NULL)
2802                 RETURN(-ENOMEM);
2803
2804         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2805         if (rc) {
2806                 ptlrpc_request_free(req);
2807                 RETURN(rc);
2808         }
2809         ptlrpc_request_set_replen(req);
2810         req->rq_request_portal = OST_CREATE_PORTAL;
2811         ptlrpc_at_set_req_timeout(req);
2812
2813         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2814                 /* procfs requests must not wait on statfs, to avoid deadlock */
2815                 req->rq_no_resend = 1;
2816                 req->rq_no_delay = 1;
2817         }
2818
2819         req->rq_interpret_reply = osc_statfs_interpret;
2820         aa = ptlrpc_req_async_args(aa, req);
2821         aa->aa_oi = oinfo;
2822
2823         ptlrpc_set_add_req(rqset, req);
2824         RETURN(0);
2825 }
2826
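/*
 * Synchronous counterpart of osc_statfs_async(): always sends an OST_STATFS
 * request and waits for the reply, bypassing the obd_osfs cache. The import
 * reference is taken under cl_sem to guard against a concurrent disconnect.
 */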
2827 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2828                       struct obd_statfs *osfs, time64_t max_age, __u32 flags)
2829 {
2830         struct obd_device     *obd = class_exp2obd(exp);
2831         struct obd_statfs     *msfs;
2832         struct ptlrpc_request *req;
2833         struct obd_import     *imp = NULL;
2834         int rc;
2835         ENTRY;
2836
2837
2838         /* Since the request might also come from lprocfs, we need to
2839          * sync this with client_disconnect_export() (Bug 15684). */
2840         down_read(&obd->u.cli.cl_sem);
2841         if (obd->u.cli.cl_import)
2842                 imp = class_import_get(obd->u.cli.cl_import);
2843         up_read(&obd->u.cli.cl_sem);
2844         if (!imp)
2845                 RETURN(-ENODEV);
2846
2847         /* We could possibly pass max_age in the request (as an absolute
2848          * timestamp or a "seconds.usec ago") so the target can avoid doing
2849          * extra calls into the filesystem if that isn't necessary (e.g.
2850          * during mount that would help a bit).  Having relative timestamps
2851          * is not so great if request processing is slow, while absolute
2852          * timestamps are not ideal because they need time synchronization. */
2853         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2854
2855         class_import_put(imp);
2856
2857         if (req == NULL)
2858                 RETURN(-ENOMEM);
2859
2860         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2861         if (rc) {
2862                 ptlrpc_request_free(req);
2863                 RETURN(rc);
2864         }
2865         ptlrpc_request_set_replen(req);
2866         req->rq_request_portal = OST_CREATE_PORTAL;
2867         ptlrpc_at_set_req_timeout(req);
2868
2869         if (flags & OBD_STATFS_NODELAY) {
2870                 /* procfs requests must not wait on statfs, to avoid deadlock */
2871                 req->rq_no_resend = 1;
2872                 req->rq_no_delay = 1;
2873         }
2874
2875         rc = ptlrpc_queue_wait(req);
2876         if (rc)
2877                 GOTO(out, rc);
2878
2879         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2880         if (msfs == NULL)
2881                 GOTO(out, rc = -EPROTO);
2882
2883         *osfs = *msfs;
2884
2885         EXIT;
2886 out:
2887         ptlrpc_req_finished(req);
2888         return rc;
2889 }
2890
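/*
 * OSC-specific ioctls: OBD_IOC_CLIENT_RECOVER forces recovery of the
 * import, IOC_OSC_SET_ACTIVE (de)activates it, and anything else is
 * rejected with -ENOTTY.
 */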
2891 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2892                          void *karg, void __user *uarg)
2893 {
2894         struct obd_device *obd = exp->exp_obd;
2895         struct obd_ioctl_data *data = karg;
2896         int rc = 0;
2897
2898         ENTRY;
2899         if (!try_module_get(THIS_MODULE)) {
2900                 CERROR("%s: cannot get module '%s'\n", obd->obd_name,
2901                        module_name(THIS_MODULE));
2902                 return -EINVAL;
2903         }
2904         switch (cmd) {
2905         case OBD_IOC_CLIENT_RECOVER:
2906                 rc = ptlrpc_recover_import(obd->u.cli.cl_import,
2907                                            data->ioc_inlbuf1, 0);
2908                 if (rc > 0)
2909                         rc = 0;
2910                 break;
2911         case IOC_OSC_SET_ACTIVE:
2912                 rc = ptlrpc_set_import_active(obd->u.cli.cl_import,
2913                                               data->ioc_offset);
2914                 break;
2915         default:
2916                 rc = -ENOTTY;
2917                 CDEBUG(D_INODE, "%s: unrecognised ioctl %#x by %s: rc = %d\n",
2918                        obd->obd_name, cmd, current->comm, rc);
2919                 break;
2920         }
2921
2922         module_put(THIS_MODULE);
2923         return rc;
2924 }
2925
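/*
 * Set a named parameter, either handled locally (checksums, sptlrpc config,
 * context flush, LRU shrinking) or forwarded to the OST as an OST_SET_INFO
 * or grant shrink RPC. For keys that reach the server, @set must be
 * non-NULL except for KEY_GRANT_SHRINK, which is queued on ptlrpcd instead.
 * A hypothetical caller would go through the generic obd_set_info_async()
 * wrapper, e.g. (illustration only):
 *
 *	int on = 1;
 *
 *	rc = obd_set_info_async(env, exp, sizeof(KEY_CHECKSUM),
 *				KEY_CHECKSUM, sizeof(on), &on, NULL);
 */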
2926 int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
2927                        u32 keylen, void *key, u32 vallen, void *val,
2928                        struct ptlrpc_request_set *set)
2929 {
2930         struct ptlrpc_request *req;
2931         struct obd_device     *obd = exp->exp_obd;
2932         struct obd_import     *imp = class_exp2cliimp(exp);
2933         char                  *tmp;
2934         int                    rc;
2935         ENTRY;
2936
2937         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
2938
2939         if (KEY_IS(KEY_CHECKSUM)) {
2940                 if (vallen != sizeof(int))
2941                         RETURN(-EINVAL);
2942                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
2943                 RETURN(0);
2944         }
2945
2946         if (KEY_IS(KEY_SPTLRPC_CONF)) {
2947                 sptlrpc_conf_client_adapt(obd);
2948                 RETURN(0);
2949         }
2950
2951         if (KEY_IS(KEY_FLUSH_CTX)) {
2952                 sptlrpc_import_flush_my_ctx(imp);
2953                 RETURN(0);
2954         }
2955
2956         if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
2957                 struct client_obd *cli = &obd->u.cli;
2958                 long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
2959                 long target = *(long *)val;
2960
2961                 nr = osc_lru_shrink(env, cli, min(nr, target), true);
2962                 *(long *)val -= nr;
2963                 RETURN(0);
2964         }
2965
2966         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
2967                 RETURN(-EINVAL);
2968
2969         /* We pass all other commands directly to the OST. Since nobody calls
2970            osc methods directly and everybody is supposed to go through LOV, we
2971            assume LOV has checked for invalid values for us.
2972            The only recognised values so far are evict_by_nid and mds_conn.
2973            Even if something bad goes through, we'd get a -EINVAL from the OST
2974            anyway. */
2975
2976         req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
2977                                                 &RQF_OST_SET_GRANT_INFO :
2978                                                 &RQF_OBD_SET_INFO);
2979         if (req == NULL)
2980                 RETURN(-ENOMEM);
2981
2982         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2983                              RCL_CLIENT, keylen);
2984         if (!KEY_IS(KEY_GRANT_SHRINK))
2985                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
2986                                      RCL_CLIENT, vallen);
2987         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
2988         if (rc) {
2989                 ptlrpc_request_free(req);
2990                 RETURN(rc);
2991         }
2992
2993         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2994         memcpy(tmp, key, keylen);
2995         tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
2996                                                         &RMF_OST_BODY :
2997                                                         &RMF_SETINFO_VAL);
2998         memcpy(tmp, val, vallen);
2999
3000         if (KEY_IS(KEY_GRANT_SHRINK)) {
3001                 struct osc_grant_args *aa;
3002                 struct obdo *oa;
3003
3004                 aa = ptlrpc_req_async_args(aa, req);
3005                 OBD_SLAB_ALLOC_PTR_GFP(oa, osc_obdo_kmem, GFP_NOFS);
3006                 if (!oa) {
3007                         ptlrpc_req_finished(req);
3008                         RETURN(-ENOMEM);
3009                 }
3010                 *oa = ((struct ost_body *)val)->oa;
3011                 aa->aa_oa = oa;
3012                 req->rq_interpret_reply = osc_shrink_grant_interpret;
3013         }
3014
3015         ptlrpc_request_set_replen(req);
3016         if (!KEY_IS(KEY_GRANT_SHRINK)) {
3017                 LASSERT(set != NULL);
3018                 ptlrpc_set_add_req(set, req);
3019                 ptlrpc_check_set(NULL, set);
3020         } else {
3021                 ptlrpcd_add_req(req);
3022         }
3023
3024         RETURN(0);
3025 }
3026 EXPORT_SYMBOL(osc_set_info_async);
3027
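/*
 * Called when the import (re)connects: rebuild the grant to ask from the
 * server (ocd_grant) out of the available, reserved and dirty grant, and
 * log and reset any grant lost while disconnected.
 */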
3028 int osc_reconnect(const struct lu_env *env, struct obd_export *exp,
3029                   struct obd_device *obd, struct obd_uuid *cluuid,
3030                   struct obd_connect_data *data, void *localdata)
3031 {
3032         struct client_obd *cli = &obd->u.cli;
3033
3034         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3035                 long lost_grant;
3036                 long grant;
3037
3038                 spin_lock(&cli->cl_loi_list_lock);
3039                 grant = cli->cl_avail_grant + cli->cl_reserved_grant;
3040                 if (data->ocd_connect_flags & OBD_CONNECT_GRANT_PARAM) {
3041                         /* restore ocd_grant_blkbits as client page bits */
3042                         data->ocd_grant_blkbits = PAGE_SHIFT;
3043                         grant += cli->cl_dirty_grant;
3044                 } else {
3045                         grant += cli->cl_dirty_pages << PAGE_SHIFT;
3046                 }
3047                 data->ocd_grant = grant ? : 2 * cli_brw_size(obd);
3048                 lost_grant = cli->cl_lost_grant;
3049                 cli->cl_lost_grant = 0;
3050                 spin_unlock(&cli->cl_loi_list_lock);
3051
3052                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: %#llx ocd_version: %d"
3053                        " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
3054                        data->ocd_version, data->ocd_grant, lost_grant);
3055         }
3056
3057         RETURN(0);
3058 }
3059 EXPORT_SYMBOL(osc_reconnect);
3060
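/*
 * Disconnect the export and then take this client off the grant shrink
 * list; see the comment below for why the removal must come after
 * client_disconnect_export().
 */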
3061 int osc_disconnect(struct obd_export *exp)
3062 {
3063         struct obd_device *obd = class_exp2obd(exp);
3064         int rc;
3065
3066         rc = client_disconnect_export(exp);
3067         /**
3068          * Initially we put del_shrink_grant before disconnect_export, but it
3069          * causes the following problem if setup (connect) and cleanup
3070          * (disconnect) are tangled together.
3071          *      connect p1                     disconnect p2
3072          *   ptlrpc_connect_import
3073          *     ...............               class_manual_cleanup
3074          *                                     osc_disconnect
3075          *                                     del_shrink_grant
3076          *   ptlrpc_connect_interpret
3077          *     osc_init_grant
3078          *   add this client to shrink list
3079          *                                      cleanup_osc
3080          * Bang! The grant shrink thread triggers the shrink. (Bug 18662)
3081          */
3082         osc_del_grant_list(&obd->u.cli);
3083         return rc;
3084 }
3085 EXPORT_SYMBOL(osc_disconnect);
3086
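/*
 * cfs_hash iterator callback: for every granted lock on the resource,
 * clear the CLEANED flag so a second ldlm_namespace_cleanup() pass can
 * cancel it, then invalidate the osc_object attached to those locks.
 */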
3087 int osc_ldlm_resource_invalidate(struct cfs_hash *hs, struct cfs_hash_bd *bd,
3088                                  struct hlist_node *hnode, void *arg)
3089 {
3090         struct lu_env *env = arg;
3091         struct ldlm_resource *res = cfs_hash_object(hs, hnode);
3092         struct ldlm_lock *lock;
3093         struct osc_object *osc = NULL;
3094         ENTRY;
3095
3096         lock_res(res);
3097         list_for_each_entry(lock, &res->lr_granted, l_res_link) {
3098                 if (lock->l_ast_data != NULL && osc == NULL) {
3099                         osc = lock->l_ast_data;
3100                         cl_object_get(osc2cl(osc));
3101                 }
3102
3103                 /* Clear the LDLM_FL_CLEANED flag to make sure the lock will be
3104                  * canceled by the 2nd round of the ldlm_namespace_cleanup()
3105                  * call in osc_import_event(). */
3106                 ldlm_clear_cleaned(lock);
3107         }
3108         unlock_res(res);
3109
3110         if (osc != NULL) {
3111                 osc_object_invalidate(env, osc);
3112                 cl_object_put(env, osc2cl(osc));
3113         }
3114
3115         RETURN(0);
3116 }
3117 EXPORT_SYMBOL(osc_ldlm_resource_invalidate);
3118
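/*
 * React to import state changes: drop grant on disconnect, flush cached
 * pages and locks on invalidation, (re)initialize grant and the request
 * portal from the connect data, and forward each event to the obd
 * observer (typically the LOV).
 */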
3119 static int osc_import_event(struct obd_device *obd,
3120                             struct obd_import *imp,
3121                             enum obd_import_event event)
3122 {
3123         struct client_obd *cli;
3124         int rc = 0;
3125
3126         ENTRY;
3127         LASSERT(imp->imp_obd == obd);
3128
3129         switch (event) {
3130         case IMP_EVENT_DISCON: {
3131                 cli = &obd->u.cli;
3132                 spin_lock(&cli->cl_loi_list_lock);
3133                 cli->cl_avail_grant = 0;
3134                 cli->cl_lost_grant = 0;
3135                 spin_unlock(&cli->cl_loi_list_lock);
3136                 break;
3137         }
3138         case IMP_EVENT_INACTIVE: {
3139                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE);
3140                 break;
3141         }
3142         case IMP_EVENT_INVALIDATE: {
3143                 struct ldlm_namespace *ns = obd->obd_namespace;
3144                 struct lu_env         *env;
3145                 __u16                  refcheck;
3146
3147                 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3148
3149                 env = cl_env_get(&refcheck);
3150                 if (!IS_ERR(env)) {
3151                         osc_io_unplug(env, &obd->u.cli, NULL);
3152
3153                         cfs_hash_for_each_nolock(ns->ns_rs_hash,
3154                                                  osc_ldlm_resource_invalidate,
3155                                                  env, 0);
3156                         cl_env_put(env, &refcheck);
3157
3158                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3159                 } else
3160                         rc = PTR_ERR(env);
3161                 break;
3162         }
3163         case IMP_EVENT_ACTIVE: {
3164                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE);
3165                 break;
3166         }
3167         case IMP_EVENT_OCD: {
3168                 struct obd_connect_data *ocd = &imp->imp_connect_data;
3169
3170                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3171                         osc_init_grant(&obd->u.cli, ocd);
3172
3173                 /* See bug 7198 */
3174                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3175                         imp->imp_client->cli_request_portal = OST_REQUEST_PORTAL;
3176
3177                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD);
3178                 break;
3179         }
3180         case IMP_EVENT_DEACTIVATE: {
3181                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE);
3182                 break;
3183         }
3184         case IMP_EVENT_ACTIVATE: {
3185                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE);
3186                 break;
3187         }
3188         default:
3189                 CERROR("Unknown import event %d\n", event);
3190                 LBUG();
3191         }
3192         RETURN(rc);
3193 }
3194
3195 /**
3196  * Determine whether the lock can be canceled before replaying the lock
3197  * during recovery, see bug16774 for detailed information.
3198  *
3199  * \retval zero the lock can't be canceled
3200  * \retval other ok to cancel
3201  */
3202 static int osc_cancel_weight(struct ldlm_lock *lock)
3203 {
3204         /*
3205          * Cancel all unused and granted extent lock.
3206          */
3207         if (lock->l_resource->lr_type == LDLM_EXTENT &&
3208             ldlm_is_granted(lock) &&
3209             osc_ldlm_weigh_ast(lock) == 0)
3210                 RETURN(1);
3211
3212         RETURN(0);
3213 }
3214
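/* ptlrpcd work callback: flush any pending writeback for this client. */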
3215 static int brw_queue_work(const struct lu_env *env, void *data)
3216 {
3217         struct client_obd *cli = data;
3218
3219         CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
3220
3221         osc_io_unplug(env, cli, NULL);
3222         RETURN(0);
3223 }
3224
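/*
 * Common part of OSC device setup (exported for reuse): set up the client
 * obd, allocate the writeback and LRU ptlrpcd work items, and initialize
 * the quota and grant shrink state.
 */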
3225 int osc_setup_common(struct obd_device *obd, struct lustre_cfg *lcfg)
3226 {
3227         struct client_obd *cli = &obd->u.cli;
3228         void *handler;
3229         int rc;
3230
3231         ENTRY;
3232
3233         rc = ptlrpcd_addref();
3234         if (rc)
3235                 RETURN(rc);
3236
3237         rc = client_obd_setup(obd, lcfg);
3238         if (rc)
3239                 GOTO(out_ptlrpcd, rc);
3240
3241
3242         handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
3243         if (IS_ERR(handler))
3244                 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
3245         cli->cl_writeback_work = handler;
3246
3247         handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
3248         if (IS_ERR(handler))
3249                 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
3250         cli->cl_lru_work = handler;
3251
3252         rc = osc_quota_setup(obd);
3253         if (rc)
3254                 GOTO(out_ptlrpcd_work, rc);
3255
3256         cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
3257         osc_update_next_shrink(cli);
3258
3259         RETURN(rc);
3260
3261 out_ptlrpcd_work:
3262         if (cli->cl_writeback_work != NULL) {
3263                 ptlrpcd_destroy_work(cli->cl_writeback_work);
3264                 cli->cl_writeback_work = NULL;
3265         }
3266         if (cli->cl_lru_work != NULL) {
3267                 ptlrpcd_destroy_work(cli->cl_lru_work);
3268                 cli->cl_lru_work = NULL;
3269         }
3270         client_obd_cleanup(obd);
3271 out_ptlrpcd:
3272         ptlrpcd_decref();
3273         RETURN(rc);
3274 }
3275 EXPORT_SYMBOL(osc_setup_common);
3276
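/*
 * Full OSC device setup: runs osc_setup_common(), registers tunables,
 * pre-populates the shared request pool up to osc_reqpool_maxreqcount,
 * installs the cancel-weight callback and joins the grant shrink list.
 */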
3277 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3278 {
3279         struct client_obd *cli = &obd->u.cli;
3280         int                adding;
3281         int                added;
3282         int                req_count;
3283         int                rc;
3284
3285         ENTRY;
3286
3287         rc = osc_setup_common(obd, lcfg);
3288         if (rc < 0)
3289                 RETURN(rc);
3290
3291         rc = osc_tunables_init(obd);
3292         if (rc)
3293                 RETURN(rc);
3294
3295         /*
3296          * We try to control the total number of requests with an upper
3297          * limit, osc_reqpool_maxreqcount. There might be a race that causes
3298          * over-limit allocation, but it is fine.
3299          */
3300         req_count = atomic_read(&osc_pool_req_count);
3301         if (req_count < osc_reqpool_maxreqcount) {
3302                 adding = cli->cl_max_rpcs_in_flight + 2;
3303                 if (req_count + adding > osc_reqpool_maxreqcount)
3304                         adding = osc_reqpool_maxreqcount - req_count;
3305
3306                 added = ptlrpc_add_rqs_to_pool(osc_rq_pool, adding);
3307                 atomic_add(added, &osc_pool_req_count);
3308         }
3309
3310         ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
3311
3312         spin_lock(&osc_shrink_lock);
3313         list_add_tail(&cli->cl_shrink_list, &osc_shrink_list);
3314         spin_unlock(&osc_shrink_lock);
3315         cli->cl_import->imp_idle_timeout = osc_idle_timeout;
3316         cli->cl_import->imp_idle_debug = D_HA;
3317
3318         RETURN(0);
3319 }
3320
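/*
 * Common part of pre-cleanup: wait for zombie exports to be culled,
 * destroy the writeback/LRU work items, and clean up the client import.
 */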
3321 int osc_precleanup_common(struct obd_device *obd)
3322 {
3323         struct client_obd *cli = &obd->u.cli;
3324         ENTRY;
3325
3326         /* LU-464
3327          * for echo client, export may be on zombie list, wait for
3328          * zombie thread to cull it, because cli.cl_import will be
3329          * cleared in client_disconnect_export():
3330          *   class_export_destroy() -> obd_cleanup() ->
3331          *   echo_device_free() -> echo_client_cleanup() ->
3332          *   obd_disconnect() -> osc_disconnect() ->
3333          *   client_disconnect_export()
3334          */
3335         obd_zombie_barrier();
3336         if (cli->cl_writeback_work) {
3337                 ptlrpcd_destroy_work(cli->cl_writeback_work);
3338                 cli->cl_writeback_work = NULL;
3339         }
3340
3341         if (cli->cl_lru_work) {
3342                 ptlrpcd_destroy_work(cli->cl_lru_work);
3343                 cli->cl_lru_work = NULL;
3344         }
3345
3346         obd_cleanup_client_import(obd);
3347         RETURN(0);
3348 }
3349 EXPORT_SYMBOL(osc_precleanup_common);
3350
3351 static int osc_precleanup(struct obd_device *obd)
3352 {
3353         ENTRY;
3354
3355         osc_precleanup_common(obd);
3356
3357         ptlrpc_lprocfs_unregister_obd(obd);
3358         RETURN(0);
3359 }
3360
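/*
 * Final cleanup: leave the grant shrink list, detach from the shared LRU
 * cache, free the quota cache, and drop the ptlrpcd reference taken in
 * osc_setup_common().
 */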
3361 int osc_cleanup_common(struct obd_device *obd)
3362 {
3363         struct client_obd *cli = &obd->u.cli;
3364         int rc;
3365
3366         ENTRY;
3367
3368         spin_lock(&osc_shrink_lock);
3369         list_del(&cli->cl_shrink_list);
3370         spin_unlock(&osc_shrink_lock);
3371
3372         /* lru cleanup */
3373         if (cli->cl_cache != NULL) {
3374                 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
3375                 spin_lock(&cli->cl_cache->ccc_lru_lock);
3376                 list_del_init(&cli->cl_lru_osc);
3377                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3378                 cli->cl_lru_left = NULL;
3379                 cl_cache_decref(cli->cl_cache);
3380                 cli->cl_cache = NULL;
3381         }
3382
3383         /* free memory of osc quota cache */
3384         osc_quota_cleanup(obd);
3385
3386         rc = client_obd_cleanup(obd);
3387
3388         ptlrpcd_decref();
3389         RETURN(rc);
3390 }
3391 EXPORT_SYMBOL(osc_cleanup_common);
3392
3393 static const struct obd_ops osc_obd_ops = {
3394         .o_owner                = THIS_MODULE,
3395         .o_setup                = osc_setup,
3396         .o_precleanup           = osc_precleanup,
3397         .o_cleanup              = osc_cleanup_common,
3398         .o_add_conn             = client_import_add_conn,
3399         .o_del_conn             = client_import_del_conn,
3400         .o_connect              = client_connect_import,
3401         .o_reconnect            = osc_reconnect,
3402         .o_disconnect           = osc_disconnect,
3403         .o_statfs               = osc_statfs,
3404         .o_statfs_async         = osc_statfs_async,
3405         .o_create               = osc_create,
3406         .o_destroy              = osc_destroy,
3407         .o_getattr              = osc_getattr,
3408         .o_setattr              = osc_setattr,
3409         .o_iocontrol            = osc_iocontrol,
3410         .o_set_info_async       = osc_set_info_async,
3411         .o_import_event         = osc_import_event,
3412         .o_quotactl             = osc_quotactl,
3413 };
3414
3415 static struct shrinker *osc_cache_shrinker;
3416 LIST_HEAD(osc_shrink_list);
3417 DEFINE_SPINLOCK(osc_shrink_lock);
3418
3419 #ifndef HAVE_SHRINKER_COUNT
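/* Compat wrapper for older kernels where a shrinker has a single combined
 * callback instead of separate count and scan methods. */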
3420 static int osc_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask))
3421 {
3422         struct shrink_control scv = {
3423                 .nr_to_scan = shrink_param(sc, nr_to_scan),
3424                 .gfp_mask   = shrink_param(sc, gfp_mask)
3425         };
3426         (void)osc_cache_shrink_scan(shrinker, &scv);
3427
3428         return osc_cache_shrink_count(shrinker, &scv);
3429 }
3430 #endif
3431
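/*
 * Module init: register the kmem caches and obd type, install the page
 * cache shrinker, and size the shared request pool. The pool is capped at
 * osc_reqpool_mem_max MB of requests, each rounded up to the next power of
 * two >= OST_IO_MAXREQSIZE; e.g. if that rounded size were 1 MB
 * (hypothetical), the default 5 MB cap would allow 5 pooled requests.
 */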
3432 static int __init osc_init(void)
3433 {
3434         unsigned int reqpool_size;
3435         unsigned int reqsize;
3436         int rc;
3437         DEF_SHRINKER_VAR(osc_shvar, osc_cache_shrink,
3438                          osc_cache_shrink_count, osc_cache_shrink_scan);
3439         ENTRY;
3440
3441         /* Print the address of _any_ initialized kernel symbol from this
3442          * module, to allow debugging with a gdb that doesn't support data
3443          * symbols from modules. */
3444         CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3445
3446         rc = lu_kmem_init(osc_caches);
3447         if (rc)
3448                 RETURN(rc);
3449
3450         rc = class_register_type(&osc_obd_ops, NULL, true, NULL,
3451                                  LUSTRE_OSC_NAME, &osc_device_type);
3452         if (rc)
3453                 GOTO(out_kmem, rc);
3454
3455         osc_cache_shrinker = set_shrinker(DEFAULT_SEEKS, &osc_shvar);
3456
3457         /* This is obviously too much memory; only prevent overflow here */
3458         if (osc_reqpool_mem_max >= 1 << 12 || osc_reqpool_mem_max == 0)
3459                 GOTO(out_type, rc = -EINVAL);
3460
3461         reqpool_size = osc_reqpool_mem_max << 20;
3462
3463         reqsize = 1;
3464         while (reqsize < OST_IO_MAXREQSIZE)
3465                 reqsize = reqsize << 1;
3466
3467         /*
3468          * We don't enlarge the request count in the OSC pool according to
3469          * cl_max_rpcs_in_flight. Allocation from the pool is only tried
3470          * after normal allocation has failed, so a small OSC pool won't
3471          * cause much performance degradation in most cases.
3472          */
3473         osc_reqpool_maxreqcount = reqpool_size / reqsize;
3474
3475         atomic_set(&osc_pool_req_count, 0);
3476         osc_rq_pool = ptlrpc_init_rq_pool(0, OST_IO_MAXREQSIZE,
3477                                           ptlrpc_add_rqs_to_pool);
3478
3479         if (osc_rq_pool == NULL)
3480                 GOTO(out_type, rc = -ENOMEM);
3481
3482         rc = osc_start_grant_work();
3483         if (rc != 0)
3484                 GOTO(out_req_pool, rc);
3485
3486         RETURN(rc);
3487
3488 out_req_pool:
3489         ptlrpc_free_rq_pool(osc_rq_pool);
3490 out_type:
3491         class_unregister_type(LUSTRE_OSC_NAME);
3492 out_kmem:
3493         lu_kmem_fini(osc_caches);
3494
3495         RETURN(rc);
3496 }
3497
3498 static void __exit osc_exit(void)
3499 {
3500         osc_stop_grant_work();
3501         remove_shrinker(osc_cache_shrinker);
3502         class_unregister_type(LUSTRE_OSC_NAME);
3503         lu_kmem_fini(osc_caches);
3504         ptlrpc_free_rq_pool(osc_rq_pool);
3505 }
3506
3507 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
3508 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3509 MODULE_VERSION(LUSTRE_VERSION_STRING);
3510 MODULE_LICENSE("GPL");
3511
3512 module_init(osc_init);
3513 module_exit(osc_exit);