/*
 * Source: lustre/osc/osc_request.c (fs/lustre-release.git, Whamcloud gitweb)
 * Commit subject: b=17167 libcfs: ensure all libcfs exported symbols to have cfs_ prefix
 */
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #ifndef EXPORT_SYMTAB
38 # define EXPORT_SYMTAB
39 #endif
40 #define DEBUG_SUBSYSTEM S_OSC
41
42 #include <libcfs/libcfs.h>
43
44 #ifndef __KERNEL__
45 # include <liblustre.h>
46 #endif
47
48 #include <lustre_dlm.h>
49 #include <lustre_net.h>
50 #include <lustre/lustre_user.h>
51 #include <obd_cksum.h>
52 #include <obd_ost.h>
53 #include <obd_lov.h>
54
55 #ifdef  __CYGWIN__
56 # include <ctype.h>
57 #endif
58
59 #include <lustre_ha.h>
60 #include <lprocfs_status.h>
61 #include <lustre_log.h>
62 #include <lustre_debug.h>
63 #include <lustre_param.h>
64 #include "osc_internal.h"
65
/* Quota hooks: osc_quota_interface is defined in the OSC quota code;
 * quota_interface holds the active instance (set up elsewhere in this
 * file — presumably during module init; confirm against osc_init). */
static quota_interface_t *quota_interface = NULL;
extern quota_interface_t osc_quota_interface;

/* Forward declarations for routines defined later in this file. */
static void osc_release_ppga(struct brw_page **ppga, obd_count count);
static int brw_interpret(const struct lu_env *env,
                         struct ptlrpc_request *req, void *data, int rc);
int osc_cleanup(struct obd_device *obd);
73
74 /* Pack OSC object metadata for disk storage (LE byte order). */
75 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
76                       struct lov_stripe_md *lsm)
77 {
78         int lmm_size;
79         ENTRY;
80
81         lmm_size = sizeof(**lmmp);
82         if (!lmmp)
83                 RETURN(lmm_size);
84
85         if (*lmmp && !lsm) {
86                 OBD_FREE(*lmmp, lmm_size);
87                 *lmmp = NULL;
88                 RETURN(0);
89         }
90
91         if (!*lmmp) {
92                 OBD_ALLOC(*lmmp, lmm_size);
93                 if (!*lmmp)
94                         RETURN(-ENOMEM);
95         }
96
97         if (lsm) {
98                 LASSERT(lsm->lsm_object_id);
99                 LASSERT_MDS_GROUP(lsm->lsm_object_gr);
100                 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
101                 (*lmmp)->lmm_object_gr = cpu_to_le64(lsm->lsm_object_gr);
102         }
103
104         RETURN(lmm_size);
105 }
106
/* Unpack OSC object metadata from disk storage (LE byte order).
 *
 * Mirror of osc_packmd(): lsmp == NULL means size query; *lsmp set with
 * lmm == NULL frees the in-memory lsm; otherwise the lsm is (allocated
 * and) filled from @lmm.  Returns the lsm size, 0 after a free, or a
 * negative errno. */
static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
                        struct lov_mds_md *lmm, int lmm_bytes)
{
        int lsm_size;
        ENTRY;

        /* Validate incoming wire/disk metadata before touching it. */
        if (lmm != NULL) {
                if (lmm_bytes < sizeof (*lmm)) {
                        CERROR("lov_mds_md too small: %d, need %d\n",
                               lmm_bytes, (int)sizeof(*lmm));
                        RETURN(-EINVAL);
                }
                /* XXX LOV_MAGIC etc check? */

                if (lmm->lmm_object_id == 0) {
                        CERROR("lov_mds_md: zero lmm_object_id\n");
                        RETURN(-EINVAL);
                }
        }

        /* OSC objects are always single-striped. */
        lsm_size = lov_stripe_md_size(1);
        if (lsmp == NULL)
                RETURN(lsm_size);

        /* Free request: tear down the oinfo first, then the lsm itself. */
        if (*lsmp != NULL && lmm == NULL) {
                OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                OBD_FREE(*lsmp, lsm_size);
                *lsmp = NULL;
                RETURN(0);
        }

        /* Allocate lsm plus its single oinfo; unwind the lsm if the
         * second allocation fails. */
        if (*lsmp == NULL) {
                OBD_ALLOC(*lsmp, lsm_size);
                if (*lsmp == NULL)
                        RETURN(-ENOMEM);
                OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                if ((*lsmp)->lsm_oinfo[0] == NULL) {
                        OBD_FREE(*lsmp, lsm_size);
                        RETURN(-ENOMEM);
                }
                loi_init((*lsmp)->lsm_oinfo[0]);
        }

        if (lmm != NULL) {
                /* XXX zero *lsmp? */
                (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
                (*lsmp)->lsm_object_gr = le64_to_cpu (lmm->lmm_object_gr);
                LASSERT((*lsmp)->lsm_object_id);
                LASSERT_MDS_GROUP((*lsmp)->lsm_object_gr);
        }

        (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;

        RETURN(lsm_size);
}
163
164 static inline void osc_pack_capa(struct ptlrpc_request *req,
165                                  struct ost_body *body, void *capa)
166 {
167         struct obd_capa *oc = (struct obd_capa *)capa;
168         struct lustre_capa *c;
169
170         if (!capa)
171                 return;
172
173         c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
174         LASSERT(c);
175         capa_cpy(c, oc);
176         body->oa.o_valid |= OBD_MD_FLOSSCAPA;
177         DEBUG_CAPA(D_SEC, c, "pack");
178 }
179
/* Fill the request's ost_body from @oinfo: copy the obdo to wire format
 * and attach the capability, if any. */
static inline void osc_pack_req_body(struct ptlrpc_request *req,
                                     struct obd_info *oinfo)
{
        struct ost_body *body;

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
        osc_pack_capa(req, body, oinfo->oi_capa);
}
191
192 static inline void osc_set_capa_size(struct ptlrpc_request *req,
193                                      const struct req_msg_field *field,
194                                      struct obd_capa *oc)
195 {
196         if (oc == NULL)
197                 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
198         else
199                 /* it is already calculated as sizeof struct obd_capa */
200                 ;
201 }
202
/* Reply callback for async getattr: copy the returned attributes into
 * aa->aa_oi->oi_oa and invoke the caller's oi_cb_up() completion with
 * the final status. */
static int osc_getattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_async_args *aa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body) {
                CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
                lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);

                /* This should really be sent by the OST */
                aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
                aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
        } else {
                CDEBUG(D_INFO, "can't unpack ost_body\n");
                rc = -EPROTO;
                /* invalidate all fields so stale data is not trusted */
                aa->aa_oi->oi_oa->o_valid = 0;
        }
out:
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        RETURN(rc);
}
230
/* Queue an asynchronous OST_GETATTR for @oinfo on @set; the reply is
 * handled by osc_getattr_interpret(), which runs oi_cb_up(). */
static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        /* must size the capa field before packing the request */
        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;

        /* stash @oinfo in the request's async-args area for the callback */
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oi = oinfo;

        ptlrpc_set_add_req(set, req);
        RETURN(0);
}
262
/* Synchronous OST_GETATTR: fetch the object attributes for @oinfo and
 * copy them back into oinfo->oi_oa.  Returns 0 or a negative errno. */
static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);

        /* This should really be sent by the OST */
        oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;

        EXIT;
 out:
        ptlrpc_req_finished(req);
        return rc;
}
305
306 static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
307                        struct obd_trans_info *oti)
308 {
309         struct ptlrpc_request *req;
310         struct ost_body       *body;
311         int                    rc;
312         ENTRY;
313
314         LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);
315
316         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
317         if (req == NULL)
318                 RETURN(-ENOMEM);
319
320         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
321         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
322         if (rc) {
323                 ptlrpc_request_free(req);
324                 RETURN(rc);
325         }
326
327         osc_pack_req_body(req, oinfo);
328
329         ptlrpc_request_set_replen(req);
330
331         rc = ptlrpc_queue_wait(req);
332         if (rc)
333                 GOTO(out, rc);
334
335         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
336         if (body == NULL)
337                 GOTO(out, rc = -EPROTO);
338
339         lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
340
341         EXIT;
342 out:
343         ptlrpc_req_finished(req);
344         RETURN(rc);
345 }
346
/* Reply callback for async setattr: copy the reply obdo back into
 * aa->aa_oi->oi_oa and run the caller's oi_cb_up() completion. */
static int osc_setattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_async_args *aa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);
out:
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        RETURN(rc);
}
366
367 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
368                              struct obd_trans_info *oti,
369                              struct ptlrpc_request_set *rqset)
370 {
371         struct ptlrpc_request *req;
372         struct osc_async_args *aa;
373         int                    rc;
374         ENTRY;
375
376         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
377         if (req == NULL)
378                 RETURN(-ENOMEM);
379
380         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
381         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
382         if (rc) {
383                 ptlrpc_request_free(req);
384                 RETURN(rc);
385         }
386
387         if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
388                 oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
389
390         osc_pack_req_body(req, oinfo);
391
392         ptlrpc_request_set_replen(req);
393
394         /* do mds to ost setattr asynchronously */
395         if (!rqset) {
396                 /* Do not wait for response. */
397                 ptlrpcd_add_req(req, PSCOPE_OTHER);
398         } else {
399                 req->rq_interpret_reply =
400                         (ptlrpc_interpterer_t)osc_setattr_interpret;
401
402                 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
403                 aa = ptlrpc_req_async_args(req);
404                 aa->aa_oi = oinfo;
405
406                 ptlrpc_set_add_req(rqset, req);
407         }
408
409         RETURN(0);
410 }
411
/* Create one object on the OST (synchronous OST_CREATE).
 *
 * If *@ea is NULL a single-stripe lsm is allocated here and, on success,
 * returned through @ea with the object id/group from the reply filled in;
 * on failure a locally-allocated lsm is freed again.  If @oti is given,
 * the reply transno and (when OBD_MD_FLCOOKIE is set) the llog cookie are
 * stored into it.  Returns 0 or a negative errno. */
int osc_real_create(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md **ea, struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct lov_stripe_md  *lsm;
        int                    rc;
        ENTRY;

        LASSERT(oa);
        LASSERT(ea);

        lsm = *ea;
        if (!lsm) {
                /* no lsm supplied: allocate one to hold the new object id */
                rc = obd_alloc_memmd(exp, &lsm);
                if (rc < 0)
                        RETURN(rc);
        }

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
        if (req == NULL)
                GOTO(out, rc = -ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
        if (rc) {
                ptlrpc_request_free(req);
                GOTO(out, rc);
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&body->oa, oa);

        ptlrpc_request_set_replen(req);

        if ((oa->o_valid & OBD_MD_FLFLAGS) &&
            oa->o_flags == OBD_FL_DELORPHAN) {
                DEBUG_REQ(D_HA, req,
                          "delorphan from OST integration");
                /* Don't resend the delorphan req */
                req->rq_no_resend = req->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out_req, rc = -EPROTO);

        lustre_get_wire_obdo(oa, &body->oa);

        /* This should really be sent by the OST */
        oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oa->o_valid |= OBD_MD_FLBLKSZ;

        /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
         * have valid lsm_oinfo data structs, so don't go touching that.
         * This needs to be fixed in a big way.
         */
        lsm->lsm_object_id = oa->o_id;
        lsm->lsm_object_gr = oa->o_gr;
        *ea = lsm;

        if (oti != NULL) {
                oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);

                if (oa->o_valid & OBD_MD_FLCOOKIE) {
                        if (!oti->oti_logcookies)
                                oti_alloc_cookies(oti, 1);
                        *oti->oti_logcookies = oa->o_lcookie;
                }
        }

        CDEBUG(D_HA, "transno: "LPD64"\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        /* only free the lsm if we allocated it here (caller's *ea was NULL
         * and was never assigned on this error path) */
        if (rc && !*ea)
                obd_free_memmd(exp, &lsm);
        RETURN(rc);
}
496
/* Reply callback for punch (truncate): copy the reply obdo into pa_oa
 * and run the caller's upcall with the final status. */
static int osc_punch_interpret(const struct lu_env *env,
                               struct ptlrpc_request *req,
                               struct osc_punch_args *aa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(aa->pa_oa, &body->oa);
out:
        rc = aa->pa_upcall(aa->pa_cookie, rc);
        RETURN(rc);
}
516
/* Send an asynchronous OST_PUNCH (truncate) for @oa.
 *
 * The extent to punch is expected to already be encoded in the obdo's
 * size/blocks fields by the caller (see osc_punch()).  @upcall/@cookie
 * are invoked from osc_punch_interpret() on completion.  The request is
 * handed to ptlrpcd when @rqset is PTLRPCD_SET, otherwise added to
 * @rqset.  Returns 0 or a negative errno from request setup. */
int osc_punch_base(struct obd_export *exp, struct obdo *oa,
                   struct obd_capa *capa,
                   obd_enqueue_update_f upcall, void *cookie,
                   struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        struct osc_punch_args *aa;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&body->oa, oa);
        osc_pack_capa(req, body, capa);

        ptlrpc_request_set_replen(req);


        /* stash completion context for osc_punch_interpret() */
        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_punch_interpret;
        CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->pa_oa     = oa;
        aa->pa_upcall = upcall;
        aa->pa_cookie = cookie;
        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req, PSCOPE_OTHER);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}
562
563 static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
564                      struct obd_trans_info *oti,
565                      struct ptlrpc_request_set *rqset)
566 {
567         oinfo->oi_oa->o_size   = oinfo->oi_policy.l_extent.start;
568         oinfo->oi_oa->o_blocks = oinfo->oi_policy.l_extent.end;
569         oinfo->oi_oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
570         return osc_punch_base(exp, oinfo->oi_oa, oinfo->oi_capa,
571                               oinfo->oi_cb_up, oinfo, rqset);
572 }
573
/* Synchronous OST_SYNC: ask the OST to flush the byte range
 * [@start, @end] of the object described by @oa to stable storage.
 * On success the reply obdo is copied back into @oa.  Returns 0 or a
 * negative errno. */
static int osc_sync(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md *md, obd_size start, obd_size end,
                    void *capa)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        /* overload the size and blocks fields in the oa with start/end */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&body->oa, oa);
        body->oa.o_size = start;
        body->oa.o_blocks = end;
        body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
        osc_pack_capa(req, body, capa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(oa, &body->oa);

        EXIT;
 out:
        ptlrpc_req_finished(req);
        return rc;
}
625
/* Find and cancel locally locks matched by @mode in the resource found by
 * @objid. Found locks are added into @cancel list. Returns the amount of
 * locks added to @cancels list. */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
                                   cfs_list_t *cancels,
                                   ldlm_mode_t mode, int lock_flags)
{
        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        struct ldlm_res_id res_id;
        struct ldlm_resource *res;
        int count;
        ENTRY;

        /* build the resource name from the object id/group; a missing
         * resource simply means there are no locks to cancel */
        osc_build_res_name(oa->o_id, oa->o_gr, &res_id);
        res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
        if (res == NULL)
                RETURN(0);

        LDLM_RESOURCE_ADDREF(res);
        count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
                                           lock_flags, 0, NULL);
        LDLM_RESOURCE_DELREF(res);
        ldlm_resource_putref(res);
        RETURN(count);
}
651
652 static int osc_destroy_interpret(const struct lu_env *env,
653                                  struct ptlrpc_request *req, void *data,
654                                  int rc)
655 {
656         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
657
658         cfs_atomic_dec(&cli->cl_destroy_in_flight);
659         cfs_waitq_signal(&cli->cl_destroy_waitq);
660         return 0;
661 }
662
/* Try to reserve a destroy-RPC slot: returns 1 (and keeps the increment)
 * when fewer than cl_max_rpcs_in_flight destroys are outstanding, else
 * undoes the increment and returns 0.  The inc/dec pair is deliberately
 * unlocked; the second check catches a concurrent decrement that happened
 * between the two atomic ops and re-signals the waitq so no waiter is
 * lost. */
static int osc_can_send_destroy(struct client_obd *cli)
{
        if (cfs_atomic_inc_return(&cli->cl_destroy_in_flight) <=
            cli->cl_max_rpcs_in_flight) {
                /* The destroy request can be sent */
                return 1;
        }
        if (cfs_atomic_dec_return(&cli->cl_destroy_in_flight) <
            cli->cl_max_rpcs_in_flight) {
                /*
                 * The counter has been modified between the two atomic
                 * operations.
                 */
                cfs_waitq_signal(&cli->cl_destroy_waitq);
        }
        return 0;
}
680
/* Destroy requests can be async always on the client, and we don't even really
 * care about the return code since the client cannot do anything at all about
 * a destroy failure.
 * When the MDS is unlinking a filename, it saves the file objects into a
 * recovery llog, and these object records are cancelled when the OST reports
 * they were destroyed and sync'd to disk (i.e. transaction committed).
 * If the client dies, or the OST is down when the object should be destroyed,
 * the records are not cancelled, and when the OST reconnects to the MDS next,
 * it will retrieve the llog unlink logs and then sends the log cancellation
 * cookies to the MDS after committing destroy transactions. */
static int osc_destroy(struct obd_export *exp, struct obdo *oa,
                       struct lov_stripe_md *ea, struct obd_trans_info *oti,
                       struct obd_export *md_export, void *capa)
{
        struct client_obd     *cli = &exp->exp_obd->u.cli;
        struct ptlrpc_request *req;
        struct ost_body       *body;
        CFS_LIST_HEAD(cancels);
        int rc, count;
        ENTRY;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        /* cancel our local PW locks on the object and discard the cached
         * data before destroying it on the OST */
        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
        if (req == NULL) {
                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                RETURN(-ENOMEM);
        }

        osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
        /* piggyback the collected lock cancels on the destroy (early lock
         * cancellation) */
        rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
                               0, &cancels, count);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
                oa->o_lcookie = *oti->oti_logcookies;
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&body->oa, oa);

        osc_pack_capa(req, body, (struct obd_capa *)capa);
        ptlrpc_request_set_replen(req);

        /* don't throttle destroy RPCs for the MDT */
        if (!(cli->cl_import->imp_connect_flags_orig & OBD_CONNECT_MDS)) {
                req->rq_interpret_reply = osc_destroy_interpret;
                if (!osc_can_send_destroy(cli)) {
                        struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
                                                          NULL);

                        /*
                         * Wait until the number of on-going destroy RPCs drops
                         * under max_rpc_in_flight
                         */
                        l_wait_event_exclusive(cli->cl_destroy_waitq,
                                               osc_can_send_destroy(cli), &lwi);
                }
        }

        /* Do not wait for response */
        ptlrpcd_add_req(req, PSCOPE_OTHER);
        RETURN(0);
}
756
/* Report the client's dirty-cache and grant state to the server in @oa:
 * o_dirty (bytes dirtied), o_undirty (how much more grant we could use),
 * o_grant (grant currently held) and o_dropped (grant lost).  Sanity
 * inconsistencies clamp o_undirty to 0 and log a CERROR. */
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;

        /* the caller must not have filled these fields already */
        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        client_obd_list_lock(&cli->cl_loi_list_lock);
        oa->o_dirty = cli->cl_dirty;
        if (cli->cl_dirty - cli->cl_dirty_transit > cli->cl_dirty_max) {
                CERROR("dirty %lu - %lu > dirty_max %lu\n",
                       cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else if (cfs_atomic_read(&obd_dirty_pages) -
                   cfs_atomic_read(&obd_dirty_transit_pages) >
                   obd_max_dirty_pages + 1){
                /* The cfs_atomic_read() allowing the cfs_atomic_inc() are
                 * not covered by a lock thus they may safely race and trip
                 * this CERROR() unless we add in a small fudge factor (+1). */
                CERROR("dirty %d - %d > system dirty_max %d\n",
                       cfs_atomic_read(&obd_dirty_pages),
                       cfs_atomic_read(&obd_dirty_transit_pages),
                       obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else {
                /* ask for enough grant to keep the pipe to this OST full */
                long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
                                (cli->cl_max_rpcs_in_flight + 1);
                oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
        }
        oa->o_grant = cli->cl_avail_grant;
        oa->o_dropped = cli->cl_lost_grant;
        cli->cl_lost_grant = 0;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);

}
799
/* Push the next grant-shrink deadline cl_grant_shrink_interval seconds
 * into the future (called whenever grant is consumed). */
static void osc_update_next_shrink(struct client_obd *cli)
{
        cli->cl_next_shrink_grant =
                cfs_time_shift(cli->cl_grant_shrink_interval);
        CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
               cli->cl_next_shrink_grant);
}
807
/* caller must hold loi_list_lock */
/* Charge one page of dirty cache to the grant accounting: bumps the
 * global and per-client dirty counters, consumes one page of available
 * grant, and marks the page as grant-backed. */
static void osc_consume_write_grant(struct client_obd *cli,
                                    struct brw_page *pga)
{
        LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock.lock);
        LASSERT(!(pga->flag & OBD_BRW_FROM_GRANT));
        cfs_atomic_inc(&obd_dirty_pages);
        cli->cl_dirty += CFS_PAGE_SIZE;
        cli->cl_avail_grant -= CFS_PAGE_SIZE;
        pga->flag |= OBD_BRW_FROM_GRANT;
        CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
               CFS_PAGE_SIZE, pga, pga->pg);
        /* callers are expected to have checked grant before consuming it */
        LASSERT(cli->cl_avail_grant >= 0);
        osc_update_next_shrink(cli);
}
823
/* the companion to osc_consume_write_grant, called when a brw has completed.
 * must be called with the loi lock held.
 *
 * @sent: non-zero when the page actually went to the OST; when zero the
 * whole page's grant is counted as lost.  For short writes on OSTs with
 * a blocksize different from CFS_PAGE_SIZE, the unwritten remainder of
 * the page (rounded to server blocks) is also counted as lost so that
 * client and server grant accounting stay in step. */
static void osc_release_write_grant(struct client_obd *cli,
                                    struct brw_page *pga, int sent)
{
        int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
        ENTRY;

        LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock.lock);
        if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
                EXIT;
                return;
        }

        pga->flag &= ~OBD_BRW_FROM_GRANT;
        cfs_atomic_dec(&obd_dirty_pages);
        cli->cl_dirty -= CFS_PAGE_SIZE;
        /* transit (nocache) pages carry their own parallel counters */
        if (pga->flag & OBD_BRW_NOCACHE) {
                pga->flag &= ~OBD_BRW_NOCACHE;
                cfs_atomic_dec(&obd_dirty_transit_pages);
                cli->cl_dirty_transit -= CFS_PAGE_SIZE;
        }
        if (!sent) {
                cli->cl_lost_grant += CFS_PAGE_SIZE;
                CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
                       cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
        } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
                /* For short writes we shouldn't count parts of pages that
                 * span a whole block on the OST side, or our accounting goes
                 * wrong.  Should match the code in filter_grant_check. */
                int offset = pga->off & ~CFS_PAGE_MASK;
                int count = pga->count + (offset & (blocksize - 1));
                int end = (offset + pga->count) & (blocksize - 1);
                if (end)
                        count += blocksize - end;

                cli->cl_lost_grant += CFS_PAGE_SIZE - count;
                CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
                       CFS_PAGE_SIZE - count, cli->cl_lost_grant,
                       cli->cl_avail_grant, cli->cl_dirty);
        }

        EXIT;
}
868
869 static unsigned long rpcs_in_flight(struct client_obd *cli)
870 {
871         return cli->cl_r_in_flight + cli->cl_w_in_flight;
872 }
873
/* caller must hold loi_list_lock
 *
 * Walk cl_cache_waiters and wake each waiter that can now make progress:
 * either there is grant available for its page (consumed on its behalf
 * here), or no write RPCs remain that could return grant, in which case
 * the waiter is told to fall back to sync IO (-EDQUOT).  Stops early if
 * the client or system dirty limits are reached. */
void osc_wake_cache_waiters(struct client_obd *cli)
{
        cfs_list_t *l, *tmp;
        struct osc_cache_waiter *ocw;

        ENTRY;
        cfs_list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
                /* if we can't dirty more, we must wait until some is written */
                if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
                   (cfs_atomic_read(&obd_dirty_pages) + 1 >
                    obd_max_dirty_pages)) {
                        CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
                               "osc max %ld, sys max %d\n", cli->cl_dirty,
                               cli->cl_dirty_max, obd_max_dirty_pages);
                        return;
                }

                /* if still dirty cache but no grant wait for pending RPCs that
                 * may yet return us some grant before doing sync writes */
                if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
                               cli->cl_w_in_flight);
                        return;
                }

                ocw = cfs_list_entry(l, struct osc_cache_waiter, ocw_entry);
                cfs_list_del_init(&ocw->ocw_entry);
                if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        /* no more RPCs in flight to return grant, do sync IO */
                        ocw->ocw_rc = -EDQUOT;
                        CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
                } else {
                        /* reserve grant for the waiter's page before waking */
                        osc_consume_write_grant(cli,
                                                &ocw->ocw_oap->oap_brw_page);
                }

                cfs_waitq_signal(&ocw->ocw_waitq);
        }

        EXIT;
}
916
/* Add @grant bytes (returned by the OST or reclaimed after a failed
 * shrink) to the client's available grant, under the loi list lock. */
static void __osc_update_grant(struct client_obd *cli, obd_size grant)
{
        client_obd_list_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant += grant;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
}
923
924 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
925 {
926         if (body->oa.o_valid & OBD_MD_FLGRANT) {
927                 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
928                 __osc_update_grant(cli, body->oa.o_grant);
929         }
930 }
931
932 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
933                               void *key, obd_count vallen, void *val,
934                               struct ptlrpc_request_set *set);
935
936 static int osc_shrink_grant_interpret(const struct lu_env *env,
937                                       struct ptlrpc_request *req,
938                                       void *aa, int rc)
939 {
940         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
941         struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
942         struct ost_body *body;
943
944         if (rc != 0) {
945                 __osc_update_grant(cli, oa->o_grant);
946                 GOTO(out, rc);
947         }
948
949         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
950         LASSERT(body);
951         osc_update_grant(cli, body);
952 out:
953         OBD_FREE_PTR(oa);
954         return rc;
955 }
956
/* Carve a quarter of the locally available grant into @oa so it rides
 * back to the server piggy-backed on an outgoing BRW request, and mark
 * it as a voluntary shrink. */
static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
{
        client_obd_list_lock(&cli->cl_loi_list_lock);
        oa->o_grant = cli->cl_avail_grant / 4;
        cli->cl_avail_grant -= oa->o_grant;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        /* NOTE(review): o_flags is updated outside the lock — assumes @oa
         * is not concurrently shared at this point; confirm with callers */
        oa->o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);
}
966
967 /* Shrink the current grant, either from some large amount to enough for a
968  * full set of in-flight RPCs, or if we have already shrunk to that limit
969  * then to enough for a single RPC.  This avoids keeping more grant than
970  * needed, and avoids shrinking the grant piecemeal. */
971 static int osc_shrink_grant(struct client_obd *cli)
972 {
973         long target = (cli->cl_max_rpcs_in_flight + 1) *
974                       cli->cl_max_pages_per_rpc;
975
976         client_obd_list_lock(&cli->cl_loi_list_lock);
977         if (cli->cl_avail_grant <= target)
978                 target = cli->cl_max_pages_per_rpc;
979         client_obd_list_unlock(&cli->cl_loi_list_lock);
980
981         return osc_shrink_grant_to_target(cli, target);
982 }
983
/* Give back all grant above @target to the server via an async set_info
 * (KEY_GRANT_SHRINK) RPC.  @target is clamped up to one full RPC's worth
 * of pages.  Returns 0 on success or a negative errno; if sending fails,
 * the grant carved off is restored locally. */
int osc_shrink_grant_to_target(struct client_obd *cli, long target)
{
        int    rc = 0;
        struct ost_body     *body;
        ENTRY;

        client_obd_list_lock(&cli->cl_loi_list_lock);
        /* Don't shrink if we are already above or below the desired limit
         * We don't want to shrink below a single RPC, as that will negatively
         * impact block allocation and long-term performance. */
        if (target < cli->cl_max_pages_per_rpc)
                target = cli->cl_max_pages_per_rpc;

        if (target >= cli->cl_avail_grant) {
                client_obd_list_unlock(&cli->cl_loi_list_lock);
                RETURN(0);
        }
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        OBD_ALLOC_PTR(body);
        if (!body)
                RETURN(-ENOMEM);

        osc_announce_cached(cli, &body->oa, 0);

        /* re-take the lock: cl_avail_grant may have moved since the check
         * above, so compute the giveback against the current value */
        client_obd_list_lock(&cli->cl_loi_list_lock);
        body->oa.o_grant = cli->cl_avail_grant - target;
        cli->cl_avail_grant = target;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);

        rc = osc_set_info_async(cli->cl_import->imp_obd->obd_self_export,
                                sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
                                sizeof(*body), body, NULL);
        /* on send failure, reclaim the grant we tried to give away */
        if (rc != 0)
                __osc_update_grant(cli, body->oa.o_grant);
        OBD_FREE_PTR(body);
        RETURN(rc);
}
1024
1025 #define GRANT_SHRINK_LIMIT PTLRPC_MAX_BRW_SIZE
1026 static int osc_should_shrink_grant(struct client_obd *client)
1027 {
1028         cfs_time_t time = cfs_time_current();
1029         cfs_time_t next_shrink = client->cl_next_shrink_grant;
1030         if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
1031                 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
1032                     client->cl_avail_grant > GRANT_SHRINK_LIMIT)
1033                         return 1;
1034                 else
1035                         osc_update_next_shrink(client);
1036         }
1037         return 0;
1038 }
1039
/* Periodic timeout callback: walk every client registered on the
 * grant-shrink timeout item and shrink those whose interval expired.
 * The return value of osc_shrink_grant is deliberately discarded —
 * a failed shrink is retried on the next tick. */
static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
{
        struct client_obd *client;

        cfs_list_for_each_entry(client, &item->ti_obd_list,
                                cl_grant_shrink_list) {
                if (osc_should_shrink_grant(client))
                        osc_shrink_grant(client);
        }
        return 0;
}
1051
1052 static int osc_add_shrink_grant(struct client_obd *client)
1053 {
1054         int rc;
1055
1056         rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
1057                                        TIMEOUT_GRANT,
1058                                        osc_grant_shrink_grant_cb, NULL,
1059                                        &client->cl_grant_shrink_list);
1060         if (rc) {
1061                 CERROR("add grant client %s error %d\n",
1062                         client->cl_import->imp_obd->obd_name, rc);
1063                 return rc;
1064         }
1065         CDEBUG(D_CACHE, "add grant client %s \n",
1066                client->cl_import->imp_obd->obd_name);
1067         osc_update_next_shrink(client);
1068         return 0;
1069 }
1070
/* Unregister @client from the periodic grant-shrink timeout list;
 * companion to osc_add_shrink_grant. */
static int osc_del_shrink_grant(struct client_obd *client)
{
        return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
                                         TIMEOUT_GRANT);
}
1076
1077 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
1078 {
1079         /*
1080          * ocd_grant is the total grant amount we're expect to hold: if we've
1081          * been evicted, it's the new avail_grant amount, cl_dirty will drop
1082          * to 0 as inflight RPCs fail out; otherwise, it's avail_grant + dirty.
1083          *
1084          * race is tolerable here: if we're evicted, but imp_state already
1085          * left EVICTED state, then cl_dirty must be 0 already.
1086          */
1087         client_obd_list_lock(&cli->cl_loi_list_lock);
1088         if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
1089                 cli->cl_avail_grant = ocd->ocd_grant;
1090         else
1091                 cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty;
1092         client_obd_list_unlock(&cli->cl_loi_list_lock);
1093
1094         CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld \n",
1095                cli->cl_avail_grant, cli->cl_lost_grant);
1096         LASSERT(cli->cl_avail_grant >= 0);
1097
1098         if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
1099             cfs_list_empty(&cli->cl_grant_shrink_list))
1100                 osc_add_shrink_grant(cli);
1101 }
1102
1103 /* We assume that the reason this OSC got a short read is because it read
1104  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
1105  * via the LOV, and it _knows_ it's reading inside the file, it's just that
1106  * this stripe never got written at or beyond this stripe offset yet. */
1107 static void handle_short_read(int nob_read, obd_count page_count,
1108                               struct brw_page **pga)
1109 {
1110         char *ptr;
1111         int i = 0;
1112
1113         /* skip bytes read OK */
1114         while (nob_read > 0) {
1115                 LASSERT (page_count > 0);
1116
1117                 if (pga[i]->count > nob_read) {
1118                         /* EOF inside this page */
1119                         ptr = cfs_kmap(pga[i]->pg) +
1120                                 (pga[i]->off & ~CFS_PAGE_MASK);
1121                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1122                         cfs_kunmap(pga[i]->pg);
1123                         page_count--;
1124                         i++;
1125                         break;
1126                 }
1127
1128                 nob_read -= pga[i]->count;
1129                 page_count--;
1130                 i++;
1131         }
1132
1133         /* zero remaining pages */
1134         while (page_count-- > 0) {
1135                 ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
1136                 memset(ptr, 0, pga[i]->count);
1137                 cfs_kunmap(pga[i]->pg);
1138                 i++;
1139         }
1140 }
1141
1142 static int check_write_rcs(struct ptlrpc_request *req,
1143                            int requested_nob, int niocount,
1144                            obd_count page_count, struct brw_page **pga)
1145 {
1146         int     i;
1147         __u32   *remote_rcs;
1148
1149         remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1150                                                   sizeof(*remote_rcs) *
1151                                                   niocount);
1152         if (remote_rcs == NULL) {
1153                 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1154                 return(-EPROTO);
1155         }
1156
1157         /* return error if any niobuf was in error */
1158         for (i = 0; i < niocount; i++) {
1159                 if (remote_rcs[i] < 0)
1160                         return(remote_rcs[i]);
1161
1162                 if (remote_rcs[i] != 0) {
1163                         CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1164                                 i, remote_rcs[i], req);
1165                         return(-EPROTO);
1166                 }
1167         }
1168
1169         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1170                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1171                        req->rq_bulk->bd_nob_transferred, requested_nob);
1172                 return(-EPROTO);
1173         }
1174
1175         return (0);
1176 }
1177
1178 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1179 {
1180         if (p1->flag != p2->flag) {
1181                 unsigned mask = ~(OBD_BRW_FROM_GRANT|
1182                                   OBD_BRW_NOCACHE|OBD_BRW_SYNC);
1183
1184                 /* warn if we try to combine flags that we don't know to be
1185                  * safe to combine */
1186                 if ((p1->flag & mask) != (p2->flag & mask))
1187                         CERROR("is it ok to have flags 0x%x and 0x%x in the "
1188                                "same brw?\n", p1->flag, p2->flag);
1189                 return 0;
1190         }
1191
1192         return (p1->off + p1->count == p2->off);
1193 }
1194
/* Checksum @nob bytes spread across @pg_count bulk pages using the
 * algorithm selected by @cksum_type.  @opc selects the fault-injection
 * hooks: OST_READ corrupts received data before checksumming, OST_WRITE
 * only perturbs the computed checksum so the data stays valid for a
 * resend. */
static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
                                   struct brw_page **pga, int opc,
                                   cksum_type_t cksum_type)
{
        __u32 cksum;
        int i = 0;

        LASSERT (pg_count > 0);
        cksum = init_checksum(cksum_type);
        while (nob > 0 && pg_count > 0) {
                unsigned char *ptr = cfs_kmap(pga[i]->pg);
                int off = pga[i]->off & ~CFS_PAGE_MASK;
                /* clamp to nob so we never checksum past the request */
                int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 && opc == OST_READ &&
                    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
                        memcpy(ptr + off, "bad1", min(4, nob));
                cksum = compute_checksum(cksum, ptr + off, count, cksum_type);
                cfs_kunmap(pga[i]->pg);
                LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
                               off, cksum);

                /* nob may go negative on the final partial page; the loop
                 * condition terminates correctly either way */
                nob -= pga[i]->count;
                pg_count--;
                i++;
        }
        /* For sending we only compute the wrong checksum instead
         * of corrupting the data so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                cksum++;

        return cksum;
}
1230
/* Build (but do not send) a BRW read or write request for @page_count
 * pages described by @pga.  Adjacent mergeable pages are coalesced into
 * shared remote niobufs; bulk descriptors, capa, grant announcement and
 * (for insecure flavors) bulk checksums are all set up here.  On success
 * the new request is returned through @reqp with its async args
 * initialized; ownership of @oa and @pga passes to the request's
 * interpret path. */
static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
                                struct lov_stripe_md *lsm, obd_count page_count,
                                struct brw_page **pga,
                                struct ptlrpc_request **reqp,
                                struct obd_capa *ocapa, int reserve)
{
        struct ptlrpc_request   *req;
        struct ptlrpc_bulk_desc *desc;
        struct ost_body         *body;
        struct obd_ioobj        *ioobj;
        struct niobuf_remote    *niobuf;
        int niocount, i, requested_nob, opc, rc;
        struct osc_brw_async_args *aa;
        struct req_capsule      *pill;
        struct brw_page *pg_prev;

        ENTRY;
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
                RETURN(-ENOMEM); /* Recoverable */
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
                RETURN(-EINVAL); /* Fatal */

        /* writes draw from the pre-allocated pool so they can proceed
         * even under memory pressure (needed to clean dirty pages) */
        if ((cmd & OBD_BRW_WRITE) != 0) {
                opc = OST_WRITE;
                req = ptlrpc_request_alloc_pool(cli->cl_import,
                                                cli->cl_import->imp_rq_pool,
                                                &RQF_OST_BRW);
        } else {
                opc = OST_READ;
                req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW);
        }
        if (req == NULL)
                RETURN(-ENOMEM);

        /* count how many remote niobufs we need after merging runs of
         * contiguous, flag-compatible pages */
        for (niocount = i = 1; i < page_count; i++) {
                if (!can_merge_pages(pga[i - 1], pga[i]))
                        niocount++;
        }

        pill = &req->rq_pill;
        req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
                             sizeof(*ioobj));
        req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
                             niocount * sizeof(*niobuf));
        osc_set_capa_size(req, &RMF_CAPA1, ocapa);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        if (opc == OST_WRITE)
                desc = ptlrpc_prep_bulk_imp(req, page_count,
                                            BULK_GET_SOURCE, OST_BULK_PORTAL);
        else
                desc = ptlrpc_prep_bulk_imp(req, page_count,
                                            BULK_PUT_SINK, OST_BULK_PORTAL);

        if (desc == NULL)
                GOTO(out, rc = -ENOMEM);
        /* NB request now owns desc and will free it when it gets freed */

        body = req_capsule_client_get(pill, &RMF_OST_BODY);
        ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);

        lustre_set_wire_obdo(&body->oa, oa);

        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;
        osc_pack_capa(req, body, ocapa);
        LASSERT (page_count > 0);
        pg_prev = pga[0];
        /* register each page with the bulk descriptor and fill remote
         * niobufs, merging with the previous one where possible */
        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                struct brw_page *pg = pga[i];

                LASSERT(pg->count > 0);
                LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
                         "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
                         pg->off, pg->count);
#ifdef __linux__
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
                         " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
                         i, page_count,
                         pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
                         pg_prev->pg, page_private(pg_prev->pg),
                         pg_prev->pg->index, pg_prev->off);
#else
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u\n", i, page_count);
#endif
                LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
                        (pg->flag & OBD_BRW_SRVLOCK));

                ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,
                                      pg->count);
                requested_nob += pg->count;

                if (i > 0 && can_merge_pages(pg_prev, pg)) {
                        niobuf--;
                        niobuf->len += pg->count;
                } else {
                        niobuf->offset = pg->off;
                        niobuf->len    = pg->count;
                        niobuf->flags  = pg->flag;
                }
                pg_prev = pg;
        }

        LASSERTF((void *)(niobuf - niocount) ==
                req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
                "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
                &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));

        osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
        /* opportunistically piggy-back a grant shrink on this RPC */
        if (osc_should_shrink_grant(cli))
                osc_shrink_grant_local(cli, &body->oa);

        /* size[REQ_REC_OFF] still sizeof (*body) */
        if (opc == OST_WRITE) {
                /* only checksum ourselves when the transport flavor does
                 * not already provide bulk integrity */
                if (unlikely(cli->cl_checksum) &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        /* store cl_cksum_type in a local variable since
                         * it can be changed via lprocfs */
                        cksum_type_t cksum_type = cli->cl_cksum_type;

                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                                oa->o_flags &= OBD_FL_LOCAL_MASK;
                                body->oa.o_flags = 0;
                        }
                        body->oa.o_flags |= cksum_type_pack(cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        body->oa.o_cksum = osc_checksum_bulk(requested_nob,
                                                             page_count, pga,
                                                             OST_WRITE,
                                                             cksum_type);
                        CDEBUG(D_PAGE, "checksum at write origin: %x\n",
                               body->oa.o_cksum);
                        /* save this in 'oa', too, for later checking */
                        oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        oa->o_flags |= cksum_type_pack(cksum_type);
                } else {
                        /* clear out the checksum flag, in case this is a
                         * resend but cl_checksum is no longer set. b=11238 */
                        oa->o_valid &= ~OBD_MD_FLCKSUM;
                }
                oa->o_cksum = body->oa.o_cksum;
                /* 1 RC per niobuf */
                req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
                                     sizeof(__u32) * niocount);
        } else {
                if (unlikely(cli->cl_checksum) &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                body->oa.o_flags = 0;
                        body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                }
                req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER, 0);
                /* 1 RC for the whole I/O */
        }
        ptlrpc_request_set_replen(req);

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oa = oa;
        aa->aa_requested_nob = requested_nob;
        aa->aa_nio_count = niocount;
        aa->aa_page_count = page_count;
        aa->aa_resends = 0;
        aa->aa_ppga = pga;
        aa->aa_cli = cli;
        CFS_INIT_LIST_HEAD(&aa->aa_oaps);
        if (ocapa && reserve)
                aa->aa_ocapa = capa_get(ocapa);

        *reqp = req;
        RETURN(0);

 out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}
1419
/* Diagnose a BRW_WRITE checksum mismatch: recompute the checksum over the
 * pages we sent and compare against both the client's original value and
 * the server's, to classify where the corruption happened (client after
 * send, in transit, or a server algorithm mismatch).  Returns 0 if the
 * checksums actually agree, 1 if a mismatch was confirmed and logged. */
static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
                                __u32 client_cksum, __u32 server_cksum, int nob,
                                obd_count page_count, struct brw_page **pga,
                                cksum_type_t client_cksum_type)
{
        __u32 new_cksum;
        char *msg;
        cksum_type_t cksum_type;

        if (server_cksum == client_cksum) {
                CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
                return 0;
        }

        /* recompute with whatever algorithm the server reported using;
         * fall back to CRC32 when the reply carries no flags */
        if (oa->o_valid & OBD_MD_FLFLAGS)
                cksum_type = cksum_type_unpack(oa->o_flags);
        else
                cksum_type = OBD_CKSUM_CRC32;

        new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
                                      cksum_type);

        if (cksum_type != client_cksum_type)
                msg = "the server did not use the checksum type specified in "
                      "the original request - likely a protocol problem";
        else if (new_cksum == server_cksum)
                msg = "changed on the client after we checksummed it - "
                      "likely false positive due to mmap IO (bug 11742)";
        else if (new_cksum == client_cksum)
                msg = "changed in transit before arrival at OST";
        else
                msg = "changed in transit AND doesn't match the original - "
                      "likely false positive due to mmap IO (bug 11742)";

        LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inum "
                           LPU64"/"LPU64" object "LPU64"/"LPU64" extent "
                           "["LPU64"-"LPU64"]\n",
                           msg, libcfs_nid2str(peer->nid),
                           oa->o_valid & OBD_MD_FLFID ? oa->o_fid : (__u64)0,
                           oa->o_valid & OBD_MD_FLFID ? oa->o_generation :
                                                        (__u64)0,
                           oa->o_id,
                           oa->o_valid & OBD_MD_FLGROUP ? oa->o_gr : (__u64)0,
                           pga[0]->off,
                           pga[page_count-1]->off + pga[page_count-1]->count - 1);
        CERROR("original client csum %x (type %x), server csum %x (type %x), "
               "client csum now %x\n", client_cksum, client_cksum_type,
               server_cksum, cksum_type, new_cksum);
        return 1;
}
1470
1471 /* Note rc enters this function as number of bytes transferred */
1472 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1473 {
1474         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1475         const lnet_process_id_t *peer =
1476                         &req->rq_import->imp_connection->c_peer;
1477         struct client_obd *cli = aa->aa_cli;
1478         struct ost_body *body;
1479         __u32 client_cksum = 0;
1480         ENTRY;
1481
1482         if (rc < 0 && rc != -EDQUOT)
1483                 RETURN(rc);
1484
1485         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1486         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1487         if (body == NULL) {
1488                 CDEBUG(D_INFO, "Can't unpack body\n");
1489                 RETURN(-EPROTO);
1490         }
1491
1492         /* set/clear over quota flag for a uid/gid */
1493         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1494             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1495                 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1496
1497                 lquota_setdq(quota_interface, cli, qid, body->oa.o_valid,
1498                              body->oa.o_flags);
1499         }
1500
1501         if (rc < 0)
1502                 RETURN(rc);
1503
1504         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1505                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1506
1507         osc_update_grant(cli, body);
1508
1509         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1510                 if (rc > 0) {
1511                         CERROR("Unexpected +ve rc %d\n", rc);
1512                         RETURN(-EPROTO);
1513                 }
1514                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1515
1516                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1517                         RETURN(-EAGAIN);
1518
1519                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1520                     check_write_checksum(&body->oa, peer, client_cksum,
1521                                          body->oa.o_cksum, aa->aa_requested_nob,
1522                                          aa->aa_page_count, aa->aa_ppga,
1523                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1524                         RETURN(-EAGAIN);
1525
1526                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1527                                      aa->aa_page_count, aa->aa_ppga);
1528                 GOTO(out, rc);
1529         }
1530
1531         /* The rest of this function executes only for OST_READs */
1532
1533         /* if unwrap_bulk failed, return -EAGAIN to retry */
1534         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1535         if (rc < 0)
1536                 GOTO(out, rc = -EAGAIN);
1537
1538         if (rc > aa->aa_requested_nob) {
1539                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1540                        aa->aa_requested_nob);
1541                 RETURN(-EPROTO);
1542         }
1543
1544         if (rc != req->rq_bulk->bd_nob_transferred) {
1545                 CERROR ("Unexpected rc %d (%d transferred)\n",
1546                         rc, req->rq_bulk->bd_nob_transferred);
1547                 return (-EPROTO);
1548         }
1549
1550         if (rc < aa->aa_requested_nob)
1551                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1552
1553         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1554                 static int cksum_counter;
1555                 __u32      server_cksum = body->oa.o_cksum;
1556                 char      *via;
1557                 char      *router;
1558                 cksum_type_t cksum_type;
1559
1560                 if (body->oa.o_valid & OBD_MD_FLFLAGS)
1561                         cksum_type = cksum_type_unpack(body->oa.o_flags);
1562                 else
1563                         cksum_type = OBD_CKSUM_CRC32;
1564                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1565                                                  aa->aa_ppga, OST_READ,
1566                                                  cksum_type);
1567
1568                 if (peer->nid == req->rq_bulk->bd_sender) {
1569                         via = router = "";
1570                 } else {
1571                         via = " via ";
1572                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1573                 }
1574
1575                 if (server_cksum == ~0 && rc > 0) {
1576                         CERROR("Protocol error: server %s set the 'checksum' "
1577                                "bit, but didn't send a checksum.  Not fatal, "
1578                                "but please notify on http://bugzilla.lustre.org/\n",
1579                                libcfs_nid2str(peer->nid));
1580                 } else if (server_cksum != client_cksum) {
1581                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1582                                            "%s%s%s inum "LPU64"/"LPU64" object "
1583                                            LPU64"/"LPU64" extent "
1584                                            "["LPU64"-"LPU64"]\n",
1585                                            req->rq_import->imp_obd->obd_name,
1586                                            libcfs_nid2str(peer->nid),
1587                                            via, router,
1588                                            body->oa.o_valid & OBD_MD_FLFID ?
1589                                                 body->oa.o_fid : (__u64)0,
1590                                            body->oa.o_valid & OBD_MD_FLFID ?
1591                                                 body->oa.o_generation :(__u64)0,
1592                                            body->oa.o_id,
1593                                            body->oa.o_valid & OBD_MD_FLGROUP ?
1594                                                 body->oa.o_gr : (__u64)0,
1595                                            aa->aa_ppga[0]->off,
1596                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1597                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1598                                                                         1);
1599                         CERROR("client %x, server %x, cksum_type %x\n",
1600                                client_cksum, server_cksum, cksum_type);
1601                         cksum_counter = 0;
1602                         aa->aa_oa->o_cksum = client_cksum;
1603                         rc = -EAGAIN;
1604                 } else {
1605                         cksum_counter++;
1606                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1607                         rc = 0;
1608                 }
1609         } else if (unlikely(client_cksum)) {
1610                 static int cksum_missed;
1611
1612                 cksum_missed++;
1613                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1614                         CERROR("Checksum %u requested from %s but not sent\n",
1615                                cksum_missed, libcfs_nid2str(peer->nid));
1616         } else {
1617                 rc = 0;
1618         }
1619 out:
1620         if (rc >= 0)
1621                 lustre_get_wire_obdo(aa->aa_oa, &body->oa);
1622
1623         RETURN(rc);
1624 }
1625
1626 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1627                             struct lov_stripe_md *lsm,
1628                             obd_count page_count, struct brw_page **pga,
1629                             struct obd_capa *ocapa)
1630 {
1631         struct ptlrpc_request *req;
1632         int                    rc;
1633         cfs_waitq_t            waitq;
1634         int                    resends = 0;
1635         struct l_wait_info     lwi;
1636
1637         ENTRY;
1638
1639         cfs_waitq_init(&waitq);
1640
1641 restart_bulk:
1642         rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1643                                   page_count, pga, &req, ocapa, 0);
1644         if (rc != 0)
1645                 return (rc);
1646
1647         rc = ptlrpc_queue_wait(req);
1648
1649         if (rc == -ETIMEDOUT && req->rq_resend) {
1650                 DEBUG_REQ(D_HA, req,  "BULK TIMEOUT");
1651                 ptlrpc_req_finished(req);
1652                 goto restart_bulk;
1653         }
1654
1655         rc = osc_brw_fini_request(req, rc);
1656
1657         ptlrpc_req_finished(req);
1658         if (osc_recoverable_error(rc)) {
1659                 resends++;
1660                 if (!osc_should_resend(resends, &exp->exp_obd->u.cli)) {
1661                         CERROR("too many resend retries, returning error\n");
1662                         RETURN(-EIO);
1663                 }
1664
1665                 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
1666                 l_wait_event(waitq, 0, &lwi);
1667
1668                 goto restart_bulk;
1669         }
1670
1671         RETURN (rc);
1672 }
1673
/* Rebuild and resubmit a bulk brw request that failed with a recoverable
 * error.  A replacement request is built from the same pages; the page
 * array and the oap list are moved over to it, and each oap's request
 * reference is switched from the old request to the new one.
 *
 * \retval 0      on success (new request added to @request's set)
 * \retval -EIO   if the resend budget (osc_should_resend()) is exhausted
 * \retval -EINTR if a sync waiter was interrupted while the pages were in
 *                flight
 * \retval other  negative errno from osc_brw_prep_request()
 */
int osc_brw_redo_request(struct ptlrpc_request *request,
                         struct osc_brw_async_args *aa)
{
        struct ptlrpc_request *new_req;
        struct ptlrpc_request_set *set = request->rq_set;
        struct osc_brw_async_args *new_aa;
        struct osc_async_page *oap;
        int rc = 0;
        ENTRY;

        if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) {
                CERROR("too many resend retries, returning error\n");
                RETURN(-EIO);
        }

        DEBUG_REQ(D_ERROR, request, "redo for recoverable error");

        rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
                                        OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
                                  aa->aa_cli, aa->aa_oa,
                                  NULL /* lsm unused by osc currently */,
                                  aa->aa_page_count, aa->aa_ppga,
                                  &new_req, aa->aa_ocapa, 0);
        if (rc)
                RETURN(rc);

        client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);

        /* if any sync waiter was interrupted while this RPC was in flight,
         * drop the new request and bail out before taking anything over */
        cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request != NULL) {
                        LASSERTF(request == oap->oap_request,
                                 "request %p != oap_request %p\n",
                                 request, oap->oap_request);
                        if (oap->oap_interrupted) {
                                client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
                                ptlrpc_req_finished(new_req);
                                RETURN(-EINTR);
                        }
                }
        }
        /* New request takes over pga and oaps from old request.
         * Note that copying a list_head doesn't work, need to move it... */
        aa->aa_resends++;
        new_req->rq_interpret_reply = request->rq_interpret_reply;
        new_req->rq_async_args = request->rq_async_args;
        /* one extra second per resend so far -- presumably a simple send
         * backoff; confirm against rq_sent semantics in ptlrpc */
        new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;

        new_aa = ptlrpc_req_async_args(new_req);

        CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
        cfs_list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
        CFS_INIT_LIST_HEAD(&aa->aa_oaps);

        /* move each oap's request reference from the old request to the new */
        cfs_list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request) {
                        ptlrpc_req_finished(oap->oap_request);
                        oap->oap_request = ptlrpc_request_addref(new_req);
                }
        }

        new_aa->aa_ocapa = aa->aa_ocapa;
        aa->aa_ocapa = NULL;

        /* use ptlrpc_set_add_req is safe because interpret functions work
         * in check_set context. only one way exist with access to request
         * from different thread got -EINTR - this way protected with
         * cl_loi_list_lock */
        ptlrpc_set_add_req(set, new_req);

        client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);

        DEBUG_REQ(D_INFO, new_req, "new request");
        RETURN(0);
}
1748
1749 /*
1750  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1751  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1752  * fine for our small page arrays and doesn't require allocation.  its an
1753  * insertion sort that swaps elements that are strides apart, shrinking the
1754  * stride down until its '1' and the array is sorted.
1755  */
1756 static void sort_brw_pages(struct brw_page **array, int num)
1757 {
1758         int stride, i, j;
1759         struct brw_page *tmp;
1760
1761         if (num == 1)
1762                 return;
1763         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1764                 ;
1765
1766         do {
1767                 stride /= 3;
1768                 for (i = stride ; i < num ; i++) {
1769                         tmp = array[i];
1770                         j = i;
1771                         while (j >= stride && array[j - stride]->off > tmp->off) {
1772                                 array[j] = array[j - stride];
1773                                 j -= stride;
1774                         }
1775                         array[j] = tmp;
1776                 }
1777         } while (stride > 1);
1778 }
1779
1780 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1781 {
1782         int count = 1;
1783         int offset;
1784         int i = 0;
1785
1786         LASSERT (pages > 0);
1787         offset = pg[i]->off & ~CFS_PAGE_MASK;
1788
1789         for (;;) {
1790                 pages--;
1791                 if (pages == 0)         /* that's all */
1792                         return count;
1793
1794                 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1795                         return count;   /* doesn't end on page boundary */
1796
1797                 i++;
1798                 offset = pg[i]->off & ~CFS_PAGE_MASK;
1799                 if (offset != 0)        /* doesn't start on page boundary */
1800                         return count;
1801
1802                 count++;
1803         }
1804 }
1805
1806 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1807 {
1808         struct brw_page **ppga;
1809         int i;
1810
1811         OBD_ALLOC(ppga, sizeof(*ppga) * count);
1812         if (ppga == NULL)
1813                 return NULL;
1814
1815         for (i = 0; i < count; i++)
1816                 ppga[i] = pga + i;
1817         return ppga;
1818 }
1819
1820 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1821 {
1822         LASSERT(ppga != NULL);
1823         OBD_FREE(ppga, sizeof(*ppga) * count);
1824 }
1825
/* Synchronous brw entry point: sort the pages by offset, then split them
 * into unfragmented chunks of at most cl_max_pages_per_rpc pages and issue
 * one osc_brw_internal() call per chunk, stopping on the first error.
 *
 * With OBD_BRW_CHECK in @cmd, no I/O is done: just report whether the
 * import is usable (0) or invalid (-EIO).
 *
 * \param oti unused here -- kept for the obd API signature */
static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
                   obd_count page_count, struct brw_page *pga,
                   struct obd_trans_info *oti)
{
        struct obdo *saved_oa = NULL;
        struct brw_page **ppga, **orig;
        struct obd_import *imp = class_exp2cliimp(exp);
        struct client_obd *cli;
        int rc, page_count_orig;
        ENTRY;

        LASSERT((imp != NULL) && (imp->imp_obd != NULL));
        cli = &imp->imp_obd->u.cli;

        if (cmd & OBD_BRW_CHECK) {
                /* The caller just wants to know if there's a chance that this
                 * I/O can succeed */

                if (imp->imp_invalid)
                        RETURN(-EIO);
                RETURN(0);
        }

        /* test_brw with a failed create can trip this, maybe others. */
        LASSERT(cli->cl_max_pages_per_rpc);

        rc = 0;

        orig = ppga = osc_build_ppga(pga, page_count);
        if (ppga == NULL)
                RETURN(-ENOMEM);
        /* remember the full count: ppga/page_count are consumed as chunks
         * complete, but the whole array is released at once below */
        page_count_orig = page_count;

        sort_brw_pages(ppga, page_count);
        while (page_count) {
                obd_count pages_per_brw;

                if (page_count > cli->cl_max_pages_per_rpc)
                        pages_per_brw = cli->cl_max_pages_per_rpc;
                else
                        pages_per_brw = page_count;

                /* shrink the chunk so it doesn't straddle partial pages */
                pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);

                if (saved_oa != NULL) {
                        /* restore previously saved oa */
                        *oinfo->oi_oa = *saved_oa;
                } else if (page_count > pages_per_brw) {
                        /* save a copy of oa (brw will clobber it) */
                        OBDO_ALLOC(saved_oa);
                        if (saved_oa == NULL)
                                GOTO(out, rc = -ENOMEM);
                        *saved_oa = *oinfo->oi_oa;
                }

                rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
                                      pages_per_brw, ppga, oinfo->oi_capa);

                if (rc != 0)
                        break;

                /* advance past the chunk just transferred */
                page_count -= pages_per_brw;
                ppga += pages_per_brw;
        }

out:
        osc_release_ppga(orig, page_count_orig);

        if (saved_oa != NULL)
                OBDO_FREE(saved_oa);

        RETURN(rc);
}
1899
/* The companion to osc_enter_cache(), called when @oap is no longer part of
 * the dirty accounting.  Writeback completes or truncate happens before
 * writing starts.  Must be called with the loi lock held.
 *
 * \param sent whether the page was actually sent; passed straight through
 *             to osc_release_write_grant() */
static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
                           int sent)
{
        osc_release_write_grant(cli, &oap->oap_brw_page, sent);
}
1908
1909
1910 /* This maintains the lists of pending pages to read/write for a given object
1911  * (lop).  This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
1912  * to quickly find objects that are ready to send an RPC. */
1913 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
1914                          int cmd)
1915 {
1916         int optimal;
1917         ENTRY;
1918
1919         if (lop->lop_num_pending == 0)
1920                 RETURN(0);
1921
1922         /* if we have an invalid import we want to drain the queued pages
1923          * by forcing them through rpcs that immediately fail and complete
1924          * the pages.  recovery relies on this to empty the queued pages
1925          * before canceling the locks and evicting down the llite pages */
1926         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1927                 RETURN(1);
1928
1929         /* stream rpcs in queue order as long as as there is an urgent page
1930          * queued.  this is our cheap solution for good batching in the case
1931          * where writepage marks some random page in the middle of the file
1932          * as urgent because of, say, memory pressure */
1933         if (!cfs_list_empty(&lop->lop_urgent)) {
1934                 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
1935                 RETURN(1);
1936         }
1937         /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
1938         optimal = cli->cl_max_pages_per_rpc;
1939         if (cmd & OBD_BRW_WRITE) {
1940                 /* trigger a write rpc stream as long as there are dirtiers
1941                  * waiting for space.  as they're waiting, they're not going to
1942                  * create more pages to coallesce with what's waiting.. */
1943                 if (!cfs_list_empty(&cli->cl_cache_waiters)) {
1944                         CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
1945                         RETURN(1);
1946                 }
1947                 /* +16 to avoid triggering rpcs that would want to include pages
1948                  * that are being queued but which can't be made ready until
1949                  * the queuer finishes with the page. this is a wart for
1950                  * llite::commit_write() */
1951                 optimal += 16;
1952         }
1953         if (lop->lop_num_pending >= optimal)
1954                 RETURN(1);
1955
1956         RETURN(0);
1957 }
1958
1959 static int lop_makes_hprpc(struct loi_oap_pages *lop)
1960 {
1961         struct osc_async_page *oap;
1962         ENTRY;
1963
1964         if (cfs_list_empty(&lop->lop_urgent))
1965                 RETURN(0);
1966
1967         oap = cfs_list_entry(lop->lop_urgent.next,
1968                          struct osc_async_page, oap_urgent_item);
1969
1970         if (oap->oap_async_flags & ASYNC_HP) {
1971                 CDEBUG(D_CACHE, "hp request forcing RPC\n");
1972                 RETURN(1);
1973         }
1974
1975         RETURN(0);
1976 }
1977
1978 static void on_list(cfs_list_t *item, cfs_list_t *list,
1979                     int should_be_on)
1980 {
1981         if (cfs_list_empty(item) && should_be_on)
1982                 cfs_list_add_tail(item, list);
1983         else if (!cfs_list_empty(item) && !should_be_on)
1984                 cfs_list_del_init(item);
1985 }
1986
1987 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
1988  * can find pages to build into rpcs quickly */
1989 void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
1990 {
1991         if (lop_makes_hprpc(&loi->loi_write_lop) ||
1992             lop_makes_hprpc(&loi->loi_read_lop)) {
1993                 /* HP rpc */
1994                 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list, 0);
1995                 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 1);
1996         } else {
1997                 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 0);
1998                 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list,
1999                         lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)||
2000                         lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
2001         }
2002
2003         on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
2004                 loi->loi_write_lop.lop_num_pending);
2005
2006         on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
2007                 loi->loi_read_lop.lop_num_pending);
2008 }
2009
2010 static void lop_update_pending(struct client_obd *cli,
2011                                struct loi_oap_pages *lop, int cmd, int delta)
2012 {
2013         lop->lop_num_pending += delta;
2014         if (cmd & OBD_BRW_WRITE)
2015                 cli->cl_pending_w_pages += delta;
2016         else
2017                 cli->cl_pending_r_pages += delta;
2018 }
2019
/**
 * this is called when a sync waiter receives an interruption.  Its job is to
 * get the caller woken as soon as possible.  If its page hasn't been put in an
 * rpc yet it can dequeue immediately.  Otherwise it has to mark the rpc as
 * desiring interruption which will forcefully complete the rpc once the rpc
 * has timed out.
 *
 * \retval -EBUSY if the page was already in an rpc (it is only marked
 *         interrupted); otherwise the return value of the caller's
 *         ap_completion() callback, invoked with -EINTR
 */
int osc_oap_interrupted(const struct lu_env *env, struct osc_async_page *oap)
{
        struct loi_oap_pages *lop;
        struct lov_oinfo *loi;
        int rc = -EBUSY;
        ENTRY;

        LASSERT(!oap->oap_interrupted);
        oap->oap_interrupted = 1;

        /* ok, it's been put in an rpc. only one oap gets a request reference */
        if (oap->oap_request != NULL) {
                ptlrpc_mark_interrupted(oap->oap_request);
                ptlrpcd_wake(oap->oap_request);
                ptlrpc_req_finished(oap->oap_request);
                oap->oap_request = NULL;
        }

        /*
         * page completion may be called only if ->cpo_prep() method was
         * executed by osc_io_submit(), that also adds page the to pending list
         */
        if (!cfs_list_empty(&oap->oap_pending_item)) {
                cfs_list_del_init(&oap->oap_pending_item);
                cfs_list_del_init(&oap->oap_urgent_item);

                loi = oap->oap_loi;
                lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
                        &loi->loi_write_lop : &loi->loi_read_lop;
                /* the page leaves the pending accounting; fix up the lists */
                lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
                loi_list_maint(oap->oap_cli, oap->oap_loi);
                /* complete the page back to the caller with -EINTR */
                rc = oap->oap_caller_ops->ap_completion(env,
                                          oap->oap_caller_data,
                                          oap->oap_cmd, NULL, -EINTR);
        }

        RETURN(rc);
}
2065
2066 /* this is trying to propogate async writeback errors back up to the
2067  * application.  As an async write fails we record the error code for later if
2068  * the app does an fsync.  As long as errors persist we force future rpcs to be
2069  * sync so that the app can get a sync error and break the cycle of queueing
2070  * pages for which writeback will fail. */
2071 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
2072                            int rc)
2073 {
2074         if (rc) {
2075                 if (!ar->ar_rc)
2076                         ar->ar_rc = rc;
2077
2078                 ar->ar_force_sync = 1;
2079                 ar->ar_min_xid = ptlrpc_sample_next_xid();
2080                 return;
2081
2082         }
2083
2084         if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
2085                 ar->ar_force_sync = 0;
2086 }
2087
2088 void osc_oap_to_pending(struct osc_async_page *oap)
2089 {
2090         struct loi_oap_pages *lop;
2091
2092         if (oap->oap_cmd & OBD_BRW_WRITE)
2093                 lop = &oap->oap_loi->loi_write_lop;
2094         else
2095                 lop = &oap->oap_loi->loi_read_lop;
2096
2097         if (oap->oap_async_flags & ASYNC_HP)
2098                 cfs_list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2099         else if (oap->oap_async_flags & ASYNC_URGENT)
2100                 cfs_list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
2101         cfs_list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
2102         lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
2103 }
2104
/* this must be called holding the loi list lock to give coverage to exit_cache,
 * async_flag maintenance, and oap_request
 *
 * Completes one async page back to its caller: drops the oap's request
 * reference, records/clears async write errors, refreshes cached lvb
 * attributes from @oa on success, and invokes the caller's ap_completion()
 * callback.  Depending on the callback's return, the oap is either put
 * back on the pending queue or removed from cache accounting. */
static void osc_ap_completion(const struct lu_env *env,
                              struct client_obd *cli, struct obdo *oa,
                              struct osc_async_page *oap, int sent, int rc)
{
        __u64 xid = 0;

        ENTRY;
        /* drop the request reference; remember its xid for the async-error
         * bookkeeping below */
        if (oap->oap_request != NULL) {
                xid = ptlrpc_req_xid(oap->oap_request);
                ptlrpc_req_finished(oap->oap_request);
                oap->oap_request = NULL;
        }

        /* flags are cleared under oap_lock */
        cfs_spin_lock(&oap->oap_lock);
        oap->oap_async_flags = 0;
        cfs_spin_unlock(&oap->oap_lock);
        oap->oap_interrupted = 0;

        /* track async write errors for a later fsync (see osc_process_ar) */
        if (oap->oap_cmd & OBD_BRW_WRITE) {
                osc_process_ar(&cli->cl_ar, xid, rc);
                osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
        }

        /* on success, refresh the cached lvb attributes from the reply */
        if (rc == 0 && oa != NULL) {
                if (oa->o_valid & OBD_MD_FLBLOCKS)
                        oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
                if (oa->o_valid & OBD_MD_FLMTIME)
                        oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
                if (oa->o_valid & OBD_MD_FLATIME)
                        oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
                if (oa->o_valid & OBD_MD_FLCTIME)
                        oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
        }

        rc = oap->oap_caller_ops->ap_completion(env, oap->oap_caller_data,
                                                oap->oap_cmd, oa, rc);

        /* ll_ap_completion (from llite) drops PG_locked. so, a new
         * I/O on the page could start, but OSC calls it under lock
         * and thus we can add oap back to pending safely */
        if (rc)
                /* upper layer wants to leave the page on pending queue */
                osc_oap_to_pending(oap);
        else
                osc_exit_cache(cli, oap, sent);
        EXIT;
}
2154
/* Interpret callback for an async brw request: finalize the reply, retry
 * recoverable errors via osc_brw_redo_request(), then, under the client's
 * loi list lock, adjust the in-flight counters, complete (or release the
 * grants of) every page the request carried, wake cache waiters and kick
 * off any newly-ready RPCs. */
static int brw_interpret(const struct lu_env *env,
                         struct ptlrpc_request *req, void *data, int rc)
{
        struct osc_brw_async_args *aa = data;
        struct client_obd *cli;
        int async;
        ENTRY;

        rc = osc_brw_fini_request(req, rc);
        CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
        /* a successful redo keeps aa alive for the new request, so return
         * immediately without touching it further */
        if (osc_recoverable_error(rc)) {
                rc = osc_brw_redo_request(req, aa);
                if (rc == 0)
                        RETURN(0);
        }

        if (aa->aa_ocapa) {
                capa_put(aa->aa_ocapa);
                aa->aa_ocapa = NULL;
        }

        cli = aa->aa_cli;

        client_obd_list_lock(&cli->cl_loi_list_lock);

        /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
         * is called so we know whether to go to sync BRWs or wait for more
         * RPCs to complete */
        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
                cli->cl_w_in_flight--;
        else
                cli->cl_r_in_flight--;

        /* an empty oap list distinguishes the async_internal() path from
         * the osc_send_oap_rpc() path */
        async = cfs_list_empty(&aa->aa_oaps);
        if (!async) { /* from osc_send_oap_rpc() */
                struct osc_async_page *oap, *tmp;
                /* the caller may re-use the oap after the completion call so
                 * we need to clean it up a little */
                cfs_list_for_each_entry_safe(oap, tmp, &aa->aa_oaps,
                                             oap_rpc_item) {
                        cfs_list_del_init(&oap->oap_rpc_item);
                        osc_ap_completion(env, cli, aa->aa_oa, oap, 1, rc);
                }
                OBDO_FREE(aa->aa_oa);
        } else { /* from async_internal() */
                int i;
                for (i = 0; i < aa->aa_page_count; i++)
                        osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);

                if (aa->aa_oa->o_flags & OBD_FL_TEMPORARY)
                        OBDO_FREE(aa->aa_oa);
        }
        osc_wake_cache_waiters(cli);
        osc_check_rpcs(env, cli);
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        if (!async)
                cl_req_completion(env, aa->aa_clerq, rc);
        osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
        RETURN(rc);
}
2215
2216 static struct ptlrpc_request *osc_build_req(const struct lu_env *env,
2217                                             struct client_obd *cli,
2218                                             cfs_list_t *rpc_list,
2219                                             int page_count, int cmd)
2220 {
2221         struct ptlrpc_request *req;
2222         struct brw_page **pga = NULL;
2223         struct osc_brw_async_args *aa;
2224         struct obdo *oa = NULL;
2225         const struct obd_async_page_ops *ops = NULL;
2226         void *caller_data = NULL;
2227         struct osc_async_page *oap;
2228         struct osc_async_page *tmp;
2229         struct ost_body *body;
2230         struct cl_req *clerq = NULL;
2231         enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2232         struct ldlm_lock *lock = NULL;
2233         struct cl_req_attr crattr;
2234         int i, rc;
2235
2236         ENTRY;
2237         LASSERT(!cfs_list_empty(rpc_list));
2238
2239         memset(&crattr, 0, sizeof crattr);
2240         OBD_ALLOC(pga, sizeof(*pga) * page_count);
2241         if (pga == NULL)
2242                 GOTO(out, req = ERR_PTR(-ENOMEM));
2243
2244         OBDO_ALLOC(oa);
2245         if (oa == NULL)
2246                 GOTO(out, req = ERR_PTR(-ENOMEM));
2247
2248         i = 0;
2249         cfs_list_for_each_entry(oap, rpc_list, oap_rpc_item) {
2250                 struct cl_page *page = osc_oap2cl_page(oap);
2251                 if (ops == NULL) {
2252                         ops = oap->oap_caller_ops;
2253                         caller_data = oap->oap_caller_data;
2254
2255                         clerq = cl_req_alloc(env, page, crt,
2256                                              1 /* only 1-object rpcs for
2257                                                 * now */);
2258                         if (IS_ERR(clerq))
2259                                 GOTO(out, req = (void *)clerq);
2260                         lock = oap->oap_ldlm_lock;
2261                 }
2262                 pga[i] = &oap->oap_brw_page;
2263                 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2264                 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2265                        pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
2266                 i++;
2267                 cl_req_page_add(env, clerq, page);
2268         }
2269
2270         /* always get the data for the obdo for the rpc */
2271         LASSERT(ops != NULL);
2272         crattr.cra_oa = oa;
2273         crattr.cra_capa = NULL;
2274         cl_req_attr_set(env, clerq, &crattr, ~0ULL);
2275         if (lock) {
2276                 oa->o_handle = lock->l_remote_handle;
2277                 oa->o_valid |= OBD_MD_FLHANDLE;
2278         }
2279
2280         rc = cl_req_prep(env, clerq);
2281         if (rc != 0) {
2282                 CERROR("cl_req_prep failed: %d\n", rc);
2283                 GOTO(out, req = ERR_PTR(rc));
2284         }
2285
2286         sort_brw_pages(pga, page_count);
2287         rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2288                                   pga, &req, crattr.cra_capa, 1);
2289         if (rc != 0) {
2290                 CERROR("prep_req failed: %d\n", rc);
2291                 GOTO(out, req = ERR_PTR(rc));
2292         }
2293
2294         /* Need to update the timestamps after the request is built in case
2295          * we race with setattr (locally or in queue at OST).  If OST gets
2296          * later setattr before earlier BRW (as determined by the request xid),
2297          * the OST will not use BRW timestamps.  Sadly, there is no obvious
2298          * way to do this in a single call.  bug 10150 */
2299         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
2300         cl_req_attr_set(env, clerq, &crattr,
2301                         OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
2302
2303         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2304         aa = ptlrpc_req_async_args(req);
2305         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
2306         cfs_list_splice(rpc_list, &aa->aa_oaps);
2307         CFS_INIT_LIST_HEAD(rpc_list);
2308         aa->aa_clerq = clerq;
2309 out:
2310         capa_put(crattr.cra_capa);
2311         if (IS_ERR(req)) {
2312                 if (oa)
2313                         OBDO_FREE(oa);
2314                 if (pga)
2315                         OBD_FREE(pga, sizeof(*pga) * page_count);
2316                 /* this should happen rarely and is pretty bad, it makes the
2317                  * pending list not follow the dirty order */
2318                 client_obd_list_lock(&cli->cl_loi_list_lock);
2319                 cfs_list_for_each_entry_safe(oap, tmp, rpc_list, oap_rpc_item) {
2320                         cfs_list_del_init(&oap->oap_rpc_item);
2321
2322                         /* queued sync pages can be torn down while the pages
2323                          * were between the pending list and the rpc */
2324                         if (oap->oap_interrupted) {
2325                                 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
2326                                 osc_ap_completion(env, cli, NULL, oap, 0,
2327                                                   oap->oap_count);
2328                                 continue;
2329                         }
2330                         osc_ap_completion(env, cli, NULL, oap, 0, PTR_ERR(req));
2331                 }
2332                 if (clerq && !IS_ERR(clerq))
2333                         cl_req_completion(env, clerq, PTR_ERR(req));
2334         }
2335         RETURN(req);
2336 }
2337
/**
 * Prepare pages for ASYNC io and put pages in send queue.
 *
 * Caller must hold cli->cl_loi_list_lock; the lock is dropped while the
 * RPC is built and re-taken before returning.
 *
 * \param cmd OBD_BRW_* macros
 * \param lop pending pages
 *
 * \return 1 if an RPC was dispatched.
 * \return 0 if there was nothing to send (or make_ready backed off).
 * \return negative errno if building the request failed.
 */
static int
osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli,
                 struct lov_oinfo *loi,
                 int cmd, struct loi_oap_pages *lop)
{
        struct ptlrpc_request *req;
        obd_count page_count = 0;
        struct osc_async_page *oap = NULL, *tmp;
        struct osc_brw_async_args *aa;
        const struct obd_async_page_ops *ops;
        CFS_LIST_HEAD(rpc_list);
        CFS_LIST_HEAD(tmp_list);
        unsigned int ending_offset;
        unsigned  starting_offset = 0;
        int srvlock = 0;
        struct cl_object *clob = NULL;
        ENTRY;

        /* ASYNC_HP pages first. At present, when the lock the pages is
         * to be canceled, the pages covered by the lock will be sent out
         * with ASYNC_HP. We have to send out them as soon as possible. */
        cfs_list_for_each_entry_safe(oap, tmp, &lop->lop_urgent, oap_urgent_item) {
                if (oap->oap_async_flags & ASYNC_HP)
                        cfs_list_move(&oap->oap_pending_item, &tmp_list);
                else
                        cfs_list_move_tail(&oap->oap_pending_item, &tmp_list);
                if (++page_count >= cli->cl_max_pages_per_rpc)
                        break;
        }

        /* Splice the (HP-first) batch back onto the head of lop_pending so
         * the scan below picks them up before older pending pages. */
        cfs_list_splice(&tmp_list, &lop->lop_pending);
        page_count = 0;

        /* first we find the pages we're allowed to work with */
        cfs_list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
                                     oap_pending_item) {
                ops = oap->oap_caller_ops;

                LASSERTF(oap->oap_magic == OAP_MAGIC, "Bad oap magic: oap %p, "
                         "magic 0x%x\n", oap, oap->oap_magic);

                if (clob == NULL) {
                        /* pin object in memory, so that completion call-backs
                         * can be safely called under client_obd_list lock. */
                        clob = osc_oap2cl_page(oap)->cp_obj;
                        cl_object_get(clob);
                }

                /* All pages in one RPC must agree on OBD_BRW_SRVLOCK; stop
                 * the batch at the first page that differs from the first. */
                if (page_count != 0 &&
                    srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) {
                        CDEBUG(D_PAGE, "SRVLOCK flag mismatch,"
                               " oap %p, page %p, srvlock %u\n",
                               oap, oap->oap_brw_page.pg, (unsigned)!srvlock);
                        break;
                }

                /* If there is a gap at the start of this page, it can't merge
                 * with any previous page, so we'll hand the network a
                 * "fragmented" page array that it can't transfer in 1 RDMA */
                if (page_count != 0 && oap->oap_page_off != 0)
                        break;

                /* in llite being 'ready' equates to the page being locked
                 * until completion unlocks it.  commit_write submits a page
                 * as not ready because its unlock will happen unconditionally
                 * as the call returns.  if we race with commit_write giving
                 * us that page we dont' want to create a hole in the page
                 * stream, so we stop and leave the rpc to be fired by
                 * another dirtier or kupdated interval (the not ready page
                 * will still be on the dirty list).  we could call in
                 * at the end of ll_file_write to process the queue again. */
                if (!(oap->oap_async_flags & ASYNC_READY)) {
                        int rc = ops->ap_make_ready(env, oap->oap_caller_data,
                                                    cmd);
                        if (rc < 0)
                                CDEBUG(D_INODE, "oap %p page %p returned %d "
                                                "instead of ready\n", oap,
                                                oap->oap_page, rc);
                        switch (rc) {
                        case -EAGAIN:
                                /* llite is telling us that the page is still
                                 * in commit_write and that we should try
                                 * and put it in an rpc again later.  we
                                 * break out of the loop so we don't create
                                 * a hole in the sequence of pages in the rpc
                                 * stream.*/
                                oap = NULL;
                                break;
                        case -EINTR:
                                /* the io isn't needed.. tell the checks
                                 * below to complete the rpc with EINTR */
                                cfs_spin_lock(&oap->oap_lock);
                                oap->oap_async_flags |= ASYNC_COUNT_STABLE;
                                cfs_spin_unlock(&oap->oap_lock);
                                oap->oap_count = -EINTR;
                                break;
                        case 0:
                                cfs_spin_lock(&oap->oap_lock);
                                oap->oap_async_flags |= ASYNC_READY;
                                cfs_spin_unlock(&oap->oap_lock);
                                break;
                        default:
                                LASSERTF(0, "oap %p page %p returned %d "
                                            "from make_ready\n", oap,
                                            oap->oap_page, rc);
                                break;
                        }
                }
                /* oap was NULLed on -EAGAIN: stop extending the batch. */
                if (oap == NULL)
                        break;
                /*
                 * Page submitted for IO has to be locked. Either by
                 * ->ap_make_ready() or by higher layers.
                 */
#if defined(__KERNEL__) && defined(__linux__)
                {
                        struct cl_page *page;

                        page = osc_oap2cl_page(oap);

                        if (page->cp_type == CPT_CACHEABLE &&
                            !(PageLocked(oap->oap_page) &&
                              (CheckWriteback(oap->oap_page, cmd)))) {
                                CDEBUG(D_PAGE, "page %p lost wb %lx/%x\n",
                                       oap->oap_page,
                                       (long)oap->oap_page->flags,
                                       oap->oap_async_flags);
                                LBUG();
                        }
                }
#endif

                /* take the page out of our book-keeping */
                cfs_list_del_init(&oap->oap_pending_item);
                lop_update_pending(cli, lop, cmd, -1);
                cfs_list_del_init(&oap->oap_urgent_item);

                if (page_count == 0)
                        starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
                                          (PTLRPC_MAX_BRW_SIZE - 1);

                /* ask the caller for the size of the io as the rpc leaves. */
                if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE)) {
                        oap->oap_count =
                                ops->ap_refresh_count(env, oap->oap_caller_data,
                                                      cmd);
                        LASSERT(oap->oap_page_off + oap->oap_count <= CFS_PAGE_SIZE);
                }
                /* A non-positive count (including -EINTR set above) means no
                 * IO is needed for this page; complete it immediately. */
                if (oap->oap_count <= 0) {
                        CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
                               oap->oap_count);
                        osc_ap_completion(env, cli, NULL,
                                          oap, 0, oap->oap_count);
                        continue;
                }

                /* now put the page back in our accounting */
                cfs_list_add_tail(&oap->oap_rpc_item, &rpc_list);
                if (page_count == 0)
                        srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
                if (++page_count >= cli->cl_max_pages_per_rpc)
                        break;

                /* End on a PTLRPC_MAX_BRW_SIZE boundary.  We want full-sized
                 * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
                 * have the same alignment as the initial writes that allocated
                 * extents on the server. */
                ending_offset = (oap->oap_obj_off + oap->oap_page_off +
                                 oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
                if (ending_offset == 0)
                        break;

                /* If there is a gap at the end of this page, it can't merge
                 * with any subsequent pages, so we'll hand the network a
                 * "fragmented" page array that it can't transfer in 1 RDMA */
                if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
                        break;
        }

        osc_wake_cache_waiters(cli);

        loi_list_maint(cli, loi);

        /* Drop the list lock while the RPC is assembled; re-taken before
         * every RETURN below so the caller's locking contract holds. */
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        if (clob != NULL)
                cl_object_put(env, clob);

        if (page_count == 0) {
                client_obd_list_lock(&cli->cl_loi_list_lock);
                RETURN(0);
        }

        req = osc_build_req(env, cli, &rpc_list, page_count, cmd);
        if (IS_ERR(req)) {
                /* osc_build_req() cleaned up rpc_list on failure. */
                LASSERT(cfs_list_empty(&rpc_list));
                loi_list_maint(cli, loi);
                RETURN(PTR_ERR(req));
        }

        aa = ptlrpc_req_async_args(req);

        if (cmd == OBD_BRW_READ) {
                lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
                lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
                lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
                                      (starting_offset >> CFS_PAGE_SHIFT) + 1);
        } else {
                lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
                lprocfs_oh_tally(&cli->cl_write_rpc_hist,
                                 cli->cl_w_in_flight);
                lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
                                      (starting_offset >> CFS_PAGE_SHIFT) + 1);
        }
        ptlrpc_lprocfs_brw(req, aa->aa_requested_nob);

        client_obd_list_lock(&cli->cl_loi_list_lock);

        if (cmd == OBD_BRW_READ)
                cli->cl_r_in_flight++;
        else
                cli->cl_w_in_flight++;

        /* Propagate interruption from any oap onto the request, and give
         * exactly one oap a reference on the request. */
        tmp = NULL;
        cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
                /* only one oap gets a request reference */
                if (tmp == NULL)
                        tmp = oap;
                if (oap->oap_interrupted && !req->rq_intr) {
                        CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
                               oap, req);
                        ptlrpc_mark_interrupted(req);
                }
        }
        if (tmp != NULL)
                tmp->oap_request = ptlrpc_request_addref(req);

        DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
                  page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);

        req->rq_interpret_reply = brw_interpret;
        ptlrpcd_add_req(req, PSCOPE_BRW);
        RETURN(1);
}
2593
/* Debug-print the state of an lov_oinfo: readiness plus pending/urgent
 * counts for its write and read lops.  NB: the trailing backslash that used
 * to follow "args)" was removed - it continued the macro onto whatever
 * source line came next, which would silently swallow real code if anything
 * were ever added directly below this definition. */
#define LOI_DEBUG(LOI, STR, args...)                                     \
        CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR,           \
               !cfs_list_empty(&(LOI)->loi_ready_item) ||                \
               !cfs_list_empty(&(LOI)->loi_hp_ready_item),               \
               (LOI)->loi_write_lop.lop_num_pending,                     \
               !cfs_list_empty(&(LOI)->loi_write_lop.lop_urgent),        \
               (LOI)->loi_read_lop.lop_num_pending,                      \
               !cfs_list_empty(&(LOI)->loi_read_lop.lop_urgent),         \
               args)
2603
2604 /* This is called by osc_check_rpcs() to find which objects have pages that
2605  * we could be sending.  These lists are maintained by lop_makes_rpc(). */
2606 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2607 {
2608         ENTRY;
2609
2610         /* First return objects that have blocked locks so that they
2611          * will be flushed quickly and other clients can get the lock,
2612          * then objects which have pages ready to be stuffed into RPCs */
2613         if (!cfs_list_empty(&cli->cl_loi_hp_ready_list))
2614                 RETURN(cfs_list_entry(cli->cl_loi_hp_ready_list.next,
2615                                       struct lov_oinfo, loi_hp_ready_item));
2616         if (!cfs_list_empty(&cli->cl_loi_ready_list))
2617                 RETURN(cfs_list_entry(cli->cl_loi_ready_list.next,
2618                                       struct lov_oinfo, loi_ready_item));
2619
2620         /* then if we have cache waiters, return all objects with queued
2621          * writes.  This is especially important when many small files
2622          * have filled up the cache and not been fired into rpcs because
2623          * they don't pass the nr_pending/object threshhold */
2624         if (!cfs_list_empty(&cli->cl_cache_waiters) &&
2625             !cfs_list_empty(&cli->cl_loi_write_list))
2626                 RETURN(cfs_list_entry(cli->cl_loi_write_list.next,
2627                                       struct lov_oinfo, loi_write_item));
2628
2629         /* then return all queued objects when we have an invalid import
2630          * so that they get flushed */
2631         if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2632                 if (!cfs_list_empty(&cli->cl_loi_write_list))
2633                         RETURN(cfs_list_entry(cli->cl_loi_write_list.next,
2634                                               struct lov_oinfo,
2635                                               loi_write_item));
2636                 if (!cfs_list_empty(&cli->cl_loi_read_list))
2637                         RETURN(cfs_list_entry(cli->cl_loi_read_list.next,
2638                                               struct lov_oinfo, loi_read_item));
2639         }
2640         RETURN(NULL);
2641 }
2642
2643 static int osc_max_rpc_in_flight(struct client_obd *cli, struct lov_oinfo *loi)
2644 {
2645         struct osc_async_page *oap;
2646         int hprpc = 0;
2647
2648         if (!cfs_list_empty(&loi->loi_write_lop.lop_urgent)) {
2649                 oap = cfs_list_entry(loi->loi_write_lop.lop_urgent.next,
2650                                      struct osc_async_page, oap_urgent_item);
2651                 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2652         }
2653
2654         if (!hprpc && !cfs_list_empty(&loi->loi_read_lop.lop_urgent)) {
2655                 oap = cfs_list_entry(loi->loi_read_lop.lop_urgent.next,
2656                                      struct osc_async_page, oap_urgent_item);
2657                 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2658         }
2659
2660         return rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight + hprpc;
2661 }
2662
/* Called with the loi list lock held.  Walks the objects returned by
 * osc_next_loi() and fires read and write RPCs for each until either the
 * max-RPCs-in-flight limit is hit or make_ready has backed off (rc == 0)
 * ten times in a row. */
void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli)
{
        struct lov_oinfo *loi;
        int rc = 0, race_counter = 0;
        ENTRY;

        while ((loi = osc_next_loi(cli)) != NULL) {
                LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));

                if (osc_max_rpc_in_flight(cli, loi))
                        break;

                /* attempt some read/write balancing by alternating between
                 * reads and writes in an object.  The makes_rpc checks here
                 * would be redundant if we were getting read/write work items
                 * instead of objects.  we don't want send_oap_rpc to drain a
                 * partial read pending queue when we're given this object to
                 * do io on writes while there are cache waiters */
                if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
                        rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_WRITE,
                                              &loi->loi_write_lop);
                        if (rc < 0) {
                                CERROR("Write request failed with %d\n", rc);

                                /* osc_send_oap_rpc failed, mostly because of
                                 * memory pressure.
                                 *
                                 * It can't break here, because if:
                                 *  - a page was submitted by osc_io_submit, so
                                 *    page locked;
                                 *  - no request in flight
                                 *  - no subsequent request
                                 * The system will be in live-lock state,
                                 * because there is no chance to call
                                 * osc_io_unplug() and osc_check_rpcs() any
                                 * more. pdflush can't help in this case,
                                 * because it might be blocked at grabbing
                                 * the page lock as we mentioned.
                                 *
                                 * Anyway, continue to drain pages. */
                                /* break; */
                        }

                        if (rc > 0)
                                race_counter = 0;
                        else
                                race_counter++;
                }
                if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
                        rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_READ,
                                              &loi->loi_read_lop);
                        if (rc < 0)
                                CERROR("Read request failed with %d\n", rc);

                        if (rc > 0)
                                race_counter = 0;
                        else
                                race_counter++;
                }

                /* attempt some inter-object balancing by issueing rpcs
                 * for each object in turn.  Dropping the loi from the ready
                 * lists here moves it behind any other ready objects;
                 * loi_list_maint() below re-queues it if work remains. */
                if (!cfs_list_empty(&loi->loi_hp_ready_item))
                        cfs_list_del_init(&loi->loi_hp_ready_item);
                if (!cfs_list_empty(&loi->loi_ready_item))
                        cfs_list_del_init(&loi->loi_ready_item);
                if (!cfs_list_empty(&loi->loi_write_item))
                        cfs_list_del_init(&loi->loi_write_item);
                if (!cfs_list_empty(&loi->loi_read_item))
                        cfs_list_del_init(&loi->loi_read_item);

                loi_list_maint(cli, loi);

                /* send_oap_rpc fails with 0 when make_ready tells it to
                 * back off.  llite's make_ready does this when it tries
                 * to lock a page queued for write that is already locked.
                 * we want to try sending rpcs from many objects, but we
                 * don't want to spin failing with 0.  */
                if (race_counter == 10)
                        break;
        }
        EXIT;
}
2747
2748 /* we're trying to queue a page in the osc so we're subject to the
2749  * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2750  * If the osc's queued pages are already at that limit, then we want to sleep
2751  * until there is space in the osc's queue for us.  We also may be waiting for
2752  * write credits from the OST if there are RPCs in flight that may return some
2753  * before we fall back to sync writes.
2754  *
2755  * We need this know our allocation was granted in the presence of signals */
2756 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2757 {
2758         int rc;
2759         ENTRY;
2760         client_obd_list_lock(&cli->cl_loi_list_lock);
2761         rc = cfs_list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2762         client_obd_list_unlock(&cli->cl_loi_list_lock);
2763         RETURN(rc);
2764 };
2765
2766 /**
2767  * Non-blocking version of osc_enter_cache() that consumes grant only when it
2768  * is available.
2769  */
2770 int osc_enter_cache_try(const struct lu_env *env,
2771                         struct client_obd *cli, struct lov_oinfo *loi,
2772                         struct osc_async_page *oap, int transient)
2773 {
2774         int has_grant;
2775
2776         has_grant = cli->cl_avail_grant >= CFS_PAGE_SIZE;
2777         if (has_grant) {
2778                 osc_consume_write_grant(cli, &oap->oap_brw_page);
2779                 if (transient) {
2780                         cli->cl_dirty_transit += CFS_PAGE_SIZE;
2781                         cfs_atomic_inc(&obd_dirty_transit_pages);
2782                         oap->oap_brw_flags |= OBD_BRW_NOCACHE;
2783                 }
2784         }
2785         return has_grant;
2786 }
2787
/* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
 * grant or cache space.
 *
 * Returns 0 when grant was consumed for the page, -EDQUOT to make the caller
 * fall back to sync IO, -EINTR if the wait was interrupted, or the rc set by
 * whoever granted the waiter. */
static int osc_enter_cache(const struct lu_env *env,
                           struct client_obd *cli, struct lov_oinfo *loi,
                           struct osc_async_page *oap)
{
        struct osc_cache_waiter ocw;
        struct l_wait_info lwi = { 0 };

        ENTRY;

        CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
               "grant: %lu\n", cli->cl_dirty, cfs_atomic_read(&obd_dirty_pages),
               cli->cl_dirty_max, obd_max_dirty_pages,
               cli->cl_lost_grant, cli->cl_avail_grant);

        /* force the caller to try sync io.  this can jump the list
         * of queued writes and create a discontiguous rpc stream */
        if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
            loi->loi_ar.ar_force_sync)
                RETURN(-EDQUOT);

        /* Hopefully normal case - cache space and write credits available */
        if (cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max &&
            cfs_atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages &&
            osc_enter_cache_try(env, cli, loi, oap, 0))
                RETURN(0);

        /* Make sure that there are write rpcs in flight to wait for.  This
         * is a little silly as this object may not have any pending but
         * other objects sure might. */
        if (cli->cl_w_in_flight) {
                cfs_list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
                cfs_waitq_init(&ocw.ocw_waitq);
                ocw.ocw_oap = oap;
                ocw.ocw_rc = 0;

                /* Kick RPCs so completions can return grant, then drop the
                 * list lock for the (possibly long) sleep below. */
                loi_list_maint(cli, loi);
                osc_check_rpcs(env, cli);
                client_obd_list_unlock(&cli->cl_loi_list_lock);

                CDEBUG(D_CACHE, "sleeping for cache space\n");
                l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);

                client_obd_list_lock(&cli->cl_loi_list_lock);
                /* Still queued after wake-up means we were not granted
                 * (interrupted, or no RPCs left in flight): unlink ourselves
                 * and report -EINTR.  Otherwise return whatever rc the
                 * granter stored in the waiter. */
                if (!cfs_list_empty(&ocw.ocw_entry)) {
                        cfs_list_del(&ocw.ocw_entry);
                        RETURN(-EINTR);
                }
                RETURN(ocw.ocw_rc);
        }

        RETURN(-EDQUOT);
}
2842
2843
2844 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2845                         struct lov_oinfo *loi, cfs_page_t *page,
2846                         obd_off offset, const struct obd_async_page_ops *ops,
2847                         void *data, void **res, int nocache,
2848                         struct lustre_handle *lockh)
2849 {
2850         struct osc_async_page *oap;
2851
2852         ENTRY;
2853
2854         if (!page)
2855                 return cfs_size_round(sizeof(*oap));
2856
2857         oap = *res;
2858         oap->oap_magic = OAP_MAGIC;
2859         oap->oap_cli = &exp->exp_obd->u.cli;
2860         oap->oap_loi = loi;
2861
2862         oap->oap_caller_ops = ops;
2863         oap->oap_caller_data = data;
2864
2865         oap->oap_page = page;
2866         oap->oap_obj_off = offset;
2867         if (!client_is_remote(exp) &&
2868             cfs_capable(CFS_CAP_SYS_RESOURCE))
2869                 oap->oap_brw_flags = OBD_BRW_NOQUOTA;
2870
2871         LASSERT(!(offset & ~CFS_PAGE_MASK));
2872
2873         CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2874         CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2875         CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2876         CFS_INIT_LIST_HEAD(&oap->oap_page_list);
2877
2878         cfs_spin_lock_init(&oap->oap_lock);
2879         CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
2880         RETURN(0);
2881 }
2882
2883 struct osc_async_page *oap_from_cookie(void *cookie)
2884 {
2885         struct osc_async_page *oap = cookie;
2886         if (oap->oap_magic != OAP_MAGIC)
2887                 return ERR_PTR(-EINVAL);
2888         return oap;
2889 };
2890
/* Queue one async page for IO.  For writes this first checks quota for the
 * file's owner/group and reserves cache/grant space (osc_enter_cache(),
 * which may sleep), then puts the oap on the pending lists and kicks
 * osc_check_rpcs().  Returns 0 on success or a negative errno. */
int osc_queue_async_io(const struct lu_env *env,
                       struct obd_export *exp, struct lov_stripe_md *lsm,
                       struct lov_oinfo *loi, void *cookie,
                       int cmd, obd_off off, int count,
                       obd_flag brw_flags, enum async_flags async_flags)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct osc_async_page *oap;
        int rc = 0;
        ENTRY;

        oap = oap_from_cookie(cookie);
        if (IS_ERR(oap))
                RETURN(PTR_ERR(oap));

        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
                RETURN(-EIO);

        /* The page must not already be queued anywhere. */
        if (!cfs_list_empty(&oap->oap_pending_item) ||
            !cfs_list_empty(&oap->oap_urgent_item) ||
            !cfs_list_empty(&oap->oap_rpc_item))
                RETURN(-EBUSY);

        /* check if the file's owner/group is over quota */
        if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)) {
                struct cl_object *obj;
                struct cl_attr    attr; /* XXX put attr into thread info */
                unsigned int qid[MAXQUOTAS];

                obj = cl_object_top(osc_oap2cl_page(oap)->cp_obj);

                cl_object_attr_lock(obj);
                rc = cl_object_attr_get(env, obj, &attr);
                cl_object_attr_unlock(obj);

                qid[USRQUOTA] = attr.cat_uid;
                qid[GRPQUOTA] = attr.cat_gid;
                if (rc == 0 &&
                    lquota_chkdq(quota_interface, cli, qid) == NO_QUOTA)
                        rc = -EDQUOT;
                if (rc)
                        RETURN(rc);
        }

        if (loi == NULL)
                loi = lsm->lsm_oinfo[0];

        client_obd_list_lock(&cli->cl_loi_list_lock);

        /* The IO must fit within a single page. */
        LASSERT(off + count <= CFS_PAGE_SIZE);
        oap->oap_cmd = cmd;
        oap->oap_page_off = off;
        oap->oap_count = count;
        oap->oap_brw_flags = brw_flags;
        /* Give a hint to OST that requests are coming from kswapd - bug19529 */
        if (libcfs_memory_pressure_get())
                oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
        cfs_spin_lock(&oap->oap_lock);
        oap->oap_async_flags = async_flags;
        cfs_spin_unlock(&oap->oap_lock);

        if (cmd & OBD_BRW_WRITE) {
                /* osc_enter_cache() may drop and re-take the list lock while
                 * waiting for grant; on failure we exit with it dropped. */
                rc = osc_enter_cache(env, cli, loi, oap);
                if (rc) {
                        client_obd_list_unlock(&cli->cl_loi_list_lock);
                        RETURN(rc);
                }
        }

        osc_oap_to_pending(oap);
        loi_list_maint(cli, loi);

        LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
                  cmd);

        osc_check_rpcs(env, cli);
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        RETURN(0);
}
2971
/* True when 'flag' is being newly set: clear in 'was', set in 'now'.
 * aka (~was & now & flag), but this is more clear :)
 * Arguments are fully parenthesized so expressions like a|b expand safely. */
#define SETTING(was, now, flag) (!((was) & (flag)) && ((now) & (flag)))
2974
/* Raise the bits in @async_flags on a pending async page @oap, moving the
 * page onto the urgent list of the matching read/write lop when ASYNC_URGENT
 * is newly set.  Returns 0.  NOTE(review): no locking is taken around the
 * list manipulation here, so the caller presumably holds
 * cli->cl_loi_list_lock — confirm against callers. */
int osc_set_async_flags_base(struct client_obd *cli,
                             struct lov_oinfo *loi, struct osc_async_page *oap,
                             obd_flag async_flags)
{
        struct loi_oap_pages *lop;
        int flags = 0;
        ENTRY;

        /* The page must already be queued (oap_pending_item linked). */
        LASSERT(!cfs_list_empty(&oap->oap_pending_item));

        if (oap->oap_cmd & OBD_BRW_WRITE) {
                lop = &loi->loi_write_lop;
        } else {
                lop = &loi->loi_read_lop;
        }

        /* Nothing to do if every requested bit is already set. */
        if ((oap->oap_async_flags & async_flags) == async_flags)
                RETURN(0);

        if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
                flags |= ASYNC_READY;

        /* Only queue onto the urgent list if the page is not already part of
         * an RPC in flight (oap_rpc_item linked). */
        if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT) &&
            cfs_list_empty(&oap->oap_rpc_item)) {
                /* High-priority pages go to the head of the urgent list,
                 * everything else to the tail. */
                if (oap->oap_async_flags & ASYNC_HP)
                        cfs_list_add(&oap->oap_urgent_item, &lop->lop_urgent);
                else
                        cfs_list_add_tail(&oap->oap_urgent_item,
                                          &lop->lop_urgent);
                flags |= ASYNC_URGENT;
                loi_list_maint(cli, loi);
        }
        /* oap_async_flags is updated under oap_lock, matching readers
         * elsewhere in this file. */
        cfs_spin_lock(&oap->oap_lock);
        oap->oap_async_flags |= flags;
        cfs_spin_unlock(&oap->oap_lock);

        LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
                        oap->oap_async_flags);
        RETURN(0);
}
3015
/* Remove an async page (identified by @cookie) from the pending/urgent
 * queues of its object, releasing its cache reservation.
 *
 * Returns 0 on success, -EBUSY if the page is currently part of an RPC in
 * flight (in which case nothing is torn down), or a PTR_ERR code if @cookie
 * does not resolve to an osc_async_page. */
int osc_teardown_async_page(struct obd_export *exp,
                            struct lov_stripe_md *lsm,
                            struct lov_oinfo *loi, void *cookie)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct loi_oap_pages *lop;
        struct osc_async_page *oap;
        int rc = 0;
        ENTRY;

        oap = oap_from_cookie(cookie);
        if (IS_ERR(oap))
                RETURN(PTR_ERR(oap));

        /* No stripe given: use the first (only) one. */
        if (loi == NULL)
                loi = lsm->lsm_oinfo[0];

        if (oap->oap_cmd & OBD_BRW_WRITE) {
                lop = &loi->loi_write_lop;
        } else {
                lop = &loi->loi_read_lop;
        }

        client_obd_list_lock(&cli->cl_loi_list_lock);

        /* Cannot tear down a page that is part of an in-flight RPC. */
        if (!cfs_list_empty(&oap->oap_rpc_item))
                GOTO(out, rc = -EBUSY);

        /* Give back the grant/cache space this page held and wake anyone
         * waiting on it. */
        osc_exit_cache(cli, oap, 0);
        osc_wake_cache_waiters(cli);

        if (!cfs_list_empty(&oap->oap_urgent_item)) {
                cfs_list_del_init(&oap->oap_urgent_item);
                /* Flag updates happen under oap_lock, as elsewhere. */
                cfs_spin_lock(&oap->oap_lock);
                oap->oap_async_flags &= ~(ASYNC_URGENT | ASYNC_HP);
                cfs_spin_unlock(&oap->oap_lock);
        }
        if (!cfs_list_empty(&oap->oap_pending_item)) {
                cfs_list_del_init(&oap->oap_pending_item);
                lop_update_pending(cli, lop, oap->oap_cmd, -1);
        }
        loi_list_maint(cli, loi);
        LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
out:
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        RETURN(rc);
}
3063
/* Attach einfo->ei_cbdata to @lock as its l_ast_data, after asserting that
 * the lock's ASTs and resource type match what the caller enqueued with,
 * and that l_ast_data is either unset or already equal to the new value.
 * @flags is accepted but unused here. */
static void osc_set_lock_data_with_check(struct ldlm_lock *lock,
                                         struct ldlm_enqueue_info *einfo,
                                         int flags)
{
        void *data = einfo->ei_cbdata;

        LASSERT(lock != NULL);
        LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
        LASSERT(lock->l_resource->lr_type == einfo->ei_type);
        LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
        LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);

        /* l_ast_data is written while holding both the resource lock and the
         * global osc_ast_guard spinlock, in that order. */
        lock_res_and_lock(lock);
        cfs_spin_lock(&osc_ast_guard);
        LASSERT(lock->l_ast_data == NULL || lock->l_ast_data == data);
        lock->l_ast_data = data;
        cfs_spin_unlock(&osc_ast_guard);
        unlock_res_and_lock(lock);
}
3083
3084 static void osc_set_data_with_check(struct lustre_handle *lockh,
3085                                     struct ldlm_enqueue_info *einfo,
3086                                     int flags)
3087 {
3088         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
3089
3090         if (lock != NULL) {
3091                 osc_set_lock_data_with_check(lock, einfo, flags);
3092                 LDLM_LOCK_PUT(lock);
3093         } else
3094                 CERROR("lockh %p, data %p - client evicted?\n",
3095                        lockh, einfo->ei_cbdata);
3096 }
3097
/* Iterate over every lock on the resource named by @lsm, invoking the
 * @replace iterator with @data on each one.  Always returns 0. */
static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
                             ldlm_iterator_t replace, void *data)
{
        struct ldlm_res_id res_id;
        struct obd_device *obd = class_exp2obd(exp);

        osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_gr, &res_id);
        ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
        return 0;
}
3108
/* Finish an OSC enqueue: translate an intent ELDLM_LOCK_ABORTED reply into
 * the server's policy result, mark the LVB ready on success, then invoke
 * the caller's @upcall with the final rc.  Returns the upcall's result. */
static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
                            obd_enqueue_update_f upcall, void *cookie,
                            int *flags, int rc)
{
        int intent = *flags & LDLM_FL_HAS_INTENT;
        ENTRY;

        if (intent) {
                /* The request was created before ldlm_cli_enqueue call. */
                if (rc == ELDLM_LOCK_ABORTED) {
                        struct ldlm_reply *rep;
                        rep = req_capsule_server_get(&req->rq_pill,
                                                     &RMF_DLM_REP);

                        LASSERT(rep != NULL);
                        /* Prefer the server's policy result when set. */
                        if (rep->lock_policy_res1)
                                rc = rep->lock_policy_res1;
                }
        }

        /* Either a successful enqueue, or an aborted intent (glimpse-style)
         * still delivered a valid LVB. */
        if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
                *flags |= LDLM_FL_LVB_READY;
                CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
                       lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
        }

        /* Call the update callback. */
        rc = (*upcall)(cookie, rc);
        RETURN(rc);
}
3139
/* Async-enqueue reply interpreter.  Completes the ldlm enqueue, runs the
 * osc upcall, and carefully balances lock references so a blocking AST
 * posted for a failed lock cannot arrive before the upcall has run. */
static int osc_enqueue_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_enqueue_args *aa, int rc)
{
        struct ldlm_lock *lock;
        struct lustre_handle handle;
        __u32 mode;

        /* Make a local copy of a lock handle and a mode, because aa->oa_*
         * might be freed anytime after lock upcall has been called. */
        lustre_handle_copy(&handle, aa->oa_lockh);
        mode = aa->oa_ei->ei_mode;

        /* ldlm_cli_enqueue is holding a reference on the lock, so it must
         * be valid. */
        lock = ldlm_handle2lock(&handle);

        /* Take an additional reference so that a blocking AST that
         * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
         * to arrive after an upcall has been executed by
         * osc_enqueue_fini(). */
        ldlm_lock_addref(&handle, mode);

        /* Complete obtaining the lock procedure. */
        rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
                                   mode, aa->oa_flags, aa->oa_lvb,
                                   sizeof(*aa->oa_lvb), &handle, rc);
        /* Complete osc stuff. */
        rc = osc_enqueue_fini(req, aa->oa_lvb,
                              aa->oa_upcall, aa->oa_cookie, aa->oa_flags, rc);

        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);

        /* Release the lock for async request. */
        if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
                /*
                 * Releases a reference taken by ldlm_cli_enqueue(), if it is
                 * not already released by
                 * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
                 */
                ldlm_lock_decref(&handle, mode);

        LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
                 aa->oa_lockh, req, aa);
        /* Drop the extra reference taken above, and the ldlm_handle2lock()
         * reference on the lock structure itself. */
        ldlm_lock_decref(&handle, mode);
        LDLM_LOCK_PUT(lock);
        return rc;
}
3188
/* Update per-stripe state (LVB copy and known-minimum-size) after a lock
 * enqueue completed with result @rc.  On ELDLM_OK the KMS is extended up to
 * the end of the granted extent; on an aborted intent (glimpse) only the
 * LVB is refreshed. */
void osc_update_enqueue(struct lustre_handle *lov_lockhp,
                        struct lov_oinfo *loi, int flags,
                        struct ost_lvb *lvb, __u32 mode, int rc)
{
        if (rc == ELDLM_OK) {
                struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
                __u64 tmp;

                LASSERT(lock != NULL);
                loi->loi_lvb = *lvb;
                tmp = loi->loi_lvb.lvb_size;
                /* Extend KMS up to the end of this lock and no further
                 * A lock on [x,y] means a KMS of up to y + 1 bytes! */
                if (tmp > lock->l_policy_data.l_extent.end)
                        tmp = lock->l_policy_data.l_extent.end + 1;
                if (tmp >= loi->loi_kms) {
                        LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
                                   ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
                        loi_kms_set(loi, tmp);
                } else {
                        LDLM_DEBUG(lock, "lock acquired, setting rss="
                                   LPU64"; leaving kms="LPU64", end="LPU64,
                                   loi->loi_lvb.lvb_size, loi->loi_kms,
                                   lock->l_policy_data.l_extent.end);
                }
                ldlm_lock_allow_match(lock);
                LDLM_LOCK_PUT(lock);
        } else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) {
                loi->loi_lvb = *lvb;
                CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
                       " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
                /* NOTE(review): rc is a by-value parameter of a void
                 * function, so this assignment has no effect on the caller —
                 * presumably leftover from an earlier int-returning version;
                 * confirm before removing. */
                rc = ELDLM_OK;
        }
}
EXPORT_SYMBOL(osc_update_enqueue);
3224
3225 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
3226
3227 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
3228  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
3229  * other synchronous requests, however keeping some locks and trying to obtain
3230  * others may take a considerable amount of time in a case of ost failure; and
3231  * when other sync requests do not get released lock from a client, the client
3232  * is excluded from the cluster -- such scenarious make the life difficult, so
3233  * release locks just after they are obtained. */
/* Enqueue an extent lock on @res_id, first trying to match an existing
 * cached lock (PR requests may also match PW locks).  On a match the
 * @upcall is invoked immediately with ELDLM_OK; otherwise a new enqueue is
 * issued, synchronously or — when @rqset is given — asynchronously with
 * osc_enqueue_interpret() completing the work.  *flags is both input
 * (LDLM_FL_HAS_INTENT etc.) and output (LDLM_FL_LVB_READY). */
int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
                     int *flags, ldlm_policy_data_t *policy,
                     struct ost_lvb *lvb, int kms_valid,
                     obd_enqueue_update_f upcall, void *cookie,
                     struct ldlm_enqueue_info *einfo,
                     struct lustre_handle *lockh,
                     struct ptlrpc_request_set *rqset, int async)
{
        struct obd_device *obd = exp->exp_obd;
        struct ptlrpc_request *req = NULL;
        int intent = *flags & LDLM_FL_HAS_INTENT;
        ldlm_mode_t mode;
        int rc;
        ENTRY;

        /* Filesystem lock extents are extended to page boundaries so that
         * dealing with the page cache is a little smoother.  */
        policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
        policy->l_extent.end |= ~CFS_PAGE_MASK;

        /*
         * kms is not valid when either object is completely fresh (so that no
         * locks are cached), or object was evicted. In the latter case cached
         * lock cannot be used, because it would prime inode state with
         * potentially stale LVB.
         */
        if (!kms_valid)
                goto no_match;

        /* Next, search for already existing extent locks that will cover us */
        /* If we're trying to read, we also search for an existing PW lock.  The
         * VFS and page cache already protect us locally, so lots of readers/
         * writers can share a single PW lock.
         *
         * There are problems with conversion deadlocks, so instead of
         * converting a read lock to a write lock, we'll just enqueue a new
         * one.
         *
         * At some point we should cancel the read lock instead of making them
         * send us a blocking callback, but there are problems with canceling
         * locks out from other users right now, too. */
        mode = einfo->ei_mode;
        if (einfo->ei_mode == LCK_PR)
                mode |= LCK_PW;
        mode = ldlm_lock_match(obd->obd_namespace,
                               *flags | LDLM_FL_LVB_READY, res_id,
                               einfo->ei_type, policy, mode, lockh, 0);
        if (mode) {
                struct ldlm_lock *matched = ldlm_handle2lock(lockh);

                /* Only reuse the matched lock if its ast_data is unset or
                 * already ours. */
                if (matched->l_ast_data == NULL ||
                    matched->l_ast_data == einfo->ei_cbdata) {
                        /* addref the lock only if not async requests and PW
                         * lock is matched whereas we asked for PR. */
                        if (!rqset && einfo->ei_mode != mode)
                                ldlm_lock_addref(lockh, LCK_PR);
                        osc_set_lock_data_with_check(matched, einfo, *flags);
                        if (intent) {
                                /* I would like to be able to ASSERT here that
                                 * rss <= kms, but I can't, for reasons which
                                 * are explained in lov_enqueue() */
                        }

                        /* We already have a lock, and it's referenced */
                        (*upcall)(cookie, ELDLM_OK);

                        /* For async requests, decref the lock. */
                        if (einfo->ei_mode != mode)
                                ldlm_lock_decref(lockh, LCK_PW);
                        else if (rqset)
                                ldlm_lock_decref(lockh, einfo->ei_mode);
                        LDLM_LOCK_PUT(matched);
                        RETURN(ELDLM_OK);
                } else
                        ldlm_lock_decref(lockh, mode);
                LDLM_LOCK_PUT(matched);
        }

 no_match:
        /* Intent enqueues carry an LVB in the reply; preallocate the
         * request with room for it. */
        if (intent) {
                CFS_LIST_HEAD(cancels);
                req = ptlrpc_request_alloc(class_exp2cliimp(exp),
                                           &RQF_LDLM_ENQUEUE_LVB);
                if (req == NULL)
                        RETURN(-ENOMEM);

                rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
                if (rc)
                        RETURN(rc);

                req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
                                     sizeof *lvb);
                ptlrpc_request_set_replen(req);
        }

        /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
        *flags &= ~LDLM_FL_BLOCK_GRANTED;

        rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
                              sizeof(*lvb), lockh, async);
        if (rqset) {
                if (!rc) {
                        struct osc_enqueue_args *aa;
                        CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
                        aa = ptlrpc_req_async_args(req);
                        aa->oa_ei = einfo;
                        aa->oa_exp = exp;
                        aa->oa_flags  = flags;
                        aa->oa_upcall = upcall;
                        aa->oa_cookie = cookie;
                        aa->oa_lvb    = lvb;
                        aa->oa_lockh  = lockh;

                        req->rq_interpret_reply =
                                (ptlrpc_interpterer_t)osc_enqueue_interpret;
                        /* PTLRPCD_SET is a sentinel meaning "hand the request
                         * to ptlrpcd" rather than a real set. */
                        if (rqset == PTLRPCD_SET)
                                ptlrpcd_add_req(req, PSCOPE_OTHER);
                        else
                                ptlrpc_set_add_req(rqset, req);
                } else if (intent) {
                        ptlrpc_req_finished(req);
                }
                RETURN(rc);
        }

        /* Synchronous path: complete the enqueue inline. */
        rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, rc);
        if (intent)
                ptlrpc_req_finished(req);

        RETURN(rc);
}
3365
3366 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
3367                        struct ldlm_enqueue_info *einfo,
3368                        struct ptlrpc_request_set *rqset)
3369 {
3370         struct ldlm_res_id res_id;
3371         int rc;
3372         ENTRY;
3373
3374         osc_build_res_name(oinfo->oi_md->lsm_object_id,
3375                            oinfo->oi_md->lsm_object_gr, &res_id);
3376
3377         rc = osc_enqueue_base(exp, &res_id, &oinfo->oi_flags, &oinfo->oi_policy,
3378                               &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
3379                               oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid,
3380                               oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh,
3381                               rqset, rqset != NULL);
3382         RETURN(rc);
3383 }
3384
3385 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
3386                    __u32 type, ldlm_policy_data_t *policy, __u32 mode,
3387                    int *flags, void *data, struct lustre_handle *lockh,
3388                    int unref)
3389 {
3390         struct obd_device *obd = exp->exp_obd;
3391         int lflags = *flags;
3392         ldlm_mode_t rc;
3393         ENTRY;
3394
3395         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
3396                 RETURN(-EIO);
3397
3398         /* Filesystem lock extents are extended to page boundaries so that
3399          * dealing with the page cache is a little smoother */
3400         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3401         policy->l_extent.end |= ~CFS_PAGE_MASK;
3402
3403         /* Next, search for already existing extent locks that will cover us */
3404         /* If we're trying to read, we also search for an existing PW lock.  The
3405          * VFS and page cache already protect us locally, so lots of readers/
3406          * writers can share a single PW lock. */
3407         rc = mode;
3408         if (mode == LCK_PR)
3409                 rc |= LCK_PW;
3410         rc = ldlm_lock_match(obd->obd_namespace, lflags,
3411                              res_id, type, policy, rc, lockh, unref);
3412         if (rc) {
3413                 if (data != NULL)
3414                         osc_set_data_with_check(lockh, data, lflags);
3415                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
3416                         ldlm_lock_addref(lockh, LCK_PR);
3417                         ldlm_lock_decref(lockh, LCK_PW);
3418                 }
3419                 RETURN(rc);
3420         }
3421         RETURN(rc);
3422 }
3423
/* Release a lock reference held via @lockh at @mode.  LCK_GROUP locks are
 * also explicitly canceled on release (decref_and_cancel); other modes are
 * only dereferenced and left for the LRU / blocking callbacks.  Always
 * returns 0. */
int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
{
        ENTRY;

        if (unlikely(mode == LCK_GROUP))
                ldlm_lock_decref_and_cancel(lockh, mode);
        else
                ldlm_lock_decref(lockh, mode);

        RETURN(0);
}
3435
3436 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
3437                       __u32 mode, struct lustre_handle *lockh)
3438 {
3439         ENTRY;
3440         RETURN(osc_cancel_base(lockh, mode));
3441 }
3442
3443 static int osc_cancel_unused(struct obd_export *exp,
3444                              struct lov_stripe_md *lsm, int flags,
3445                              void *opaque)
3446 {
3447         struct obd_device *obd = class_exp2obd(exp);
3448         struct ldlm_res_id res_id, *resp = NULL;
3449
3450         if (lsm != NULL) {
3451                 resp = osc_build_res_name(lsm->lsm_object_id,
3452                                           lsm->lsm_object_gr, &res_id);
3453         }
3454
3455         return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
3456 }
3457
/* Reply interpreter for async statfs: refresh the object-create context
 * flags (DEGRADED/RDONLY/NOSPC) from the server's reported state and copy
 * the statfs result to the caller, then invoke the caller's oi_cb_up. */
static int osc_statfs_interpret(const struct lu_env *env,
                                struct ptlrpc_request *req,
                                struct osc_async_args *aa, int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
        struct obd_statfs *msfs;
        __u64 used;
        ENTRY;

        if (rc == -EBADR)
                /* The request has in fact never been sent
                 * due to issues at a higher level (LOV).
                 * Exit immediately since the caller is
                 * aware of the problem and takes care
                 * of the clean up */
                 RETURN(rc);

        /* NODELAY statfs is best-effort: treat transient connection errors
         * as success with no data. */
        if ((rc == -ENOTCONN || rc == -EAGAIN) &&
            (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
                GOTO(out, rc = 0);

        if (rc != 0)
                GOTO(out, rc);

        msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
        if (msfs == NULL) {
                GOTO(out, rc = -EPROTO);
        }

        /* Reinitialize the RDONLY and DEGRADED flags at the client
         * on each statfs, so they don't stay set permanently. */
        cfs_spin_lock(&cli->cl_oscc.oscc_lock);

        if (unlikely(msfs->os_state & OS_STATE_DEGRADED))
                cli->cl_oscc.oscc_flags |= OSCC_FLAG_DEGRADED;
        else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_DEGRADED))
                cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_DEGRADED;

        if (unlikely(msfs->os_state & OS_STATE_READONLY))
                cli->cl_oscc.oscc_flags |= OSCC_FLAG_RDONLY;
        else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_RDONLY))
                cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_RDONLY;

        /* Add a bit of hysteresis so this flag isn't continually flapping,
         * and ensure that new files don't get extremely fragmented due to
         * only a small amount of available space in the filesystem.
         * We want to set the NOSPC flag when there is less than ~0.1% free
         * and clear it when there is at least ~0.2% free space, so:
         *                   avail < ~0.1% max          max = avail + used
         *            1025 * avail < avail + used       used = blocks - free
         *            1024 * avail < used
         *            1024 * avail < blocks - free
         *                   avail < ((blocks - free) >> 10)
         *
         * On very large disk, say 16TB 0.1% will be 16 GB. We don't want to
         * lose that amount of space so in those cases we report no space left
         * if their is less than 1 GB left.                             */
        used = min_t(__u64,(msfs->os_blocks - msfs->os_bfree) >> 10, 1 << 30);
        if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) == 0) &&
                     ((msfs->os_ffree < 32) || (msfs->os_bavail < used))))
                cli->cl_oscc.oscc_flags |= OSCC_FLAG_NOSPC;
        else if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) != 0) &&
                (msfs->os_ffree > 64) && (msfs->os_bavail > (used << 1))))
                        cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_NOSPC;

        cfs_spin_unlock(&cli->cl_oscc.oscc_lock);

        *aa->aa_oi->oi_osfs = *msfs;
out:
        /* Deliver the final rc (possibly rewritten by the caller's cb). */
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        RETURN(rc);
}
3530
/* Issue an asynchronous OST_STATFS request on @rqset; the reply is handled
 * by osc_statfs_interpret(), which invokes oinfo->oi_cb_up.  @max_age is
 * currently unused here (see the comment below).  Returns 0 or a negative
 * errno on setup failure. */
static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
                            __u64 max_age, struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        int                    rc;
        ENTRY;

        /* We could possibly pass max_age in the request (as an absolute
         * timestamp or a "seconds.usec ago") so the target can avoid doing
         * extra calls into the filesystem if that isn't necessary (e.g.
         * during mount that would help a bit).  Having relative timestamps
         * is not so great if request processing is slow, while absolute
         * timestamps are not ideal because they need time synchronization. */
        req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        ptlrpc_request_set_replen(req);
        req->rq_request_portal = OST_CREATE_PORTAL;
        ptlrpc_at_set_req_timeout(req);

        if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
                /* procfs requests not want stat in wait for avoid deadlock */
                req->rq_no_resend = 1;
                req->rq_no_delay = 1;
        }

        /* Stash the caller's obd_info for the interpreter. */
        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
        CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oi = oinfo;

        ptlrpc_set_add_req(rqset, req);
        RETURN(0);
}
3572
/* Synchronous OST_STATFS: send the request, wait for the reply, and copy
 * the result into @osfs.  @max_age is currently unused (see comment below).
 * Returns 0 on success or a negative errno. */
static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
                      __u64 max_age, __u32 flags)
{
        struct obd_statfs     *msfs;
        struct ptlrpc_request *req;
        struct obd_import     *imp = NULL;
        int rc;
        ENTRY;

        /*Since the request might also come from lprocfs, so we need
         *sync this with client_disconnect_export Bug15684*/
        cfs_down_read(&obd->u.cli.cl_sem);
        if (obd->u.cli.cl_import)
                imp = class_import_get(obd->u.cli.cl_import);
        cfs_up_read(&obd->u.cli.cl_sem);
        if (!imp)
                RETURN(-ENODEV);

        /* We could possibly pass max_age in the request (as an absolute
         * timestamp or a "seconds.usec ago") so the target can avoid doing
         * extra calls into the filesystem if that isn't necessary (e.g.
         * during mount that would help a bit).  Having relative timestamps
         * is not so great if request processing is slow, while absolute
         * timestamps are not ideal because they need time synchronization. */
        req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);

        /* The request holds its own import reference from allocation;
         * drop the one taken above. */
        class_import_put(imp);

        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        ptlrpc_request_set_replen(req);
        req->rq_request_portal = OST_CREATE_PORTAL;
        ptlrpc_at_set_req_timeout(req);

        if (flags & OBD_STATFS_NODELAY) {
                /* procfs requests not want stat in wait for avoid deadlock */
                req->rq_no_resend = 1;
                req->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
        if (msfs == NULL) {
                GOTO(out, rc = -EPROTO);
        }

        *osfs = *msfs;

        EXIT;
 out:
        ptlrpc_req_finished(req);
        return rc;
}
3635
/* Retrieve object striping information.
 *
 * @lump is a pointer to an in-core struct with lmm_ost_count indicating
 * the maximum number of OST indices which will fit in the user buffer.
 * lmm_magic must be LOV_USER_MAGIC_V1 or LOV_USER_MAGIC_V3 (an OSC object
 * has exactly one stripe, so only one slot is ever filled in).
 */
static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
{
        /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
        struct lov_user_md_v3 lum, *lumk;
        struct lov_user_ost_data_v1 *lmm_objects;
        int rc = 0, lum_size;
        ENTRY;

        if (!lsm)
                RETURN(-ENODATA);

        /* we only need the header part from user space to get lmm_magic and
         * lmm_stripe_count, (the header part is common to v1 and v3) */
        lum_size = sizeof(struct lov_user_md_v1);
        if (cfs_copy_from_user(&lum, lump, lum_size))
                RETURN(-EFAULT);

        if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
            (lum.lmm_magic != LOV_USER_MAGIC_V3))
                RETURN(-EINVAL);

        /* lov_user_md_vX and lov_mds_md_vX must have the same size */
        LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
        LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
        LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));

        /* we can use lov_mds_md_size() to compute lum_size
         * because lov_user_md_vX and lov_mds_md_vX have the same size */
        if (lum.lmm_stripe_count > 0) {
                lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
                OBD_ALLOC(lumk, lum_size);
                if (!lumk)
                        RETURN(-ENOMEM);

                /* v1 and v3 place lmm_objects at different offsets. */
                if (lum.lmm_magic == LOV_USER_MAGIC_V1)
                        lmm_objects = &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
                else
                        lmm_objects = &(lumk->lmm_objects[0]);
                lmm_objects->l_object_id = lsm->lsm_object_id;
        } else {
                lum_size = lov_mds_md_size(0, lum.lmm_magic);
                lumk = &lum;
        }

        /* NOTE(review): stripe_count is forced to 1 regardless of the
         * caller's request — presumably because an OSC object always has a
         * single stripe; only the first lmm_objects slot is populated even
         * though lum_size was allocated for lum.lmm_stripe_count entries. */
        lumk->lmm_object_id = lsm->lsm_object_id;
        lumk->lmm_object_gr = lsm->lsm_object_gr;
        lumk->lmm_stripe_count = 1;

        if (cfs_copy_to_user(lump, lumk, lum_size))
                rc = -EFAULT;

        if (lumk != &lum)
                OBD_FREE(lumk, lum_size);

        RETURN(rc);
}
3698
3699
3700 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3701                          void *karg, void *uarg)
3702 {
3703         struct obd_device *obd = exp->exp_obd;
3704         struct obd_ioctl_data *data = karg;
3705         int err = 0;
3706         ENTRY;
3707
3708         if (!cfs_try_module_get(THIS_MODULE)) {
3709                 CERROR("Can't get module. Is it alive?");
3710                 return -EINVAL;
3711         }
3712         switch (cmd) {
3713         case OBD_IOC_LOV_GET_CONFIG: {
3714                 char *buf;
3715                 struct lov_desc *desc;
3716                 struct obd_uuid uuid;
3717
3718                 buf = NULL;
3719                 len = 0;
3720                 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
3721                         GOTO(out, err = -EINVAL);
3722
3723                 data = (struct obd_ioctl_data *)buf;
3724
3725                 if (sizeof(*desc) > data->ioc_inllen1) {
3726                         obd_ioctl_freedata(buf, len);
3727                         GOTO(out, err = -EINVAL);
3728                 }
3729
3730                 if (data->ioc_inllen2 < sizeof(uuid)) {
3731                         obd_ioctl_freedata(buf, len);
3732                         GOTO(out, err = -EINVAL);
3733                 }
3734
3735                 desc = (struct lov_desc *)data->ioc_inlbuf1;
3736                 desc->ld_tgt_count = 1;
3737                 desc->ld_active_tgt_count = 1;
3738                 desc->ld_default_stripe_count = 1;
3739                 desc->ld_default_stripe_size = 0;
3740                 desc->ld_default_stripe_offset = 0;
3741                 desc->ld_pattern = 0;
3742                 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
3743
3744                 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
3745
3746                 err = cfs_copy_to_user((void *)uarg, buf, len);
3747                 if (err)
3748                         err = -EFAULT;
3749                 obd_ioctl_freedata(buf, len);
3750                 GOTO(out, err);
3751         }
3752         case LL_IOC_LOV_SETSTRIPE:
3753                 err = obd_alloc_memmd(exp, karg);
3754                 if (err > 0)
3755                         err = 0;
3756                 GOTO(out, err);
3757         case LL_IOC_LOV_GETSTRIPE:
3758                 err = osc_getstripe(karg, uarg);
3759                 GOTO(out, err);
3760         case OBD_IOC_CLIENT_RECOVER:
3761                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
3762                                             data->ioc_inlbuf1);
3763                 if (err > 0)
3764                         err = 0;
3765                 GOTO(out, err);
3766         case IOC_OSC_SET_ACTIVE:
3767                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
3768                                                data->ioc_offset);
3769                 GOTO(out, err);
3770         case OBD_IOC_POLL_QUOTACHECK:
3771                 err = lquota_poll_check(quota_interface, exp,
3772                                         (struct if_quotacheck *)karg);
3773                 GOTO(out, err);
3774         case OBD_IOC_PING_TARGET:
3775                 err = ptlrpc_obd_ping(obd);
3776                 GOTO(out, err);
3777         default:
3778                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3779                        cmd, cfs_curproc_comm());
3780                 GOTO(out, err = -ENOTTY);
3781         }
3782 out:
3783         cfs_module_put(THIS_MODULE);
3784         return err;
3785 }
3786
/* obd_ops::o_get_info handler: fetch a named piece of information,
 * either answered locally or via a synchronous OST_GET_INFO RPC.
 *
 * Supported keys:
 *   KEY_LOCK_TO_STRIPE - map a lock to a stripe index; an OSC has a
 *                        single stripe, so the answer is always 0
 *   KEY_LAST_ID        - ask the OST for the last allocated object id;
 *                        no-delay/no-resend so the caller gets a prompt
 *                        answer or an error
 *   KEY_FIEMAP         - ask the OST for file extent mapping data; the
 *                        fiemap request in *val is sent and the reply is
 *                        copied back into *val (same *vallen size)
 *
 * Any other key fails with -EINVAL.
 */
static int osc_get_info(struct obd_export *exp, obd_count keylen,
                        void *key, __u32 *vallen, void *val,
                        struct lov_stripe_md *lsm)
{
        ENTRY;
        if (!vallen || !val)
                RETURN(-EFAULT);

        if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
                __u32 *stripe = val;
                *vallen = sizeof(*stripe);
                /* a single OSC object is always stripe 0 of itself */
                *stripe = 0;
                RETURN(0);
        } else if (KEY_IS(KEY_LAST_ID)) {
                struct ptlrpc_request *req;
                obd_id                *reply;
                char                  *tmp;
                int                    rc;

                req = ptlrpc_request_alloc(class_exp2cliimp(exp),
                                           &RQF_OST_GET_INFO_LAST_ID);
                if (req == NULL)
                        RETURN(-ENOMEM);

                /* key buffer size must be set before packing the request */
                req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
                                     RCL_CLIENT, keylen);
                rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
                if (rc) {
                        ptlrpc_request_free(req);
                        RETURN(rc);
                }

                tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
                memcpy(tmp, key, keylen);

                req->rq_no_delay = req->rq_no_resend = 1;
                ptlrpc_request_set_replen(req);
                rc = ptlrpc_queue_wait(req);
                if (rc)
                        GOTO(out, rc);

                reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
                if (reply == NULL)
                        GOTO(out, rc = -EPROTO);

                *((obd_id *)val) = *reply;
        out:
                ptlrpc_req_finished(req);
                RETURN(rc);
        } else if (KEY_IS(KEY_FIEMAP)) {
                struct ptlrpc_request *req;
                struct ll_user_fiemap *reply;
                char *tmp;
                int rc;

                req = ptlrpc_request_alloc(class_exp2cliimp(exp),
                                           &RQF_OST_GET_INFO_FIEMAP);
                if (req == NULL)
                        RETURN(-ENOMEM);

                /* the fiemap value travels in both directions: the request
                 * parameters go out and the mapping comes back in the same
                 * buffer, hence both RCL_CLIENT and RCL_SERVER sizes */
                req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
                                     RCL_CLIENT, keylen);
                req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
                                     RCL_CLIENT, *vallen);
                req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
                                     RCL_SERVER, *vallen);

                rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
                if (rc) {
                        ptlrpc_request_free(req);
                        RETURN(rc);
                }

                tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
                memcpy(tmp, key, keylen);
                tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
                memcpy(tmp, val, *vallen);

                ptlrpc_request_set_replen(req);
                rc = ptlrpc_queue_wait(req);
                if (rc)
                        GOTO(out1, rc);

                reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
                if (reply == NULL)
                        GOTO(out1, rc = -EPROTO);

                memcpy(val, reply, *vallen);
        out1:
                ptlrpc_req_finished(req);

                RETURN(rc);
        }

        RETURN(-EINVAL);
}
3883
3884 static int osc_setinfo_mds_connect_import(struct obd_import *imp)
3885 {
3886         struct llog_ctxt *ctxt;
3887         int rc = 0;
3888         ENTRY;
3889
3890         ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
3891         if (ctxt) {
3892                 rc = llog_initiator_connect(ctxt);
3893                 llog_ctxt_put(ctxt);
3894         } else {
3895                 /* XXX return an error? skip setting below flags? */
3896         }
3897
3898         cfs_spin_lock(&imp->imp_lock);
3899         imp->imp_server_timeout = 1;
3900         imp->imp_pingable = 1;
3901         cfs_spin_unlock(&imp->imp_lock);
3902         CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
3903
3904         RETURN(rc);
3905 }
3906
3907 static int osc_setinfo_mds_conn_interpret(const struct lu_env *env,
3908                                           struct ptlrpc_request *req,
3909                                           void *aa, int rc)
3910 {
3911         ENTRY;
3912         if (rc != 0)
3913                 RETURN(rc);
3914
3915         RETURN(osc_setinfo_mds_connect_import(req->rq_import));
3916 }
3917
3918 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
3919                               void *key, obd_count vallen, void *val,
3920                               struct ptlrpc_request_set *set)
3921 {
3922         struct ptlrpc_request *req;
3923         struct obd_device     *obd = exp->exp_obd;
3924         struct obd_import     *imp = class_exp2cliimp(exp);
3925         char                  *tmp;
3926         int                    rc;
3927         ENTRY;
3928
3929         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3930
3931         if (KEY_IS(KEY_NEXT_ID)) {
3932                 obd_id new_val;
3933                 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3934
3935                 if (vallen != sizeof(obd_id))
3936                         RETURN(-ERANGE);
3937                 if (val == NULL)
3938                         RETURN(-EINVAL);
3939
3940                 if (vallen != sizeof(obd_id))
3941                         RETURN(-EINVAL);
3942
3943                 /* avoid race between allocate new object and set next id
3944                  * from ll_sync thread */
3945                 cfs_spin_lock(&oscc->oscc_lock);
3946                 new_val = *((obd_id*)val) + 1;
3947                 if (new_val > oscc->oscc_next_id)
3948                         oscc->oscc_next_id = new_val;
3949                 cfs_spin_unlock(&oscc->oscc_lock);
3950                 CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
3951                        exp->exp_obd->obd_name,
3952                        obd->u.cli.cl_oscc.oscc_next_id);
3953
3954                 RETURN(0);
3955         }
3956
3957         if (KEY_IS(KEY_INIT_RECOV)) {
3958                 if (vallen != sizeof(int))
3959                         RETURN(-EINVAL);
3960                 cfs_spin_lock(&imp->imp_lock);
3961                 imp->imp_initial_recov = *(int *)val;
3962                 cfs_spin_unlock(&imp->imp_lock);
3963                 CDEBUG(D_HA, "%s: set imp_initial_recov = %d\n",
3964                        exp->exp_obd->obd_name,
3965                        imp->imp_initial_recov);
3966                 RETURN(0);
3967         }
3968
3969         if (KEY_IS(KEY_CHECKSUM)) {
3970                 if (vallen != sizeof(int))
3971                         RETURN(-EINVAL);
3972                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3973                 RETURN(0);
3974         }
3975
3976         if (KEY_IS(KEY_SPTLRPC_CONF)) {
3977                 sptlrpc_conf_client_adapt(obd);
3978                 RETURN(0);
3979         }
3980
3981         if (KEY_IS(KEY_FLUSH_CTX)) {
3982                 sptlrpc_import_flush_my_ctx(imp);
3983                 RETURN(0);
3984         }
3985
3986         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
3987                 RETURN(-EINVAL);
3988
3989         /* We pass all other commands directly to OST. Since nobody calls osc
3990            methods directly and everybody is supposed to go through LOV, we
3991            assume lov checked invalid values for us.
3992            The only recognised values so far are evict_by_nid and mds_conn.
3993            Even if something bad goes through, we'd get a -EINVAL from OST
3994            anyway. */
3995
3996         if (KEY_IS(KEY_GRANT_SHRINK))
3997                 req = ptlrpc_request_alloc(imp, &RQF_OST_SET_GRANT_INFO);
3998         else
3999                 req = ptlrpc_request_alloc(imp, &RQF_OBD_SET_INFO);
4000
4001         if (req == NULL)
4002                 RETURN(-ENOMEM);
4003
4004         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
4005                              RCL_CLIENT, keylen);
4006         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
4007                              RCL_CLIENT, vallen);
4008         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
4009         if (rc) {
4010                 ptlrpc_request_free(req);
4011                 RETURN(rc);
4012         }
4013
4014         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
4015         memcpy(tmp, key, keylen);
4016         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL);
4017         memcpy(tmp, val, vallen);
4018
4019         if (KEY_IS(KEY_MDS_CONN)) {
4020                 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4021
4022                 oscc->oscc_oa.o_gr = (*(__u32 *)val);
4023                 oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP;
4024                 LASSERT_MDS_GROUP(oscc->oscc_oa.o_gr);
4025                 req->rq_no_delay = req->rq_no_resend = 1;
4026                 req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
4027         } else if (KEY_IS(KEY_GRANT_SHRINK)) {
4028                 struct osc_grant_args *aa;
4029                 struct obdo *oa;
4030
4031                 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
4032                 aa = ptlrpc_req_async_args(req);
4033                 OBD_ALLOC_PTR(oa);
4034                 if (!oa) {
4035                         ptlrpc_req_finished(req);
4036                         RETURN(-ENOMEM);
4037                 }
4038                 *oa = ((struct ost_body *)val)->oa;
4039                 aa->aa_oa = oa;
4040                 req->rq_interpret_reply = osc_shrink_grant_interpret;
4041         }
4042
4043         ptlrpc_request_set_replen(req);
4044         if (!KEY_IS(KEY_GRANT_SHRINK)) {
4045                 LASSERT(set != NULL);
4046                 ptlrpc_set_add_req(set, req);
4047                 ptlrpc_check_set(NULL, set);
4048         } else
4049                 ptlrpcd_add_req(req, PSCOPE_OTHER);
4050
4051         RETURN(0);
4052 }
4053
4054
/* Size-replicator llog: only record cancellation is handled on the OSC. */
static struct llog_operations osc_size_repl_logops = {
        lop_cancel: llog_obd_repl_cancel
};

/* Originator-side llog ops; the fields are copied from llog_lvfs_ops and
 * overridden in osc_init() below. */
static struct llog_operations osc_mds_ost_orig_logops;
4060
/* Set up the two llog contexts used by an MDS-side OSC: the originator
 * context (MDS->OST object records) and the size-replicator context.
 * If the second setup fails, the first context is cleaned up again so
 * the caller never sees a half-initialized state.
 *
 * \param obd    the OSC device whose contexts are being set up
 * \param olg    obd llog group (must be obd->obd_olg, see caller)
 * \param tgt    disk obd the logs live on
 * \param catid  catalog log id for the originator context
 */
static int __osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
                           struct obd_device *tgt, struct llog_catid *catid)
{
        int rc;
        ENTRY;

        rc = llog_setup(obd, &obd->obd_olg, LLOG_MDS_OST_ORIG_CTXT, tgt, 1,
                        &catid->lci_logid, &osc_mds_ost_orig_logops);
        if (rc) {
                CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
                GOTO(out, rc);
        }

        rc = llog_setup(obd, &obd->obd_olg, LLOG_SIZE_REPL_CTXT, tgt, 1,
                        NULL, &osc_size_repl_logops);
        if (rc) {
                /* undo the first setup before reporting failure */
                struct llog_ctxt *ctxt =
                        llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
                if (ctxt)
                        llog_cleanup(ctxt);
                CERROR("failed LLOG_SIZE_REPL_CTXT\n");
        }
        GOTO(out, rc);
out:
        if (rc) {
                CERROR("osc '%s' tgt '%s' catid %p rc=%d\n",
                       obd->obd_name, tgt->obd_name, catid, rc);
                CERROR("logid "LPX64":0x%x\n",
                       catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
        }
        return rc;
}
4093
/* obd_ops::o_llog_init handler: read the catalog id for this OSC's index
 * from the CATLIST file on \a disk_obd, initialize the llog contexts with
 * it, and write the (possibly updated) catalog id back.
 *
 * The whole read-init-write sequence is serialized by olg_cat_processing
 * since the CATLIST file is shared between targets.
 */
static int osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
                         struct obd_device *disk_obd, int *index)
{
        struct llog_catid catid;
        static char name[32] = CATLIST;
        int rc;
        ENTRY;

        LASSERT(olg == &obd->obd_olg);

        cfs_mutex_down(&olg->olg_cat_processing);
        rc = llog_get_cat_list(disk_obd, name, *index, 1, &catid);
        if (rc) {
                CERROR("rc: %d\n", rc);
                GOTO(out, rc);
        }

        CDEBUG(D_INFO, "%s: Init llog for %d - catid "LPX64"/"LPX64":%x\n",
               obd->obd_name, *index, catid.lci_logid.lgl_oid,
               catid.lci_logid.lgl_ogr, catid.lci_logid.lgl_ogen);

        rc = __osc_llog_init(obd, olg, disk_obd, &catid);
        if (rc) {
                CERROR("rc: %d\n", rc);
                GOTO(out, rc);
        }

        /* persist the catalog id in case llog setup allocated a new one */
        rc = llog_put_cat_list(disk_obd, name, *index, 1, &catid);
        if (rc) {
                CERROR("rc: %d\n", rc);
                GOTO(out, rc);
        }

 out:
        cfs_mutex_up(&olg->olg_cat_processing);

        return rc;
}
4132
4133 static int osc_llog_finish(struct obd_device *obd, int count)
4134 {
4135         struct llog_ctxt *ctxt;
4136         int rc = 0, rc2 = 0;
4137         ENTRY;
4138
4139         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
4140         if (ctxt)
4141                 rc = llog_cleanup(ctxt);
4142
4143         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
4144         if (ctxt)
4145                 rc2 = llog_cleanup(ctxt);
4146         if (!rc)
4147                 rc = rc2;
4148
4149         RETURN(rc);
4150 }
4151
4152 static int osc_reconnect(const struct lu_env *env,
4153                          struct obd_export *exp, struct obd_device *obd,
4154                          struct obd_uuid *cluuid,
4155                          struct obd_connect_data *data,
4156                          void *localdata)
4157 {
4158         struct client_obd *cli = &obd->u.cli;
4159
4160         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
4161                 long lost_grant;
4162
4163                 client_obd_list_lock(&cli->cl_loi_list_lock);
4164                 data->ocd_grant = (cli->cl_avail_grant + cli->cl_dirty) ?:
4165                                 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
4166                 lost_grant = cli->cl_lost_grant;
4167                 cli->cl_lost_grant = 0;
4168                 client_obd_list_unlock(&cli->cl_loi_list_lock);
4169
4170                 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
4171                        "cl_dirty: %ld cl_lost_grant: %ld\n", data->ocd_grant,
4172                        cli->cl_avail_grant, cli->cl_dirty, lost_grant);
4173                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
4174                        " ocd_grant: %d\n", data->ocd_connect_flags,
4175                        data->ocd_version, data->ocd_grant);
4176         }
4177
4178         RETURN(0);
4179 }
4180
/* obd_ops::o_disconnect handler: on the final disconnect, flush any
 * pending size-llog cancel messages to the target, then tear down the
 * client export.  Removal from the grant-shrink list must happen last;
 * see the BUG18662 note below.
 */
static int osc_disconnect(struct obd_export *exp)
{
        struct obd_device *obd = class_exp2obd(exp);
        struct llog_ctxt  *ctxt;
        int rc;

        ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
        if (ctxt) {
                if (obd->u.cli.cl_conn_count == 1) {
                        /* Flush any remaining cancel messages out to the
                         * target */
                        llog_sync(ctxt, exp);
                }
                llog_ctxt_put(ctxt);
        } else {
                CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
                       obd);
        }

        rc = client_disconnect_export(exp);
        /**
         * Initially we put del_shrink_grant before disconnect_export, but it
         * causes the following problem if setup (connect) and cleanup
         * (disconnect) are tangled together.
         *      connect p1                     disconnect p2
         *   ptlrpc_connect_import
         *     ...............               class_manual_cleanup
         *                                     osc_disconnect
         *                                     del_shrink_grant
         *   ptlrpc_connect_interrupt
         *     init_grant_shrink
         *   add this client to shrink list
         *                                      cleanup_osc
         * Bang! pinger trigger the shrink.
         * So the osc should be disconnected from the shrink list, after we
         * are sure the import has been destroyed. BUG18662
         */
        if (obd->u.cli.cl_import == NULL)
                osc_del_shrink_grant(&obd->u.cli);
        return rc;
}
4222
/* obd_ops::o_import_event handler: react to state changes on the import
 * to the OST (disconnect, inactivation, invalidation, activation,
 * connect-data arrival) and forward notifications to the observer.
 */
static int osc_import_event(struct obd_device *obd,
                            struct obd_import *imp,
                            enum obd_import_event event)
{
        struct client_obd *cli;
        int rc = 0;

        ENTRY;
        LASSERT(imp->imp_obd == obd);

        switch (event) {
        case IMP_EVENT_DISCON: {
                /* Only do this on the MDS OSC's */
                if (imp->imp_server_timeout) {
                        struct osc_creator *oscc = &obd->u.cli.cl_oscc;

                        cfs_spin_lock(&oscc->oscc_lock);
                        oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
                        cfs_spin_unlock(&oscc->oscc_lock);
                }
                /* grants are only valid while connected; drop them */
                cli = &obd->u.cli;
                client_obd_list_lock(&cli->cl_loi_list_lock);
                cli->cl_avail_grant = 0;
                cli->cl_lost_grant = 0;
                client_obd_list_unlock(&cli->cl_loi_list_lock);
                break;
        }
        case IMP_EVENT_INACTIVE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
                break;
        }
        case IMP_EVENT_INVALIDATE: {
                struct ldlm_namespace *ns = obd->obd_namespace;
                struct lu_env         *env;
                int                    refcheck;

                env = cl_env_get(&refcheck);
                if (!IS_ERR(env)) {
                        /* Reset grants */
                        cli = &obd->u.cli;
                        client_obd_list_lock(&cli->cl_loi_list_lock);
                        /* all pages go to failing rpcs due to the invalid
                         * import */
                        osc_check_rpcs(env, cli);
                        client_obd_list_unlock(&cli->cl_loi_list_lock);

                        /* discard local locks; the server has forgotten them */
                        ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
                        cl_env_put(env, &refcheck);
                } else
                        rc = PTR_ERR(env);
                break;
        }
        case IMP_EVENT_ACTIVE: {
                /* Only do this on the MDS OSC's */
                if (imp->imp_server_timeout) {
                        struct osc_creator *oscc = &obd->u.cli.cl_oscc;

                        cfs_spin_lock(&oscc->oscc_lock);
                        oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
                        cfs_spin_unlock(&oscc->oscc_lock);
                }
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
                break;
        }
        case IMP_EVENT_OCD: {
                struct obd_connect_data *ocd = &imp->imp_connect_data;

                if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
                        osc_init_grant(&obd->u.cli, ocd);

                /* See bug 7198 */
                if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
                        imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;

                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
                break;
        }
        default:
                CERROR("Unknown import event %d\n", event);
                LBUG();
        }
        RETURN(rc);
}
4306
4307 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
4308 {
4309         int rc;
4310         ENTRY;
4311
4312         ENTRY;
4313         rc = ptlrpcd_addref();
4314         if (rc)
4315                 RETURN(rc);
4316
4317         rc = client_obd_setup(obd, lcfg);
4318         if (rc) {
4319                 ptlrpcd_decref();
4320         } else {
4321                 struct lprocfs_static_vars lvars = { 0 };
4322                 struct client_obd *cli = &obd->u.cli;
4323
4324                 cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
4325                 lprocfs_osc_init_vars(&lvars);
4326                 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
4327                         lproc_osc_attach_seqstat(obd);
4328                         sptlrpc_lprocfs_cliobd_attach(obd);
4329                         ptlrpc_lprocfs_register_obd(obd);
4330                 }
4331
4332                 oscc_init(obd);
4333                 /* We need to allocate a few requests more, because
4334                    brw_interpret tries to create new requests before freeing
4335                    previous ones. Ideally we want to have 2x max_rpcs_in_flight
4336                    reserved, but I afraid that might be too much wasted RAM
4337                    in fact, so 2 is just my guess and still should work. */
4338                 cli->cl_import->imp_rq_pool =
4339                         ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
4340                                             OST_MAXREQSIZE,
4341                                             ptlrpc_add_rqs_to_pool);
4342
4343                 CFS_INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
4344                 cfs_sema_init(&cli->cl_grant_sem, 1);
4345         }
4346
4347         RETURN(rc);
4348 }
4349
/* obd_ops::o_precleanup handler: staged teardown before osc_cleanup().
 *
 * OBD_CLEANUP_EARLY   - deactivate the import and stop pinging so no new
 *                       RPCs are issued while higher layers shut down
 * OBD_CLEANUP_EXPORTS - invalidate and destroy the client import and its
 *                       request pool, then finish the llog subsystems
 */
static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
{
        int rc = 0;
        ENTRY;

        switch (stage) {
        case OBD_CLEANUP_EARLY: {
                struct obd_import *imp;
                imp = obd->u.cli.cl_import;
                CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
                /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
                ptlrpc_deactivate_import(imp);
                cfs_spin_lock(&imp->imp_lock);
                imp->imp_pingable = 0;
                cfs_spin_unlock(&imp->imp_lock);
                break;
        }
        case OBD_CLEANUP_EXPORTS: {
                /* If we set up but never connected, the
                   client import will not have been cleaned. */
                if (obd->u.cli.cl_import) {
                        struct obd_import *imp;
                        cfs_down_write(&obd->u.cli.cl_sem);
                        imp = obd->u.cli.cl_import;
                        /* NOTE(review): this message prints whenever an
                         * import still exists, not only in the
                         * never-connected case the comment above describes
                         * -- confirm intent */
                        CDEBUG(D_CONFIG, "%s: client import never connected\n",
                               obd->obd_name);
                        ptlrpc_invalidate_import(imp);
                        if (imp->imp_rq_pool) {
                                ptlrpc_free_rq_pool(imp->imp_rq_pool);
                                imp->imp_rq_pool = NULL;
                        }
                        class_destroy_import(imp);
                        cfs_up_write(&obd->u.cli.cl_sem);
                        obd->u.cli.cl_import = NULL;
                }
                rc = obd_llog_finish(obd, 0);
                if (rc != 0)
                        CERROR("failed to cleanup llogging subsystems\n");
                break;
                }
        }
        RETURN(rc);
}
4393
/* obd_ops::o_cleanup handler: final teardown of an OSC device.  Undoes
 * what osc_setup() did: procfs entries, quota state, the generic client
 * obd, and finally the ptlrpcd reference.
 */
int osc_cleanup(struct obd_device *obd)
{
        int rc;

        ENTRY;
        ptlrpc_lprocfs_unregister_obd(obd);
        lprocfs_obd_cleanup(obd);

        /* free memory of osc quota cache */
        lquota_cleanup(quota_interface, obd);

        rc = client_obd_cleanup(obd);

        /* drop the reference taken in osc_setup() */
        ptlrpcd_decref();
        RETURN(rc);
}
4410
4411 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
4412 {
4413         struct lprocfs_static_vars lvars = { 0 };
4414         int rc = 0;
4415
4416         lprocfs_osc_init_vars(&lvars);
4417
4418         switch (lcfg->lcfg_command) {
4419         default:
4420                 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
4421                                               lcfg, obd);
4422                 if (rc > 0)
4423                         rc = 0;
4424                 break;
4425         }
4426
4427         return(rc);
4428 }
4429
/* obd_ops::o_process_config adapter: the obd_ops signature passes the
 * config as (len, buf); delegate to the typed helper above. */
static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
{
        return osc_process_config_base(obd, buf);
}
4434
/* Method table registered for the OSC obd type in osc_init(); generic
 * client_* handlers are used where no OSC-specific behaviour is needed. */
struct obd_ops osc_obd_ops = {
        .o_owner                = THIS_MODULE,
        .o_setup                = osc_setup,
        .o_precleanup           = osc_precleanup,
        .o_cleanup              = osc_cleanup,
        .o_add_conn             = client_import_add_conn,
        .o_del_conn             = client_import_del_conn,
        .o_connect              = client_connect_import,
        .o_reconnect            = osc_reconnect,
        .o_disconnect           = osc_disconnect,
        .o_statfs               = osc_statfs,
        .o_statfs_async         = osc_statfs_async,
        .o_packmd               = osc_packmd,
        .o_unpackmd             = osc_unpackmd,
        .o_precreate            = osc_precreate,
        .o_create               = osc_create,
        .o_create_async         = osc_create_async,
        .o_destroy              = osc_destroy,
        .o_getattr              = osc_getattr,
        .o_getattr_async        = osc_getattr_async,
        .o_setattr              = osc_setattr,
        .o_setattr_async        = osc_setattr_async,
        .o_brw                  = osc_brw,
        .o_punch                = osc_punch,
        .o_sync                 = osc_sync,
        .o_enqueue              = osc_enqueue,
        .o_change_cbdata        = osc_change_cbdata,
        .o_cancel               = osc_cancel,
        .o_cancel_unused        = osc_cancel_unused,
        .o_iocontrol            = osc_iocontrol,
        .o_get_info             = osc_get_info,
        .o_set_info_async       = osc_set_info_async,
        .o_import_event         = osc_import_event,
        .o_llog_init            = osc_llog_init,
        .o_llog_finish          = osc_llog_finish,
        .o_process_config       = osc_process_config,
};
4472
/* Shared OSC state defined in another compilation unit of this module
 * (presumably the cl/lu object code -- TODO confirm location). */
extern struct lu_kmem_descr osc_caches[];
extern cfs_spinlock_t       osc_ast_guard;
extern cfs_lock_class_key_t osc_ast_guard_class;
4476
4477 int __init osc_init(void)
4478 {
4479         struct lprocfs_static_vars lvars = { 0 };
4480         int rc;
4481         ENTRY;
4482
4483         /* print an address of _any_ initialized kernel symbol from this
4484          * module, to allow debugging with gdb that doesn't support data
4485          * symbols from modules.*/
4486         CDEBUG(D_CONSOLE, "Lustre OSC module (%p).\n", &osc_caches);
4487
4488         rc = lu_kmem_init(osc_caches);
4489
4490         lprocfs_osc_init_vars(&lvars);
4491
4492         cfs_request_module("lquota");
4493         quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
4494         lquota_init(quota_interface);
4495         init_obd_quota_ops(quota_interface, &osc_obd_ops);
4496
4497         rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
4498                                  LUSTRE_OSC_NAME, &osc_device_type);
4499         if (rc) {
4500                 if (quota_interface)
4501                         PORTAL_SYMBOL_PUT(osc_quota_interface);
4502                 lu_kmem_fini(osc_caches);
4503                 RETURN(rc);
4504         }
4505
4506         cfs_spin_lock_init(&osc_ast_guard);
4507         cfs_lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
4508
4509         osc_mds_ost_orig_logops = llog_lvfs_ops;
4510         osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
4511         osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
4512         osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
4513         osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
4514
4515         RETURN(rc);
4516 }
4517
4518 #ifdef __KERNEL__
/* Module exit: release everything osc_init() acquired, in reverse order. */
static void /*__exit*/ osc_exit(void)
{
        lu_device_type_fini(&osc_device_type);

        lquota_exit(quota_interface);
        if (quota_interface)
                PORTAL_SYMBOL_PUT(osc_quota_interface);

        class_unregister_type(LUSTRE_OSC_NAME);
        lu_kmem_fini(osc_caches);
}
4530
4531 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
4532 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
4533 MODULE_LICENSE("GPL");
4534
4535 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);
4536 #endif