Whamcloud - gitweb
dea1edd960262810697e19a0692e404f36c4b352
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #ifndef EXPORT_SYMTAB
38 # define EXPORT_SYMTAB
39 #endif
40 #define DEBUG_SUBSYSTEM S_OSC
41
42 #include <libcfs/libcfs.h>
43
44 #ifndef __KERNEL__
45 # include <liblustre.h>
46 #endif
47
48 #include <lustre_dlm.h>
49 #include <lustre_net.h>
50 #include <lustre/lustre_user.h>
51 #include <obd_cksum.h>
52 #include <obd_ost.h>
53 #include <obd_lov.h>
54
55 #ifdef  __CYGWIN__
56 # include <ctype.h>
57 #endif
58
59 #include <lustre_ha.h>
60 #include <lprocfs_status.h>
61 #include <lustre_log.h>
62 #include <lustre_debug.h>
63 #include <lustre_param.h>
64 #include "osc_internal.h"
65
66 static quota_interface_t *quota_interface = NULL;
67 extern quota_interface_t osc_quota_interface;
68
69 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
70 static int brw_interpret(const struct lu_env *env,
71                          struct ptlrpc_request *req, void *data, int rc);
72 int osc_cleanup(struct obd_device *obd);
73
74 /* Pack OSC object metadata for disk storage (LE byte order). */
75 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
76                       struct lov_stripe_md *lsm)
77 {
78         int lmm_size;
79         ENTRY;
80
81         lmm_size = sizeof(**lmmp);
82         if (!lmmp)
83                 RETURN(lmm_size);
84
85         if (*lmmp && !lsm) {
86                 OBD_FREE(*lmmp, lmm_size);
87                 *lmmp = NULL;
88                 RETURN(0);
89         }
90
91         if (!*lmmp) {
92                 OBD_ALLOC(*lmmp, lmm_size);
93                 if (!*lmmp)
94                         RETURN(-ENOMEM);
95         }
96
97         if (lsm) {
98                 LASSERT(lsm->lsm_object_id);
99                 LASSERT_MDS_GROUP(lsm->lsm_object_gr);
100                 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
101                 (*lmmp)->lmm_object_gr = cpu_to_le64(lsm->lsm_object_gr);
102         }
103
104         RETURN(lmm_size);
105 }
106
107 /* Unpack OSC object metadata from disk storage (LE byte order). */
108 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
109                         struct lov_mds_md *lmm, int lmm_bytes)
110 {
111         int lsm_size;
112         ENTRY;
113
114         if (lmm != NULL) {
115                 if (lmm_bytes < sizeof (*lmm)) {
116                         CERROR("lov_mds_md too small: %d, need %d\n",
117                                lmm_bytes, (int)sizeof(*lmm));
118                         RETURN(-EINVAL);
119                 }
120                 /* XXX LOV_MAGIC etc check? */
121
122                 if (lmm->lmm_object_id == 0) {
123                         CERROR("lov_mds_md: zero lmm_object_id\n");
124                         RETURN(-EINVAL);
125                 }
126         }
127
128         lsm_size = lov_stripe_md_size(1);
129         if (lsmp == NULL)
130                 RETURN(lsm_size);
131
132         if (*lsmp != NULL && lmm == NULL) {
133                 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
134                 OBD_FREE(*lsmp, lsm_size);
135                 *lsmp = NULL;
136                 RETURN(0);
137         }
138
139         if (*lsmp == NULL) {
140                 OBD_ALLOC(*lsmp, lsm_size);
141                 if (*lsmp == NULL)
142                         RETURN(-ENOMEM);
143                 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
144                 if ((*lsmp)->lsm_oinfo[0] == NULL) {
145                         OBD_FREE(*lsmp, lsm_size);
146                         RETURN(-ENOMEM);
147                 }
148                 loi_init((*lsmp)->lsm_oinfo[0]);
149         }
150
151         if (lmm != NULL) {
152                 /* XXX zero *lsmp? */
153                 (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
154                 (*lsmp)->lsm_object_gr = le64_to_cpu (lmm->lmm_object_gr);
155                 LASSERT((*lsmp)->lsm_object_id);
156                 LASSERT_MDS_GROUP((*lsmp)->lsm_object_gr);
157         }
158
159         (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
160
161         RETURN(lsm_size);
162 }
163
164 static inline void osc_pack_capa(struct ptlrpc_request *req,
165                                  struct ost_body *body, void *capa)
166 {
167         struct obd_capa *oc = (struct obd_capa *)capa;
168         struct lustre_capa *c;
169
170         if (!capa)
171                 return;
172
173         c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
174         LASSERT(c);
175         capa_cpy(c, oc);
176         body->oa.o_valid |= OBD_MD_FLOSSCAPA;
177         DEBUG_CAPA(D_SEC, c, "pack");
178 }
179
180 static inline void osc_pack_req_body(struct ptlrpc_request *req,
181                                      struct obd_info *oinfo)
182 {
183         struct ost_body *body;
184
185         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
186         LASSERT(body);
187
188         lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
189         osc_pack_capa(req, body, oinfo->oi_capa);
190 }
191
192 static inline void osc_set_capa_size(struct ptlrpc_request *req,
193                                      const struct req_msg_field *field,
194                                      struct obd_capa *oc)
195 {
196         if (oc == NULL)
197                 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
198         else
199                 /* it is already calculated as sizeof struct obd_capa */
200                 ;
201 }
202
203 static int osc_getattr_interpret(const struct lu_env *env,
204                                  struct ptlrpc_request *req,
205                                  struct osc_async_args *aa, int rc)
206 {
207         struct ost_body *body;
208         ENTRY;
209
210         if (rc != 0)
211                 GOTO(out, rc);
212
213         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
214         if (body) {
215                 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
216                 lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);
217
218                 /* This should really be sent by the OST */
219                 aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
220                 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
221         } else {
222                 CDEBUG(D_INFO, "can't unpack ost_body\n");
223                 rc = -EPROTO;
224                 aa->aa_oi->oi_oa->o_valid = 0;
225         }
226 out:
227         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
228         RETURN(rc);
229 }
230
231 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
232                              struct ptlrpc_request_set *set)
233 {
234         struct ptlrpc_request *req;
235         struct osc_async_args *aa;
236         int                    rc;
237         ENTRY;
238
239         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
240         if (req == NULL)
241                 RETURN(-ENOMEM);
242
243         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
244         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
245         if (rc) {
246                 ptlrpc_request_free(req);
247                 RETURN(rc);
248         }
249
250         osc_pack_req_body(req, oinfo);
251
252         ptlrpc_request_set_replen(req);
253         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;
254
255         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
256         aa = ptlrpc_req_async_args(req);
257         aa->aa_oi = oinfo;
258
259         ptlrpc_set_add_req(set, req);
260         RETURN(0);
261 }
262
263 static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
264 {
265         struct ptlrpc_request *req;
266         struct ost_body       *body;
267         int                    rc;
268         ENTRY;
269
270         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
271         if (req == NULL)
272                 RETURN(-ENOMEM);
273
274         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
275         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
276         if (rc) {
277                 ptlrpc_request_free(req);
278                 RETURN(rc);
279         }
280
281         osc_pack_req_body(req, oinfo);
282
283         ptlrpc_request_set_replen(req);
284
285         rc = ptlrpc_queue_wait(req);
286         if (rc)
287                 GOTO(out, rc);
288
289         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
290         if (body == NULL)
291                 GOTO(out, rc = -EPROTO);
292
293         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
294         lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
295
296         /* This should really be sent by the OST */
297         oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
298         oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
299
300         EXIT;
301  out:
302         ptlrpc_req_finished(req);
303         return rc;
304 }
305
306 static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
307                        struct obd_trans_info *oti)
308 {
309         struct ptlrpc_request *req;
310         struct ost_body       *body;
311         int                    rc;
312         ENTRY;
313
314         LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);
315
316         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
317         if (req == NULL)
318                 RETURN(-ENOMEM);
319
320         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
321         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
322         if (rc) {
323                 ptlrpc_request_free(req);
324                 RETURN(rc);
325         }
326
327         osc_pack_req_body(req, oinfo);
328
329         ptlrpc_request_set_replen(req);
330
331         rc = ptlrpc_queue_wait(req);
332         if (rc)
333                 GOTO(out, rc);
334
335         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
336         if (body == NULL)
337                 GOTO(out, rc = -EPROTO);
338
339         lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
340
341         EXIT;
342 out:
343         ptlrpc_req_finished(req);
344         RETURN(rc);
345 }
346
347 static int osc_setattr_interpret(const struct lu_env *env,
348                                  struct ptlrpc_request *req,
349                                  struct osc_async_args *aa, int rc)
350 {
351         struct ost_body *body;
352         ENTRY;
353
354         if (rc != 0)
355                 GOTO(out, rc);
356
357         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
358         if (body == NULL)
359                 GOTO(out, rc = -EPROTO);
360
361         lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);
362 out:
363         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
364         RETURN(rc);
365 }
366
367 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
368                              struct obd_trans_info *oti,
369                              struct ptlrpc_request_set *rqset)
370 {
371         struct ptlrpc_request *req;
372         struct osc_async_args *aa;
373         int                    rc;
374         ENTRY;
375
376         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
377         if (req == NULL)
378                 RETURN(-ENOMEM);
379
380         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
381         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
382         if (rc) {
383                 ptlrpc_request_free(req);
384                 RETURN(rc);
385         }
386
387         if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
388                 oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
389
390         osc_pack_req_body(req, oinfo);
391
392         ptlrpc_request_set_replen(req);
393
394         /* do mds to ost setattr asynchronously */
395         if (!rqset) {
396                 /* Do not wait for response. */
397                 ptlrpcd_add_req(req, PSCOPE_OTHER);
398         } else {
399                 req->rq_interpret_reply =
400                         (ptlrpc_interpterer_t)osc_setattr_interpret;
401
402                 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
403                 aa = ptlrpc_req_async_args(req);
404                 aa->aa_oi = oinfo;
405
406                 ptlrpc_set_add_req(rqset, req);
407         }
408
409         RETURN(0);
410 }
411
412 int osc_real_create(struct obd_export *exp, struct obdo *oa,
413                     struct lov_stripe_md **ea, struct obd_trans_info *oti)
414 {
415         struct ptlrpc_request *req;
416         struct ost_body       *body;
417         struct lov_stripe_md  *lsm;
418         int                    rc;
419         ENTRY;
420
421         LASSERT(oa);
422         LASSERT(ea);
423
424         lsm = *ea;
425         if (!lsm) {
426                 rc = obd_alloc_memmd(exp, &lsm);
427                 if (rc < 0)
428                         RETURN(rc);
429         }
430
431         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
432         if (req == NULL)
433                 GOTO(out, rc = -ENOMEM);
434
435         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
436         if (rc) {
437                 ptlrpc_request_free(req);
438                 GOTO(out, rc);
439         }
440
441         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
442         LASSERT(body);
443         lustre_set_wire_obdo(&body->oa, oa);
444
445         ptlrpc_request_set_replen(req);
446
447         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
448             oa->o_flags == OBD_FL_DELORPHAN) {
449                 DEBUG_REQ(D_HA, req,
450                           "delorphan from OST integration");
451                 /* Don't resend the delorphan req */
452                 req->rq_no_resend = req->rq_no_delay = 1;
453         }
454
455         rc = ptlrpc_queue_wait(req);
456         if (rc)
457                 GOTO(out_req, rc);
458
459         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
460         if (body == NULL)
461                 GOTO(out_req, rc = -EPROTO);
462
463         lustre_get_wire_obdo(oa, &body->oa);
464
465         /* This should really be sent by the OST */
466         oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
467         oa->o_valid |= OBD_MD_FLBLKSZ;
468
469         /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
470          * have valid lsm_oinfo data structs, so don't go touching that.
471          * This needs to be fixed in a big way.
472          */
473         lsm->lsm_object_id = oa->o_id;
474         lsm->lsm_object_gr = oa->o_gr;
475         *ea = lsm;
476
477         if (oti != NULL) {
478                 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
479
480                 if (oa->o_valid & OBD_MD_FLCOOKIE) {
481                         if (!oti->oti_logcookies)
482                                 oti_alloc_cookies(oti, 1);
483                         *oti->oti_logcookies = oa->o_lcookie;
484                 }
485         }
486
487         CDEBUG(D_HA, "transno: "LPD64"\n",
488                lustre_msg_get_transno(req->rq_repmsg));
489 out_req:
490         ptlrpc_req_finished(req);
491 out:
492         if (rc && !*ea)
493                 obd_free_memmd(exp, &lsm);
494         RETURN(rc);
495 }
496
497 static int osc_punch_interpret(const struct lu_env *env,
498                                struct ptlrpc_request *req,
499                                struct osc_punch_args *aa, int rc)
500 {
501         struct ost_body *body;
502         ENTRY;
503
504         if (rc != 0)
505                 GOTO(out, rc);
506
507         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
508         if (body == NULL)
509                 GOTO(out, rc = -EPROTO);
510
511         lustre_get_wire_obdo(aa->pa_oa, &body->oa);
512 out:
513         rc = aa->pa_upcall(aa->pa_cookie, rc);
514         RETURN(rc);
515 }
516
517 int osc_punch_base(struct obd_export *exp, struct obdo *oa,
518                    struct obd_capa *capa,
519                    obd_enqueue_update_f upcall, void *cookie,
520                    struct ptlrpc_request_set *rqset)
521 {
522         struct ptlrpc_request *req;
523         struct osc_punch_args *aa;
524         struct ost_body       *body;
525         int                    rc;
526         ENTRY;
527
528         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
529         if (req == NULL)
530                 RETURN(-ENOMEM);
531
532         osc_set_capa_size(req, &RMF_CAPA1, capa);
533         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
534         if (rc) {
535                 ptlrpc_request_free(req);
536                 RETURN(rc);
537         }
538         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
539         ptlrpc_at_set_req_timeout(req);
540
541         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
542         LASSERT(body);
543         lustre_set_wire_obdo(&body->oa, oa);
544         osc_pack_capa(req, body, capa);
545
546         ptlrpc_request_set_replen(req);
547
548
549         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_punch_interpret;
550         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
551         aa = ptlrpc_req_async_args(req);
552         aa->pa_oa     = oa;
553         aa->pa_upcall = upcall;
554         aa->pa_cookie = cookie;
555         if (rqset == PTLRPCD_SET)
556                 ptlrpcd_add_req(req, PSCOPE_OTHER);
557         else
558                 ptlrpc_set_add_req(rqset, req);
559
560         RETURN(0);
561 }
562
563 static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
564                      struct obd_trans_info *oti,
565                      struct ptlrpc_request_set *rqset)
566 {
567         oinfo->oi_oa->o_size   = oinfo->oi_policy.l_extent.start;
568         oinfo->oi_oa->o_blocks = oinfo->oi_policy.l_extent.end;
569         oinfo->oi_oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
570         return osc_punch_base(exp, oinfo->oi_oa, oinfo->oi_capa,
571                               oinfo->oi_cb_up, oinfo, rqset);
572 }
573
574 static int osc_sync(struct obd_export *exp, struct obdo *oa,
575                     struct lov_stripe_md *md, obd_size start, obd_size end,
576                     void *capa)
577 {
578         struct ptlrpc_request *req;
579         struct ost_body       *body;
580         int                    rc;
581         ENTRY;
582
583         if (!oa) {
584                 CDEBUG(D_INFO, "oa NULL\n");
585                 RETURN(-EINVAL);
586         }
587
588         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
589         if (req == NULL)
590                 RETURN(-ENOMEM);
591
592         osc_set_capa_size(req, &RMF_CAPA1, capa);
593         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
594         if (rc) {
595                 ptlrpc_request_free(req);
596                 RETURN(rc);
597         }
598
599         /* overload the size and blocks fields in the oa with start/end */
600         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
601         LASSERT(body);
602         lustre_set_wire_obdo(&body->oa, oa);
603         body->oa.o_size = start;
604         body->oa.o_blocks = end;
605         body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
606         osc_pack_capa(req, body, capa);
607
608         ptlrpc_request_set_replen(req);
609
610         rc = ptlrpc_queue_wait(req);
611         if (rc)
612                 GOTO(out, rc);
613
614         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
615         if (body == NULL)
616                 GOTO(out, rc = -EPROTO);
617
618         lustre_get_wire_obdo(oa, &body->oa);
619
620         EXIT;
621  out:
622         ptlrpc_req_finished(req);
623         return rc;
624 }
625
626 /* Find and cancel locally locks matched by @mode in the resource found by
627  * @objid. Found locks are added into @cancel list. Returns the amount of
628  * locks added to @cancels list. */
629 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
630                                    struct list_head *cancels, ldlm_mode_t mode,
631                                    int lock_flags)
632 {
633         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
634         struct ldlm_res_id res_id;
635         struct ldlm_resource *res;
636         int count;
637         ENTRY;
638
639         osc_build_res_name(oa->o_id, oa->o_gr, &res_id);
640         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
641         if (res == NULL)
642                 RETURN(0);
643
644         LDLM_RESOURCE_ADDREF(res);
645         count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
646                                            lock_flags, 0, NULL);
647         LDLM_RESOURCE_DELREF(res);
648         ldlm_resource_putref(res);
649         RETURN(count);
650 }
651
652 static int osc_destroy_interpret(const struct lu_env *env,
653                                  struct ptlrpc_request *req, void *data,
654                                  int rc)
655 {
656         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
657
658         atomic_dec(&cli->cl_destroy_in_flight);
659         cfs_waitq_signal(&cli->cl_destroy_waitq);
660         return 0;
661 }
662
663 static int osc_can_send_destroy(struct client_obd *cli)
664 {
665         if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
666             cli->cl_max_rpcs_in_flight) {
667                 /* The destroy request can be sent */
668                 return 1;
669         }
670         if (atomic_dec_return(&cli->cl_destroy_in_flight) <
671             cli->cl_max_rpcs_in_flight) {
672                 /*
673                  * The counter has been modified between the two atomic
674                  * operations.
675                  */
676                 cfs_waitq_signal(&cli->cl_destroy_waitq);
677         }
678         return 0;
679 }
680
681 /* Destroy requests can be async always on the client, and we don't even really
682  * care about the return code since the client cannot do anything at all about
683  * a destroy failure.
684  * When the MDS is unlinking a filename, it saves the file objects into a
685  * recovery llog, and these object records are cancelled when the OST reports
686  * they were destroyed and sync'd to disk (i.e. transaction committed).
687  * If the client dies, or the OST is down when the object should be destroyed,
688  * the records are not cancelled, and when the OST reconnects to the MDS next,
689  * it will retrieve the llog unlink logs and then sends the log cancellation
690  * cookies to the MDS after committing destroy transactions. */
691 static int osc_destroy(struct obd_export *exp, struct obdo *oa,
692                        struct lov_stripe_md *ea, struct obd_trans_info *oti,
693                        struct obd_export *md_export, void *capa)
694 {
695         struct client_obd     *cli = &exp->exp_obd->u.cli;
696         struct ptlrpc_request *req;
697         struct ost_body       *body;
698         CFS_LIST_HEAD(cancels);
699         int rc, count;
700         ENTRY;
701
702         if (!oa) {
703                 CDEBUG(D_INFO, "oa NULL\n");
704                 RETURN(-EINVAL);
705         }
706
707         count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
708                                         LDLM_FL_DISCARD_DATA);
709
710         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
711         if (req == NULL) {
712                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
713                 RETURN(-ENOMEM);
714         }
715
716         osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
717         rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
718                                0, &cancels, count);
719         if (rc) {
720                 ptlrpc_request_free(req);
721                 RETURN(rc);
722         }
723
724         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
725         ptlrpc_at_set_req_timeout(req);
726
727         if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
728                 oa->o_lcookie = *oti->oti_logcookies;
729         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
730         LASSERT(body);
731         lustre_set_wire_obdo(&body->oa, oa);
732
733         osc_pack_capa(req, body, (struct obd_capa *)capa);
734         ptlrpc_request_set_replen(req);
735
736         /* don't throttle destroy RPCs for the MDT */
737         if (!(cli->cl_import->imp_connect_flags_orig & OBD_CONNECT_MDS)) {
738                 req->rq_interpret_reply = osc_destroy_interpret;
739                 if (!osc_can_send_destroy(cli)) {
740                         struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
741                                                           NULL);
742
743                         /*
744                          * Wait until the number of on-going destroy RPCs drops
745                          * under max_rpc_in_flight
746                          */
747                         l_wait_event_exclusive(cli->cl_destroy_waitq,
748                                                osc_can_send_destroy(cli), &lwi);
749                 }
750         }
751
752         /* Do not wait for response */
753         ptlrpcd_add_req(req, PSCOPE_OTHER);
754         RETURN(0);
755 }
756
757 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
758                                 long writing_bytes)
759 {
760         obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
761
762         LASSERT(!(oa->o_valid & bits));
763
764         oa->o_valid |= bits;
765         client_obd_list_lock(&cli->cl_loi_list_lock);
766         oa->o_dirty = cli->cl_dirty;
767         if (cli->cl_dirty - cli->cl_dirty_transit > cli->cl_dirty_max) {
768                 CERROR("dirty %lu - %lu > dirty_max %lu\n",
769                        cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
770                 oa->o_undirty = 0;
771         } else if (atomic_read(&obd_dirty_pages) -
772                    atomic_read(&obd_dirty_transit_pages) > obd_max_dirty_pages){
773                 CERROR("dirty %d - %d > system dirty_max %d\n",
774                        atomic_read(&obd_dirty_pages),
775                        atomic_read(&obd_dirty_transit_pages),
776                        obd_max_dirty_pages);
777                 oa->o_undirty = 0;
778         } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
779                 CERROR("dirty %lu - dirty_max %lu too big???\n",
780                        cli->cl_dirty, cli->cl_dirty_max);
781                 oa->o_undirty = 0;
782         } else {
783                 long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
784                                 (cli->cl_max_rpcs_in_flight + 1);
785                 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
786         }
787         oa->o_grant = cli->cl_avail_grant;
788         oa->o_dropped = cli->cl_lost_grant;
789         cli->cl_lost_grant = 0;
790         client_obd_list_unlock(&cli->cl_loi_list_lock);
791         CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
792                oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
793
794 }
795
796 static void osc_update_next_shrink(struct client_obd *cli)
797 {
798         cli->cl_next_shrink_grant =
799                 cfs_time_shift(cli->cl_grant_shrink_interval);
800         CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
801                cli->cl_next_shrink_grant);
802 }
803
804 /* caller must hold loi_list_lock */
805 static void osc_consume_write_grant(struct client_obd *cli,
806                                     struct brw_page *pga)
807 {
808         LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock.lock);
809         LASSERT(!(pga->flag & OBD_BRW_FROM_GRANT));
810         atomic_inc(&obd_dirty_pages);
811         cli->cl_dirty += CFS_PAGE_SIZE;
812         cli->cl_avail_grant -= CFS_PAGE_SIZE;
813         pga->flag |= OBD_BRW_FROM_GRANT;
814         CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
815                CFS_PAGE_SIZE, pga, pga->pg);
816         LASSERT(cli->cl_avail_grant >= 0);
817         osc_update_next_shrink(cli);
818 }
819
820 /* the companion to osc_consume_write_grant, called when a brw has completed.
821  * must be called with the loi lock held. */
822 static void osc_release_write_grant(struct client_obd *cli,
823                                     struct brw_page *pga, int sent)
824 {
825         int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
826         ENTRY;
827
828         LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock.lock);
829         if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
830                 EXIT;
831                 return;
832         }
833
834         pga->flag &= ~OBD_BRW_FROM_GRANT;
835         atomic_dec(&obd_dirty_pages);
836         cli->cl_dirty -= CFS_PAGE_SIZE;
837         if (pga->flag & OBD_BRW_NOCACHE) {
838                 pga->flag &= ~OBD_BRW_NOCACHE;
839                 atomic_dec(&obd_dirty_transit_pages);
840                 cli->cl_dirty_transit -= CFS_PAGE_SIZE;
841         }
842         if (!sent) {
843                 cli->cl_lost_grant += CFS_PAGE_SIZE;
844                 CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
845                        cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
846         } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
847                 /* For short writes we shouldn't count parts of pages that
848                  * span a whole block on the OST side, or our accounting goes
849                  * wrong.  Should match the code in filter_grant_check. */
850                 int offset = pga->off & ~CFS_PAGE_MASK;
851                 int count = pga->count + (offset & (blocksize - 1));
852                 int end = (offset + pga->count) & (blocksize - 1);
853                 if (end)
854                         count += blocksize - end;
855
856                 cli->cl_lost_grant += CFS_PAGE_SIZE - count;
857                 CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
858                        CFS_PAGE_SIZE - count, cli->cl_lost_grant,
859                        cli->cl_avail_grant, cli->cl_dirty);
860         }
861
862         EXIT;
863 }
864
865 static unsigned long rpcs_in_flight(struct client_obd *cli)
866 {
867         return cli->cl_r_in_flight + cli->cl_w_in_flight;
868 }
869
870 /* caller must hold loi_list_lock */
871 void osc_wake_cache_waiters(struct client_obd *cli)
872 {
873         struct list_head *l, *tmp;
874         struct osc_cache_waiter *ocw;
875
876         ENTRY;
877         list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
878                 /* if we can't dirty more, we must wait until some is written */
879                 if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
880                    (atomic_read(&obd_dirty_pages) + 1 > obd_max_dirty_pages)) {
881                         CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
882                                "osc max %ld, sys max %d\n", cli->cl_dirty,
883                                cli->cl_dirty_max, obd_max_dirty_pages);
884                         return;
885                 }
886
887                 /* if still dirty cache but no grant wait for pending RPCs that
888                  * may yet return us some grant before doing sync writes */
889                 if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
890                         CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
891                                cli->cl_w_in_flight);
892                         return;
893                 }
894
895                 ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
896                 list_del_init(&ocw->ocw_entry);
897                 if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
898                         /* no more RPCs in flight to return grant, do sync IO */
899                         ocw->ocw_rc = -EDQUOT;
900                         CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
901                 } else {
902                         osc_consume_write_grant(cli,
903                                                 &ocw->ocw_oap->oap_brw_page);
904                 }
905
906                 cfs_waitq_signal(&ocw->ocw_waitq);
907         }
908
909         EXIT;
910 }
911
912 static void __osc_update_grant(struct client_obd *cli, obd_size grant)
913 {
914         client_obd_list_lock(&cli->cl_loi_list_lock);
915         cli->cl_avail_grant += grant;
916         client_obd_list_unlock(&cli->cl_loi_list_lock);
917 }
918
919 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
920 {
921         if (body->oa.o_valid & OBD_MD_FLGRANT) {
922                 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
923                 __osc_update_grant(cli, body->oa.o_grant);
924         }
925 }
926
927 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
928                               void *key, obd_count vallen, void *val,
929                               struct ptlrpc_request_set *set);
930
931 static int osc_shrink_grant_interpret(const struct lu_env *env,
932                                       struct ptlrpc_request *req,
933                                       void *aa, int rc)
934 {
935         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
936         struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
937         struct ost_body *body;
938
939         if (rc != 0) {
940                 __osc_update_grant(cli, oa->o_grant);
941                 GOTO(out, rc);
942         }
943
944         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
945         LASSERT(body);
946         osc_update_grant(cli, body);
947 out:
948         OBD_FREE_PTR(oa);
949         return rc;
950 }
951
952 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
953 {
954         client_obd_list_lock(&cli->cl_loi_list_lock);
955         oa->o_grant = cli->cl_avail_grant / 4;
956         cli->cl_avail_grant -= oa->o_grant;
957         client_obd_list_unlock(&cli->cl_loi_list_lock);
958         oa->o_flags |= OBD_FL_SHRINK_GRANT;
959         osc_update_next_shrink(cli);
960 }
961
962 /* Shrink the current grant, either from some large amount to enough for a
963  * full set of in-flight RPCs, or if we have already shrunk to that limit
964  * then to enough for a single RPC.  This avoids keeping more grant than
965  * needed, and avoids shrinking the grant piecemeal. */
966 static int osc_shrink_grant(struct client_obd *cli)
967 {
968         long target = (cli->cl_max_rpcs_in_flight + 1) *
969                       cli->cl_max_pages_per_rpc;
970
971         client_obd_list_lock(&cli->cl_loi_list_lock);
972         if (cli->cl_avail_grant <= target)
973                 target = cli->cl_max_pages_per_rpc;
974         client_obd_list_unlock(&cli->cl_loi_list_lock);
975
976         return osc_shrink_grant_to_target(cli, target);
977 }
978
979 int osc_shrink_grant_to_target(struct client_obd *cli, long target)
980 {
981         int    rc = 0;
982         struct ost_body     *body;
983         ENTRY;
984
985         client_obd_list_lock(&cli->cl_loi_list_lock);
986         /* Don't shrink if we are already above or below the desired limit
987          * We don't want to shrink below a single RPC, as that will negatively
988          * impact block allocation and long-term performance. */
989         if (target < cli->cl_max_pages_per_rpc)
990                 target = cli->cl_max_pages_per_rpc;
991
992         if (target >= cli->cl_avail_grant) {
993                 client_obd_list_unlock(&cli->cl_loi_list_lock);
994                 RETURN(0);
995         }
996         client_obd_list_unlock(&cli->cl_loi_list_lock);
997
998         OBD_ALLOC_PTR(body);
999         if (!body)
1000                 RETURN(-ENOMEM);
1001
1002         osc_announce_cached(cli, &body->oa, 0);
1003
1004         client_obd_list_lock(&cli->cl_loi_list_lock);
1005         body->oa.o_grant = cli->cl_avail_grant - target;
1006         cli->cl_avail_grant = target;
1007         client_obd_list_unlock(&cli->cl_loi_list_lock);
1008         body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
1009         osc_update_next_shrink(cli);
1010
1011         rc = osc_set_info_async(cli->cl_import->imp_obd->obd_self_export,
1012                                 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
1013                                 sizeof(*body), body, NULL);
1014         if (rc != 0)
1015                 __osc_update_grant(cli, body->oa.o_grant);
1016         OBD_FREE_PTR(body);
1017         RETURN(rc);
1018 }
1019
1020 #define GRANT_SHRINK_LIMIT PTLRPC_MAX_BRW_SIZE
1021 static int osc_should_shrink_grant(struct client_obd *client)
1022 {
1023         cfs_time_t time = cfs_time_current();
1024         cfs_time_t next_shrink = client->cl_next_shrink_grant;
1025         if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
1026                 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
1027                     client->cl_avail_grant > GRANT_SHRINK_LIMIT)
1028                         return 1;
1029                 else
1030                         osc_update_next_shrink(client);
1031         }
1032         return 0;
1033 }
1034
1035 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
1036 {
1037         struct client_obd *client;
1038
1039         list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
1040                 if (osc_should_shrink_grant(client))
1041                         osc_shrink_grant(client);
1042         }
1043         return 0;
1044 }
1045
1046 static int osc_add_shrink_grant(struct client_obd *client)
1047 {
1048         int rc;
1049
1050         rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
1051                                        TIMEOUT_GRANT,
1052                                        osc_grant_shrink_grant_cb, NULL,
1053                                        &client->cl_grant_shrink_list);
1054         if (rc) {
1055                 CERROR("add grant client %s error %d\n",
1056                         client->cl_import->imp_obd->obd_name, rc);
1057                 return rc;
1058         }
1059         CDEBUG(D_CACHE, "add grant client %s \n",
1060                client->cl_import->imp_obd->obd_name);
1061         osc_update_next_shrink(client);
1062         return 0;
1063 }
1064
1065 static int osc_del_shrink_grant(struct client_obd *client)
1066 {
1067         return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
1068                                          TIMEOUT_GRANT);
1069 }
1070
1071 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
1072 {
1073         /*
1074          * ocd_grant is the total grant amount we're expect to hold: if we've
1075          * been evicted, it's the new avail_grant amount, cl_dirty will drop
1076          * to 0 as inflight RPCs fail out; otherwise, it's avail_grant + dirty.
1077          *
1078          * race is tolerable here: if we're evicted, but imp_state already
1079          * left EVICTED state, then cl_dirty must be 0 already.
1080          */
1081         client_obd_list_lock(&cli->cl_loi_list_lock);
1082         if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
1083                 cli->cl_avail_grant = ocd->ocd_grant;
1084         else
1085                 cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty;
1086         client_obd_list_unlock(&cli->cl_loi_list_lock);
1087
1088         CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld \n",
1089                cli->cl_avail_grant, cli->cl_lost_grant);
1090         LASSERT(cli->cl_avail_grant >= 0);
1091
1092         if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
1093             list_empty(&cli->cl_grant_shrink_list))
1094                 osc_add_shrink_grant(cli);
1095 }
1096
1097 /* We assume that the reason this OSC got a short read is because it read
1098  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
1099  * via the LOV, and it _knows_ it's reading inside the file, it's just that
1100  * this stripe never got written at or beyond this stripe offset yet. */
1101 static void handle_short_read(int nob_read, obd_count page_count,
1102                               struct brw_page **pga)
1103 {
1104         char *ptr;
1105         int i = 0;
1106
1107         /* skip bytes read OK */
1108         while (nob_read > 0) {
1109                 LASSERT (page_count > 0);
1110
1111                 if (pga[i]->count > nob_read) {
1112                         /* EOF inside this page */
1113                         ptr = cfs_kmap(pga[i]->pg) +
1114                                 (pga[i]->off & ~CFS_PAGE_MASK);
1115                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1116                         cfs_kunmap(pga[i]->pg);
1117                         page_count--;
1118                         i++;
1119                         break;
1120                 }
1121
1122                 nob_read -= pga[i]->count;
1123                 page_count--;
1124                 i++;
1125         }
1126
1127         /* zero remaining pages */
1128         while (page_count-- > 0) {
1129                 ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
1130                 memset(ptr, 0, pga[i]->count);
1131                 cfs_kunmap(pga[i]->pg);
1132                 i++;
1133         }
1134 }
1135
1136 static int check_write_rcs(struct ptlrpc_request *req,
1137                            int requested_nob, int niocount,
1138                            obd_count page_count, struct brw_page **pga)
1139 {
1140         int     i;
1141         __u32   *remote_rcs;
1142
1143         remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1144                                                   sizeof(*remote_rcs) *
1145                                                   niocount);
1146         if (remote_rcs == NULL) {
1147                 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1148                 return(-EPROTO);
1149         }
1150
1151         /* return error if any niobuf was in error */
1152         for (i = 0; i < niocount; i++) {
1153                 if (remote_rcs[i] < 0)
1154                         return(remote_rcs[i]);
1155
1156                 if (remote_rcs[i] != 0) {
1157                         CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1158                                 i, remote_rcs[i], req);
1159                         return(-EPROTO);
1160                 }
1161         }
1162
1163         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1164                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1165                        req->rq_bulk->bd_nob_transferred, requested_nob);
1166                 return(-EPROTO);
1167         }
1168
1169         return (0);
1170 }
1171
1172 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1173 {
1174         if (p1->flag != p2->flag) {
1175                 unsigned mask = ~(OBD_BRW_FROM_GRANT|
1176                                   OBD_BRW_NOCACHE|OBD_BRW_SYNC);
1177
1178                 /* warn if we try to combine flags that we don't know to be
1179                  * safe to combine */
1180                 if ((p1->flag & mask) != (p2->flag & mask))
1181                         CERROR("is it ok to have flags 0x%x and 0x%x in the "
1182                                "same brw?\n", p1->flag, p2->flag);
1183                 return 0;
1184         }
1185
1186         return (p1->off + p1->count == p2->off);
1187 }
1188
1189 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
1190                                    struct brw_page **pga, int opc,
1191                                    cksum_type_t cksum_type)
1192 {
1193         __u32 cksum;
1194         int i = 0;
1195
1196         LASSERT (pg_count > 0);
1197         cksum = init_checksum(cksum_type);
1198         while (nob > 0 && pg_count > 0) {
1199                 unsigned char *ptr = cfs_kmap(pga[i]->pg);
1200                 int off = pga[i]->off & ~CFS_PAGE_MASK;
1201                 int count = pga[i]->count > nob ? nob : pga[i]->count;
1202
1203                 /* corrupt the data before we compute the checksum, to
1204                  * simulate an OST->client data error */
1205                 if (i == 0 && opc == OST_READ &&
1206                     OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
1207                         memcpy(ptr + off, "bad1", min(4, nob));
1208                 cksum = compute_checksum(cksum, ptr + off, count, cksum_type);
1209                 cfs_kunmap(pga[i]->pg);
1210                 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
1211                                off, cksum);
1212
1213                 nob -= pga[i]->count;
1214                 pg_count--;
1215                 i++;
1216         }
1217         /* For sending we only compute the wrong checksum instead
1218          * of corrupting the data so it is still correct on a redo */
1219         if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1220                 cksum++;
1221
1222         return cksum;
1223 }
1224
1225 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
1226                                 struct lov_stripe_md *lsm, obd_count page_count,
1227                                 struct brw_page **pga,
1228                                 struct ptlrpc_request **reqp,
1229                                 struct obd_capa *ocapa, int reserve)
1230 {
1231         struct ptlrpc_request   *req;
1232         struct ptlrpc_bulk_desc *desc;
1233         struct ost_body         *body;
1234         struct obd_ioobj        *ioobj;
1235         struct niobuf_remote    *niobuf;
1236         int niocount, i, requested_nob, opc, rc;
1237         struct osc_brw_async_args *aa;
1238         struct req_capsule      *pill;
1239         struct brw_page *pg_prev;
1240
1241         ENTRY;
1242         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1243                 RETURN(-ENOMEM); /* Recoverable */
1244         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1245                 RETURN(-EINVAL); /* Fatal */
1246
1247         if ((cmd & OBD_BRW_WRITE) != 0) {
1248                 opc = OST_WRITE;
1249                 req = ptlrpc_request_alloc_pool(cli->cl_import,
1250                                                 cli->cl_import->imp_rq_pool,
1251                                                 &RQF_OST_BRW);
1252         } else {
1253                 opc = OST_READ;
1254                 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW);
1255         }
1256         if (req == NULL)
1257                 RETURN(-ENOMEM);
1258
1259         for (niocount = i = 1; i < page_count; i++) {
1260                 if (!can_merge_pages(pga[i - 1], pga[i]))
1261                         niocount++;
1262         }
1263
1264         pill = &req->rq_pill;
1265         req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1266                              sizeof(*ioobj));
1267         req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1268                              niocount * sizeof(*niobuf));
1269         osc_set_capa_size(req, &RMF_CAPA1, ocapa);
1270
1271         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1272         if (rc) {
1273                 ptlrpc_request_free(req);
1274                 RETURN(rc);
1275         }
1276         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1277         ptlrpc_at_set_req_timeout(req);
1278
1279         if (opc == OST_WRITE)
1280                 desc = ptlrpc_prep_bulk_imp(req, page_count,
1281                                             BULK_GET_SOURCE, OST_BULK_PORTAL);
1282         else
1283                 desc = ptlrpc_prep_bulk_imp(req, page_count,
1284                                             BULK_PUT_SINK, OST_BULK_PORTAL);
1285
1286         if (desc == NULL)
1287                 GOTO(out, rc = -ENOMEM);
1288         /* NB request now owns desc and will free it when it gets freed */
1289
1290         body = req_capsule_client_get(pill, &RMF_OST_BODY);
1291         ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1292         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1293         LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1294
1295         lustre_set_wire_obdo(&body->oa, oa);
1296
1297         obdo_to_ioobj(oa, ioobj);
1298         ioobj->ioo_bufcnt = niocount;
1299         osc_pack_capa(req, body, ocapa);
1300         LASSERT (page_count > 0);
1301         pg_prev = pga[0];
1302         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1303                 struct brw_page *pg = pga[i];
1304
1305                 LASSERT(pg->count > 0);
1306                 LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
1307                          "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
1308                          pg->off, pg->count);
1309 #ifdef __linux__
1310                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1311                          "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1312                          " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1313                          i, page_count,
1314                          pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1315                          pg_prev->pg, page_private(pg_prev->pg),
1316                          pg_prev->pg->index, pg_prev->off);
1317 #else
1318                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1319                          "i %d p_c %u\n", i, page_count);
1320 #endif
1321                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1322                         (pg->flag & OBD_BRW_SRVLOCK));
1323
1324                 ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,
1325                                       pg->count);
1326                 requested_nob += pg->count;
1327
1328                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1329                         niobuf--;
1330                         niobuf->len += pg->count;
1331                 } else {
1332                         niobuf->offset = pg->off;
1333                         niobuf->len    = pg->count;
1334                         niobuf->flags  = pg->flag;
1335                 }
1336                 pg_prev = pg;
1337         }
1338
1339         LASSERTF((void *)(niobuf - niocount) ==
1340                 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1341                 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1342                 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1343
1344         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1345         if (osc_should_shrink_grant(cli))
1346                 osc_shrink_grant_local(cli, &body->oa);
1347
1348         /* size[REQ_REC_OFF] still sizeof (*body) */
1349         if (opc == OST_WRITE) {
1350                 if (unlikely(cli->cl_checksum) &&
1351                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1352                         /* store cl_cksum_type in a local variable since
1353                          * it can be changed via lprocfs */
1354                         cksum_type_t cksum_type = cli->cl_cksum_type;
1355
1356                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1357                                 oa->o_flags &= OBD_FL_LOCAL_MASK;
1358                                 body->oa.o_flags = 0;
1359                         }
1360                         body->oa.o_flags |= cksum_type_pack(cksum_type);
1361                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1362                         body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1363                                                              page_count, pga,
1364                                                              OST_WRITE,
1365                                                              cksum_type);
1366                         CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1367                                body->oa.o_cksum);
1368                         /* save this in 'oa', too, for later checking */
1369                         oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1370                         oa->o_flags |= cksum_type_pack(cksum_type);
1371                 } else {
1372                         /* clear out the checksum flag, in case this is a
1373                          * resend but cl_checksum is no longer set. b=11238 */
1374                         oa->o_valid &= ~OBD_MD_FLCKSUM;
1375                 }
1376                 oa->o_cksum = body->oa.o_cksum;
1377                 /* 1 RC per niobuf */
1378                 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1379                                      sizeof(__u32) * niocount);
1380         } else {
1381                 if (unlikely(cli->cl_checksum) &&
1382                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1383                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1384                                 body->oa.o_flags = 0;
1385                         body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1386                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1387                 }
1388                 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER, 0);
1389                 /* 1 RC for the whole I/O */
1390         }
1391         ptlrpc_request_set_replen(req);
1392
1393         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1394         aa = ptlrpc_req_async_args(req);
1395         aa->aa_oa = oa;
1396         aa->aa_requested_nob = requested_nob;
1397         aa->aa_nio_count = niocount;
1398         aa->aa_page_count = page_count;
1399         aa->aa_resends = 0;
1400         aa->aa_ppga = pga;
1401         aa->aa_cli = cli;
1402         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1403         if (ocapa && reserve)
1404                 aa->aa_ocapa = capa_get(ocapa);
1405
1406         *reqp = req;
1407         RETURN(0);
1408
1409  out:
1410         ptlrpc_req_finished(req);
1411         RETURN(rc);
1412 }
1413
1414 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1415                                 __u32 client_cksum, __u32 server_cksum, int nob,
1416                                 obd_count page_count, struct brw_page **pga,
1417                                 cksum_type_t client_cksum_type)
1418 {
1419         __u32 new_cksum;
1420         char *msg;
1421         cksum_type_t cksum_type;
1422
1423         if (server_cksum == client_cksum) {
1424                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1425                 return 0;
1426         }
1427
1428         if (oa->o_valid & OBD_MD_FLFLAGS)
1429                 cksum_type = cksum_type_unpack(oa->o_flags);
1430         else
1431                 cksum_type = OBD_CKSUM_CRC32;
1432
1433         new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1434                                       cksum_type);
1435
1436         if (cksum_type != client_cksum_type)
1437                 msg = "the server did not use the checksum type specified in "
1438                       "the original request - likely a protocol problem";
1439         else if (new_cksum == server_cksum)
1440                 msg = "changed on the client after we checksummed it - "
1441                       "likely false positive due to mmap IO (bug 11742)";
1442         else if (new_cksum == client_cksum)
1443                 msg = "changed in transit before arrival at OST";
1444         else
1445                 msg = "changed in transit AND doesn't match the original - "
1446                       "likely false positive due to mmap IO (bug 11742)";
1447
1448         LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inum "
1449                            LPU64"/"LPU64" object "LPU64"/"LPU64" extent "
1450                            "["LPU64"-"LPU64"]\n",
1451                            msg, libcfs_nid2str(peer->nid),
1452                            oa->o_valid & OBD_MD_FLFID ? oa->o_fid : (__u64)0,
1453                            oa->o_valid & OBD_MD_FLFID ? oa->o_generation :
1454                                                         (__u64)0,
1455                            oa->o_id,
1456                            oa->o_valid & OBD_MD_FLGROUP ? oa->o_gr : (__u64)0,
1457                            pga[0]->off,
1458                            pga[page_count-1]->off + pga[page_count-1]->count - 1);
1459         CERROR("original client csum %x (type %x), server csum %x (type %x), "
1460                "client csum now %x\n", client_cksum, client_cksum_type,
1461                server_cksum, cksum_type, new_cksum);
1462         return 1;
1463 }
1464
1465 /* Note rc enters this function as number of bytes transferred */
1466 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1467 {
1468         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1469         const lnet_process_id_t *peer =
1470                         &req->rq_import->imp_connection->c_peer;
1471         struct client_obd *cli = aa->aa_cli;
1472         struct ost_body *body;
1473         __u32 client_cksum = 0;
1474         ENTRY;
1475
1476         if (rc < 0 && rc != -EDQUOT)
1477                 RETURN(rc);
1478
1479         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1480         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1481         if (body == NULL) {
1482                 CDEBUG(D_INFO, "Can't unpack body\n");
1483                 RETURN(-EPROTO);
1484         }
1485
1486         /* set/clear over quota flag for a uid/gid */
1487         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1488             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1489                 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1490
1491                 lquota_setdq(quota_interface, cli, qid, body->oa.o_valid,
1492                              body->oa.o_flags);
1493         }
1494
1495         if (rc < 0)
1496                 RETURN(rc);
1497
1498         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1499                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1500
1501         osc_update_grant(cli, body);
1502
1503         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1504                 if (rc > 0) {
1505                         CERROR("Unexpected +ve rc %d\n", rc);
1506                         RETURN(-EPROTO);
1507                 }
1508                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1509
1510                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1511                         RETURN(-EAGAIN);
1512
1513                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1514                     check_write_checksum(&body->oa, peer, client_cksum,
1515                                          body->oa.o_cksum, aa->aa_requested_nob,
1516                                          aa->aa_page_count, aa->aa_ppga,
1517                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1518                         RETURN(-EAGAIN);
1519
1520                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1521                                      aa->aa_page_count, aa->aa_ppga);
1522                 GOTO(out, rc);
1523         }
1524
1525         /* The rest of this function executes only for OST_READs */
1526
1527         /* if unwrap_bulk failed, return -EAGAIN to retry */
1528         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1529         if (rc < 0)
1530                 GOTO(out, rc = -EAGAIN);
1531
1532         if (rc > aa->aa_requested_nob) {
1533                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1534                        aa->aa_requested_nob);
1535                 RETURN(-EPROTO);
1536         }
1537
1538         if (rc != req->rq_bulk->bd_nob_transferred) {
1539                 CERROR ("Unexpected rc %d (%d transferred)\n",
1540                         rc, req->rq_bulk->bd_nob_transferred);
1541                 return (-EPROTO);
1542         }
1543
1544         if (rc < aa->aa_requested_nob)
1545                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1546
1547         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1548                 static int cksum_counter;
1549                 __u32      server_cksum = body->oa.o_cksum;
1550                 char      *via;
1551                 char      *router;
1552                 cksum_type_t cksum_type;
1553
1554                 if (body->oa.o_valid & OBD_MD_FLFLAGS)
1555                         cksum_type = cksum_type_unpack(body->oa.o_flags);
1556                 else
1557                         cksum_type = OBD_CKSUM_CRC32;
1558                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1559                                                  aa->aa_ppga, OST_READ,
1560                                                  cksum_type);
1561
1562                 if (peer->nid == req->rq_bulk->bd_sender) {
1563                         via = router = "";
1564                 } else {
1565                         via = " via ";
1566                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1567                 }
1568
1569                 if (server_cksum == ~0 && rc > 0) {
1570                         CERROR("Protocol error: server %s set the 'checksum' "
1571                                "bit, but didn't send a checksum.  Not fatal, "
1572                                "but please notify on http://bugzilla.lustre.org/\n",
1573                                libcfs_nid2str(peer->nid));
1574                 } else if (server_cksum != client_cksum) {
1575                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1576                                            "%s%s%s inum "LPU64"/"LPU64" object "
1577                                            LPU64"/"LPU64" extent "
1578                                            "["LPU64"-"LPU64"]\n",
1579                                            req->rq_import->imp_obd->obd_name,
1580                                            libcfs_nid2str(peer->nid),
1581                                            via, router,
1582                                            body->oa.o_valid & OBD_MD_FLFID ?
1583                                                 body->oa.o_fid : (__u64)0,
1584                                            body->oa.o_valid & OBD_MD_FLFID ?
1585                                                 body->oa.o_generation :(__u64)0,
1586                                            body->oa.o_id,
1587                                            body->oa.o_valid & OBD_MD_FLGROUP ?
1588                                                 body->oa.o_gr : (__u64)0,
1589                                            aa->aa_ppga[0]->off,
1590                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1591                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1592                                                                         1);
1593                         CERROR("client %x, server %x, cksum_type %x\n",
1594                                client_cksum, server_cksum, cksum_type);
1595                         cksum_counter = 0;
1596                         aa->aa_oa->o_cksum = client_cksum;
1597                         rc = -EAGAIN;
1598                 } else {
1599                         cksum_counter++;
1600                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1601                         rc = 0;
1602                 }
1603         } else if (unlikely(client_cksum)) {
1604                 static int cksum_missed;
1605
1606                 cksum_missed++;
1607                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1608                         CERROR("Checksum %u requested from %s but not sent\n",
1609                                cksum_missed, libcfs_nid2str(peer->nid));
1610         } else {
1611                 rc = 0;
1612         }
1613 out:
1614         if (rc >= 0)
1615                 lustre_get_wire_obdo(aa->aa_oa, &body->oa);
1616
1617         RETURN(rc);
1618 }
1619
1620 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1621                             struct lov_stripe_md *lsm,
1622                             obd_count page_count, struct brw_page **pga,
1623                             struct obd_capa *ocapa)
1624 {
1625         struct ptlrpc_request *req;
1626         int                    rc;
1627         cfs_waitq_t            waitq;
1628         int                    resends = 0;
1629         struct l_wait_info     lwi;
1630
1631         ENTRY;
1632
1633         cfs_waitq_init(&waitq);
1634
1635 restart_bulk:
1636         rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1637                                   page_count, pga, &req, ocapa, 0);
1638         if (rc != 0)
1639                 return (rc);
1640
1641         rc = ptlrpc_queue_wait(req);
1642
1643         if (rc == -ETIMEDOUT && req->rq_resend) {
1644                 DEBUG_REQ(D_HA, req,  "BULK TIMEOUT");
1645                 ptlrpc_req_finished(req);
1646                 goto restart_bulk;
1647         }
1648
1649         rc = osc_brw_fini_request(req, rc);
1650
1651         ptlrpc_req_finished(req);
1652         if (osc_recoverable_error(rc)) {
1653                 resends++;
1654                 if (!osc_should_resend(resends, &exp->exp_obd->u.cli)) {
1655                         CERROR("too many resend retries, returning error\n");
1656                         RETURN(-EIO);
1657                 }
1658
1659                 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
1660                 l_wait_event(waitq, 0, &lwi);
1661
1662                 goto restart_bulk;
1663         }
1664
1665         RETURN (rc);
1666 }
1667
/**
 * Re-issue a bulk RPC that failed with a recoverable error.
 *
 * A new request is built from the arguments saved in \a aa, takes over the
 * async pages, async args and capability of \a request, and is added to the
 * request set \a request belonged to.
 *
 * \param request the request that failed
 * \param aa      async args of \a request
 *
 * \retval 0      new request queued
 * \retval -EIO   osc_should_resend() refused another attempt
 * \retval -EINTR one of the pages holding a reference on \a request has been
 *                interrupted
 * \retval <0     error from osc_brw_prep_request()
 */
int osc_brw_redo_request(struct ptlrpc_request *request,
                         struct osc_brw_async_args *aa)
{
        struct ptlrpc_request *new_req;
        struct ptlrpc_request_set *set = request->rq_set;
        struct osc_brw_async_args *new_aa;
        struct osc_async_page *oap;
        int rc = 0;
        ENTRY;

        if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) {
                CERROR("too many resend retries, returning error\n");
                RETURN(-EIO);
        }

        DEBUG_REQ(D_ERROR, request, "redo for recoverable error");

        /* the command is derived from the opcode of the failed request */
        rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
                                        OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
                                  aa->aa_cli, aa->aa_oa,
                                  NULL /* lsm unused by osc currently */,
                                  aa->aa_page_count, aa->aa_ppga,
                                  &new_req, aa->aa_ocapa, 0);
        if (rc)
                RETURN(rc);

        client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);

        /* give up if a page that references the old request was interrupted;
         * the new request is dropped before anything was handed over to it */
        list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request != NULL) {
                        LASSERTF(request == oap->oap_request,
                                 "request %p != oap_request %p\n",
                                 request, oap->oap_request);
                        if (oap->oap_interrupted) {
                                client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
                                ptlrpc_req_finished(new_req);
                                RETURN(-EINTR);
                        }
                }
        }
        /* New request takes over pga and oaps from old request.
         * Note that copying a list_head doesn't work, need to move it... */
        aa->aa_resends++;
        new_req->rq_interpret_reply = request->rq_interpret_reply;
        new_req->rq_async_args = request->rq_async_args;
        /* rq_sent is set aa_resends seconds past the current time;
         * NOTE(review): presumably this delays sending as a back-off --
         * confirm against the ptlrpc send path */
        new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;

        new_aa = ptlrpc_req_async_args(new_req);

        /* the struct copy above also copied the aa_oaps list head, which is
         * not usable as a list; re-initialise it and splice the entries over */
        CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
        list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
        CFS_INIT_LIST_HEAD(&aa->aa_oaps);

        /* pages that held a reference on the old request now hold one on the
         * new request instead */
        list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request) {
                        ptlrpc_req_finished(oap->oap_request);
                        oap->oap_request = ptlrpc_request_addref(new_req);
                }
        }

        /* the capability pointer moves to the new args; the old args no
         * longer refer to it */
        new_aa->aa_ocapa = aa->aa_ocapa;
        aa->aa_ocapa = NULL;

        /* Using ptlrpc_set_add_req() is safe because interpret functions run
         * in check_set context.  The only way another thread can reach the
         * request is the -EINTR path, and that path is protected by
         * cl_loi_list_lock. */
        ptlrpc_set_add_req(set, new_req);

        client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);

        DEBUG_REQ(D_INFO, new_req, "new request");
        RETURN(0);
}
1742
1743 /*
1744  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1745  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1746  * fine for our small page arrays and doesn't require allocation.  its an
1747  * insertion sort that swaps elements that are strides apart, shrinking the
1748  * stride down until its '1' and the array is sorted.
1749  */
1750 static void sort_brw_pages(struct brw_page **array, int num)
1751 {
1752         int stride, i, j;
1753         struct brw_page *tmp;
1754
1755         if (num == 1)
1756                 return;
1757         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1758                 ;
1759
1760         do {
1761                 stride /= 3;
1762                 for (i = stride ; i < num ; i++) {
1763                         tmp = array[i];
1764                         j = i;
1765                         while (j >= stride && array[j - stride]->off > tmp->off) {
1766                                 array[j] = array[j - stride];
1767                                 j -= stride;
1768                         }
1769                         array[j] = tmp;
1770                 }
1771         } while (stride > 1);
1772 }
1773
1774 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1775 {
1776         int count = 1;
1777         int offset;
1778         int i = 0;
1779
1780         LASSERT (pages > 0);
1781         offset = pg[i]->off & ~CFS_PAGE_MASK;
1782
1783         for (;;) {
1784                 pages--;
1785                 if (pages == 0)         /* that's all */
1786                         return count;
1787
1788                 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1789                         return count;   /* doesn't end on page boundary */
1790
1791                 i++;
1792                 offset = pg[i]->off & ~CFS_PAGE_MASK;
1793                 if (offset != 0)        /* doesn't start on page boundary */
1794                         return count;
1795
1796                 count++;
1797         }
1798 }
1799
1800 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1801 {
1802         struct brw_page **ppga;
1803         int i;
1804
1805         OBD_ALLOC(ppga, sizeof(*ppga) * count);
1806         if (ppga == NULL)
1807                 return NULL;
1808
1809         for (i = 0; i < count; i++)
1810                 ppga[i] = pga + i;
1811         return ppga;
1812 }
1813
1814 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1815 {
1816         LASSERT(ppga != NULL);
1817         OBD_FREE(ppga, sizeof(*ppga) * count);
1818 }
1819
1820 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1821                    obd_count page_count, struct brw_page *pga,
1822                    struct obd_trans_info *oti)
1823 {
1824         struct obdo *saved_oa = NULL;
1825         struct brw_page **ppga, **orig;
1826         struct obd_import *imp = class_exp2cliimp(exp);
1827         struct client_obd *cli;
1828         int rc, page_count_orig;
1829         ENTRY;
1830
1831         LASSERT((imp != NULL) && (imp->imp_obd != NULL));
1832         cli = &imp->imp_obd->u.cli;
1833
1834         if (cmd & OBD_BRW_CHECK) {
1835                 /* The caller just wants to know if there's a chance that this
1836                  * I/O can succeed */
1837
1838                 if (imp->imp_invalid)
1839                         RETURN(-EIO);
1840                 RETURN(0);
1841         }
1842
1843         /* test_brw with a failed create can trip this, maybe others. */
1844         LASSERT(cli->cl_max_pages_per_rpc);
1845
1846         rc = 0;
1847
1848         orig = ppga = osc_build_ppga(pga, page_count);
1849         if (ppga == NULL)
1850                 RETURN(-ENOMEM);
1851         page_count_orig = page_count;
1852
1853         sort_brw_pages(ppga, page_count);
1854         while (page_count) {
1855                 obd_count pages_per_brw;
1856
1857                 if (page_count > cli->cl_max_pages_per_rpc)
1858                         pages_per_brw = cli->cl_max_pages_per_rpc;
1859                 else
1860                         pages_per_brw = page_count;
1861
1862                 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1863
1864                 if (saved_oa != NULL) {
1865                         /* restore previously saved oa */
1866                         *oinfo->oi_oa = *saved_oa;
1867                 } else if (page_count > pages_per_brw) {
1868                         /* save a copy of oa (brw will clobber it) */
1869                         OBDO_ALLOC(saved_oa);
1870                         if (saved_oa == NULL)
1871                                 GOTO(out, rc = -ENOMEM);
1872                         *saved_oa = *oinfo->oi_oa;
1873                 }
1874
1875                 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1876                                       pages_per_brw, ppga, oinfo->oi_capa);
1877
1878                 if (rc != 0)
1879                         break;
1880
1881                 page_count -= pages_per_brw;
1882                 ppga += pages_per_brw;
1883         }
1884
1885 out:
1886         osc_release_ppga(orig, page_count_orig);
1887
1888         if (saved_oa != NULL)
1889                 OBDO_FREE(saved_oa);
1890
1891         RETURN(rc);
1892 }
1893
/* The companion to osc_enter_cache(), called when @oap is no longer part of
 * the dirty accounting.  Writeback completes or truncate happens before
 * writing starts.  Must be called with the loi lock held.
 *
 * @cli and @sent are handed to osc_release_write_grant() unchanged, together
 * with the brw page embedded in @oap. */
static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
                           int sent)
{
        osc_release_write_grant(cli, &oap->oap_brw_page, sent);
}
1902
1903
1904 /* This maintains the lists of pending pages to read/write for a given object
1905  * (lop).  This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
1906  * to quickly find objects that are ready to send an RPC. */
1907 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
1908                          int cmd)
1909 {
1910         int optimal;
1911         ENTRY;
1912
1913         if (lop->lop_num_pending == 0)
1914                 RETURN(0);
1915
1916         /* if we have an invalid import we want to drain the queued pages
1917          * by forcing them through rpcs that immediately fail and complete
1918          * the pages.  recovery relies on this to empty the queued pages
1919          * before canceling the locks and evicting down the llite pages */
1920         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1921                 RETURN(1);
1922
1923         /* stream rpcs in queue order as long as as there is an urgent page
1924          * queued.  this is our cheap solution for good batching in the case
1925          * where writepage marks some random page in the middle of the file
1926          * as urgent because of, say, memory pressure */
1927         if (!list_empty(&lop->lop_urgent)) {
1928                 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
1929                 RETURN(1);
1930         }
1931         /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
1932         optimal = cli->cl_max_pages_per_rpc;
1933         if (cmd & OBD_BRW_WRITE) {
1934                 /* trigger a write rpc stream as long as there are dirtiers
1935                  * waiting for space.  as they're waiting, they're not going to
1936                  * create more pages to coallesce with what's waiting.. */
1937                 if (!list_empty(&cli->cl_cache_waiters)) {
1938                         CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
1939                         RETURN(1);
1940                 }
1941                 /* +16 to avoid triggering rpcs that would want to include pages
1942                  * that are being queued but which can't be made ready until
1943                  * the queuer finishes with the page. this is a wart for
1944                  * llite::commit_write() */
1945                 optimal += 16;
1946         }
1947         if (lop->lop_num_pending >= optimal)
1948                 RETURN(1);
1949
1950         RETURN(0);
1951 }
1952
1953 static int lop_makes_hprpc(struct loi_oap_pages *lop)
1954 {
1955         struct osc_async_page *oap;
1956         ENTRY;
1957
1958         if (list_empty(&lop->lop_urgent))
1959                 RETURN(0);
1960
1961         oap = list_entry(lop->lop_urgent.next,
1962                          struct osc_async_page, oap_urgent_item);
1963
1964         if (oap->oap_async_flags & ASYNC_HP) {
1965                 CDEBUG(D_CACHE, "hp request forcing RPC\n");
1966                 RETURN(1);
1967         }
1968
1969         RETURN(0);
1970 }
1971
/* Bring the membership of @item in @list in line with @should_be_on:
 * an unlinked item is appended when it should be on the list, a linked
 * item is removed when it should not be; otherwise nothing changes. */
static void on_list(struct list_head *item, struct list_head *list,
                    int should_be_on)
{
        int linked = !list_empty(item);

        if (should_be_on) {
                if (!linked)
                        list_add_tail(item, list);
        } else if (linked) {
                list_del_init(item);
        }
}
1980
1981 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
1982  * can find pages to build into rpcs quickly */
1983 void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
1984 {
1985         if (lop_makes_hprpc(&loi->loi_write_lop) ||
1986             lop_makes_hprpc(&loi->loi_read_lop)) {
1987                 /* HP rpc */
1988                 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list, 0);
1989                 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 1);
1990         } else {
1991                 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 0);
1992                 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list,
1993                         lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)||
1994                         lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
1995         }
1996
1997         on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
1998                 loi->loi_write_lop.lop_num_pending);
1999
2000         on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
2001                 loi->loi_read_lop.lop_num_pending);
2002 }
2003
2004 static void lop_update_pending(struct client_obd *cli,
2005                                struct loi_oap_pages *lop, int cmd, int delta)
2006 {
2007         lop->lop_num_pending += delta;
2008         if (cmd & OBD_BRW_WRITE)
2009                 cli->cl_pending_w_pages += delta;
2010         else
2011                 cli->cl_pending_r_pages += delta;
2012 }
2013
2014 /**
2015  * this is called when a sync waiter receives an interruption.  Its job is to
2016  * get the caller woken as soon as possible.  If its page hasn't been put in an
2017  * rpc yet it can dequeue immediately.  Otherwise it has to mark the rpc as
2018  * desiring interruption which will forcefully complete the rpc once the rpc
2019  * has timed out.
2020  */
2021 int osc_oap_interrupted(const struct lu_env *env, struct osc_async_page *oap)
2022 {
2023         struct loi_oap_pages *lop;
2024         struct lov_oinfo *loi;
2025         int rc = -EBUSY;
2026         ENTRY;
2027
2028         LASSERT(!oap->oap_interrupted);
2029         oap->oap_interrupted = 1;
2030
2031         /* ok, it's been put in an rpc. only one oap gets a request reference */
2032         if (oap->oap_request != NULL) {
2033                 ptlrpc_mark_interrupted(oap->oap_request);
2034                 ptlrpcd_wake(oap->oap_request);
2035                 ptlrpc_req_finished(oap->oap_request);
2036                 oap->oap_request = NULL;
2037         }
2038
2039         /*
2040          * page completion may be called only if ->cpo_prep() method was
2041          * executed by osc_io_submit(), that also adds page the to pending list
2042          */
2043         if (!list_empty(&oap->oap_pending_item)) {
2044                 list_del_init(&oap->oap_pending_item);
2045                 list_del_init(&oap->oap_urgent_item);
2046
2047                 loi = oap->oap_loi;
2048                 lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
2049                         &loi->loi_write_lop : &loi->loi_read_lop;
2050                 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
2051                 loi_list_maint(oap->oap_cli, oap->oap_loi);
2052                 rc = oap->oap_caller_ops->ap_completion(env,
2053                                           oap->oap_caller_data,
2054                                           oap->oap_cmd, NULL, -EINTR);
2055         }
2056
2057         RETURN(rc);
2058 }
2059
2060 /* this is trying to propogate async writeback errors back up to the
2061  * application.  As an async write fails we record the error code for later if
2062  * the app does an fsync.  As long as errors persist we force future rpcs to be
2063  * sync so that the app can get a sync error and break the cycle of queueing
2064  * pages for which writeback will fail. */
2065 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
2066                            int rc)
2067 {
2068         if (rc) {
2069                 if (!ar->ar_rc)
2070                         ar->ar_rc = rc;
2071
2072                 ar->ar_force_sync = 1;
2073                 ar->ar_min_xid = ptlrpc_sample_next_xid();
2074                 return;
2075
2076         }
2077
2078         if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
2079                 ar->ar_force_sync = 0;
2080 }
2081
2082 void osc_oap_to_pending(struct osc_async_page *oap)
2083 {
2084         struct loi_oap_pages *lop;
2085
2086         if (oap->oap_cmd & OBD_BRW_WRITE)
2087                 lop = &oap->oap_loi->loi_write_lop;
2088         else
2089                 lop = &oap->oap_loi->loi_read_lop;
2090
2091         if (oap->oap_async_flags & ASYNC_HP)
2092                 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2093         else if (oap->oap_async_flags & ASYNC_URGENT)
2094                 list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
2095         list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
2096         lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
2097 }
2098
2099 /* this must be called holding the loi list lock to give coverage to exit_cache,
2100  * async_flag maintenance, and oap_request */
2101 static void osc_ap_completion(const struct lu_env *env,
2102                               struct client_obd *cli, struct obdo *oa,
2103                               struct osc_async_page *oap, int sent, int rc)
2104 {
2105         __u64 xid = 0;
2106
2107         ENTRY;
2108         if (oap->oap_request != NULL) {
2109                 xid = ptlrpc_req_xid(oap->oap_request);
2110                 ptlrpc_req_finished(oap->oap_request);
2111                 oap->oap_request = NULL;
2112         }
2113
2114         spin_lock(&oap->oap_lock);
2115         oap->oap_async_flags = 0;
2116         spin_unlock(&oap->oap_lock);
2117         oap->oap_interrupted = 0;
2118
2119         if (oap->oap_cmd & OBD_BRW_WRITE) {
2120                 osc_process_ar(&cli->cl_ar, xid, rc);
2121                 osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
2122         }
2123
2124         if (rc == 0 && oa != NULL) {
2125                 if (oa->o_valid & OBD_MD_FLBLOCKS)
2126                         oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
2127                 if (oa->o_valid & OBD_MD_FLMTIME)
2128                         oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
2129                 if (oa->o_valid & OBD_MD_FLATIME)
2130                         oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
2131                 if (oa->o_valid & OBD_MD_FLCTIME)
2132                         oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
2133         }
2134
2135         rc = oap->oap_caller_ops->ap_completion(env, oap->oap_caller_data,
2136                                                 oap->oap_cmd, oa, rc);
2137
2138         /* ll_ap_completion (from llite) drops PG_locked. so, a new
2139          * I/O on the page could start, but OSC calls it under lock
2140          * and thus we can add oap back to pending safely */
2141         if (rc)
2142                 /* upper layer wants to leave the page on pending queue */
2143                 osc_oap_to_pending(oap);
2144         else
2145                 osc_exit_cache(cli, oap, sent);
2146         EXIT;
2147 }
2148
/**
 * Completion handler for a bulk read/write request.
 *
 * Finishes the request with osc_brw_fini_request().  A recoverable error is
 * retried through osc_brw_redo_request(); if the retry was queued, nothing
 * else is done here.  Otherwise the capability is released, the in-flight
 * counter for the request's direction is decremented, the pages are
 * completed and the argument resources are released.
 *
 * \param env  execution environment
 * \param req  the completed request
 * \param data async args of \a req (struct osc_brw_async_args)
 * \param rc   status the request completed with
 *
 * \retval 0     success, or a retry has been queued
 * \retval other final status after osc_brw_fini_request() and, if it was
 *               attempted, osc_brw_redo_request()
 */
static int brw_interpret(const struct lu_env *env,
                         struct ptlrpc_request *req, void *data, int rc)
{
        struct osc_brw_async_args *aa = data;
        struct client_obd *cli;
        int async;
        ENTRY;

        rc = osc_brw_fini_request(req, rc);
        CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
        if (osc_recoverable_error(rc)) {
                rc = osc_brw_redo_request(req, aa);
                /* the new request now owns pages and args */
                if (rc == 0)
                        RETURN(0);
        }

        if (aa->aa_ocapa) {
                capa_put(aa->aa_ocapa);
                aa->aa_ocapa = NULL;
        }

        cli = aa->aa_cli;

        client_obd_list_lock(&cli->cl_loi_list_lock);

        /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
         * is called so we know whether to go to sync BRWs or wait for more
         * RPCs to complete */
        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
                cli->cl_w_in_flight--;
        else
                cli->cl_r_in_flight--;

        /* 'async' is set when the request carries no oaps, which per the
         * branch comments below means it came from async_internal() */
        async = list_empty(&aa->aa_oaps);
        if (!async) { /* from osc_send_oap_rpc() */
                struct osc_async_page *oap, *tmp;
                /* the caller may re-use the oap after the completion call so
                 * we need to clean it up a little */
                list_for_each_entry_safe(oap, tmp, &aa->aa_oaps, oap_rpc_item) {
                        list_del_init(&oap->oap_rpc_item);
                        osc_ap_completion(env, cli, aa->aa_oa, oap, 1, rc);
                }
                OBDO_FREE(aa->aa_oa);
        } else { /* from async_internal() */
                int i;
                for (i = 0; i < aa->aa_page_count; i++)
                        osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);

                /* the obdo is only freed here when it is flagged temporary */
                if (aa->aa_oa->o_flags & OBD_FL_TEMPORARY)
                        OBDO_FREE(aa->aa_oa);
        }
        osc_wake_cache_waiters(cli);
        osc_check_rpcs(env, cli);
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        /* the cl_req is completed after the list lock has been dropped */
        if (!async)
                cl_req_completion(env, aa->aa_clerq, rc);
        osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
        RETURN(rc);
}
2208
/**
 * Build a bulk read/write request out of the async pages on \a rpc_list.
 *
 * A page array and an obdo are allocated, a cl_req is set up from the first
 * page and every page is added to it, and the request is prepared with
 * osc_brw_prep_request().  On success the pages are moved from \a rpc_list
 * to the aa_oaps list of the request's async args, leaving \a rpc_list
 * empty.
 *
 * On failure every page still on \a rpc_list is completed through
 * osc_ap_completion() and the cl_req, if one was allocated, is completed
 * with the error.
 *
 * \param env        execution environment
 * \param cli        client obd the request is built for
 * \param rpc_list   non-empty list of oaps linked through oap_rpc_item
 * \param page_count number of pages on \a rpc_list
 * \param cmd        OBD_BRW_* command
 *
 * \return the new request, or an ERR_PTR() value on failure
 */
static struct ptlrpc_request *osc_build_req(const struct lu_env *env,
                                            struct client_obd *cli,
                                            struct list_head *rpc_list,
                                            int page_count, int cmd)
{
        struct ptlrpc_request *req;
        struct brw_page **pga = NULL;
        struct osc_brw_async_args *aa;
        struct obdo *oa = NULL;
        const struct obd_async_page_ops *ops = NULL;
        void *caller_data = NULL;
        struct osc_async_page *oap;
        struct osc_async_page *tmp;
        struct ost_body *body;
        struct cl_req *clerq = NULL;
        enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
        struct ldlm_lock *lock = NULL;
        struct cl_req_attr crattr;
        int i, rc;

        ENTRY;
        LASSERT(!list_empty(rpc_list));

        /* crattr is zeroed first so that crattr.cra_capa is NULL if an
         * early failure jumps to 'out' */
        memset(&crattr, 0, sizeof crattr);
        OBD_ALLOC(pga, sizeof(*pga) * page_count);
        if (pga == NULL)
                GOTO(out, req = ERR_PTR(-ENOMEM));

        OBDO_ALLOC(oa);
        if (oa == NULL)
                GOTO(out, req = ERR_PTR(-ENOMEM));

        i = 0;
        list_for_each_entry(oap, rpc_list, oap_rpc_item) {
                struct cl_page *page = osc_oap2cl_page(oap);
                /* first page only: it supplies the caller ops, the cl_req
                 * and the dlm lock used for the whole rpc */
                if (ops == NULL) {
                        ops = oap->oap_caller_ops;
                        caller_data = oap->oap_caller_data;

                        clerq = cl_req_alloc(env, page, crt,
                                             1 /* only 1-object rpcs for
                                                * now */);
                        if (IS_ERR(clerq))
                                GOTO(out, req = (void *)clerq);
                        lock = oap->oap_ldlm_lock;
                }
                pga[i] = &oap->oap_brw_page;
                pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
                CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
                       pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
                i++;
                cl_req_page_add(env, clerq, page);
        }

        /* always get the data for the obdo for the rpc */
        LASSERT(ops != NULL);
        crattr.cra_oa = oa;
        crattr.cra_capa = NULL;
        cl_req_attr_set(env, clerq, &crattr, ~0ULL);
        if (lock) {
                oa->o_handle = lock->l_remote_handle;
                oa->o_valid |= OBD_MD_FLHANDLE;
        }

        rc = cl_req_prep(env, clerq);
        if (rc != 0) {
                CERROR("cl_req_prep failed: %d\n", rc);
                GOTO(out, req = ERR_PTR(rc));
        }

        sort_brw_pages(pga, page_count);
        rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
                                  pga, &req, crattr.cra_capa, 1);
        if (rc != 0) {
                CERROR("prep_req failed: %d\n", rc);
                GOTO(out, req = ERR_PTR(rc));
        }

        /* Need to update the timestamps after the request is built in case
         * we race with setattr (locally or in queue at OST).  If OST gets
         * later setattr before earlier BRW (as determined by the request xid),
         * the OST will not use BRW timestamps.  Sadly, there is no obvious
         * way to do this in a single call.  bug 10150 */
        /* NOTE(review): 'body' is fetched here but not referenced again in
         * this function; the refresh below writes through crattr.cra_oa,
         * which still points at 'oa'.  Confirm that the timestamps reach
         * the request body as intended. */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        cl_req_attr_set(env, clerq, &crattr,
                        OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        /* hand the pages over to the request; rpc_list is left empty */
        CFS_INIT_LIST_HEAD(&aa->aa_oaps);
        list_splice(rpc_list, &aa->aa_oaps);
        CFS_INIT_LIST_HEAD(rpc_list);
        aa->aa_clerq = clerq;
out:
        /* reached on both success and failure */
        capa_put(crattr.cra_capa);
        if (IS_ERR(req)) {
                if (oa)
                        OBDO_FREE(oa);
                if (pga)
                        OBD_FREE(pga, sizeof(*pga) * page_count);
                /* this should happen rarely and is pretty bad, it makes the
                 * pending list not follow the dirty order */
                /* NOTE(review): cl_loi_list_lock is taken here and is not
                 * released before returning; presumably the caller expects
                 * to get control back with the lock held on failure --
                 * confirm against osc_send_oap_rpc(). */
                client_obd_list_lock(&cli->cl_loi_list_lock);
                list_for_each_entry_safe(oap, tmp, rpc_list, oap_rpc_item) {
                        list_del_init(&oap->oap_rpc_item);

                        /* queued sync pages can be torn down while the pages
                         * were between the pending list and the rpc */
                        if (oap->oap_interrupted) {
                                CDEBUG(D_INODE, "oap %p interrupted\n", oap);
                                /* note: oap_count, not the error code, is
                                 * passed as the rc argument here */
                                osc_ap_completion(env, cli, NULL, oap, 0,
                                                  oap->oap_count);
                                continue;
                        }
                        osc_ap_completion(env, cli, NULL, oap, 0, PTR_ERR(req));
                }
                if (clerq && !IS_ERR(clerq))
                        cl_req_completion(env, clerq, PTR_ERR(req));
        }
        RETURN(req);
}
2330
2331 /**
2332  * prepare pages for ASYNC io and put pages in send queue.
2333  *
2334  * \param cmd OBD_BRW_* macroses
2335  * \param lop pending pages
2336  *
2337  * \return zero if pages successfully add to send queue.
2338  * \return not zere if error occurring.
2339  */
2340 static int
2341 osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli,
2342                  struct lov_oinfo *loi,
2343                  int cmd, struct loi_oap_pages *lop)
2344 {
2345         struct ptlrpc_request *req;
2346         obd_count page_count = 0;
2347         struct osc_async_page *oap = NULL, *tmp;
2348         struct osc_brw_async_args *aa;
2349         const struct obd_async_page_ops *ops;
2350         CFS_LIST_HEAD(rpc_list);
2351         CFS_LIST_HEAD(tmp_list);
2352         unsigned int ending_offset;
2353         unsigned  starting_offset = 0;
2354         int srvlock = 0;
2355         struct cl_object *clob = NULL;
2356         ENTRY;
2357
2358         /* ASYNC_HP pages first. At present, when the lock the pages is
2359          * to be canceled, the pages covered by the lock will be sent out
2360          * with ASYNC_HP. We have to send out them as soon as possible. */
2361         list_for_each_entry_safe(oap, tmp, &lop->lop_urgent, oap_urgent_item) {
2362                 if (oap->oap_async_flags & ASYNC_HP) 
2363                         list_move(&oap->oap_pending_item, &tmp_list);
2364                 else
2365                         list_move_tail(&oap->oap_pending_item, &tmp_list);
2366                 if (++page_count >= cli->cl_max_pages_per_rpc)
2367                         break;
2368         }
2369
2370         list_splice(&tmp_list, &lop->lop_pending);
2371         page_count = 0;
2372
2373         /* first we find the pages we're allowed to work with */
2374         list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
2375                                  oap_pending_item) {
2376                 ops = oap->oap_caller_ops;
2377
2378                 LASSERTF(oap->oap_magic == OAP_MAGIC, "Bad oap magic: oap %p, "
2379                          "magic 0x%x\n", oap, oap->oap_magic);
2380
2381                 if (clob == NULL) {
2382                         /* pin object in memory, so that completion call-backs
2383                          * can be safely called under client_obd_list lock. */
2384                         clob = osc_oap2cl_page(oap)->cp_obj;
2385                         cl_object_get(clob);
2386                 }
2387
2388                 if (page_count != 0 &&
2389                     srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) {
2390                         CDEBUG(D_PAGE, "SRVLOCK flag mismatch,"
2391                                " oap %p, page %p, srvlock %u\n",
2392                                oap, oap->oap_brw_page.pg, (unsigned)!srvlock);
2393                         break;
2394                 }
2395
2396                 /* If there is a gap at the start of this page, it can't merge
2397                  * with any previous page, so we'll hand the network a
2398                  * "fragmented" page array that it can't transfer in 1 RDMA */
2399                 if (page_count != 0 && oap->oap_page_off != 0)
2400                         break;
2401
2402                 /* in llite being 'ready' equates to the page being locked
2403                  * until completion unlocks it.  commit_write submits a page
2404                  * as not ready because its unlock will happen unconditionally
2405                  * as the call returns.  if we race with commit_write giving
2406                  * us that page we dont' want to create a hole in the page
2407                  * stream, so we stop and leave the rpc to be fired by
2408                  * another dirtier or kupdated interval (the not ready page
2409                  * will still be on the dirty list).  we could call in
2410                  * at the end of ll_file_write to process the queue again. */
2411                 if (!(oap->oap_async_flags & ASYNC_READY)) {
2412                         int rc = ops->ap_make_ready(env, oap->oap_caller_data,
2413                                                     cmd);
2414                         if (rc < 0)
2415                                 CDEBUG(D_INODE, "oap %p page %p returned %d "
2416                                                 "instead of ready\n", oap,
2417                                                 oap->oap_page, rc);
2418                         switch (rc) {
2419                         case -EAGAIN:
2420                                 /* llite is telling us that the page is still
2421                                  * in commit_write and that we should try
2422                                  * and put it in an rpc again later.  we
2423                                  * break out of the loop so we don't create
2424                                  * a hole in the sequence of pages in the rpc
2425                                  * stream.*/
2426                                 oap = NULL;
2427                                 break;
2428                         case -EINTR:
2429                                 /* the io isn't needed.. tell the checks
2430                                  * below to complete the rpc with EINTR */
2431                                 spin_lock(&oap->oap_lock);
2432                                 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
2433                                 spin_unlock(&oap->oap_lock);
2434                                 oap->oap_count = -EINTR;
2435                                 break;
2436                         case 0:
2437                                 spin_lock(&oap->oap_lock);
2438                                 oap->oap_async_flags |= ASYNC_READY;
2439                                 spin_unlock(&oap->oap_lock);
2440                                 break;
2441                         default:
2442                                 LASSERTF(0, "oap %p page %p returned %d "
2443                                             "from make_ready\n", oap,
2444                                             oap->oap_page, rc);
2445                                 break;
2446                         }
2447                 }
2448                 if (oap == NULL)
2449                         break;
2450                 /*
2451                  * Page submitted for IO has to be locked. Either by
2452                  * ->ap_make_ready() or by higher layers.
2453                  */
2454 #if defined(__KERNEL__) && defined(__linux__)
2455                 {
2456                         struct cl_page *page;
2457
2458                         page = osc_oap2cl_page(oap);
2459
2460                         if (page->cp_type == CPT_CACHEABLE &&
2461                             !(PageLocked(oap->oap_page) &&
2462                               (CheckWriteback(oap->oap_page, cmd)))) {
2463                                 CDEBUG(D_PAGE, "page %p lost wb %lx/%x\n",
2464                                        oap->oap_page,
2465                                        (long)oap->oap_page->flags,
2466                                        oap->oap_async_flags);
2467                                 LBUG();
2468                         }
2469                 }
2470 #endif
2471
2472                 /* take the page out of our book-keeping */
2473                 list_del_init(&oap->oap_pending_item);
2474                 lop_update_pending(cli, lop, cmd, -1);
2475                 list_del_init(&oap->oap_urgent_item);
2476
2477                 if (page_count == 0)
2478                         starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
2479                                           (PTLRPC_MAX_BRW_SIZE - 1);
2480
2481                 /* ask the caller for the size of the io as the rpc leaves. */
2482                 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE)) {
2483                         oap->oap_count =
2484                                 ops->ap_refresh_count(env, oap->oap_caller_data,
2485                                                       cmd);
2486                         LASSERT(oap->oap_page_off + oap->oap_count <= CFS_PAGE_SIZE);
2487                 }
2488                 if (oap->oap_count <= 0) {
2489                         CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
2490                                oap->oap_count);
2491                         osc_ap_completion(env, cli, NULL,
2492                                           oap, 0, oap->oap_count);
2493                         continue;
2494                 }
2495
2496                 /* now put the page back in our accounting */
2497                 list_add_tail(&oap->oap_rpc_item, &rpc_list);
2498                 if (page_count == 0)
2499                         srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
2500                 if (++page_count >= cli->cl_max_pages_per_rpc)
2501                         break;
2502
2503                 /* End on a PTLRPC_MAX_BRW_SIZE boundary.  We want full-sized
2504                  * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
2505                  * have the same alignment as the initial writes that allocated
2506                  * extents on the server. */
2507                 ending_offset = (oap->oap_obj_off + oap->oap_page_off +
2508                                  oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
2509                 if (ending_offset == 0)
2510                         break;
2511
2512                 /* If there is a gap at the end of this page, it can't merge
2513                  * with any subsequent pages, so we'll hand the network a
2514                  * "fragmented" page array that it can't transfer in 1 RDMA */
2515                 if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
2516                         break;
2517         }
2518
2519         osc_wake_cache_waiters(cli);
2520
2521         loi_list_maint(cli, loi);
2522
2523         client_obd_list_unlock(&cli->cl_loi_list_lock);
2524
2525         if (clob != NULL)
2526                 cl_object_put(env, clob);
2527
2528         if (page_count == 0) {
2529                 client_obd_list_lock(&cli->cl_loi_list_lock);
2530                 RETURN(0);
2531         }
2532
2533         req = osc_build_req(env, cli, &rpc_list, page_count, cmd);
2534         if (IS_ERR(req)) {
2535                 LASSERT(list_empty(&rpc_list));
2536                 loi_list_maint(cli, loi);
2537                 RETURN(PTR_ERR(req));
2538         }
2539
2540         aa = ptlrpc_req_async_args(req);
2541
2542         if (cmd == OBD_BRW_READ) {
2543                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2544                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2545                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2546                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2547         } else {
2548                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2549                 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
2550                                  cli->cl_w_in_flight);
2551                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2552                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2553         }
2554         ptlrpc_lprocfs_brw(req, aa->aa_requested_nob);
2555
2556         client_obd_list_lock(&cli->cl_loi_list_lock);
2557
2558         if (cmd == OBD_BRW_READ)
2559                 cli->cl_r_in_flight++;
2560         else
2561                 cli->cl_w_in_flight++;
2562
2563         /* queued sync pages can be torn down while the pages
2564          * were between the pending list and the rpc */
2565         tmp = NULL;
2566         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2567                 /* only one oap gets a request reference */
2568                 if (tmp == NULL)
2569                         tmp = oap;
2570                 if (oap->oap_interrupted && !req->rq_intr) {
2571                         CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2572                                oap, req);
2573                         ptlrpc_mark_interrupted(req);
2574                 }
2575         }
2576         if (tmp != NULL)
2577                 tmp->oap_request = ptlrpc_request_addref(req);
2578
2579         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2580                   page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
2581
2582         req->rq_interpret_reply = brw_interpret;
2583         ptlrpcd_add_req(req, PSCOPE_BRW);
2584         RETURN(1);
2585 }
2586
/* Debug helper: logs whether LOI sits on a ready list, together with the
 * pending count and the "has urgent pages" state of its write and read
 * queues, followed by the caller's own format STR and arguments.
 * The final line deliberately carries no line continuation, so that whatever
 * follows the macro in the file can never be folded into it. */
#define LOI_DEBUG(LOI, STR, args...)                                     \
        CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR,           \
               !list_empty(&(LOI)->loi_ready_item) ||                    \
               !list_empty(&(LOI)->loi_hp_ready_item),                   \
               (LOI)->loi_write_lop.lop_num_pending,                     \
               !list_empty(&(LOI)->loi_write_lop.lop_urgent),            \
               (LOI)->loi_read_lop.lop_num_pending,                      \
               !list_empty(&(LOI)->loi_read_lop.lop_urgent),             \
               args)
2596
2597 /* This is called by osc_check_rpcs() to find which objects have pages that
2598  * we could be sending.  These lists are maintained by lop_makes_rpc(). */
2599 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2600 {
2601         ENTRY;
2602
2603         /* First return objects that have blocked locks so that they
2604          * will be flushed quickly and other clients can get the lock,
2605          * then objects which have pages ready to be stuffed into RPCs */
2606         if (!list_empty(&cli->cl_loi_hp_ready_list))
2607                 RETURN(list_entry(cli->cl_loi_hp_ready_list.next,
2608                                   struct lov_oinfo, loi_hp_ready_item));
2609         if (!list_empty(&cli->cl_loi_ready_list))
2610                 RETURN(list_entry(cli->cl_loi_ready_list.next,
2611                                   struct lov_oinfo, loi_ready_item));
2612
2613         /* then if we have cache waiters, return all objects with queued
2614          * writes.  This is especially important when many small files
2615          * have filled up the cache and not been fired into rpcs because
2616          * they don't pass the nr_pending/object threshhold */
2617         if (!list_empty(&cli->cl_cache_waiters) &&
2618             !list_empty(&cli->cl_loi_write_list))
2619                 RETURN(list_entry(cli->cl_loi_write_list.next,
2620                                   struct lov_oinfo, loi_write_item));
2621
2622         /* then return all queued objects when we have an invalid import
2623          * so that they get flushed */
2624         if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2625                 if (!list_empty(&cli->cl_loi_write_list))
2626                         RETURN(list_entry(cli->cl_loi_write_list.next,
2627                                           struct lov_oinfo, loi_write_item));
2628                 if (!list_empty(&cli->cl_loi_read_list))
2629                         RETURN(list_entry(cli->cl_loi_read_list.next,
2630                                           struct lov_oinfo, loi_read_item));
2631         }
2632         RETURN(NULL);
2633 }
2634
2635 static int osc_max_rpc_in_flight(struct client_obd *cli, struct lov_oinfo *loi)
2636 {
2637         struct osc_async_page *oap;
2638         int hprpc = 0;
2639
2640         if (!list_empty(&loi->loi_write_lop.lop_urgent)) {
2641                 oap = list_entry(loi->loi_write_lop.lop_urgent.next,
2642                                  struct osc_async_page, oap_urgent_item);
2643                 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2644         }
2645
2646         if (!hprpc && !list_empty(&loi->loi_read_lop.lop_urgent)) {
2647                 oap = list_entry(loi->loi_read_lop.lop_urgent.next,
2648                                  struct osc_async_page, oap_urgent_item);
2649                 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2650         }
2651
2652         return rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight + hprpc;
2653 }
2654
2655 /* called with the loi list lock held */
2656 void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli)
2657 {
2658         struct lov_oinfo *loi;
2659         int rc = 0, race_counter = 0;
2660         ENTRY;
2661
2662         while ((loi = osc_next_loi(cli)) != NULL) {
2663                 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
2664
2665                 if (osc_max_rpc_in_flight(cli, loi))
2666                         break;
2667
2668                 /* attempt some read/write balancing by alternating between
2669                  * reads and writes in an object.  The makes_rpc checks here
2670                  * would be redundant if we were getting read/write work items
2671                  * instead of objects.  we don't want send_oap_rpc to drain a
2672                  * partial read pending queue when we're given this object to
2673                  * do io on writes while there are cache waiters */
2674                 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
2675                         rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_WRITE,
2676                                               &loi->loi_write_lop);
2677                         if (rc < 0) {
2678                                 CERROR("Write request failed with %d\n", rc);
2679
2680                                 /* osc_send_oap_rpc failed, mostly because of
2681                                  * memory pressure.
2682                                  *
2683                                  * It can't break here, because if:
2684                                  *  - a page was submitted by osc_io_submit, so
2685                                  *    page locked;
2686                                  *  - no request in flight
2687                                  *  - no subsequent request
2688                                  * The system will be in live-lock state,
2689                                  * because there is no chance to call
2690                                  * osc_io_unplug() and osc_check_rpcs() any
2691                                  * more. pdflush can't help in this case,
2692                                  * because it might be blocked at grabbing
2693                                  * the page lock as we mentioned.
2694                                  *
2695                                  * Anyway, continue to drain pages. */
2696                                 /* break; */
2697                         }
2698
2699                         if (rc > 0)
2700                                 race_counter = 0;
2701                         else
2702                                 race_counter++;
2703                 }
2704                 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
2705                         rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_READ,
2706                                               &loi->loi_read_lop);
2707                         if (rc < 0)
2708                                 CERROR("Read request failed with %d\n", rc);
2709
2710                         if (rc > 0)
2711                                 race_counter = 0;
2712                         else
2713                                 race_counter++;
2714                 }
2715
2716                 /* attempt some inter-object balancing by issueing rpcs
2717                  * for each object in turn */
2718                 if (!list_empty(&loi->loi_hp_ready_item))
2719                         list_del_init(&loi->loi_hp_ready_item);
2720                 if (!list_empty(&loi->loi_ready_item))
2721                         list_del_init(&loi->loi_ready_item);
2722                 if (!list_empty(&loi->loi_write_item))
2723                         list_del_init(&loi->loi_write_item);
2724                 if (!list_empty(&loi->loi_read_item))
2725                         list_del_init(&loi->loi_read_item);
2726
2727                 loi_list_maint(cli, loi);
2728
2729                 /* send_oap_rpc fails with 0 when make_ready tells it to
2730                  * back off.  llite's make_ready does this when it tries
2731                  * to lock a page queued for write that is already locked.
2732                  * we want to try sending rpcs from many objects, but we
2733                  * don't want to spin failing with 0.  */
2734                 if (race_counter == 10)
2735                         break;
2736         }
2737         EXIT;
2738 }
2739
2740 /* we're trying to queue a page in the osc so we're subject to the
2741  * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2742  * If the osc's queued pages are already at that limit, then we want to sleep
2743  * until there is space in the osc's queue for us.  We also may be waiting for
2744  * write credits from the OST if there are RPCs in flight that may return some
2745  * before we fall back to sync writes.
2746  *
2747  * We need this know our allocation was granted in the presence of signals */
2748 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2749 {
2750         int rc;
2751         ENTRY;
2752         client_obd_list_lock(&cli->cl_loi_list_lock);
2753         rc = list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2754         client_obd_list_unlock(&cli->cl_loi_list_lock);
2755         RETURN(rc);
2756 };
2757
2758 /**
2759  * Non-blocking version of osc_enter_cache() that consumes grant only when it
2760  * is available.
2761  */
2762 int osc_enter_cache_try(const struct lu_env *env,
2763                         struct client_obd *cli, struct lov_oinfo *loi,
2764                         struct osc_async_page *oap, int transient)
2765 {
2766         int has_grant;
2767
2768         has_grant = cli->cl_avail_grant >= CFS_PAGE_SIZE;
2769         if (has_grant) {
2770                 osc_consume_write_grant(cli, &oap->oap_brw_page);
2771                 if (transient) {
2772                         cli->cl_dirty_transit += CFS_PAGE_SIZE;
2773                         atomic_inc(&obd_dirty_transit_pages);
2774                         oap->oap_brw_flags |= OBD_BRW_NOCACHE;
2775                 }
2776         }
2777         return has_grant;
2778 }
2779
2780 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
2781  * grant or cache space. */
2782 static int osc_enter_cache(const struct lu_env *env,
2783                            struct client_obd *cli, struct lov_oinfo *loi,
2784                            struct osc_async_page *oap)
2785 {
2786         struct osc_cache_waiter ocw;
2787         struct l_wait_info lwi = { 0 };
2788
2789         ENTRY;
2790
2791         CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
2792                "grant: %lu\n", cli->cl_dirty, atomic_read(&obd_dirty_pages),
2793                cli->cl_dirty_max, obd_max_dirty_pages,
2794                cli->cl_lost_grant, cli->cl_avail_grant);
2795
2796         /* force the caller to try sync io.  this can jump the list
2797          * of queued writes and create a discontiguous rpc stream */
2798         if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
2799             loi->loi_ar.ar_force_sync)
2800                 RETURN(-EDQUOT);
2801
2802         /* Hopefully normal case - cache space and write credits available */
2803         if (cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max &&
2804             atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages &&
2805             osc_enter_cache_try(env, cli, loi, oap, 0))
2806                 RETURN(0);
2807
2808         /* Make sure that there are write rpcs in flight to wait for.  This
2809          * is a little silly as this object may not have any pending but
2810          * other objects sure might. */
2811         if (cli->cl_w_in_flight) {
2812                 list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
2813                 cfs_waitq_init(&ocw.ocw_waitq);
2814                 ocw.ocw_oap = oap;
2815                 ocw.ocw_rc = 0;
2816
2817                 loi_list_maint(cli, loi);
2818                 osc_check_rpcs(env, cli);
2819                 client_obd_list_unlock(&cli->cl_loi_list_lock);
2820
2821                 CDEBUG(D_CACHE, "sleeping for cache space\n");
2822                 l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
2823
2824                 client_obd_list_lock(&cli->cl_loi_list_lock);
2825                 if (!list_empty(&ocw.ocw_entry)) {
2826                         list_del(&ocw.ocw_entry);
2827                         RETURN(-EINTR);
2828                 }
2829                 RETURN(ocw.ocw_rc);
2830         }
2831
2832         RETURN(-EDQUOT);
2833 }
2834
2835
2836 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2837                         struct lov_oinfo *loi, cfs_page_t *page,
2838                         obd_off offset, const struct obd_async_page_ops *ops,
2839                         void *data, void **res, int nocache,
2840                         struct lustre_handle *lockh)
2841 {
2842         struct osc_async_page *oap;
2843
2844         ENTRY;
2845
2846         if (!page)
2847                 return size_round(sizeof(*oap));
2848
2849         oap = *res;
2850         oap->oap_magic = OAP_MAGIC;
2851         oap->oap_cli = &exp->exp_obd->u.cli;
2852         oap->oap_loi = loi;
2853
2854         oap->oap_caller_ops = ops;
2855         oap->oap_caller_data = data;
2856
2857         oap->oap_page = page;
2858         oap->oap_obj_off = offset;
2859         if (!client_is_remote(exp) &&
2860             cfs_capable(CFS_CAP_SYS_RESOURCE))
2861                 oap->oap_brw_flags = OBD_BRW_NOQUOTA;
2862
2863         LASSERT(!(offset & ~CFS_PAGE_MASK));
2864
2865         CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2866         CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2867         CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2868         CFS_INIT_LIST_HEAD(&oap->oap_page_list);
2869
2870         spin_lock_init(&oap->oap_lock);
2871         CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
2872         RETURN(0);
2873 }
2874
2875 struct osc_async_page *oap_from_cookie(void *cookie)
2876 {
2877         struct osc_async_page *oap = cookie;
2878         if (oap->oap_magic != OAP_MAGIC)
2879                 return ERR_PTR(-EINVAL);
2880         return oap;
2881 };
2882
2883 int osc_queue_async_io(const struct lu_env *env,
2884                        struct obd_export *exp, struct lov_stripe_md *lsm,
2885                        struct lov_oinfo *loi, void *cookie,
2886                        int cmd, obd_off off, int count,
2887                        obd_flag brw_flags, enum async_flags async_flags)
2888 {
2889         struct client_obd *cli = &exp->exp_obd->u.cli;
2890         struct osc_async_page *oap;
2891         int rc = 0;
2892         ENTRY;
2893
2894         oap = oap_from_cookie(cookie);
2895         if (IS_ERR(oap))
2896                 RETURN(PTR_ERR(oap));
2897
2898         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2899                 RETURN(-EIO);
2900
2901         if (!list_empty(&oap->oap_pending_item) ||
2902             !list_empty(&oap->oap_urgent_item) ||
2903             !list_empty(&oap->oap_rpc_item))
2904                 RETURN(-EBUSY);
2905
2906         /* check if the file's owner/group is over quota */
2907         if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)) {
2908                 struct cl_object *obj;
2909                 struct cl_attr    attr; /* XXX put attr into thread info */
2910                 unsigned int qid[MAXQUOTAS];
2911
2912                 obj = cl_object_top(osc_oap2cl_page(oap)->cp_obj);
2913
2914                 cl_object_attr_lock(obj);
2915                 rc = cl_object_attr_get(env, obj, &attr);
2916                 cl_object_attr_unlock(obj);
2917
2918                 qid[USRQUOTA] = attr.cat_uid;
2919                 qid[GRPQUOTA] = attr.cat_gid;
2920                 if (rc == 0 &&
2921                     lquota_chkdq(quota_interface, cli, qid) == NO_QUOTA)
2922                         rc = -EDQUOT;
2923                 if (rc)
2924                         RETURN(rc);
2925         }
2926
2927         if (loi == NULL)
2928                 loi = lsm->lsm_oinfo[0];
2929
2930         client_obd_list_lock(&cli->cl_loi_list_lock);
2931
2932         LASSERT(off + count <= CFS_PAGE_SIZE);
2933         oap->oap_cmd = cmd;
2934         oap->oap_page_off = off;
2935         oap->oap_count = count;
2936         oap->oap_brw_flags = brw_flags;
2937         /* Give a hint to OST that requests are coming from kswapd - bug19529 */
2938         if (libcfs_memory_pressure_get())
2939                 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
2940         spin_lock(&oap->oap_lock);
2941         oap->oap_async_flags = async_flags;
2942         spin_unlock(&oap->oap_lock);
2943
2944         if (cmd & OBD_BRW_WRITE) {
2945                 rc = osc_enter_cache(env, cli, loi, oap);
2946                 if (rc) {
2947                         client_obd_list_unlock(&cli->cl_loi_list_lock);
2948                         RETURN(rc);
2949                 }
2950         }
2951
2952         osc_oap_to_pending(oap);
2953         loi_list_maint(cli, loi);
2954
2955         LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
2956                   cmd);
2957
2958         osc_check_rpcs(env, cli);
2959         client_obd_list_unlock(&cli->cl_loi_list_lock);
2960
2961         RETURN(0);
2962 }
2963
/* True (1) when 'flag' is absent from 'was' but present in 'now', i.e. when
 * (~was & now & flag) is non-zero.  Every parameter is parenthesized so that
 * compound arguments such as (A | B) are evaluated as a unit. */
#define SETTING(was, now, flag) (!((was) & (flag)) && ((now) & (flag)))
2966
2967 int osc_set_async_flags_base(struct client_obd *cli,
2968                              struct lov_oinfo *loi, struct osc_async_page *oap,
2969                              obd_flag async_flags)
2970 {
2971         struct loi_oap_pages *lop;
2972         int flags = 0;
2973         ENTRY;
2974
2975         LASSERT(!list_empty(&oap->oap_pending_item));
2976
2977         if (oap->oap_cmd & OBD_BRW_WRITE) {
2978                 lop = &loi->loi_write_lop;
2979         } else {
2980                 lop = &loi->loi_read_lop;
2981         }
2982
2983         if ((oap->oap_async_flags & async_flags) == async_flags)
2984                 RETURN(0);
2985
2986         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
2987                 flags |= ASYNC_READY;
2988
2989         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT) &&
2990             list_empty(&oap->oap_rpc_item)) {
2991                 if (oap->oap_async_flags & ASYNC_HP)
2992                         list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2993                 else
2994                         list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
2995                 flags |= ASYNC_URGENT;
2996                 loi_list_maint(cli, loi);
2997         }
2998         spin_lock(&oap->oap_lock);
2999         oap->oap_async_flags |= flags;
3000         spin_unlock(&oap->oap_lock);
3001
3002         LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
3003                         oap->oap_async_flags);
3004         RETURN(0);
3005 }
3006
3007 int osc_teardown_async_page(struct obd_export *exp,
3008                             struct lov_stripe_md *lsm,
3009                             struct lov_oinfo *loi, void *cookie)
3010 {
3011         struct client_obd *cli = &exp->exp_obd->u.cli;
3012         struct loi_oap_pages *lop;
3013         struct osc_async_page *oap;
3014         int rc = 0;
3015         ENTRY;
3016
3017         oap = oap_from_cookie(cookie);
3018         if (IS_ERR(oap))
3019                 RETURN(PTR_ERR(oap));
3020
3021         if (loi == NULL)
3022                 loi = lsm->lsm_oinfo[0];
3023
3024         if (oap->oap_cmd & OBD_BRW_WRITE) {
3025                 lop = &loi->loi_write_lop;
3026         } else {
3027                 lop = &loi->loi_read_lop;
3028         }
3029
3030         client_obd_list_lock(&cli->cl_loi_list_lock);
3031
3032         if (!list_empty(&oap->oap_rpc_item))
3033                 GOTO(out, rc = -EBUSY);
3034
3035         osc_exit_cache(cli, oap, 0);
3036         osc_wake_cache_waiters(cli);
3037
3038         if (!list_empty(&oap->oap_urgent_item)) {
3039                 list_del_init(&oap->oap_urgent_item);
3040                 spin_lock(&oap->oap_lock);
3041                 oap->oap_async_flags &= ~(ASYNC_URGENT | ASYNC_HP);
3042                 spin_unlock(&oap->oap_lock);
3043         }
3044         if (!list_empty(&oap->oap_pending_item)) {
3045                 list_del_init(&oap->oap_pending_item);
3046                 lop_update_pending(cli, lop, oap->oap_cmd, -1);
3047         }
3048         loi_list_maint(cli, loi);
3049         LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
3050 out:
3051         client_obd_list_unlock(&cli->cl_loi_list_lock);
3052         RETURN(rc);
3053 }
3054
/*
 * Attach the caller's callback data (einfo->ei_cbdata) to @lock as its
 * l_ast_data, after asserting that the lock was set up consistently with
 * @einfo.
 *
 * @lock:  lock to update; must not be NULL (asserted).
 * @einfo: enqueue info whose blocking/completion/glimpse callbacks and
 *         resource type must match those of @lock (asserted).
 * @flags: not used by this function.
 */
static void osc_set_lock_data_with_check(struct ldlm_lock *lock,
                                         struct ldlm_enqueue_info *einfo,
                                         int flags)
{
        void *data = einfo->ei_cbdata;

        LASSERT(lock != NULL);
        LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
        LASSERT(lock->l_resource->lr_type == einfo->ei_type);
        LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
        LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);

        /* l_ast_data is updated with both the resource/lock lock and
         * osc_ast_guard held, taken in that order. */
        lock_res_and_lock(lock);
        spin_lock(&osc_ast_guard);
        /* The lock must be unowned or already carry the very same data. */
        LASSERT(lock->l_ast_data == NULL || lock->l_ast_data == data);
        lock->l_ast_data = data;
        spin_unlock(&osc_ast_guard);
        unlock_res_and_lock(lock);
}
3074
3075 static void osc_set_data_with_check(struct lustre_handle *lockh,
3076                                     struct ldlm_enqueue_info *einfo,
3077                                     int flags)
3078 {
3079         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
3080
3081         if (lock != NULL) {
3082                 osc_set_lock_data_with_check(lock, einfo, flags);
3083                 LDLM_LOCK_PUT(lock);
3084         } else
3085                 CERROR("lockh %p, data %p - client evicted?\n",
3086                        lockh, einfo->ei_cbdata);
3087 }
3088
3089 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
3090                              ldlm_iterator_t replace, void *data)
3091 {
3092         struct ldlm_res_id res_id;
3093         struct obd_device *obd = class_exp2obd(exp);
3094
3095         osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_gr, &res_id);
3096         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
3097         return 0;
3098 }
3099
3100 static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
3101                             obd_enqueue_update_f upcall, void *cookie,
3102                             int *flags, int rc)
3103 {
3104         int intent = *flags & LDLM_FL_HAS_INTENT;
3105         ENTRY;
3106
3107         if (intent) {
3108                 /* The request was created before ldlm_cli_enqueue call. */
3109                 if (rc == ELDLM_LOCK_ABORTED) {
3110                         struct ldlm_reply *rep;
3111                         rep = req_capsule_server_get(&req->rq_pill,
3112                                                      &RMF_DLM_REP);
3113
3114                         LASSERT(rep != NULL);
3115                         if (rep->lock_policy_res1)
3116                                 rc = rep->lock_policy_res1;
3117                 }
3118         }
3119
3120         if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
3121                 *flags |= LDLM_FL_LVB_READY;
3122                 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
3123                        lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
3124         }
3125
3126         /* Call the update callback. */
3127         rc = (*upcall)(cookie, rc);
3128         RETURN(rc);
3129 }
3130
/*
 * Reply interpreter for asynchronous lock enqueues; installed as
 * rq_interpret_reply by osc_enqueue_base().
 *
 * @env: not used by this function.
 * @req: the enqueue request whose reply is being processed.
 * @aa:  async arguments filled in by osc_enqueue_base().
 * @rc:  status of the request so far.
 *
 * Finishes the DLM side of the enqueue with ldlm_cli_enqueue_fini(), then
 * the OSC side (including the caller's upcall) with osc_enqueue_fini(), and
 * finally drops the lock references that are no longer needed.
 *
 * Returns the status produced by osc_enqueue_fini().
 */
static int osc_enqueue_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_enqueue_args *aa, int rc)
{
        struct ldlm_lock *lock;
        struct lustre_handle handle;
        __u32 mode;

        /* Make a local copy of a lock handle and a mode, because aa->oa_*
         * might be freed anytime after lock upcall has been called. */
        lustre_handle_copy(&handle, aa->oa_lockh);
        mode = aa->oa_ei->ei_mode;

        /* ldlm_cli_enqueue is holding a reference on the lock, so it must
         * be valid. */
        lock = ldlm_handle2lock(&handle);

        /* Take an additional reference so that a blocking AST that
         * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
         * to arrive after an upcall has been executed by
         * osc_enqueue_fini(). */
        ldlm_lock_addref(&handle, mode);

        /* Complete obtaining the lock procedure. */
        rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
                                   mode, aa->oa_flags, aa->oa_lvb,
                                   sizeof(*aa->oa_lvb), &handle, rc);
        /* Complete osc stuff. */
        rc = osc_enqueue_fini(req, aa->oa_lvb,
                              aa->oa_upcall, aa->oa_cookie, aa->oa_flags, rc);

        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);

        /* Release the lock for async request. */
        if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
                /*
                 * Releases a reference taken by ldlm_cli_enqueue(), if it is
                 * not already released by
                 * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
                 */
                ldlm_lock_decref(&handle, mode);

        LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
                 aa->oa_lockh, req, aa);
        /* Drop the additional reference taken above, then the reference
         * obtained by ldlm_handle2lock(). */
        ldlm_lock_decref(&handle, mode);
        LDLM_LOCK_PUT(lock);
        return rc;
}
3179
3180 void osc_update_enqueue(struct lustre_handle *lov_lockhp,
3181                         struct lov_oinfo *loi, int flags,
3182                         struct ost_lvb *lvb, __u32 mode, int rc)
3183 {
3184         if (rc == ELDLM_OK) {
3185                 struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
3186                 __u64 tmp;
3187
3188                 LASSERT(lock != NULL);
3189                 loi->loi_lvb = *lvb;
3190                 tmp = loi->loi_lvb.lvb_size;
3191                 /* Extend KMS up to the end of this lock and no further
3192                  * A lock on [x,y] means a KMS of up to y + 1 bytes! */
3193                 if (tmp > lock->l_policy_data.l_extent.end)
3194                         tmp = lock->l_policy_data.l_extent.end + 1;
3195                 if (tmp >= loi->loi_kms) {
3196                         LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
3197                                    ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
3198                         loi_kms_set(loi, tmp);
3199                 } else {
3200                         LDLM_DEBUG(lock, "lock acquired, setting rss="
3201                                    LPU64"; leaving kms="LPU64", end="LPU64,
3202                                    loi->loi_lvb.lvb_size, loi->loi_kms,
3203                                    lock->l_policy_data.l_extent.end);
3204                 }
3205                 ldlm_lock_allow_match(lock);
3206                 LDLM_LOCK_PUT(lock);
3207         } else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) {
3208                 loi->loi_lvb = *lvb;
3209                 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
3210                        " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
3211                 rc = ELDLM_OK;
3212         }
3213 }
3214 EXPORT_SYMBOL(osc_update_enqueue);
3215
/* Sentinel request-set pointer.  osc_enqueue_base() compares its rqset
 * argument against this value and, on a match, hands the request to
 * ptlrpcd_add_req() instead of adding it to a caller-supplied set.
 * NOTE(review): (void *)1 is presumably never meant to be dereferenced as a
 * real set - confirm that no caller passes it to code that does. */
struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
3217
/*
 * Obtain an extent lock, reusing a cached lock when one matches and sending
 * an enqueue request otherwise.
 *
 * @exp:       export through which the lock is enqueued.
 * @res_id:    resource name of the object.
 * @flags:     in/out LDLM flags; LDLM_FL_HAS_INTENT selects an intent
 *             enqueue, LDLM_FL_BLOCK_GRANTED is cleared before enqueuing.
 * @policy:    requested extent; modified in place to page-aligned bounds.
 * @lvb:       lock value block buffer filled by the enqueue.
 * @kms_valid: when zero, cached locks are not searched at all.
 * @upcall:    completion callback, called with @cookie and the status.
 * @einfo:     lock type, mode, and AST callbacks/data.
 * @lockh:     handle of the matched or newly enqueued lock.
 * @rqset:     when non-NULL the enqueue is completed asynchronously by
 *             osc_enqueue_interpret(); PTLRPCD_SET routes the request to
 *             ptlrpcd_add_req(), any other value to ptlrpc_set_add_req().
 * @async:     passed through to ldlm_cli_enqueue().
 *
 * Returns ELDLM_OK when a cached lock was matched, the result of
 * ldlm_cli_enqueue() for an asynchronous enqueue, the result of
 * osc_enqueue_fini() for a synchronous one, or a negative errno when the
 * intent request could not be prepared.
 *
 * When enqueuing asynchronously, locks are not ordered, we can obtain a lock
 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
 * other synchronous requests, however keeping some locks and trying to obtain
 * others may take a considerable amount of time in a case of ost failure; and
 * when other sync requests do not get released lock from a client, the client
 * is excluded from the cluster -- such scenarios make the life difficult, so
 * release locks just after they are obtained. */
int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
                     int *flags, ldlm_policy_data_t *policy,
                     struct ost_lvb *lvb, int kms_valid,
                     obd_enqueue_update_f upcall, void *cookie,
                     struct ldlm_enqueue_info *einfo,
                     struct lustre_handle *lockh,
                     struct ptlrpc_request_set *rqset, int async)
{
        struct obd_device *obd = exp->exp_obd;
        struct ptlrpc_request *req = NULL;
        int intent = *flags & LDLM_FL_HAS_INTENT;
        ldlm_mode_t mode;
        int rc;
        ENTRY;

        /* Filesystem lock extents are extended to page boundaries so that
         * dealing with the page cache is a little smoother.  */
        policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
        policy->l_extent.end |= ~CFS_PAGE_MASK;

        /*
         * kms is not valid when either object is completely fresh (so that no
         * locks are cached), or object was evicted. In the latter case cached
         * lock cannot be used, because it would prime inode state with
         * potentially stale LVB.
         */
        if (!kms_valid)
                goto no_match;

        /* Next, search for already existing extent locks that will cover us */
        /* If we're trying to read, we also search for an existing PW lock.  The
         * VFS and page cache already protect us locally, so lots of readers/
         * writers can share a single PW lock.
         *
         * There are problems with conversion deadlocks, so instead of
         * converting a read lock to a write lock, we'll just enqueue a new
         * one.
         *
         * At some point we should cancel the read lock instead of making them
         * send us a blocking callback, but there are problems with canceling
         * locks out from other users right now, too. */
        mode = einfo->ei_mode;
        if (einfo->ei_mode == LCK_PR)
                mode |= LCK_PW;
        mode = ldlm_lock_match(obd->obd_namespace,
                               *flags | LDLM_FL_LVB_READY, res_id,
                               einfo->ei_type, policy, mode, lockh, 0);
        if (mode) {
                /* NOTE(review): the result is dereferenced without a NULL
                 * check; presumably the reference taken by the successful
                 * match guarantees the handle resolves - confirm. */
                struct ldlm_lock *matched = ldlm_handle2lock(lockh);

                /* Only reuse a lock that is unowned or already carries our
                 * callback data; otherwise drop the match and enqueue. */
                if (matched->l_ast_data == NULL ||
                    matched->l_ast_data == einfo->ei_cbdata) {
                        /* addref the lock only if not async requests and PW
                         * lock is matched whereas we asked for PR. */
                        if (!rqset && einfo->ei_mode != mode)
                                ldlm_lock_addref(lockh, LCK_PR);
                        osc_set_lock_data_with_check(matched, einfo, *flags);
                        if (intent) {
                                /* I would like to be able to ASSERT here that
                                 * rss <= kms, but I can't, for reasons which
                                 * are explained in lov_enqueue() */
                        }

                        /* We already have a lock, and it's referenced */
                        (*upcall)(cookie, ELDLM_OK);

                        /* For async requests, decref the lock. */
                        if (einfo->ei_mode != mode)
                                ldlm_lock_decref(lockh, LCK_PW);
                        else if (rqset)
                                ldlm_lock_decref(lockh, einfo->ei_mode);
                        LDLM_LOCK_PUT(matched);
                        RETURN(ELDLM_OK);
                } else
                        ldlm_lock_decref(lockh, mode);
                LDLM_LOCK_PUT(matched);
        }

 no_match:
        /* Intent enqueues need a pre-built request with room for the LVB in
         * the reply; for non-intent enqueues req stays NULL here. */
        if (intent) {
                CFS_LIST_HEAD(cancels);
                req = ptlrpc_request_alloc(class_exp2cliimp(exp),
                                           &RQF_LDLM_ENQUEUE_LVB);
                if (req == NULL)
                        RETURN(-ENOMEM);

                rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
                /* NOTE(review): req is not released on this path; presumably
                 * ldlm_prep_enqueue_req() frees it on failure - confirm. */
                if (rc)
                        RETURN(rc);

                req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
                                     sizeof *lvb);
                ptlrpc_request_set_replen(req);
        }

        /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
        *flags &= ~LDLM_FL_BLOCK_GRANTED;

        rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
                              sizeof(*lvb), lockh, async);
        if (rqset) {
                if (!rc) {
                        struct osc_enqueue_args *aa;
                        CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
                        /* The async args keep the caller's pointers (einfo,
                         * flags, lvb, lockh); osc_enqueue_interpret()
                         * dereferences them when the reply arrives. */
                        aa = ptlrpc_req_async_args(req);
                        aa->oa_ei = einfo;
                        aa->oa_exp = exp;
                        aa->oa_flags  = flags;
                        aa->oa_upcall = upcall;
                        aa->oa_cookie = cookie;
                        aa->oa_lvb    = lvb;
                        aa->oa_lockh  = lockh;

                        req->rq_interpret_reply =
                                (ptlrpc_interpterer_t)osc_enqueue_interpret;
                        if (rqset == PTLRPCD_SET)
                                ptlrpcd_add_req(req, PSCOPE_OTHER);
                        else
                                ptlrpc_set_add_req(rqset, req);
                } else if (intent) {
                        ptlrpc_req_finished(req);
                }
                RETURN(rc);
        }

        /* Synchronous case: finish the OSC part (and run the upcall) here. */
        rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, rc);
        if (intent)
                ptlrpc_req_finished(req);

        RETURN(rc);
}
3356
3357 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
3358                        struct ldlm_enqueue_info *einfo,
3359                        struct ptlrpc_request_set *rqset)
3360 {
3361         struct ldlm_res_id res_id;
3362         int rc;
3363         ENTRY;
3364
3365         osc_build_res_name(oinfo->oi_md->lsm_object_id,
3366                            oinfo->oi_md->lsm_object_gr, &res_id);
3367
3368         rc = osc_enqueue_base(exp, &res_id, &oinfo->oi_flags, &oinfo->oi_policy,
3369                               &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
3370                               oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid,
3371                               oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh,
3372                               rqset, rqset != NULL);
3373         RETURN(rc);
3374 }
3375
3376 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
3377                    __u32 type, ldlm_policy_data_t *policy, __u32 mode,
3378                    int *flags, void *data, struct lustre_handle *lockh,
3379                    int unref)
3380 {
3381         struct obd_device *obd = exp->exp_obd;
3382         int lflags = *flags;
3383         ldlm_mode_t rc;
3384         ENTRY;
3385
3386         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
3387                 RETURN(-EIO);
3388
3389         /* Filesystem lock extents are extended to page boundaries so that
3390          * dealing with the page cache is a little smoother */
3391         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3392         policy->l_extent.end |= ~CFS_PAGE_MASK;
3393
3394         /* Next, search for already existing extent locks that will cover us */
3395         /* If we're trying to read, we also search for an existing PW lock.  The
3396          * VFS and page cache already protect us locally, so lots of readers/
3397          * writers can share a single PW lock. */
3398         rc = mode;
3399         if (mode == LCK_PR)
3400                 rc |= LCK_PW;
3401         rc = ldlm_lock_match(obd->obd_namespace, lflags,
3402                              res_id, type, policy, rc, lockh, unref);
3403         if (rc) {
3404                 if (data != NULL)
3405                         osc_set_data_with_check(lockh, data, lflags);
3406                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
3407                         ldlm_lock_addref(lockh, LCK_PR);
3408                         ldlm_lock_decref(lockh, LCK_PW);
3409                 }
3410                 RETURN(rc);
3411         }
3412         RETURN(rc);
3413 }
3414
3415 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
3416 {
3417         ENTRY;
3418
3419         if (unlikely(mode == LCK_GROUP))
3420                 ldlm_lock_decref_and_cancel(lockh, mode);
3421         else
3422                 ldlm_lock_decref(lockh, mode);
3423
3424         RETURN(0);
3425 }
3426
3427 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
3428                       __u32 mode, struct lustre_handle *lockh)
3429 {
3430         ENTRY;
3431         RETURN(osc_cancel_base(lockh, mode));
3432 }
3433
3434 static int osc_cancel_unused(struct obd_export *exp,
3435                              struct lov_stripe_md *lsm, int flags,
3436                              void *opaque)
3437 {
3438         struct obd_device *obd = class_exp2obd(exp);
3439         struct ldlm_res_id res_id, *resp = NULL;
3440
3441         if (lsm != NULL) {
3442                 resp = osc_build_res_name(lsm->lsm_object_id,
3443                                           lsm->lsm_object_gr, &res_id);
3444         }
3445
3446         return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
3447 }
3448
/*
 * Reply interpreter for asynchronous OST_STATFS requests; installed as
 * rq_interpret_reply by osc_statfs_async().
 *
 * @env: not used by this function.
 * @req: the statfs request whose reply is being processed.
 * @aa:  async arguments; aa->aa_oi is the obd_info supplied by the caller.
 * @rc:  status of the request so far.
 *
 * On success the DEGRADED, RDONLY and NOSPC object-creator flags of the
 * client are refreshed from the reply under oscc_lock and the reply is
 * copied to aa->aa_oi->oi_osfs.  Except for -EBADR, the caller's oi_cb_up
 * callback is always invoked with the final status.
 *
 * Returns -EBADR unchanged, otherwise whatever oi_cb_up returns.
 */
static int osc_statfs_interpret(const struct lu_env *env,
                                struct ptlrpc_request *req,
                                struct osc_async_args *aa, int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
        struct obd_statfs *msfs;
        __u64 used;
        ENTRY;

        if (rc == -EBADR)
                /* The request has in fact never been sent
                 * due to issues at a higher level (LOV).
                 * Exit immediately since the caller is
                 * aware of the problem and takes care
                 * of the clean up */
                 RETURN(rc);

        /* With OBD_STATFS_NODELAY a disconnected or busy target is reported
         * to the callback as success (rc = 0) without touching the result. */
        if ((rc == -ENOTCONN || rc == -EAGAIN) &&
            (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
                GOTO(out, rc = 0);

        if (rc != 0)
                GOTO(out, rc);

        msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
        if (msfs == NULL) {
                GOTO(out, rc = -EPROTO);
        }

        /* Reinitialize the RDONLY and DEGRADED flags at the client
         * on each statfs, so they don't stay set permanently. */
        spin_lock(&cli->cl_oscc.oscc_lock);

        if (unlikely(msfs->os_state & OS_STATE_DEGRADED))
                cli->cl_oscc.oscc_flags |= OSCC_FLAG_DEGRADED;
        else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_DEGRADED))
                cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_DEGRADED;

        if (unlikely(msfs->os_state & OS_STATE_READONLY))
                cli->cl_oscc.oscc_flags |= OSCC_FLAG_RDONLY;
        else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_RDONLY))
                cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_RDONLY;

        /* Add a bit of hysteresis so this flag isn't continually flapping,
         * and ensure that new files don't get extremely fragmented due to
         * only a small amount of available space in the filesystem.
         * We want to set the NOSPC flag when there is less than ~0.1% free
         * and clear it when there is at least ~0.2% free space, so:
         *                   avail < ~0.1% max          max = avail + used
         *            1025 * avail < avail + used       used = blocks - free
         *            1024 * avail < used
         *            1024 * avail < blocks - free
         *                   avail < ((blocks - free) >> 10)
         *
         * On very large disk, say 16TB 0.1% will be 16 GB. We don't want to
         * lose that amount of space so in those cases we report no space left
         * if there is less than 1 GB left.
         *
         * NOTE(review): the cap below is 1 << 30 in the units of os_bavail
         * (presumably blocks, not bytes) - confirm it matches the "1 GB"
         * stated above. */
        used = min_t(__u64,(msfs->os_blocks - msfs->os_bfree) >> 10, 1 << 30);
        if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) == 0) &&
                     ((msfs->os_ffree < 32) || (msfs->os_bavail < used))))
                cli->cl_oscc.oscc_flags |= OSCC_FLAG_NOSPC;
        else if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) != 0) &&
                (msfs->os_ffree > 64) && (msfs->os_bavail > (used << 1))))
                        cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_NOSPC;

        spin_unlock(&cli->cl_oscc.oscc_lock);

        *aa->aa_oi->oi_osfs = *msfs;
out:
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        RETURN(rc);
}
3521
3522 static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
3523                             __u64 max_age, struct ptlrpc_request_set *rqset)
3524 {
3525         struct ptlrpc_request *req;
3526         struct osc_async_args *aa;
3527         int                    rc;
3528         ENTRY;
3529
3530         /* We could possibly pass max_age in the request (as an absolute
3531          * timestamp or a "seconds.usec ago") so the target can avoid doing
3532          * extra calls into the filesystem if that isn't necessary (e.g.
3533          * during mount that would help a bit).  Having relative timestamps
3534          * is not so great if request processing is slow, while absolute
3535          * timestamps are not ideal because they need time synchronization. */
3536         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
3537         if (req == NULL)
3538                 RETURN(-ENOMEM);
3539
3540         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3541         if (rc) {
3542                 ptlrpc_request_free(req);
3543                 RETURN(rc);
3544         }
3545         ptlrpc_request_set_replen(req);
3546         req->rq_request_portal = OST_CREATE_PORTAL;
3547         ptlrpc_at_set_req_timeout(req);
3548
3549         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
3550                 /* procfs requests not want stat in wait for avoid deadlock */
3551                 req->rq_no_resend = 1;
3552                 req->rq_no_delay = 1;
3553         }
3554
3555         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
3556         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3557         aa = ptlrpc_req_async_args(req);
3558         aa->aa_oi = oinfo;
3559
3560         ptlrpc_set_add_req(rqset, req);
3561         RETURN(0);
3562 }
3563
3564 static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
3565                       __u64 max_age, __u32 flags)
3566 {
3567         struct obd_statfs     *msfs;
3568         struct ptlrpc_request *req;
3569         struct obd_import     *imp = NULL;
3570         int rc;
3571         ENTRY;
3572
3573         /*Since the request might also come from lprocfs, so we need
3574          *sync this with client_disconnect_export Bug15684*/
3575         down_read(&obd->u.cli.cl_sem);
3576         if (obd->u.cli.cl_import)
3577                 imp = class_import_get(obd->u.cli.cl_import);
3578         up_read(&obd->u.cli.cl_sem);
3579         if (!imp)
3580                 RETURN(-ENODEV);
3581
3582         /* We could possibly pass max_age in the request (as an absolute
3583          * timestamp or a "seconds.usec ago") so the target can avoid doing
3584          * extra calls into the filesystem if that isn't necessary (e.g.
3585          * during mount that would help a bit).  Having relative timestamps
3586          * is not so great if request processing is slow, while absolute
3587          * timestamps are not ideal because they need time synchronization. */
3588         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
3589
3590         class_import_put(imp);
3591
3592         if (req == NULL)
3593                 RETURN(-ENOMEM);
3594
3595         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3596         if (rc) {
3597                 ptlrpc_request_free(req);
3598                 RETURN(rc);
3599         }
3600         ptlrpc_request_set_replen(req);
3601         req->rq_request_portal = OST_CREATE_PORTAL;
3602         ptlrpc_at_set_req_timeout(req);
3603
3604         if (flags & OBD_STATFS_NODELAY) {
3605                 /* procfs requests not want stat in wait for avoid deadlock */
3606                 req->rq_no_resend = 1;
3607                 req->rq_no_delay = 1;
3608         }
3609
3610         rc = ptlrpc_queue_wait(req);
3611         if (rc)
3612                 GOTO(out, rc);
3613
3614         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3615         if (msfs == NULL) {
3616                 GOTO(out, rc = -EPROTO);
3617         }
3618
3619         *osfs = *msfs;
3620
3621         EXIT;
3622  out:
3623         ptlrpc_req_finished(req);
3624         return rc;
3625 }
3626
3627 /* Retrieve object striping information.
3628  *
3629  * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
3630  * the maximum number of OST indices which will fit in the user buffer.
3631  * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
3632  */
3633 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
3634 {
3635         /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
3636         struct lov_user_md_v3 lum, *lumk;
3637         struct lov_user_ost_data_v1 *lmm_objects;
3638         int rc = 0, lum_size;
3639         ENTRY;
3640
3641         if (!lsm)
3642                 RETURN(-ENODATA);
3643
3644         /* we only need the header part from user space to get lmm_magic and
3645          * lmm_stripe_count, (the header part is common to v1 and v3) */
3646         lum_size = sizeof(struct lov_user_md_v1);
3647         if (copy_from_user(&lum, lump, lum_size))
3648                 RETURN(-EFAULT);
3649
3650         if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
3651             (lum.lmm_magic != LOV_USER_MAGIC_V3))
3652                 RETURN(-EINVAL);
3653
3654         /* lov_user_md_vX and lov_mds_md_vX must have the same size */
3655         LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
3656         LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
3657         LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
3658
3659         /* we can use lov_mds_md_size() to compute lum_size
3660          * because lov_user_md_vX and lov_mds_md_vX have the same size */
3661         if (lum.lmm_stripe_count > 0) {
3662                 lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
3663                 OBD_ALLOC(lumk, lum_size);
3664                 if (!lumk)
3665                         RETURN(-ENOMEM);
3666
3667                 if (lum.lmm_magic == LOV_USER_MAGIC_V1)
3668                         lmm_objects = &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
3669                 else
3670                         lmm_objects = &(lumk->lmm_objects[0]);
3671                 lmm_objects->l_object_id = lsm->lsm_object_id;
3672         } else {
3673                 lum_size = lov_mds_md_size(0, lum.lmm_magic);
3674                 lumk = &lum;
3675         }
3676
3677         lumk->lmm_object_id = lsm->lsm_object_id;
3678         lumk->lmm_object_gr = lsm->lsm_object_gr;
3679         lumk->lmm_stripe_count = 1;
3680
3681         if (copy_to_user(lump, lumk, lum_size))
3682                 rc = -EFAULT;
3683
3684         if (lumk != &lum)
3685                 OBD_FREE(lumk, lum_size);
3686
3687         RETURN(rc);
3688 }
3689
3690
3691 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3692                          void *karg, void *uarg)
3693 {
3694         struct obd_device *obd = exp->exp_obd;
3695         struct obd_ioctl_data *data = karg;
3696         int err = 0;
3697         ENTRY;
3698
3699         if (!try_module_get(THIS_MODULE)) {
3700                 CERROR("Can't get module. Is it alive?");
3701                 return -EINVAL;
3702         }
3703         switch (cmd) {
3704         case OBD_IOC_LOV_GET_CONFIG: {
3705                 char *buf;
3706                 struct lov_desc *desc;
3707                 struct obd_uuid uuid;
3708
3709                 buf = NULL;
3710                 len = 0;
3711                 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
3712                         GOTO(out, err = -EINVAL);
3713
3714                 data = (struct obd_ioctl_data *)buf;
3715
3716                 if (sizeof(*desc) > data->ioc_inllen1) {
3717                         obd_ioctl_freedata(buf, len);
3718                         GOTO(out, err = -EINVAL);
3719                 }
3720
3721                 if (data->ioc_inllen2 < sizeof(uuid)) {
3722                         obd_ioctl_freedata(buf, len);
3723                         GOTO(out, err = -EINVAL);
3724                 }
3725
3726                 desc = (struct lov_desc *)data->ioc_inlbuf1;
3727                 desc->ld_tgt_count = 1;
3728                 desc->ld_active_tgt_count = 1;
3729                 desc->ld_default_stripe_count = 1;
3730                 desc->ld_default_stripe_size = 0;
3731                 desc->ld_default_stripe_offset = 0;
3732                 desc->ld_pattern = 0;
3733                 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
3734
3735                 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
3736
3737                 err = copy_to_user((void *)uarg, buf, len);
3738                 if (err)
3739                         err = -EFAULT;
3740                 obd_ioctl_freedata(buf, len);
3741                 GOTO(out, err);
3742         }
3743         case LL_IOC_LOV_SETSTRIPE:
3744                 err = obd_alloc_memmd(exp, karg);
3745                 if (err > 0)
3746                         err = 0;
3747                 GOTO(out, err);
3748         case LL_IOC_LOV_GETSTRIPE:
3749                 err = osc_getstripe(karg, uarg);
3750                 GOTO(out, err);
3751         case OBD_IOC_CLIENT_RECOVER:
3752                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
3753                                             data->ioc_inlbuf1);
3754                 if (err > 0)
3755                         err = 0;
3756                 GOTO(out, err);
3757         case IOC_OSC_SET_ACTIVE:
3758                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
3759                                                data->ioc_offset);
3760                 GOTO(out, err);
3761         case OBD_IOC_POLL_QUOTACHECK:
3762                 err = lquota_poll_check(quota_interface, exp,
3763                                         (struct if_quotacheck *)karg);
3764                 GOTO(out, err);
3765         case OBD_IOC_PING_TARGET:
3766                 err = ptlrpc_obd_ping(obd);
3767                 GOTO(out, err);
3768         default:
3769                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3770                        cmd, cfs_curproc_comm());
3771                 GOTO(out, err = -ENOTTY);
3772         }
3773 out:
3774         module_put(THIS_MODULE);
3775         return err;
3776 }
3777
3778 static int osc_get_info(struct obd_export *exp, obd_count keylen,
3779                         void *key, __u32 *vallen, void *val,
3780                         struct lov_stripe_md *lsm)
3781 {
3782         ENTRY;
3783         if (!vallen || !val)
3784                 RETURN(-EFAULT);
3785
3786         if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
3787                 __u32 *stripe = val;
3788                 *vallen = sizeof(*stripe);
3789                 *stripe = 0;
3790                 RETURN(0);
3791         } else if (KEY_IS(KEY_LAST_ID)) {
3792                 struct ptlrpc_request *req;
3793                 obd_id                *reply;
3794                 char                  *tmp;
3795                 int                    rc;
3796
3797                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3798                                            &RQF_OST_GET_INFO_LAST_ID);
3799                 if (req == NULL)
3800                         RETURN(-ENOMEM);
3801
3802                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3803                                      RCL_CLIENT, keylen);
3804                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3805                 if (rc) {
3806                         ptlrpc_request_free(req);
3807                         RETURN(rc);
3808                 }
3809
3810                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3811                 memcpy(tmp, key, keylen);
3812
3813                 req->rq_no_delay = req->rq_no_resend = 1;
3814                 ptlrpc_request_set_replen(req);
3815                 rc = ptlrpc_queue_wait(req);
3816                 if (rc)
3817                         GOTO(out, rc);
3818
3819                 reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
3820                 if (reply == NULL)
3821                         GOTO(out, rc = -EPROTO);
3822
3823                 *((obd_id *)val) = *reply;
3824         out:
3825                 ptlrpc_req_finished(req);
3826                 RETURN(rc);
3827         } else if (KEY_IS(KEY_FIEMAP)) {
3828                 struct ptlrpc_request *req;
3829                 struct ll_user_fiemap *reply;
3830                 char *tmp;
3831                 int rc;
3832
3833                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3834                                            &RQF_OST_GET_INFO_FIEMAP);
3835                 if (req == NULL)
3836                         RETURN(-ENOMEM);
3837
3838                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
3839                                      RCL_CLIENT, keylen);
3840                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3841                                      RCL_CLIENT, *vallen);
3842                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3843                                      RCL_SERVER, *vallen);
3844
3845                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3846                 if (rc) {
3847                         ptlrpc_request_free(req);
3848                         RETURN(rc);
3849                 }
3850
3851                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
3852                 memcpy(tmp, key, keylen);
3853                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3854                 memcpy(tmp, val, *vallen);
3855
3856                 ptlrpc_request_set_replen(req);
3857                 rc = ptlrpc_queue_wait(req);
3858                 if (rc)
3859                         GOTO(out1, rc);
3860
3861                 reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3862                 if (reply == NULL)
3863                         GOTO(out1, rc = -EPROTO);
3864
3865                 memcpy(val, reply, *vallen);
3866         out1:
3867                 ptlrpc_req_finished(req);
3868
3869                 RETURN(rc);
3870         }
3871
3872         RETURN(-EINVAL);
3873 }
3874
3875 static int osc_setinfo_mds_connect_import(struct obd_import *imp)
3876 {
3877         struct llog_ctxt *ctxt;
3878         int rc = 0;
3879         ENTRY;
3880
3881         ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
3882         if (ctxt) {
3883                 rc = llog_initiator_connect(ctxt);
3884                 llog_ctxt_put(ctxt);
3885         } else {
3886                 /* XXX return an error? skip setting below flags? */
3887         }
3888
3889         spin_lock(&imp->imp_lock);
3890         imp->imp_server_timeout = 1;
3891         imp->imp_pingable = 1;
3892         spin_unlock(&imp->imp_lock);
3893         CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
3894
3895         RETURN(rc);
3896 }
3897
3898 static int osc_setinfo_mds_conn_interpret(const struct lu_env *env,
3899                                           struct ptlrpc_request *req,
3900                                           void *aa, int rc)
3901 {
3902         ENTRY;
3903         if (rc != 0)
3904                 RETURN(rc);
3905
3906         RETURN(osc_setinfo_mds_connect_import(req->rq_import));
3907 }
3908
3909 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
3910                               void *key, obd_count vallen, void *val,
3911                               struct ptlrpc_request_set *set)
3912 {
3913         struct ptlrpc_request *req;
3914         struct obd_device     *obd = exp->exp_obd;
3915         struct obd_import     *imp = class_exp2cliimp(exp);
3916         char                  *tmp;
3917         int                    rc;
3918         ENTRY;
3919
3920         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3921
3922         if (KEY_IS(KEY_NEXT_ID)) {
3923                 obd_id new_val;
3924                 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3925
3926                 if (vallen != sizeof(obd_id))
3927                         RETURN(-ERANGE);
3928                 if (val == NULL)
3929                         RETURN(-EINVAL);
3930
3931                 if (vallen != sizeof(obd_id))
3932                         RETURN(-EINVAL);
3933
3934                 /* avoid race between allocate new object and set next id
3935                  * from ll_sync thread */
3936                 spin_lock(&oscc->oscc_lock);
3937                 new_val = *((obd_id*)val) + 1;
3938                 if (new_val > oscc->oscc_next_id)
3939                         oscc->oscc_next_id = new_val;
3940                 spin_unlock(&oscc->oscc_lock);                        
3941                 CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
3942                        exp->exp_obd->obd_name,
3943                        obd->u.cli.cl_oscc.oscc_next_id);
3944
3945                 RETURN(0);
3946         }
3947
3948         if (KEY_IS(KEY_INIT_RECOV)) {
3949                 if (vallen != sizeof(int))
3950                         RETURN(-EINVAL);
3951                 spin_lock(&imp->imp_lock);
3952                 imp->imp_initial_recov = *(int *)val;
3953                 spin_unlock(&imp->imp_lock);
3954                 CDEBUG(D_HA, "%s: set imp_initial_recov = %d\n",
3955                        exp->exp_obd->obd_name,
3956                        imp->imp_initial_recov);
3957                 RETURN(0);
3958         }
3959
3960         if (KEY_IS(KEY_CHECKSUM)) {
3961                 if (vallen != sizeof(int))
3962                         RETURN(-EINVAL);
3963                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3964                 RETURN(0);
3965         }
3966
3967         if (KEY_IS(KEY_SPTLRPC_CONF)) {
3968                 sptlrpc_conf_client_adapt(obd);
3969                 RETURN(0);
3970         }
3971
3972         if (KEY_IS(KEY_FLUSH_CTX)) {
3973                 sptlrpc_import_flush_my_ctx(imp);
3974                 RETURN(0);
3975         }
3976
3977         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
3978                 RETURN(-EINVAL);
3979
3980         /* We pass all other commands directly to OST. Since nobody calls osc
3981            methods directly and everybody is supposed to go through LOV, we
3982            assume lov checked invalid values for us.
3983            The only recognised values so far are evict_by_nid and mds_conn.
3984            Even if something bad goes through, we'd get a -EINVAL from OST
3985            anyway. */
3986
3987         if (KEY_IS(KEY_GRANT_SHRINK))
3988                 req = ptlrpc_request_alloc(imp, &RQF_OST_SET_GRANT_INFO);
3989         else
3990                 req = ptlrpc_request_alloc(imp, &RQF_OBD_SET_INFO);
3991
3992         if (req == NULL)
3993                 RETURN(-ENOMEM);
3994
3995         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3996                              RCL_CLIENT, keylen);
3997         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
3998                              RCL_CLIENT, vallen);
3999         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
4000         if (rc) {
4001                 ptlrpc_request_free(req);
4002                 RETURN(rc);
4003         }
4004
4005         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
4006         memcpy(tmp, key, keylen);
4007         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL);
4008         memcpy(tmp, val, vallen);
4009
4010         if (KEY_IS(KEY_MDS_CONN)) {
4011                 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4012
4013                 oscc->oscc_oa.o_gr = (*(__u32 *)val);
4014                 oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP;
4015                 LASSERT_MDS_GROUP(oscc->oscc_oa.o_gr);
4016                 req->rq_no_delay = req->rq_no_resend = 1;
4017                 req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
4018         } else if (KEY_IS(KEY_GRANT_SHRINK)) {
4019                 struct osc_grant_args *aa;
4020                 struct obdo *oa;
4021
4022                 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
4023                 aa = ptlrpc_req_async_args(req);
4024                 OBD_ALLOC_PTR(oa);
4025                 if (!oa) {
4026                         ptlrpc_req_finished(req);
4027                         RETURN(-ENOMEM);
4028                 }
4029                 *oa = ((struct ost_body *)val)->oa;
4030                 aa->aa_oa = oa;
4031                 req->rq_interpret_reply = osc_shrink_grant_interpret;
4032         }
4033
4034         ptlrpc_request_set_replen(req);
4035         if (!KEY_IS(KEY_GRANT_SHRINK)) {
4036                 LASSERT(set != NULL);
4037                 ptlrpc_set_add_req(set, req);
4038                 ptlrpc_check_set(NULL, set);
4039         } else
4040                 ptlrpcd_add_req(req, PSCOPE_OTHER);
4041
4042         RETURN(0);
4043 }
4044
4045
4046 static struct llog_operations osc_size_repl_logops = {
4047         lop_cancel: llog_obd_repl_cancel
4048 };
4049
4050 static struct llog_operations osc_mds_ost_orig_logops;
4051
4052 static int __osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
4053                            struct obd_device *tgt, struct llog_catid *catid)
4054 {
4055         int rc;
4056         ENTRY;
4057
4058         rc = llog_setup(obd, &obd->obd_olg, LLOG_MDS_OST_ORIG_CTXT, tgt, 1,
4059                         &catid->lci_logid, &osc_mds_ost_orig_logops);
4060         if (rc) {
4061                 CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
4062                 GOTO(out, rc);
4063         }
4064
4065         rc = llog_setup(obd, &obd->obd_olg, LLOG_SIZE_REPL_CTXT, tgt, 1,
4066                         NULL, &osc_size_repl_logops);
4067         if (rc) {
4068                 struct llog_ctxt *ctxt =
4069                         llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
4070                 if (ctxt)
4071                         llog_cleanup(ctxt);
4072                 CERROR("failed LLOG_SIZE_REPL_CTXT\n");
4073         }
4074         GOTO(out, rc);
4075 out:
4076         if (rc) {
4077                 CERROR("osc '%s' tgt '%s' catid %p rc=%d\n",
4078                        obd->obd_name, tgt->obd_name, catid, rc);
4079                 CERROR("logid "LPX64":0x%x\n",
4080                        catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
4081         }
4082         return rc;
4083 }
4084
4085 static int osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
4086                          struct obd_device *disk_obd, int *index)
4087 {
4088         struct llog_catid catid;
4089         static char name[32] = CATLIST;
4090         int rc;
4091         ENTRY;
4092
4093         LASSERT(olg == &obd->obd_olg);
4094
4095         mutex_down(&olg->olg_cat_processing);
4096         rc = llog_get_cat_list(disk_obd, name, *index, 1, &catid);
4097         if (rc) {
4098                 CERROR("rc: %d\n", rc);
4099                 GOTO(out, rc);
4100         }
4101
4102         CDEBUG(D_INFO, "%s: Init llog for %d - catid "LPX64"/"LPX64":%x\n",
4103                obd->obd_name, *index, catid.lci_logid.lgl_oid,
4104                catid.lci_logid.lgl_ogr, catid.lci_logid.lgl_ogen);
4105
4106         rc = __osc_llog_init(obd, olg, disk_obd, &catid);
4107         if (rc) {
4108                 CERROR("rc: %d\n", rc);
4109                 GOTO(out, rc);
4110         }
4111
4112         rc = llog_put_cat_list(disk_obd, name, *index, 1, &catid);
4113         if (rc) {
4114                 CERROR("rc: %d\n", rc);
4115                 GOTO(out, rc);
4116         }
4117
4118  out:
4119         mutex_up(&olg->olg_cat_processing);
4120
4121         return rc;
4122 }
4123
4124 static int osc_llog_finish(struct obd_device *obd, int count)
4125 {
4126         struct llog_ctxt *ctxt;
4127         int rc = 0, rc2 = 0;
4128         ENTRY;
4129
4130         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
4131         if (ctxt)
4132                 rc = llog_cleanup(ctxt);
4133
4134         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
4135         if (ctxt)
4136                 rc2 = llog_cleanup(ctxt);
4137         if (!rc)
4138                 rc = rc2;
4139
4140         RETURN(rc);
4141 }
4142
4143 static int osc_reconnect(const struct lu_env *env,
4144                          struct obd_export *exp, struct obd_device *obd,
4145                          struct obd_uuid *cluuid,
4146                          struct obd_connect_data *data,
4147                          void *localdata)
4148 {
4149         struct client_obd *cli = &obd->u.cli;
4150
4151         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
4152                 long lost_grant;
4153
4154                 client_obd_list_lock(&cli->cl_loi_list_lock);
4155                 data->ocd_grant = (cli->cl_avail_grant + cli->cl_dirty) ?:
4156                                 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
4157                 lost_grant = cli->cl_lost_grant;
4158                 cli->cl_lost_grant = 0;
4159                 client_obd_list_unlock(&cli->cl_loi_list_lock);
4160
4161                 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
4162                        "cl_dirty: %ld cl_lost_grant: %ld\n", data->ocd_grant,
4163                        cli->cl_avail_grant, cli->cl_dirty, lost_grant);
4164                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
4165                        " ocd_grant: %d\n", data->ocd_connect_flags,
4166                        data->ocd_version, data->ocd_grant);
4167         }
4168
4169         RETURN(0);
4170 }
4171
4172 static int osc_disconnect(struct obd_export *exp)
4173 {
4174         struct obd_device *obd = class_exp2obd(exp);
4175         struct llog_ctxt  *ctxt;
4176         int rc;
4177
4178         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
4179         if (ctxt) {
4180                 if (obd->u.cli.cl_conn_count == 1) {
4181                         /* Flush any remaining cancel messages out to the
4182                          * target */
4183                         llog_sync(ctxt, exp);
4184                 }
4185                 llog_ctxt_put(ctxt);
4186         } else {
4187                 CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
4188                        obd);
4189         }
4190
4191         rc = client_disconnect_export(exp);
4192         /**
4193          * Initially we put del_shrink_grant before disconnect_export, but it
4194          * causes the following problem if setup (connect) and cleanup
4195          * (disconnect) are tangled together.
4196          *      connect p1                     disconnect p2
4197          *   ptlrpc_connect_import
4198          *     ...............               class_manual_cleanup
4199          *                                     osc_disconnect
4200          *                                     del_shrink_grant
4201          *   ptlrpc_connect_interrupt
4202          *     init_grant_shrink
4203          *   add this client to shrink list
4204          *                                      cleanup_osc
4205          * Bang! pinger trigger the shrink.
4206          * So the osc should be disconnected from the shrink list, after we
4207          * are sure the import has been destroyed. BUG18662
4208          */
4209         if (obd->u.cli.cl_import == NULL)
4210                 osc_del_shrink_grant(&obd->u.cli);
4211         return rc;
4212 }
4213
4214 static int osc_import_event(struct obd_device *obd,
4215                             struct obd_import *imp,
4216                             enum obd_import_event event)
4217 {
4218         struct client_obd *cli;
4219         int rc = 0;
4220
4221         ENTRY;
4222         LASSERT(imp->imp_obd == obd);
4223
4224         switch (event) {
4225         case IMP_EVENT_DISCON: {
4226                 /* Only do this on the MDS OSC's */
4227                 if (imp->imp_server_timeout) {
4228                         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4229
4230                         spin_lock(&oscc->oscc_lock);
4231                         oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
4232                         spin_unlock(&oscc->oscc_lock);
4233                 }
4234                 cli = &obd->u.cli;
4235                 client_obd_list_lock(&cli->cl_loi_list_lock);
4236                 cli->cl_avail_grant = 0;
4237                 cli->cl_lost_grant = 0;
4238                 client_obd_list_unlock(&cli->cl_loi_list_lock);
4239                 break;
4240         }
4241         case IMP_EVENT_INACTIVE: {
4242                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
4243                 break;
4244         }
4245         case IMP_EVENT_INVALIDATE: {
4246                 struct ldlm_namespace *ns = obd->obd_namespace;
4247                 struct lu_env         *env;
4248                 int                    refcheck;
4249
4250                 env = cl_env_get(&refcheck);
4251                 if (!IS_ERR(env)) {
4252                         /* Reset grants */
4253                         cli = &obd->u.cli;
4254                         client_obd_list_lock(&cli->cl_loi_list_lock);
4255                         /* all pages go to failing rpcs due to the invalid
4256                          * import */
4257                         osc_check_rpcs(env, cli);
4258                         client_obd_list_unlock(&cli->cl_loi_list_lock);
4259
4260                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
4261                         cl_env_put(env, &refcheck);
4262                 } else
4263                         rc = PTR_ERR(env);
4264                 break;
4265         }
4266         case IMP_EVENT_ACTIVE: {
4267                 /* Only do this on the MDS OSC's */
4268                 if (imp->imp_server_timeout) {
4269                         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4270
4271                         spin_lock(&oscc->oscc_lock);
4272                         oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
4273                         spin_unlock(&oscc->oscc_lock);
4274                 }
4275                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
4276                 break;
4277         }
4278         case IMP_EVENT_OCD: {
4279                 struct obd_connect_data *ocd = &imp->imp_connect_data;
4280
4281                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
4282                         osc_init_grant(&obd->u.cli, ocd);
4283
4284                 /* See bug 7198 */
4285                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
4286                         imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
4287
4288                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
4289                 break;
4290         }
4291         default:
4292                 CERROR("Unknown import event %d\n", event);
4293                 LBUG();
4294         }
4295         RETURN(rc);
4296 }
4297
4298 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
4299 {
4300         int rc;
4301         ENTRY;
4302
4303         ENTRY;
4304         rc = ptlrpcd_addref();
4305         if (rc)
4306                 RETURN(rc);
4307
4308         rc = client_obd_setup(obd, lcfg);
4309         if (rc) {
4310                 ptlrpcd_decref();
4311         } else {
4312                 struct lprocfs_static_vars lvars = { 0 };
4313                 struct client_obd *cli = &obd->u.cli;
4314
4315                 cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
4316                 lprocfs_osc_init_vars(&lvars);
4317                 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
4318                         lproc_osc_attach_seqstat(obd);
4319                         sptlrpc_lprocfs_cliobd_attach(obd);
4320                         ptlrpc_lprocfs_register_obd(obd);
4321                 }
4322
4323                 oscc_init(obd);
4324                 /* We need to allocate a few requests more, because
4325                    brw_interpret tries to create new requests before freeing
4326                    previous ones. Ideally we want to have 2x max_rpcs_in_flight
4327                    reserved, but I afraid that might be too much wasted RAM
4328                    in fact, so 2 is just my guess and still should work. */
4329                 cli->cl_import->imp_rq_pool =
4330                         ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
4331                                             OST_MAXREQSIZE,
4332                                             ptlrpc_add_rqs_to_pool);
4333
4334                 CFS_INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
4335                 sema_init(&cli->cl_grant_sem, 1);
4336         }
4337
4338         RETURN(rc);
4339 }
4340
4341 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
4342 {
4343         int rc = 0;
4344         ENTRY;
4345
4346         switch (stage) {
4347         case OBD_CLEANUP_EARLY: {
4348                 struct obd_import *imp;
4349                 imp = obd->u.cli.cl_import;
4350                 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
4351                 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
4352                 ptlrpc_deactivate_import(imp);
4353                 spin_lock(&imp->imp_lock);
4354                 imp->imp_pingable = 0;
4355                 spin_unlock(&imp->imp_lock);
4356                 break;
4357         }
4358         case OBD_CLEANUP_EXPORTS: {
4359                 /* If we set up but never connected, the
4360                    client import will not have been cleaned. */
4361                 if (obd->u.cli.cl_import) {
4362                         struct obd_import *imp;
4363                         down_write(&obd->u.cli.cl_sem);
4364                         imp = obd->u.cli.cl_import;
4365                         CDEBUG(D_CONFIG, "%s: client import never connected\n",
4366                                obd->obd_name);
4367                         ptlrpc_invalidate_import(imp);
4368                         if (imp->imp_rq_pool) {
4369                                 ptlrpc_free_rq_pool(imp->imp_rq_pool);
4370                                 imp->imp_rq_pool = NULL;
4371                         }
4372                         class_destroy_import(imp);
4373                         up_write(&obd->u.cli.cl_sem);
4374                         obd->u.cli.cl_import = NULL;
4375                 }
4376                 rc = obd_llog_finish(obd, 0);
4377                 if (rc != 0)
4378                         CERROR("failed to cleanup llogging subsystems\n");
4379                 break;
4380                 }
4381         }
4382         RETURN(rc);
4383 }
4384
4385 int osc_cleanup(struct obd_device *obd)
4386 {
4387         int rc;
4388
4389         ENTRY;
4390         ptlrpc_lprocfs_unregister_obd(obd);
4391         lprocfs_obd_cleanup(obd);
4392
4393         /* free memory of osc quota cache */
4394         lquota_cleanup(quota_interface, obd);
4395
4396         rc = client_obd_cleanup(obd);
4397
4398         ptlrpcd_decref();
4399         RETURN(rc);
4400 }
4401
4402 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
4403 {
4404         struct lprocfs_static_vars lvars = { 0 };
4405         int rc = 0;
4406
4407         lprocfs_osc_init_vars(&lvars);
4408
4409         switch (lcfg->lcfg_command) {
4410         default:
4411                 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
4412                                               lcfg, obd);
4413                 if (rc > 0)
4414                         rc = 0;
4415                 break;
4416         }
4417
4418         return(rc);
4419 }
4420
4421 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
4422 {
4423         return osc_process_config_base(obd, buf);
4424 }
4425
4426 struct obd_ops osc_obd_ops = {
4427         .o_owner                = THIS_MODULE,
4428         .o_setup                = osc_setup,
4429         .o_precleanup           = osc_precleanup,
4430         .o_cleanup              = osc_cleanup,
4431         .o_add_conn             = client_import_add_conn,
4432         .o_del_conn             = client_import_del_conn,
4433         .o_connect              = client_connect_import,
4434         .o_reconnect            = osc_reconnect,
4435         .o_disconnect           = osc_disconnect,
4436         .o_statfs               = osc_statfs,
4437         .o_statfs_async         = osc_statfs_async,
4438         .o_packmd               = osc_packmd,
4439         .o_unpackmd             = osc_unpackmd,
4440         .o_precreate            = osc_precreate,
4441         .o_create               = osc_create,
4442         .o_create_async         = osc_create_async,
4443         .o_destroy              = osc_destroy,
4444         .o_getattr              = osc_getattr,
4445         .o_getattr_async        = osc_getattr_async,
4446         .o_setattr              = osc_setattr,
4447         .o_setattr_async        = osc_setattr_async,
4448         .o_brw                  = osc_brw,
4449         .o_punch                = osc_punch,
4450         .o_sync                 = osc_sync,
4451         .o_enqueue              = osc_enqueue,
4452         .o_change_cbdata        = osc_change_cbdata,
4453         .o_cancel               = osc_cancel,
4454         .o_cancel_unused        = osc_cancel_unused,
4455         .o_iocontrol            = osc_iocontrol,
4456         .o_get_info             = osc_get_info,
4457         .o_set_info_async       = osc_set_info_async,
4458         .o_import_event         = osc_import_event,
4459         .o_llog_init            = osc_llog_init,
4460         .o_llog_finish          = osc_llog_finish,
4461         .o_process_config       = osc_process_config,
4462 };
4463
4464 extern struct lu_kmem_descr  osc_caches[];
4465 extern spinlock_t            osc_ast_guard;
4466 extern struct lock_class_key osc_ast_guard_class;
4467
4468 int __init osc_init(void)
4469 {
4470         struct lprocfs_static_vars lvars = { 0 };
4471         int rc;
4472         ENTRY;
4473
4474         /* print an address of _any_ initialized kernel symbol from this
4475          * module, to allow debugging with gdb that doesn't support data
4476          * symbols from modules.*/
4477         CDEBUG(D_CONSOLE, "Lustre OSC module (%p).\n", &osc_caches);
4478
4479         rc = lu_kmem_init(osc_caches);
4480
4481         lprocfs_osc_init_vars(&lvars);
4482
4483         request_module("lquota");
4484         quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
4485         lquota_init(quota_interface);
4486         init_obd_quota_ops(quota_interface, &osc_obd_ops);
4487
4488         rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
4489                                  LUSTRE_OSC_NAME, &osc_device_type);
4490         if (rc) {
4491                 if (quota_interface)
4492                         PORTAL_SYMBOL_PUT(osc_quota_interface);
4493                 lu_kmem_fini(osc_caches);
4494                 RETURN(rc);
4495         }
4496
4497         spin_lock_init(&osc_ast_guard);
4498         lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
4499
4500         osc_mds_ost_orig_logops = llog_lvfs_ops;
4501         osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
4502         osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
4503         osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
4504         osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
4505
4506         RETURN(rc);
4507 }
4508
4509 #ifdef __KERNEL__
4510 static void /*__exit*/ osc_exit(void)
4511 {
4512         lu_device_type_fini(&osc_device_type);
4513
4514         lquota_exit(quota_interface);
4515         if (quota_interface)
4516                 PORTAL_SYMBOL_PUT(osc_quota_interface);
4517
4518         class_unregister_type(LUSTRE_OSC_NAME);
4519         lu_kmem_fini(osc_caches);
4520 }
4521
4522 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
4523 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
4524 MODULE_LICENSE("GPL");
4525
4526 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);
4527 #endif