Whamcloud - gitweb
LU-4793 clio: Reduce memory overhead of per-page allocation
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2013, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_OSC
38
39 #include <libcfs/libcfs.h>
40
41 #ifndef __KERNEL__
42 # include <liblustre.h>
43 #endif
44
45 #include <lustre_dlm.h>
46 #include <lustre_net.h>
47 #include <lustre/lustre_user.h>
48 #include <obd_cksum.h>
49
50 #ifdef  __CYGWIN__
51 # include <ctype.h>
52 #endif
53
54 #include <lustre_ha.h>
55 #include <lprocfs_status.h>
56 #include <lustre_ioctl.h>
57 #include <lustre_log.h>
58 #include <lustre_debug.h>
59 #include <lustre_param.h>
60 #include <lustre_fid.h>
61 #include "osc_internal.h"
62 #include "osc_cl_internal.h"
63
64 struct osc_brw_async_args {
65         struct obdo              *aa_oa;
66         int                       aa_requested_nob;
67         int                       aa_nio_count;
68         obd_count                 aa_page_count;
69         int                       aa_resends;
70         struct brw_page **aa_ppga;
71         struct client_obd        *aa_cli;
72         struct list_head          aa_oaps;
73         struct list_head          aa_exts;
74         struct obd_capa  *aa_ocapa;
75         struct cl_req            *aa_clerq;
76 };
77
78 #define osc_grant_args osc_brw_async_args
79
/*
 * Async arguments for the getattr request: the reply handler unpacks the
 * reply into aa_oi->oi_oa and then calls aa_oi->oi_cb_up().
 */
struct osc_async_args {
        struct obd_info    *aa_oi;
};
83
84 struct osc_setattr_args {
85         struct obdo             *sa_oa;
86         obd_enqueue_update_f     sa_upcall;
87         void                    *sa_cookie;
88 };
89
90 struct osc_fsync_args {
91         struct obd_info *fa_oi;
92         obd_enqueue_update_f     fa_upcall;
93         void                    *fa_cookie;
94 };
95
96 struct osc_enqueue_args {
97         struct obd_export               *oa_exp;
98         __u64                           *oa_flags;
99         obd_enqueue_update_f             oa_upcall;
100         void                            *oa_cookie;
101         struct ost_lvb                  *oa_lvb;
102         struct lustre_handle            *oa_lockh;
103         struct ldlm_enqueue_info        *oa_ei;
104         unsigned int                     oa_agl:1;
105 };
106
107 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
108 static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
109                          void *data, int rc);
110
111 /* Unpack OSC object metadata from disk storage (LE byte order). */
112 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
113                         struct lov_mds_md *lmm, int lmm_bytes)
114 {
115         int lsm_size;
116         struct obd_import *imp = class_exp2cliimp(exp);
117         ENTRY;
118
119         if (lmm != NULL) {
120                 if (lmm_bytes < sizeof(*lmm)) {
121                         CERROR("%s: lov_mds_md too small: %d, need %d\n",
122                                exp->exp_obd->obd_name, lmm_bytes,
123                                (int)sizeof(*lmm));
124                         RETURN(-EINVAL);
125                 }
126                 /* XXX LOV_MAGIC etc check? */
127
128                 if (unlikely(ostid_id(&lmm->lmm_oi) == 0)) {
129                         CERROR("%s: zero lmm_object_id: rc = %d\n",
130                                exp->exp_obd->obd_name, -EINVAL);
131                         RETURN(-EINVAL);
132                 }
133         }
134
135         lsm_size = lov_stripe_md_size(1);
136         if (lsmp == NULL)
137                 RETURN(lsm_size);
138
139         if (*lsmp != NULL && lmm == NULL) {
140                 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
141                 OBD_FREE(*lsmp, lsm_size);
142                 *lsmp = NULL;
143                 RETURN(0);
144         }
145
146         if (*lsmp == NULL) {
147                 OBD_ALLOC(*lsmp, lsm_size);
148                 if (unlikely(*lsmp == NULL))
149                         RETURN(-ENOMEM);
150                 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
151                 if (unlikely((*lsmp)->lsm_oinfo[0] == NULL)) {
152                         OBD_FREE(*lsmp, lsm_size);
153                         RETURN(-ENOMEM);
154                 }
155                 loi_init((*lsmp)->lsm_oinfo[0]);
156         } else if (unlikely(ostid_id(&(*lsmp)->lsm_oi) == 0)) {
157                 RETURN(-EBADF);
158         }
159
160         if (lmm != NULL)
161                 /* XXX zero *lsmp? */
162                 ostid_le_to_cpu(&lmm->lmm_oi, &(*lsmp)->lsm_oi);
163
164         if (imp != NULL &&
165             (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MAXBYTES))
166                 (*lsmp)->lsm_maxbytes = imp->imp_connect_data.ocd_maxbytes;
167         else
168                 (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
169
170         RETURN(lsm_size);
171 }
172
173 static inline void osc_pack_capa(struct ptlrpc_request *req,
174                                  struct ost_body *body, void *capa)
175 {
176         struct obd_capa *oc = (struct obd_capa *)capa;
177         struct lustre_capa *c;
178
179         if (!capa)
180                 return;
181
182         c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
183         LASSERT(c);
184         capa_cpy(c, oc);
185         body->oa.o_valid |= OBD_MD_FLOSSCAPA;
186         DEBUG_CAPA(D_SEC, c, "pack");
187 }
188
189 static inline void osc_pack_req_body(struct ptlrpc_request *req,
190                                      struct obd_info *oinfo)
191 {
192         struct ost_body *body;
193
194         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
195         LASSERT(body);
196
197         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
198                              oinfo->oi_oa);
199         osc_pack_capa(req, body, oinfo->oi_capa);
200 }
201
202 static inline void osc_set_capa_size(struct ptlrpc_request *req,
203                                      const struct req_msg_field *field,
204                                      struct obd_capa *oc)
205 {
206         if (oc == NULL)
207                 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
208         else
209                 /* it is already calculated as sizeof struct obd_capa */
210                 ;
211 }
212
213 static int osc_getattr_interpret(const struct lu_env *env,
214                                  struct ptlrpc_request *req,
215                                  struct osc_async_args *aa, int rc)
216 {
217         struct ost_body *body;
218         ENTRY;
219
220         if (rc != 0)
221                 GOTO(out, rc);
222
223         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
224         if (body) {
225                 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
226                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
227                                      aa->aa_oi->oi_oa, &body->oa);
228
229                 /* This should really be sent by the OST */
230                 aa->aa_oi->oi_oa->o_blksize = DT_MAX_BRW_SIZE;
231                 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
232         } else {
233                 CDEBUG(D_INFO, "can't unpack ost_body\n");
234                 rc = -EPROTO;
235                 aa->aa_oi->oi_oa->o_valid = 0;
236         }
237 out:
238         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
239         RETURN(rc);
240 }
241
242 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
243                              struct ptlrpc_request_set *set)
244 {
245         struct ptlrpc_request *req;
246         struct osc_async_args *aa;
247         int                    rc;
248         ENTRY;
249
250         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
251         if (req == NULL)
252                 RETURN(-ENOMEM);
253
254         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
255         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
256         if (rc) {
257                 ptlrpc_request_free(req);
258                 RETURN(rc);
259         }
260
261         osc_pack_req_body(req, oinfo);
262
263         ptlrpc_request_set_replen(req);
264         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;
265
266         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
267         aa = ptlrpc_req_async_args(req);
268         aa->aa_oi = oinfo;
269
270         ptlrpc_set_add_req(set, req);
271         RETURN(0);
272 }
273
274 static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
275                        struct obd_info *oinfo)
276 {
277         struct ptlrpc_request *req;
278         struct ost_body       *body;
279         int                    rc;
280         ENTRY;
281
282         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
283         if (req == NULL)
284                 RETURN(-ENOMEM);
285
286         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
287         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
288         if (rc) {
289                 ptlrpc_request_free(req);
290                 RETURN(rc);
291         }
292
293         osc_pack_req_body(req, oinfo);
294
295         ptlrpc_request_set_replen(req);
296
297         rc = ptlrpc_queue_wait(req);
298         if (rc)
299                 GOTO(out, rc);
300
301         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
302         if (body == NULL)
303                 GOTO(out, rc = -EPROTO);
304
305         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
306         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
307                              &body->oa);
308
309         oinfo->oi_oa->o_blksize = cli_brw_size(exp->exp_obd);
310         oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
311
312         EXIT;
313  out:
314         ptlrpc_req_finished(req);
315         return rc;
316 }
317
318 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
319                        struct obd_info *oinfo, struct obd_trans_info *oti)
320 {
321         struct ptlrpc_request *req;
322         struct ost_body       *body;
323         int                    rc;
324         ENTRY;
325
326         LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);
327
328         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
329         if (req == NULL)
330                 RETURN(-ENOMEM);
331
332         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
333         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
334         if (rc) {
335                 ptlrpc_request_free(req);
336                 RETURN(rc);
337         }
338
339         osc_pack_req_body(req, oinfo);
340
341         ptlrpc_request_set_replen(req);
342
343         rc = ptlrpc_queue_wait(req);
344         if (rc)
345                 GOTO(out, rc);
346
347         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
348         if (body == NULL)
349                 GOTO(out, rc = -EPROTO);
350
351         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
352                              &body->oa);
353
354         EXIT;
355 out:
356         ptlrpc_req_finished(req);
357         RETURN(rc);
358 }
359
360 static int osc_setattr_interpret(const struct lu_env *env,
361                                  struct ptlrpc_request *req,
362                                  struct osc_setattr_args *sa, int rc)
363 {
364         struct ost_body *body;
365         ENTRY;
366
367         if (rc != 0)
368                 GOTO(out, rc);
369
370         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
371         if (body == NULL)
372                 GOTO(out, rc = -EPROTO);
373
374         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
375                              &body->oa);
376 out:
377         rc = sa->sa_upcall(sa->sa_cookie, rc);
378         RETURN(rc);
379 }
380
381 int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
382                            struct obd_trans_info *oti,
383                            obd_enqueue_update_f upcall, void *cookie,
384                            struct ptlrpc_request_set *rqset)
385 {
386         struct ptlrpc_request   *req;
387         struct osc_setattr_args *sa;
388         int                      rc;
389         ENTRY;
390
391         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
392         if (req == NULL)
393                 RETURN(-ENOMEM);
394
395         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
396         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
397         if (rc) {
398                 ptlrpc_request_free(req);
399                 RETURN(rc);
400         }
401
402         if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
403                 oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
404
405         osc_pack_req_body(req, oinfo);
406
407         ptlrpc_request_set_replen(req);
408
409         /* do mds to ost setattr asynchronously */
410         if (!rqset) {
411                 /* Do not wait for response. */
412                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
413         } else {
414                 req->rq_interpret_reply =
415                         (ptlrpc_interpterer_t)osc_setattr_interpret;
416
417                 CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
418                 sa = ptlrpc_req_async_args(req);
419                 sa->sa_oa = oinfo->oi_oa;
420                 sa->sa_upcall = upcall;
421                 sa->sa_cookie = cookie;
422
423                 if (rqset == PTLRPCD_SET)
424                         ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
425                 else
426                         ptlrpc_set_add_req(rqset, req);
427         }
428
429         RETURN(0);
430 }
431
432 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
433                              struct obd_trans_info *oti,
434                              struct ptlrpc_request_set *rqset)
435 {
436         return osc_setattr_async_base(exp, oinfo, oti,
437                                       oinfo->oi_cb_up, oinfo, rqset);
438 }
439
440 int osc_real_create(struct obd_export *exp, struct obdo *oa,
441                     struct lov_stripe_md **ea, struct obd_trans_info *oti)
442 {
443         struct ptlrpc_request *req;
444         struct ost_body       *body;
445         struct lov_stripe_md  *lsm;
446         int                    rc;
447         ENTRY;
448
449         LASSERT(oa);
450         LASSERT(ea);
451
452         lsm = *ea;
453         if (!lsm) {
454                 rc = obd_alloc_memmd(exp, &lsm);
455                 if (rc < 0)
456                         RETURN(rc);
457         }
458
459         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
460         if (req == NULL)
461                 GOTO(out, rc = -ENOMEM);
462
463         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
464         if (rc) {
465                 ptlrpc_request_free(req);
466                 GOTO(out, rc);
467         }
468
469         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
470         LASSERT(body);
471
472         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
473
474         ptlrpc_request_set_replen(req);
475
476         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
477             oa->o_flags == OBD_FL_DELORPHAN) {
478                 DEBUG_REQ(D_HA, req,
479                           "delorphan from OST integration");
480                 /* Don't resend the delorphan req */
481                 req->rq_no_resend = req->rq_no_delay = 1;
482         }
483
484         rc = ptlrpc_queue_wait(req);
485         if (rc)
486                 GOTO(out_req, rc);
487
488         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
489         if (body == NULL)
490                 GOTO(out_req, rc = -EPROTO);
491
492         CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
493         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
494
495         oa->o_blksize = cli_brw_size(exp->exp_obd);
496         oa->o_valid |= OBD_MD_FLBLKSZ;
497
498         /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
499          * have valid lsm_oinfo data structs, so don't go touching that.
500          * This needs to be fixed in a big way.
501          */
502         lsm->lsm_oi = oa->o_oi;
503         *ea = lsm;
504
505         if (oti != NULL) {
506                 if (oa->o_valid & OBD_MD_FLCOOKIE) {
507                         if (oti->oti_logcookies == NULL)
508                                 oti->oti_logcookies = &oti->oti_onecookie;
509
510                         *oti->oti_logcookies = oa->o_lcookie;
511                 }
512         }
513
514         CDEBUG(D_HA, "transno: "LPD64"\n",
515                lustre_msg_get_transno(req->rq_repmsg));
516 out_req:
517         ptlrpc_req_finished(req);
518 out:
519         if (rc && !*ea)
520                 obd_free_memmd(exp, &lsm);
521         RETURN(rc);
522 }
523
524 int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
525                    obd_enqueue_update_f upcall, void *cookie,
526                    struct ptlrpc_request_set *rqset)
527 {
528         struct ptlrpc_request   *req;
529         struct osc_setattr_args *sa;
530         struct ost_body         *body;
531         int                      rc;
532         ENTRY;
533
534         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
535         if (req == NULL)
536                 RETURN(-ENOMEM);
537
538         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
539         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
540         if (rc) {
541                 ptlrpc_request_free(req);
542                 RETURN(rc);
543         }
544         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
545         ptlrpc_at_set_req_timeout(req);
546
547         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
548         LASSERT(body);
549         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
550                              oinfo->oi_oa);
551         osc_pack_capa(req, body, oinfo->oi_capa);
552
553         ptlrpc_request_set_replen(req);
554
555         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
556         CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
557         sa = ptlrpc_req_async_args(req);
558         sa->sa_oa     = oinfo->oi_oa;
559         sa->sa_upcall = upcall;
560         sa->sa_cookie = cookie;
561         if (rqset == PTLRPCD_SET)
562                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
563         else
564                 ptlrpc_set_add_req(rqset, req);
565
566         RETURN(0);
567 }
568
569 static int osc_sync_interpret(const struct lu_env *env,
570                               struct ptlrpc_request *req,
571                               void *arg, int rc)
572 {
573         struct osc_fsync_args *fa = arg;
574         struct ost_body *body;
575         ENTRY;
576
577         if (rc)
578                 GOTO(out, rc);
579
580         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
581         if (body == NULL) {
582                 CERROR ("can't unpack ost_body\n");
583                 GOTO(out, rc = -EPROTO);
584         }
585
586         *fa->fa_oi->oi_oa = body->oa;
587 out:
588         rc = fa->fa_upcall(fa->fa_cookie, rc);
589         RETURN(rc);
590 }
591
592 int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo,
593                   obd_enqueue_update_f upcall, void *cookie,
594                   struct ptlrpc_request_set *rqset)
595 {
596         struct ptlrpc_request *req;
597         struct ost_body       *body;
598         struct osc_fsync_args *fa;
599         int                    rc;
600         ENTRY;
601
602         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
603         if (req == NULL)
604                 RETURN(-ENOMEM);
605
606         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
607         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
608         if (rc) {
609                 ptlrpc_request_free(req);
610                 RETURN(rc);
611         }
612
613         /* overload the size and blocks fields in the oa with start/end */
614         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
615         LASSERT(body);
616         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
617                              oinfo->oi_oa);
618         osc_pack_capa(req, body, oinfo->oi_capa);
619
620         ptlrpc_request_set_replen(req);
621         req->rq_interpret_reply = osc_sync_interpret;
622
623         CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
624         fa = ptlrpc_req_async_args(req);
625         fa->fa_oi = oinfo;
626         fa->fa_upcall = upcall;
627         fa->fa_cookie = cookie;
628
629         if (rqset == PTLRPCD_SET)
630                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
631         else
632                 ptlrpc_set_add_req(rqset, req);
633
634         RETURN (0);
635 }
636
637 /* Find and cancel locally locks matched by @mode in the resource found by
638  * @objid. Found locks are added into @cancel list. Returns the amount of
639  * locks added to @cancels list. */
640 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
641                                    struct list_head *cancels,
642                                    ldlm_mode_t mode, __u64 lock_flags)
643 {
644         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
645         struct ldlm_res_id res_id;
646         struct ldlm_resource *res;
647         int count;
648         ENTRY;
649
650         /* Return, i.e. cancel nothing, only if ELC is supported (flag in
651          * export) but disabled through procfs (flag in NS).
652          *
653          * This distinguishes from a case when ELC is not supported originally,
654          * when we still want to cancel locks in advance and just cancel them
655          * locally, without sending any RPC. */
656         if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
657                 RETURN(0);
658
659         ostid_build_res_name(&oa->o_oi, &res_id);
660         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
661         if (res == NULL)
662                 RETURN(0);
663
664         LDLM_RESOURCE_ADDREF(res);
665         count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
666                                            lock_flags, 0, NULL);
667         LDLM_RESOURCE_DELREF(res);
668         ldlm_resource_putref(res);
669         RETURN(count);
670 }
671
672 static int osc_destroy_interpret(const struct lu_env *env,
673                                  struct ptlrpc_request *req, void *data,
674                                  int rc)
675 {
676         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
677
678         atomic_dec(&cli->cl_destroy_in_flight);
679         wake_up(&cli->cl_destroy_waitq);
680         return 0;
681 }
682
683 static int osc_can_send_destroy(struct client_obd *cli)
684 {
685         if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
686             cli->cl_max_rpcs_in_flight) {
687                 /* The destroy request can be sent */
688                 return 1;
689         }
690         if (atomic_dec_return(&cli->cl_destroy_in_flight) <
691             cli->cl_max_rpcs_in_flight) {
692                 /*
693                  * The counter has been modified between the two atomic
694                  * operations.
695                  */
696                 wake_up(&cli->cl_destroy_waitq);
697         }
698         return 0;
699 }
700
701 int osc_create(const struct lu_env *env, struct obd_export *exp,
702                struct obdo *oa, struct lov_stripe_md **ea,
703                struct obd_trans_info *oti)
704 {
705         int rc = 0;
706         ENTRY;
707
708         LASSERT(oa);
709         LASSERT(ea);
710         LASSERT(oa->o_valid & OBD_MD_FLGROUP);
711
712         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
713             oa->o_flags == OBD_FL_RECREATE_OBJS) {
714                 RETURN(osc_real_create(exp, oa, ea, oti));
715         }
716
717         if (!fid_seq_is_mdt(ostid_seq(&oa->o_oi)))
718                 RETURN(osc_real_create(exp, oa, ea, oti));
719
720         /* we should not get here anymore */
721         LBUG();
722
723         RETURN(rc);
724 }
725
726 /* Destroy requests can be async always on the client, and we don't even really
727  * care about the return code since the client cannot do anything at all about
728  * a destroy failure.
729  * When the MDS is unlinking a filename, it saves the file objects into a
730  * recovery llog, and these object records are cancelled when the OST reports
731  * they were destroyed and sync'd to disk (i.e. transaction committed).
732  * If the client dies, or the OST is down when the object should be destroyed,
733  * the records are not cancelled, and when the OST reconnects to the MDS next,
734  * it will retrieve the llog unlink logs and then sends the log cancellation
735  * cookies to the MDS after committing destroy transactions. */
736 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
737                        struct obdo *oa, struct lov_stripe_md *ea,
738                        struct obd_trans_info *oti, struct obd_export *md_export,
739                        void *capa)
740 {
741         struct client_obd     *cli = &exp->exp_obd->u.cli;
742         struct ptlrpc_request *req;
743         struct ost_body       *body;
744         struct list_head       cancels = LIST_HEAD_INIT(cancels);
745         int rc, count;
746         ENTRY;
747
748         if (!oa) {
749                 CDEBUG(D_INFO, "oa NULL\n");
750                 RETURN(-EINVAL);
751         }
752
753         count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
754                                         LDLM_FL_DISCARD_DATA);
755
756         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
757         if (req == NULL) {
758                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
759                 RETURN(-ENOMEM);
760         }
761
762         osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
763         rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
764                                0, &cancels, count);
765         if (rc) {
766                 ptlrpc_request_free(req);
767                 RETURN(rc);
768         }
769
770         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
771         ptlrpc_at_set_req_timeout(req);
772
773         if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
774                 oa->o_lcookie = *oti->oti_logcookies;
775         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
776         LASSERT(body);
777         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
778
779         osc_pack_capa(req, body, (struct obd_capa *)capa);
780         ptlrpc_request_set_replen(req);
781
782         /* If osc_destory is for destroying the unlink orphan,
783          * sent from MDT to OST, which should not be blocked here,
784          * because the process might be triggered by ptlrpcd, and
785          * it is not good to block ptlrpcd thread (b=16006)*/
786         if (!(oa->o_flags & OBD_FL_DELORPHAN)) {
787                 req->rq_interpret_reply = osc_destroy_interpret;
788                 if (!osc_can_send_destroy(cli)) {
789                         struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
790                                                           NULL);
791
792                         /*
793                          * Wait until the number of on-going destroy RPCs drops
794                          * under max_rpc_in_flight
795                          */
796                         l_wait_event_exclusive(cli->cl_destroy_waitq,
797                                                osc_can_send_destroy(cli), &lwi);
798                 }
799         }
800
801         /* Do not wait for response */
802         ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
803         RETURN(0);
804 }
805
806 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
807                                 long writing_bytes)
808 {
809         obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
810
811         LASSERT(!(oa->o_valid & bits));
812
813         oa->o_valid |= bits;
814         client_obd_list_lock(&cli->cl_loi_list_lock);
815         oa->o_dirty = cli->cl_dirty;
816         if (unlikely(cli->cl_dirty - cli->cl_dirty_transit >
817                      cli->cl_dirty_max)) {
818                 CERROR("dirty %lu - %lu > dirty_max %lu\n",
819                        cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
820                 oa->o_undirty = 0;
821         } else if (unlikely(atomic_read(&obd_unstable_pages) +
822                             atomic_read(&obd_dirty_pages) -
823                             atomic_read(&obd_dirty_transit_pages) >
824                             (long)(obd_max_dirty_pages + 1))) {
825                 /* The atomic_read() allowing the atomic_inc() are
826                  * not covered by a lock thus they may safely race and trip
827                  * this CERROR() unless we add in a small fudge factor (+1). */
828                 CERROR("%s: dirty %d + %d - %d > system dirty_max %d\n",
829                        cli->cl_import->imp_obd->obd_name,
830                        atomic_read(&obd_unstable_pages),
831                        atomic_read(&obd_dirty_pages),
832                        atomic_read(&obd_dirty_transit_pages),
833                        obd_max_dirty_pages);
834                 oa->o_undirty = 0;
835         } else if (unlikely(cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff)) {
836                 CERROR("dirty %lu - dirty_max %lu too big???\n",
837                        cli->cl_dirty, cli->cl_dirty_max);
838                 oa->o_undirty = 0;
839         } else {
840                 long max_in_flight = (cli->cl_max_pages_per_rpc <<
841                                       PAGE_CACHE_SHIFT) *
842                                      (cli->cl_max_rpcs_in_flight + 1);
843                 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
844         }
845         oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
846         oa->o_dropped = cli->cl_lost_grant;
847         cli->cl_lost_grant = 0;
848         client_obd_list_unlock(&cli->cl_loi_list_lock);
849         CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
850                oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
851
852 }
853
854 void osc_update_next_shrink(struct client_obd *cli)
855 {
856         cli->cl_next_shrink_grant =
857                 cfs_time_shift(cli->cl_grant_shrink_interval);
858         CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
859                cli->cl_next_shrink_grant);
860 }
861
862 static void __osc_update_grant(struct client_obd *cli, obd_size grant)
863 {
864         client_obd_list_lock(&cli->cl_loi_list_lock);
865         cli->cl_avail_grant += grant;
866         client_obd_list_unlock(&cli->cl_loi_list_lock);
867 }
868
869 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
870 {
871         if (body->oa.o_valid & OBD_MD_FLGRANT) {
872                 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
873                 __osc_update_grant(cli, body->oa.o_grant);
874         }
875 }
876
877 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
878                               obd_count keylen, void *key, obd_count vallen,
879                               void *val, struct ptlrpc_request_set *set);
880
881 static int osc_shrink_grant_interpret(const struct lu_env *env,
882                                       struct ptlrpc_request *req,
883                                       void *aa, int rc)
884 {
885         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
886         struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
887         struct ost_body *body;
888
889         if (rc != 0) {
890                 __osc_update_grant(cli, oa->o_grant);
891                 GOTO(out, rc);
892         }
893
894         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
895         LASSERT(body);
896         osc_update_grant(cli, body);
897 out:
898         OBDO_FREE(oa);
899         return rc;
900 }
901
902 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
903 {
904         client_obd_list_lock(&cli->cl_loi_list_lock);
905         oa->o_grant = cli->cl_avail_grant / 4;
906         cli->cl_avail_grant -= oa->o_grant;
907         client_obd_list_unlock(&cli->cl_loi_list_lock);
908         if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
909                 oa->o_valid |= OBD_MD_FLFLAGS;
910                 oa->o_flags = 0;
911         }
912         oa->o_flags |= OBD_FL_SHRINK_GRANT;
913         osc_update_next_shrink(cli);
914 }
915
916 /* Shrink the current grant, either from some large amount to enough for a
917  * full set of in-flight RPCs, or if we have already shrunk to that limit
918  * then to enough for a single RPC.  This avoids keeping more grant than
919  * needed, and avoids shrinking the grant piecemeal. */
920 static int osc_shrink_grant(struct client_obd *cli)
921 {
922         __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
923                              (cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT);
924
925         client_obd_list_lock(&cli->cl_loi_list_lock);
926         if (cli->cl_avail_grant <= target_bytes)
927                 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
928         client_obd_list_unlock(&cli->cl_loi_list_lock);
929
930         return osc_shrink_grant_to_target(cli, target_bytes);
931 }
932
933 int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
934 {
935         int                     rc = 0;
936         struct ost_body        *body;
937         ENTRY;
938
939         client_obd_list_lock(&cli->cl_loi_list_lock);
940         /* Don't shrink if we are already above or below the desired limit
941          * We don't want to shrink below a single RPC, as that will negatively
942          * impact block allocation and long-term performance. */
943         if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT)
944                 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
945
946         if (target_bytes >= cli->cl_avail_grant) {
947                 client_obd_list_unlock(&cli->cl_loi_list_lock);
948                 RETURN(0);
949         }
950         client_obd_list_unlock(&cli->cl_loi_list_lock);
951
952         OBD_ALLOC_PTR(body);
953         if (!body)
954                 RETURN(-ENOMEM);
955
956         osc_announce_cached(cli, &body->oa, 0);
957
958         client_obd_list_lock(&cli->cl_loi_list_lock);
959         body->oa.o_grant = cli->cl_avail_grant - target_bytes;
960         cli->cl_avail_grant = target_bytes;
961         client_obd_list_unlock(&cli->cl_loi_list_lock);
962         if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
963                 body->oa.o_valid |= OBD_MD_FLFLAGS;
964                 body->oa.o_flags = 0;
965         }
966         body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
967         osc_update_next_shrink(cli);
968
969         rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
970                                 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
971                                 sizeof(*body), body, NULL);
972         if (rc != 0)
973                 __osc_update_grant(cli, body->oa.o_grant);
974         OBD_FREE_PTR(body);
975         RETURN(rc);
976 }
977
978 static int osc_should_shrink_grant(struct client_obd *client)
979 {
980         cfs_time_t time = cfs_time_current();
981         cfs_time_t next_shrink = client->cl_next_shrink_grant;
982
983         if ((client->cl_import->imp_connect_data.ocd_connect_flags &
984              OBD_CONNECT_GRANT_SHRINK) == 0)
985                 return 0;
986
987         if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
988                 /* Get the current RPC size directly, instead of going via:
989                  * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
990                  * Keep comment here so that it can be found by searching. */
991                 int brw_size = client->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
992
993                 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
994                     client->cl_avail_grant > brw_size)
995                         return 1;
996                 else
997                         osc_update_next_shrink(client);
998         }
999         return 0;
1000 }
1001
1002 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
1003 {
1004         struct client_obd *client;
1005
1006         list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
1007                 if (osc_should_shrink_grant(client))
1008                         osc_shrink_grant(client);
1009         }
1010         return 0;
1011 }
1012
1013 static int osc_add_shrink_grant(struct client_obd *client)
1014 {
1015         int rc;
1016
1017         rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
1018                                        TIMEOUT_GRANT,
1019                                        osc_grant_shrink_grant_cb, NULL,
1020                                        &client->cl_grant_shrink_list);
1021         if (rc) {
1022                 CERROR("add grant client %s error %d\n",
1023                         client->cl_import->imp_obd->obd_name, rc);
1024                 return rc;
1025         }
1026         CDEBUG(D_CACHE, "add grant client %s \n",
1027                client->cl_import->imp_obd->obd_name);
1028         osc_update_next_shrink(client);
1029         return 0;
1030 }
1031
1032 static int osc_del_shrink_grant(struct client_obd *client)
1033 {
1034         return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
1035                                          TIMEOUT_GRANT);
1036 }
1037
1038 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
1039 {
1040         /*
1041          * ocd_grant is the total grant amount we're expect to hold: if we've
1042          * been evicted, it's the new avail_grant amount, cl_dirty will drop
1043          * to 0 as inflight RPCs fail out; otherwise, it's avail_grant + dirty.
1044          *
1045          * race is tolerable here: if we're evicted, but imp_state already
1046          * left EVICTED state, then cl_dirty must be 0 already.
1047          */
1048         client_obd_list_lock(&cli->cl_loi_list_lock);
1049         if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
1050                 cli->cl_avail_grant = ocd->ocd_grant;
1051         else
1052                 cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty;
1053
1054         if (cli->cl_avail_grant < 0) {
1055                 CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
1056                       cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant,
1057                       ocd->ocd_grant, cli->cl_dirty);
1058                 /* workaround for servers which do not have the patch from
1059                  * LU-2679 */
1060                 cli->cl_avail_grant = ocd->ocd_grant;
1061         }
1062
1063         /* determine the appropriate chunk size used by osc_extent. */
1064         cli->cl_chunkbits = max_t(int, PAGE_CACHE_SHIFT, ocd->ocd_blocksize);
1065         client_obd_list_unlock(&cli->cl_loi_list_lock);
1066
1067         CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
1068                 "chunk bits: %d.\n", cli->cl_import->imp_obd->obd_name,
1069                 cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits);
1070
1071         if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
1072             list_empty(&cli->cl_grant_shrink_list))
1073                 osc_add_shrink_grant(cli);
1074 }
1075
1076 /* We assume that the reason this OSC got a short read is because it read
1077  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
1078  * via the LOV, and it _knows_ it's reading inside the file, it's just that
1079  * this stripe never got written at or beyond this stripe offset yet. */
1080 static void handle_short_read(int nob_read, obd_count page_count,
1081                               struct brw_page **pga)
1082 {
1083         char *ptr;
1084         int i = 0;
1085
1086         /* skip bytes read OK */
1087         while (nob_read > 0) {
1088                 LASSERT (page_count > 0);
1089
1090                 if (pga[i]->count > nob_read) {
1091                         /* EOF inside this page */
1092                         ptr = kmap(pga[i]->pg) +
1093                                 (pga[i]->off & ~CFS_PAGE_MASK);
1094                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1095                         kunmap(pga[i]->pg);
1096                         page_count--;
1097                         i++;
1098                         break;
1099                 }
1100
1101                 nob_read -= pga[i]->count;
1102                 page_count--;
1103                 i++;
1104         }
1105
1106         /* zero remaining pages */
1107         while (page_count-- > 0) {
1108                 ptr = kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
1109                 memset(ptr, 0, pga[i]->count);
1110                 kunmap(pga[i]->pg);
1111                 i++;
1112         }
1113 }
1114
1115 static int check_write_rcs(struct ptlrpc_request *req,
1116                            int requested_nob, int niocount,
1117                            obd_count page_count, struct brw_page **pga)
1118 {
1119         int     i;
1120         __u32   *remote_rcs;
1121
1122         remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1123                                                   sizeof(*remote_rcs) *
1124                                                   niocount);
1125         if (remote_rcs == NULL) {
1126                 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1127                 return(-EPROTO);
1128         }
1129
1130         /* return error if any niobuf was in error */
1131         for (i = 0; i < niocount; i++) {
1132                 if ((int)remote_rcs[i] < 0)
1133                         return(remote_rcs[i]);
1134
1135                 if (remote_rcs[i] != 0) {
1136                         CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1137                                 i, remote_rcs[i], req);
1138                         return(-EPROTO);
1139                 }
1140         }
1141
1142         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1143                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1144                        req->rq_bulk->bd_nob_transferred, requested_nob);
1145                 return(-EPROTO);
1146         }
1147
1148         return (0);
1149 }
1150
1151 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1152 {
1153         if (p1->flag != p2->flag) {
1154                 unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
1155                                   OBD_BRW_SYNC       | OBD_BRW_ASYNC   |
1156                                   OBD_BRW_NOQUOTA    | OBD_BRW_SOFT_SYNC);
1157
1158                 /* warn if we try to combine flags that we don't know to be
1159                  * safe to combine */
1160                 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1161                         CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1162                               "report this at http://bugs.whamcloud.com/\n",
1163                               p1->flag, p2->flag);
1164                 }
1165                 return 0;
1166         }
1167
1168         return (p1->off + p1->count == p2->off);
1169 }
1170
1171 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
1172                                    struct brw_page **pga, int opc,
1173                                    cksum_type_t cksum_type)
1174 {
1175         __u32                           cksum;
1176         int                             i = 0;
1177         struct cfs_crypto_hash_desc     *hdesc;
1178         unsigned int                    bufsize;
1179         int                             err;
1180         unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);
1181
1182         LASSERT(pg_count > 0);
1183
1184         hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1185         if (IS_ERR(hdesc)) {
1186                 CERROR("Unable to initialize checksum hash %s\n",
1187                        cfs_crypto_hash_name(cfs_alg));
1188                 return PTR_ERR(hdesc);
1189         }
1190
1191         while (nob > 0 && pg_count > 0) {
1192                 int count = pga[i]->count > nob ? nob : pga[i]->count;
1193
1194                 /* corrupt the data before we compute the checksum, to
1195                  * simulate an OST->client data error */
1196                 if (i == 0 && opc == OST_READ &&
1197                     OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
1198                         unsigned char *ptr = kmap(pga[i]->pg);
1199                         int off = pga[i]->off & ~CFS_PAGE_MASK;
1200                         memcpy(ptr + off, "bad1", min(4, nob));
1201                         kunmap(pga[i]->pg);
1202                 }
1203                 cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
1204                                   pga[i]->off & ~CFS_PAGE_MASK,
1205                                   count);
1206                 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
1207                                (int)(pga[i]->off & ~CFS_PAGE_MASK));
1208
1209                 nob -= pga[i]->count;
1210                 pg_count--;
1211                 i++;
1212         }
1213
1214         bufsize = 4;
1215         err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);
1216
1217         if (err)
1218                 cfs_crypto_hash_final(hdesc, NULL, NULL);
1219
1220         /* For sending we only compute the wrong checksum instead
1221          * of corrupting the data so it is still correct on a redo */
1222         if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1223                 cksum++;
1224
1225         return cksum;
1226 }
1227
1228 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
1229                                 struct lov_stripe_md *lsm, obd_count page_count,
1230                                 struct brw_page **pga,
1231                                 struct ptlrpc_request **reqp,
1232                                 struct obd_capa *ocapa, int reserve,
1233                                 int resend)
1234 {
1235         struct ptlrpc_request   *req;
1236         struct ptlrpc_bulk_desc *desc;
1237         struct ost_body         *body;
1238         struct obd_ioobj        *ioobj;
1239         struct niobuf_remote    *niobuf;
1240         int niocount, i, requested_nob, opc, rc;
1241         struct osc_brw_async_args *aa;
1242         struct req_capsule      *pill;
1243         struct brw_page *pg_prev;
1244
1245         ENTRY;
1246         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1247                 RETURN(-ENOMEM); /* Recoverable */
1248         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1249                 RETURN(-EINVAL); /* Fatal */
1250
1251         if ((cmd & OBD_BRW_WRITE) != 0) {
1252                 opc = OST_WRITE;
1253                 req = ptlrpc_request_alloc_pool(cli->cl_import,
1254                                                 cli->cl_import->imp_rq_pool,
1255                                                 &RQF_OST_BRW_WRITE);
1256         } else {
1257                 opc = OST_READ;
1258                 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
1259         }
1260         if (req == NULL)
1261                 RETURN(-ENOMEM);
1262
1263         for (niocount = i = 1; i < page_count; i++) {
1264                 if (!can_merge_pages(pga[i - 1], pga[i]))
1265                         niocount++;
1266         }
1267
1268         pill = &req->rq_pill;
1269         req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1270                              sizeof(*ioobj));
1271         req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1272                              niocount * sizeof(*niobuf));
1273         osc_set_capa_size(req, &RMF_CAPA1, ocapa);
1274
1275         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1276         if (rc) {
1277                 ptlrpc_request_free(req);
1278                 RETURN(rc);
1279         }
1280         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1281         ptlrpc_at_set_req_timeout(req);
1282         /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
1283          * retry logic */
1284         req->rq_no_retry_einprogress = 1;
1285
1286         desc = ptlrpc_prep_bulk_imp(req, page_count,
1287                 cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
1288                 opc == OST_WRITE ? BULK_GET_SOURCE : BULK_PUT_SINK,
1289                 OST_BULK_PORTAL);
1290
1291         if (desc == NULL)
1292                 GOTO(out, rc = -ENOMEM);
1293         /* NB request now owns desc and will free it when it gets freed */
1294
1295         body = req_capsule_client_get(pill, &RMF_OST_BODY);
1296         ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1297         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1298         LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1299
1300         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
1301
1302         obdo_to_ioobj(oa, ioobj);
1303         ioobj->ioo_bufcnt = niocount;
1304         /* The high bits of ioo_max_brw tells server _maximum_ number of bulks
1305          * that might be send for this request.  The actual number is decided
1306          * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
1307          * "max - 1" for old client compatibility sending "0", and also so the
1308          * the actual maximum is a power-of-two number, not one less. LU-1431 */
1309         ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
1310         osc_pack_capa(req, body, ocapa);
1311         LASSERT(page_count > 0);
1312         pg_prev = pga[0];
1313         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1314                 struct brw_page *pg = pga[i];
1315                 int poff = pg->off & ~CFS_PAGE_MASK;
1316
1317                 LASSERT(pg->count > 0);
1318                 /* make sure there is no gap in the middle of page array */
1319                 LASSERTF(page_count == 1 ||
1320                          (ergo(i == 0, poff + pg->count == PAGE_CACHE_SIZE) &&
1321                           ergo(i > 0 && i < page_count - 1,
1322                                poff == 0 && pg->count == PAGE_CACHE_SIZE)   &&
1323                           ergo(i == page_count - 1, poff == 0)),
1324                          "i: %d/%d pg: %p off: "LPU64", count: %u\n",
1325                          i, page_count, pg, pg->off, pg->count);
1326 #ifdef __linux__
1327                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1328                          "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1329                          " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1330                          i, page_count,
1331                          pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1332                          pg_prev->pg, page_private(pg_prev->pg),
1333                          pg_prev->pg->index, pg_prev->off);
1334 #else
1335                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1336                          "i %d p_c %u\n", i, page_count);
1337 #endif
1338                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1339                         (pg->flag & OBD_BRW_SRVLOCK));
1340
1341                 ptlrpc_prep_bulk_page_pin(desc, pg->pg, poff, pg->count);
1342                 requested_nob += pg->count;
1343
1344                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1345                         niobuf--;
1346                         niobuf->len += pg->count;
1347                 } else {
1348                         niobuf->offset = pg->off;
1349                         niobuf->len    = pg->count;
1350                         niobuf->flags  = pg->flag;
1351                 }
1352                 pg_prev = pg;
1353         }
1354
1355         LASSERTF((void *)(niobuf - niocount) ==
1356                 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1357                 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1358                 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1359
1360         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1361         if (resend) {
1362                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1363                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1364                         body->oa.o_flags = 0;
1365                 }
1366                 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1367         }
1368
1369         if (osc_should_shrink_grant(cli))
1370                 osc_shrink_grant_local(cli, &body->oa);
1371
1372         /* size[REQ_REC_OFF] still sizeof (*body) */
1373         if (opc == OST_WRITE) {
1374                 if (cli->cl_checksum &&
1375                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1376                         /* store cl_cksum_type in a local variable since
1377                          * it can be changed via lprocfs */
1378                         cksum_type_t cksum_type = cli->cl_cksum_type;
1379
1380                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1381                                 oa->o_flags &= OBD_FL_LOCAL_MASK;
1382                                 body->oa.o_flags = 0;
1383                         }
1384                         body->oa.o_flags |= cksum_type_pack(cksum_type);
1385                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1386                         body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1387                                                              page_count, pga,
1388                                                              OST_WRITE,
1389                                                              cksum_type);
1390                         CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1391                                body->oa.o_cksum);
1392                         /* save this in 'oa', too, for later checking */
1393                         oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1394                         oa->o_flags |= cksum_type_pack(cksum_type);
1395                 } else {
1396                         /* clear out the checksum flag, in case this is a
1397                          * resend but cl_checksum is no longer set. b=11238 */
1398                         oa->o_valid &= ~OBD_MD_FLCKSUM;
1399                 }
1400                 oa->o_cksum = body->oa.o_cksum;
1401                 /* 1 RC per niobuf */
1402                 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1403                                      sizeof(__u32) * niocount);
1404         } else {
1405                 if (cli->cl_checksum &&
1406                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1407                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1408                                 body->oa.o_flags = 0;
1409                         body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1410                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1411                 }
1412         }
1413         ptlrpc_request_set_replen(req);
1414
1415         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1416         aa = ptlrpc_req_async_args(req);
1417         aa->aa_oa = oa;
1418         aa->aa_requested_nob = requested_nob;
1419         aa->aa_nio_count = niocount;
1420         aa->aa_page_count = page_count;
1421         aa->aa_resends = 0;
1422         aa->aa_ppga = pga;
1423         aa->aa_cli = cli;
1424         INIT_LIST_HEAD(&aa->aa_oaps);
1425         if (ocapa && reserve)
1426                 aa->aa_ocapa = capa_get(ocapa);
1427
1428         *reqp = req;
1429         RETURN(0);
1430
1431  out:
1432         ptlrpc_req_finished(req);
1433         RETURN(rc);
1434 }
1435
1436 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1437                                 __u32 client_cksum, __u32 server_cksum, int nob,
1438                                 obd_count page_count, struct brw_page **pga,
1439                                 cksum_type_t client_cksum_type)
1440 {
1441         __u32 new_cksum;
1442         char *msg;
1443         cksum_type_t cksum_type;
1444
1445         if (server_cksum == client_cksum) {
1446                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1447                 return 0;
1448         }
1449
1450         cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1451                                        oa->o_flags : 0);
1452         new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1453                                       cksum_type);
1454
1455         if (cksum_type != client_cksum_type)
1456                 msg = "the server did not use the checksum type specified in "
1457                       "the original request - likely a protocol problem";
1458         else if (new_cksum == server_cksum)
1459                 msg = "changed on the client after we checksummed it - "
1460                       "likely false positive due to mmap IO (bug 11742)";
1461         else if (new_cksum == client_cksum)
1462                 msg = "changed in transit before arrival at OST";
1463         else
1464                 msg = "changed in transit AND doesn't match the original - "
1465                       "likely false positive due to mmap IO (bug 11742)";
1466
1467         LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1468                            " object "DOSTID" extent ["LPU64"-"LPU64"]\n",
1469                            msg, libcfs_nid2str(peer->nid),
1470                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1471                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1472                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1473                            POSTID(&oa->o_oi), pga[0]->off,
1474                            pga[page_count-1]->off + pga[page_count-1]->count - 1);
1475         CERROR("original client csum %x (type %x), server csum %x (type %x), "
1476                "client csum now %x\n", client_cksum, client_cksum_type,
1477                server_cksum, cksum_type, new_cksum);
1478         return 1;
1479 }
1480
1481 /* Note rc enters this function as number of bytes transferred */
1482 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1483 {
1484         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1485         const lnet_process_id_t *peer =
1486                         &req->rq_import->imp_connection->c_peer;
1487         struct client_obd *cli = aa->aa_cli;
1488         struct ost_body *body;
1489         __u32 client_cksum = 0;
1490         ENTRY;
1491
1492         if (rc < 0 && rc != -EDQUOT) {
1493                 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1494                 RETURN(rc);
1495         }
1496
1497         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1498         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1499         if (body == NULL) {
1500                 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1501                 RETURN(-EPROTO);
1502         }
1503
1504         /* set/clear over quota flag for a uid/gid */
1505         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1506             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1507                 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1508
1509                 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1510                        body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1511                        body->oa.o_flags);
1512                 osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
1513         }
1514
1515         osc_update_grant(cli, body);
1516
1517         if (rc < 0)
1518                 RETURN(rc);
1519
1520         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1521                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1522
1523         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1524                 if (rc > 0) {
1525                         CERROR("Unexpected +ve rc %d\n", rc);
1526                         RETURN(-EPROTO);
1527                 }
1528                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1529
1530                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1531                         RETURN(-EAGAIN);
1532
1533                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1534                     check_write_checksum(&body->oa, peer, client_cksum,
1535                                          body->oa.o_cksum, aa->aa_requested_nob,
1536                                          aa->aa_page_count, aa->aa_ppga,
1537                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1538                         RETURN(-EAGAIN);
1539
1540                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1541                                      aa->aa_page_count, aa->aa_ppga);
1542                 GOTO(out, rc);
1543         }
1544
1545         /* The rest of this function executes only for OST_READs */
1546
1547         /* if unwrap_bulk failed, return -EAGAIN to retry */
1548         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1549         if (rc < 0)
1550                 GOTO(out, rc = -EAGAIN);
1551
1552         if (rc > aa->aa_requested_nob) {
1553                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1554                        aa->aa_requested_nob);
1555                 RETURN(-EPROTO);
1556         }
1557
1558         if (rc != req->rq_bulk->bd_nob_transferred) {
1559                 CERROR ("Unexpected rc %d (%d transferred)\n",
1560                         rc, req->rq_bulk->bd_nob_transferred);
1561                 return (-EPROTO);
1562         }
1563
1564         if (rc < aa->aa_requested_nob)
1565                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1566
1567         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1568                 static int cksum_counter;
1569                 __u32      server_cksum = body->oa.o_cksum;
1570                 char      *via;
1571                 char      *router;
1572                 cksum_type_t cksum_type;
1573
1574                 cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
1575                                                body->oa.o_flags : 0);
1576                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1577                                                  aa->aa_ppga, OST_READ,
1578                                                  cksum_type);
1579
1580                 if (peer->nid == req->rq_bulk->bd_sender) {
1581                         via = router = "";
1582                 } else {
1583                         via = " via ";
1584                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1585                 }
1586
1587                 if (server_cksum != client_cksum) {
1588                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1589                                            "%s%s%s inode "DFID" object "DOSTID
1590                                            " extent ["LPU64"-"LPU64"]\n",
1591                                            req->rq_import->imp_obd->obd_name,
1592                                            libcfs_nid2str(peer->nid),
1593                                            via, router,
1594                                            body->oa.o_valid & OBD_MD_FLFID ?
1595                                                 body->oa.o_parent_seq : (__u64)0,
1596                                            body->oa.o_valid & OBD_MD_FLFID ?
1597                                                 body->oa.o_parent_oid : 0,
1598                                            body->oa.o_valid & OBD_MD_FLFID ?
1599                                                 body->oa.o_parent_ver : 0,
1600                                            POSTID(&body->oa.o_oi),
1601                                            aa->aa_ppga[0]->off,
1602                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1603                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1604                                                                         1);
1605                         CERROR("client %x, server %x, cksum_type %x\n",
1606                                client_cksum, server_cksum, cksum_type);
1607                         cksum_counter = 0;
1608                         aa->aa_oa->o_cksum = client_cksum;
1609                         rc = -EAGAIN;
1610                 } else {
1611                         cksum_counter++;
1612                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1613                         rc = 0;
1614                 }
1615         } else if (unlikely(client_cksum)) {
1616                 static int cksum_missed;
1617
1618                 cksum_missed++;
1619                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1620                         CERROR("Checksum %u requested from %s but not sent\n",
1621                                cksum_missed, libcfs_nid2str(peer->nid));
1622         } else {
1623                 rc = 0;
1624         }
1625 out:
1626         if (rc >= 0)
1627                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1628                                      aa->aa_oa, &body->oa);
1629
1630         RETURN(rc);
1631 }
1632
/*
 * Rebuild and requeue a bulk read/write RPC after a recoverable error.
 *
 * A fresh request is prepared from the client, obdo, page array and
 * capability held in @aa, using the same direction (read or write) as
 * @request.  If any page that still references @request has been
 * interrupted, the new request is dropped and -EINTR is returned.  Otherwise
 * the new request inherits the interpret/commit callbacks, the async args,
 * the import generation, the page and extent lists and the capability of the
 * old request, is given a send time of "now + aa_resends" seconds (capped at
 * the request timeout) and is handed to ptlrpcd.
 *
 * Returns 0 when the new request was queued, -EINTR when a page was
 * interrupted, or the error returned by osc_brw_prep_request().
 */
1633 static int osc_brw_redo_request(struct ptlrpc_request *request,
1634                                 struct osc_brw_async_args *aa, int rc)
1635 {
1636         struct ptlrpc_request *new_req;
1637         struct osc_brw_async_args *new_aa;
1638         struct osc_async_page *oap;
1639         ENTRY;
1640
             /* -EINPROGRESS is only traced; any other error is logged as an
              * error */
1641         DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1642                   "redo for recoverable error %d", rc);
1643
1644         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1645                                         OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1646                                   aa->aa_cli, aa->aa_oa,
1647                                   NULL /* lsm unused by osc currently */,
1648                                   aa->aa_page_count, aa->aa_ppga,
1649                                   &new_req, aa->aa_ocapa, 0, 1);
1650         if (rc)
1651                 RETURN(rc);
1652
             /* Abandon the resend if a page holding a reference on the old
              * request was interrupted in the meantime. */
1653         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1654                 if (oap->oap_request != NULL) {
1655                         LASSERTF(request == oap->oap_request,
1656                                  "request %p != oap_request %p\n",
1657                                  request, oap->oap_request);
1658                         if (oap->oap_interrupted) {
1659                                 ptlrpc_req_finished(new_req);
1660                                 RETURN(-EINTR);
1661                         }
1662                 }
1663         }
1664         /* New request takes over pga and oaps from old request.
1665          * Note that copying a list_head doesn't work, need to move it... */
1666         aa->aa_resends++;
1667         new_req->rq_interpret_reply = request->rq_interpret_reply;
1668         new_req->rq_async_args = request->rq_async_args;
1669         new_req->rq_commit_cb = request->rq_commit_cb;
1670         /* cap resend delay to the current request timeout, this is similar to
1671          * what ptlrpc does (see after_reply()) */
1672         if (aa->aa_resends > new_req->rq_timeout)
1673                 new_req->rq_sent = cfs_time_current_sec() + new_req->rq_timeout;
1674         else
1675                 new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1676         new_req->rq_generation_set = 1;
1677         new_req->rq_import_generation = request->rq_import_generation;
1678
1679         new_aa = ptlrpc_req_async_args(new_req);
1680
             /* The list heads copied with rq_async_args above are not usable
              * as they are: re-initialise them and splice the entries over. */
1681         INIT_LIST_HEAD(&new_aa->aa_oaps);
1682         list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1683         INIT_LIST_HEAD(&new_aa->aa_exts);
1684         list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1685         new_aa->aa_resends = aa->aa_resends;
1686
             /* Move each page's request reference from the old request to the
              * new one. */
1687         list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1688                 if (oap->oap_request) {
1689                         ptlrpc_req_finished(oap->oap_request);
1690                         oap->oap_request = ptlrpc_request_addref(new_req);
1691                 }
1692         }
1693
             /* The capability now belongs to the new request's args only. */
1694         new_aa->aa_ocapa = aa->aa_ocapa;
1695         aa->aa_ocapa = NULL;
1696
1697         /* XXX: This code will run into problem if we're going to support
1698          * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
1699          * and wait for all of them to be finished. We should inherit request
1700          * set from old request. */
1701         ptlrpcd_add_req(new_req, PDL_POLICY_SAME, -1);
1702
1703         DEBUG_REQ(D_INFO, new_req, "new request");
1704         RETURN(0);
1705 }
1706
1707 /*
1708  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1709  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1710  * fine for our small page arrays and doesn't require allocation.  its an
1711  * insertion sort that swaps elements that are strides apart, shrinking the
1712  * stride down until its '1' and the array is sorted.
1713  */
1714 static void sort_brw_pages(struct brw_page **array, int num)
1715 {
1716         int stride, i, j;
1717         struct brw_page *tmp;
1718
1719         if (num == 1)
1720                 return;
1721         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1722                 ;
1723
1724         do {
1725                 stride /= 3;
1726                 for (i = stride ; i < num ; i++) {
1727                         tmp = array[i];
1728                         j = i;
1729                         while (j >= stride && array[j - stride]->off > tmp->off) {
1730                                 array[j] = array[j - stride];
1731                                 j -= stride;
1732                         }
1733                         array[j] = tmp;
1734                 }
1735         } while (stride > 1);
1736 }
1737
1738 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1739 {
1740         LASSERT(ppga != NULL);
1741         OBD_FREE(ppga, sizeof(*ppga) * count);
1742 }
1743
/*
 * Reply interpreter installed on bulk read/write RPCs by osc_build_rpc().
 *
 * The reply is first processed by osc_brw_fini_request().  If the resulting
 * error is recoverable the RPC is resent through osc_brw_redo_request(),
 * unless the import generation changed since the request was sent or the
 * resend limit was reached (-EINPROGRESS is retried regardless of the limit).
 * When a resend was queued the function returns 0 right away and leaves all
 * completion work to the new request; a remaining -EAGAIN or -EINPROGRESS is
 * reported as -EIO.
 *
 * On completion the capability is dropped, on success the object attributes
 * (blocks, mtime/atime/ctime and, for writes, size and KMS) are updated from
 * the obdo, and then the obdo, the extents, the cl_req and the page array are
 * released, the read or write in-flight counter is decremented under
 * cl_loi_list_lock and further queued I/O is unplugged.
 */
1744 static int brw_interpret(const struct lu_env *env,
1745                          struct ptlrpc_request *req, void *data, int rc)
1746 {
1747         struct osc_brw_async_args *aa = data;
1748         struct osc_extent *ext;
1749         struct osc_extent *tmp;
1750         struct client_obd *cli = aa->aa_cli;
1751         ENTRY;
1752
1753         rc = osc_brw_fini_request(req, rc);
1754         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1755         /* When server return -EINPROGRESS, client should always retry
1756          * regardless of the number of times the bulk was resent already. */
1757         if (osc_recoverable_error(rc)) {
                     /* No resend when the import generation differs from the
                      * one the request was sent with. */
1758                 if (req->rq_import_generation !=
1759                     req->rq_import->imp_generation) {
1760                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
1761                                ""DOSTID", rc = %d.\n",
1762                                req->rq_import->imp_obd->obd_name,
1763                                POSTID(&aa->aa_oa->o_oi), rc);
1764                 } else if (rc == -EINPROGRESS ||
1765                     client_should_resend(aa->aa_resends, aa->aa_cli)) {
1766                         rc = osc_brw_redo_request(req, aa, rc);
1767                 } else {
1768                         CERROR("%s: too many resent retries for object: "
1769                                ""LPU64":"LPU64", rc = %d.\n",
1770                                req->rq_import->imp_obd->obd_name,
1771                                POSTID(&aa->aa_oa->o_oi), rc);
1772                 }
1773
                     /* rc == 0: the resend was queued and has taken over the
                      * pages and extents, nothing left to complete here.
                      * Errors that only ask for a retry become -EIO. */
1774                 if (rc == 0)
1775                         RETURN(0);
1776                 else if (rc == -EAGAIN || rc == -EINPROGRESS)
1777                         rc = -EIO;
1778         }
1779
1780         if (aa->aa_ocapa) {
1781                 capa_put(aa->aa_ocapa);
1782                 aa->aa_ocapa = NULL;
1783         }
1784
1785         if (rc == 0) {
1786                 struct obdo *oa = aa->aa_oa;
1787                 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
1788                 unsigned long valid = 0;
1789                 struct cl_object *obj;
1790                 struct osc_async_page *last;
1791
                     /* The last entry of the page array supplies both the
                      * object and the end offset of this I/O. */
1792                 last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
1793                 obj = osc2cl(last->oap_obj);
1794
                     /* Copy only the attributes the obdo marks as valid. */
1795                 cl_object_attr_lock(obj);
1796                 if (oa->o_valid & OBD_MD_FLBLOCKS) {
1797                         attr->cat_blocks = oa->o_blocks;
1798                         valid |= CAT_BLOCKS;
1799                 }
1800                 if (oa->o_valid & OBD_MD_FLMTIME) {
1801                         attr->cat_mtime = oa->o_mtime;
1802                         valid |= CAT_MTIME;
1803                 }
1804                 if (oa->o_valid & OBD_MD_FLATIME) {
1805                         attr->cat_atime = oa->o_atime;
1806                         valid |= CAT_ATIME;
1807                 }
1808                 if (oa->o_valid & OBD_MD_FLCTIME) {
1809                         attr->cat_ctime = oa->o_ctime;
1810                         valid |= CAT_CTIME;
1811                 }
1812
1813                 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1814                         struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
1815                         loff_t last_off = last->oap_count + last->oap_obj_off;
1816
1817                         /* Change file size if this is an out of quota or
1818                          * direct IO write and it extends the file size */
1819                         if (loi->loi_lvb.lvb_size < last_off) {
1820                                 attr->cat_size = last_off;
1821                                 valid |= CAT_SIZE;
1822                         }
1823                         /* Extend KMS if it's not a lockless write */
1824                         if (loi->loi_kms < last_off &&
1825                             oap2osc_page(last)->ops_srvlock == 0) {
1826                                 attr->cat_kms = last_off;
1827                                 valid |= CAT_KMS;
1828                         }
1829                 }
1830
1831                 if (valid != 0)
1832                         cl_object_attr_set(env, obj, attr, valid);
1833                 cl_object_attr_unlock(obj);
1834         }
1835         OBDO_FREE(aa->aa_oa);
1836
             /* Finish every extent carried by this RPC with the final rc. */
1837         list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
1838                 list_del_init(&ext->oe_link);
1839                 osc_extent_finish(env, ext, 1, rc);
1840         }
1841         LASSERT(list_empty(&aa->aa_exts));
1842         LASSERT(list_empty(&aa->aa_oaps));
1843
             /* Report the error, or the number of bytes transferred on
              * success. */
1844         cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
1845                           req->rq_bulk->bd_nob_transferred);
1846         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1847         ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
1848
1849         client_obd_list_lock(&cli->cl_loi_list_lock);
1850         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1851          * is called so we know whether to go to sync BRWs or wait for more
1852          * RPCs to complete */
1853         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
1854                 cli->cl_w_in_flight--;
1855         else
1856                 cli->cl_r_in_flight--;
1857         osc_wake_cache_waiters(cli);
1858         client_obd_list_unlock(&cli->cl_loi_list_lock);
1859
1860         osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
1861         RETURN(rc);
1862 }
1863
1864 static void brw_commit(struct ptlrpc_request *req)
1865 {
1866         /* If osc_inc_unstable_pages (via osc_extent_finish) races with
1867          * this called via the rq_commit_cb, I need to ensure
1868          * osc_dec_unstable_pages is still called. Otherwise unstable
1869          * pages may be leaked. */
1870         spin_lock(&req->rq_lock);
1871         if (likely(req->rq_unstable)) {
1872                 req->rq_unstable = 0;
1873                 spin_unlock(&req->rq_lock);
1874
1875                 osc_dec_unstable_pages(req);
1876         } else {
1877                 req->rq_committed = 1;
1878                 spin_unlock(&req->rq_lock);
1879         }
1880 }
1881
1882 /**
1883  * Build an RPC by the list of extent @ext_list. The caller must ensure
1884  * that the total pages in this list are NOT over max pages per RPC.
1885  * Extents in the list must be in OES_RPC state.
 *
 * All pages of the extents are gathered on a local list, placed into a page
 * array that is sorted by offset, and attached to a newly allocated cl_req.
 * The request is prepared with osc_brw_prep_request(), gets brw_commit() and
 * brw_interpret() as its commit and reply callbacks, takes over the page list
 * and @ext_list in its async args, and is queued to ptlrpcd with policy @pol.
 * If any extent has oe_memalloc set, the memory pressure flag is set for the
 * duration of the call and the pages and the request are marked accordingly.
 *
 * On failure the obdo and page array are freed, every extent in @ext_list is
 * finished with the error and the cl_req (if one was allocated) is completed
 * with it.
 *
 * Returns 0 once the request has been queued, -ENOMEM on allocation failure,
 * or the error from cl_req_alloc(), cl_req_prep() or osc_brw_prep_request().
1886  */
1887 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
1888                   struct list_head *ext_list, int cmd, pdl_policy_t pol)
1889 {
1890         struct ptlrpc_request           *req = NULL;
1891         struct osc_extent               *ext;
1892         struct brw_page                 **pga = NULL;
1893         struct osc_brw_async_args       *aa = NULL;
1894         struct obdo                     *oa = NULL;
1895         struct osc_async_page           *oap;
1896         struct osc_async_page           *tmp;
1897         struct cl_req                   *clerq = NULL;
1898         enum cl_req_type                crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE :
1899                                                                       CRT_READ;
1900         struct cl_req_attr              *crattr = NULL;
1901         obd_off                         starting_offset = OBD_OBJECT_EOF;
1902         obd_off                         ending_offset = 0;
1903         int                             mpflag = 0;
1904         int                             mem_tight = 0;
1905         int                             page_count = 0;
1906         int                             i;
1907         int                             rc;
1908         struct list_head                rpc_list = LIST_HEAD_INIT(rpc_list);
1909
1910         ENTRY;
1911         LASSERT(!list_empty(ext_list));
1912
1913         /* add pages into rpc_list to build BRW rpc */
1914         list_for_each_entry(ext, ext_list, oe_link) {
1915                 LASSERT(ext->oe_state == OES_RPC);
1916                 mem_tight |= ext->oe_memalloc;
1917                 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
1918                         ++page_count;
1919                         list_add_tail(&oap->oap_rpc_item, &rpc_list);
                             /* Track the lowest start and highest end offset.
                              * A page that does not lower the start must begin
                              * at page offset 0, and one that does not raise
                              * the end must run to the end of its page. */
1920                         if (starting_offset > oap->oap_obj_off)
1921                                 starting_offset = oap->oap_obj_off;
1922                         else
1923                                 LASSERT(oap->oap_page_off == 0);
1924                         if (ending_offset < oap->oap_obj_off + oap->oap_count)
1925                                 ending_offset = oap->oap_obj_off +
1926                                                 oap->oap_count;
1927                         else
1928                                 LASSERT(oap->oap_page_off + oap->oap_count ==
1929                                         PAGE_CACHE_SIZE);
1930                 }
1931         }
1932
             /* The previous memory pressure state is saved in mpflag and
              * restored at "out". */
1933         if (mem_tight)
1934                 mpflag = cfs_memory_pressure_get_and_set();
1935
1936         OBD_ALLOC(crattr, sizeof(*crattr));
1937         if (crattr == NULL)
1938                 GOTO(out, rc = -ENOMEM);
1939
1940         OBD_ALLOC(pga, sizeof(*pga) * page_count);
1941         if (pga == NULL)
1942                 GOTO(out, rc = -ENOMEM);
1943
1944         OBDO_ALLOC(oa);
1945         if (oa == NULL)
1946                 GOTO(out, rc = -ENOMEM);
1947
             /* Fill the page array and add every page to the cl_req, which is
              * allocated when the first page is seen. */
1948         i = 0;
1949         list_for_each_entry(oap, &rpc_list, oap_rpc_item) {
1950                 struct cl_page *page = oap2cl_page(oap);
1951                 if (clerq == NULL) {
1952                         clerq = cl_req_alloc(env, page, crt,
1953                                              1 /* only 1-object rpcs for now */);
1954                         if (IS_ERR(clerq))
1955                                 GOTO(out, rc = PTR_ERR(clerq));
1956                 }
1957                 if (mem_tight)
1958                         oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
1959                 pga[i] = &oap->oap_brw_page;
1960                 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
1961                 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
1962                        pga[i]->pg, page_index(oap->oap_page), oap,
1963                        pga[i]->flag);
1964                 i++;
1965                 cl_req_page_add(env, clerq, page);
1966         }
1967
1968         /* always get the data for the obdo for the rpc */
1969         LASSERT(clerq != NULL);
1970         crattr->cra_oa = oa;
1971         cl_req_attr_set(env, clerq, crattr, ~0ULL);
1972
1973         rc = cl_req_prep(env, clerq);
1974         if (rc != 0) {
1975                 CERROR("cl_req_prep failed: %d\n", rc);
1976                 GOTO(out, rc);
1977         }
1978
1979         sort_brw_pages(pga, page_count);
1980         rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
1981                         pga, &req, crattr->cra_capa, 1, 0);
1982         if (rc != 0) {
1983                 CERROR("prep_req failed: %d\n", rc);
1984                 GOTO(out, rc);
1985         }
1986
1987         req->rq_commit_cb = brw_commit;
1988         req->rq_interpret_reply = brw_interpret;
1989
1990         if (mem_tight != 0)
1991                 req->rq_memalloc = 1;
1992
1993         /* Need to update the timestamps after the request is built in case
1994          * we race with setattr (locally or in queue at OST).  If OST gets
1995          * later setattr before earlier BRW (as determined by the request xid),
1996          * the OST will not use BRW timestamps.  Sadly, there is no obvious
1997          * way to do this in a single call.  bug 10150 */
1998         cl_req_attr_set(env, clerq, crattr,
1999                         OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
2000
2001         lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
2002
             /* Move the page list and the caller's extent list into the
              * request's async args; both source lists are left empty. */
2003         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2004         aa = ptlrpc_req_async_args(req);
2005         INIT_LIST_HEAD(&aa->aa_oaps);
2006         list_splice_init(&rpc_list, &aa->aa_oaps);
2007         INIT_LIST_HEAD(&aa->aa_exts);
2008         list_splice_init(ext_list, &aa->aa_exts);
2009         aa->aa_clerq = clerq;
2010
2011         /* queued sync pages can be torn down while the pages
2012          * were between the pending list and the rpc */
2013         tmp = NULL;
2014         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2015                 /* only one oap gets a request reference */
2016                 if (tmp == NULL)
2017                         tmp = oap;
2018                 if (oap->oap_interrupted && !req->rq_intr) {
2019                         CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2020                                         oap, req);
2021                         ptlrpc_mark_interrupted(req);
2022                 }
2023         }
2024         if (tmp != NULL)
2025                 tmp->oap_request = ptlrpc_request_addref(req);
2026
2027         client_obd_list_lock(&cli->cl_loi_list_lock);
             /* Turn the starting byte offset into a page index for the
              * offset histograms below. */
2028         starting_offset >>= PAGE_CACHE_SHIFT;
2029         if (cmd == OBD_BRW_READ) {
2030                 cli->cl_r_in_flight++;
2031                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2032                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2033                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2034                                       starting_offset + 1);
2035         } else {
2036                 cli->cl_w_in_flight++;
2037                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2038                 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
2039                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2040                                       starting_offset + 1);
2041         }
2042         client_obd_list_unlock(&cli->cl_loi_list_lock);
2043
2044         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2045                   page_count, aa, cli->cl_r_in_flight,
2046                   cli->cl_w_in_flight);
2047
2048         /* XXX: Maybe the caller can check the RPC bulk descriptor to
2049          * see which CPU/NUMA node the majority of pages were allocated
2050          * on, and try to assign the async RPC to the CPU core
2051          * (PDL_POLICY_PREFERRED) to reduce cross-CPU memory traffic.
2052          *
2053          * But on the other hand, we expect that multiple ptlrpcd
2054          * threads and the initial write sponsor can run in parallel,
2055          * especially when data checksum is enabled, which is CPU-bound
2056          * operation and single ptlrpcd thread cannot process in time.
2057          * So more ptlrpcd threads sharing BRW load
2058          * (with PDL_POLICY_ROUND) seems better.
2059          */
2060         ptlrpcd_add_req(req, pol, -1);
2061         rc = 0;
2062         EXIT;
2063
     /* Common exit path: reached with rc == 0 after the request was queued as
      * well as from every error GOTO above. */
2064 out:
2065         if (mem_tight != 0)
2066                 cfs_memory_pressure_restore(mpflag);
2067
2068         if (crattr != NULL) {
2069                 capa_put(crattr->cra_capa);
2070                 OBD_FREE(crattr, sizeof(*crattr));
2071         }
2072
2073         if (rc != 0) {
                     /* every failure above happens before a request exists */
2074                 LASSERT(req == NULL);
2075
2076                 if (oa)
2077                         OBDO_FREE(oa);
2078                 if (pga)
2079                         OBD_FREE(pga, sizeof(*pga) * page_count);
2080                 /* this should happen rarely and is pretty bad, it makes the
2081                  * pending list not follow the dirty order */
2082                 while (!list_empty(ext_list)) {
2083                         ext = list_entry(ext_list->next, struct osc_extent,
2084                                          oe_link);
2085                         list_del_init(&ext->oe_link);
2086                         osc_extent_finish(env, ext, 0, rc);
2087                 }
                     /* clerq may hold an error pointer from cl_req_alloc() */
2088                 if (clerq && !IS_ERR(clerq))
2089                         cl_req_completion(env, clerq, rc);
2090         }
2091         RETURN(rc);
2092 }
2093
2094 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
2095                                         struct ldlm_enqueue_info *einfo)
2096 {
2097         void *data = einfo->ei_cbdata;
2098         int set = 0;
2099
2100         LASSERT(lock != NULL);
2101         LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
2102         LASSERT(lock->l_resource->lr_type == einfo->ei_type);
2103         LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
2104         LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
2105
2106         lock_res_and_lock(lock);
2107         spin_lock(&osc_ast_guard);
2108
2109         if (lock->l_ast_data == NULL)
2110                 lock->l_ast_data = data;
2111         if (lock->l_ast_data == data)
2112                 set = 1;
2113
2114         spin_unlock(&osc_ast_guard);
2115         unlock_res_and_lock(lock);
2116
2117         return set;
2118 }
2119
2120 static int osc_set_data_with_check(struct lustre_handle *lockh,
2121                                    struct ldlm_enqueue_info *einfo)
2122 {
2123         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2124         int set = 0;
2125
2126         if (lock != NULL) {
2127                 set = osc_set_lock_data_with_check(lock, einfo);
2128                 LDLM_LOCK_PUT(lock);
2129         } else
2130                 CERROR("lockh %p, data %p - client evicted?\n",
2131                        lockh, einfo->ei_cbdata);
2132         return set;
2133 }
2134
2135 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2136                              ldlm_iterator_t replace, void *data)
2137 {
2138         struct ldlm_res_id res_id;
2139         struct obd_device *obd = class_exp2obd(exp);
2140
2141         ostid_build_res_name(&lsm->lsm_oi, &res_id);
2142         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2143         return 0;
2144 }
2145
2146 /* find any ldlm lock of the inode in osc
2147  * return 0    not find
2148  *        1    find one
2149  *      < 0    error */
2150 static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2151                            ldlm_iterator_t replace, void *data)
2152 {
2153         struct ldlm_res_id res_id;
2154         struct obd_device *obd = class_exp2obd(exp);
2155         int rc = 0;
2156
2157         ostid_build_res_name(&lsm->lsm_oi, &res_id);
2158         rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2159         if (rc == LDLM_ITER_STOP)
2160                 return(1);
2161         if (rc == LDLM_ITER_CONTINUE)
2162                 return(0);
2163         return(rc);
2164 }
2165
2166 static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
2167                             obd_enqueue_update_f upcall, void *cookie,
2168                             __u64 *flags, int agl, int rc)
2169 {
2170         int intent = *flags & LDLM_FL_HAS_INTENT;
2171         ENTRY;
2172
2173         if (intent) {
2174                 /* The request was created before ldlm_cli_enqueue call. */
2175                 if (rc == ELDLM_LOCK_ABORTED) {
2176                         struct ldlm_reply *rep;
2177                         rep = req_capsule_server_get(&req->rq_pill,
2178                                                      &RMF_DLM_REP);
2179
2180                         LASSERT(rep != NULL);
2181                         rep->lock_policy_res1 =
2182                                 ptlrpc_status_ntoh(rep->lock_policy_res1);
2183                         if (rep->lock_policy_res1)
2184                                 rc = rep->lock_policy_res1;
2185                 }
2186         }
2187
2188         if ((intent != 0 && rc == ELDLM_LOCK_ABORTED && agl == 0) ||
2189             (rc == 0)) {
2190                 *flags |= LDLM_FL_LVB_READY;
2191                 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
2192                        lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
2193         }
2194
2195         /* Call the update callback. */
2196         rc = (*upcall)(cookie, rc);
2197         RETURN(rc);
2198 }
2199
/*
 * Reply interpreter for a lock enqueue request described by @aa.
 *
 * The lock handle and mode are copied out of @aa first, the lock is looked up
 * and an extra reference in that mode is taken on it.  The enqueue is then
 * completed with ldlm_cli_enqueue_fini() and the OSC-level completion
 * osc_enqueue_fini() runs the caller's upcall.  When aa->oa_agl is set and
 * the enqueue was aborted (ELDLM_LOCK_ABORTED), no LVB buffer is passed to
 * ldlm_cli_enqueue_fini().  Afterwards the references are dropped again: the
 * one taken by ldlm_cli_enqueue() (only if the handle is still in use and the
 * result is ELDLM_OK), the extra one taken here, and the lookup reference.
 *
 * Returns the value produced by osc_enqueue_fini().
 */
2200 static int osc_enqueue_interpret(const struct lu_env *env,
2201                                  struct ptlrpc_request *req,
2202                                  struct osc_enqueue_args *aa, int rc)
2203 {
2204         struct ldlm_lock *lock;
2205         struct lustre_handle handle;
2206         __u32 mode;
2207         struct ost_lvb *lvb;
2208         __u32 lvb_len;
2209         __u64 *flags = aa->oa_flags;
2210
2211         /* Make a local copy of a lock handle and a mode, because aa->oa_*
2212          * might be freed anytime after lock upcall has been called. */
2213         lustre_handle_copy(&handle, aa->oa_lockh);
2214         mode = aa->oa_ei->ei_mode;
2215
2216         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2217          * be valid. */
2218         lock = ldlm_handle2lock(&handle);
2219
2220         /* Take an additional reference so that a blocking AST that
2221          * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2222          * to arrive after an upcall has been executed by
2223          * osc_enqueue_fini(). */
2224         ldlm_lock_addref(&handle, mode);
2225
2226         /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
2227         OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
2228
2229         /* Let CP AST to grant the lock first. */
2230         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2231
             /* No LVB buffer is handed over for an aborted AGL enqueue. */
2232         if (aa->oa_agl && rc == ELDLM_LOCK_ABORTED) {
2233                 lvb = NULL;
2234                 lvb_len = 0;
2235         } else {
2236                 lvb = aa->oa_lvb;
2237                 lvb_len = sizeof(*aa->oa_lvb);
2238         }
2239
2240         /* Complete obtaining the lock procedure. */
2241         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
2242                                    mode, flags, lvb, lvb_len, &handle, rc);
2243         /* Complete osc stuff. */
2244         rc = osc_enqueue_fini(req, aa->oa_lvb, aa->oa_upcall, aa->oa_cookie,
2245                               flags, aa->oa_agl, rc);
2246
2247         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2248
2249         /* Release the lock for async request. */
2250         if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
2251                 /*
2252                  * Releases a reference taken by ldlm_cli_enqueue(), if it is
2253                  * not already released by
2254                  * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
2255                  */
2256                 ldlm_lock_decref(&handle, mode);
2257
2258         LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
2259                  aa->oa_lockh, req, aa);
             /* Drop the additional reference taken above, then the reference
              * obtained from ldlm_handle2lock(). */
2260         ldlm_lock_decref(&handle, mode);
2261         LDLM_LOCK_PUT(lock);
2262         return rc;
2263 }
2264
/* Sentinel request-set pointer: it holds the value 1 rather than the address
 * of a real set.  osc_enqueue_base() compares its rqset argument against it
 * and, on a match, queues the request through ptlrpcd_add_req(). */
struct ptlrpc_request_set *PTLRPCD_SET = (struct ptlrpc_request_set *)1;
2266
2267 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
2268  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2269  * other synchronous requests, however keeping some locks and trying to obtain
2270  * others may take a considerable amount of time in a case of ost failure; and
2271  * when other sync requests do not get released lock from a client, the client
2272  * is excluded from the cluster -- such scenarious make the life difficult, so
2273  * release locks just after they are obtained. */
2274 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2275                      __u64 *flags, ldlm_policy_data_t *policy,
2276                      struct ost_lvb *lvb, int kms_valid,
2277                      obd_enqueue_update_f upcall, void *cookie,
2278                      struct ldlm_enqueue_info *einfo,
2279                      struct lustre_handle *lockh,
2280                      struct ptlrpc_request_set *rqset, int async, int agl)
2281 {
2282         struct obd_device *obd = exp->exp_obd;
2283         struct ptlrpc_request *req = NULL;
2284         int intent = *flags & LDLM_FL_HAS_INTENT;
2285         __u64 match_lvb = (agl != 0 ? 0 : LDLM_FL_LVB_READY);
2286         ldlm_mode_t mode;
2287         int rc;
2288         ENTRY;
2289
2290         /* Filesystem lock extents are extended to page boundaries so that
2291          * dealing with the page cache is a little smoother.  */
2292         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2293         policy->l_extent.end |= ~CFS_PAGE_MASK;
2294
2295         /*
2296          * kms is not valid when either object is completely fresh (so that no
2297          * locks are cached), or object was evicted. In the latter case cached
2298          * lock cannot be used, because it would prime inode state with
2299          * potentially stale LVB.
2300          */
2301         if (!kms_valid)
2302                 goto no_match;
2303
2304         /* Next, search for already existing extent locks that will cover us */
2305         /* If we're trying to read, we also search for an existing PW lock.  The
2306          * VFS and page cache already protect us locally, so lots of readers/
2307          * writers can share a single PW lock.
2308          *
2309          * There are problems with conversion deadlocks, so instead of
2310          * converting a read lock to a write lock, we'll just enqueue a new
2311          * one.
2312          *
2313          * At some point we should cancel the read lock instead of making them
2314          * send us a blocking callback, but there are problems with canceling
2315          * locks out from other users right now, too. */
2316         mode = einfo->ei_mode;
2317         if (einfo->ei_mode == LCK_PR)
2318                 mode |= LCK_PW;
2319         mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
2320                                einfo->ei_type, policy, mode, lockh, 0);
2321         if (mode) {
2322                 struct ldlm_lock *matched = ldlm_handle2lock(lockh);
2323
2324                 if ((agl != 0) && !ldlm_is_lvb_ready(matched)) {
2325                         /* For AGL, if enqueue RPC is sent but the lock is not
2326                          * granted, then skip to process this strpe.
2327                          * Return -ECANCELED to tell the caller. */
2328                         ldlm_lock_decref(lockh, mode);
2329                         LDLM_LOCK_PUT(matched);
2330                         RETURN(-ECANCELED);
2331                 } else if (osc_set_lock_data_with_check(matched, einfo)) {
2332                         *flags |= LDLM_FL_LVB_READY;
2333                         /* addref the lock only if not async requests and PW
2334                          * lock is matched whereas we asked for PR. */
2335                         if (!rqset && einfo->ei_mode != mode)
2336                                 ldlm_lock_addref(lockh, LCK_PR);
2337                         if (intent) {
2338                                 /* I would like to be able to ASSERT here that
2339                                  * rss <= kms, but I can't, for reasons which
2340                                  * are explained in lov_enqueue() */
2341                         }
2342
2343                         /* We already have a lock, and it's referenced.
2344                          *
2345                          * At this point, the cl_lock::cll_state is CLS_QUEUING,
2346                          * AGL upcall may change it to CLS_HELD directly. */
2347                         (*upcall)(cookie, ELDLM_OK);
2348
2349                         if (einfo->ei_mode != mode)
2350                                 ldlm_lock_decref(lockh, LCK_PW);
2351                         else if (rqset)
2352                                 /* For async requests, decref the lock. */
2353                                 ldlm_lock_decref(lockh, einfo->ei_mode);
2354                         LDLM_LOCK_PUT(matched);
2355                         RETURN(ELDLM_OK);
2356                 } else {
2357                         ldlm_lock_decref(lockh, mode);
2358                         LDLM_LOCK_PUT(matched);
2359                 }
2360         }
2361
2362  no_match:
2363         if (intent) {
2364                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2365                                            &RQF_LDLM_ENQUEUE_LVB);
2366                 if (req == NULL)
2367                         RETURN(-ENOMEM);
2368
2369                 rc = ptlrpc_request_pack(req, LUSTRE_DLM_VERSION, LDLM_ENQUEUE);
2370                 if (rc < 0) {
2371                         ptlrpc_request_free(req);
2372                         RETURN(rc);
2373                 }
2374
2375                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2376                                      sizeof *lvb);
2377                 ptlrpc_request_set_replen(req);
2378         }
2379
2380         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2381         *flags &= ~LDLM_FL_BLOCK_GRANTED;
2382
2383         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2384                               sizeof(*lvb), LVB_T_OST, lockh, async);
2385         if (rqset) {
2386                 if (!rc) {
2387                         struct osc_enqueue_args *aa;
2388                         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2389                         aa = ptlrpc_req_async_args(req);
2390                         aa->oa_ei = einfo;
2391                         aa->oa_exp = exp;
2392                         aa->oa_flags  = flags;
2393                         aa->oa_upcall = upcall;
2394                         aa->oa_cookie = cookie;
2395                         aa->oa_lvb    = lvb;
2396                         aa->oa_lockh  = lockh;
2397                         aa->oa_agl    = !!agl;
2398
2399                         req->rq_interpret_reply =
2400                                 (ptlrpc_interpterer_t)osc_enqueue_interpret;
2401                         if (rqset == PTLRPCD_SET)
2402                                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
2403                         else
2404                                 ptlrpc_set_add_req(rqset, req);
2405                 } else if (intent) {
2406                         ptlrpc_req_finished(req);
2407                 }
2408                 RETURN(rc);
2409         }
2410
2411         rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, agl, rc);
2412         if (intent)
2413                 ptlrpc_req_finished(req);
2414
2415         RETURN(rc);
2416 }
2417
2418 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2419                    __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2420                    __u64 *flags, void *data, struct lustre_handle *lockh,
2421                    int unref)
2422 {
2423         struct obd_device *obd = exp->exp_obd;
2424         __u64 lflags = *flags;
2425         ldlm_mode_t rc;
2426         ENTRY;
2427
2428         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2429                 RETURN(-EIO);
2430
2431         /* Filesystem lock extents are extended to page boundaries so that
2432          * dealing with the page cache is a little smoother */
2433         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2434         policy->l_extent.end |= ~CFS_PAGE_MASK;
2435
2436         /* Next, search for already existing extent locks that will cover us */
2437         /* If we're trying to read, we also search for an existing PW lock.  The
2438          * VFS and page cache already protect us locally, so lots of readers/
2439          * writers can share a single PW lock. */
2440         rc = mode;
2441         if (mode == LCK_PR)
2442                 rc |= LCK_PW;
2443         rc = ldlm_lock_match(obd->obd_namespace, lflags,
2444                              res_id, type, policy, rc, lockh, unref);
2445         if (rc) {
2446                 if (data != NULL) {
2447                         if (!osc_set_data_with_check(lockh, data)) {
2448                                 if (!(lflags & LDLM_FL_TEST_LOCK))
2449                                         ldlm_lock_decref(lockh, rc);
2450                                 RETURN(0);
2451                         }
2452                 }
2453                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
2454                         ldlm_lock_addref(lockh, LCK_PR);
2455                         ldlm_lock_decref(lockh, LCK_PW);
2456                 }
2457                 RETURN(rc);
2458         }
2459         RETURN(rc);
2460 }
2461
2462 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
2463 {
2464         ENTRY;
2465
2466         if (unlikely(mode == LCK_GROUP))
2467                 ldlm_lock_decref_and_cancel(lockh, mode);
2468         else
2469                 ldlm_lock_decref(lockh, mode);
2470
2471         RETURN(0);
2472 }
2473
2474 static int osc_statfs_interpret(const struct lu_env *env,
2475                                 struct ptlrpc_request *req,
2476                                 struct osc_async_args *aa, int rc)
2477 {
2478         struct obd_statfs *msfs;
2479         ENTRY;
2480
2481         if (rc == -EBADR)
2482                 /* The request has in fact never been sent
2483                  * due to issues at a higher level (LOV).
2484                  * Exit immediately since the caller is
2485                  * aware of the problem and takes care
2486                  * of the clean up */
2487                  RETURN(rc);
2488
2489         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2490             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2491                 GOTO(out, rc = 0);
2492
2493         if (rc != 0)
2494                 GOTO(out, rc);
2495
2496         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2497         if (msfs == NULL) {
2498                 GOTO(out, rc = -EPROTO);
2499         }
2500
2501         *aa->aa_oi->oi_osfs = *msfs;
2502 out:
2503         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2504         RETURN(rc);
2505 }
2506
2507 static int osc_statfs_async(struct obd_export *exp,
2508                             struct obd_info *oinfo, __u64 max_age,
2509                             struct ptlrpc_request_set *rqset)
2510 {
2511         struct obd_device     *obd = class_exp2obd(exp);
2512         struct ptlrpc_request *req;
2513         struct osc_async_args *aa;
2514         int                    rc;
2515         ENTRY;
2516
2517         /* We could possibly pass max_age in the request (as an absolute
2518          * timestamp or a "seconds.usec ago") so the target can avoid doing
2519          * extra calls into the filesystem if that isn't necessary (e.g.
2520          * during mount that would help a bit).  Having relative timestamps
2521          * is not so great if request processing is slow, while absolute
2522          * timestamps are not ideal because they need time synchronization. */
2523         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2524         if (req == NULL)
2525                 RETURN(-ENOMEM);
2526
2527         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2528         if (rc) {
2529                 ptlrpc_request_free(req);
2530                 RETURN(rc);
2531         }
2532         ptlrpc_request_set_replen(req);
2533         req->rq_request_portal = OST_CREATE_PORTAL;
2534         ptlrpc_at_set_req_timeout(req);
2535
2536         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2537                 /* procfs requests not want stat in wait for avoid deadlock */
2538                 req->rq_no_resend = 1;
2539                 req->rq_no_delay = 1;
2540         }
2541
2542         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2543         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2544         aa = ptlrpc_req_async_args(req);
2545         aa->aa_oi = oinfo;
2546
2547         ptlrpc_set_add_req(rqset, req);
2548         RETURN(0);
2549 }
2550
2551 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2552                       struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2553 {
2554         struct obd_device     *obd = class_exp2obd(exp);
2555         struct obd_statfs     *msfs;
2556         struct ptlrpc_request *req;
2557         struct obd_import     *imp = NULL;
2558         int rc;
2559         ENTRY;
2560
2561         /*Since the request might also come from lprocfs, so we need
2562          *sync this with client_disconnect_export Bug15684*/
2563         down_read(&obd->u.cli.cl_sem);
2564         if (obd->u.cli.cl_import)
2565                 imp = class_import_get(obd->u.cli.cl_import);
2566         up_read(&obd->u.cli.cl_sem);
2567         if (!imp)
2568                 RETURN(-ENODEV);
2569
2570         /* We could possibly pass max_age in the request (as an absolute
2571          * timestamp or a "seconds.usec ago") so the target can avoid doing
2572          * extra calls into the filesystem if that isn't necessary (e.g.
2573          * during mount that would help a bit).  Having relative timestamps
2574          * is not so great if request processing is slow, while absolute
2575          * timestamps are not ideal because they need time synchronization. */
2576         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2577
2578         class_import_put(imp);
2579
2580         if (req == NULL)
2581                 RETURN(-ENOMEM);
2582
2583         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2584         if (rc) {
2585                 ptlrpc_request_free(req);
2586                 RETURN(rc);
2587         }
2588         ptlrpc_request_set_replen(req);
2589         req->rq_request_portal = OST_CREATE_PORTAL;
2590         ptlrpc_at_set_req_timeout(req);
2591
2592         if (flags & OBD_STATFS_NODELAY) {
2593                 /* procfs requests not want stat in wait for avoid deadlock */
2594                 req->rq_no_resend = 1;
2595                 req->rq_no_delay = 1;
2596         }
2597
2598         rc = ptlrpc_queue_wait(req);
2599         if (rc)
2600                 GOTO(out, rc);
2601
2602         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2603         if (msfs == NULL) {
2604                 GOTO(out, rc = -EPROTO);
2605         }
2606
2607         *osfs = *msfs;
2608
2609         EXIT;
2610  out:
2611         ptlrpc_req_finished(req);
2612         return rc;
2613 }
2614
2615 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2616                          void *karg, void *uarg)
2617 {
2618         struct obd_device *obd = exp->exp_obd;
2619         struct obd_ioctl_data *data = karg;
2620         int err = 0;
2621         ENTRY;
2622
2623         if (!try_module_get(THIS_MODULE)) {
2624                 CERROR("Can't get module. Is it alive?");
2625                 return -EINVAL;
2626         }
2627         switch (cmd) {
2628         case OBD_IOC_CLIENT_RECOVER:
2629                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2630                                             data->ioc_inlbuf1, 0);
2631                 if (err > 0)
2632                         err = 0;
2633                 GOTO(out, err);
2634         case IOC_OSC_SET_ACTIVE:
2635                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2636                                                data->ioc_offset);
2637                 GOTO(out, err);
2638         case OBD_IOC_POLL_QUOTACHECK:
2639                 err = osc_quota_poll_check(exp, (struct if_quotacheck *)karg);
2640                 GOTO(out, err);
2641         case OBD_IOC_PING_TARGET:
2642                 err = ptlrpc_obd_ping(obd);
2643                 GOTO(out, err);
2644         default:
2645                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2646                        cmd, current_comm());
2647                 GOTO(out, err = -ENOTTY);
2648         }
2649 out:
2650         module_put(THIS_MODULE);
2651         return err;
2652 }
2653
2654 static int osc_get_info(const struct lu_env *env, struct obd_export *exp,
2655                         obd_count keylen, void *key, __u32 *vallen, void *val,
2656                         struct lov_stripe_md *lsm)
2657 {
2658         ENTRY;
2659         if (!vallen || !val)
2660                 RETURN(-EFAULT);
2661
2662         if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
2663                 __u32 *stripe = val;
2664                 *vallen = sizeof(*stripe);
2665                 *stripe = 0;
2666                 RETURN(0);
2667         } else if (KEY_IS(KEY_LAST_ID)) {
2668                 struct ptlrpc_request *req;
2669                 obd_id                *reply;
2670                 char                  *tmp;
2671                 int                    rc;
2672
2673                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2674                                            &RQF_OST_GET_INFO_LAST_ID);
2675                 if (req == NULL)
2676                         RETURN(-ENOMEM);
2677
2678                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2679                                      RCL_CLIENT, keylen);
2680                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
2681                 if (rc) {
2682                         ptlrpc_request_free(req);
2683                         RETURN(rc);
2684                 }
2685
2686                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2687                 memcpy(tmp, key, keylen);
2688
2689                 req->rq_no_delay = req->rq_no_resend = 1;
2690                 ptlrpc_request_set_replen(req);
2691                 rc = ptlrpc_queue_wait(req);
2692                 if (rc)
2693                         GOTO(out, rc);
2694
2695                 reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
2696                 if (reply == NULL)
2697                         GOTO(out, rc = -EPROTO);
2698
2699                 *((obd_id *)val) = *reply;
2700         out:
2701                 ptlrpc_req_finished(req);
2702                 RETURN(rc);
2703         } else if (KEY_IS(KEY_FIEMAP)) {
2704                 struct ll_fiemap_info_key *fm_key =
2705                                 (struct ll_fiemap_info_key *)key;
2706                 struct ldlm_res_id       res_id;
2707                 ldlm_policy_data_t       policy;
2708                 struct lustre_handle     lockh;
2709                 ldlm_mode_t              mode = 0;
2710                 struct ptlrpc_request   *req;
2711                 struct ll_user_fiemap   *reply;
2712                 char                    *tmp;
2713                 int                      rc;
2714
2715                 if (!(fm_key->fiemap.fm_flags & FIEMAP_FLAG_SYNC))
2716                         goto skip_locking;
2717
2718                 policy.l_extent.start = fm_key->fiemap.fm_start &
2719                                                 CFS_PAGE_MASK;
2720
2721                 if (OBD_OBJECT_EOF - fm_key->fiemap.fm_length <=
2722                     fm_key->fiemap.fm_start + PAGE_CACHE_SIZE - 1)
2723                         policy.l_extent.end = OBD_OBJECT_EOF;
2724                 else
2725                         policy.l_extent.end = (fm_key->fiemap.fm_start +
2726                                 fm_key->fiemap.fm_length +
2727                                 PAGE_CACHE_SIZE - 1) & CFS_PAGE_MASK;
2728
2729                 ostid_build_res_name(&fm_key->oa.o_oi, &res_id);
2730                 mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
2731                                        LDLM_FL_BLOCK_GRANTED |
2732                                        LDLM_FL_LVB_READY,
2733                                        &res_id, LDLM_EXTENT, &policy,
2734                                        LCK_PR | LCK_PW, &lockh, 0);
2735                 if (mode) { /* lock is cached on client */
2736                         if (mode != LCK_PR) {
2737                                 ldlm_lock_addref(&lockh, LCK_PR);
2738                                 ldlm_lock_decref(&lockh, LCK_PW);
2739                         }
2740                 } else { /* no cached lock, needs acquire lock on server side */
2741                         fm_key->oa.o_valid |= OBD_MD_FLFLAGS;
2742                         fm_key->oa.o_flags |= OBD_FL_SRVLOCK;
2743                 }
2744
2745 skip_locking:
2746                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2747                                            &RQF_OST_GET_INFO_FIEMAP);
2748                 if (req == NULL)
2749                         GOTO(drop_lock, rc = -ENOMEM);
2750
2751                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
2752                                      RCL_CLIENT, keylen);
2753                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
2754                                      RCL_CLIENT, *vallen);
2755                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
2756                                      RCL_SERVER, *vallen);
2757
2758                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
2759                 if (rc) {
2760                         ptlrpc_request_free(req);
2761                         GOTO(drop_lock, rc);
2762                 }
2763
2764                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
2765                 memcpy(tmp, key, keylen);
2766                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
2767                 memcpy(tmp, val, *vallen);
2768
2769                 ptlrpc_request_set_replen(req);
2770                 rc = ptlrpc_queue_wait(req);
2771                 if (rc)
2772                         GOTO(fini_req, rc);
2773
2774                 reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
2775                 if (reply == NULL)
2776                         GOTO(fini_req, rc = -EPROTO);
2777
2778                 memcpy(val, reply, *vallen);
2779 fini_req:
2780                 ptlrpc_req_finished(req);
2781 drop_lock:
2782                 if (mode)
2783                         ldlm_lock_decref(&lockh, LCK_PR);
2784                 RETURN(rc);
2785         }
2786
2787         RETURN(-EINVAL);
2788 }
2789
2790 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
2791                               obd_count keylen, void *key, obd_count vallen,
2792                               void *val, struct ptlrpc_request_set *set)
2793 {
2794         struct ptlrpc_request *req;
2795         struct obd_device     *obd = exp->exp_obd;
2796         struct obd_import     *imp = class_exp2cliimp(exp);
2797         char                  *tmp;
2798         int                    rc;
2799         ENTRY;
2800
2801         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
2802
2803         if (KEY_IS(KEY_CHECKSUM)) {
2804                 if (vallen != sizeof(int))
2805                         RETURN(-EINVAL);
2806                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
2807                 RETURN(0);
2808         }
2809
2810         if (KEY_IS(KEY_SPTLRPC_CONF)) {
2811                 sptlrpc_conf_client_adapt(obd);
2812                 RETURN(0);
2813         }
2814
2815         if (KEY_IS(KEY_FLUSH_CTX)) {
2816                 sptlrpc_import_flush_my_ctx(imp);
2817                 RETURN(0);
2818         }
2819
2820         if (KEY_IS(KEY_CACHE_SET)) {
2821                 struct client_obd *cli = &obd->u.cli;
2822
2823                 LASSERT(cli->cl_cache == NULL); /* only once */
2824                 cli->cl_cache = (struct cl_client_cache *)val;
2825                 atomic_inc(&cli->cl_cache->ccc_users);
2826                 cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
2827
2828                 /* add this osc into entity list */
2829                 LASSERT(list_empty(&cli->cl_lru_osc));
2830                 spin_lock(&cli->cl_cache->ccc_lru_lock);
2831                 list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
2832                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
2833
2834                 RETURN(0);
2835         }
2836
2837         if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
2838                 struct client_obd *cli = &obd->u.cli;
2839                 int nr = atomic_read(&cli->cl_lru_in_list) >> 1;
2840                 int target = *(int *)val;
2841
2842                 nr = osc_lru_shrink(env, cli, min(nr, target), true);
2843                 *(int *)val -= nr;
2844                 RETURN(0);
2845         }
2846
2847         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
2848                 RETURN(-EINVAL);
2849
2850         /* We pass all other commands directly to OST. Since nobody calls osc
2851            methods directly and everybody is supposed to go through LOV, we
2852            assume lov checked invalid values for us.
2853            The only recognised values so far are evict_by_nid and mds_conn.
2854            Even if something bad goes through, we'd get a -EINVAL from OST
2855            anyway. */
2856
2857         req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
2858                                                 &RQF_OST_SET_GRANT_INFO :
2859                                                 &RQF_OBD_SET_INFO);
2860         if (req == NULL)
2861                 RETURN(-ENOMEM);
2862
2863         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2864                              RCL_CLIENT, keylen);
2865         if (!KEY_IS(KEY_GRANT_SHRINK))
2866                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
2867                                      RCL_CLIENT, vallen);
2868         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
2869         if (rc) {
2870                 ptlrpc_request_free(req);
2871                 RETURN(rc);
2872         }
2873
2874         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2875         memcpy(tmp, key, keylen);
2876         tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
2877                                                         &RMF_OST_BODY :
2878                                                         &RMF_SETINFO_VAL);
2879         memcpy(tmp, val, vallen);
2880
2881         if (KEY_IS(KEY_GRANT_SHRINK)) {
2882                 struct osc_grant_args *aa;
2883                 struct obdo *oa;
2884
2885                 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2886                 aa = ptlrpc_req_async_args(req);
2887                 OBDO_ALLOC(oa);
2888                 if (!oa) {
2889                         ptlrpc_req_finished(req);
2890                         RETURN(-ENOMEM);
2891                 }
2892                 *oa = ((struct ost_body *)val)->oa;
2893                 aa->aa_oa = oa;
2894                 req->rq_interpret_reply = osc_shrink_grant_interpret;
2895         }
2896
2897         ptlrpc_request_set_replen(req);
2898         if (!KEY_IS(KEY_GRANT_SHRINK)) {
2899                 LASSERT(set != NULL);
2900                 ptlrpc_set_add_req(set, req);
2901                 ptlrpc_check_set(NULL, set);
2902         } else
2903                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
2904
2905         RETURN(0);
2906 }
2907
2908 static int osc_reconnect(const struct lu_env *env,
2909                          struct obd_export *exp, struct obd_device *obd,
2910                          struct obd_uuid *cluuid,
2911                          struct obd_connect_data *data,
2912                          void *localdata)
2913 {
2914         struct client_obd *cli = &obd->u.cli;
2915
2916         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
2917                 long lost_grant;
2918
2919                 client_obd_list_lock(&cli->cl_loi_list_lock);
2920                 data->ocd_grant = (cli->cl_avail_grant + cli->cl_dirty) ?:
2921                                 2 * cli_brw_size(obd);
2922                 lost_grant = cli->cl_lost_grant;
2923                 cli->cl_lost_grant = 0;
2924                 client_obd_list_unlock(&cli->cl_loi_list_lock);
2925
2926                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
2927                        " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
2928                        data->ocd_version, data->ocd_grant, lost_grant);
2929         }
2930
2931         RETURN(0);
2932 }
2933
2934 static int osc_disconnect(struct obd_export *exp)
2935 {
2936         struct obd_device *obd = class_exp2obd(exp);
2937         struct llog_ctxt  *ctxt;
2938         int rc;
2939
2940         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
2941         if (ctxt) {
2942                 if (obd->u.cli.cl_conn_count == 1) {
2943                         /* Flush any remaining cancel messages out to the
2944                          * target */
2945                         llog_sync(ctxt, exp, 0);
2946                 }
2947                 llog_ctxt_put(ctxt);
2948         } else {
2949                 CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
2950                        obd);
2951         }
2952
2953         rc = client_disconnect_export(exp);
2954         /**
2955          * Initially we put del_shrink_grant before disconnect_export, but it
2956          * causes the following problem if setup (connect) and cleanup
2957          * (disconnect) are tangled together.
2958          *      connect p1                     disconnect p2
2959          *   ptlrpc_connect_import
2960          *     ...............               class_manual_cleanup
2961          *                                     osc_disconnect
2962          *                                     del_shrink_grant
2963          *   ptlrpc_connect_interrupt
2964          *     init_grant_shrink
2965          *   add this client to shrink list
2966          *                                      cleanup_osc
2967          * Bang! pinger trigger the shrink.
2968          * So the osc should be disconnected from the shrink list, after we
2969          * are sure the import has been destroyed. BUG18662
2970          */
2971         if (obd->u.cli.cl_import == NULL)
2972                 osc_del_shrink_grant(&obd->u.cli);
2973         return rc;
2974 }
2975
2976 static int osc_import_event(struct obd_device *obd,
2977                             struct obd_import *imp,
2978                             enum obd_import_event event)
2979 {
2980         struct client_obd *cli;
2981         int rc = 0;
2982
2983         ENTRY;
2984         LASSERT(imp->imp_obd == obd);
2985
2986         switch (event) {
2987         case IMP_EVENT_DISCON: {
2988                 cli = &obd->u.cli;
2989                 client_obd_list_lock(&cli->cl_loi_list_lock);
2990                 cli->cl_avail_grant = 0;
2991                 cli->cl_lost_grant = 0;
2992                 client_obd_list_unlock(&cli->cl_loi_list_lock);
2993                 break;
2994         }
2995         case IMP_EVENT_INACTIVE: {
2996                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
2997                 break;
2998         }
2999         case IMP_EVENT_INVALIDATE: {
3000                 struct ldlm_namespace *ns = obd->obd_namespace;
3001                 struct lu_env         *env;
3002                 int                    refcheck;
3003
3004                 env = cl_env_get(&refcheck);
3005                 if (!IS_ERR(env)) {
3006                         /* Reset grants */
3007                         cli = &obd->u.cli;
3008                         /* all pages go to failing rpcs due to the invalid
3009                          * import */
3010                         osc_io_unplug(env, cli, NULL, PDL_POLICY_ROUND);
3011
3012                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3013                         cl_env_put(env, &refcheck);
3014                 } else
3015                         rc = PTR_ERR(env);
3016                 break;
3017         }
3018         case IMP_EVENT_ACTIVE: {
3019                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
3020                 break;
3021         }
3022         case IMP_EVENT_OCD: {
3023                 struct obd_connect_data *ocd = &imp->imp_connect_data;
3024
3025                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3026                         osc_init_grant(&obd->u.cli, ocd);
3027
3028                 /* See bug 7198 */
3029                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3030                         imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
3031
3032                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
3033                 break;
3034         }
3035         case IMP_EVENT_DEACTIVATE: {
3036                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
3037                 break;
3038         }
3039         case IMP_EVENT_ACTIVATE: {
3040                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
3041                 break;
3042         }
3043         default:
3044                 CERROR("Unknown import event %d\n", event);
3045                 LBUG();
3046         }
3047         RETURN(rc);
3048 }
3049
3050 /**
3051  * Determine whether the lock can be canceled before replaying the lock
3052  * during recovery, see bug16774 for detailed information.
3053  *
3054  * \retval zero the lock can't be canceled
3055  * \retval other ok to cancel
3056  */
3057 static int osc_cancel_weight(struct ldlm_lock *lock)
3058 {
3059         /*
3060          * Cancel all unused and granted extent lock.
3061          */
3062         if (lock->l_resource->lr_type == LDLM_EXTENT &&
3063             lock->l_granted_mode == lock->l_req_mode &&
3064             osc_ldlm_weigh_ast(lock) == 0)
3065                 RETURN(1);
3066
3067         RETURN(0);
3068 }
3069
3070 static int brw_queue_work(const struct lu_env *env, void *data)
3071 {
3072         struct client_obd *cli = data;
3073
3074         CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
3075
3076         osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
3077         RETURN(0);
3078 }
3079
3080 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3081 {
3082         struct client_obd *cli = &obd->u.cli;
3083         struct obd_type   *type;
3084         void              *handler;
3085         int                rc;
3086         ENTRY;
3087
3088         rc = ptlrpcd_addref();
3089         if (rc)
3090                 RETURN(rc);
3091
3092         rc = client_obd_setup(obd, lcfg);
3093         if (rc)
3094                 GOTO(out_ptlrpcd, rc);
3095
3096         handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
3097         if (IS_ERR(handler))
3098                 GOTO(out_client_setup, rc = PTR_ERR(handler));
3099         cli->cl_writeback_work = handler;
3100
3101         handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
3102         if (IS_ERR(handler))
3103                 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
3104         cli->cl_lru_work = handler;
3105
3106         rc = osc_quota_setup(obd);
3107         if (rc)
3108                 GOTO(out_ptlrpcd_work, rc);
3109
3110         cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
3111
3112 #ifdef LPROCFS
3113         obd->obd_vars = lprocfs_osc_obd_vars;
3114 #endif
3115         /* If this is true then both client (osc) and server (osp) are on the
3116          * same node. The osp layer if loaded first will register the osc proc
3117          * directory. In that case this obd_device will be attached its proc
3118          * tree to type->typ_procsym instead of obd->obd_type->typ_procroot. */
3119         type = class_search_type(LUSTRE_OSP_NAME);
3120         if (type && type->typ_procsym) {
3121                 obd->obd_proc_entry = lprocfs_seq_register(obd->obd_name,
3122                                                            type->typ_procsym,
3123                                                            obd->obd_vars, obd);
3124                 if (IS_ERR(obd->obd_proc_entry)) {
3125                         rc = PTR_ERR(obd->obd_proc_entry);
3126                         CERROR("error %d setting up lprocfs for %s\n", rc,
3127                                obd->obd_name);
3128                         obd->obd_proc_entry = NULL;
3129                 }
3130         } else {
3131                 rc = lprocfs_seq_obd_setup(obd);
3132         }
3133
3134         /* If the basic OSC proc tree construction succeeded then
3135          * lets do the rest. */
3136         if (rc == 0) {
3137                 lproc_osc_attach_seqstat(obd);
3138                 sptlrpc_lprocfs_cliobd_attach(obd);
3139                 ptlrpc_lprocfs_register_obd(obd);
3140         }
3141
3142         /* We need to allocate a few requests more, because
3143          * brw_interpret tries to create new requests before freeing
3144          * previous ones, Ideally we want to have 2x max_rpcs_in_flight
3145          * reserved, but I'm afraid that might be too much wasted RAM
3146          * in fact, so 2 is just my guess and still should work. */
3147         cli->cl_import->imp_rq_pool =
3148                 ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
3149                                     OST_MAXREQSIZE,
3150                                     ptlrpc_add_rqs_to_pool);
3151
3152         INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
3153         ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
3154         RETURN(0);
3155
3156 out_ptlrpcd_work:
3157         if (cli->cl_writeback_work != NULL) {
3158                 ptlrpcd_destroy_work(cli->cl_writeback_work);
3159                 cli->cl_writeback_work = NULL;
3160         }
3161         if (cli->cl_lru_work != NULL) {
3162                 ptlrpcd_destroy_work(cli->cl_lru_work);
3163                 cli->cl_lru_work = NULL;
3164         }
3165 out_client_setup:
3166         client_obd_cleanup(obd);
3167 out_ptlrpcd:
3168         ptlrpcd_decref();
3169         RETURN(rc);
3170 }
3171
3172 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
3173 {
3174         int rc = 0;
3175         ENTRY;
3176
3177         switch (stage) {
3178         case OBD_CLEANUP_EARLY: {
3179                 struct obd_import *imp;
3180                 imp = obd->u.cli.cl_import;
3181                 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
3182                 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
3183                 ptlrpc_deactivate_import(imp);
3184                 spin_lock(&imp->imp_lock);
3185                 imp->imp_pingable = 0;
3186                 spin_unlock(&imp->imp_lock);
3187                 break;
3188         }
3189         case OBD_CLEANUP_EXPORTS: {
3190                 struct client_obd *cli = &obd->u.cli;
3191                 /* LU-464
3192                  * for echo client, export may be on zombie list, wait for
3193                  * zombie thread to cull it, because cli.cl_import will be
3194                  * cleared in client_disconnect_export():
3195                  *   class_export_destroy() -> obd_cleanup() ->
3196                  *   echo_device_free() -> echo_client_cleanup() ->
3197                  *   obd_disconnect() -> osc_disconnect() ->
3198                  *   client_disconnect_export()
3199                  */
3200                 obd_zombie_barrier();
3201                 if (cli->cl_writeback_work) {
3202                         ptlrpcd_destroy_work(cli->cl_writeback_work);
3203                         cli->cl_writeback_work = NULL;
3204                 }
3205                 if (cli->cl_lru_work) {
3206                         ptlrpcd_destroy_work(cli->cl_lru_work);
3207                         cli->cl_lru_work = NULL;
3208                 }
3209                 obd_cleanup_client_import(obd);
3210                 ptlrpc_lprocfs_unregister_obd(obd);
3211                 lprocfs_obd_cleanup(obd);
3212                 rc = obd_llog_finish(obd, 0);
3213                 if (rc != 0)
3214                         CERROR("failed to cleanup llogging subsystems\n");
3215                 break;
3216                 }
3217         }
3218         RETURN(rc);
3219 }
3220
3221 int osc_cleanup(struct obd_device *obd)
3222 {
3223         struct client_obd *cli = &obd->u.cli;
3224         int rc;
3225
3226         ENTRY;
3227
3228         /* lru cleanup */
3229         if (cli->cl_cache != NULL) {
3230                 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
3231                 spin_lock(&cli->cl_cache->ccc_lru_lock);
3232                 list_del_init(&cli->cl_lru_osc);
3233                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3234                 cli->cl_lru_left = NULL;
3235                 atomic_dec(&cli->cl_cache->ccc_users);
3236                 cli->cl_cache = NULL;
3237         }
3238
3239         /* free memory of osc quota cache */
3240         osc_quota_cleanup(obd);
3241
3242         rc = client_obd_cleanup(obd);
3243
3244         ptlrpcd_decref();
3245         RETURN(rc);
3246 }
3247
3248 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
3249 {
3250         int rc = class_process_proc_seq_param(PARAM_OSC, obd->obd_vars,
3251                                               lcfg, obd);
3252         return rc > 0 ? 0: rc;
3253 }
3254
3255 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
3256 {
3257         return osc_process_config_base(obd, buf);
3258 }
3259
3260 struct obd_ops osc_obd_ops = {
3261         .o_owner                = THIS_MODULE,
3262         .o_setup                = osc_setup,
3263         .o_precleanup           = osc_precleanup,
3264         .o_cleanup              = osc_cleanup,
3265         .o_add_conn             = client_import_add_conn,
3266         .o_del_conn             = client_import_del_conn,
3267         .o_connect              = client_connect_import,
3268         .o_reconnect            = osc_reconnect,
3269         .o_disconnect           = osc_disconnect,
3270         .o_statfs               = osc_statfs,
3271         .o_statfs_async         = osc_statfs_async,
3272         .o_unpackmd             = osc_unpackmd,
3273         .o_create               = osc_create,
3274         .o_destroy              = osc_destroy,
3275         .o_getattr              = osc_getattr,
3276         .o_getattr_async        = osc_getattr_async,
3277         .o_setattr              = osc_setattr,
3278         .o_setattr_async        = osc_setattr_async,
3279         .o_change_cbdata        = osc_change_cbdata,
3280         .o_find_cbdata          = osc_find_cbdata,
3281         .o_iocontrol            = osc_iocontrol,
3282         .o_get_info             = osc_get_info,
3283         .o_set_info_async       = osc_set_info_async,
3284         .o_import_event         = osc_import_event,
3285         .o_process_config       = osc_process_config,
3286         .o_quotactl             = osc_quotactl,
3287         .o_quotacheck           = osc_quotacheck,
3288 };
3289
3290 extern struct lu_kmem_descr osc_caches[];
3291 extern spinlock_t osc_ast_guard;
3292 extern struct lock_class_key osc_ast_guard_class;
3293
3294 int __init osc_init(void)
3295 {
3296         bool enable_proc = true;
3297         struct obd_type *type;
3298         int rc;
3299         ENTRY;
3300
3301         /* print an address of _any_ initialized kernel symbol from this
3302          * module, to allow debugging with gdb that doesn't support data
3303          * symbols from modules.*/
3304         CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3305
3306         rc = lu_kmem_init(osc_caches);
3307         if (rc)
3308                 RETURN(rc);
3309
3310         type = class_search_type(LUSTRE_OSP_NAME);
3311         if (type != NULL && type->typ_procsym != NULL)
3312                 enable_proc = false;
3313
3314         rc = class_register_type(&osc_obd_ops, NULL, enable_proc, NULL,
3315 #ifndef HAVE_ONLY_PROCFS_SEQ
3316                                  NULL,
3317 #endif
3318                                  LUSTRE_OSC_NAME, &osc_device_type);
3319         if (rc) {
3320                 lu_kmem_fini(osc_caches);
3321                 RETURN(rc);
3322         }
3323
3324         spin_lock_init(&osc_ast_guard);
3325         lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
3326
3327         RETURN(rc);
3328 }
3329
3330 #ifdef __KERNEL__
3331 static void /*__exit*/ osc_exit(void)
3332 {
3333         class_unregister_type(LUSTRE_OSC_NAME);
3334         lu_kmem_fini(osc_caches);
3335 }
3336
3337 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
3338 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3339 MODULE_LICENSE("GPL");
3340
3341 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);
3342 #endif