Whamcloud - gitweb
LU-4961 lustre: move ioctls to lustre_ioctl.h
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2013, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_OSC
38
39 #include <libcfs/libcfs.h>
40
41 #ifndef __KERNEL__
42 # include <liblustre.h>
43 #endif
44
45 #include <lustre_dlm.h>
46 #include <lustre_net.h>
47 #include <lustre/lustre_user.h>
48 #include <obd_cksum.h>
49
50 #ifdef  __CYGWIN__
51 # include <ctype.h>
52 #endif
53
54 #include <lustre_ha.h>
55 #include <lprocfs_status.h>
56 #include <lustre_ioctl.h>
57 #include <lustre_log.h>
58 #include <lustre_debug.h>
59 #include <lustre_param.h>
60 #include <lustre_fid.h>
61 #include "osc_internal.h"
62 #include "osc_cl_internal.h"
63
/* Async arguments for bulk read/write (BRW) requests.  The same layout is
 * reused for grant requests through the osc_grant_args alias.
 * NOTE(review): most users of these fields are outside this view; the
 * per-field notes are inferred from the field names and should be confirmed
 * against brw_interpret() and the BRW request builder. */
struct osc_brw_async_args {
        struct obdo              *aa_oa;        /* obdo attached to the request */
        int                       aa_requested_nob; /* presumably bytes requested */
        int                       aa_nio_count;
        obd_count                 aa_page_count; /* presumably entries in aa_ppga */
        int                       aa_resends;
        struct brw_page **aa_ppga;
        struct client_obd        *aa_cli;
        struct list_head          aa_oaps;
        struct list_head          aa_exts;
        struct obd_capa  *aa_ocapa;
        struct cl_req            *aa_clerq;
};
77
/* Grant requests use the BRW async-args layout under a separate name. */
#define osc_grant_args osc_brw_async_args
79
/* Async arguments for getattr: osc_getattr_async() stores the caller's
 * obd_info here and osc_getattr_interpret() reads it back on reply. */
struct osc_async_args {
        struct obd_info *aa_oi;
};
83
/* Async arguments shared by the setattr and punch requests; consumed by
 * osc_setattr_interpret(). */
struct osc_setattr_args {
        struct obdo             *sa_oa;     /* receives the obdo from the reply */
        obd_enqueue_update_f     sa_upcall; /* called as sa_upcall(sa_cookie, rc) */
        void                    *sa_cookie; /* opaque argument for sa_upcall */
};
89
/* Async arguments for sync requests; filled in by osc_sync_base() and
 * consumed by osc_sync_interpret(). */
struct osc_fsync_args {
        struct obd_info *fa_oi;             /* fa_oi->oi_oa receives the reply obdo */
        obd_enqueue_update_f     fa_upcall; /* called as fa_upcall(fa_cookie, rc) */
        void                    *fa_cookie; /* opaque argument for fa_upcall */
};
95
/* Async arguments for lock enqueue requests.
 * NOTE(review): the code that fills and reads this structure is outside this
 * view; field meanings are not documented here for that reason. */
struct osc_enqueue_args {
        struct obd_export               *oa_exp;
        __u64                           *oa_flags;
        obd_enqueue_update_f             oa_upcall;
        void                            *oa_cookie;
        struct ost_lvb                  *oa_lvb;
        struct lustre_handle            *oa_lockh;
        struct ldlm_enqueue_info        *oa_ei;
        unsigned int                     oa_agl:1; /* single-bit flag */
};
106
/* Forward declarations of file-local helpers that are used before their
 * definitions. */
static void osc_release_ppga(struct brw_page **ppga, obd_count count);
static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
                         void *data, int rc);
110
111 /* Unpack OSC object metadata from disk storage (LE byte order). */
112 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
113                         struct lov_mds_md *lmm, int lmm_bytes)
114 {
115         int lsm_size;
116         struct obd_import *imp = class_exp2cliimp(exp);
117         ENTRY;
118
119         if (lmm != NULL) {
120                 if (lmm_bytes < sizeof(*lmm)) {
121                         CERROR("%s: lov_mds_md too small: %d, need %d\n",
122                                exp->exp_obd->obd_name, lmm_bytes,
123                                (int)sizeof(*lmm));
124                         RETURN(-EINVAL);
125                 }
126                 /* XXX LOV_MAGIC etc check? */
127
128                 if (unlikely(ostid_id(&lmm->lmm_oi) == 0)) {
129                         CERROR("%s: zero lmm_object_id: rc = %d\n",
130                                exp->exp_obd->obd_name, -EINVAL);
131                         RETURN(-EINVAL);
132                 }
133         }
134
135         lsm_size = lov_stripe_md_size(1);
136         if (lsmp == NULL)
137                 RETURN(lsm_size);
138
139         if (*lsmp != NULL && lmm == NULL) {
140                 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
141                 OBD_FREE(*lsmp, lsm_size);
142                 *lsmp = NULL;
143                 RETURN(0);
144         }
145
146         if (*lsmp == NULL) {
147                 OBD_ALLOC(*lsmp, lsm_size);
148                 if (unlikely(*lsmp == NULL))
149                         RETURN(-ENOMEM);
150                 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
151                 if (unlikely((*lsmp)->lsm_oinfo[0] == NULL)) {
152                         OBD_FREE(*lsmp, lsm_size);
153                         RETURN(-ENOMEM);
154                 }
155                 loi_init((*lsmp)->lsm_oinfo[0]);
156         } else if (unlikely(ostid_id(&(*lsmp)->lsm_oi) == 0)) {
157                 RETURN(-EBADF);
158         }
159
160         if (lmm != NULL)
161                 /* XXX zero *lsmp? */
162                 ostid_le_to_cpu(&lmm->lmm_oi, &(*lsmp)->lsm_oi);
163
164         if (imp != NULL &&
165             (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MAXBYTES))
166                 (*lsmp)->lsm_maxbytes = imp->imp_connect_data.ocd_maxbytes;
167         else
168                 (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
169
170         RETURN(lsm_size);
171 }
172
173 static inline void osc_pack_capa(struct ptlrpc_request *req,
174                                  struct ost_body *body, void *capa)
175 {
176         struct obd_capa *oc = (struct obd_capa *)capa;
177         struct lustre_capa *c;
178
179         if (!capa)
180                 return;
181
182         c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
183         LASSERT(c);
184         capa_cpy(c, oc);
185         body->oa.o_valid |= OBD_MD_FLOSSCAPA;
186         DEBUG_CAPA(D_SEC, c, "pack");
187 }
188
189 static inline void osc_pack_req_body(struct ptlrpc_request *req,
190                                      struct obd_info *oinfo)
191 {
192         struct ost_body *body;
193
194         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
195         LASSERT(body);
196
197         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
198                              oinfo->oi_oa);
199         osc_pack_capa(req, body, oinfo->oi_capa);
200 }
201
202 static inline void osc_set_capa_size(struct ptlrpc_request *req,
203                                      const struct req_msg_field *field,
204                                      struct obd_capa *oc)
205 {
206         if (oc == NULL)
207                 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
208         else
209                 /* it is already calculated as sizeof struct obd_capa */
210                 ;
211 }
212
213 static int osc_getattr_interpret(const struct lu_env *env,
214                                  struct ptlrpc_request *req,
215                                  struct osc_async_args *aa, int rc)
216 {
217         struct ost_body *body;
218         ENTRY;
219
220         if (rc != 0)
221                 GOTO(out, rc);
222
223         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
224         if (body) {
225                 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
226                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
227                                      aa->aa_oi->oi_oa, &body->oa);
228
229                 /* This should really be sent by the OST */
230                 aa->aa_oi->oi_oa->o_blksize = DT_MAX_BRW_SIZE;
231                 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
232         } else {
233                 CDEBUG(D_INFO, "can't unpack ost_body\n");
234                 rc = -EPROTO;
235                 aa->aa_oi->oi_oa->o_valid = 0;
236         }
237 out:
238         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
239         RETURN(rc);
240 }
241
242 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
243                              struct ptlrpc_request_set *set)
244 {
245         struct ptlrpc_request *req;
246         struct osc_async_args *aa;
247         int                    rc;
248         ENTRY;
249
250         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
251         if (req == NULL)
252                 RETURN(-ENOMEM);
253
254         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
255         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
256         if (rc) {
257                 ptlrpc_request_free(req);
258                 RETURN(rc);
259         }
260
261         osc_pack_req_body(req, oinfo);
262
263         ptlrpc_request_set_replen(req);
264         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;
265
266         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
267         aa = ptlrpc_req_async_args(req);
268         aa->aa_oi = oinfo;
269
270         ptlrpc_set_add_req(set, req);
271         RETURN(0);
272 }
273
274 static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
275                        struct obd_info *oinfo)
276 {
277         struct ptlrpc_request *req;
278         struct ost_body       *body;
279         int                    rc;
280         ENTRY;
281
282         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
283         if (req == NULL)
284                 RETURN(-ENOMEM);
285
286         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
287         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
288         if (rc) {
289                 ptlrpc_request_free(req);
290                 RETURN(rc);
291         }
292
293         osc_pack_req_body(req, oinfo);
294
295         ptlrpc_request_set_replen(req);
296
297         rc = ptlrpc_queue_wait(req);
298         if (rc)
299                 GOTO(out, rc);
300
301         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
302         if (body == NULL)
303                 GOTO(out, rc = -EPROTO);
304
305         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
306         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
307                              &body->oa);
308
309         oinfo->oi_oa->o_blksize = cli_brw_size(exp->exp_obd);
310         oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
311
312         EXIT;
313  out:
314         ptlrpc_req_finished(req);
315         return rc;
316 }
317
318 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
319                        struct obd_info *oinfo, struct obd_trans_info *oti)
320 {
321         struct ptlrpc_request *req;
322         struct ost_body       *body;
323         int                    rc;
324         ENTRY;
325
326         LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);
327
328         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
329         if (req == NULL)
330                 RETURN(-ENOMEM);
331
332         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
333         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
334         if (rc) {
335                 ptlrpc_request_free(req);
336                 RETURN(rc);
337         }
338
339         osc_pack_req_body(req, oinfo);
340
341         ptlrpc_request_set_replen(req);
342
343         rc = ptlrpc_queue_wait(req);
344         if (rc)
345                 GOTO(out, rc);
346
347         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
348         if (body == NULL)
349                 GOTO(out, rc = -EPROTO);
350
351         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
352                              &body->oa);
353
354         EXIT;
355 out:
356         ptlrpc_req_finished(req);
357         RETURN(rc);
358 }
359
360 static int osc_setattr_interpret(const struct lu_env *env,
361                                  struct ptlrpc_request *req,
362                                  struct osc_setattr_args *sa, int rc)
363 {
364         struct ost_body *body;
365         ENTRY;
366
367         if (rc != 0)
368                 GOTO(out, rc);
369
370         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
371         if (body == NULL)
372                 GOTO(out, rc = -EPROTO);
373
374         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
375                              &body->oa);
376 out:
377         rc = sa->sa_upcall(sa->sa_cookie, rc);
378         RETURN(rc);
379 }
380
381 int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
382                            struct obd_trans_info *oti,
383                            obd_enqueue_update_f upcall, void *cookie,
384                            struct ptlrpc_request_set *rqset)
385 {
386         struct ptlrpc_request   *req;
387         struct osc_setattr_args *sa;
388         int                      rc;
389         ENTRY;
390
391         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
392         if (req == NULL)
393                 RETURN(-ENOMEM);
394
395         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
396         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
397         if (rc) {
398                 ptlrpc_request_free(req);
399                 RETURN(rc);
400         }
401
402         if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
403                 oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
404
405         osc_pack_req_body(req, oinfo);
406
407         ptlrpc_request_set_replen(req);
408
409         /* do mds to ost setattr asynchronously */
410         if (!rqset) {
411                 /* Do not wait for response. */
412                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
413         } else {
414                 req->rq_interpret_reply =
415                         (ptlrpc_interpterer_t)osc_setattr_interpret;
416
417                 CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
418                 sa = ptlrpc_req_async_args(req);
419                 sa->sa_oa = oinfo->oi_oa;
420                 sa->sa_upcall = upcall;
421                 sa->sa_cookie = cookie;
422
423                 if (rqset == PTLRPCD_SET)
424                         ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
425                 else
426                         ptlrpc_set_add_req(rqset, req);
427         }
428
429         RETURN(0);
430 }
431
432 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
433                              struct obd_trans_info *oti,
434                              struct ptlrpc_request_set *rqset)
435 {
436         return osc_setattr_async_base(exp, oinfo, oti,
437                                       oinfo->oi_cb_up, oinfo, rqset);
438 }
439
440 int osc_real_create(struct obd_export *exp, struct obdo *oa,
441                     struct lov_stripe_md **ea, struct obd_trans_info *oti)
442 {
443         struct ptlrpc_request *req;
444         struct ost_body       *body;
445         struct lov_stripe_md  *lsm;
446         int                    rc;
447         ENTRY;
448
449         LASSERT(oa);
450         LASSERT(ea);
451
452         lsm = *ea;
453         if (!lsm) {
454                 rc = obd_alloc_memmd(exp, &lsm);
455                 if (rc < 0)
456                         RETURN(rc);
457         }
458
459         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
460         if (req == NULL)
461                 GOTO(out, rc = -ENOMEM);
462
463         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
464         if (rc) {
465                 ptlrpc_request_free(req);
466                 GOTO(out, rc);
467         }
468
469         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
470         LASSERT(body);
471
472         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
473
474         ptlrpc_request_set_replen(req);
475
476         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
477             oa->o_flags == OBD_FL_DELORPHAN) {
478                 DEBUG_REQ(D_HA, req,
479                           "delorphan from OST integration");
480                 /* Don't resend the delorphan req */
481                 req->rq_no_resend = req->rq_no_delay = 1;
482         }
483
484         rc = ptlrpc_queue_wait(req);
485         if (rc)
486                 GOTO(out_req, rc);
487
488         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
489         if (body == NULL)
490                 GOTO(out_req, rc = -EPROTO);
491
492         CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
493         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
494
495         oa->o_blksize = cli_brw_size(exp->exp_obd);
496         oa->o_valid |= OBD_MD_FLBLKSZ;
497
498         /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
499          * have valid lsm_oinfo data structs, so don't go touching that.
500          * This needs to be fixed in a big way.
501          */
502         lsm->lsm_oi = oa->o_oi;
503         *ea = lsm;
504
505         if (oti != NULL) {
506                 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
507
508                 if (oa->o_valid & OBD_MD_FLCOOKIE) {
509                         if (!oti->oti_logcookies)
510                                 oti_alloc_cookies(oti, 1);
511                         *oti->oti_logcookies = oa->o_lcookie;
512                 }
513         }
514
515         CDEBUG(D_HA, "transno: "LPD64"\n",
516                lustre_msg_get_transno(req->rq_repmsg));
517 out_req:
518         ptlrpc_req_finished(req);
519 out:
520         if (rc && !*ea)
521                 obd_free_memmd(exp, &lsm);
522         RETURN(rc);
523 }
524
525 int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
526                    obd_enqueue_update_f upcall, void *cookie,
527                    struct ptlrpc_request_set *rqset)
528 {
529         struct ptlrpc_request   *req;
530         struct osc_setattr_args *sa;
531         struct ost_body         *body;
532         int                      rc;
533         ENTRY;
534
535         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
536         if (req == NULL)
537                 RETURN(-ENOMEM);
538
539         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
540         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
541         if (rc) {
542                 ptlrpc_request_free(req);
543                 RETURN(rc);
544         }
545         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
546         ptlrpc_at_set_req_timeout(req);
547
548         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
549         LASSERT(body);
550         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
551                              oinfo->oi_oa);
552         osc_pack_capa(req, body, oinfo->oi_capa);
553
554         ptlrpc_request_set_replen(req);
555
556         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
557         CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
558         sa = ptlrpc_req_async_args(req);
559         sa->sa_oa     = oinfo->oi_oa;
560         sa->sa_upcall = upcall;
561         sa->sa_cookie = cookie;
562         if (rqset == PTLRPCD_SET)
563                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
564         else
565                 ptlrpc_set_add_req(rqset, req);
566
567         RETURN(0);
568 }
569
570 static int osc_sync_interpret(const struct lu_env *env,
571                               struct ptlrpc_request *req,
572                               void *arg, int rc)
573 {
574         struct osc_fsync_args *fa = arg;
575         struct ost_body *body;
576         ENTRY;
577
578         if (rc)
579                 GOTO(out, rc);
580
581         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
582         if (body == NULL) {
583                 CERROR ("can't unpack ost_body\n");
584                 GOTO(out, rc = -EPROTO);
585         }
586
587         *fa->fa_oi->oi_oa = body->oa;
588 out:
589         rc = fa->fa_upcall(fa->fa_cookie, rc);
590         RETURN(rc);
591 }
592
593 int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo,
594                   obd_enqueue_update_f upcall, void *cookie,
595                   struct ptlrpc_request_set *rqset)
596 {
597         struct ptlrpc_request *req;
598         struct ost_body       *body;
599         struct osc_fsync_args *fa;
600         int                    rc;
601         ENTRY;
602
603         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
604         if (req == NULL)
605                 RETURN(-ENOMEM);
606
607         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
608         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
609         if (rc) {
610                 ptlrpc_request_free(req);
611                 RETURN(rc);
612         }
613
614         /* overload the size and blocks fields in the oa with start/end */
615         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
616         LASSERT(body);
617         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
618                              oinfo->oi_oa);
619         osc_pack_capa(req, body, oinfo->oi_capa);
620
621         ptlrpc_request_set_replen(req);
622         req->rq_interpret_reply = osc_sync_interpret;
623
624         CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
625         fa = ptlrpc_req_async_args(req);
626         fa->fa_oi = oinfo;
627         fa->fa_upcall = upcall;
628         fa->fa_cookie = cookie;
629
630         if (rqset == PTLRPCD_SET)
631                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
632         else
633                 ptlrpc_set_add_req(rqset, req);
634
635         RETURN (0);
636 }
637
638 /* Find and cancel locally locks matched by @mode in the resource found by
639  * @objid. Found locks are added into @cancel list. Returns the amount of
640  * locks added to @cancels list. */
641 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
642                                    cfs_list_t *cancels,
643                                    ldlm_mode_t mode, __u64 lock_flags)
644 {
645         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
646         struct ldlm_res_id res_id;
647         struct ldlm_resource *res;
648         int count;
649         ENTRY;
650
651         /* Return, i.e. cancel nothing, only if ELC is supported (flag in
652          * export) but disabled through procfs (flag in NS).
653          *
654          * This distinguishes from a case when ELC is not supported originally,
655          * when we still want to cancel locks in advance and just cancel them
656          * locally, without sending any RPC. */
657         if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
658                 RETURN(0);
659
660         ostid_build_res_name(&oa->o_oi, &res_id);
661         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
662         if (res == NULL)
663                 RETURN(0);
664
665         LDLM_RESOURCE_ADDREF(res);
666         count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
667                                            lock_flags, 0, NULL);
668         LDLM_RESOURCE_DELREF(res);
669         ldlm_resource_putref(res);
670         RETURN(count);
671 }
672
673 static int osc_destroy_interpret(const struct lu_env *env,
674                                  struct ptlrpc_request *req, void *data,
675                                  int rc)
676 {
677         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
678
679         atomic_dec(&cli->cl_destroy_in_flight);
680         wake_up(&cli->cl_destroy_waitq);
681         return 0;
682 }
683
684 static int osc_can_send_destroy(struct client_obd *cli)
685 {
686         if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
687             cli->cl_max_rpcs_in_flight) {
688                 /* The destroy request can be sent */
689                 return 1;
690         }
691         if (atomic_dec_return(&cli->cl_destroy_in_flight) <
692             cli->cl_max_rpcs_in_flight) {
693                 /*
694                  * The counter has been modified between the two atomic
695                  * operations.
696                  */
697                 wake_up(&cli->cl_destroy_waitq);
698         }
699         return 0;
700 }
701
702 int osc_create(const struct lu_env *env, struct obd_export *exp,
703                struct obdo *oa, struct lov_stripe_md **ea,
704                struct obd_trans_info *oti)
705 {
706         int rc = 0;
707         ENTRY;
708
709         LASSERT(oa);
710         LASSERT(ea);
711         LASSERT(oa->o_valid & OBD_MD_FLGROUP);
712
713         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
714             oa->o_flags == OBD_FL_RECREATE_OBJS) {
715                 RETURN(osc_real_create(exp, oa, ea, oti));
716         }
717
718         if (!fid_seq_is_mdt(ostid_seq(&oa->o_oi)))
719                 RETURN(osc_real_create(exp, oa, ea, oti));
720
721         /* we should not get here anymore */
722         LBUG();
723
724         RETURN(rc);
725 }
726
727 /* Destroy requests can be async always on the client, and we don't even really
728  * care about the return code since the client cannot do anything at all about
729  * a destroy failure.
730  * When the MDS is unlinking a filename, it saves the file objects into a
731  * recovery llog, and these object records are cancelled when the OST reports
732  * they were destroyed and sync'd to disk (i.e. transaction committed).
733  * If the client dies, or the OST is down when the object should be destroyed,
734  * the records are not cancelled, and when the OST reconnects to the MDS next,
735  * it will retrieve the llog unlink logs and then sends the log cancellation
736  * cookies to the MDS after committing destroy transactions. */
737 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
738                        struct obdo *oa, struct lov_stripe_md *ea,
739                        struct obd_trans_info *oti, struct obd_export *md_export,
740                        void *capa)
741 {
742         struct client_obd     *cli = &exp->exp_obd->u.cli;
743         struct ptlrpc_request *req;
744         struct ost_body       *body;
745         CFS_LIST_HEAD(cancels);
746         int rc, count;
747         ENTRY;
748
749         if (!oa) {
750                 CDEBUG(D_INFO, "oa NULL\n");
751                 RETURN(-EINVAL);
752         }
753
754         count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
755                                         LDLM_FL_DISCARD_DATA);
756
757         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
758         if (req == NULL) {
759                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
760                 RETURN(-ENOMEM);
761         }
762
763         osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
764         rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
765                                0, &cancels, count);
766         if (rc) {
767                 ptlrpc_request_free(req);
768                 RETURN(rc);
769         }
770
771         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
772         ptlrpc_at_set_req_timeout(req);
773
774         if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
775                 oa->o_lcookie = *oti->oti_logcookies;
776         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
777         LASSERT(body);
778         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
779
780         osc_pack_capa(req, body, (struct obd_capa *)capa);
781         ptlrpc_request_set_replen(req);
782
783         /* If osc_destory is for destroying the unlink orphan,
784          * sent from MDT to OST, which should not be blocked here,
785          * because the process might be triggered by ptlrpcd, and
786          * it is not good to block ptlrpcd thread (b=16006)*/
787         if (!(oa->o_flags & OBD_FL_DELORPHAN)) {
788                 req->rq_interpret_reply = osc_destroy_interpret;
789                 if (!osc_can_send_destroy(cli)) {
790                         struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
791                                                           NULL);
792
793                         /*
794                          * Wait until the number of on-going destroy RPCs drops
795                          * under max_rpc_in_flight
796                          */
797                         l_wait_event_exclusive(cli->cl_destroy_waitq,
798                                                osc_can_send_destroy(cli), &lwi);
799                 }
800         }
801
802         /* Do not wait for response */
803         ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
804         RETURN(0);
805 }
806
/* Fill the cache/grant accounting fields of @oa (o_dirty, o_undirty, o_grant,
 * o_dropped) from @cli and mark them valid with
 * OBD_MD_FLBLOCKS|OBD_MD_FLGRANT.  Neither flag may be set on entry.
 * The client fields are read, and cl_lost_grant is reset to 0, under
 * cl_loi_list_lock.
 * NOTE(review): @writing_bytes is not used in this function. */
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;

        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        client_obd_list_lock(&cli->cl_loi_list_lock);
        oa->o_dirty = cli->cl_dirty;
        /* o_undirty is reported as 0 whenever the dirty accounting looks
         * inconsistent (the three CERROR branches below). */
        if (unlikely(cli->cl_dirty - cli->cl_dirty_transit >
                     cli->cl_dirty_max)) {
                CERROR("dirty %lu - %lu > dirty_max %lu\n",
                       cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else if (unlikely(atomic_read(&obd_unstable_pages) +
                            atomic_read(&obd_dirty_pages) -
                            atomic_read(&obd_dirty_transit_pages) >
                            (long)(obd_max_dirty_pages + 1))) {
                /* The atomic_read() and the corresponding atomic_inc() are
                 * not covered by a lock, thus they may safely race and trip
                 * this CERROR() unless we add in a small fudge factor (+1). */
                CERROR("%s: dirty %d + %d - %d > system dirty_max %d\n",
                       cli->cl_import->imp_obd->obd_name,
                       atomic_read(&obd_unstable_pages),
                       atomic_read(&obd_dirty_pages),
                       atomic_read(&obd_dirty_transit_pages),
                       obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (unlikely(cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff)) {
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else {
                /* bytes needed to keep (max_rpcs_in_flight + 1) full-sized
                 * RPCs busy */
                long max_in_flight = (cli->cl_max_pages_per_rpc <<
                                      PAGE_CACHE_SHIFT) *
                                     (cli->cl_max_rpcs_in_flight + 1);
                oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
        }
        oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
        /* report the lost grant once, then start counting again */
        oa->o_dropped = cli->cl_lost_grant;
        cli->cl_lost_grant = 0;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);

}
854
855 void osc_update_next_shrink(struct client_obd *cli)
856 {
857         cli->cl_next_shrink_grant =
858                 cfs_time_shift(cli->cl_grant_shrink_interval);
859         CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
860                cli->cl_next_shrink_grant);
861 }
862
863 static void __osc_update_grant(struct client_obd *cli, obd_size grant)
864 {
865         client_obd_list_lock(&cli->cl_loi_list_lock);
866         cli->cl_avail_grant += grant;
867         client_obd_list_unlock(&cli->cl_loi_list_lock);
868 }
869
870 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
871 {
872         if (body->oa.o_valid & OBD_MD_FLGRANT) {
873                 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
874                 __osc_update_grant(cli, body->oa.o_grant);
875         }
876 }
877
878 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
879                               obd_count keylen, void *key, obd_count vallen,
880                               void *val, struct ptlrpc_request_set *set);
881
882 static int osc_shrink_grant_interpret(const struct lu_env *env,
883                                       struct ptlrpc_request *req,
884                                       void *aa, int rc)
885 {
886         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
887         struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
888         struct ost_body *body;
889
890         if (rc != 0) {
891                 __osc_update_grant(cli, oa->o_grant);
892                 GOTO(out, rc);
893         }
894
895         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
896         LASSERT(body);
897         osc_update_grant(cli, body);
898 out:
899         OBDO_FREE(oa);
900         return rc;
901 }
902
903 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
904 {
905         client_obd_list_lock(&cli->cl_loi_list_lock);
906         oa->o_grant = cli->cl_avail_grant / 4;
907         cli->cl_avail_grant -= oa->o_grant;
908         client_obd_list_unlock(&cli->cl_loi_list_lock);
909         if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
910                 oa->o_valid |= OBD_MD_FLFLAGS;
911                 oa->o_flags = 0;
912         }
913         oa->o_flags |= OBD_FL_SHRINK_GRANT;
914         osc_update_next_shrink(cli);
915 }
916
917 /* Shrink the current grant, either from some large amount to enough for a
918  * full set of in-flight RPCs, or if we have already shrunk to that limit
919  * then to enough for a single RPC.  This avoids keeping more grant than
920  * needed, and avoids shrinking the grant piecemeal. */
921 static int osc_shrink_grant(struct client_obd *cli)
922 {
923         __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
924                              (cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT);
925
926         client_obd_list_lock(&cli->cl_loi_list_lock);
927         if (cli->cl_avail_grant <= target_bytes)
928                 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
929         client_obd_list_unlock(&cli->cl_loi_list_lock);
930
931         return osc_shrink_grant_to_target(cli, target_bytes);
932 }
933
934 int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
935 {
936         int                     rc = 0;
937         struct ost_body        *body;
938         ENTRY;
939
940         client_obd_list_lock(&cli->cl_loi_list_lock);
941         /* Don't shrink if we are already above or below the desired limit
942          * We don't want to shrink below a single RPC, as that will negatively
943          * impact block allocation and long-term performance. */
944         if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT)
945                 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
946
947         if (target_bytes >= cli->cl_avail_grant) {
948                 client_obd_list_unlock(&cli->cl_loi_list_lock);
949                 RETURN(0);
950         }
951         client_obd_list_unlock(&cli->cl_loi_list_lock);
952
953         OBD_ALLOC_PTR(body);
954         if (!body)
955                 RETURN(-ENOMEM);
956
957         osc_announce_cached(cli, &body->oa, 0);
958
959         client_obd_list_lock(&cli->cl_loi_list_lock);
960         body->oa.o_grant = cli->cl_avail_grant - target_bytes;
961         cli->cl_avail_grant = target_bytes;
962         client_obd_list_unlock(&cli->cl_loi_list_lock);
963         if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
964                 body->oa.o_valid |= OBD_MD_FLFLAGS;
965                 body->oa.o_flags = 0;
966         }
967         body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
968         osc_update_next_shrink(cli);
969
970         rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
971                                 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
972                                 sizeof(*body), body, NULL);
973         if (rc != 0)
974                 __osc_update_grant(cli, body->oa.o_grant);
975         OBD_FREE_PTR(body);
976         RETURN(rc);
977 }
978
979 static int osc_should_shrink_grant(struct client_obd *client)
980 {
981         cfs_time_t time = cfs_time_current();
982         cfs_time_t next_shrink = client->cl_next_shrink_grant;
983
984         if ((client->cl_import->imp_connect_data.ocd_connect_flags &
985              OBD_CONNECT_GRANT_SHRINK) == 0)
986                 return 0;
987
988         if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
989                 /* Get the current RPC size directly, instead of going via:
990                  * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
991                  * Keep comment here so that it can be found by searching. */
992                 int brw_size = client->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
993
994                 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
995                     client->cl_avail_grant > brw_size)
996                         return 1;
997                 else
998                         osc_update_next_shrink(client);
999         }
1000         return 0;
1001 }
1002
1003 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
1004 {
1005         struct client_obd *client;
1006
1007         cfs_list_for_each_entry(client, &item->ti_obd_list,
1008                                 cl_grant_shrink_list) {
1009                 if (osc_should_shrink_grant(client))
1010                         osc_shrink_grant(client);
1011         }
1012         return 0;
1013 }
1014
1015 static int osc_add_shrink_grant(struct client_obd *client)
1016 {
1017         int rc;
1018
1019         rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
1020                                        TIMEOUT_GRANT,
1021                                        osc_grant_shrink_grant_cb, NULL,
1022                                        &client->cl_grant_shrink_list);
1023         if (rc) {
1024                 CERROR("add grant client %s error %d\n",
1025                         client->cl_import->imp_obd->obd_name, rc);
1026                 return rc;
1027         }
1028         CDEBUG(D_CACHE, "add grant client %s \n",
1029                client->cl_import->imp_obd->obd_name);
1030         osc_update_next_shrink(client);
1031         return 0;
1032 }
1033
1034 static int osc_del_shrink_grant(struct client_obd *client)
1035 {
1036         return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
1037                                          TIMEOUT_GRANT);
1038 }
1039
1040 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
1041 {
1042         /*
1043          * ocd_grant is the total grant amount we're expect to hold: if we've
1044          * been evicted, it's the new avail_grant amount, cl_dirty will drop
1045          * to 0 as inflight RPCs fail out; otherwise, it's avail_grant + dirty.
1046          *
1047          * race is tolerable here: if we're evicted, but imp_state already
1048          * left EVICTED state, then cl_dirty must be 0 already.
1049          */
1050         client_obd_list_lock(&cli->cl_loi_list_lock);
1051         if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
1052                 cli->cl_avail_grant = ocd->ocd_grant;
1053         else
1054                 cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty;
1055
1056         if (cli->cl_avail_grant < 0) {
1057                 CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
1058                       cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant,
1059                       ocd->ocd_grant, cli->cl_dirty);
1060                 /* workaround for servers which do not have the patch from
1061                  * LU-2679 */
1062                 cli->cl_avail_grant = ocd->ocd_grant;
1063         }
1064
1065         /* determine the appropriate chunk size used by osc_extent. */
1066         cli->cl_chunkbits = max_t(int, PAGE_CACHE_SHIFT, ocd->ocd_blocksize);
1067         client_obd_list_unlock(&cli->cl_loi_list_lock);
1068
1069         CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
1070                 "chunk bits: %d.\n", cli->cl_import->imp_obd->obd_name,
1071                 cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits);
1072
1073         if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
1074             cfs_list_empty(&cli->cl_grant_shrink_list))
1075                 osc_add_shrink_grant(cli);
1076 }
1077
1078 /* We assume that the reason this OSC got a short read is because it read
1079  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
1080  * via the LOV, and it _knows_ it's reading inside the file, it's just that
1081  * this stripe never got written at or beyond this stripe offset yet. */
1082 static void handle_short_read(int nob_read, obd_count page_count,
1083                               struct brw_page **pga)
1084 {
1085         char *ptr;
1086         int i = 0;
1087
1088         /* skip bytes read OK */
1089         while (nob_read > 0) {
1090                 LASSERT (page_count > 0);
1091
1092                 if (pga[i]->count > nob_read) {
1093                         /* EOF inside this page */
1094                         ptr = kmap(pga[i]->pg) +
1095                                 (pga[i]->off & ~CFS_PAGE_MASK);
1096                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1097                         kunmap(pga[i]->pg);
1098                         page_count--;
1099                         i++;
1100                         break;
1101                 }
1102
1103                 nob_read -= pga[i]->count;
1104                 page_count--;
1105                 i++;
1106         }
1107
1108         /* zero remaining pages */
1109         while (page_count-- > 0) {
1110                 ptr = kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
1111                 memset(ptr, 0, pga[i]->count);
1112                 kunmap(pga[i]->pg);
1113                 i++;
1114         }
1115 }
1116
1117 static int check_write_rcs(struct ptlrpc_request *req,
1118                            int requested_nob, int niocount,
1119                            obd_count page_count, struct brw_page **pga)
1120 {
1121         int     i;
1122         __u32   *remote_rcs;
1123
1124         remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1125                                                   sizeof(*remote_rcs) *
1126                                                   niocount);
1127         if (remote_rcs == NULL) {
1128                 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1129                 return(-EPROTO);
1130         }
1131
1132         /* return error if any niobuf was in error */
1133         for (i = 0; i < niocount; i++) {
1134                 if ((int)remote_rcs[i] < 0)
1135                         return(remote_rcs[i]);
1136
1137                 if (remote_rcs[i] != 0) {
1138                         CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1139                                 i, remote_rcs[i], req);
1140                         return(-EPROTO);
1141                 }
1142         }
1143
1144         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1145                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1146                        req->rq_bulk->bd_nob_transferred, requested_nob);
1147                 return(-EPROTO);
1148         }
1149
1150         return (0);
1151 }
1152
1153 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1154 {
1155         if (p1->flag != p2->flag) {
1156                 unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
1157                                   OBD_BRW_SYNC       | OBD_BRW_ASYNC   |
1158                                   OBD_BRW_NOQUOTA    | OBD_BRW_SOFT_SYNC);
1159
1160                 /* warn if we try to combine flags that we don't know to be
1161                  * safe to combine */
1162                 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1163                         CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1164                               "report this at http://bugs.whamcloud.com/\n",
1165                               p1->flag, p2->flag);
1166                 }
1167                 return 0;
1168         }
1169
1170         return (p1->off + p1->count == p2->off);
1171 }
1172
1173 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
1174                                    struct brw_page **pga, int opc,
1175                                    cksum_type_t cksum_type)
1176 {
1177         __u32                           cksum;
1178         int                             i = 0;
1179         struct cfs_crypto_hash_desc     *hdesc;
1180         unsigned int                    bufsize;
1181         int                             err;
1182         unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);
1183
1184         LASSERT(pg_count > 0);
1185
1186         hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1187         if (IS_ERR(hdesc)) {
1188                 CERROR("Unable to initialize checksum hash %s\n",
1189                        cfs_crypto_hash_name(cfs_alg));
1190                 return PTR_ERR(hdesc);
1191         }
1192
1193         while (nob > 0 && pg_count > 0) {
1194                 int count = pga[i]->count > nob ? nob : pga[i]->count;
1195
1196                 /* corrupt the data before we compute the checksum, to
1197                  * simulate an OST->client data error */
1198                 if (i == 0 && opc == OST_READ &&
1199                     OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
1200                         unsigned char *ptr = kmap(pga[i]->pg);
1201                         int off = pga[i]->off & ~CFS_PAGE_MASK;
1202                         memcpy(ptr + off, "bad1", min(4, nob));
1203                         kunmap(pga[i]->pg);
1204                 }
1205                 cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
1206                                   pga[i]->off & ~CFS_PAGE_MASK,
1207                                   count);
1208                 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
1209                                (int)(pga[i]->off & ~CFS_PAGE_MASK));
1210
1211                 nob -= pga[i]->count;
1212                 pg_count--;
1213                 i++;
1214         }
1215
1216         bufsize = 4;
1217         err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);
1218
1219         if (err)
1220                 cfs_crypto_hash_final(hdesc, NULL, NULL);
1221
1222         /* For sending we only compute the wrong checksum instead
1223          * of corrupting the data so it is still correct on a redo */
1224         if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1225                 cksum++;
1226
1227         return cksum;
1228 }
1229
1230 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
1231                                 struct lov_stripe_md *lsm, obd_count page_count,
1232                                 struct brw_page **pga,
1233                                 struct ptlrpc_request **reqp,
1234                                 struct obd_capa *ocapa, int reserve,
1235                                 int resend)
1236 {
1237         struct ptlrpc_request   *req;
1238         struct ptlrpc_bulk_desc *desc;
1239         struct ost_body         *body;
1240         struct obd_ioobj        *ioobj;
1241         struct niobuf_remote    *niobuf;
1242         int niocount, i, requested_nob, opc, rc;
1243         struct osc_brw_async_args *aa;
1244         struct req_capsule      *pill;
1245         struct brw_page *pg_prev;
1246
1247         ENTRY;
1248         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1249                 RETURN(-ENOMEM); /* Recoverable */
1250         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1251                 RETURN(-EINVAL); /* Fatal */
1252
1253         if ((cmd & OBD_BRW_WRITE) != 0) {
1254                 opc = OST_WRITE;
1255                 req = ptlrpc_request_alloc_pool(cli->cl_import,
1256                                                 cli->cl_import->imp_rq_pool,
1257                                                 &RQF_OST_BRW_WRITE);
1258         } else {
1259                 opc = OST_READ;
1260                 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
1261         }
1262         if (req == NULL)
1263                 RETURN(-ENOMEM);
1264
1265         for (niocount = i = 1; i < page_count; i++) {
1266                 if (!can_merge_pages(pga[i - 1], pga[i]))
1267                         niocount++;
1268         }
1269
1270         pill = &req->rq_pill;
1271         req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1272                              sizeof(*ioobj));
1273         req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1274                              niocount * sizeof(*niobuf));
1275         osc_set_capa_size(req, &RMF_CAPA1, ocapa);
1276
1277         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1278         if (rc) {
1279                 ptlrpc_request_free(req);
1280                 RETURN(rc);
1281         }
1282         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1283         ptlrpc_at_set_req_timeout(req);
1284         /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
1285          * retry logic */
1286         req->rq_no_retry_einprogress = 1;
1287
1288         desc = ptlrpc_prep_bulk_imp(req, page_count,
1289                 cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
1290                 opc == OST_WRITE ? BULK_GET_SOURCE : BULK_PUT_SINK,
1291                 OST_BULK_PORTAL);
1292
1293         if (desc == NULL)
1294                 GOTO(out, rc = -ENOMEM);
1295         /* NB request now owns desc and will free it when it gets freed */
1296
1297         body = req_capsule_client_get(pill, &RMF_OST_BODY);
1298         ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1299         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1300         LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1301
1302         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
1303
1304         obdo_to_ioobj(oa, ioobj);
1305         ioobj->ioo_bufcnt = niocount;
1306         /* The high bits of ioo_max_brw tells server _maximum_ number of bulks
1307          * that might be send for this request.  The actual number is decided
1308          * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
1309          * "max - 1" for old client compatibility sending "0", and also so the
1310          * the actual maximum is a power-of-two number, not one less. LU-1431 */
1311         ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
1312         osc_pack_capa(req, body, ocapa);
1313         LASSERT(page_count > 0);
1314         pg_prev = pga[0];
1315         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1316                 struct brw_page *pg = pga[i];
1317                 int poff = pg->off & ~CFS_PAGE_MASK;
1318
1319                 LASSERT(pg->count > 0);
1320                 /* make sure there is no gap in the middle of page array */
1321                 LASSERTF(page_count == 1 ||
1322                          (ergo(i == 0, poff + pg->count == PAGE_CACHE_SIZE) &&
1323                           ergo(i > 0 && i < page_count - 1,
1324                                poff == 0 && pg->count == PAGE_CACHE_SIZE)   &&
1325                           ergo(i == page_count - 1, poff == 0)),
1326                          "i: %d/%d pg: %p off: "LPU64", count: %u\n",
1327                          i, page_count, pg, pg->off, pg->count);
1328 #ifdef __linux__
1329                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1330                          "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1331                          " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1332                          i, page_count,
1333                          pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1334                          pg_prev->pg, page_private(pg_prev->pg),
1335                          pg_prev->pg->index, pg_prev->off);
1336 #else
1337                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1338                          "i %d p_c %u\n", i, page_count);
1339 #endif
1340                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1341                         (pg->flag & OBD_BRW_SRVLOCK));
1342
1343                 ptlrpc_prep_bulk_page_pin(desc, pg->pg, poff, pg->count);
1344                 requested_nob += pg->count;
1345
1346                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1347                         niobuf--;
1348                         niobuf->len += pg->count;
1349                 } else {
1350                         niobuf->offset = pg->off;
1351                         niobuf->len    = pg->count;
1352                         niobuf->flags  = pg->flag;
1353                 }
1354                 pg_prev = pg;
1355         }
1356
1357         LASSERTF((void *)(niobuf - niocount) ==
1358                 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1359                 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1360                 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1361
1362         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1363         if (resend) {
1364                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1365                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1366                         body->oa.o_flags = 0;
1367                 }
1368                 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1369         }
1370
1371         if (osc_should_shrink_grant(cli))
1372                 osc_shrink_grant_local(cli, &body->oa);
1373
1374         /* size[REQ_REC_OFF] still sizeof (*body) */
1375         if (opc == OST_WRITE) {
1376                 if (cli->cl_checksum &&
1377                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1378                         /* store cl_cksum_type in a local variable since
1379                          * it can be changed via lprocfs */
1380                         cksum_type_t cksum_type = cli->cl_cksum_type;
1381
1382                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1383                                 oa->o_flags &= OBD_FL_LOCAL_MASK;
1384                                 body->oa.o_flags = 0;
1385                         }
1386                         body->oa.o_flags |= cksum_type_pack(cksum_type);
1387                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1388                         body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1389                                                              page_count, pga,
1390                                                              OST_WRITE,
1391                                                              cksum_type);
1392                         CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1393                                body->oa.o_cksum);
1394                         /* save this in 'oa', too, for later checking */
1395                         oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1396                         oa->o_flags |= cksum_type_pack(cksum_type);
1397                 } else {
1398                         /* clear out the checksum flag, in case this is a
1399                          * resend but cl_checksum is no longer set. b=11238 */
1400                         oa->o_valid &= ~OBD_MD_FLCKSUM;
1401                 }
1402                 oa->o_cksum = body->oa.o_cksum;
1403                 /* 1 RC per niobuf */
1404                 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1405                                      sizeof(__u32) * niocount);
1406         } else {
1407                 if (cli->cl_checksum &&
1408                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1409                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1410                                 body->oa.o_flags = 0;
1411                         body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1412                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1413                 }
1414         }
1415         ptlrpc_request_set_replen(req);
1416
1417         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1418         aa = ptlrpc_req_async_args(req);
1419         aa->aa_oa = oa;
1420         aa->aa_requested_nob = requested_nob;
1421         aa->aa_nio_count = niocount;
1422         aa->aa_page_count = page_count;
1423         aa->aa_resends = 0;
1424         aa->aa_ppga = pga;
1425         aa->aa_cli = cli;
1426         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1427         if (ocapa && reserve)
1428                 aa->aa_ocapa = capa_get(ocapa);
1429
1430         *reqp = req;
1431         RETURN(0);
1432
1433  out:
1434         ptlrpc_req_finished(req);
1435         RETURN(rc);
1436 }
1437
1438 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1439                                 __u32 client_cksum, __u32 server_cksum, int nob,
1440                                 obd_count page_count, struct brw_page **pga,
1441                                 cksum_type_t client_cksum_type)
1442 {
1443         __u32 new_cksum;
1444         char *msg;
1445         cksum_type_t cksum_type;
1446
1447         if (server_cksum == client_cksum) {
1448                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1449                 return 0;
1450         }
1451
1452         cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1453                                        oa->o_flags : 0);
1454         new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1455                                       cksum_type);
1456
1457         if (cksum_type != client_cksum_type)
1458                 msg = "the server did not use the checksum type specified in "
1459                       "the original request - likely a protocol problem";
1460         else if (new_cksum == server_cksum)
1461                 msg = "changed on the client after we checksummed it - "
1462                       "likely false positive due to mmap IO (bug 11742)";
1463         else if (new_cksum == client_cksum)
1464                 msg = "changed in transit before arrival at OST";
1465         else
1466                 msg = "changed in transit AND doesn't match the original - "
1467                       "likely false positive due to mmap IO (bug 11742)";
1468
1469         LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1470                            " object "DOSTID" extent ["LPU64"-"LPU64"]\n",
1471                            msg, libcfs_nid2str(peer->nid),
1472                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1473                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1474                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1475                            POSTID(&oa->o_oi), pga[0]->off,
1476                            pga[page_count-1]->off + pga[page_count-1]->count - 1);
1477         CERROR("original client csum %x (type %x), server csum %x (type %x), "
1478                "client csum now %x\n", client_cksum, client_cksum_type,
1479                server_cksum, cksum_type, new_cksum);
1480         return 1;
1481 }
1482
1483 /* Note rc enters this function as number of bytes transferred */
1484 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1485 {
1486         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1487         const lnet_process_id_t *peer =
1488                         &req->rq_import->imp_connection->c_peer;
1489         struct client_obd *cli = aa->aa_cli;
1490         struct ost_body *body;
1491         __u32 client_cksum = 0;
1492         ENTRY;
1493
1494         if (rc < 0 && rc != -EDQUOT) {
1495                 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1496                 RETURN(rc);
1497         }
1498
1499         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1500         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1501         if (body == NULL) {
1502                 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1503                 RETURN(-EPROTO);
1504         }
1505
1506         /* set/clear over quota flag for a uid/gid */
1507         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1508             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1509                 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1510
1511                 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1512                        body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1513                        body->oa.o_flags);
1514                 osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
1515         }
1516
1517         osc_update_grant(cli, body);
1518
1519         if (rc < 0)
1520                 RETURN(rc);
1521
1522         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1523                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1524
1525         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1526                 if (rc > 0) {
1527                         CERROR("Unexpected +ve rc %d\n", rc);
1528                         RETURN(-EPROTO);
1529                 }
1530                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1531
1532                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1533                         RETURN(-EAGAIN);
1534
1535                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1536                     check_write_checksum(&body->oa, peer, client_cksum,
1537                                          body->oa.o_cksum, aa->aa_requested_nob,
1538                                          aa->aa_page_count, aa->aa_ppga,
1539                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1540                         RETURN(-EAGAIN);
1541
1542                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1543                                      aa->aa_page_count, aa->aa_ppga);
1544                 GOTO(out, rc);
1545         }
1546
1547         /* The rest of this function executes only for OST_READs */
1548
1549         /* if unwrap_bulk failed, return -EAGAIN to retry */
1550         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1551         if (rc < 0)
1552                 GOTO(out, rc = -EAGAIN);
1553
1554         if (rc > aa->aa_requested_nob) {
1555                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1556                        aa->aa_requested_nob);
1557                 RETURN(-EPROTO);
1558         }
1559
1560         if (rc != req->rq_bulk->bd_nob_transferred) {
1561                 CERROR ("Unexpected rc %d (%d transferred)\n",
1562                         rc, req->rq_bulk->bd_nob_transferred);
1563                 return (-EPROTO);
1564         }
1565
1566         if (rc < aa->aa_requested_nob)
1567                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1568
1569         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1570                 static int cksum_counter;
1571                 __u32      server_cksum = body->oa.o_cksum;
1572                 char      *via;
1573                 char      *router;
1574                 cksum_type_t cksum_type;
1575
1576                 cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
1577                                                body->oa.o_flags : 0);
1578                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1579                                                  aa->aa_ppga, OST_READ,
1580                                                  cksum_type);
1581
1582                 if (peer->nid == req->rq_bulk->bd_sender) {
1583                         via = router = "";
1584                 } else {
1585                         via = " via ";
1586                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1587                 }
1588
1589                 if (server_cksum == ~0 && rc > 0) {
1590                         CERROR("Protocol error: server %s set the 'checksum' "
1591                                "bit, but didn't send a checksum.  Not fatal, "
1592                                "but please notify on http://bugs.whamcloud.com/\n",
1593                                libcfs_nid2str(peer->nid));
1594                 } else if (server_cksum != client_cksum) {
1595                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1596                                            "%s%s%s inode "DFID" object "DOSTID
1597                                            " extent ["LPU64"-"LPU64"]\n",
1598                                            req->rq_import->imp_obd->obd_name,
1599                                            libcfs_nid2str(peer->nid),
1600                                            via, router,
1601                                            body->oa.o_valid & OBD_MD_FLFID ?
1602                                                 body->oa.o_parent_seq : (__u64)0,
1603                                            body->oa.o_valid & OBD_MD_FLFID ?
1604                                                 body->oa.o_parent_oid : 0,
1605                                            body->oa.o_valid & OBD_MD_FLFID ?
1606                                                 body->oa.o_parent_ver : 0,
1607                                            POSTID(&body->oa.o_oi),
1608                                            aa->aa_ppga[0]->off,
1609                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1610                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1611                                                                         1);
1612                         CERROR("client %x, server %x, cksum_type %x\n",
1613                                client_cksum, server_cksum, cksum_type);
1614                         cksum_counter = 0;
1615                         aa->aa_oa->o_cksum = client_cksum;
1616                         rc = -EAGAIN;
1617                 } else {
1618                         cksum_counter++;
1619                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1620                         rc = 0;
1621                 }
1622         } else if (unlikely(client_cksum)) {
1623                 static int cksum_missed;
1624
1625                 cksum_missed++;
1626                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1627                         CERROR("Checksum %u requested from %s but not sent\n",
1628                                cksum_missed, libcfs_nid2str(peer->nid));
1629         } else {
1630                 rc = 0;
1631         }
1632 out:
1633         if (rc >= 0)
1634                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1635                                      aa->aa_oa, &body->oa);
1636
1637         RETURN(rc);
1638 }
1639
1640 static int osc_brw_redo_request(struct ptlrpc_request *request,
1641                                 struct osc_brw_async_args *aa, int rc)
1642 {
1643         struct ptlrpc_request *new_req;
1644         struct osc_brw_async_args *new_aa;
1645         struct osc_async_page *oap;
1646         ENTRY;
1647
1648         DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1649                   "redo for recoverable error %d", rc);
1650
1651         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1652                                         OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1653                                   aa->aa_cli, aa->aa_oa,
1654                                   NULL /* lsm unused by osc currently */,
1655                                   aa->aa_page_count, aa->aa_ppga,
1656                                   &new_req, aa->aa_ocapa, 0, 1);
1657         if (rc)
1658                 RETURN(rc);
1659
1660         cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1661                 if (oap->oap_request != NULL) {
1662                         LASSERTF(request == oap->oap_request,
1663                                  "request %p != oap_request %p\n",
1664                                  request, oap->oap_request);
1665                         if (oap->oap_interrupted) {
1666                                 ptlrpc_req_finished(new_req);
1667                                 RETURN(-EINTR);
1668                         }
1669                 }
1670         }
1671         /* New request takes over pga and oaps from old request.
1672          * Note that copying a list_head doesn't work, need to move it... */
1673         aa->aa_resends++;
1674         new_req->rq_interpret_reply = request->rq_interpret_reply;
1675         new_req->rq_async_args = request->rq_async_args;
1676         new_req->rq_commit_cb = request->rq_commit_cb;
1677         /* cap resend delay to the current request timeout, this is similar to
1678          * what ptlrpc does (see after_reply()) */
1679         if (aa->aa_resends > new_req->rq_timeout)
1680                 new_req->rq_sent = cfs_time_current_sec() + new_req->rq_timeout;
1681         else
1682                 new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1683         new_req->rq_generation_set = 1;
1684         new_req->rq_import_generation = request->rq_import_generation;
1685
1686         new_aa = ptlrpc_req_async_args(new_req);
1687
1688         CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
1689         cfs_list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1690         CFS_INIT_LIST_HEAD(&new_aa->aa_exts);
1691         cfs_list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1692         new_aa->aa_resends = aa->aa_resends;
1693
1694         cfs_list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1695                 if (oap->oap_request) {
1696                         ptlrpc_req_finished(oap->oap_request);
1697                         oap->oap_request = ptlrpc_request_addref(new_req);
1698                 }
1699         }
1700
1701         new_aa->aa_ocapa = aa->aa_ocapa;
1702         aa->aa_ocapa = NULL;
1703
1704         /* XXX: This code will run into problem if we're going to support
1705          * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
1706          * and wait for all of them to be finished. We should inherit request
1707          * set from old request. */
1708         ptlrpcd_add_req(new_req, PDL_POLICY_SAME, -1);
1709
1710         DEBUG_REQ(D_INFO, new_req, "new request");
1711         RETURN(0);
1712 }
1713
1714 /*
1715  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1716  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1717  * fine for our small page arrays and doesn't require allocation.  its an
1718  * insertion sort that swaps elements that are strides apart, shrinking the
1719  * stride down until its '1' and the array is sorted.
1720  */
1721 static void sort_brw_pages(struct brw_page **array, int num)
1722 {
1723         int stride, i, j;
1724         struct brw_page *tmp;
1725
1726         if (num == 1)
1727                 return;
1728         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1729                 ;
1730
1731         do {
1732                 stride /= 3;
1733                 for (i = stride ; i < num ; i++) {
1734                         tmp = array[i];
1735                         j = i;
1736                         while (j >= stride && array[j - stride]->off > tmp->off) {
1737                                 array[j] = array[j - stride];
1738                                 j -= stride;
1739                         }
1740                         array[j] = tmp;
1741                 }
1742         } while (stride > 1);
1743 }
1744
1745 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1746 {
1747         LASSERT(ppga != NULL);
1748         OBD_FREE(ppga, sizeof(*ppga) * count);
1749 }
1750
1751 static int brw_interpret(const struct lu_env *env,
1752                          struct ptlrpc_request *req, void *data, int rc)
1753 {
1754         struct osc_brw_async_args *aa = data;
1755         struct osc_extent *ext;
1756         struct osc_extent *tmp;
1757         struct client_obd *cli = aa->aa_cli;
1758         ENTRY;
1759
1760         rc = osc_brw_fini_request(req, rc);
1761         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1762         /* When server return -EINPROGRESS, client should always retry
1763          * regardless of the number of times the bulk was resent already. */
1764         if (osc_recoverable_error(rc)) {
1765                 if (req->rq_import_generation !=
1766                     req->rq_import->imp_generation) {
1767                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
1768                                ""DOSTID", rc = %d.\n",
1769                                req->rq_import->imp_obd->obd_name,
1770                                POSTID(&aa->aa_oa->o_oi), rc);
1771                 } else if (rc == -EINPROGRESS ||
1772                     client_should_resend(aa->aa_resends, aa->aa_cli)) {
1773                         rc = osc_brw_redo_request(req, aa, rc);
1774                 } else {
1775                         CERROR("%s: too many resent retries for object: "
1776                                ""LPU64":"LPU64", rc = %d.\n",
1777                                req->rq_import->imp_obd->obd_name,
1778                                POSTID(&aa->aa_oa->o_oi), rc);
1779                 }
1780
1781                 if (rc == 0)
1782                         RETURN(0);
1783                 else if (rc == -EAGAIN || rc == -EINPROGRESS)
1784                         rc = -EIO;
1785         }
1786
1787         if (aa->aa_ocapa) {
1788                 capa_put(aa->aa_ocapa);
1789                 aa->aa_ocapa = NULL;
1790         }
1791
1792         if (rc == 0) {
1793                 struct obdo *oa = aa->aa_oa;
1794                 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
1795                 unsigned long valid = 0;
1796                 struct cl_object *obj;
1797                 struct osc_async_page *last;
1798
1799                 last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
1800                 obj = osc2cl(last->oap_obj);
1801
1802                 cl_object_attr_lock(obj);
1803                 if (oa->o_valid & OBD_MD_FLBLOCKS) {
1804                         attr->cat_blocks = oa->o_blocks;
1805                         valid |= CAT_BLOCKS;
1806                 }
1807                 if (oa->o_valid & OBD_MD_FLMTIME) {
1808                         attr->cat_mtime = oa->o_mtime;
1809                         valid |= CAT_MTIME;
1810                 }
1811                 if (oa->o_valid & OBD_MD_FLATIME) {
1812                         attr->cat_atime = oa->o_atime;
1813                         valid |= CAT_ATIME;
1814                 }
1815                 if (oa->o_valid & OBD_MD_FLCTIME) {
1816                         attr->cat_ctime = oa->o_ctime;
1817                         valid |= CAT_CTIME;
1818                 }
1819
1820                 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1821                         struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
1822                         loff_t last_off = last->oap_count + last->oap_obj_off;
1823
1824                         /* Change file size if this is an out of quota or
1825                          * direct IO write and it extends the file size */
1826                         if (loi->loi_lvb.lvb_size < last_off) {
1827                                 attr->cat_size = last_off;
1828                                 valid |= CAT_SIZE;
1829                         }
1830                         /* Extend KMS if it's not a lockless write */
1831                         if (loi->loi_kms < last_off &&
1832                             oap2osc_page(last)->ops_srvlock == 0) {
1833                                 attr->cat_kms = last_off;
1834                                 valid |= CAT_KMS;
1835                         }
1836                 }
1837
1838                 if (valid != 0)
1839                         cl_object_attr_set(env, obj, attr, valid);
1840                 cl_object_attr_unlock(obj);
1841         }
1842         OBDO_FREE(aa->aa_oa);
1843
1844         cfs_list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
1845                 cfs_list_del_init(&ext->oe_link);
1846                 osc_extent_finish(env, ext, 1, rc);
1847         }
1848         LASSERT(cfs_list_empty(&aa->aa_exts));
1849         LASSERT(cfs_list_empty(&aa->aa_oaps));
1850
1851         cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
1852                           req->rq_bulk->bd_nob_transferred);
1853         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1854         ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
1855
1856         client_obd_list_lock(&cli->cl_loi_list_lock);
1857         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1858          * is called so we know whether to go to sync BRWs or wait for more
1859          * RPCs to complete */
1860         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
1861                 cli->cl_w_in_flight--;
1862         else
1863                 cli->cl_r_in_flight--;
1864         osc_wake_cache_waiters(cli);
1865         client_obd_list_unlock(&cli->cl_loi_list_lock);
1866
1867         osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
1868         RETURN(rc);
1869 }
1870
1871 static void brw_commit(struct ptlrpc_request *req)
1872 {
1873         /* If osc_inc_unstable_pages (via osc_extent_finish) races with
1874          * this called via the rq_commit_cb, I need to ensure
1875          * osc_dec_unstable_pages is still called. Otherwise unstable
1876          * pages may be leaked. */
1877         spin_lock(&req->rq_lock);
1878         if (likely(req->rq_unstable)) {
1879                 req->rq_unstable = 0;
1880                 spin_unlock(&req->rq_lock);
1881
1882                 osc_dec_unstable_pages(req);
1883         } else {
1884                 req->rq_committed = 1;
1885                 spin_unlock(&req->rq_lock);
1886         }
1887 }
1888
1889 /**
1890  * Build an RPC by the list of extent @ext_list. The caller must ensure
1891  * that the total pages in this list are NOT over max pages per RPC.
1892  * Extents in the list must be in OES_RPC state.
1893  */
1894 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
1895                   cfs_list_t *ext_list, int cmd, pdl_policy_t pol)
1896 {
1897         struct ptlrpc_request           *req = NULL;
1898         struct osc_extent               *ext;
1899         struct brw_page                 **pga = NULL;
1900         struct osc_brw_async_args       *aa = NULL;
1901         struct obdo                     *oa = NULL;
1902         struct osc_async_page           *oap;
1903         struct osc_async_page           *tmp;
1904         struct cl_req                   *clerq = NULL;
1905         enum cl_req_type                crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE :
1906                                                                       CRT_READ;
1907         struct ldlm_lock                *lock = NULL;
1908         struct cl_req_attr              *crattr = NULL;
1909         obd_off                         starting_offset = OBD_OBJECT_EOF;
1910         obd_off                         ending_offset = 0;
1911         int                             mpflag = 0;
1912         int                             mem_tight = 0;
1913         int                             page_count = 0;
1914         int                             i;
1915         int                             rc;
1916         CFS_LIST_HEAD(rpc_list);
1917
1918         ENTRY;
1919         LASSERT(!cfs_list_empty(ext_list));
1920
1921         /* add pages into rpc_list to build BRW rpc */
1922         cfs_list_for_each_entry(ext, ext_list, oe_link) {
1923                 LASSERT(ext->oe_state == OES_RPC);
1924                 mem_tight |= ext->oe_memalloc;
1925                 cfs_list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
1926                         ++page_count;
1927                         cfs_list_add_tail(&oap->oap_rpc_item, &rpc_list);
1928                         if (starting_offset > oap->oap_obj_off)
1929                                 starting_offset = oap->oap_obj_off;
1930                         else
1931                                 LASSERT(oap->oap_page_off == 0);
1932                         if (ending_offset < oap->oap_obj_off + oap->oap_count)
1933                                 ending_offset = oap->oap_obj_off +
1934                                                 oap->oap_count;
1935                         else
1936                                 LASSERT(oap->oap_page_off + oap->oap_count ==
1937                                         PAGE_CACHE_SIZE);
1938                 }
1939         }
1940
1941         if (mem_tight)
1942                 mpflag = cfs_memory_pressure_get_and_set();
1943
1944         OBD_ALLOC(crattr, sizeof(*crattr));
1945         if (crattr == NULL)
1946                 GOTO(out, rc = -ENOMEM);
1947
1948         OBD_ALLOC(pga, sizeof(*pga) * page_count);
1949         if (pga == NULL)
1950                 GOTO(out, rc = -ENOMEM);
1951
1952         OBDO_ALLOC(oa);
1953         if (oa == NULL)
1954                 GOTO(out, rc = -ENOMEM);
1955
1956         i = 0;
1957         cfs_list_for_each_entry(oap, &rpc_list, oap_rpc_item) {
1958                 struct cl_page *page = oap2cl_page(oap);
1959                 if (clerq == NULL) {
1960                         clerq = cl_req_alloc(env, page, crt,
1961                                              1 /* only 1-object rpcs for now */);
1962                         if (IS_ERR(clerq))
1963                                 GOTO(out, rc = PTR_ERR(clerq));
1964                         lock = oap->oap_ldlm_lock;
1965                 }
1966                 if (mem_tight)
1967                         oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
1968                 pga[i] = &oap->oap_brw_page;
1969                 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
1970                 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
1971                        pga[i]->pg, page_index(oap->oap_page), oap,
1972                        pga[i]->flag);
1973                 i++;
1974                 cl_req_page_add(env, clerq, page);
1975         }
1976
1977         /* always get the data for the obdo for the rpc */
1978         LASSERT(clerq != NULL);
1979         crattr->cra_oa = oa;
1980         cl_req_attr_set(env, clerq, crattr, ~0ULL);
1981         if (lock) {
1982                 oa->o_handle = lock->l_remote_handle;
1983                 oa->o_valid |= OBD_MD_FLHANDLE;
1984         }
1985
1986         rc = cl_req_prep(env, clerq);
1987         if (rc != 0) {
1988                 CERROR("cl_req_prep failed: %d\n", rc);
1989                 GOTO(out, rc);
1990         }
1991
1992         sort_brw_pages(pga, page_count);
1993         rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
1994                         pga, &req, crattr->cra_capa, 1, 0);
1995         if (rc != 0) {
1996                 CERROR("prep_req failed: %d\n", rc);
1997                 GOTO(out, rc);
1998         }
1999
2000         req->rq_commit_cb = brw_commit;
2001         req->rq_interpret_reply = brw_interpret;
2002
2003         if (mem_tight != 0)
2004                 req->rq_memalloc = 1;
2005
2006         /* Need to update the timestamps after the request is built in case
2007          * we race with setattr (locally or in queue at OST).  If OST gets
2008          * later setattr before earlier BRW (as determined by the request xid),
2009          * the OST will not use BRW timestamps.  Sadly, there is no obvious
2010          * way to do this in a single call.  bug 10150 */
2011         cl_req_attr_set(env, clerq, crattr,
2012                         OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
2013
2014         lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
2015
2016         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2017         aa = ptlrpc_req_async_args(req);
2018         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
2019         cfs_list_splice_init(&rpc_list, &aa->aa_oaps);
2020         CFS_INIT_LIST_HEAD(&aa->aa_exts);
2021         cfs_list_splice_init(ext_list, &aa->aa_exts);
2022         aa->aa_clerq = clerq;
2023
2024         /* queued sync pages can be torn down while the pages
2025          * were between the pending list and the rpc */
2026         tmp = NULL;
2027         cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2028                 /* only one oap gets a request reference */
2029                 if (tmp == NULL)
2030                         tmp = oap;
2031                 if (oap->oap_interrupted && !req->rq_intr) {
2032                         CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2033                                         oap, req);
2034                         ptlrpc_mark_interrupted(req);
2035                 }
2036         }
2037         if (tmp != NULL)
2038                 tmp->oap_request = ptlrpc_request_addref(req);
2039
2040         client_obd_list_lock(&cli->cl_loi_list_lock);
2041         starting_offset >>= PAGE_CACHE_SHIFT;
2042         if (cmd == OBD_BRW_READ) {
2043                 cli->cl_r_in_flight++;
2044                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2045                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2046                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2047                                       starting_offset + 1);
2048         } else {
2049                 cli->cl_w_in_flight++;
2050                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2051                 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
2052                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2053                                       starting_offset + 1);
2054         }
2055         client_obd_list_unlock(&cli->cl_loi_list_lock);
2056
2057         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2058                   page_count, aa, cli->cl_r_in_flight,
2059                   cli->cl_w_in_flight);
2060
2061         /* XXX: Maybe the caller can check the RPC bulk descriptor to
2062          * see which CPU/NUMA node the majority of pages were allocated
2063          * on, and try to assign the async RPC to the CPU core
2064          * (PDL_POLICY_PREFERRED) to reduce cross-CPU memory traffic.
2065          *
2066          * But on the other hand, we expect that multiple ptlrpcd
2067          * threads and the initial write sponsor can run in parallel,
2068          * especially when data checksum is enabled, which is CPU-bound
2069          * operation and single ptlrpcd thread cannot process in time.
2070          * So more ptlrpcd threads sharing BRW load
2071          * (with PDL_POLICY_ROUND) seems better.
2072          */
2073         ptlrpcd_add_req(req, pol, -1);
2074         rc = 0;
2075         EXIT;
2076
2077 out:
2078         if (mem_tight != 0)
2079                 cfs_memory_pressure_restore(mpflag);
2080
2081         if (crattr != NULL) {
2082                 capa_put(crattr->cra_capa);
2083                 OBD_FREE(crattr, sizeof(*crattr));
2084         }
2085
2086         if (rc != 0) {
2087                 LASSERT(req == NULL);
2088
2089                 if (oa)
2090                         OBDO_FREE(oa);
2091                 if (pga)
2092                         OBD_FREE(pga, sizeof(*pga) * page_count);
2093                 /* this should happen rarely and is pretty bad, it makes the
2094                  * pending list not follow the dirty order */
2095                 while (!cfs_list_empty(ext_list)) {
2096                         ext = cfs_list_entry(ext_list->next, struct osc_extent,
2097                                              oe_link);
2098                         cfs_list_del_init(&ext->oe_link);
2099                         osc_extent_finish(env, ext, 0, rc);
2100                 }
2101                 if (clerq && !IS_ERR(clerq))
2102                         cl_req_completion(env, clerq, rc);
2103         }
2104         RETURN(rc);
2105 }
2106
2107 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
2108                                         struct ldlm_enqueue_info *einfo)
2109 {
2110         void *data = einfo->ei_cbdata;
2111         int set = 0;
2112
2113         LASSERT(lock != NULL);
2114         LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
2115         LASSERT(lock->l_resource->lr_type == einfo->ei_type);
2116         LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
2117         LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
2118
2119         lock_res_and_lock(lock);
2120         spin_lock(&osc_ast_guard);
2121
2122         if (lock->l_ast_data == NULL)
2123                 lock->l_ast_data = data;
2124         if (lock->l_ast_data == data)
2125                 set = 1;
2126
2127         spin_unlock(&osc_ast_guard);
2128         unlock_res_and_lock(lock);
2129
2130         return set;
2131 }
2132
2133 static int osc_set_data_with_check(struct lustre_handle *lockh,
2134                                    struct ldlm_enqueue_info *einfo)
2135 {
2136         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2137         int set = 0;
2138
2139         if (lock != NULL) {
2140                 set = osc_set_lock_data_with_check(lock, einfo);
2141                 LDLM_LOCK_PUT(lock);
2142         } else
2143                 CERROR("lockh %p, data %p - client evicted?\n",
2144                        lockh, einfo->ei_cbdata);
2145         return set;
2146 }
2147
2148 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2149                              ldlm_iterator_t replace, void *data)
2150 {
2151         struct ldlm_res_id res_id;
2152         struct obd_device *obd = class_exp2obd(exp);
2153
2154         ostid_build_res_name(&lsm->lsm_oi, &res_id);
2155         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2156         return 0;
2157 }
2158
2159 /* find any ldlm lock of the inode in osc
2160  * return 0    not find
2161  *        1    find one
2162  *      < 0    error */
2163 static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2164                            ldlm_iterator_t replace, void *data)
2165 {
2166         struct ldlm_res_id res_id;
2167         struct obd_device *obd = class_exp2obd(exp);
2168         int rc = 0;
2169
2170         ostid_build_res_name(&lsm->lsm_oi, &res_id);
2171         rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2172         if (rc == LDLM_ITER_STOP)
2173                 return(1);
2174         if (rc == LDLM_ITER_CONTINUE)
2175                 return(0);
2176         return(rc);
2177 }
2178
2179 static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
2180                             obd_enqueue_update_f upcall, void *cookie,
2181                             __u64 *flags, int agl, int rc)
2182 {
2183         int intent = *flags & LDLM_FL_HAS_INTENT;
2184         ENTRY;
2185
2186         if (intent) {
2187                 /* The request was created before ldlm_cli_enqueue call. */
2188                 if (rc == ELDLM_LOCK_ABORTED) {
2189                         struct ldlm_reply *rep;
2190                         rep = req_capsule_server_get(&req->rq_pill,
2191                                                      &RMF_DLM_REP);
2192
2193                         LASSERT(rep != NULL);
2194                         rep->lock_policy_res1 =
2195                                 ptlrpc_status_ntoh(rep->lock_policy_res1);
2196                         if (rep->lock_policy_res1)
2197                                 rc = rep->lock_policy_res1;
2198                 }
2199         }
2200
2201         if ((intent != 0 && rc == ELDLM_LOCK_ABORTED && agl == 0) ||
2202             (rc == 0)) {
2203                 *flags |= LDLM_FL_LVB_READY;
2204                 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
2205                        lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
2206         }
2207
2208         /* Call the update callback. */
2209         rc = (*upcall)(cookie, rc);
2210         RETURN(rc);
2211 }
2212
2213 static int osc_enqueue_interpret(const struct lu_env *env,
2214                                  struct ptlrpc_request *req,
2215                                  struct osc_enqueue_args *aa, int rc)
2216 {
2217         struct ldlm_lock *lock;
2218         struct lustre_handle handle;
2219         __u32 mode;
2220         struct ost_lvb *lvb;
2221         __u32 lvb_len;
2222         __u64 *flags = aa->oa_flags;
2223
2224         /* Make a local copy of a lock handle and a mode, because aa->oa_*
2225          * might be freed anytime after lock upcall has been called. */
2226         lustre_handle_copy(&handle, aa->oa_lockh);
2227         mode = aa->oa_ei->ei_mode;
2228
2229         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2230          * be valid. */
2231         lock = ldlm_handle2lock(&handle);
2232
2233         /* Take an additional reference so that a blocking AST that
2234          * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2235          * to arrive after an upcall has been executed by
2236          * osc_enqueue_fini(). */
2237         ldlm_lock_addref(&handle, mode);
2238
2239         /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
2240         OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
2241
2242         /* Let CP AST to grant the lock first. */
2243         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2244
2245         if (aa->oa_agl && rc == ELDLM_LOCK_ABORTED) {
2246                 lvb = NULL;
2247                 lvb_len = 0;
2248         } else {
2249                 lvb = aa->oa_lvb;
2250                 lvb_len = sizeof(*aa->oa_lvb);
2251         }
2252
2253         /* Complete obtaining the lock procedure. */
2254         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
2255                                    mode, flags, lvb, lvb_len, &handle, rc);
2256         /* Complete osc stuff. */
2257         rc = osc_enqueue_fini(req, aa->oa_lvb, aa->oa_upcall, aa->oa_cookie,
2258                               flags, aa->oa_agl, rc);
2259
2260         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2261
2262         /* Release the lock for async request. */
2263         if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
2264                 /*
2265                  * Releases a reference taken by ldlm_cli_enqueue(), if it is
2266                  * not already released by
2267                  * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
2268                  */
2269                 ldlm_lock_decref(&handle, mode);
2270
2271         LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
2272                  aa->oa_lockh, req, aa);
2273         ldlm_lock_decref(&handle, mode);
2274         LDLM_LOCK_PUT(lock);
2275         return rc;
2276 }
2277
2278 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2279
2280 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
2281  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2282  * other synchronous requests, however keeping some locks and trying to obtain
2283  * others may take a considerable amount of time in a case of ost failure; and
2284  * when other sync requests do not get released lock from a client, the client
2285  * is excluded from the cluster -- such scenarious make the life difficult, so
2286  * release locks just after they are obtained. */
2287 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2288                      __u64 *flags, ldlm_policy_data_t *policy,
2289                      struct ost_lvb *lvb, int kms_valid,
2290                      obd_enqueue_update_f upcall, void *cookie,
2291                      struct ldlm_enqueue_info *einfo,
2292                      struct lustre_handle *lockh,
2293                      struct ptlrpc_request_set *rqset, int async, int agl)
2294 {
2295         struct obd_device *obd = exp->exp_obd;
2296         struct ptlrpc_request *req = NULL;
2297         int intent = *flags & LDLM_FL_HAS_INTENT;
2298         __u64 match_lvb = (agl != 0 ? 0 : LDLM_FL_LVB_READY);
2299         ldlm_mode_t mode;
2300         int rc;
2301         ENTRY;
2302
2303         /* Filesystem lock extents are extended to page boundaries so that
2304          * dealing with the page cache is a little smoother.  */
2305         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2306         policy->l_extent.end |= ~CFS_PAGE_MASK;
2307
2308         /*
2309          * kms is not valid when either object is completely fresh (so that no
2310          * locks are cached), or object was evicted. In the latter case cached
2311          * lock cannot be used, because it would prime inode state with
2312          * potentially stale LVB.
2313          */
2314         if (!kms_valid)
2315                 goto no_match;
2316
2317         /* Next, search for already existing extent locks that will cover us */
2318         /* If we're trying to read, we also search for an existing PW lock.  The
2319          * VFS and page cache already protect us locally, so lots of readers/
2320          * writers can share a single PW lock.
2321          *
2322          * There are problems with conversion deadlocks, so instead of
2323          * converting a read lock to a write lock, we'll just enqueue a new
2324          * one.
2325          *
2326          * At some point we should cancel the read lock instead of making them
2327          * send us a blocking callback, but there are problems with canceling
2328          * locks out from other users right now, too. */
2329         mode = einfo->ei_mode;
2330         if (einfo->ei_mode == LCK_PR)
2331                 mode |= LCK_PW;
2332         mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
2333                                einfo->ei_type, policy, mode, lockh, 0);
2334         if (mode) {
2335                 struct ldlm_lock *matched = ldlm_handle2lock(lockh);
2336
2337                 if ((agl != 0) && !ldlm_is_lvb_ready(matched)) {
2338                         /* For AGL, if enqueue RPC is sent but the lock is not
2339                          * granted, then skip to process this strpe.
2340                          * Return -ECANCELED to tell the caller. */
2341                         ldlm_lock_decref(lockh, mode);
2342                         LDLM_LOCK_PUT(matched);
2343                         RETURN(-ECANCELED);
2344                 } else if (osc_set_lock_data_with_check(matched, einfo)) {
2345                         *flags |= LDLM_FL_LVB_READY;
2346                         /* addref the lock only if not async requests and PW
2347                          * lock is matched whereas we asked for PR. */
2348                         if (!rqset && einfo->ei_mode != mode)
2349                                 ldlm_lock_addref(lockh, LCK_PR);
2350                         if (intent) {
2351                                 /* I would like to be able to ASSERT here that
2352                                  * rss <= kms, but I can't, for reasons which
2353                                  * are explained in lov_enqueue() */
2354                         }
2355
2356                         /* We already have a lock, and it's referenced.
2357                          *
2358                          * At this point, the cl_lock::cll_state is CLS_QUEUING,
2359                          * AGL upcall may change it to CLS_HELD directly. */
2360                         (*upcall)(cookie, ELDLM_OK);
2361
2362                         if (einfo->ei_mode != mode)
2363                                 ldlm_lock_decref(lockh, LCK_PW);
2364                         else if (rqset)
2365                                 /* For async requests, decref the lock. */
2366                                 ldlm_lock_decref(lockh, einfo->ei_mode);
2367                         LDLM_LOCK_PUT(matched);
2368                         RETURN(ELDLM_OK);
2369                 } else {
2370                         ldlm_lock_decref(lockh, mode);
2371                         LDLM_LOCK_PUT(matched);
2372                 }
2373         }
2374
2375  no_match:
2376         if (intent) {
2377                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2378                                            &RQF_LDLM_ENQUEUE_LVB);
2379                 if (req == NULL)
2380                         RETURN(-ENOMEM);
2381
2382                 rc = ptlrpc_request_pack(req, LUSTRE_DLM_VERSION, LDLM_ENQUEUE);
2383                 if (rc < 0) {
2384                         ptlrpc_request_free(req);
2385                         RETURN(rc);
2386                 }
2387
2388                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2389                                      sizeof *lvb);
2390                 ptlrpc_request_set_replen(req);
2391         }
2392
2393         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2394         *flags &= ~LDLM_FL_BLOCK_GRANTED;
2395
2396         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2397                               sizeof(*lvb), LVB_T_OST, lockh, async);
2398         if (rqset) {
2399                 if (!rc) {
2400                         struct osc_enqueue_args *aa;
2401                         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2402                         aa = ptlrpc_req_async_args(req);
2403                         aa->oa_ei = einfo;
2404                         aa->oa_exp = exp;
2405                         aa->oa_flags  = flags;
2406                         aa->oa_upcall = upcall;
2407                         aa->oa_cookie = cookie;
2408                         aa->oa_lvb    = lvb;
2409                         aa->oa_lockh  = lockh;
2410                         aa->oa_agl    = !!agl;
2411
2412                         req->rq_interpret_reply =
2413                                 (ptlrpc_interpterer_t)osc_enqueue_interpret;
2414                         if (rqset == PTLRPCD_SET)
2415                                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
2416                         else
2417                                 ptlrpc_set_add_req(rqset, req);
2418                 } else if (intent) {
2419                         ptlrpc_req_finished(req);
2420                 }
2421                 RETURN(rc);
2422         }
2423
2424         rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, agl, rc);
2425         if (intent)
2426                 ptlrpc_req_finished(req);
2427
2428         RETURN(rc);
2429 }
2430
2431 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2432                    __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2433                    __u64 *flags, void *data, struct lustre_handle *lockh,
2434                    int unref)
2435 {
2436         struct obd_device *obd = exp->exp_obd;
2437         __u64 lflags = *flags;
2438         ldlm_mode_t rc;
2439         ENTRY;
2440
2441         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2442                 RETURN(-EIO);
2443
2444         /* Filesystem lock extents are extended to page boundaries so that
2445          * dealing with the page cache is a little smoother */
2446         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2447         policy->l_extent.end |= ~CFS_PAGE_MASK;
2448
2449         /* Next, search for already existing extent locks that will cover us */
2450         /* If we're trying to read, we also search for an existing PW lock.  The
2451          * VFS and page cache already protect us locally, so lots of readers/
2452          * writers can share a single PW lock. */
2453         rc = mode;
2454         if (mode == LCK_PR)
2455                 rc |= LCK_PW;
2456         rc = ldlm_lock_match(obd->obd_namespace, lflags,
2457                              res_id, type, policy, rc, lockh, unref);
2458         if (rc) {
2459                 if (data != NULL) {
2460                         if (!osc_set_data_with_check(lockh, data)) {
2461                                 if (!(lflags & LDLM_FL_TEST_LOCK))
2462                                         ldlm_lock_decref(lockh, rc);
2463                                 RETURN(0);
2464                         }
2465                 }
2466                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
2467                         ldlm_lock_addref(lockh, LCK_PR);
2468                         ldlm_lock_decref(lockh, LCK_PW);
2469                 }
2470                 RETURN(rc);
2471         }
2472         RETURN(rc);
2473 }
2474
2475 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
2476 {
2477         ENTRY;
2478
2479         if (unlikely(mode == LCK_GROUP))
2480                 ldlm_lock_decref_and_cancel(lockh, mode);
2481         else
2482                 ldlm_lock_decref(lockh, mode);
2483
2484         RETURN(0);
2485 }
2486
2487 static int osc_statfs_interpret(const struct lu_env *env,
2488                                 struct ptlrpc_request *req,
2489                                 struct osc_async_args *aa, int rc)
2490 {
2491         struct obd_statfs *msfs;
2492         ENTRY;
2493
2494         if (rc == -EBADR)
2495                 /* The request has in fact never been sent
2496                  * due to issues at a higher level (LOV).
2497                  * Exit immediately since the caller is
2498                  * aware of the problem and takes care
2499                  * of the clean up */
2500                  RETURN(rc);
2501
2502         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2503             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2504                 GOTO(out, rc = 0);
2505
2506         if (rc != 0)
2507                 GOTO(out, rc);
2508
2509         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2510         if (msfs == NULL) {
2511                 GOTO(out, rc = -EPROTO);
2512         }
2513
2514         *aa->aa_oi->oi_osfs = *msfs;
2515 out:
2516         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2517         RETURN(rc);
2518 }
2519
2520 static int osc_statfs_async(struct obd_export *exp,
2521                             struct obd_info *oinfo, __u64 max_age,
2522                             struct ptlrpc_request_set *rqset)
2523 {
2524         struct obd_device     *obd = class_exp2obd(exp);
2525         struct ptlrpc_request *req;
2526         struct osc_async_args *aa;
2527         int                    rc;
2528         ENTRY;
2529
2530         /* We could possibly pass max_age in the request (as an absolute
2531          * timestamp or a "seconds.usec ago") so the target can avoid doing
2532          * extra calls into the filesystem if that isn't necessary (e.g.
2533          * during mount that would help a bit).  Having relative timestamps
2534          * is not so great if request processing is slow, while absolute
2535          * timestamps are not ideal because they need time synchronization. */
2536         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2537         if (req == NULL)
2538                 RETURN(-ENOMEM);
2539
2540         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2541         if (rc) {
2542                 ptlrpc_request_free(req);
2543                 RETURN(rc);
2544         }
2545         ptlrpc_request_set_replen(req);
2546         req->rq_request_portal = OST_CREATE_PORTAL;
2547         ptlrpc_at_set_req_timeout(req);
2548
2549         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2550                 /* procfs requests not want stat in wait for avoid deadlock */
2551                 req->rq_no_resend = 1;
2552                 req->rq_no_delay = 1;
2553         }
2554
2555         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2556         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2557         aa = ptlrpc_req_async_args(req);
2558         aa->aa_oi = oinfo;
2559
2560         ptlrpc_set_add_req(rqset, req);
2561         RETURN(0);
2562 }
2563
2564 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2565                       struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2566 {
2567         struct obd_device     *obd = class_exp2obd(exp);
2568         struct obd_statfs     *msfs;
2569         struct ptlrpc_request *req;
2570         struct obd_import     *imp = NULL;
2571         int rc;
2572         ENTRY;
2573
2574         /*Since the request might also come from lprocfs, so we need
2575          *sync this with client_disconnect_export Bug15684*/
2576         down_read(&obd->u.cli.cl_sem);
2577         if (obd->u.cli.cl_import)
2578                 imp = class_import_get(obd->u.cli.cl_import);
2579         up_read(&obd->u.cli.cl_sem);
2580         if (!imp)
2581                 RETURN(-ENODEV);
2582
2583         /* We could possibly pass max_age in the request (as an absolute
2584          * timestamp or a "seconds.usec ago") so the target can avoid doing
2585          * extra calls into the filesystem if that isn't necessary (e.g.
2586          * during mount that would help a bit).  Having relative timestamps
2587          * is not so great if request processing is slow, while absolute
2588          * timestamps are not ideal because they need time synchronization. */
2589         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2590
2591         class_import_put(imp);
2592
2593         if (req == NULL)
2594                 RETURN(-ENOMEM);
2595
2596         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2597         if (rc) {
2598                 ptlrpc_request_free(req);
2599                 RETURN(rc);
2600         }
2601         ptlrpc_request_set_replen(req);
2602         req->rq_request_portal = OST_CREATE_PORTAL;
2603         ptlrpc_at_set_req_timeout(req);
2604
2605         if (flags & OBD_STATFS_NODELAY) {
2606                 /* procfs requests not want stat in wait for avoid deadlock */
2607                 req->rq_no_resend = 1;
2608                 req->rq_no_delay = 1;
2609         }
2610
2611         rc = ptlrpc_queue_wait(req);
2612         if (rc)
2613                 GOTO(out, rc);
2614
2615         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2616         if (msfs == NULL) {
2617                 GOTO(out, rc = -EPROTO);
2618         }
2619
2620         *osfs = *msfs;
2621
2622         EXIT;
2623  out:
2624         ptlrpc_req_finished(req);
2625         return rc;
2626 }
2627
2628 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2629                          void *karg, void *uarg)
2630 {
2631         struct obd_device *obd = exp->exp_obd;
2632         struct obd_ioctl_data *data = karg;
2633         int err = 0;
2634         ENTRY;
2635
2636         if (!try_module_get(THIS_MODULE)) {
2637                 CERROR("Can't get module. Is it alive?");
2638                 return -EINVAL;
2639         }
2640         switch (cmd) {
2641         case OBD_IOC_CLIENT_RECOVER:
2642                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2643                                             data->ioc_inlbuf1, 0);
2644                 if (err > 0)
2645                         err = 0;
2646                 GOTO(out, err);
2647         case IOC_OSC_SET_ACTIVE:
2648                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2649                                                data->ioc_offset);
2650                 GOTO(out, err);
2651         case OBD_IOC_POLL_QUOTACHECK:
2652                 err = osc_quota_poll_check(exp, (struct if_quotacheck *)karg);
2653                 GOTO(out, err);
2654         case OBD_IOC_PING_TARGET:
2655                 err = ptlrpc_obd_ping(obd);
2656                 GOTO(out, err);
2657         default:
2658                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2659                        cmd, current_comm());
2660                 GOTO(out, err = -ENOTTY);
2661         }
2662 out:
2663         module_put(THIS_MODULE);
2664         return err;
2665 }
2666
2667 static int osc_get_info(const struct lu_env *env, struct obd_export *exp,
2668                         obd_count keylen, void *key, __u32 *vallen, void *val,
2669                         struct lov_stripe_md *lsm)
2670 {
2671         ENTRY;
2672         if (!vallen || !val)
2673                 RETURN(-EFAULT);
2674
2675         if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
2676                 __u32 *stripe = val;
2677                 *vallen = sizeof(*stripe);
2678                 *stripe = 0;
2679                 RETURN(0);
2680         } else if (KEY_IS(KEY_LAST_ID)) {
2681                 struct ptlrpc_request *req;
2682                 obd_id                *reply;
2683                 char                  *tmp;
2684                 int                    rc;
2685
2686                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2687                                            &RQF_OST_GET_INFO_LAST_ID);
2688                 if (req == NULL)
2689                         RETURN(-ENOMEM);
2690
2691                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2692                                      RCL_CLIENT, keylen);
2693                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
2694                 if (rc) {
2695                         ptlrpc_request_free(req);
2696                         RETURN(rc);
2697                 }
2698
2699                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2700                 memcpy(tmp, key, keylen);
2701
2702                 req->rq_no_delay = req->rq_no_resend = 1;
2703                 ptlrpc_request_set_replen(req);
2704                 rc = ptlrpc_queue_wait(req);
2705                 if (rc)
2706                         GOTO(out, rc);
2707
2708                 reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
2709                 if (reply == NULL)
2710                         GOTO(out, rc = -EPROTO);
2711
2712                 *((obd_id *)val) = *reply;
2713         out:
2714                 ptlrpc_req_finished(req);
2715                 RETURN(rc);
2716         } else if (KEY_IS(KEY_FIEMAP)) {
2717                 struct ll_fiemap_info_key *fm_key =
2718                                 (struct ll_fiemap_info_key *)key;
2719                 struct ldlm_res_id       res_id;
2720                 ldlm_policy_data_t       policy;
2721                 struct lustre_handle     lockh;
2722                 ldlm_mode_t              mode = 0;
2723                 struct ptlrpc_request   *req;
2724                 struct ll_user_fiemap   *reply;
2725                 char                    *tmp;
2726                 int                      rc;
2727
2728                 if (!(fm_key->fiemap.fm_flags & FIEMAP_FLAG_SYNC))
2729                         goto skip_locking;
2730
2731                 policy.l_extent.start = fm_key->fiemap.fm_start &
2732                                                 CFS_PAGE_MASK;
2733
2734                 if (OBD_OBJECT_EOF - fm_key->fiemap.fm_length <=
2735                     fm_key->fiemap.fm_start + PAGE_CACHE_SIZE - 1)
2736                         policy.l_extent.end = OBD_OBJECT_EOF;
2737                 else
2738                         policy.l_extent.end = (fm_key->fiemap.fm_start +
2739                                 fm_key->fiemap.fm_length +
2740                                 PAGE_CACHE_SIZE - 1) & CFS_PAGE_MASK;
2741
2742                 ostid_build_res_name(&fm_key->oa.o_oi, &res_id);
2743                 mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
2744                                        LDLM_FL_BLOCK_GRANTED |
2745                                        LDLM_FL_LVB_READY,
2746                                        &res_id, LDLM_EXTENT, &policy,
2747                                        LCK_PR | LCK_PW, &lockh, 0);
2748                 if (mode) { /* lock is cached on client */
2749                         if (mode != LCK_PR) {
2750                                 ldlm_lock_addref(&lockh, LCK_PR);
2751                                 ldlm_lock_decref(&lockh, LCK_PW);
2752                         }
2753                 } else { /* no cached lock, needs acquire lock on server side */
2754                         fm_key->oa.o_valid |= OBD_MD_FLFLAGS;
2755                         fm_key->oa.o_flags |= OBD_FL_SRVLOCK;
2756                 }
2757
2758 skip_locking:
2759                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2760                                            &RQF_OST_GET_INFO_FIEMAP);
2761                 if (req == NULL)
2762                         GOTO(drop_lock, rc = -ENOMEM);
2763
2764                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
2765                                      RCL_CLIENT, keylen);
2766                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
2767                                      RCL_CLIENT, *vallen);
2768                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
2769                                      RCL_SERVER, *vallen);
2770
2771                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
2772                 if (rc) {
2773                         ptlrpc_request_free(req);
2774                         GOTO(drop_lock, rc);
2775                 }
2776
2777                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
2778                 memcpy(tmp, key, keylen);
2779                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
2780                 memcpy(tmp, val, *vallen);
2781
2782                 ptlrpc_request_set_replen(req);
2783                 rc = ptlrpc_queue_wait(req);
2784                 if (rc)
2785                         GOTO(fini_req, rc);
2786
2787                 reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
2788                 if (reply == NULL)
2789                         GOTO(fini_req, rc = -EPROTO);
2790
2791                 memcpy(val, reply, *vallen);
2792 fini_req:
2793                 ptlrpc_req_finished(req);
2794 drop_lock:
2795                 if (mode)
2796                         ldlm_lock_decref(&lockh, LCK_PR);
2797                 RETURN(rc);
2798         }
2799
2800         RETURN(-EINVAL);
2801 }
2802
2803 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
2804                               obd_count keylen, void *key, obd_count vallen,
2805                               void *val, struct ptlrpc_request_set *set)
2806 {
2807         struct ptlrpc_request *req;
2808         struct obd_device     *obd = exp->exp_obd;
2809         struct obd_import     *imp = class_exp2cliimp(exp);
2810         char                  *tmp;
2811         int                    rc;
2812         ENTRY;
2813
2814         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
2815
2816         if (KEY_IS(KEY_CHECKSUM)) {
2817                 if (vallen != sizeof(int))
2818                         RETURN(-EINVAL);
2819                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
2820                 RETURN(0);
2821         }
2822
2823         if (KEY_IS(KEY_SPTLRPC_CONF)) {
2824                 sptlrpc_conf_client_adapt(obd);
2825                 RETURN(0);
2826         }
2827
2828         if (KEY_IS(KEY_FLUSH_CTX)) {
2829                 sptlrpc_import_flush_my_ctx(imp);
2830                 RETURN(0);
2831         }
2832
2833         if (KEY_IS(KEY_CACHE_SET)) {
2834                 struct client_obd *cli = &obd->u.cli;
2835
2836                 LASSERT(cli->cl_cache == NULL); /* only once */
2837                 cli->cl_cache = (struct cl_client_cache *)val;
2838                 atomic_inc(&cli->cl_cache->ccc_users);
2839                 cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
2840
2841                 /* add this osc into entity list */
2842                 LASSERT(cfs_list_empty(&cli->cl_lru_osc));
2843                 spin_lock(&cli->cl_cache->ccc_lru_lock);
2844                 cfs_list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
2845                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
2846
2847                 RETURN(0);
2848         }
2849
2850         if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
2851                 struct client_obd *cli = &obd->u.cli;
2852                 int nr = atomic_read(&cli->cl_lru_in_list) >> 1;
2853                 int target = *(int *)val;
2854
2855                 nr = osc_lru_shrink(env, cli, min(nr, target), true);
2856                 *(int *)val -= nr;
2857                 RETURN(0);
2858         }
2859
2860         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
2861                 RETURN(-EINVAL);
2862
2863         /* We pass all other commands directly to OST. Since nobody calls osc
2864            methods directly and everybody is supposed to go through LOV, we
2865            assume lov checked invalid values for us.
2866            The only recognised values so far are evict_by_nid and mds_conn.
2867            Even if something bad goes through, we'd get a -EINVAL from OST
2868            anyway. */
2869
2870         req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
2871                                                 &RQF_OST_SET_GRANT_INFO :
2872                                                 &RQF_OBD_SET_INFO);
2873         if (req == NULL)
2874                 RETURN(-ENOMEM);
2875
2876         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2877                              RCL_CLIENT, keylen);
2878         if (!KEY_IS(KEY_GRANT_SHRINK))
2879                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
2880                                      RCL_CLIENT, vallen);
2881         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
2882         if (rc) {
2883                 ptlrpc_request_free(req);
2884                 RETURN(rc);
2885         }
2886
2887         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2888         memcpy(tmp, key, keylen);
2889         tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
2890                                                         &RMF_OST_BODY :
2891                                                         &RMF_SETINFO_VAL);
2892         memcpy(tmp, val, vallen);
2893
2894         if (KEY_IS(KEY_GRANT_SHRINK)) {
2895                 struct osc_grant_args *aa;
2896                 struct obdo *oa;
2897
2898                 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2899                 aa = ptlrpc_req_async_args(req);
2900                 OBDO_ALLOC(oa);
2901                 if (!oa) {
2902                         ptlrpc_req_finished(req);
2903                         RETURN(-ENOMEM);
2904                 }
2905                 *oa = ((struct ost_body *)val)->oa;
2906                 aa->aa_oa = oa;
2907                 req->rq_interpret_reply = osc_shrink_grant_interpret;
2908         }
2909
2910         ptlrpc_request_set_replen(req);
2911         if (!KEY_IS(KEY_GRANT_SHRINK)) {
2912                 LASSERT(set != NULL);
2913                 ptlrpc_set_add_req(set, req);
2914                 ptlrpc_check_set(NULL, set);
2915         } else
2916                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
2917
2918         RETURN(0);
2919 }
2920
2921 static int osc_reconnect(const struct lu_env *env,
2922                          struct obd_export *exp, struct obd_device *obd,
2923                          struct obd_uuid *cluuid,
2924                          struct obd_connect_data *data,
2925                          void *localdata)
2926 {
2927         struct client_obd *cli = &obd->u.cli;
2928
2929         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
2930                 long lost_grant;
2931
2932                 client_obd_list_lock(&cli->cl_loi_list_lock);
2933                 data->ocd_grant = (cli->cl_avail_grant + cli->cl_dirty) ?:
2934                                 2 * cli_brw_size(obd);
2935                 lost_grant = cli->cl_lost_grant;
2936                 cli->cl_lost_grant = 0;
2937                 client_obd_list_unlock(&cli->cl_loi_list_lock);
2938
2939                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
2940                        " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
2941                        data->ocd_version, data->ocd_grant, lost_grant);
2942         }
2943
2944         RETURN(0);
2945 }
2946
2947 static int osc_disconnect(struct obd_export *exp)
2948 {
2949         struct obd_device *obd = class_exp2obd(exp);
2950         struct llog_ctxt  *ctxt;
2951         int rc;
2952
2953         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
2954         if (ctxt) {
2955                 if (obd->u.cli.cl_conn_count == 1) {
2956                         /* Flush any remaining cancel messages out to the
2957                          * target */
2958                         llog_sync(ctxt, exp, 0);
2959                 }
2960                 llog_ctxt_put(ctxt);
2961         } else {
2962                 CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
2963                        obd);
2964         }
2965
2966         rc = client_disconnect_export(exp);
2967         /**
2968          * Initially we put del_shrink_grant before disconnect_export, but it
2969          * causes the following problem if setup (connect) and cleanup
2970          * (disconnect) are tangled together.
2971          *      connect p1                     disconnect p2
2972          *   ptlrpc_connect_import
2973          *     ...............               class_manual_cleanup
2974          *                                     osc_disconnect
2975          *                                     del_shrink_grant
2976          *   ptlrpc_connect_interrupt
2977          *     init_grant_shrink
2978          *   add this client to shrink list
2979          *                                      cleanup_osc
2980          * Bang! pinger trigger the shrink.
2981          * So the osc should be disconnected from the shrink list, after we
2982          * are sure the import has been destroyed. BUG18662
2983          */
2984         if (obd->u.cli.cl_import == NULL)
2985                 osc_del_shrink_grant(&obd->u.cli);
2986         return rc;
2987 }
2988
2989 static int osc_import_event(struct obd_device *obd,
2990                             struct obd_import *imp,
2991                             enum obd_import_event event)
2992 {
2993         struct client_obd *cli;
2994         int rc = 0;
2995
2996         ENTRY;
2997         LASSERT(imp->imp_obd == obd);
2998
2999         switch (event) {
3000         case IMP_EVENT_DISCON: {
3001                 cli = &obd->u.cli;
3002                 client_obd_list_lock(&cli->cl_loi_list_lock);
3003                 cli->cl_avail_grant = 0;
3004                 cli->cl_lost_grant = 0;
3005                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3006                 break;
3007         }
3008         case IMP_EVENT_INACTIVE: {
3009                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
3010                 break;
3011         }
3012         case IMP_EVENT_INVALIDATE: {
3013                 struct ldlm_namespace *ns = obd->obd_namespace;
3014                 struct lu_env         *env;
3015                 int                    refcheck;
3016
3017                 env = cl_env_get(&refcheck);
3018                 if (!IS_ERR(env)) {
3019                         /* Reset grants */
3020                         cli = &obd->u.cli;
3021                         /* all pages go to failing rpcs due to the invalid
3022                          * import */
3023                         osc_io_unplug(env, cli, NULL, PDL_POLICY_ROUND);
3024
3025                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3026                         cl_env_put(env, &refcheck);
3027                 } else
3028                         rc = PTR_ERR(env);
3029                 break;
3030         }
3031         case IMP_EVENT_ACTIVE: {
3032                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
3033                 break;
3034         }
3035         case IMP_EVENT_OCD: {
3036                 struct obd_connect_data *ocd = &imp->imp_connect_data;
3037
3038                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3039                         osc_init_grant(&obd->u.cli, ocd);
3040
3041                 /* See bug 7198 */
3042                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3043                         imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
3044
3045                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
3046                 break;
3047         }
3048         case IMP_EVENT_DEACTIVATE: {
3049                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
3050                 break;
3051         }
3052         case IMP_EVENT_ACTIVATE: {
3053                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
3054                 break;
3055         }
3056         default:
3057                 CERROR("Unknown import event %d\n", event);
3058                 LBUG();
3059         }
3060         RETURN(rc);
3061 }
3062
3063 /**
3064  * Determine whether the lock can be canceled before replaying the lock
3065  * during recovery, see bug16774 for detailed information.
3066  *
3067  * \retval zero the lock can't be canceled
3068  * \retval other ok to cancel
3069  */
3070 static int osc_cancel_weight(struct ldlm_lock *lock)
3071 {
3072         /*
3073          * Cancel all unused and granted extent lock.
3074          */
3075         if (lock->l_resource->lr_type == LDLM_EXTENT &&
3076             lock->l_granted_mode == lock->l_req_mode &&
3077             osc_ldlm_weigh_ast(lock) == 0)
3078                 RETURN(1);
3079
3080         RETURN(0);
3081 }
3082
3083 static int brw_queue_work(const struct lu_env *env, void *data)
3084 {
3085         struct client_obd *cli = data;
3086
3087         CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
3088
3089         osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
3090         RETURN(0);
3091 }
3092
3093 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3094 {
3095         struct client_obd *cli = &obd->u.cli;
3096         struct obd_type   *type;
3097         void              *handler;
3098         int                rc;
3099         ENTRY;
3100
3101         rc = ptlrpcd_addref();
3102         if (rc)
3103                 RETURN(rc);
3104
3105         rc = client_obd_setup(obd, lcfg);
3106         if (rc)
3107                 GOTO(out_ptlrpcd, rc);
3108
3109         handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
3110         if (IS_ERR(handler))
3111                 GOTO(out_client_setup, rc = PTR_ERR(handler));
3112         cli->cl_writeback_work = handler;
3113
3114         handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
3115         if (IS_ERR(handler))
3116                 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
3117         cli->cl_lru_work = handler;
3118
3119         rc = osc_quota_setup(obd);
3120         if (rc)
3121                 GOTO(out_ptlrpcd_work, rc);
3122
3123         cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
3124
3125 #ifdef LPROCFS
3126         obd->obd_vars = lprocfs_osc_obd_vars;
3127 #endif
3128         /* If this is true then both client (osc) and server (osp) are on the
3129          * same node. The osp layer if loaded first will register the osc proc
3130          * directory. In that case this obd_device will be attached its proc
3131          * tree to type->typ_procsym instead of obd->obd_type->typ_procroot. */
3132         type = class_search_type(LUSTRE_OSP_NAME);
3133         if (type && type->typ_procsym) {
3134                 obd->obd_proc_entry = lprocfs_seq_register(obd->obd_name,
3135                                                            type->typ_procsym,
3136                                                            obd->obd_vars, obd);
3137                 if (IS_ERR(obd->obd_proc_entry)) {
3138                         rc = PTR_ERR(obd->obd_proc_entry);
3139                         CERROR("error %d setting up lprocfs for %s\n", rc,
3140                                obd->obd_name);
3141                         obd->obd_proc_entry = NULL;
3142                 }
3143         } else {
3144                 rc = lprocfs_seq_obd_setup(obd);
3145         }
3146
3147         /* If the basic OSC proc tree construction succeeded then
3148          * lets do the rest. */
3149         if (rc == 0) {
3150                 lproc_osc_attach_seqstat(obd);
3151                 sptlrpc_lprocfs_cliobd_attach(obd);
3152                 ptlrpc_lprocfs_register_obd(obd);
3153         }
3154
3155         /* We need to allocate a few requests more, because
3156          * brw_interpret tries to create new requests before freeing
3157          * previous ones, Ideally we want to have 2x max_rpcs_in_flight
3158          * reserved, but I'm afraid that might be too much wasted RAM
3159          * in fact, so 2 is just my guess and still should work. */
3160         cli->cl_import->imp_rq_pool =
3161                 ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
3162                                     OST_MAXREQSIZE,
3163                                     ptlrpc_add_rqs_to_pool);
3164
3165         CFS_INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
3166         ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
3167         RETURN(0);
3168
3169 out_ptlrpcd_work:
3170         if (cli->cl_writeback_work != NULL) {
3171                 ptlrpcd_destroy_work(cli->cl_writeback_work);
3172                 cli->cl_writeback_work = NULL;
3173         }
3174         if (cli->cl_lru_work != NULL) {
3175                 ptlrpcd_destroy_work(cli->cl_lru_work);
3176                 cli->cl_lru_work = NULL;
3177         }
3178 out_client_setup:
3179         client_obd_cleanup(obd);
3180 out_ptlrpcd:
3181         ptlrpcd_decref();
3182         RETURN(rc);
3183 }
3184
3185 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
3186 {
3187         int rc = 0;
3188         ENTRY;
3189
3190         switch (stage) {
3191         case OBD_CLEANUP_EARLY: {
3192                 struct obd_import *imp;
3193                 imp = obd->u.cli.cl_import;
3194                 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
3195                 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
3196                 ptlrpc_deactivate_import(imp);
3197                 spin_lock(&imp->imp_lock);
3198                 imp->imp_pingable = 0;
3199                 spin_unlock(&imp->imp_lock);
3200                 break;
3201         }
3202         case OBD_CLEANUP_EXPORTS: {
3203                 struct client_obd *cli = &obd->u.cli;
3204                 /* LU-464
3205                  * for echo client, export may be on zombie list, wait for
3206                  * zombie thread to cull it, because cli.cl_import will be
3207                  * cleared in client_disconnect_export():
3208                  *   class_export_destroy() -> obd_cleanup() ->
3209                  *   echo_device_free() -> echo_client_cleanup() ->
3210                  *   obd_disconnect() -> osc_disconnect() ->
3211                  *   client_disconnect_export()
3212                  */
3213                 obd_zombie_barrier();
3214                 if (cli->cl_writeback_work) {
3215                         ptlrpcd_destroy_work(cli->cl_writeback_work);
3216                         cli->cl_writeback_work = NULL;
3217                 }
3218                 if (cli->cl_lru_work) {
3219                         ptlrpcd_destroy_work(cli->cl_lru_work);
3220                         cli->cl_lru_work = NULL;
3221                 }
3222                 obd_cleanup_client_import(obd);
3223                 ptlrpc_lprocfs_unregister_obd(obd);
3224                 lprocfs_obd_cleanup(obd);
3225                 rc = obd_llog_finish(obd, 0);
3226                 if (rc != 0)
3227                         CERROR("failed to cleanup llogging subsystems\n");
3228                 break;
3229                 }
3230         }
3231         RETURN(rc);
3232 }
3233
3234 int osc_cleanup(struct obd_device *obd)
3235 {
3236         struct client_obd *cli = &obd->u.cli;
3237         int rc;
3238
3239         ENTRY;
3240
3241         /* lru cleanup */
3242         if (cli->cl_cache != NULL) {
3243                 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
3244                 spin_lock(&cli->cl_cache->ccc_lru_lock);
3245                 cfs_list_del_init(&cli->cl_lru_osc);
3246                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3247                 cli->cl_lru_left = NULL;
3248                 atomic_dec(&cli->cl_cache->ccc_users);
3249                 cli->cl_cache = NULL;
3250         }
3251
3252         /* free memory of osc quota cache */
3253         osc_quota_cleanup(obd);
3254
3255         rc = client_obd_cleanup(obd);
3256
3257         ptlrpcd_decref();
3258         RETURN(rc);
3259 }
3260
3261 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
3262 {
3263         int rc = class_process_proc_seq_param(PARAM_OSC, obd->obd_vars,
3264                                               lcfg, obd);
3265         return rc > 0 ? 0: rc;
3266 }
3267
3268 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
3269 {
3270         return osc_process_config_base(obd, buf);
3271 }
3272
3273 struct obd_ops osc_obd_ops = {
3274         .o_owner                = THIS_MODULE,
3275         .o_setup                = osc_setup,
3276         .o_precleanup           = osc_precleanup,
3277         .o_cleanup              = osc_cleanup,
3278         .o_add_conn             = client_import_add_conn,
3279         .o_del_conn             = client_import_del_conn,
3280         .o_connect              = client_connect_import,
3281         .o_reconnect            = osc_reconnect,
3282         .o_disconnect           = osc_disconnect,
3283         .o_statfs               = osc_statfs,
3284         .o_statfs_async         = osc_statfs_async,
3285         .o_unpackmd             = osc_unpackmd,
3286         .o_create               = osc_create,
3287         .o_destroy              = osc_destroy,
3288         .o_getattr              = osc_getattr,
3289         .o_getattr_async        = osc_getattr_async,
3290         .o_setattr              = osc_setattr,
3291         .o_setattr_async        = osc_setattr_async,
3292         .o_change_cbdata        = osc_change_cbdata,
3293         .o_find_cbdata          = osc_find_cbdata,
3294         .o_iocontrol            = osc_iocontrol,
3295         .o_get_info             = osc_get_info,
3296         .o_set_info_async       = osc_set_info_async,
3297         .o_import_event         = osc_import_event,
3298         .o_process_config       = osc_process_config,
3299         .o_quotactl             = osc_quotactl,
3300         .o_quotacheck           = osc_quotacheck,
3301 };
3302
3303 extern struct lu_kmem_descr osc_caches[];
3304 extern spinlock_t osc_ast_guard;
3305 extern struct lock_class_key osc_ast_guard_class;
3306
3307 int __init osc_init(void)
3308 {
3309         bool enable_proc = true;
3310         struct obd_type *type;
3311         int rc;
3312         ENTRY;
3313
3314         /* print an address of _any_ initialized kernel symbol from this
3315          * module, to allow debugging with gdb that doesn't support data
3316          * symbols from modules.*/
3317         CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3318
3319         rc = lu_kmem_init(osc_caches);
3320         if (rc)
3321                 RETURN(rc);
3322
3323         type = class_search_type(LUSTRE_OSP_NAME);
3324         if (type != NULL && type->typ_procsym != NULL)
3325                 enable_proc = false;
3326
3327         rc = class_register_type(&osc_obd_ops, NULL, enable_proc, NULL,
3328 #ifndef HAVE_ONLY_PROCFS_SEQ
3329                                  NULL,
3330 #endif
3331                                  LUSTRE_OSC_NAME, &osc_device_type);
3332         if (rc) {
3333                 lu_kmem_fini(osc_caches);
3334                 RETURN(rc);
3335         }
3336
3337         spin_lock_init(&osc_ast_guard);
3338         lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
3339
3340         RETURN(rc);
3341 }
3342
#ifdef __KERNEL__
/**
 * Module exit: unregister the OSC obd type, then release the kmem caches
 * created by osc_init().
 */
static void /*__exit*/ osc_exit(void)
{
        class_unregister_type(LUSTRE_OSC_NAME);

        lu_kmem_fini(osc_caches);
}

MODULE_LICENSE("GPL");
MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");

cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);
#endif /* __KERNEL__ */