Whamcloud - gitweb
LU-4856 misc: Reduce exposure to overflow on page counters.
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2013, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_OSC
38
39 #include <libcfs/libcfs.h>
40
41
42 #include <lustre_dlm.h>
43 #include <lustre_net.h>
44 #include <lustre/lustre_user.h>
45 #include <obd_cksum.h>
46 #include <lustre_ha.h>
47 #include <lprocfs_status.h>
48 #include <lustre_ioctl.h>
49 #include <lustre_log.h>
50 #include <lustre_debug.h>
51 #include <lustre_param.h>
52 #include <lustre_fid.h>
53 #include "osc_internal.h"
54 #include "osc_cl_internal.h"
55
/* Per-request state carried across an async bulk read/write (BRW) RPC;
 * stashed in req->rq_async_args and consumed by brw_interpret(). */
struct osc_brw_async_args {
        struct obdo              *aa_oa;            /* attributes sent with / returned by the RPC */
        int                       aa_requested_nob; /* bytes requested in this BRW */
        int                       aa_nio_count;     /* number of niobuf entries */
        obd_count                 aa_page_count;    /* number of pages in aa_ppga */
        int                       aa_resends;       /* resend attempts so far */
        struct brw_page **aa_ppga;                  /* page array for the bulk transfer */
        struct client_obd        *aa_cli;           /* owning client obd */
        struct list_head          aa_oaps;          /* osc_async_pages in this request */
        struct list_head          aa_exts;          /* osc_extents covered by this request */
        struct obd_capa  *aa_ocapa;                 /* capability sent with the request */
        struct cl_req            *aa_clerq;         /* cl_req this RPC belongs to */
};
69
70 #define osc_grant_args osc_brw_async_args
71
/* Async-args wrapper for getattr-style RPCs: just the obd_info whose
 * oi_cb_up upcall is invoked on completion. */
struct osc_async_args {
        struct obd_info *aa_oi;
};
75
/* Async-args for setattr/punch RPCs. */
struct osc_setattr_args {
        struct obdo             *sa_oa;     /* attributes updated from the reply */
        obd_enqueue_update_f     sa_upcall; /* completion callback */
        void                    *sa_cookie; /* opaque cookie passed to sa_upcall */
};
81
/* Async-args for OST_SYNC RPCs. */
struct osc_fsync_args {
        struct obd_info *fa_oi;             /* obd_info whose oi_oa is refreshed from the reply */
        obd_enqueue_update_f     fa_upcall; /* completion callback */
        void                    *fa_cookie; /* opaque cookie passed to fa_upcall */
};
87
/* Async-args for LDLM enqueue requests issued by the OSC. */
struct osc_enqueue_args {
        struct obd_export               *oa_exp;    /* export the enqueue was sent on */
        __u64                           *oa_flags;  /* in/out LDLM flags */
        obd_enqueue_update_f             oa_upcall; /* completion callback */
        void                            *oa_cookie; /* opaque cookie passed to oa_upcall */
        struct ost_lvb                  *oa_lvb;    /* lock value block (size/times) */
        struct lustre_handle            *oa_lockh;  /* returned lock handle */
        struct ldlm_enqueue_info        *oa_ei;     /* enqueue parameters */
        unsigned int                     oa_agl:1;  /* asynchronous glimpse lock request */
};
98
99 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
100 static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
101                          void *data, int rc);
102
/* Unpack OSC object metadata from disk storage (LE byte order).
 *
 * \param exp       export used to look up the import connect data
 * \param lsmp      in/out in-memory stripe MD pointer; see the cases below
 * \param lmm       on-disk (little-endian) metadata to unpack, may be NULL
 * \param lmm_bytes size of \a lmm in bytes
 *
 * Behaviour depends on the argument combination:
 *  - lsmp == NULL:                 return the in-memory lsm size only;
 *  - *lsmp != NULL && lmm == NULL: free *lsmp and return 0;
 *  - *lsmp == NULL:                allocate a fresh single-stripe lsm.
 *
 * \retval positive lsm size on success, negative errno on failure. */
static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
                        struct lov_mds_md *lmm, int lmm_bytes)
{
        int lsm_size;
        struct obd_import *imp = class_exp2cliimp(exp);
        ENTRY;

        if (lmm != NULL) {
                /* Reject buffers too small to hold even the fixed header. */
                if (lmm_bytes < sizeof(*lmm)) {
                        CERROR("%s: lov_mds_md too small: %d, need %d\n",
                               exp->exp_obd->obd_name, lmm_bytes,
                               (int)sizeof(*lmm));
                        RETURN(-EINVAL);
                }
                /* XXX LOV_MAGIC etc check? */

                if (unlikely(ostid_id(&lmm->lmm_oi) == 0)) {
                        CERROR("%s: zero lmm_object_id: rc = %d\n",
                               exp->exp_obd->obd_name, -EINVAL);
                        RETURN(-EINVAL);
                }
        }

        /* OSC objects always have exactly one stripe. */
        lsm_size = lov_stripe_md_size(1);
        if (lsmp == NULL)
                RETURN(lsm_size);

        /* Caller passed an lsm but no data to unpack: release it. */
        if (*lsmp != NULL && lmm == NULL) {
                OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                OBD_FREE(*lsmp, lsm_size);
                *lsmp = NULL;
                RETURN(0);
        }

        if (*lsmp == NULL) {
                OBD_ALLOC(*lsmp, lsm_size);
                if (unlikely(*lsmp == NULL))
                        RETURN(-ENOMEM);
                OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                if (unlikely((*lsmp)->lsm_oinfo[0] == NULL)) {
                        /* Unwind the partial allocation before failing. */
                        OBD_FREE(*lsmp, lsm_size);
                        RETURN(-ENOMEM);
                }
                loi_init((*lsmp)->lsm_oinfo[0]);
        } else if (unlikely(ostid_id(&(*lsmp)->lsm_oi) == 0)) {
                RETURN(-EBADF);
        }

        if (lmm != NULL)
                /* XXX zero *lsmp? */
                ostid_le_to_cpu(&lmm->lmm_oi, &(*lsmp)->lsm_oi);

        /* Prefer the server-advertised max object size when available. */
        if (imp != NULL &&
            (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MAXBYTES))
                (*lsmp)->lsm_maxbytes = imp->imp_connect_data.ocd_maxbytes;
        else
                (*lsmp)->lsm_maxbytes = LUSTRE_EXT3_STRIPE_MAXBYTES;

        RETURN(lsm_size);
}
164
165 static inline void osc_pack_capa(struct ptlrpc_request *req,
166                                  struct ost_body *body, void *capa)
167 {
168         struct obd_capa *oc = (struct obd_capa *)capa;
169         struct lustre_capa *c;
170
171         if (!capa)
172                 return;
173
174         c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
175         LASSERT(c);
176         capa_cpy(c, oc);
177         body->oa.o_valid |= OBD_MD_FLOSSCAPA;
178         DEBUG_CAPA(D_SEC, c, "pack");
179 }
180
/* Fill the request's ost_body from \a oinfo: convert oi_oa to the wire
 * obdo format and append the capability, if one is attached. */
static inline void osc_pack_req_body(struct ptlrpc_request *req,
                                     struct obd_info *oinfo)
{
        struct ost_body *body;

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
                             oinfo->oi_oa);
        osc_pack_capa(req, body, oinfo->oi_capa);
}
193
194 static inline void osc_set_capa_size(struct ptlrpc_request *req,
195                                      const struct req_msg_field *field,
196                                      struct obd_capa *oc)
197 {
198         if (oc == NULL)
199                 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
200         else
201                 /* it is already calculated as sizeof struct obd_capa */
202                 ;
203 }
204
/* Reply interpreter for async OST_GETATTR: unpack the returned obdo into
 * the caller's oi_oa and invoke the oi_cb_up completion callback. */
static int osc_getattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_async_args *aa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body) {
                CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
                lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
                                     aa->aa_oi->oi_oa, &body->oa);

                /* This should really be sent by the OST */
                /* NOTE(review): the synchronous path (osc_getattr) uses
                 * cli_brw_size() here instead of DT_MAX_BRW_SIZE;
                 * presumably the two should agree — confirm. */
                aa->aa_oi->oi_oa->o_blksize = DT_MAX_BRW_SIZE;
                aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
        } else {
                CDEBUG(D_INFO, "can't unpack ost_body\n");
                rc = -EPROTO;
                /* Invalidate the obdo so stale fields are not trusted. */
                aa->aa_oi->oi_oa->o_valid = 0;
        }
out:
        /* Always report completion (success or failure) to the caller. */
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        RETURN(rc);
}
233
/* Queue an asynchronous OST_GETATTR on \a set; completion is delivered
 * through osc_getattr_interpret() -> oinfo->oi_cb_up. */
static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        /* Capability field size must be set before packing the request. */
        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;

        /* aa aliases rq_async_args; ensure it fits in the union. */
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oi = oinfo;

        ptlrpc_set_add_req(set, req);
        RETURN(0);
}
265
/* Synchronous OST_GETATTR: send the request, wait for the reply, and
 * unpack the returned attributes into oinfo->oi_oa.
 * \retval 0 on success, negative errno on failure. */
static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
                       struct obd_info *oinfo)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
                             &body->oa);

        /* Blocksize is a client-side notion; fill it in locally. */
        oinfo->oi_oa->o_blksize = cli_brw_size(exp->exp_obd);
        oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;

        EXIT;
 out:
        ptlrpc_req_finished(req);
        return rc;
}
309
/* Synchronous OST_SETATTR: send the attributes in oinfo->oi_oa and
 * refresh them from the server's reply.
 * \retval 0 on success, negative errno on failure. */
static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
                       struct obd_info *oinfo, struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        /* The object group must be known to address the right target. */
        LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
                             &body->oa);

        EXIT;
out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}
351
/* Reply interpreter shared by async setattr and punch: refresh sa_oa from
 * the reply, then invoke the caller's upcall with the final rc. */
static int osc_setattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_setattr_args *sa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
                             &body->oa);
out:
        /* The upcall runs on both success and failure. */
        rc = sa->sa_upcall(sa->sa_cookie, rc);
        RETURN(rc);
}
372
/* Issue an asynchronous OST_SETATTR.
 *
 * \param rqset  NULL: fire-and-forget via ptlrpcd, no completion upcall;
 *               PTLRPCD_SET: run via ptlrpcd with the interpret callback;
 *               otherwise: add to the caller's request set.
 * \param upcall completion callback, invoked with \a cookie and the rc.
 * \retval 0 on successful submission, negative errno on failure. */
int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
                           struct obd_trans_info *oti,
                           obd_enqueue_update_f upcall, void *cookie,
                           struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct osc_setattr_args *sa;
        int                      rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        /* Carry the llog cancel cookie along with the setattr. */
        if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
                oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        /* do mds to ost setattr asynchronously */
        if (!rqset) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
        } else {
                req->rq_interpret_reply =
                        (ptlrpc_interpterer_t)osc_setattr_interpret;

                CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
                sa = ptlrpc_req_async_args(req);
                sa->sa_oa = oinfo->oi_oa;
                sa->sa_upcall = upcall;
                sa->sa_cookie = cookie;

                if (rqset == PTLRPCD_SET)
                        ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
                else
                        ptlrpc_set_add_req(rqset, req);
        }

        RETURN(0);
}
423
424 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
425                              struct obd_trans_info *oti,
426                              struct ptlrpc_request_set *rqset)
427 {
428         return osc_setattr_async_base(exp, oinfo, oti,
429                                       oinfo->oi_cb_up, oinfo, rqset);
430 }
431
/* Synchronously create an object on the OST.
 *
 * \param oa  attributes for the new object; on success the returned id,
 *            blocksize and (optionally) llog cookie are filled in
 * \param ea  in/out stripe MD; allocated here when *ea is NULL and, on
 *            failure, freed again only if it was allocated here
 * \param oti transaction info; receives the llog cancel cookie if the
 *            server supplied one
 * \retval 0 on success, negative errno on failure. */
int osc_real_create(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md **ea, struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct lov_stripe_md  *lsm;
        int                    rc;
        ENTRY;

        LASSERT(oa);
        LASSERT(ea);

        lsm = *ea;
        if (!lsm) {
                rc = obd_alloc_memmd(exp, &lsm);
                if (rc < 0)
                        RETURN(rc);
        }

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
        if (req == NULL)
                GOTO(out, rc = -ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
        if (rc) {
                ptlrpc_request_free(req);
                GOTO(out, rc);
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        if ((oa->o_valid & OBD_MD_FLFLAGS) &&
            oa->o_flags == OBD_FL_DELORPHAN) {
                DEBUG_REQ(D_HA, req,
                          "delorphan from OST integration");
                /* Don't resend the delorphan req */
                req->rq_no_resend = req->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out_req, rc = -EPROTO);

        CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;

        /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
         * have valid lsm_oinfo data structs, so don't go touching that.
         * This needs to be fixed in a big way.
         */
        lsm->lsm_oi = oa->o_oi;
        *ea = lsm;

        if (oti != NULL) {
                if (oa->o_valid & OBD_MD_FLCOOKIE) {
                        if (oti->oti_logcookies == NULL)
                                oti->oti_logcookies = &oti->oti_onecookie;

                        *oti->oti_logcookies = oa->o_lcookie;
                }
        }

        CDEBUG(D_HA, "transno: "LPD64"\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        /* Free lsm only if it was allocated above (*ea still NULL). */
        if (rc && !*ea)
                obd_free_memmd(exp, &lsm);
        RETURN(rc);
}
515
/* Issue an asynchronous OST_PUNCH (truncate/hole-punch) RPC.
 * Completion goes through osc_setattr_interpret() -> \a upcall(\a cookie).
 * \param rqset PTLRPCD_SET to run via ptlrpcd, otherwise the caller's set.
 * \retval 0 on successful submission, negative errno on failure. */
int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
                   obd_enqueue_update_f upcall, void *cookie,
                   struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct osc_setattr_args *sa;
        struct ost_body         *body;
        int                      rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
                             oinfo->oi_oa);
        osc_pack_capa(req, body, oinfo->oi_capa);

        ptlrpc_request_set_replen(req);

        /* Punch shares the setattr interpreter/args. */
        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
        CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
        sa = ptlrpc_req_async_args(req);
        sa->sa_oa     = oinfo->oi_oa;
        sa->sa_upcall = upcall;
        sa->sa_cookie = cookie;
        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}
560
/* Reply interpreter for OST_SYNC: copy the returned obdo back into the
 * caller's oi_oa and invoke the completion upcall. */
static int osc_sync_interpret(const struct lu_env *env,
                              struct ptlrpc_request *req,
                              void *arg, int rc)
{
        struct osc_fsync_args *fa = arg;
        struct ost_body *body;
        ENTRY;

        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL) {
                CERROR ("can't unpack ost_body\n");
                GOTO(out, rc = -EPROTO);
        }

        /* Struct copy of the reply attributes into the caller's obdo. */
        *fa->fa_oi->oi_oa = body->oa;
out:
        rc = fa->fa_upcall(fa->fa_cookie, rc);
        RETURN(rc);
}
583
/* Issue an asynchronous OST_SYNC RPC for the byte range encoded in
 * oinfo->oi_oa (start/end overloaded onto the size/blocks fields).
 * \param rqset PTLRPCD_SET to run via ptlrpcd, otherwise the caller's set.
 * \retval 0 on successful submission, negative errno on failure. */
int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo,
                  obd_enqueue_update_f upcall, void *cookie,
                  struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct osc_fsync_args *fa;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        /* overload the size and blocks fields in the oa with start/end */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
                             oinfo->oi_oa);
        osc_pack_capa(req, body, oinfo->oi_capa);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = osc_sync_interpret;

        CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
        fa = ptlrpc_req_async_args(req);
        fa->fa_oi = oinfo;
        fa->fa_upcall = upcall;
        fa->fa_cookie = cookie;

        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN (0);
}
628
/* Find and cancel locally locks matched by @mode in the resource found by
 * @objid. Found locks are added into @cancel list. Returns the amount of
 * locks added to @cancels list. */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
                                   struct list_head *cancels,
                                   ldlm_mode_t mode, __u64 lock_flags)
{
        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        struct ldlm_res_id res_id;
        struct ldlm_resource *res;
        int count;
        ENTRY;

        /* Return, i.e. cancel nothing, only if ELC is supported (flag in
         * export) but disabled through procfs (flag in NS).
         *
         * This distinguishes from a case when ELC is not supported originally,
         * when we still want to cancel locks in advance and just cancel them
         * locally, without sending any RPC. */
        if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
                RETURN(0);

        ostid_build_res_name(&oa->o_oi, &res_id);
        res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
        if (IS_ERR(res))
                RETURN(0);

        /* Hold a debugging reference across the local cancel scan. */
        LDLM_RESOURCE_ADDREF(res);
        count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
                                           lock_flags, 0, NULL);
        LDLM_RESOURCE_DELREF(res);
        ldlm_resource_putref(res);
        RETURN(count);
}
663
664 static int osc_destroy_interpret(const struct lu_env *env,
665                                  struct ptlrpc_request *req, void *data,
666                                  int rc)
667 {
668         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
669
670         atomic_dec(&cli->cl_destroy_in_flight);
671         wake_up(&cli->cl_destroy_waitq);
672         return 0;
673 }
674
/* Try to reserve an in-flight slot for a destroy RPC without a lock.
 * \retval 1 the slot is taken and the RPC may be sent;
 * \retval 0 too many destroys in flight; caller must wait and retry. */
static int osc_can_send_destroy(struct client_obd *cli)
{
        if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
            cli->cl_max_rpcs_in_flight) {
                /* The destroy request can be sent */
                return 1;
        }
        /* Undo the optimistic increment; if the counter dropped below the
         * limit in the meantime, another waiter may now proceed. */
        if (atomic_dec_return(&cli->cl_destroy_in_flight) <
            cli->cl_max_rpcs_in_flight) {
                /*
                 * The counter has been modified between the two atomic
                 * operations.
                 */
                wake_up(&cli->cl_destroy_waitq);
        }
        return 0;
}
692
693 int osc_create(const struct lu_env *env, struct obd_export *exp,
694                struct obdo *oa, struct lov_stripe_md **ea,
695                struct obd_trans_info *oti)
696 {
697         int rc = 0;
698         ENTRY;
699
700         LASSERT(oa);
701         LASSERT(ea);
702         LASSERT(oa->o_valid & OBD_MD_FLGROUP);
703
704         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
705             oa->o_flags == OBD_FL_RECREATE_OBJS) {
706                 RETURN(osc_real_create(exp, oa, ea, oti));
707         }
708
709         if (!fid_seq_is_mdt(ostid_seq(&oa->o_oi)))
710                 RETURN(osc_real_create(exp, oa, ea, oti));
711
712         /* we should not get here anymore */
713         LBUG();
714
715         RETURN(rc);
716 }
717
/* Destroy requests can be async always on the client, and we don't even really
 * care about the return code since the client cannot do anything at all about
 * a destroy failure.
 * When the MDS is unlinking a filename, it saves the file objects into a
 * recovery llog, and these object records are cancelled when the OST reports
 * they were destroyed and sync'd to disk (i.e. transaction committed).
 * If the client dies, or the OST is down when the object should be destroyed,
 * the records are not cancelled, and when the OST reconnects to the MDS next,
 * it will retrieve the llog unlink logs and then sends the log cancellation
 * cookies to the MDS after committing destroy transactions. */
static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa, struct lov_stripe_md *ea,
                       struct obd_trans_info *oti, struct obd_export *md_export,
                       void *capa)
{
        struct client_obd     *cli = &exp->exp_obd->u.cli;
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct list_head       cancels = LIST_HEAD_INIT(cancels);
        int rc, count;
        ENTRY;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        /* Cancel matching PW locks locally first; their data can be
         * discarded since the object is going away anyway. */
        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
        if (req == NULL) {
                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                RETURN(-ENOMEM);
        }

        osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
        rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
                               0, &cancels, count);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
                oa->o_lcookie = *oti->oti_logcookies;
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        osc_pack_capa(req, body, (struct obd_capa *)capa);
        ptlrpc_request_set_replen(req);

        /* If osc_destroy is for destroying the unlink orphan,
         * sent from MDT to OST, which should not be blocked here,
         * because the process might be triggered by ptlrpcd, and
         * it is not good to block ptlrpcd thread (b=16006)*/
        /* NOTE(review): o_flags is read here without first checking
         * OBD_MD_FLFLAGS in o_valid, so a stale flags value could be
         * consulted — confirm whether callers always initialize it. */
        if (!(oa->o_flags & OBD_FL_DELORPHAN)) {
                req->rq_interpret_reply = osc_destroy_interpret;
                if (!osc_can_send_destroy(cli)) {
                        struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
                                                          NULL);

                        /*
                         * Wait until the number of on-going destroy RPCs drops
                         * under max_rpc_in_flight
                         */
                        l_wait_event_exclusive(cli->cl_destroy_waitq,
                                               osc_can_send_destroy(cli), &lwi);
                }
        }

        /* Do not wait for response */
        ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
        RETURN(0);
}
797
/* Report client cache state (dirty bytes, room for more dirty data, and
 * grant) to the server inside \a oa, under cl_loi_list_lock.
 * Page counters are kept as long/atomic_long to reduce exposure to
 * overflow; byte values are derived by shifting with PAGE_CACHE_SHIFT. */
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;

        /* Caller must not have filled these fields already. */
        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        client_obd_list_lock(&cli->cl_loi_list_lock);
        oa->o_dirty = cli->cl_dirty_pages << PAGE_CACHE_SHIFT;
        if (unlikely(cli->cl_dirty_pages - cli->cl_dirty_transit >
                     cli->cl_dirty_max_pages)) {
                /* Per-OSC dirty accounting is out of bounds: report no
                 * room rather than a bogus value. */
                CERROR("dirty %lu - %lu > dirty_max %lu\n",
                       cli->cl_dirty_pages, cli->cl_dirty_transit,
                       cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else if (unlikely(atomic_long_read(&obd_dirty_pages) -
                            atomic_long_read(&obd_dirty_transit_pages) >
                            (obd_max_dirty_pages + 1))) {
                /* The atomic_read() allowing the atomic_inc() are
                 * not covered by a lock thus they may safely race and trip
                 * this CERROR() unless we add in a small fudge factor (+1). */
                CERROR("%s: dirty %ld - %ld > system dirty_max %lu\n",
                       cli->cl_import->imp_obd->obd_name,
                       atomic_long_read(&obd_dirty_pages),
                       atomic_long_read(&obd_dirty_transit_pages),
                       obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
                            0x7fffffff)) {
                /* Difference would not fit the 32-bit o_undirty field. */
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty_pages, cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else {
                /* Advertise room for at least a full pipeline of RPCs. */
                unsigned long max_in_flight = (cli->cl_max_pages_per_rpc <<
                                      PAGE_CACHE_SHIFT) *
                                     (cli->cl_max_rpcs_in_flight + 1);
                oa->o_undirty = max(cli->cl_dirty_max_pages << PAGE_CACHE_SHIFT,
                                    max_in_flight);
        }
        oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
        oa->o_dropped = cli->cl_lost_grant;
        /* Lost grant is reported exactly once. */
        cli->cl_lost_grant = 0;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);

}
846
847 void osc_update_next_shrink(struct client_obd *cli)
848 {
849         cli->cl_next_shrink_grant =
850                 cfs_time_shift(cli->cl_grant_shrink_interval);
851         CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
852                cli->cl_next_shrink_grant);
853 }
854
/* Add @grant bytes to the client's available grant, under
 * cl_loi_list_lock.  Used to return grant the server handed back or
 * that a failed shrink RPC never actually gave up. */
static void __osc_update_grant(struct client_obd *cli, obd_size grant)
{
        client_obd_list_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant += grant;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
}
861
862 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
863 {
864         if (body->oa.o_valid & OBD_MD_FLGRANT) {
865                 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
866                 __osc_update_grant(cli, body->oa.o_grant);
867         }
868 }
869
870 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
871                               obd_count keylen, void *key, obd_count vallen,
872                               void *val, struct ptlrpc_request_set *set);
873
874 static int osc_shrink_grant_interpret(const struct lu_env *env,
875                                       struct ptlrpc_request *req,
876                                       void *aa, int rc)
877 {
878         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
879         struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
880         struct ost_body *body;
881
882         if (rc != 0) {
883                 __osc_update_grant(cli, oa->o_grant);
884                 GOTO(out, rc);
885         }
886
887         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
888         LASSERT(body);
889         osc_update_grant(cli, body);
890 out:
891         OBDO_FREE(oa);
892         return rc;
893 }
894
895 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
896 {
897         client_obd_list_lock(&cli->cl_loi_list_lock);
898         oa->o_grant = cli->cl_avail_grant / 4;
899         cli->cl_avail_grant -= oa->o_grant;
900         client_obd_list_unlock(&cli->cl_loi_list_lock);
901         if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
902                 oa->o_valid |= OBD_MD_FLFLAGS;
903                 oa->o_flags = 0;
904         }
905         oa->o_flags |= OBD_FL_SHRINK_GRANT;
906         osc_update_next_shrink(cli);
907 }
908
909 /* Shrink the current grant, either from some large amount to enough for a
910  * full set of in-flight RPCs, or if we have already shrunk to that limit
911  * then to enough for a single RPC.  This avoids keeping more grant than
912  * needed, and avoids shrinking the grant piecemeal. */
913 static int osc_shrink_grant(struct client_obd *cli)
914 {
915         __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
916                              (cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT);
917
918         client_obd_list_lock(&cli->cl_loi_list_lock);
919         if (cli->cl_avail_grant <= target_bytes)
920                 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
921         client_obd_list_unlock(&cli->cl_loi_list_lock);
922
923         return osc_shrink_grant_to_target(cli, target_bytes);
924 }
925
926 int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
927 {
928         int                     rc = 0;
929         struct ost_body        *body;
930         ENTRY;
931
932         client_obd_list_lock(&cli->cl_loi_list_lock);
933         /* Don't shrink if we are already above or below the desired limit
934          * We don't want to shrink below a single RPC, as that will negatively
935          * impact block allocation and long-term performance. */
936         if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT)
937                 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
938
939         if (target_bytes >= cli->cl_avail_grant) {
940                 client_obd_list_unlock(&cli->cl_loi_list_lock);
941                 RETURN(0);
942         }
943         client_obd_list_unlock(&cli->cl_loi_list_lock);
944
945         OBD_ALLOC_PTR(body);
946         if (!body)
947                 RETURN(-ENOMEM);
948
949         osc_announce_cached(cli, &body->oa, 0);
950
951         client_obd_list_lock(&cli->cl_loi_list_lock);
952         body->oa.o_grant = cli->cl_avail_grant - target_bytes;
953         cli->cl_avail_grant = target_bytes;
954         client_obd_list_unlock(&cli->cl_loi_list_lock);
955         if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
956                 body->oa.o_valid |= OBD_MD_FLFLAGS;
957                 body->oa.o_flags = 0;
958         }
959         body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
960         osc_update_next_shrink(cli);
961
962         rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
963                                 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
964                                 sizeof(*body), body, NULL);
965         if (rc != 0)
966                 __osc_update_grant(cli, body->oa.o_grant);
967         OBD_FREE_PTR(body);
968         RETURN(rc);
969 }
970
971 static int osc_should_shrink_grant(struct client_obd *client)
972 {
973         cfs_time_t time = cfs_time_current();
974         cfs_time_t next_shrink = client->cl_next_shrink_grant;
975
976         if ((client->cl_import->imp_connect_data.ocd_connect_flags &
977              OBD_CONNECT_GRANT_SHRINK) == 0)
978                 return 0;
979
980         if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
981                 /* Get the current RPC size directly, instead of going via:
982                  * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
983                  * Keep comment here so that it can be found by searching. */
984                 int brw_size = client->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
985
986                 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
987                     client->cl_avail_grant > brw_size)
988                         return 1;
989                 else
990                         osc_update_next_shrink(client);
991         }
992         return 0;
993 }
994
995 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
996 {
997         struct client_obd *client;
998
999         list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
1000                 if (osc_should_shrink_grant(client))
1001                         osc_shrink_grant(client);
1002         }
1003         return 0;
1004 }
1005
1006 static int osc_add_shrink_grant(struct client_obd *client)
1007 {
1008         int rc;
1009
1010         rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
1011                                        TIMEOUT_GRANT,
1012                                        osc_grant_shrink_grant_cb, NULL,
1013                                        &client->cl_grant_shrink_list);
1014         if (rc) {
1015                 CERROR("add grant client %s error %d\n",
1016                         client->cl_import->imp_obd->obd_name, rc);
1017                 return rc;
1018         }
1019         CDEBUG(D_CACHE, "add grant client %s \n",
1020                client->cl_import->imp_obd->obd_name);
1021         osc_update_next_shrink(client);
1022         return 0;
1023 }
1024
1025 static int osc_del_shrink_grant(struct client_obd *client)
1026 {
1027         return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
1028                                          TIMEOUT_GRANT);
1029 }
1030
/* Initialize this client's grant accounting from the server's connect
 * reply (@ocd), and register for periodic grant shrinking when the
 * server supports it. */
static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        /*
         * ocd_grant is the total grant amount we're expect to hold: if we've
         * been evicted, it's the new avail_grant amount, cl_dirty_pages will
         * drop to 0 as inflight RPCs fail out; otherwise, it's avail_grant +
         * dirty.
         *
         * race is tolerable here: if we're evicted, but imp_state already
         * left EVICTED state, then cl_dirty_pages must be 0 already.
         */
        client_obd_list_lock(&cli->cl_loi_list_lock);
        if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
                cli->cl_avail_grant = ocd->ocd_grant;
        else
                cli->cl_avail_grant = ocd->ocd_grant -
                                      (cli->cl_dirty_pages << PAGE_CACHE_SHIFT);

        /* negative grant means the server granted less than we have dirty;
         * only expected from servers lacking the LU-2679 fix */
        if (cli->cl_avail_grant < 0) {
                CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
                      cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant,
                      ocd->ocd_grant, cli->cl_dirty_pages << PAGE_CACHE_SHIFT);
                /* workaround for servers which do not have the patch from
                 * LU-2679 */
                cli->cl_avail_grant = ocd->ocd_grant;
        }

        /* determine the appropriate chunk size used by osc_extent. */
        cli->cl_chunkbits = max_t(int, PAGE_CACHE_SHIFT, ocd->ocd_blocksize);
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
                "chunk bits: %d.\n", cli->cl_import->imp_obd->obd_name,
                cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits);

        if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
            list_empty(&cli->cl_grant_shrink_list))
                osc_add_shrink_grant(cli);
}
1070
1071 /* We assume that the reason this OSC got a short read is because it read
1072  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
1073  * via the LOV, and it _knows_ it's reading inside the file, it's just that
1074  * this stripe never got written at or beyond this stripe offset yet. */
1075 static void handle_short_read(int nob_read, obd_count page_count,
1076                               struct brw_page **pga)
1077 {
1078         char *ptr;
1079         int i = 0;
1080
1081         /* skip bytes read OK */
1082         while (nob_read > 0) {
1083                 LASSERT (page_count > 0);
1084
1085                 if (pga[i]->count > nob_read) {
1086                         /* EOF inside this page */
1087                         ptr = kmap(pga[i]->pg) +
1088                                 (pga[i]->off & ~CFS_PAGE_MASK);
1089                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1090                         kunmap(pga[i]->pg);
1091                         page_count--;
1092                         i++;
1093                         break;
1094                 }
1095
1096                 nob_read -= pga[i]->count;
1097                 page_count--;
1098                 i++;
1099         }
1100
1101         /* zero remaining pages */
1102         while (page_count-- > 0) {
1103                 ptr = kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
1104                 memset(ptr, 0, pga[i]->count);
1105                 kunmap(pga[i]->pg);
1106                 i++;
1107         }
1108 }
1109
1110 static int check_write_rcs(struct ptlrpc_request *req,
1111                            int requested_nob, int niocount,
1112                            obd_count page_count, struct brw_page **pga)
1113 {
1114         int     i;
1115         __u32   *remote_rcs;
1116
1117         remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1118                                                   sizeof(*remote_rcs) *
1119                                                   niocount);
1120         if (remote_rcs == NULL) {
1121                 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1122                 return(-EPROTO);
1123         }
1124
1125         /* return error if any niobuf was in error */
1126         for (i = 0; i < niocount; i++) {
1127                 if ((int)remote_rcs[i] < 0)
1128                         return(remote_rcs[i]);
1129
1130                 if (remote_rcs[i] != 0) {
1131                         CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1132                                 i, remote_rcs[i], req);
1133                         return(-EPROTO);
1134                 }
1135         }
1136
1137         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1138                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1139                        req->rq_bulk->bd_nob_transferred, requested_nob);
1140                 return(-EPROTO);
1141         }
1142
1143         return (0);
1144 }
1145
1146 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1147 {
1148         if (p1->flag != p2->flag) {
1149                 unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
1150                                   OBD_BRW_SYNC       | OBD_BRW_ASYNC   |
1151                                   OBD_BRW_NOQUOTA    | OBD_BRW_SOFT_SYNC);
1152
1153                 /* warn if we try to combine flags that we don't know to be
1154                  * safe to combine */
1155                 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1156                         CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1157                               "report this at https://jira.hpdd.intel.com/\n",
1158                               p1->flag, p2->flag);
1159                 }
1160                 return 0;
1161         }
1162
1163         return (p1->off + p1->count == p2->off);
1164 }
1165
/* Compute the bulk checksum over up to @nob bytes spread across
 * @pg_count brw_pages, using the algorithm selected by @cksum_type.
 * Includes two fault-injection hooks: OBD_FAIL_OSC_CHECKSUM_RECEIVE
 * corrupts the first read page before hashing, and
 * OBD_FAIL_OSC_CHECKSUM_SEND perturbs the final write checksum.
 * NOTE(review): on hash-init failure PTR_ERR() (negative) is returned
 * through the unsigned obd_count return type — callers see it as a
 * bogus checksum value; verify callers tolerate this. */
static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
                                   struct brw_page **pga, int opc,
                                   cksum_type_t cksum_type)
{
        __u32                           cksum;
        int                             i = 0;
        struct cfs_crypto_hash_desc     *hdesc;
        unsigned int                    bufsize;
        int                             err;
        unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);

        LASSERT(pg_count > 0);

        hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
        if (IS_ERR(hdesc)) {
                CERROR("Unable to initialize checksum hash %s\n",
                       cfs_crypto_hash_name(cfs_alg));
                return PTR_ERR(hdesc);
        }

        while (nob > 0 && pg_count > 0) {
                /* hash only the bytes actually covered by @nob in this page */
                int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 && opc == OST_READ &&
                    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
                        unsigned char *ptr = kmap(pga[i]->pg);
                        int off = pga[i]->off & ~CFS_PAGE_MASK;

                        memcpy(ptr + off, "bad1", min(4, nob));
                        kunmap(pga[i]->pg);
                }
                cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
                                            pga[i]->off & ~CFS_PAGE_MASK,
                                            count);
                LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
                               (int)(pga[i]->off & ~CFS_PAGE_MASK));

                nob -= pga[i]->count;
                pg_count--;
                i++;
        }

        /* NOTE(review): err from hash_final is never checked */
        bufsize = sizeof(cksum);
        err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);

        /* For sending we only compute the wrong checksum instead
         * of corrupting the data so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                cksum++;

        return cksum;
}
1220
/* Build (but do not send) a BRW read or write RPC for @page_count pages.
 * Merges contiguous, flag-compatible pages into shared niobufs, attaches
 * the bulk descriptor, announces cached/dirty state, optionally computes
 * a bulk checksum, and stores the async args the reply handler needs.
 * On success *reqp holds the prepared request; on failure it is freed. */
static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
                                struct lov_stripe_md *lsm, obd_count page_count,
                                struct brw_page **pga,
                                struct ptlrpc_request **reqp,
                                struct obd_capa *ocapa, int reserve,
                                int resend)
{
        struct ptlrpc_request   *req;
        struct ptlrpc_bulk_desc *desc;
        struct ost_body         *body;
        struct obd_ioobj        *ioobj;
        struct niobuf_remote    *niobuf;
        int niocount, i, requested_nob, opc, rc;
        struct osc_brw_async_args *aa;
        struct req_capsule      *pill;
        struct brw_page *pg_prev;

        ENTRY;
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
                RETURN(-ENOMEM); /* Recoverable */
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
                RETURN(-EINVAL); /* Fatal */

        /* writes draw from the import's preallocated pool so they can
         * proceed under memory pressure; reads allocate normally */
        if ((cmd & OBD_BRW_WRITE) != 0) {
                opc = OST_WRITE;
                req = ptlrpc_request_alloc_pool(cli->cl_import,
                                                cli->cl_import->imp_rq_pool,
                                                &RQF_OST_BRW_WRITE);
        } else {
                opc = OST_READ;
                req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
        }
        if (req == NULL)
                RETURN(-ENOMEM);

        /* count how many niobufs we need: one per run of mergeable pages */
        for (niocount = i = 1; i < page_count; i++) {
                if (!can_merge_pages(pga[i - 1], pga[i]))
                        niocount++;
        }

        pill = &req->rq_pill;
        req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
                             sizeof(*ioobj));
        req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
                             niocount * sizeof(*niobuf));
        osc_set_capa_size(req, &RMF_CAPA1, ocapa);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);
        /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
         * retry logic */
        req->rq_no_retry_einprogress = 1;

        desc = ptlrpc_prep_bulk_imp(req, page_count,
                cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
                opc == OST_WRITE ? BULK_GET_SOURCE : BULK_PUT_SINK,
                OST_BULK_PORTAL);

        if (desc == NULL)
                GOTO(out, rc = -ENOMEM);
        /* NB request now owns desc and will free it when it gets freed */

        body = req_capsule_client_get(pill, &RMF_OST_BODY);
        ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;
        /* The high bits of ioo_max_brw tells server _maximum_ number of bulks
         * that might be send for this request.  The actual number is decided
         * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
         * "max - 1" for old client compatibility sending "0", and also so the
         * the actual maximum is a power-of-two number, not one less. LU-1431 */
        ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
        osc_pack_capa(req, body, ocapa);
        LASSERT(page_count > 0);
        pg_prev = pga[0];
        /* attach every page to the bulk and fill/merge remote niobufs;
         * pages must be sorted by offset with no interior gaps */
        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                struct brw_page *pg = pga[i];
                int poff = pg->off & ~CFS_PAGE_MASK;

                LASSERT(pg->count > 0);
                /* make sure there is no gap in the middle of page array */
                LASSERTF(page_count == 1 ||
                         (ergo(i == 0, poff + pg->count == PAGE_CACHE_SIZE) &&
                          ergo(i > 0 && i < page_count - 1,
                               poff == 0 && pg->count == PAGE_CACHE_SIZE)   &&
                          ergo(i == page_count - 1, poff == 0)),
                         "i: %d/%d pg: %p off: "LPU64", count: %u\n",
                         i, page_count, pg, pg->off, pg->count);
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
                         " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
                         i, page_count,
                         pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
                         pg_prev->pg, page_private(pg_prev->pg),
                         pg_prev->pg->index, pg_prev->off);
                LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
                        (pg->flag & OBD_BRW_SRVLOCK));

                ptlrpc_prep_bulk_page_pin(desc, pg->pg, poff, pg->count);
                requested_nob += pg->count;

                if (i > 0 && can_merge_pages(pg_prev, pg)) {
                        /* extend the previous niobuf instead of starting
                         * a new one */
                        niobuf--;
                        niobuf->rnb_len += pg->count;
                } else {
                        niobuf->rnb_offset = pg->off;
                        niobuf->rnb_len    = pg->count;
                        niobuf->rnb_flags  = pg->flag;
                }
                pg_prev = pg;
        }

        LASSERTF((void *)(niobuf - niocount) ==
                req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
                "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
                &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));

        osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
        if (resend) {
                if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                        body->oa.o_valid |= OBD_MD_FLFLAGS;
                        body->oa.o_flags = 0;
                }
                body->oa.o_flags |= OBD_FL_RECOV_RESEND;
        }

        if (osc_should_shrink_grant(cli))
                osc_shrink_grant_local(cli, &body->oa);

        /* size[REQ_REC_OFF] still sizeof (*body) */
        if (opc == OST_WRITE) {
                /* checksum the outgoing data unless the transport layer
                 * already integrity-protects the bulk */
                if (cli->cl_checksum &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        /* store cl_cksum_type in a local variable since
                         * it can be changed via lprocfs */
                        cksum_type_t cksum_type = cli->cl_cksum_type;

                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                                oa->o_flags &= OBD_FL_LOCAL_MASK;
                                body->oa.o_flags = 0;
                        }
                        body->oa.o_flags |= cksum_type_pack(cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        body->oa.o_cksum = osc_checksum_bulk(requested_nob,
                                                             page_count, pga,
                                                             OST_WRITE,
                                                             cksum_type);
                        CDEBUG(D_PAGE, "checksum at write origin: %x\n",
                               body->oa.o_cksum);
                        /* save this in 'oa', too, for later checking */
                        oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        oa->o_flags |= cksum_type_pack(cksum_type);
                } else {
                        /* clear out the checksum flag, in case this is a
                         * resend but cl_checksum is no longer set. b=11238 */
                        oa->o_valid &= ~OBD_MD_FLCKSUM;
                }
                oa->o_cksum = body->oa.o_cksum;
                /* 1 RC per niobuf */
                req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
                                     sizeof(__u32) * niocount);
        } else {
                /* for reads, ask the server to checksum its reply */
                if (cli->cl_checksum &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                body->oa.o_flags = 0;
                        body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                }
        }
        ptlrpc_request_set_replen(req);

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oa = oa;
        aa->aa_requested_nob = requested_nob;
        aa->aa_nio_count = niocount;
        aa->aa_page_count = page_count;
        aa->aa_resends = 0;
        aa->aa_ppga = pga;
        aa->aa_cli = cli;
        INIT_LIST_HEAD(&aa->aa_oaps);
        if (ocapa && reserve)
                aa->aa_ocapa = capa_get(ocapa);

        *reqp = req;
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
                req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
                niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
        RETURN(0);

 out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}
1427
/* Diagnose a write checksum mismatch reported by the server.  Recomputes
 * the checksum over the local pages with the server's checksum type and
 * compares all three values to classify where the corruption happened.
 * Returns 0 if the checksums actually match, 1 (caller should resend /
 * fail) after logging the diagnosis otherwise. */
static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
                                __u32 client_cksum, __u32 server_cksum, int nob,
                                obd_count page_count, struct brw_page **pga,
                                cksum_type_t client_cksum_type)
{
        __u32 new_cksum;
        char *msg;
        cksum_type_t cksum_type;

        if (server_cksum == client_cksum) {
                CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
                return 0;
        }

        /* recompute with the checksum type the server actually used */
        cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
                                       oa->o_flags : 0);
        new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
                                      cksum_type);

        if (cksum_type != client_cksum_type)
                msg = "the server did not use the checksum type specified in "
                      "the original request - likely a protocol problem";
        else if (new_cksum == server_cksum)
                msg = "changed on the client after we checksummed it - "
                      "likely false positive due to mmap IO (bug 11742)";
        else if (new_cksum == client_cksum)
                msg = "changed in transit before arrival at OST";
        else
                msg = "changed in transit AND doesn't match the original - "
                      "likely false positive due to mmap IO (bug 11742)";

        LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
                           " object "DOSTID" extent ["LPU64"-"LPU64"]\n",
                           msg, libcfs_nid2str(peer->nid),
                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
                           POSTID(&oa->o_oi), pga[0]->off,
                           pga[page_count-1]->off + pga[page_count-1]->count - 1);
        CERROR("original client csum %x (type %x), server csum %x (type %x), "
               "client csum now %x\n", client_cksum, client_cksum_type,
               server_cksum, cksum_type, new_cksum);
        return 1;
}
1472
1473 /* Note rc enters this function as number of bytes transferred */
1474 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1475 {
1476         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1477         const lnet_process_id_t *peer =
1478                         &req->rq_import->imp_connection->c_peer;
1479         struct client_obd *cli = aa->aa_cli;
1480         struct ost_body *body;
1481         __u32 client_cksum = 0;
1482         ENTRY;
1483
1484         if (rc < 0 && rc != -EDQUOT) {
1485                 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1486                 RETURN(rc);
1487         }
1488
1489         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1490         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1491         if (body == NULL) {
1492                 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1493                 RETURN(-EPROTO);
1494         }
1495
1496         /* set/clear over quota flag for a uid/gid */
1497         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1498             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1499                 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1500
1501                 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1502                        body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1503                        body->oa.o_flags);
1504                 osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
1505         }
1506
1507         osc_update_grant(cli, body);
1508
1509         if (rc < 0)
1510                 RETURN(rc);
1511
1512         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1513                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1514
1515         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1516                 if (rc > 0) {
1517                         CERROR("Unexpected +ve rc %d\n", rc);
1518                         RETURN(-EPROTO);
1519                 }
1520                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1521
1522                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1523                         RETURN(-EAGAIN);
1524
1525                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1526                     check_write_checksum(&body->oa, peer, client_cksum,
1527                                          body->oa.o_cksum, aa->aa_requested_nob,
1528                                          aa->aa_page_count, aa->aa_ppga,
1529                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1530                         RETURN(-EAGAIN);
1531
1532                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1533                                      aa->aa_page_count, aa->aa_ppga);
1534                 GOTO(out, rc);
1535         }
1536
1537         /* The rest of this function executes only for OST_READs */
1538
1539         /* if unwrap_bulk failed, return -EAGAIN to retry */
1540         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1541         if (rc < 0)
1542                 GOTO(out, rc = -EAGAIN);
1543
1544         if (rc > aa->aa_requested_nob) {
1545                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1546                        aa->aa_requested_nob);
1547                 RETURN(-EPROTO);
1548         }
1549
1550         if (rc != req->rq_bulk->bd_nob_transferred) {
1551                 CERROR ("Unexpected rc %d (%d transferred)\n",
1552                         rc, req->rq_bulk->bd_nob_transferred);
1553                 return (-EPROTO);
1554         }
1555
1556         if (rc < aa->aa_requested_nob)
1557                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1558
1559         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1560                 static int cksum_counter;
1561                 __u32      server_cksum = body->oa.o_cksum;
1562                 char      *via;
1563                 char      *router;
1564                 cksum_type_t cksum_type;
1565
1566                 cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
1567                                                body->oa.o_flags : 0);
1568                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1569                                                  aa->aa_ppga, OST_READ,
1570                                                  cksum_type);
1571
1572                 if (peer->nid == req->rq_bulk->bd_sender) {
1573                         via = router = "";
1574                 } else {
1575                         via = " via ";
1576                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1577                 }
1578
1579                 if (server_cksum != client_cksum) {
1580                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1581                                            "%s%s%s inode "DFID" object "DOSTID
1582                                            " extent ["LPU64"-"LPU64"]\n",
1583                                            req->rq_import->imp_obd->obd_name,
1584                                            libcfs_nid2str(peer->nid),
1585                                            via, router,
1586                                            body->oa.o_valid & OBD_MD_FLFID ?
1587                                                 body->oa.o_parent_seq : (__u64)0,
1588                                            body->oa.o_valid & OBD_MD_FLFID ?
1589                                                 body->oa.o_parent_oid : 0,
1590                                            body->oa.o_valid & OBD_MD_FLFID ?
1591                                                 body->oa.o_parent_ver : 0,
1592                                            POSTID(&body->oa.o_oi),
1593                                            aa->aa_ppga[0]->off,
1594                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1595                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1596                                                                         1);
1597                         CERROR("client %x, server %x, cksum_type %x\n",
1598                                client_cksum, server_cksum, cksum_type);
1599                         cksum_counter = 0;
1600                         aa->aa_oa->o_cksum = client_cksum;
1601                         rc = -EAGAIN;
1602                 } else {
1603                         cksum_counter++;
1604                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1605                         rc = 0;
1606                 }
1607         } else if (unlikely(client_cksum)) {
1608                 static int cksum_missed;
1609
1610                 cksum_missed++;
1611                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1612                         CERROR("Checksum %u requested from %s but not sent\n",
1613                                cksum_missed, libcfs_nid2str(peer->nid));
1614         } else {
1615                 rc = 0;
1616         }
1617 out:
1618         if (rc >= 0)
1619                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1620                                      aa->aa_oa, &body->oa);
1621
1622         RETURN(rc);
1623 }
1624
/* Rebuild and resend a BRW RPC that failed with a recoverable error.
 * A fresh request is prepared from the async args of the failed one; the
 * page array and oap list are moved (not copied) onto the new request,
 * which is then handed to ptlrpcd.  The resend delay grows with the
 * number of resends, capped at the request timeout.
 *
 * \param request  the failed BRW request
 * \param aa       async args carried by \a request (pages, oaps, extents)
 * \param rc       the error that triggered the redo (e.g. -EINPROGRESS)
 *
 * \retval 0       new request queued successfully
 * \retval -EINTR  an oap was interrupted, resend abandoned
 * \retval <0      osc_brw_prep_request() failure
 */
static int osc_brw_redo_request(struct ptlrpc_request *request,
                                struct osc_brw_async_args *aa, int rc)
{
        struct ptlrpc_request *new_req;
        struct osc_brw_async_args *new_aa;
        struct osc_async_page *oap;
        ENTRY;

        DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
                  "redo for recoverable error %d", rc);

        /* Build a new request of the same kind (read/write) over the same
         * page array as the old one. */
        rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
                                        OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
                                  aa->aa_cli, aa->aa_oa,
                                  NULL /* lsm unused by osc currently */,
                                  aa->aa_page_count, aa->aa_ppga,
                                  &new_req, aa->aa_ocapa, 0, 1);
        if (rc)
                RETURN(rc);

        /* Bail out (dropping the new request) if any page in the old RPC
         * was interrupted while it was in flight. */
        list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request != NULL) {
                        LASSERTF(request == oap->oap_request,
                                 "request %p != oap_request %p\n",
                                 request, oap->oap_request);
                        if (oap->oap_interrupted) {
                                ptlrpc_req_finished(new_req);
                                RETURN(-EINTR);
                        }
                }
        }
        /* New request takes over pga and oaps from old request.
         * Note that copying a list_head doesn't work, need to move it... */
        aa->aa_resends++;
        new_req->rq_interpret_reply = request->rq_interpret_reply;
        new_req->rq_async_args = request->rq_async_args;
        new_req->rq_commit_cb = request->rq_commit_cb;
        /* cap resend delay to the current request timeout, this is similar to
         * what ptlrpc does (see after_reply()) */
        if (aa->aa_resends > new_req->rq_timeout)
                new_req->rq_sent = cfs_time_current_sec() + new_req->rq_timeout;
        else
                new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
        new_req->rq_generation_set = 1;
        new_req->rq_import_generation = request->rq_import_generation;

        new_aa = ptlrpc_req_async_args(new_req);

        INIT_LIST_HEAD(&new_aa->aa_oaps);
        list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
        INIT_LIST_HEAD(&new_aa->aa_exts);
        list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
        new_aa->aa_resends = aa->aa_resends;

        /* Re-point every oap's request reference from the old request to
         * the new one. */
        list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request) {
                        ptlrpc_req_finished(oap->oap_request);
                        oap->oap_request = ptlrpc_request_addref(new_req);
                }
        }

        /* Capability ownership moves to the new request's args. */
        new_aa->aa_ocapa = aa->aa_ocapa;
        aa->aa_ocapa = NULL;

        /* XXX: This code will run into problem if we're going to support
         * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
         * and wait for all of them to be finished. We should inherit request
         * set from old request. */
        ptlrpcd_add_req(new_req, PDL_POLICY_SAME, -1);

        DEBUG_REQ(D_INFO, new_req, "new request");
        RETURN(0);
}
1698
1699 /*
1700  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1701  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1702  * fine for our small page arrays and doesn't require allocation.  its an
1703  * insertion sort that swaps elements that are strides apart, shrinking the
1704  * stride down until its '1' and the array is sorted.
1705  */
1706 static void sort_brw_pages(struct brw_page **array, int num)
1707 {
1708         int stride, i, j;
1709         struct brw_page *tmp;
1710
1711         if (num == 1)
1712                 return;
1713         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1714                 ;
1715
1716         do {
1717                 stride /= 3;
1718                 for (i = stride ; i < num ; i++) {
1719                         tmp = array[i];
1720                         j = i;
1721                         while (j >= stride && array[j - stride]->off > tmp->off) {
1722                                 array[j] = array[j - stride];
1723                                 j -= stride;
1724                         }
1725                         array[j] = tmp;
1726                 }
1727         } while (stride > 1);
1728 }
1729
/* Free the brw_page pointer array built for a BRW RPC.  Only the array
 * itself is released; the brw_page structures it points at are embedded
 * in their osc_async_pages and owned elsewhere.  @count must match the
 * element count used at allocation time. */
static void osc_release_ppga(struct brw_page **ppga, obd_count count)
{
        LASSERT(ppga != NULL);
        OBD_FREE(ppga, sizeof(*ppga) * count);
}
1735
1736 static int brw_interpret(const struct lu_env *env,
1737                          struct ptlrpc_request *req, void *data, int rc)
1738 {
1739         struct osc_brw_async_args *aa = data;
1740         struct osc_extent *ext;
1741         struct osc_extent *tmp;
1742         struct client_obd *cli = aa->aa_cli;
1743         ENTRY;
1744
1745         rc = osc_brw_fini_request(req, rc);
1746         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1747         /* When server return -EINPROGRESS, client should always retry
1748          * regardless of the number of times the bulk was resent already. */
1749         if (osc_recoverable_error(rc)) {
1750                 if (req->rq_import_generation !=
1751                     req->rq_import->imp_generation) {
1752                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
1753                                ""DOSTID", rc = %d.\n",
1754                                req->rq_import->imp_obd->obd_name,
1755                                POSTID(&aa->aa_oa->o_oi), rc);
1756                 } else if (rc == -EINPROGRESS ||
1757                     client_should_resend(aa->aa_resends, aa->aa_cli)) {
1758                         rc = osc_brw_redo_request(req, aa, rc);
1759                 } else {
1760                         CERROR("%s: too many resent retries for object: "
1761                                ""LPU64":"LPU64", rc = %d.\n",
1762                                req->rq_import->imp_obd->obd_name,
1763                                POSTID(&aa->aa_oa->o_oi), rc);
1764                 }
1765
1766                 if (rc == 0)
1767                         RETURN(0);
1768                 else if (rc == -EAGAIN || rc == -EINPROGRESS)
1769                         rc = -EIO;
1770         }
1771
1772         if (aa->aa_ocapa) {
1773                 capa_put(aa->aa_ocapa);
1774                 aa->aa_ocapa = NULL;
1775         }
1776
1777         if (rc == 0) {
1778                 struct obdo *oa = aa->aa_oa;
1779                 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
1780                 unsigned long valid = 0;
1781                 struct cl_object *obj;
1782                 struct osc_async_page *last;
1783
1784                 last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
1785                 obj = osc2cl(last->oap_obj);
1786
1787                 cl_object_attr_lock(obj);
1788                 if (oa->o_valid & OBD_MD_FLBLOCKS) {
1789                         attr->cat_blocks = oa->o_blocks;
1790                         valid |= CAT_BLOCKS;
1791                 }
1792                 if (oa->o_valid & OBD_MD_FLMTIME) {
1793                         attr->cat_mtime = oa->o_mtime;
1794                         valid |= CAT_MTIME;
1795                 }
1796                 if (oa->o_valid & OBD_MD_FLATIME) {
1797                         attr->cat_atime = oa->o_atime;
1798                         valid |= CAT_ATIME;
1799                 }
1800                 if (oa->o_valid & OBD_MD_FLCTIME) {
1801                         attr->cat_ctime = oa->o_ctime;
1802                         valid |= CAT_CTIME;
1803                 }
1804
1805                 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1806                         struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
1807                         loff_t last_off = last->oap_count + last->oap_obj_off +
1808                                 last->oap_page_off;
1809
1810                         /* Change file size if this is an out of quota or
1811                          * direct IO write and it extends the file size */
1812                         if (loi->loi_lvb.lvb_size < last_off) {
1813                                 attr->cat_size = last_off;
1814                                 valid |= CAT_SIZE;
1815                         }
1816                         /* Extend KMS if it's not a lockless write */
1817                         if (loi->loi_kms < last_off &&
1818                             oap2osc_page(last)->ops_srvlock == 0) {
1819                                 attr->cat_kms = last_off;
1820                                 valid |= CAT_KMS;
1821                         }
1822                 }
1823
1824                 if (valid != 0)
1825                         cl_object_attr_set(env, obj, attr, valid);
1826                 cl_object_attr_unlock(obj);
1827         }
1828         OBDO_FREE(aa->aa_oa);
1829
1830         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
1831                 osc_inc_unstable_pages(req);
1832
1833         list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
1834                 list_del_init(&ext->oe_link);
1835                 osc_extent_finish(env, ext, 1, rc);
1836         }
1837         LASSERT(list_empty(&aa->aa_exts));
1838         LASSERT(list_empty(&aa->aa_oaps));
1839
1840         cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
1841                           req->rq_bulk->bd_nob_transferred);
1842         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1843         ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
1844
1845         client_obd_list_lock(&cli->cl_loi_list_lock);
1846         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1847          * is called so we know whether to go to sync BRWs or wait for more
1848          * RPCs to complete */
1849         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
1850                 cli->cl_w_in_flight--;
1851         else
1852                 cli->cl_r_in_flight--;
1853         osc_wake_cache_waiters(cli);
1854         client_obd_list_unlock(&cli->cl_loi_list_lock);
1855
1856         osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
1857         RETURN(rc);
1858 }
1859
/* rq_commit_cb for BRW requests: runs when the server transaction that
 * covered this request has committed, so the written pages are no longer
 * "unstable" on the client. */
static void brw_commit(struct ptlrpc_request *req)
{
        /* If osc_inc_unstable_pages (via osc_extent_finish) races with
         * this called via the rq_commit_cb, I need to ensure
         * osc_dec_unstable_pages is still called. Otherwise unstable
         * pages may be leaked. */
        spin_lock(&req->rq_lock);
        if (likely(req->rq_unstable)) {
                req->rq_unstable = 0;
                spin_unlock(&req->rq_lock);

                osc_dec_unstable_pages(req);
        } else {
                /* Not yet marked unstable: record the commit under rq_lock.
                 * NOTE(review): presumably the osc_inc_unstable_pages path
                 * checks rq_committed to balance the accounting -- confirm. */
                req->rq_committed = 1;
                spin_unlock(&req->rq_lock);
        }
}
1877
1878 /**
1879  * Build an RPC by the list of extent @ext_list. The caller must ensure
1880  * that the total pages in this list are NOT over max pages per RPC.
1881  * Extents in the list must be in OES_RPC state.
1882  */
int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
                  struct list_head *ext_list, int cmd, pdl_policy_t pol)
{
        struct ptlrpc_request           *req = NULL;
        struct osc_extent               *ext;
        struct brw_page                 **pga = NULL;
        struct osc_brw_async_args       *aa = NULL;
        struct obdo                     *oa = NULL;
        struct osc_async_page           *oap;
        struct osc_async_page           *tmp;
        struct cl_req                   *clerq = NULL;
        enum cl_req_type                crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE :
                                                                      CRT_READ;
        struct cl_req_attr              *crattr = NULL;
        obd_off                         starting_offset = OBD_OBJECT_EOF;
        obd_off                         ending_offset = 0;
        int                             mpflag = 0;
        int                             mem_tight = 0;
        int                             page_count = 0;
        bool                            soft_sync = false;
        int                             i;
        int                             rc;
        struct list_head                rpc_list = LIST_HEAD_INIT(rpc_list);

        ENTRY;
        LASSERT(!list_empty(ext_list));

        /* add pages into rpc_list to build BRW rpc */
        list_for_each_entry(ext, ext_list, oe_link) {
                LASSERT(ext->oe_state == OES_RPC);
                mem_tight |= ext->oe_memalloc;
                list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
                        ++page_count;
                        list_add_tail(&oap->oap_rpc_item, &rpc_list);
                        /* Track the byte range this RPC covers; the asserts
                         * check that only the first page may start mid-page
                         * and only the last may end short of a full page. */
                        if (starting_offset > oap->oap_obj_off)
                                starting_offset = oap->oap_obj_off;
                        else
                                LASSERT(oap->oap_page_off == 0);
                        if (ending_offset < oap->oap_obj_off + oap->oap_count)
                                ending_offset = oap->oap_obj_off +
                                                oap->oap_count;
                        else
                                LASSERT(oap->oap_page_off + oap->oap_count ==
                                        PAGE_CACHE_SIZE);
                }
        }

        soft_sync = osc_over_unstable_soft_limit(cli);
        if (mem_tight)
                mpflag = cfs_memory_pressure_get_and_set();

        /* Allocate the per-RPC attr holder, the page pointer array and the
         * obdo; all are released on the error path below. */
        OBD_ALLOC(crattr, sizeof(*crattr));
        if (crattr == NULL)
                GOTO(out, rc = -ENOMEM);

        OBD_ALLOC(pga, sizeof(*pga) * page_count);
        if (pga == NULL)
                GOTO(out, rc = -ENOMEM);

        OBDO_ALLOC(oa);
        if (oa == NULL)
                GOTO(out, rc = -ENOMEM);

        /* Fill the brw_page array and attach every cl_page to the cl_req;
         * the cl_req itself is allocated lazily from the first page. */
        i = 0;
        list_for_each_entry(oap, &rpc_list, oap_rpc_item) {
                struct cl_page *page = oap2cl_page(oap);
                if (clerq == NULL) {
                        clerq = cl_req_alloc(env, page, crt,
                                             1 /* only 1-object rpcs for now */);
                        if (IS_ERR(clerq))
                                GOTO(out, rc = PTR_ERR(clerq));
                }
                if (mem_tight)
                        oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
                if (soft_sync)
                        oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
                pga[i] = &oap->oap_brw_page;
                pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
                CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
                       pga[i]->pg, page_index(oap->oap_page), oap,
                       pga[i]->flag);
                i++;
                cl_req_page_add(env, clerq, page);
        }

        /* always get the data for the obdo for the rpc */
        LASSERT(clerq != NULL);
        crattr->cra_oa = oa;
        cl_req_attr_set(env, clerq, crattr, ~0ULL);

        rc = cl_req_prep(env, clerq);
        if (rc != 0) {
                CERROR("cl_req_prep failed: %d\n", rc);
                GOTO(out, rc);
        }

        /* Sort by offset so the OST can allocate disk blocks in order. */
        sort_brw_pages(pga, page_count);
        rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
                        pga, &req, crattr->cra_capa, 1, 0);
        if (rc != 0) {
                CERROR("prep_req failed: %d\n", rc);
                GOTO(out, rc);
        }

        req->rq_commit_cb = brw_commit;
        req->rq_interpret_reply = brw_interpret;

        if (mem_tight != 0)
                req->rq_memalloc = 1;

        /* Need to update the timestamps after the request is built in case
         * we race with setattr (locally or in queue at OST).  If OST gets
         * later setattr before earlier BRW (as determined by the request xid),
         * the OST will not use BRW timestamps.  Sadly, there is no obvious
         * way to do this in a single call.  bug 10150 */
        cl_req_attr_set(env, clerq, crattr,
                        OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);

        lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);

        /* The request's async args take ownership of the oap and extent
         * lists from here on; brw_interpret() will release them. */
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        INIT_LIST_HEAD(&aa->aa_oaps);
        list_splice_init(&rpc_list, &aa->aa_oaps);
        INIT_LIST_HEAD(&aa->aa_exts);
        list_splice_init(ext_list, &aa->aa_exts);
        aa->aa_clerq = clerq;

        /* queued sync pages can be torn down while the pages
         * were between the pending list and the rpc */
        tmp = NULL;
        list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
                /* only one oap gets a request reference */
                if (tmp == NULL)
                        tmp = oap;
                if (oap->oap_interrupted && !req->rq_intr) {
                        CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
                                        oap, req);
                        ptlrpc_mark_interrupted(req);
                }
        }
        if (tmp != NULL)
                tmp->oap_request = ptlrpc_request_addref(req);

        /* Account the new RPC in the in-flight counters and lprocfs stats
         * under the list lock. */
        client_obd_list_lock(&cli->cl_loi_list_lock);
        starting_offset >>= PAGE_CACHE_SHIFT;
        if (cmd == OBD_BRW_READ) {
                cli->cl_r_in_flight++;
                lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
                lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
                lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
                                      starting_offset + 1);
        } else {
                cli->cl_w_in_flight++;
                lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
                lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
                lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
                                      starting_offset + 1);
        }
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %ur/%uw in flight",
                  page_count, aa, cli->cl_r_in_flight,
                  cli->cl_w_in_flight);

        /* XXX: Maybe the caller can check the RPC bulk descriptor to
         * see which CPU/NUMA node the majority of pages were allocated
         * on, and try to assign the async RPC to the CPU core
         * (PDL_POLICY_PREFERRED) to reduce cross-CPU memory traffic.
         *
         * But on the other hand, we expect that multiple ptlrpcd
         * threads and the initial write sponsor can run in parallel,
         * especially when data checksum is enabled, which is CPU-bound
         * operation and single ptlrpcd thread cannot process in time.
         * So more ptlrpcd threads sharing BRW load
         * (with PDL_POLICY_ROUND) seems better.
         */
        ptlrpcd_add_req(req, pol, -1);
        rc = 0;
        EXIT;

out:
        if (mem_tight != 0)
                cfs_memory_pressure_restore(mpflag);

        if (crattr != NULL) {
                capa_put(crattr->cra_capa);
                OBD_FREE(crattr, sizeof(*crattr));
        }

        if (rc != 0) {
                LASSERT(req == NULL);

                if (oa)
                        OBDO_FREE(oa);
                if (pga)
                        OBD_FREE(pga, sizeof(*pga) * page_count);
                /* this should happen rarely and is pretty bad, it makes the
                 * pending list not follow the dirty order */
                while (!list_empty(ext_list)) {
                        ext = list_entry(ext_list->next, struct osc_extent,
                                         oe_link);
                        list_del_init(&ext->oe_link);
                        osc_extent_finish(env, ext, 0, rc);
                }
                if (clerq && !IS_ERR(clerq))
                        cl_req_completion(env, clerq, rc);
        }
        RETURN(rc);
}
2093
/* Atomically attach @einfo->ei_cbdata to @lock->l_ast_data, asserting the
 * lock's AST callbacks and resource type match those in @einfo.
 *
 * \retval 1  l_ast_data now equals the data (set here, or already equal)
 * \retval 0  the lock already carries different ast data
 */
static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
                                        struct ldlm_enqueue_info *einfo)
{
        void *data = einfo->ei_cbdata;
        int set = 0;

        LASSERT(lock != NULL);
        LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
        LASSERT(lock->l_resource->lr_type == einfo->ei_type);
        LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
        LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);

        /* l_ast_data is read/written under both the resource lock and the
         * global osc_ast_guard spinlock, taken in this order. */
        lock_res_and_lock(lock);
        spin_lock(&osc_ast_guard);

        if (lock->l_ast_data == NULL)
                lock->l_ast_data = data;
        if (lock->l_ast_data == data)
                set = 1;

        spin_unlock(&osc_ast_guard);
        unlock_res_and_lock(lock);

        return set;
}
2119
2120 static int osc_set_data_with_check(struct lustre_handle *lockh,
2121                                    struct ldlm_enqueue_info *einfo)
2122 {
2123         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2124         int set = 0;
2125
2126         if (lock != NULL) {
2127                 set = osc_set_lock_data_with_check(lock, einfo);
2128                 LDLM_LOCK_PUT(lock);
2129         } else
2130                 CERROR("lockh %p, data %p - client evicted?\n",
2131                        lockh, einfo->ei_cbdata);
2132         return set;
2133 }
2134
2135 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2136                              ldlm_iterator_t replace, void *data)
2137 {
2138         struct ldlm_res_id res_id;
2139         struct obd_device *obd = class_exp2obd(exp);
2140
2141         ostid_build_res_name(&lsm->lsm_oi, &res_id);
2142         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2143         return 0;
2144 }
2145
2146 /* find any ldlm lock of the inode in osc
2147  * return 0    not find
2148  *        1    find one
2149  *      < 0    error */
2150 static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2151                            ldlm_iterator_t replace, void *data)
2152 {
2153         struct ldlm_res_id res_id;
2154         struct obd_device *obd = class_exp2obd(exp);
2155         int rc = 0;
2156
2157         ostid_build_res_name(&lsm->lsm_oi, &res_id);
2158         rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2159         if (rc == LDLM_ITER_STOP)
2160                 return(1);
2161         if (rc == LDLM_ITER_CONTINUE)
2162                 return(0);
2163         return(rc);
2164 }
2165
2166 static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
2167                             obd_enqueue_update_f upcall, void *cookie,
2168                             __u64 *flags, int agl, int rc)
2169 {
2170         int intent = *flags & LDLM_FL_HAS_INTENT;
2171         ENTRY;
2172
2173         if (intent) {
2174                 /* The request was created before ldlm_cli_enqueue call. */
2175                 if (rc == ELDLM_LOCK_ABORTED) {
2176                         struct ldlm_reply *rep;
2177                         rep = req_capsule_server_get(&req->rq_pill,
2178                                                      &RMF_DLM_REP);
2179
2180                         LASSERT(rep != NULL);
2181                         rep->lock_policy_res1 =
2182                                 ptlrpc_status_ntoh(rep->lock_policy_res1);
2183                         if (rep->lock_policy_res1)
2184                                 rc = rep->lock_policy_res1;
2185                 }
2186         }
2187
2188         if ((intent != 0 && rc == ELDLM_LOCK_ABORTED && agl == 0) ||
2189             (rc == 0)) {
2190                 *flags |= LDLM_FL_LVB_READY;
2191                 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
2192                        lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
2193         }
2194
2195         /* Call the update callback. */
2196         rc = (*upcall)(cookie, rc);
2197         RETURN(rc);
2198 }
2199
/* Interpret callback for an asynchronous lock enqueue.  Completes the
 * ldlm-level enqueue, runs the osc upcall via osc_enqueue_fini(), and
 * carefully drops the lock references this path holds.  Note that
 * aa->oa_* may be freed as soon as the upcall runs, hence the local
 * copies of the handle and mode taken up front. */
static int osc_enqueue_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_enqueue_args *aa, int rc)
{
        struct ldlm_lock *lock;
        struct lustre_handle handle;
        __u32 mode;
        struct ost_lvb *lvb;
        __u32 lvb_len;
        __u64 *flags = aa->oa_flags;

        /* Make a local copy of a lock handle and a mode, because aa->oa_*
         * might be freed anytime after lock upcall has been called. */
        lustre_handle_copy(&handle, aa->oa_lockh);
        mode = aa->oa_ei->ei_mode;

        /* ldlm_cli_enqueue is holding a reference on the lock, so it must
         * be valid. */
        lock = ldlm_handle2lock(&handle);

        /* Take an additional reference so that a blocking AST that
         * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
         * to arrive after an upcall has been executed by
         * osc_enqueue_fini(). */
        ldlm_lock_addref(&handle, mode);

        /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
        OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);

        /* Let CP AST to grant the lock first. */
        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);

        /* An aborted AGL enqueue carries no LVB to unpack. */
        if (aa->oa_agl && rc == ELDLM_LOCK_ABORTED) {
                lvb = NULL;
                lvb_len = 0;
        } else {
                lvb = aa->oa_lvb;
                lvb_len = sizeof(*aa->oa_lvb);
        }

        /* Complete obtaining the lock procedure. */
        rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
                                   mode, flags, lvb, lvb_len, &handle, rc);
        /* Complete osc stuff. */
        rc = osc_enqueue_fini(req, aa->oa_lvb, aa->oa_upcall, aa->oa_cookie,
                              flags, aa->oa_agl, rc);

        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);

        /* Release the lock for async request. */
        if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
                /*
                 * Releases a reference taken by ldlm_cli_enqueue(), if it is
                 * not already released by
                 * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
                 */
                ldlm_lock_decref(&handle, mode);

        /* Drop the extra reference taken above, plus the one held by
         * ldlm_handle2lock(). */
        LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
                 aa->oa_lockh, req, aa);
        ldlm_lock_decref(&handle, mode);
        LDLM_LOCK_PUT(lock);
        return rc;
}
2264
/* Sentinel request-set pointer: callers pass PTLRPCD_SET as the rqset
 * argument of osc_enqueue_base() to have the RPC handled by the ptlrpcd
 * daemon (see the rqset == PTLRPCD_SET check there) instead of a real,
 * caller-owned request set.  Never dereferenced. */
struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2266
/* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
 * other synchronous requests, however keeping some locks and trying to obtain
 * others may take a considerable amount of time in a case of ost failure; and
 * when other sync requests do not get released lock from a client, the client
 * is excluded from the cluster -- such scenarios make the life difficult, so
 * release locks just after they are obtained. */
/**
 * Obtain an extent lock on an OST object, preferring an already-cached lock.
 *
 * First tries ldlm_lock_match() against cached locks (unless \a kms_valid is
 * zero, i.e. the object is fresh or was evicted and cached LVBs are stale);
 * on a miss, sends an LDLM enqueue RPC, synchronously or through \a rqset.
 *
 * \param[in]     exp       export to the OST
 * \param[in]     res_id    resource (object) to lock
 * \param[in,out] flags     LDLM_FL_* flags; LDLM_FL_LVB_READY set when LVB valid
 * \param[in,out] policy    extent to lock; rounded out to page boundaries here
 * \param[out]    lvb       receives the lock value block from the server
 * \param[in]     kms_valid non-zero when cached locks may be matched
 * \param[in]     upcall    completion callback receiving the enqueue result
 * \param[in]     cookie    opaque argument passed to \a upcall
 * \param[in]     einfo     lock type, mode and AST callbacks
 * \param[out]    lockh     handle of the matched or granted lock
 * \param[in]     rqset     NULL for sync; PTLRPCD_SET routes via ptlrpcd
 * \param[in]     async     non-zero for asynchronous enqueue
 * \param[in]     agl       non-zero for asynchronous glimpse lock
 *
 * \retval ELDLM_OK on success, -ECANCELED for a skipped AGL stripe,
 *         negative errno otherwise
 */
int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
                     __u64 *flags, ldlm_policy_data_t *policy,
                     struct ost_lvb *lvb, int kms_valid,
                     obd_enqueue_update_f upcall, void *cookie,
                     struct ldlm_enqueue_info *einfo,
                     struct lustre_handle *lockh,
                     struct ptlrpc_request_set *rqset, int async, int agl)
{
        struct obd_device *obd = exp->exp_obd;
        struct ptlrpc_request *req = NULL;
        int intent = *flags & LDLM_FL_HAS_INTENT;
        /* AGL may match a lock whose LVB is not yet ready. */
        __u64 match_lvb = (agl != 0 ? 0 : LDLM_FL_LVB_READY);
        ldlm_mode_t mode;
        int rc;
        ENTRY;

        /* Filesystem lock extents are extended to page boundaries so that
         * dealing with the page cache is a little smoother.  */
        policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
        policy->l_extent.end |= ~CFS_PAGE_MASK;

        /*
         * kms is not valid when either object is completely fresh (so that no
         * locks are cached), or object was evicted. In the latter case cached
         * lock cannot be used, because it would prime inode state with
         * potentially stale LVB.
         */
        if (!kms_valid)
                goto no_match;

        /* Next, search for already existing extent locks that will cover us */
        /* If we're trying to read, we also search for an existing PW lock.  The
         * VFS and page cache already protect us locally, so lots of readers/
         * writers can share a single PW lock.
         *
         * There are problems with conversion deadlocks, so instead of
         * converting a read lock to a write lock, we'll just enqueue a new
         * one.
         *
         * At some point we should cancel the read lock instead of making them
         * send us a blocking callback, but there are problems with canceling
         * locks out from other users right now, too. */
        mode = einfo->ei_mode;
        if (einfo->ei_mode == LCK_PR)
                mode |= LCK_PW;
        mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
                               einfo->ei_type, policy, mode, lockh, 0);
        if (mode) {
                struct ldlm_lock *matched = ldlm_handle2lock(lockh);

                if ((agl != 0) && !ldlm_is_lvb_ready(matched)) {
                        /* For AGL, if enqueue RPC is sent but the lock is not
                         * granted, then skip to process this stripe.
                         * Return -ECANCELED to tell the caller. */
                        ldlm_lock_decref(lockh, mode);
                        LDLM_LOCK_PUT(matched);
                        RETURN(-ECANCELED);
                } else if (osc_set_lock_data_with_check(matched, einfo)) {
                        *flags |= LDLM_FL_LVB_READY;
                        /* addref the lock only if not async requests and PW
                         * lock is matched whereas we asked for PR. */
                        if (!rqset && einfo->ei_mode != mode)
                                ldlm_lock_addref(lockh, LCK_PR);
                        if (intent) {
                                /* I would like to be able to ASSERT here that
                                 * rss <= kms, but I can't, for reasons which
                                 * are explained in lov_enqueue() */
                        }

                        /* We already have a lock, and it's referenced.
                         *
                         * At this point, the cl_lock::cll_state is CLS_QUEUING,
                         * AGL upcall may change it to CLS_HELD directly. */
                        (*upcall)(cookie, ELDLM_OK);

                        if (einfo->ei_mode != mode)
                                ldlm_lock_decref(lockh, LCK_PW);
                        else if (rqset)
                                /* For async requests, decref the lock. */
                                ldlm_lock_decref(lockh, einfo->ei_mode);
                        LDLM_LOCK_PUT(matched);
                        RETURN(ELDLM_OK);
                } else {
                        ldlm_lock_decref(lockh, mode);
                        LDLM_LOCK_PUT(matched);
                }
        }

 no_match:
        if (intent) {
                req = ptlrpc_request_alloc(class_exp2cliimp(exp),
                                           &RQF_LDLM_ENQUEUE_LVB);
                if (req == NULL)
                        RETURN(-ENOMEM);

                rc = ptlrpc_request_pack(req, LUSTRE_DLM_VERSION, LDLM_ENQUEUE);
                if (rc < 0) {
                        ptlrpc_request_free(req);
                        RETURN(rc);
                }

                req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
                                     sizeof *lvb);
                ptlrpc_request_set_replen(req);
        }

        /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
        *flags &= ~LDLM_FL_BLOCK_GRANTED;

        rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
                              sizeof(*lvb), LVB_T_OST, lockh, async);
        if (rqset) {
                if (!rc) {
                        struct osc_enqueue_args *aa;
                        CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
                        aa = ptlrpc_req_async_args(req);
                        aa->oa_ei = einfo;
                        aa->oa_exp = exp;
                        aa->oa_flags  = flags;
                        aa->oa_upcall = upcall;
                        aa->oa_cookie = cookie;
                        aa->oa_lvb    = lvb;
                        aa->oa_lockh  = lockh;
                        aa->oa_agl    = !!agl;

                        req->rq_interpret_reply =
                                (ptlrpc_interpterer_t)osc_enqueue_interpret;
                        if (rqset == PTLRPCD_SET)
                                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
                        else
                                ptlrpc_set_add_req(rqset, req);
                } else if (intent) {
                        ptlrpc_req_finished(req);
                }
                RETURN(rc);
        }

        rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, agl, rc);
        if (intent)
                ptlrpc_req_finished(req);

        RETURN(rc);
}
2417
2418 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2419                    __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2420                    __u64 *flags, void *data, struct lustre_handle *lockh,
2421                    int unref)
2422 {
2423         struct obd_device *obd = exp->exp_obd;
2424         __u64 lflags = *flags;
2425         ldlm_mode_t rc;
2426         ENTRY;
2427
2428         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2429                 RETURN(-EIO);
2430
2431         /* Filesystem lock extents are extended to page boundaries so that
2432          * dealing with the page cache is a little smoother */
2433         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2434         policy->l_extent.end |= ~CFS_PAGE_MASK;
2435
2436         /* Next, search for already existing extent locks that will cover us */
2437         /* If we're trying to read, we also search for an existing PW lock.  The
2438          * VFS and page cache already protect us locally, so lots of readers/
2439          * writers can share a single PW lock. */
2440         rc = mode;
2441         if (mode == LCK_PR)
2442                 rc |= LCK_PW;
2443         rc = ldlm_lock_match(obd->obd_namespace, lflags,
2444                              res_id, type, policy, rc, lockh, unref);
2445         if (rc) {
2446                 if (data != NULL) {
2447                         if (!osc_set_data_with_check(lockh, data)) {
2448                                 if (!(lflags & LDLM_FL_TEST_LOCK))
2449                                         ldlm_lock_decref(lockh, rc);
2450                                 RETURN(0);
2451                         }
2452                 }
2453                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
2454                         ldlm_lock_addref(lockh, LCK_PR);
2455                         ldlm_lock_decref(lockh, LCK_PW);
2456                 }
2457                 RETURN(rc);
2458         }
2459         RETURN(rc);
2460 }
2461
2462 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
2463 {
2464         ENTRY;
2465
2466         if (unlikely(mode == LCK_GROUP))
2467                 ldlm_lock_decref_and_cancel(lockh, mode);
2468         else
2469                 ldlm_lock_decref(lockh, mode);
2470
2471         RETURN(0);
2472 }
2473
2474 static int osc_statfs_interpret(const struct lu_env *env,
2475                                 struct ptlrpc_request *req,
2476                                 struct osc_async_args *aa, int rc)
2477 {
2478         struct obd_statfs *msfs;
2479         ENTRY;
2480
2481         if (rc == -EBADR)
2482                 /* The request has in fact never been sent
2483                  * due to issues at a higher level (LOV).
2484                  * Exit immediately since the caller is
2485                  * aware of the problem and takes care
2486                  * of the clean up */
2487                  RETURN(rc);
2488
2489         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2490             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2491                 GOTO(out, rc = 0);
2492
2493         if (rc != 0)
2494                 GOTO(out, rc);
2495
2496         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2497         if (msfs == NULL) {
2498                 GOTO(out, rc = -EPROTO);
2499         }
2500
2501         *aa->aa_oi->oi_osfs = *msfs;
2502 out:
2503         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2504         RETURN(rc);
2505 }
2506
2507 static int osc_statfs_async(struct obd_export *exp,
2508                             struct obd_info *oinfo, __u64 max_age,
2509                             struct ptlrpc_request_set *rqset)
2510 {
2511         struct obd_device     *obd = class_exp2obd(exp);
2512         struct ptlrpc_request *req;
2513         struct osc_async_args *aa;
2514         int                    rc;
2515         ENTRY;
2516
2517         /* We could possibly pass max_age in the request (as an absolute
2518          * timestamp or a "seconds.usec ago") so the target can avoid doing
2519          * extra calls into the filesystem if that isn't necessary (e.g.
2520          * during mount that would help a bit).  Having relative timestamps
2521          * is not so great if request processing is slow, while absolute
2522          * timestamps are not ideal because they need time synchronization. */
2523         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2524         if (req == NULL)
2525                 RETURN(-ENOMEM);
2526
2527         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2528         if (rc) {
2529                 ptlrpc_request_free(req);
2530                 RETURN(rc);
2531         }
2532         ptlrpc_request_set_replen(req);
2533         req->rq_request_portal = OST_CREATE_PORTAL;
2534         ptlrpc_at_set_req_timeout(req);
2535
2536         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2537                 /* procfs requests not want stat in wait for avoid deadlock */
2538                 req->rq_no_resend = 1;
2539                 req->rq_no_delay = 1;
2540         }
2541
2542         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2543         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2544         aa = ptlrpc_req_async_args(req);
2545         aa->aa_oi = oinfo;
2546
2547         ptlrpc_set_add_req(rqset, req);
2548         RETURN(0);
2549 }
2550
2551 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2552                       struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2553 {
2554         struct obd_device     *obd = class_exp2obd(exp);
2555         struct obd_statfs     *msfs;
2556         struct ptlrpc_request *req;
2557         struct obd_import     *imp = NULL;
2558         int rc;
2559         ENTRY;
2560
2561         /*Since the request might also come from lprocfs, so we need
2562          *sync this with client_disconnect_export Bug15684*/
2563         down_read(&obd->u.cli.cl_sem);
2564         if (obd->u.cli.cl_import)
2565                 imp = class_import_get(obd->u.cli.cl_import);
2566         up_read(&obd->u.cli.cl_sem);
2567         if (!imp)
2568                 RETURN(-ENODEV);
2569
2570         /* We could possibly pass max_age in the request (as an absolute
2571          * timestamp or a "seconds.usec ago") so the target can avoid doing
2572          * extra calls into the filesystem if that isn't necessary (e.g.
2573          * during mount that would help a bit).  Having relative timestamps
2574          * is not so great if request processing is slow, while absolute
2575          * timestamps are not ideal because they need time synchronization. */
2576         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2577
2578         class_import_put(imp);
2579
2580         if (req == NULL)
2581                 RETURN(-ENOMEM);
2582
2583         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2584         if (rc) {
2585                 ptlrpc_request_free(req);
2586                 RETURN(rc);
2587         }
2588         ptlrpc_request_set_replen(req);
2589         req->rq_request_portal = OST_CREATE_PORTAL;
2590         ptlrpc_at_set_req_timeout(req);
2591
2592         if (flags & OBD_STATFS_NODELAY) {
2593                 /* procfs requests not want stat in wait for avoid deadlock */
2594                 req->rq_no_resend = 1;
2595                 req->rq_no_delay = 1;
2596         }
2597
2598         rc = ptlrpc_queue_wait(req);
2599         if (rc)
2600                 GOTO(out, rc);
2601
2602         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2603         if (msfs == NULL) {
2604                 GOTO(out, rc = -EPROTO);
2605         }
2606
2607         *osfs = *msfs;
2608
2609         EXIT;
2610  out:
2611         ptlrpc_req_finished(req);
2612         return rc;
2613 }
2614
/**
 * Dispatch ioctl commands directed at an OSC device.
 *
 * Supported commands (per the switch below): import recovery
 * (OBD_IOC_CLIENT_RECOVER), toggling import active state
 * (IOC_OSC_SET_ACTIVE), quota-check polling (OBD_IOC_POLL_QUOTACHECK)
 * and target ping (OBD_IOC_PING_TARGET).  Everything else returns -ENOTTY.
 *
 * \param[in] cmd   ioctl command number
 * \param[in] exp   export the ioctl was issued on
 * \param[in] len   length of \a karg
 * \param[in] karg  kernel-space argument (struct obd_ioctl_data)
 * \param[in] uarg  user-space argument (unused here)
 *
 * \retval 0 on success, negative errno on failure
 */
static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                         void *karg, void *uarg)
{
        struct obd_device *obd = exp->exp_obd;
        struct obd_ioctl_data *data = karg;
        int err = 0;
        ENTRY;

        /* Pin this module so it cannot be unloaded while the ioctl runs. */
        if (!try_module_get(THIS_MODULE)) {
                CERROR("%s: cannot get module '%s'\n", obd->obd_name,
                       module_name(THIS_MODULE));
                return -EINVAL;
        }
        switch (cmd) {
        case OBD_IOC_CLIENT_RECOVER:
                err = ptlrpc_recover_import(obd->u.cli.cl_import,
                                            data->ioc_inlbuf1, 0);
                /* Positive values from recovery are not errors. */
                if (err > 0)
                        err = 0;
                GOTO(out, err);
        case IOC_OSC_SET_ACTIVE:
                err = ptlrpc_set_import_active(obd->u.cli.cl_import,
                                               data->ioc_offset);
                GOTO(out, err);
        case OBD_IOC_POLL_QUOTACHECK:
                err = osc_quota_poll_check(exp, (struct if_quotacheck *)karg);
                GOTO(out, err);
        case OBD_IOC_PING_TARGET:
                err = ptlrpc_obd_ping(obd);
                GOTO(out, err);
        default:
                CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
                       cmd, current_comm());
                GOTO(out, err = -ENOTTY);
        }
out:
        module_put(THIS_MODULE);
        return err;
}
2654
/**
 * Handle obd_get_info() queries on an OSC device.
 *
 * Supported keys (per the KEY_IS branches below):
 * - KEY_LOCK_TO_STRIPE: returns stripe index 0 into \a val.
 * - KEY_LAST_ID: sends OST_GET_INFO to the target and copies the returned
 *   object id into \a val.
 * - KEY_FIEMAP: forwards a fiemap request to the OST.  When the caller set
 *   FIEMAP_FLAG_SYNC, a cached PR/PW extent lock is matched first; with no
 *   cached lock, OBD_FL_SRVLOCK is set so the server takes the lock itself.
 *
 * \param[in]     env    execution environment
 * \param[in]     exp    export to query
 * \param[in]     keylen length of \a key
 * \param[in]     key    key identifying the information requested
 * \param[in,out] vallen size of the \a val buffer / of the returned data
 * \param[out]    val    buffer receiving the result
 * \param[in]     lsm    striping descriptor (unused here)
 *
 * \retval 0 on success, -EINVAL for unknown keys, negative errno otherwise
 */
static int osc_get_info(const struct lu_env *env, struct obd_export *exp,
                        obd_count keylen, void *key, __u32 *vallen, void *val,
                        struct lov_stripe_md *lsm)
{
        ENTRY;
        if (!vallen || !val)
                RETURN(-EFAULT);

        if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
                __u32 *stripe = val;
                *vallen = sizeof(*stripe);
                *stripe = 0;
                RETURN(0);
        } else if (KEY_IS(KEY_LAST_ID)) {
                struct ptlrpc_request *req;
                obd_id                *reply;
                char                  *tmp;
                int                    rc;

                req = ptlrpc_request_alloc(class_exp2cliimp(exp),
                                           &RQF_OST_GET_INFO_LAST_ID);
                if (req == NULL)
                        RETURN(-ENOMEM);

                req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
                                     RCL_CLIENT, keylen);
                rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
                if (rc) {
                        ptlrpc_request_free(req);
                        RETURN(rc);
                }

                tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
                memcpy(tmp, key, keylen);

                req->rq_no_delay = req->rq_no_resend = 1;
                ptlrpc_request_set_replen(req);
                rc = ptlrpc_queue_wait(req);
                if (rc)
                        GOTO(out, rc);

                reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
                if (reply == NULL)
                        GOTO(out, rc = -EPROTO);

                *((obd_id *)val) = *reply;
        out:
                ptlrpc_req_finished(req);
                RETURN(rc);
        } else if (KEY_IS(KEY_FIEMAP)) {
                struct ll_fiemap_info_key *fm_key =
                                (struct ll_fiemap_info_key *)key;
                struct ldlm_res_id       res_id;
                ldlm_policy_data_t       policy;
                struct lustre_handle     lockh;
                ldlm_mode_t              mode = 0;
                struct ptlrpc_request   *req;
                struct ll_user_fiemap   *reply;
                char                    *tmp;
                int                      rc;

                /* Without FIEMAP_FLAG_SYNC no client-side lock is needed. */
                if (!(fm_key->fiemap.fm_flags & FIEMAP_FLAG_SYNC))
                        goto skip_locking;

                /* Round the requested range out to page boundaries. */
                policy.l_extent.start = fm_key->fiemap.fm_start &
                                                CFS_PAGE_MASK;

                /* Guard against overflow of fm_start + fm_length. */
                if (OBD_OBJECT_EOF - fm_key->fiemap.fm_length <=
                    fm_key->fiemap.fm_start + PAGE_CACHE_SIZE - 1)
                        policy.l_extent.end = OBD_OBJECT_EOF;
                else
                        policy.l_extent.end = (fm_key->fiemap.fm_start +
                                fm_key->fiemap.fm_length +
                                PAGE_CACHE_SIZE - 1) & CFS_PAGE_MASK;

                ostid_build_res_name(&fm_key->oa.o_oi, &res_id);
                mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
                                       LDLM_FL_BLOCK_GRANTED |
                                       LDLM_FL_LVB_READY,
                                       &res_id, LDLM_EXTENT, &policy,
                                       LCK_PR | LCK_PW, &lockh, 0);
                if (mode) { /* lock is cached on client */
                        if (mode != LCK_PR) {
                                /* Convert the PW reference to PR; addref
                                 * first so the lock stays alive. */
                                ldlm_lock_addref(&lockh, LCK_PR);
                                ldlm_lock_decref(&lockh, LCK_PW);
                        }
                } else { /* no cached lock, needs acquire lock on server side */
                        fm_key->oa.o_valid |= OBD_MD_FLFLAGS;
                        fm_key->oa.o_flags |= OBD_FL_SRVLOCK;
                }

skip_locking:
                req = ptlrpc_request_alloc(class_exp2cliimp(exp),
                                           &RQF_OST_GET_INFO_FIEMAP);
                if (req == NULL)
                        GOTO(drop_lock, rc = -ENOMEM);

                req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
                                     RCL_CLIENT, keylen);
                req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
                                     RCL_CLIENT, *vallen);
                req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
                                     RCL_SERVER, *vallen);

                rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
                if (rc) {
                        ptlrpc_request_free(req);
                        GOTO(drop_lock, rc);
                }

                tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
                memcpy(tmp, key, keylen);
                tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
                memcpy(tmp, val, *vallen);

                ptlrpc_request_set_replen(req);
                rc = ptlrpc_queue_wait(req);
                if (rc)
                        GOTO(fini_req, rc);

                reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
                if (reply == NULL)
                        GOTO(fini_req, rc = -EPROTO);

                memcpy(val, reply, *vallen);
fini_req:
                ptlrpc_req_finished(req);
drop_lock:
                if (mode)
                        ldlm_lock_decref(&lockh, LCK_PR);
                RETURN(rc);
        }

        RETURN(-EINVAL);
}
2790
/**
 * Handle obd_set_info_async() requests on an OSC device.
 *
 * Keys handled entirely on the client: KEY_CHECKSUM (toggle RPC
 * checksumming), KEY_SPTLRPC_CONF (adapt security flavor), KEY_FLUSH_CTX
 * (flush security contexts), KEY_CACHE_SET (attach this OSC to a shared
 * client cache), KEY_CACHE_LRU_SHRINK (shrink the LRU page cache).
 * All other keys are forwarded to the OST via an OST_SET_INFO RPC;
 * KEY_GRANT_SHRINK is sent through ptlrpcd with a grant-shrink interpret
 * callback, everything else is added to the caller's \a set.
 *
 * \param[in] env    execution environment
 * \param[in] exp    export to the OST
 * \param[in] keylen length of \a key
 * \param[in] key    key selecting the operation
 * \param[in] vallen length of \a val
 * \param[in] val    value associated with \a key
 * \param[in] set    request set for forwarded RPCs (required except for
 *                   KEY_GRANT_SHRINK)
 *
 * \retval 0 on success, negative errno on failure
 */
static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
                              obd_count keylen, void *key, obd_count vallen,
                              void *val, struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *req;
        struct obd_device     *obd = exp->exp_obd;
        struct obd_import     *imp = class_exp2cliimp(exp);
        char                  *tmp;
        int                    rc;
        ENTRY;

        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);

        if (KEY_IS(KEY_CHECKSUM)) {
                if (vallen != sizeof(int))
                        RETURN(-EINVAL);
                exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
                RETURN(0);
        }

        if (KEY_IS(KEY_SPTLRPC_CONF)) {
                sptlrpc_conf_client_adapt(obd);
                RETURN(0);
        }

        if (KEY_IS(KEY_FLUSH_CTX)) {
                sptlrpc_import_flush_my_ctx(imp);
                RETURN(0);
        }

        if (KEY_IS(KEY_CACHE_SET)) {
                struct client_obd *cli = &obd->u.cli;

                LASSERT(cli->cl_cache == NULL); /* only once */
                cli->cl_cache = (struct cl_client_cache *)val;
                atomic_inc(&cli->cl_cache->ccc_users);
                cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;

                /* add this osc into entity list */
                LASSERT(list_empty(&cli->cl_lru_osc));
                spin_lock(&cli->cl_cache->ccc_lru_lock);
                list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
                spin_unlock(&cli->cl_cache->ccc_lru_lock);

                RETURN(0);
        }

        if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
                struct client_obd *cli = &obd->u.cli;
                /* Shrink at most half of the pages currently on the LRU. */
                long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
                long target = *(long *)val;

                nr = osc_lru_shrink(env, cli, min(nr, target), true);
                /* Report back how many pages remain to be shrunk. */
                *(long *)val -= nr;
                RETURN(0);
        }

        if (!set && !KEY_IS(KEY_GRANT_SHRINK))
                RETURN(-EINVAL);

        /* We pass all other commands directly to OST. Since nobody calls osc
           methods directly and everybody is supposed to go through LOV, we
           assume lov checked invalid values for us.
           The only recognised values so far are evict_by_nid and mds_conn.
           Even if something bad goes through, we'd get a -EINVAL from OST
           anyway. */

        req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
                                                &RQF_OST_SET_GRANT_INFO :
                                                &RQF_OBD_SET_INFO);
        if (req == NULL)
                RETURN(-ENOMEM);

        req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
                             RCL_CLIENT, keylen);
        if (!KEY_IS(KEY_GRANT_SHRINK))
                req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
                                     RCL_CLIENT, vallen);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
        memcpy(tmp, key, keylen);
        tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
                                                        &RMF_OST_BODY :
                                                        &RMF_SETINFO_VAL);
        memcpy(tmp, val, vallen);

        if (KEY_IS(KEY_GRANT_SHRINK)) {
                struct osc_grant_args *aa;
                struct obdo *oa;

                /* Keep a private copy of the obdo for the interpret
                 * callback, since \a val belongs to the caller. */
                CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
                aa = ptlrpc_req_async_args(req);
                OBDO_ALLOC(oa);
                if (!oa) {
                        ptlrpc_req_finished(req);
                        RETURN(-ENOMEM);
                }
                *oa = ((struct ost_body *)val)->oa;
                aa->aa_oa = oa;
                req->rq_interpret_reply = osc_shrink_grant_interpret;
        }

        ptlrpc_request_set_replen(req);
        if (!KEY_IS(KEY_GRANT_SHRINK)) {
                LASSERT(set != NULL);
                ptlrpc_set_add_req(set, req);
                ptlrpc_check_set(NULL, set);
        } else
                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);

        RETURN(0);
}
2908
2909 static int osc_reconnect(const struct lu_env *env,
2910                          struct obd_export *exp, struct obd_device *obd,
2911                          struct obd_uuid *cluuid,
2912                          struct obd_connect_data *data,
2913                          void *localdata)
2914 {
2915         struct client_obd *cli = &obd->u.cli;
2916
2917         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
2918                 long lost_grant;
2919
2920                 client_obd_list_lock(&cli->cl_loi_list_lock);
2921                 data->ocd_grant = (cli->cl_avail_grant +
2922                                   (cli->cl_dirty_pages << PAGE_CACHE_SHIFT)) ?:
2923                                   2 * cli_brw_size(obd);
2924                 lost_grant = cli->cl_lost_grant;
2925                 cli->cl_lost_grant = 0;
2926                 client_obd_list_unlock(&cli->cl_loi_list_lock);
2927
2928                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
2929                        " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
2930                        data->ocd_version, data->ocd_grant, lost_grant);
2931         }
2932
2933         RETURN(0);
2934 }
2935
/**
 * Disconnect this OSC from its OST target.
 *
 * On the last connection reference, flushes any pending llog cancel
 * records to the target before tearing down the export.  The grant-shrink
 * list removal is deliberately deferred until after the import is gone
 * (see the race diagram below).
 *
 * \param[in] exp  export being disconnected
 *
 * \retval result of client_disconnect_export()
 */
static int osc_disconnect(struct obd_export *exp)
{
        struct obd_device *obd = class_exp2obd(exp);
        struct llog_ctxt  *ctxt;
        int rc;

        ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
        if (ctxt) {
                if (obd->u.cli.cl_conn_count == 1) {
                        /* Flush any remaining cancel messages out to the
                         * target */
                        llog_sync(ctxt, exp, 0);
                }
                llog_ctxt_put(ctxt);
        } else {
                CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
                       obd);
        }

        rc = client_disconnect_export(exp);
        /**
         * Initially we put del_shrink_grant before disconnect_export, but it
         * causes the following problem if setup (connect) and cleanup
         * (disconnect) are tangled together.
         *      connect p1                     disconnect p2
         *   ptlrpc_connect_import
         *     ...............               class_manual_cleanup
         *                                     osc_disconnect
         *                                     del_shrink_grant
         *   ptlrpc_connect_interrupt
         *     init_grant_shrink
         *   add this client to shrink list
         *                                      cleanup_osc
         * Bang! pinger trigger the shrink.
         * So the osc should be disconnected from the shrink list, after we
         * are sure the import has been destroyed. BUG18662
         */
        if (obd->u.cli.cl_import == NULL)
                osc_del_shrink_grant(&obd->u.cli);
        return rc;
}
2977
/* Dispatch import state-change events for this OSC device.
 * Most events are simply forwarded to the observer (typically the LOV
 * layer); DISCON and INVALIDATE additionally reset local grant/cache
 * state that is no longer valid once the server connection is gone.
 * Returns 0 or the observer/env error. */
static int osc_import_event(struct obd_device *obd,
                            struct obd_import *imp,
                            enum obd_import_event event)
{
        struct client_obd *cli;
        int rc = 0;

        ENTRY;
        LASSERT(imp->imp_obd == obd);

        switch (event) {
        case IMP_EVENT_DISCON: {
                /* Connection dropped: any grant the server gave us is void
                 * until a reconnect renegotiates it. */
                cli = &obd->u.cli;
                client_obd_list_lock(&cli->cl_loi_list_lock);
                cli->cl_avail_grant = 0;
                cli->cl_lost_grant = 0;
                client_obd_list_unlock(&cli->cl_loi_list_lock);
                break;
        }
        case IMP_EVENT_INACTIVE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
                break;
        }
        case IMP_EVENT_INVALIDATE: {
                struct ldlm_namespace *ns = obd->obd_namespace;
                struct lu_env         *env;
                int                    refcheck;

                env = cl_env_get(&refcheck);
                if (!IS_ERR(env)) {
                        /* Reset grants */
                        cli = &obd->u.cli;
                        /* all pages go to failing rpcs due to the invalid
                         * import */
                        osc_io_unplug(env, cli, NULL, PDL_POLICY_ROUND);

                        /* Drop every lock in the namespace locally; the
                         * server side is unreachable/invalid anyway. */
                        ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
                        cl_env_put(env, &refcheck);
                } else
                        rc = PTR_ERR(env);
                break;
        }
        case IMP_EVENT_ACTIVE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
                break;
        }
        case IMP_EVENT_OCD: {
                /* Connect data arrived: (re)initialize grant accounting
                 * and, if the server supports it, switch to the dedicated
                 * OST request portal. */
                struct obd_connect_data *ocd = &imp->imp_connect_data;

                if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
                        osc_init_grant(&obd->u.cli, ocd);

                /* See bug 7198 */
                if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
                        imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;

                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
                break;
        }
        case IMP_EVENT_DEACTIVATE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
                break;
        }
        case IMP_EVENT_ACTIVATE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
                break;
        }
        default:
                /* Unknown events indicate a programming error upstream. */
                CERROR("Unknown import event %d\n", event);
                LBUG();
        }
        RETURN(rc);
}
3051
3052 /**
3053  * Determine whether the lock can be canceled before replaying the lock
3054  * during recovery, see bug16774 for detailed information.
3055  *
3056  * \retval zero the lock can't be canceled
3057  * \retval other ok to cancel
3058  */
3059 static int osc_cancel_weight(struct ldlm_lock *lock)
3060 {
3061         /*
3062          * Cancel all unused and granted extent lock.
3063          */
3064         if (lock->l_resource->lr_type == LDLM_EXTENT &&
3065             lock->l_granted_mode == lock->l_req_mode &&
3066             osc_ldlm_weigh_ast(lock) == 0)
3067                 RETURN(1);
3068
3069         RETURN(0);
3070 }
3071
3072 static int brw_queue_work(const struct lu_env *env, void *data)
3073 {
3074         struct client_obd *cli = data;
3075
3076         CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
3077
3078         osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
3079         RETURN(0);
3080 }
3081
/* Set up an OSC device: generic client state, two ptlrpcd work items
 * (writeback and LRU), quota, procfs, a small reserve request pool, and
 * grant-shrink registration.  On failure the goto chain unwinds only
 * what has already been created.  Returns 0 or a negative errno. */
int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
{
        struct client_obd *cli = &obd->u.cli;
        struct obd_type   *type;
        void              *handler;
        int                rc;
        ENTRY;

        /* Hold a ptlrpcd reference for the lifetime of this device;
         * released in osc_cleanup() / the error path below. */
        rc = ptlrpcd_addref();
        if (rc)
                RETURN(rc);

        rc = client_obd_setup(obd, lcfg);
        if (rc)
                GOTO(out_ptlrpcd, rc);

        /* Deferred-work handlers executed in ptlrpcd context. */
        handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
        if (IS_ERR(handler))
                GOTO(out_client_setup, rc = PTR_ERR(handler));
        cli->cl_writeback_work = handler;

        handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
        if (IS_ERR(handler))
                GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
        cli->cl_lru_work = handler;

        rc = osc_quota_setup(obd);
        if (rc)
                GOTO(out_ptlrpcd_work, rc);

        cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;

#ifdef LPROCFS
        obd->obd_vars = lprocfs_osc_obd_vars;
#endif
        /* If this is true then both client (osc) and server (osp) are on the
         * same node. The osp layer if loaded first will register the osc proc
         * directory. In that case this obd_device will be attached its proc
         * tree to type->typ_procsym instead of obd->obd_type->typ_procroot. */
        type = class_search_type(LUSTRE_OSP_NAME);
        if (type && type->typ_procsym) {
                obd->obd_proc_entry = lprocfs_seq_register(obd->obd_name,
                                                           type->typ_procsym,
                                                           obd->obd_vars, obd);
                if (IS_ERR(obd->obd_proc_entry)) {
                        rc = PTR_ERR(obd->obd_proc_entry);
                        CERROR("error %d setting up lprocfs for %s\n", rc,
                               obd->obd_name);
                        obd->obd_proc_entry = NULL;
                }
        } else {
                rc = lprocfs_obd_setup(obd);
        }

        /* If the basic OSC proc tree construction succeeded then
         * lets do the rest.  NOTE(review): a proc setup failure is
         * deliberately non-fatal here -- the device still comes up. */
        if (rc == 0) {
                lproc_osc_attach_seqstat(obd);
                sptlrpc_lprocfs_cliobd_attach(obd);
                ptlrpc_lprocfs_register_obd(obd);
        }

        /* We need to allocate a few requests more, because
         * brw_interpret tries to create new requests before freeing
         * previous ones, Ideally we want to have 2x max_rpcs_in_flight
         * reserved, but I'm afraid that might be too much wasted RAM
         * in fact, so 2 is just my guess and still should work. */
        cli->cl_import->imp_rq_pool =
                ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
                                    OST_MAXREQSIZE,
                                    ptlrpc_add_rqs_to_pool);

        INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
        /* Let the DLM ask us whether a lock is worth canceling. */
        ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
        RETURN(0);

out_ptlrpcd_work:
        if (cli->cl_writeback_work != NULL) {
                ptlrpcd_destroy_work(cli->cl_writeback_work);
                cli->cl_writeback_work = NULL;
        }
        if (cli->cl_lru_work != NULL) {
                ptlrpcd_destroy_work(cli->cl_lru_work);
                cli->cl_lru_work = NULL;
        }
out_client_setup:
        client_obd_cleanup(obd);
out_ptlrpcd:
        ptlrpcd_decref();
        RETURN(rc);
}
3173
/* Staged pre-cleanup: EARLY deactivates the import and stops pinging;
 * EXPORTS destroys the ptlrpcd work items and tears down the import,
 * procfs and llog state.  The obd_zombie_barrier() ordering below is
 * deliberate -- see the LU-464 comment.  Returns 0 or llog error. */
static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
{
        int rc = 0;
        ENTRY;

        switch (stage) {
        case OBD_CLEANUP_EARLY: {
                struct obd_import *imp;
                imp = obd->u.cli.cl_import;
                CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
                /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
                ptlrpc_deactivate_import(imp);
                /* Stop the pinger from touching this import. */
                spin_lock(&imp->imp_lock);
                imp->imp_pingable = 0;
                spin_unlock(&imp->imp_lock);
                break;
        }
        case OBD_CLEANUP_EXPORTS: {
                struct client_obd *cli = &obd->u.cli;
                /* LU-464
                 * for echo client, export may be on zombie list, wait for
                 * zombie thread to cull it, because cli.cl_import will be
                 * cleared in client_disconnect_export():
                 *   class_export_destroy() -> obd_cleanup() ->
                 *   echo_device_free() -> echo_client_cleanup() ->
                 *   obd_disconnect() -> osc_disconnect() ->
                 *   client_disconnect_export()
                 */
                obd_zombie_barrier();
                /* Destroy the work items before the import they hold. */
                if (cli->cl_writeback_work) {
                        ptlrpcd_destroy_work(cli->cl_writeback_work);
                        cli->cl_writeback_work = NULL;
                }
                if (cli->cl_lru_work) {
                        ptlrpcd_destroy_work(cli->cl_lru_work);
                        cli->cl_lru_work = NULL;
                }
                obd_cleanup_client_import(obd);
                ptlrpc_lprocfs_unregister_obd(obd);
                lprocfs_obd_cleanup(obd);
                rc = obd_llog_finish(obd, 0);
                if (rc != 0)
                        CERROR("failed to cleanup llogging subsystems\n");
                break;
                }
        }
        RETURN(rc);
}
3222
3223 int osc_cleanup(struct obd_device *obd)
3224 {
3225         struct client_obd *cli = &obd->u.cli;
3226         int rc;
3227
3228         ENTRY;
3229
3230         /* lru cleanup */
3231         if (cli->cl_cache != NULL) {
3232                 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
3233                 spin_lock(&cli->cl_cache->ccc_lru_lock);
3234                 list_del_init(&cli->cl_lru_osc);
3235                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3236                 cli->cl_lru_left = NULL;
3237                 atomic_dec(&cli->cl_cache->ccc_users);
3238                 cli->cl_cache = NULL;
3239         }
3240
3241         /* free memory of osc quota cache */
3242         osc_quota_cleanup(obd);
3243
3244         rc = client_obd_cleanup(obd);
3245
3246         ptlrpcd_decref();
3247         RETURN(rc);
3248 }
3249
3250 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
3251 {
3252         int rc = class_process_proc_param(PARAM_OSC, obd->obd_vars, lcfg, obd);
3253         return rc > 0 ? 0: rc;
3254 }
3255
/* obd_ops o_process_config hook: thin adapter around
 * osc_process_config_base(); 'len' is unused and 'buf' carries a
 * struct lustre_cfg. */
static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
{
        return osc_process_config_base(obd, buf);
}
3260
/* Method table registered for the "osc" obd type in osc_init().
 * Generic client_* helpers are used where OSC needs no special handling
 * (connection management); the osc_* entries are implemented in this
 * file or elsewhere in the osc module. */
struct obd_ops osc_obd_ops = {
        .o_owner                = THIS_MODULE,
        .o_setup                = osc_setup,
        .o_precleanup           = osc_precleanup,
        .o_cleanup              = osc_cleanup,
        .o_add_conn             = client_import_add_conn,
        .o_del_conn             = client_import_del_conn,
        .o_connect              = client_connect_import,
        .o_reconnect            = osc_reconnect,
        .o_disconnect           = osc_disconnect,
        .o_statfs               = osc_statfs,
        .o_statfs_async         = osc_statfs_async,
        .o_unpackmd             = osc_unpackmd,
        .o_create               = osc_create,
        .o_destroy              = osc_destroy,
        .o_getattr              = osc_getattr,
        .o_getattr_async        = osc_getattr_async,
        .o_setattr              = osc_setattr,
        .o_setattr_async        = osc_setattr_async,
        .o_change_cbdata        = osc_change_cbdata,
        .o_find_cbdata          = osc_find_cbdata,
        .o_iocontrol            = osc_iocontrol,
        .o_get_info             = osc_get_info,
        .o_set_info_async       = osc_set_info_async,
        .o_import_event         = osc_import_event,
        .o_process_config       = osc_process_config,
        .o_quotactl             = osc_quotactl,
        .o_quotacheck           = osc_quotacheck,
};
3290
3291 extern struct lu_kmem_descr osc_caches[];
3292 extern spinlock_t osc_ast_guard;
3293 extern struct lock_class_key osc_ast_guard_class;
3294
3295 int __init osc_init(void)
3296 {
3297         bool enable_proc = true;
3298         struct obd_type *type;
3299         int rc;
3300         ENTRY;
3301
3302         /* print an address of _any_ initialized kernel symbol from this
3303          * module, to allow debugging with gdb that doesn't support data
3304          * symbols from modules.*/
3305         CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3306
3307         rc = lu_kmem_init(osc_caches);
3308         if (rc)
3309                 RETURN(rc);
3310
3311         type = class_search_type(LUSTRE_OSP_NAME);
3312         if (type != NULL && type->typ_procsym != NULL)
3313                 enable_proc = false;
3314
3315         rc = class_register_type(&osc_obd_ops, NULL, enable_proc, NULL,
3316                                  LUSTRE_OSC_NAME, &osc_device_type);
3317         if (rc) {
3318                 lu_kmem_fini(osc_caches);
3319                 RETURN(rc);
3320         }
3321
3322         spin_lock_init(&osc_ast_guard);
3323         lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
3324
3325         RETURN(rc);
3326 }
3327
/* Module unload: unregister the obd type first so no new osc devices
 * can be created, then release the slab caches set up in osc_init().
 * NOTE(review): __exit is commented out in the original; presumably the
 * symbol must remain callable from a non-exit path -- confirm before
 * restoring the attribute. */
static void /*__exit*/ osc_exit(void)
{
        class_unregister_type(LUSTRE_OSC_NAME);
        lu_kmem_fini(osc_caches);
}
3333
MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
MODULE_LICENSE("GPL");

/* Register the module entry/exit points via the libcfs helper macro. */
cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);