[fs/lustre-release.git] lustre/osc/osc_request.c @ e5f368cefa7a9be75df05ecd012f80b53c6a8c21
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
5  *   Author Peter Braam <braam@clusterfs.com>
6  *
7  *   This file is part of the Lustre file system, http://www.lustre.org
8  *   Lustre is a trademark of Cluster File Systems, Inc.
9  *
10  *   You may have signed or agreed to another license before downloading
11  *   this software.  If so, you are bound by the terms and conditions
12  *   of that agreement, and the following does not apply to you.  See the
13  *   LICENSE file included with this distribution for more information.
14  *
15  *   If you did not agree to a different license, then this copy of Lustre
16  *   is open source software; you can redistribute it and/or modify it
17  *   under the terms of version 2 of the GNU General Public License as
18  *   published by the Free Software Foundation.
19  *
20  *   In either case, Lustre is distributed in the hope that it will be
21  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
22  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
23  *   license text for more details.
24  *
25  *  For testing and management it is treated as an obd_device,
26  *  although it does not export a full OBD method table (the
27  *  requests are coming in over the wire, so object target modules
28  *  do not have a full method table.)
29  *
30  */
31
32 #ifndef EXPORT_SYMTAB
33 # define EXPORT_SYMTAB
34 #endif
35 #define DEBUG_SUBSYSTEM S_OSC
36
37 #include <libcfs/libcfs.h>
38
39 #ifndef __KERNEL__
40 # include <liblustre.h>
41 #endif
42
43 #include <lustre_dlm.h>
44 #include <lustre_net.h>
45 #include <lustre/lustre_user.h>
46 #include <obd_cksum.h>
47 #include <obd_ost.h>
48 #include <obd_lov.h>
49
50 #ifdef  __CYGWIN__
51 # include <ctype.h>
52 #endif
53
54 #include <lustre_ha.h>
55 #include <lprocfs_status.h>
56 #include <lustre_log.h>
57 #include <lustre_debug.h>
58 #include <lustre_param.h>
59 #include <lustre_cache.h>
60 #include "osc_internal.h"
61
62 static quota_interface_t *quota_interface = NULL;
63 extern quota_interface_t osc_quota_interface;
64
65 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
66 static int brw_interpret(struct ptlrpc_request *request, void *data, int rc);
67 int osc_cleanup(struct obd_device *obd);
68
69 /* Pack OSC object metadata for disk storage (LE byte order). */
70 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
71                       struct lov_stripe_md *lsm)
72 {
73         int lmm_size;
74         ENTRY;
75
76         lmm_size = sizeof(**lmmp);
77         if (!lmmp)
78                 RETURN(lmm_size);
79
80         if (*lmmp && !lsm) {
81                 OBD_FREE(*lmmp, lmm_size);
82                 *lmmp = NULL;
83                 RETURN(0);
84         }
85
86         if (!*lmmp) {
87                 OBD_ALLOC(*lmmp, lmm_size);
88                 if (!*lmmp)
89                         RETURN(-ENOMEM);
90         }
91
92         if (lsm) {
93                 LASSERT(lsm->lsm_object_id);
94                 LASSERT(lsm->lsm_object_gr);
95                 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
96                 (*lmmp)->lmm_object_gr = cpu_to_le64(lsm->lsm_object_gr);
97         }
98
99         RETURN(lmm_size);
100 }
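
/*
 * Calling convention note (inferred from the code above, matching the
 * generic obd_packmd() contract): a NULL @lmmp is a pure size query; a
 * non-NULL *lmmp with a NULL @lsm frees the buffer and returns 0;
 * otherwise the buffer is allocated on demand and the object id/group
 * are packed in little-endian order before the size is returned.
 */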
101
102 /* Unpack OSC object metadata from disk storage (LE byte order). */
103 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
104                         struct lov_mds_md *lmm, int lmm_bytes)
105 {
106         int lsm_size;
107         ENTRY;
108
109         if (lmm != NULL) {
110                 if (lmm_bytes < sizeof (*lmm)) {
111                         CERROR("lov_mds_md too small: %d, need %d\n",
112                                lmm_bytes, (int)sizeof(*lmm));
113                         RETURN(-EINVAL);
114                 }
115                 /* XXX LOV_MAGIC etc check? */
116
117                 if (lmm->lmm_object_id == 0) {
118                         CERROR("lov_mds_md: zero lmm_object_id\n");
119                         RETURN(-EINVAL);
120                 }
121         }
122
123         lsm_size = lov_stripe_md_size(1);
124         if (lsmp == NULL)
125                 RETURN(lsm_size);
126
127         if (*lsmp != NULL && lmm == NULL) {
128                 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
129                 OBD_FREE(*lsmp, lsm_size);
130                 *lsmp = NULL;
131                 RETURN(0);
132         }
133
134         if (*lsmp == NULL) {
135                 OBD_ALLOC(*lsmp, lsm_size);
136                 if (*lsmp == NULL)
137                         RETURN(-ENOMEM);
138                 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
139                 if ((*lsmp)->lsm_oinfo[0] == NULL) {
140                         OBD_FREE(*lsmp, lsm_size);
141                         RETURN(-ENOMEM);
142                 }
143                 loi_init((*lsmp)->lsm_oinfo[0]);
144         }
145
146         if (lmm != NULL) {
147                 /* XXX zero *lsmp? */
148                 (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
149                 (*lsmp)->lsm_object_gr = le64_to_cpu (lmm->lmm_object_gr);
150                 LASSERT((*lsmp)->lsm_object_id);
151                 LASSERT((*lsmp)->lsm_object_gr);
152         }
153
154         (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
155
156         RETURN(lsm_size);
157 }
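
/*
 * The OSC always handles exactly one stripe, hence the fixed
 * lov_stripe_md_size(1) above; striping a file across multiple OSTs is
 * the LOV's job, one layer up.
 */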
158
159 static inline void osc_pack_capa(struct ptlrpc_request *req,
160                                  struct ost_body *body, void *capa)
161 {
162         struct obd_capa *oc = (struct obd_capa *)capa;
163         struct lustre_capa *c;
164
165         if (!capa)
166                 return;
167
168         c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
169         LASSERT(c);
170         capa_cpy(c, oc);
171         body->oa.o_valid |= OBD_MD_FLOSSCAPA;
172         DEBUG_CAPA(D_SEC, c, "pack");
173 }
174
175 static inline void osc_pack_req_body(struct ptlrpc_request *req,
176                                      struct obd_info *oinfo)
177 {
178         struct ost_body *body;
179
180         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
181         LASSERT(body);
182
183         body->oa = *oinfo->oi_oa;
184         osc_pack_capa(req, body, oinfo->oi_capa);
185 }
186
187 static inline void osc_set_capa_size(struct ptlrpc_request *req,
188                                      const struct req_msg_field *field,
189                                      struct obd_capa *oc)
190 {
191         if (oc == NULL)
192                 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
193         else
194                 /* it is already calculated as sizeof struct obd_capa */
195                 ;
196 }
197
198 static int osc_getattr_interpret(struct ptlrpc_request *req,
199                                  struct osc_async_args *aa, int rc)
200 {
201         struct ost_body *body;
202         ENTRY;
203
204         if (rc != 0)
205                 GOTO(out, rc);
206
207         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
208                                   lustre_swab_ost_body);
209         if (body) {
210                 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
211                 memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));
212
213                 /* This should really be sent by the OST */
214                 aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
215                 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
216         } else {
217                 CDEBUG(D_INFO, "can't unpack ost_body\n");
218                 rc = -EPROTO;
219                 aa->aa_oi->oi_oa->o_valid = 0;
220         }
221 out:
222         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
223         RETURN(rc);
224 }
225
226 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
227                              struct ptlrpc_request_set *set)
228 {
229         struct ptlrpc_request *req;
230         struct osc_async_args *aa;
231         int                    rc;
232         ENTRY;
233
234         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
235         if (req == NULL)
236                 RETURN(-ENOMEM);
237
238         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
239         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
240         if (rc) {
241                 ptlrpc_request_free(req);
242                 RETURN(rc);
243         }
244
245         osc_pack_req_body(req, oinfo);
246
247         ptlrpc_request_set_replen(req);
248         req->rq_interpret_reply = osc_getattr_interpret;
249
250         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
251         aa = (struct osc_async_args *)&req->rq_async_args;
252         aa->aa_oi = oinfo;
253
254         ptlrpc_set_add_req(set, req);
255         RETURN(0);
256 }
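
/*
 * A minimal usage sketch (hypothetical caller, not part of this file):
 * the async getattr is driven through a ptlrpc request set, roughly
 *
 *      set = ptlrpc_prep_set();
 *      rc = obd_getattr_async(exp, oinfo, set);
 *      if (rc == 0)
 *              rc = ptlrpc_set_wait(set);
 *      ptlrpc_set_destroy(set);
 *
 * with oinfo->oi_cb_up() invoked from osc_getattr_interpret() once the
 * reply arrives.
 */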
257
258 static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
259 {
260         struct ptlrpc_request *req;
261         struct ost_body       *body;
262         int                    rc;
263         ENTRY;
264
265         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
266         if (req == NULL)
267                 RETURN(-ENOMEM);
268
269         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
270         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
271         if (rc) {
272                 ptlrpc_request_free(req);
273                 RETURN(rc);
274         }
275
276         osc_pack_req_body(req, oinfo);
277
278         ptlrpc_request_set_replen(req);
279  
280         rc = ptlrpc_queue_wait(req);
281         if (rc)
282                 GOTO(out, rc);
283
284         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
285         if (body == NULL)
286                 GOTO(out, rc = -EPROTO);
287
288         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
289         *oinfo->oi_oa = body->oa;
290
291         /* This should really be sent by the OST */
292         oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
293         oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
294
295         EXIT;
296  out:
297         ptlrpc_req_finished(req);
298         return rc;
299 }
300
301 static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
302                        struct obd_trans_info *oti)
303 {
304         struct ptlrpc_request *req;
305         struct ost_body       *body;
306         int                    rc;
307         ENTRY;
308
309         LASSERT(!(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP) ||
310                                         oinfo->oi_oa->o_gr > 0);
311
312         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
313         if (req == NULL)
314                 RETURN(-ENOMEM);
315
316         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
317         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
318         if (rc) {
319                 ptlrpc_request_free(req);
320                 RETURN(rc);
321         }
322
323         osc_pack_req_body(req, oinfo);
324
325         ptlrpc_request_set_replen(req);
326  
327
328         rc = ptlrpc_queue_wait(req);
329         if (rc)
330                 GOTO(out, rc);
331
332         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
333         if (body == NULL)
334                 GOTO(out, rc = -EPROTO);
335
336         *oinfo->oi_oa = body->oa;
337
338         EXIT;
339 out:
340         ptlrpc_req_finished(req);
341         RETURN(rc);
342 }
343
344 static int osc_setattr_interpret(struct ptlrpc_request *req,
345                                  struct osc_async_args *aa, int rc)
346 {
347         struct ost_body *body;
348         ENTRY;
349
350         if (rc != 0)
351                 GOTO(out, rc);
352
353         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
354         if (body == NULL)
355                 GOTO(out, rc = -EPROTO);
356
357         *aa->aa_oi->oi_oa = body->oa;
358 out:
359         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
360         RETURN(rc);
361 }
362
363 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
364                              struct obd_trans_info *oti,
365                              struct ptlrpc_request_set *rqset)
366 {
367         struct ptlrpc_request *req;
368         struct osc_async_args *aa;
369         int                    rc;
370         ENTRY;
371
372         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
373         if (req == NULL)
374                 RETURN(-ENOMEM);
375
376         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
377         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
378         if (rc) {
379                 ptlrpc_request_free(req);
380                 RETURN(rc);
381         }
382
383         osc_pack_req_body(req, oinfo);
384
385         ptlrpc_request_set_replen(req);
386  
387         if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) {
388                 LASSERT(oti);
389                 *obdo_logcookie(oinfo->oi_oa) = *oti->oti_logcookies;
390         }
391
392         /* do MDS-to-OST setattr asynchronously */
393         if (!rqset) {
394                 /* Do not wait for response. */
395                 ptlrpcd_add_req(req);
396         } else {
397                 req->rq_interpret_reply = osc_setattr_interpret;
398
399                 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
400                 aa = (struct osc_async_args *)&req->rq_async_args;
401                 aa->aa_oi = oinfo;
402
403                 ptlrpc_set_add_req(rqset, req);
404         }
405
406         RETURN(0);
407 }
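
/*
 * Note: with a NULL @rqset the setattr above is fire-and-forget via
 * ptlrpcd (the reply is never examined), while a non-NULL set routes
 * the reply through osc_setattr_interpret() and on to the caller's
 * oi_cb_up() callback.
 */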
408
409 int osc_real_create(struct obd_export *exp, struct obdo *oa,
410                     struct lov_stripe_md **ea, struct obd_trans_info *oti)
411 {
412         struct ptlrpc_request *req;
413         struct ost_body       *body;
414         struct lov_stripe_md  *lsm;
415         int                    rc;
416         ENTRY;
417
418         LASSERT(oa);
419         LASSERT(ea);
420
421         lsm = *ea;
422         if (!lsm) {
423                 rc = obd_alloc_memmd(exp, &lsm);
424                 if (rc < 0)
425                         RETURN(rc);
426         }
427
428         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
429         if (req == NULL)
430                 GOTO(out, rc = -ENOMEM);
431
432         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
433         if (rc) {
434                 ptlrpc_request_free(req);
435                 GOTO(out, rc);
436         }
437
438         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
439         LASSERT(body);
440         body->oa = *oa;
441
442         ptlrpc_request_set_replen(req);
443
444         if (oa->o_valid & OBD_MD_FLINLINE) {
445                 LASSERT((oa->o_valid & OBD_MD_FLFLAGS) &&
446                         oa->o_flags == OBD_FL_DELORPHAN);
447                 DEBUG_REQ(D_HA, req,
448                           "delorphan from OST integration");
449                 /* Don't resend the delorphan req */
450                 req->rq_no_resend = req->rq_no_delay = 1;
451         }
452
453         rc = ptlrpc_queue_wait(req);
454         if (rc)
455                 GOTO(out_req, rc);
456
457         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
458         if (body == NULL)
459                 GOTO(out_req, rc = -EPROTO);
460
461         *oa = body->oa;
462
463         /* This should really be sent by the OST */
464         oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
465         oa->o_valid |= OBD_MD_FLBLKSZ;
466
467         /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
468          * have valid lsm_oinfo data structs, so don't go touching that.
469          * This needs to be fixed in a big way.
470          */
471         lsm->lsm_object_id = oa->o_id;
472         lsm->lsm_object_gr = oa->o_gr;
473         *ea = lsm;
474
475         if (oti != NULL) {
476                 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
477
478                 if (oa->o_valid & OBD_MD_FLCOOKIE) {
479                         if (!oti->oti_logcookies)
480                                 oti_alloc_cookies(oti, 1);
481                         *oti->oti_logcookies = *obdo_logcookie(oa);
482                 }
483         }
484
485         CDEBUG(D_HA, "transno: "LPD64"\n",
486                lustre_msg_get_transno(req->rq_repmsg));
487 out_req:
488         ptlrpc_req_finished(req);
489 out:
490         if (rc && !*ea)
491                 obd_free_memmd(exp, &lsm);
492         RETURN(rc);
493 }
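
/*
 * The OBD_FL_DELORPHAN case above is the MDS asking the OST to clean up
 * orphaned objects left over after a crash; it is marked no_resend /
 * no_delay presumably because a replayed delorphan could race with
 * objects created in the meantime.
 */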
494
495 static int osc_punch_interpret(struct ptlrpc_request *req,
496                                struct osc_async_args *aa, int rc)
497 {
498         struct ost_body *body;
499         ENTRY;
500
501         if (rc != 0)
502                 GOTO(out, rc);
503
504         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
505         if (body == NULL)
506                 GOTO(out, rc = -EPROTO);
507
508         *aa->aa_oi->oi_oa = body->oa;
509 out:
510         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
511         RETURN(rc);
512 }
513
514 static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
515                      struct obd_trans_info *oti,
516                      struct ptlrpc_request_set *rqset)
517 {
518         struct ptlrpc_request *req;
519         struct osc_async_args *aa;
520         struct ost_body       *body;
521         int                    rc;
522         ENTRY;
523
524         if (!oinfo->oi_oa) {
525                 CDEBUG(D_INFO, "oa NULL\n");
526                 RETURN(-EINVAL);
527         }
528
529         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
530         if (req == NULL)
531                 RETURN(-ENOMEM);
532
533         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
534         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
535         if (rc) {
536                 ptlrpc_request_free(req);
537                 RETURN(rc);
538         }
539         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
540         ptlrpc_at_set_req_timeout(req);
541         osc_pack_req_body(req, oinfo);
542
543         /* overload the size and blocks fields in the oa with start/end */
544         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
545         LASSERT(body);
546         body->oa.o_size = oinfo->oi_policy.l_extent.start;
547         body->oa.o_blocks = oinfo->oi_policy.l_extent.end;
548         body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
549         ptlrpc_request_set_replen(req);
550
551
552         req->rq_interpret_reply = osc_punch_interpret;
553         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
554         aa = (struct osc_async_args *)&req->rq_async_args;
555         aa->aa_oi = oinfo;
556         ptlrpc_set_add_req(rqset, req);
557
558         RETURN(0);
559 }
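
/*
 * Usage sketch (hypothetical, not from this file): a truncate to
 * new_size is typically issued as a punch of everything from new_size
 * to the end of the object,
 *
 *      oinfo.oi_policy.l_extent.start = new_size;
 *      oinfo.oi_policy.l_extent.end   = OBD_OBJECT_EOF;
 *
 * which the code above encodes in the (overloaded) o_size/o_blocks
 * fields of the on-wire obdo.
 */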
560
561 static int osc_sync(struct obd_export *exp, struct obdo *oa,
562                     struct lov_stripe_md *md, obd_size start, obd_size end,
563                     void *capa)
564 {
565         struct ptlrpc_request *req;
566         struct ost_body       *body;
567         int                    rc;
568         ENTRY;
569
570         if (!oa) {
571                 CDEBUG(D_INFO, "oa NULL\n");
572                 RETURN(-EINVAL);
573         }
574
575         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
576         if (req == NULL)
577                 RETURN(-ENOMEM);
578
579         osc_set_capa_size(req, &RMF_CAPA1, capa);
580         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
581         if (rc) {
582                 ptlrpc_request_free(req);
583                 RETURN(rc);
584         }
585
586         /* overload the size and blocks fields in the oa with start/end */
587         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
588         LASSERT(body);
589         body->oa = *oa;
590         body->oa.o_size = start;
591         body->oa.o_blocks = end;
592         body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
593         osc_pack_capa(req, body, capa);
594
595         ptlrpc_request_set_replen(req);
596
597         rc = ptlrpc_queue_wait(req);
598         if (rc)
599                 GOTO(out, rc);
600
601         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
602         if (body == NULL)
603                 GOTO(out, rc = -EPROTO);
604
605         *oa = body->oa;
606
607         EXIT;
608  out:
609         ptlrpc_req_finished(req);
610         return rc;
611 }
612
613 /* Find and locally cancel locks matched by @mode in the resource described
614  * by @oa. Found locks are added to the @cancels list; returns the number
615  * of locks added to that list. */
616 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
617                                    struct list_head *cancels, ldlm_mode_t mode,
618                                    int lock_flags)
619 {
620         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
621         struct ldlm_res_id res_id = { .name = { oa->o_id, 0, oa->o_gr, 0 } };
622         struct ldlm_resource *res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
623         int count;
624         ENTRY;
625
626         if (res == NULL)
627                 RETURN(0);
628
629         count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
630                                            lock_flags, 0, NULL);
631         ldlm_resource_putref(res);
632         RETURN(count);
633 }
634
635 static int osc_destroy_interpret(struct ptlrpc_request *req, void *data,
636                                  int rc)
637 {
638         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
639
640         atomic_dec(&cli->cl_destroy_in_flight);
641         cfs_waitq_signal(&cli->cl_destroy_waitq);
642         return 0;
643 }
644
645 static int osc_can_send_destroy(struct client_obd *cli)
646 {
647         if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
648             cli->cl_max_rpcs_in_flight) {
649                 /* The destroy request can be sent */
650                 return 1;
651         }
652         if (atomic_dec_return(&cli->cl_destroy_in_flight) <
653             cli->cl_max_rpcs_in_flight) {
654                 /*
655                  * The counter has been modified between the two atomic
656                  * operations.
657                  */
658                 cfs_waitq_signal(&cli->cl_destroy_waitq);
659         }
660         return 0;
661 }
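
/*
 * The inc-then-test pattern above avoids taking a lock: if the
 * increment overshoots cl_max_rpcs_in_flight we back it out again, and
 * if another thread dropped the counter below the limit between the
 * two atomic operations, we signal the waitqueue so the freed slot is
 * not lost.
 */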
662
663 /* Destroy requests can always be async on the client, and we don't even really
664  * care about the return code since the client cannot do anything at all about
665  * a destroy failure.
666  * When the MDS is unlinking a filename, it saves the file objects into a
667  * recovery llog, and these object records are cancelled when the OST reports
668  * they were destroyed and sync'd to disk (i.e. transaction committed).
669  * If the client dies, or the OST is down when the object should be destroyed,
670  * the records are not cancelled, and when the OST next reconnects to the MDS,
671  * it will retrieve the llog unlink logs and then send the log cancellation
672  * cookies to the MDS after committing the destroy transactions. */
673 static int osc_destroy(struct obd_export *exp, struct obdo *oa,
674                        struct lov_stripe_md *ea, struct obd_trans_info *oti,
675                        struct obd_export *md_export)
676 {
677         struct client_obd     *cli = &exp->exp_obd->u.cli;
678         struct ptlrpc_request *req;
679         struct ost_body       *body;
680         CFS_LIST_HEAD(cancels);
681         int rc, count;
682         ENTRY;
683
684         if (!oa) {
685                 CDEBUG(D_INFO, "oa NULL\n");
686                 RETURN(-EINVAL);
687         }
688
689         count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
690                                         LDLM_FL_DISCARD_DATA);
691
692         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
693         if (req == NULL) {
694                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
695                 RETURN(-ENOMEM);
696         }
697
698         rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY, 
699                                0, &cancels, count);
700         if (rc) {
701                 ptlrpc_request_free(req);
702                 RETURN(rc);
703         }
704
705         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
706         req->rq_interpret_reply = osc_destroy_interpret;
707         ptlrpc_at_set_req_timeout(req);
708
709         if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
710                 memcpy(obdo_logcookie(oa), oti->oti_logcookies,
711                        sizeof(*oti->oti_logcookies));
712         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
713         LASSERT(body);
714         body->oa = *oa;
715
716         ptlrpc_request_set_replen(req);
717
718         if (!osc_can_send_destroy(cli)) {
719                 struct l_wait_info lwi = { 0 };
720
721                 /*
722                  * Wait until the number of on-going destroy RPCs drops
723                  * below max_rpcs_in_flight
724                  */
725                 l_wait_event_exclusive(cli->cl_destroy_waitq,
726                                        osc_can_send_destroy(cli), &lwi);
727         }
728
729         /* Do not wait for response */
730         ptlrpcd_add_req(req);
731         RETURN(0);
732 }
733
734 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
735                                 long writing_bytes)
736 {
737         obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
738
739         LASSERT(!(oa->o_valid & bits));
740
741         oa->o_valid |= bits;
742         client_obd_list_lock(&cli->cl_loi_list_lock);
743         oa->o_dirty = cli->cl_dirty;
744         if (cli->cl_dirty > cli->cl_dirty_max) {
745                 CERROR("dirty %lu > dirty_max %lu\n",
746                        cli->cl_dirty, cli->cl_dirty_max);
747                 oa->o_undirty = 0;
748         } else if (atomic_read(&obd_dirty_pages) > obd_max_dirty_pages) {
749                 CERROR("dirty %d > system dirty_max %d\n",
750                        atomic_read(&obd_dirty_pages), obd_max_dirty_pages);
751                 oa->o_undirty = 0;
752         } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
753                 CERROR("dirty %lu - dirty_max %lu too big???\n",
754                        cli->cl_dirty, cli->cl_dirty_max);
755                 oa->o_undirty = 0;
756         } else {
757                 long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
758                                 (cli->cl_max_rpcs_in_flight + 1);
759                 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
760         }
761         oa->o_grant = cli->cl_avail_grant;
762         oa->o_dropped = cli->cl_lost_grant;
763         cli->cl_lost_grant = 0;
764         client_obd_list_unlock(&cli->cl_loi_list_lock);
765         CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
766                oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
767 }
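
/*
 * osc_announce_cached() piggybacks the client's cache accounting onto
 * an outgoing request: o_dirty tells the OST how much dirty data this
 * client holds, o_undirty how much more it would like to be allowed to
 * dirty, and o_grant/o_dropped report the grant still held and the
 * grant lost track of, so the server can rebalance its space
 * reservations.
 */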
768
769 /* caller must hold loi_list_lock */
770 static void osc_consume_write_grant(struct client_obd *cli,
771                                     struct brw_page *pga)
772 {
773         atomic_inc(&obd_dirty_pages);
774         cli->cl_dirty += CFS_PAGE_SIZE;
775         cli->cl_avail_grant -= CFS_PAGE_SIZE;
776         pga->flag |= OBD_BRW_FROM_GRANT;
777         CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
778                CFS_PAGE_SIZE, pga, pga->pg);
779         LASSERT(cli->cl_avail_grant >= 0);
780 }
781
782 /* the companion to osc_consume_write_grant, called when a brw has completed.
783  * must be called with the loi lock held. */
784 static void osc_release_write_grant(struct client_obd *cli,
785                                     struct brw_page *pga, int sent)
786 {
787         int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
788         ENTRY;
789
790         if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
791                 EXIT;
792                 return;
793         }
794
795         pga->flag &= ~OBD_BRW_FROM_GRANT;
796         atomic_dec(&obd_dirty_pages);
797         cli->cl_dirty -= CFS_PAGE_SIZE;
798         if (!sent) {
799                 cli->cl_lost_grant += CFS_PAGE_SIZE;
800                 CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
801                        cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
802         } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
803                 /* For short writes we shouldn't count parts of pages that
804                  * span a whole block on the OST side, or our accounting goes
805                  * wrong.  Should match the code in filter_grant_check. */
806                 int offset = pga->off & ~CFS_PAGE_MASK;
807                 int count = pga->count + (offset & (blocksize - 1));
808                 int end = (offset + pga->count) & (blocksize - 1);
809                 if (end)
810                         count += blocksize - end;
811
812                 cli->cl_lost_grant += CFS_PAGE_SIZE - count;
813                 CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
814                        CFS_PAGE_SIZE - count, cli->cl_lost_grant,
815                        cli->cl_avail_grant, cli->cl_dirty);
816         }
817
818         EXIT;
819 }
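
/*
 * Worked example of the short-write accounting above (assuming, for
 * illustration only, CFS_PAGE_SIZE = 4096 and an OST blocksize of
 * 1024): a 100-byte write at page offset 0 still occupies one full
 * 1024-byte block on the OST, so count is rounded up from 100 to 1024
 * and the remaining 4096 - 1024 = 3072 bytes of the page's grant are
 * recorded as lost, matching the server-side rounding in
 * filter_grant_check().
 */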
820
821 static unsigned long rpcs_in_flight(struct client_obd *cli)
822 {
823         return cli->cl_r_in_flight + cli->cl_w_in_flight;
824 }
825
826 /* caller must hold loi_list_lock */
827 void osc_wake_cache_waiters(struct client_obd *cli)
828 {
829         struct list_head *l, *tmp;
830         struct osc_cache_waiter *ocw;
831
832         ENTRY;
833         list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
834                 /* if we can't dirty more, we must wait until some is written */
835                 if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
836                    (atomic_read(&obd_dirty_pages) + 1 > obd_max_dirty_pages)) {
837                         CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
838                                "osc max %ld, sys max %d\n", cli->cl_dirty,
839                                cli->cl_dirty_max, obd_max_dirty_pages);
840                         return;
841                 }
842
843                 /* if still dirty cache but no grant wait for pending RPCs that
844                  * may yet return us some grant before doing sync writes */
845                 if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
846                         CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
847                                cli->cl_w_in_flight);
848                         return;
849                 }
850
851                 ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
852                 list_del_init(&ocw->ocw_entry);
853                 if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
854                         /* no more RPCs in flight to return grant, do sync IO */
855                         ocw->ocw_rc = -EDQUOT;
856                         CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
857                 } else {
858                         osc_consume_write_grant(cli,
859                                                 &ocw->ocw_oap->oap_brw_page);
860                 }
861
862                 cfs_waitq_signal(&ocw->ocw_waitq);
863         }
864
865         EXIT;
866 }
867
868 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
869 {
870         client_obd_list_lock(&cli->cl_loi_list_lock);
871         cli->cl_avail_grant = ocd->ocd_grant;
872         client_obd_list_unlock(&cli->cl_loi_list_lock);
873
874         CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld\n",
875                cli->cl_avail_grant, cli->cl_lost_grant);
876         LASSERT(cli->cl_avail_grant >= 0);
877 }
878
879 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
880 {
881         client_obd_list_lock(&cli->cl_loi_list_lock);
882         CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
883         if (body->oa.o_valid & OBD_MD_FLGRANT)
884                 cli->cl_avail_grant += body->oa.o_grant;
885         /* waiters are woken in brw_interpret */
886         client_obd_list_unlock(&cli->cl_loi_list_lock);
887 }
888
889 /* We assume that the reason this OSC got a short read is that it read
890  * beyond the end of a stripe file; i.e. Lustre is reading a sparse file
891  * via the LOV, and it _knows_ it's reading inside the file, it's just that
892  * this stripe never got written at or beyond this stripe offset yet. */
893 static void handle_short_read(int nob_read, obd_count page_count,
894                               struct brw_page **pga)
895 {
896         char *ptr;
897         int i = 0;
898
899         /* skip bytes read OK */
900         while (nob_read > 0) {
901                 LASSERT (page_count > 0);
902
903                 if (pga[i]->count > nob_read) {
904                         /* EOF inside this page */
905                         ptr = cfs_kmap(pga[i]->pg) +
906                                 (pga[i]->off & ~CFS_PAGE_MASK);
907                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
908                         cfs_kunmap(pga[i]->pg);
909                         page_count--;
910                         i++;
911                         break;
912                 }
913
914                 nob_read -= pga[i]->count;
915                 page_count--;
916                 i++;
917         }
918
919         /* zero remaining pages */
920         while (page_count-- > 0) {
921                 ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
922                 memset(ptr, 0, pga[i]->count);
923                 cfs_kunmap(pga[i]->pg);
924                 i++;
925         }
926 }
927
928 static int check_write_rcs(struct ptlrpc_request *req,
929                            int requested_nob, int niocount,
930                            obd_count page_count, struct brw_page **pga)
931 {
932         int    *remote_rcs, i;
933
934         /* return error if any niobuf was in error */
935         remote_rcs = lustre_swab_repbuf(req, REQ_REC_OFF + 1,
936                                         sizeof(*remote_rcs) * niocount, NULL);
937         if (remote_rcs == NULL) {
938                 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
939                 return(-EPROTO);
940         }
941         if (lustre_msg_swabbed(req->rq_repmsg))
942                 for (i = 0; i < niocount; i++)
943                         __swab32s(&remote_rcs[i]);
944
945         for (i = 0; i < niocount; i++) {
946                 if (remote_rcs[i] < 0)
947                         return(remote_rcs[i]);
948
949                 if (remote_rcs[i] != 0) {
950                         CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
951                                 i, remote_rcs[i], req);
952                         return(-EPROTO);
953                 }
954         }
955
956         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
957                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
958                        req->rq_bulk->bd_nob_transferred, requested_nob);
959                 return(-EPROTO);
960         }
961
962         return (0);
963 }
964
965 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
966 {
967         if (p1->flag != p2->flag) {
968                 unsigned mask = ~OBD_BRW_FROM_GRANT;
969
970                 /* warn if we try to combine flags that we don't know to be
971                  * safe to combine */
972                 if ((p1->flag & mask) != (p2->flag & mask))
973                         CERROR("is it ok to have flags 0x%x and 0x%x in the "
974                                "same brw?\n", p1->flag, p2->flag);
975                 return 0;
976         }
977
978         return (p1->off + p1->count == p2->off);
979 }
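
/*
 * Pages merge into a single remote niobuf only when they are contiguous
 * in the file and their BRW flags match exactly; a difference only in
 * OBD_BRW_FROM_GRANT still blocks the merge but is considered safe, so
 * the warning above is suppressed for that bit.
 */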
980
981 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
982                                    struct brw_page **pga, int opc,
983                                    cksum_type_t cksum_type)
984 {
985         __u32 cksum;
986         int i = 0;
987
988         LASSERT (pg_count > 0);
989         cksum = init_checksum(cksum_type);
990         while (nob > 0 && pg_count > 0) {
991                 unsigned char *ptr = cfs_kmap(pga[i]->pg);
992                 int off = pga[i]->off & ~CFS_PAGE_MASK;
993                 int count = pga[i]->count > nob ? nob : pga[i]->count;
994
995                 /* corrupt the data before we compute the checksum, to
996                  * simulate an OST->client data error */
997                 if (i == 0 && opc == OST_READ &&
998                     OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
999                         memcpy(ptr + off, "bad1", min(4, nob));
1000                 cksum = compute_checksum(cksum, ptr + off, count, cksum_type);
1001                 cfs_kunmap(pga[i]->pg);
1002                 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
1003                                off, cksum);
1004
1005                 nob -= pga[i]->count;
1006                 pg_count--;
1007                 i++;
1008         }
1009         /* For sending we only compute a wrong checksum instead of
1010          * corrupting the data, so the data is still correct on a redo */
1011         if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1012                 cksum++;
1013
1014         return cksum;
1015 }
1016
1017 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
1018                                 struct lov_stripe_md *lsm, obd_count page_count,
1019                                 struct brw_page **pga, 
1020                                 struct ptlrpc_request **reqp,
1021                                 struct obd_capa *ocapa)
1022 {
1023         struct ptlrpc_request   *req;
1024         struct ptlrpc_bulk_desc *desc;
1025         struct ost_body         *body;
1026         struct obd_ioobj        *ioobj;
1027         struct niobuf_remote    *niobuf;
1028         int niocount, i, requested_nob, opc, rc;
1029         struct osc_brw_async_args *aa;
1030         struct req_capsule      *pill;
1031         struct brw_page *pg_prev;
1032
1033         ENTRY;
1034         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1035                 RETURN(-ENOMEM); /* Recoverable */
1036         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1037                 RETURN(-EINVAL); /* Fatal */
1038
1039         if ((cmd & OBD_BRW_WRITE) != 0) {
1040                 opc = OST_WRITE;
1041                 req = ptlrpc_request_alloc_pool(cli->cl_import, 
1042                                                 cli->cl_import->imp_rq_pool,
1043                                                 &RQF_OST_BRW);
1044         } else {
1045                 opc = OST_READ;
1046                 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW);
1047         }
1048
1049         if (req == NULL)
1050                 RETURN(-ENOMEM);
1051
1052         for (niocount = i = 1; i < page_count; i++) {
1053                 if (!can_merge_pages(pga[i - 1], pga[i]))
1054                         niocount++;
1055         }
1056
1057         pill = &req->rq_pill;
1058         req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1059                              niocount * sizeof(*niobuf));
1060         osc_set_capa_size(req, &RMF_CAPA1, ocapa);
1061
1062         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1063         if (rc) {
1064                 ptlrpc_request_free(req);
1065                 RETURN(rc);
1066         }
1067         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1068         ptlrpc_at_set_req_timeout(req);
1069
1070         if (opc == OST_WRITE)
1071                 desc = ptlrpc_prep_bulk_imp(req, page_count,
1072                                             BULK_GET_SOURCE, OST_BULK_PORTAL);
1073         else
1074                 desc = ptlrpc_prep_bulk_imp(req, page_count,
1075                                             BULK_PUT_SINK, OST_BULK_PORTAL);
1076
1077         if (desc == NULL)
1078                 GOTO(out, rc = -ENOMEM);
1079         /* NB request now owns desc and will free it when it gets freed */
1080
1081         body = req_capsule_client_get(pill, &RMF_OST_BODY);
1082         ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1083         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1084         LASSERT(body && ioobj && niobuf);
1085
1086         body->oa = *oa;
1087
1088         obdo_to_ioobj(oa, ioobj);
1089         ioobj->ioo_bufcnt = niocount;
1090         osc_pack_capa(req, body, ocapa);
1091         LASSERT (page_count > 0);
1092         pg_prev = pga[0];
1093         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1094                 struct brw_page *pg = pga[i];
1095
1096                 LASSERT(pg->count > 0);
1097                 LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
1098                          "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
1099                          pg->off, pg->count);
1100 #ifdef __linux__
1101                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1102                          "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1103                          " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1104                          i, page_count,
1105                          pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1106                          pg_prev->pg, page_private(pg_prev->pg),
1107                          pg_prev->pg->index, pg_prev->off);
1108 #else
1109                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1110                          "i %d p_c %u\n", i, page_count);
1111 #endif
1112                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1113                         (pg->flag & OBD_BRW_SRVLOCK));
1114
1115                 ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,
1116                                       pg->count);
1117                 requested_nob += pg->count;
1118
1119                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1120                         niobuf--;
1121                         niobuf->len += pg->count;
1122                 } else {
1123                         niobuf->offset = pg->off;
1124                         niobuf->len    = pg->count;
1125                         niobuf->flags  = pg->flag;
1126                 }
1127                 pg_prev = pg;
1128         }
1129
1130         LASSERTF((void *)(niobuf - niocount) ==
1131                 lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
1132                                niocount * sizeof(*niobuf)),
1133                 "want %p - real %p\n", lustre_msg_buf(req->rq_reqmsg, 
1134                 REQ_REC_OFF + 2, niocount * sizeof(*niobuf)), 
1135                 (void *)(niobuf - niocount));
1136
1137         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1138
1139         /* size[REQ_REC_OFF] still sizeof (*body) */
1140         if (opc == OST_WRITE) {
1141                 if (unlikely(cli->cl_checksum) &&
1142                     req->rq_flvr.sf_bulk_hash == BULK_HASH_ALG_NULL) {
1143                         /* store cl_cksum_type in a local variable since
1144                          * it can be changed via lprocfs */
1145                         cksum_type_t cksum_type = cli->cl_cksum_type;
1146
1147                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1148                                 oa->o_flags = body->oa.o_flags = 0;
1149                         body->oa.o_flags |= cksum_type_pack(cksum_type);
1150                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1151                         body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1152                                                              page_count, pga,
1153                                                              OST_WRITE,
1154                                                              cksum_type);
1155                         CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1156                                body->oa.o_cksum);
1157                         /* save this in 'oa', too, for later checking */
1158                         oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1159                         oa->o_flags |= cksum_type_pack(cksum_type);
1160                 } else {
1161                         /* clear out the checksum flag, in case this is a
1162                          * resend but cl_checksum is no longer set. b=11238 */
1163                         oa->o_valid &= ~OBD_MD_FLCKSUM;
1164                 }
1165                 oa->o_cksum = body->oa.o_cksum;
1166                 /* 1 RC per niobuf */
1167                 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_SERVER,
1168                                      sizeof(__u32) * niocount);
1169         } else {
1170                 if (unlikely(cli->cl_checksum) &&
1171                     req->rq_flvr.sf_bulk_hash == BULK_HASH_ALG_NULL) {
1172                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1173                                 body->oa.o_flags = 0;
1174                         body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1175                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1176                 }
1177                 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_SERVER, 0);
1178                 /* 1 RC for the whole I/O */
1179         }
1180         ptlrpc_request_set_replen(req);
1181
1182         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1183         aa = (struct osc_brw_async_args *)&req->rq_async_args;
1184         aa->aa_oa = oa;
1185         aa->aa_requested_nob = requested_nob;
1186         aa->aa_nio_count = niocount;
1187         aa->aa_page_count = page_count;
1188         aa->aa_resends = 0;
1189         aa->aa_ppga = pga;
1190         aa->aa_cli = cli;
1191         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1192
1193         *reqp = req;
1194         RETURN(0);
1195
1196  out:
1197         ptlrpc_req_finished(req);
1198         RETURN(rc);
1199 }
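
/*
 * To summarize the request built above: one ost_body carrying the obdo,
 * one obd_ioobj naming the object, and one niobuf_remote per
 * discontiguous extent (contiguous pages having been merged), plus the
 * bulk descriptor that actually moves the data.  The osc_brw_async_args
 * stashed in rq_async_args carry everything the interpret and resend
 * paths need to redo the RPC.
 */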
1200
1201 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1202                                 __u32 client_cksum, __u32 server_cksum, int nob,
1203                                 obd_count page_count, struct brw_page **pga,
1204                                 cksum_type_t client_cksum_type)
1205 {
1206         __u32 new_cksum;
1207         char *msg;
1208         cksum_type_t cksum_type;
1209
1210         if (server_cksum == client_cksum) {
1211                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1212                 return 0;
1213         }
1214
1215         if (oa->o_valid & OBD_MD_FLFLAGS)
1216                 cksum_type = cksum_type_unpack(oa->o_flags);
1217         else
1218                 cksum_type = OBD_CKSUM_CRC32;
1219
1220         new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1221                                       cksum_type);
1222
1223         if (cksum_type != client_cksum_type)
1224                 msg = "the server did not use the checksum type specified in "
1225                       "the original request - likely a protocol problem";
1226         else if (new_cksum == server_cksum)
1227                 msg = "changed on the client after we checksummed it - "
1228                       "likely false positive due to mmap IO (bug 11742)";
1229         else if (new_cksum == client_cksum)
1230                 msg = "changed in transit before arrival at OST";
1231         else
1232                 msg = "changed in transit AND doesn't match the original - "
1233                       "likely false positive due to mmap IO (bug 11742)";
1234
1235         LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inum "
1236                            LPU64"/"LPU64" object "LPU64"/"LPU64" extent "
1237                            "["LPU64"-"LPU64"]\n",
1238                            msg, libcfs_nid2str(peer->nid),
1239                            oa->o_valid & OBD_MD_FLFID ? oa->o_fid : (__u64)0,
1240                            oa->o_valid & OBD_MD_FLFID ? oa->o_generation : 
1241                                                         (__u64)0,
1242                            oa->o_id,
1243                            oa->o_valid & OBD_MD_FLGROUP ? oa->o_gr : (__u64)0,
1244                            pga[0]->off,
1245                            pga[page_count-1]->off + pga[page_count-1]->count - 1);
1246         CERROR("original client csum %x (type %x), server csum %x (type %x), "
1247                "client csum now %x\n", client_cksum, client_cksum_type,
1248                server_cksum, cksum_type, new_cksum);
1249         return 1;        
1250 }
1251
1252 /* Note rc enters this function as number of bytes transferred */
1253 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1254 {
1255         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1256         const lnet_process_id_t *peer =
1257                         &req->rq_import->imp_connection->c_peer;
1258         struct client_obd *cli = aa->aa_cli;
1259         struct ost_body *body;
1260         __u32 client_cksum = 0;
1261         ENTRY;
1262
1263         if (rc < 0 && rc != -EDQUOT)
1264                 RETURN(rc);
1265
1266         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1267         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
1268                                   lustre_swab_ost_body);
1269         if (body == NULL) {
1270                 CDEBUG(D_INFO, "Can't unpack body\n");
1271                 RETURN(-EPROTO);
1272         }
1273
1274         /* set/clear over quota flag for a uid/gid */
1275         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1276             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA))
1277                 lquota_setdq(quota_interface, cli, body->oa.o_uid,
1278                              body->oa.o_gid, body->oa.o_valid,
1279                              body->oa.o_flags);
1280
1281         if (rc < 0)
1282                 RETURN(rc);
1283
1284         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1285                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1286
1287         osc_update_grant(cli, body);
1288
1289         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1290                 if (rc > 0) {
1291                         CERROR("Unexpected +ve rc %d\n", rc);
1292                         RETURN(-EPROTO);
1293                 }
1294                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1295
1296                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1297                     check_write_checksum(&body->oa, peer, client_cksum,
1298                                          body->oa.o_cksum, aa->aa_requested_nob,
1299                                          aa->aa_page_count, aa->aa_ppga,
1300                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1301                         RETURN(-EAGAIN);
1302
1303                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1304                         RETURN(-EAGAIN);
1305
1306                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1307                                      aa->aa_page_count, aa->aa_ppga);
1308                 GOTO(out, rc);
1309         }
1310
1311         /* The rest of this function executes only for OST_READs */
1312         if (rc > aa->aa_requested_nob) {
1313                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1314                        aa->aa_requested_nob);
1315                 RETURN(-EPROTO);
1316         }
1317
1318         if (rc != req->rq_bulk->bd_nob_transferred) {
1319                 CERROR ("Unexpected rc %d (%d transferred)\n",
1320                         rc, req->rq_bulk->bd_nob_transferred);
1321                 return (-EPROTO);
1322         }
1323
1324         if (rc < aa->aa_requested_nob)
1325                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1326
1327         if (sptlrpc_cli_unwrap_bulk_read(req, rc, aa->aa_page_count,
1328                                          aa->aa_ppga))
1329                 GOTO(out, rc = -EAGAIN);
1330
1331         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1332                 static int cksum_counter;
1333                 __u32      server_cksum = body->oa.o_cksum;
1334                 char      *via;
1335                 char      *router;
1336                 cksum_type_t cksum_type;
1337
1338                 if (body->oa.o_valid & OBD_MD_FLFLAGS)
1339                         cksum_type = cksum_type_unpack(body->oa.o_flags);
1340                 else
1341                         cksum_type = OBD_CKSUM_CRC32;
1342                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1343                                                  aa->aa_ppga, OST_READ,
1344                                                  cksum_type);
1345
1346                 if (peer->nid == req->rq_bulk->bd_sender) {
1347                         via = router = "";
1348                 } else {
1349                         via = " via ";
1350                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1351                 }
1352
1353                 if (server_cksum == ~0 && rc > 0) {
1354                         CERROR("Protocol error: server %s set the 'checksum' "
1355                                "bit, but didn't send a checksum.  Not fatal, "
1356                                "but please tell CFS.\n",
1357                                libcfs_nid2str(peer->nid));
1358                 } else if (server_cksum != client_cksum) {
1359                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1360                                            "%s%s%s inum "LPU64"/"LPU64" object "
1361                                            LPU64"/"LPU64" extent "
1362                                            "["LPU64"-"LPU64"]\n",
1363                                            req->rq_import->imp_obd->obd_name,
1364                                            libcfs_nid2str(peer->nid),
1365                                            via, router,
1366                                            body->oa.o_valid & OBD_MD_FLFID ?
1367                                                 body->oa.o_fid : (__u64)0,
1368                                            body->oa.o_valid & OBD_MD_FLFID ?
1369                                                 body->oa.o_generation :(__u64)0,
1370                                            body->oa.o_id,
1371                                            body->oa.o_valid & OBD_MD_FLGROUP ?
1372                                                 body->oa.o_gr : (__u64)0,
1373                                            aa->aa_ppga[0]->off,
1374                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1375                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1376                                                                         1);
1377                         CERROR("client %x, server %x, cksum_type %x\n",
1378                                client_cksum, server_cksum, cksum_type);
1379                         cksum_counter = 0;
1380                         aa->aa_oa->o_cksum = client_cksum;
1381                         rc = -EAGAIN;
1382                 } else {
1383                         cksum_counter++;
1384                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1385                         rc = 0;
1386                 }
1387         } else if (unlikely(client_cksum)) {
1388                 static int cksum_missed;
1389
1390                 cksum_missed++;
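                /* complain only when cksum_missed is a power of two, so
                 * the message rate decays as misses accumulate */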
1391                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1392                         CERROR("Checksum %u requested from %s but not sent\n",
1393                                cksum_missed, libcfs_nid2str(peer->nid));
1394         } else {
1395                 rc = 0;
1396         }
1397 out:
1398         if (rc >= 0)
1399                 *aa->aa_oa = body->oa;
1400
1401         RETURN(rc);
1402 }
1403
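/* Synchronous BRW: build the request, queue it, and wait for completion.
 * Bulk timeouts are resent immediately; other recoverable errors back off
 * one second longer per resend until osc_should_resend() gives up. */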
1404 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1405                             struct lov_stripe_md *lsm,
1406                             obd_count page_count, struct brw_page **pga,
1407                             struct obd_capa *ocapa)
1408 {
1409         struct ptlrpc_request *req;
1410         int                    rc;
1411         cfs_waitq_t            waitq;
1412         int                    resends = 0;
1413         struct l_wait_info     lwi;
1414
1415         ENTRY;
1416
1417         cfs_waitq_init(&waitq);
1418
1419 restart_bulk:
1420         rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1421                                   page_count, pga, &req, ocapa);
1422         if (rc != 0)
1423                 return (rc);
1424
1425         rc = ptlrpc_queue_wait(req);
1426
1427         if (rc == -ETIMEDOUT && req->rq_resend) {
1428                 DEBUG_REQ(D_HA, req,  "BULK TIMEOUT");
1429                 ptlrpc_req_finished(req);
1430                 goto restart_bulk;
1431         }
1432
1433         rc = osc_brw_fini_request(req, rc);
1434
1435         ptlrpc_req_finished(req);
1436         if (osc_recoverable_error(rc)) {
1437                 resends++;
1438                 if (!osc_should_resend(resends, &exp->exp_obd->u.cli)) {
1439                         CERROR("too many resend retries, returning error\n");
1440                         RETURN(-EIO);
1441                 }
1442
1443                 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
1444                 l_wait_event(waitq, 0, &lwi);
1445
1446                 goto restart_bulk;
1447         }
1448
1449         RETURN (rc);
1450 }
1451
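/* Rebuild a BRW request that failed with a recoverable error, hand the old
 * request's page array and async pages (oaps) over to the new request, and
 * queue it on the original request set. */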
1452 int osc_brw_redo_request(struct ptlrpc_request *request,
1453                          struct osc_brw_async_args *aa)
1454 {
1455         struct ptlrpc_request *new_req;
1456         struct ptlrpc_request_set *set = request->rq_set;
1457         struct osc_brw_async_args *new_aa;
1458         struct osc_async_page *oap;
1459         int rc = 0;
1460         ENTRY;
1461
1462         if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) {
1463                 CERROR("too many resend retries, returning error\n");
1464                 RETURN(-EIO);
1465         }
1466
1467         DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
1468 /*
1469         body = lustre_msg_buf(request->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
1470         if (body->oa.o_valid & OBD_MD_FLOSSCAPA)
1471                 ocapa = lustre_unpack_capa(request->rq_reqmsg,
1472                                            REQ_REC_OFF + 3);
1473 */
1474         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1475                                         OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1476                                   aa->aa_cli, aa->aa_oa,
1477                                   NULL /* lsm unused by osc currently */,
1478                                   aa->aa_page_count, aa->aa_ppga, 
1479                                   &new_req, NULL /* ocapa */);
1480         if (rc)
1481                 RETURN(rc);
1482
1483         client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
1484
1485         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1486                 if (oap->oap_request != NULL) {
1487                         LASSERTF(request == oap->oap_request,
1488                                  "request %p != oap_request %p\n",
1489                                  request, oap->oap_request);
1490                         if (oap->oap_interrupted) {
1491                                 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1492                                 ptlrpc_req_finished(new_req);
1493                                 RETURN(-EINTR);
1494                         }
1495                 }
1496         }
1497         /* New request takes over pga and oaps from old request.
1498          * Note that copying a list_head doesn't work, need to move it... */
1499         aa->aa_resends++;
1500         new_req->rq_interpret_reply = request->rq_interpret_reply;
1501         new_req->rq_async_args = request->rq_async_args;
1502         new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1503
1504         new_aa = (struct osc_brw_async_args *)&new_req->rq_async_args;
1505
1506         CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
1507         list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
1508         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1509
1510         list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1511                 if (oap->oap_request) {
1512                         ptlrpc_req_finished(oap->oap_request);
1513                         oap->oap_request = ptlrpc_request_addref(new_req);
1514                 }
1515         }
1516
1517         /* Using ptlrpc_set_add_req() here is safe because interpret
1518          * functions run in check_set context.  The only path by which
1519          * another thread can access this request is the -EINTR case
1520          * above, and that path is protected by cl_loi_list_lock. */
1521         ptlrpc_set_add_req(set, new_req);
1522
1523         client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1524
1525         DEBUG_REQ(D_INFO, new_req, "new request");
1526         RETURN(0);
1527 }
1528
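/* Queue an asynchronous BRW on @set.  For writes, grant is consumed up front
 * (even for a sync write) and given back if building the request fails, so
 * the client's grant accounting stays consistent. */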
1529 static int async_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1530                           struct lov_stripe_md *lsm, obd_count page_count,
1531                           struct brw_page **pga, struct ptlrpc_request_set *set,
1532                           struct obd_capa *ocapa)
1533 {
1534         struct ptlrpc_request     *req;
1535         struct client_obd         *cli = &exp->exp_obd->u.cli;
1536         int                        rc, i;
1537         struct osc_brw_async_args *aa;
1538         ENTRY;
1539
1540         /* Consume write credits even if doing a sync write -
1541          * otherwise we may run out of space on OST due to grant. */
1542         if (cmd == OBD_BRW_WRITE) {
1543                 spin_lock(&cli->cl_loi_list_lock);
1544                 for (i = 0; i < page_count; i++) {
1545                         if (cli->cl_avail_grant >= CFS_PAGE_SIZE)
1546                                 osc_consume_write_grant(cli, pga[i]);
1547                 }
1548                 spin_unlock(&cli->cl_loi_list_lock);
1549         }
1550
1551         rc = osc_brw_prep_request(cmd, cli, oa, lsm, page_count, pga,
1552                                   &req, ocapa);
1553
1554         aa = (struct osc_brw_async_args *)&req->rq_async_args;
1555         if (cmd == OBD_BRW_READ) {
1556                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
1557                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
1558                 ptlrpc_lprocfs_brw(req, OST_READ, aa->aa_requested_nob);
1559         } else {
1560                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
1561                 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
1562                                  cli->cl_w_in_flight);
1563                 ptlrpc_lprocfs_brw(req, OST_WRITE, aa->aa_requested_nob);
1564         }
1565
1566         LASSERT(list_empty(&aa->aa_oaps));
1567         if (rc == 0) {
1568                 req->rq_interpret_reply = brw_interpret;
1569                 ptlrpc_set_add_req(set, req);
1570                 client_obd_list_lock(&cli->cl_loi_list_lock);
1571                 if (cmd == OBD_BRW_READ)
1572                         cli->cl_r_in_flight++;
1573                 else
1574                         cli->cl_w_in_flight++;
1575                 client_obd_list_unlock(&cli->cl_loi_list_lock);
1576                 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DIO_PAUSE, 3);
1577         } else if (cmd == OBD_BRW_WRITE) {
1578                 client_obd_list_lock(&cli->cl_loi_list_lock);
1579                 for (i = 0; i < page_count; i++)
1580                         osc_release_write_grant(cli, pga[i], 0);
1581                 osc_wake_cache_waiters(cli);
1582                 client_obd_list_unlock(&cli->cl_loi_list_lock);
1583         }
1584         RETURN (rc);
1585 }
1586
1587 /*
1588  * We want disk allocation on the target to happen in offset order, so
1589  * we'll follow Sedgewick's advice and stick to the dead-simple shellsort --
1590  * it'll do fine for our small page arrays and doesn't require allocation.
1591  * It's an insertion sort that swaps elements that are strides apart,
1592  * shrinking the stride down until it's 1 and the array is sorted.
1593  */
1594 static void sort_brw_pages(struct brw_page **array, int num)
1595 {
1596         int stride, i, j;
1597         struct brw_page *tmp;
1598
1599         if (num == 1)
1600                 return;
1601         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1602                 ;
1603
1604         do {
1605                 stride /= 3;
1606                 for (i = stride ; i < num ; i++) {
1607                         tmp = array[i];
1608                         j = i;
1609                         while (j >= stride && array[j - stride]->off > tmp->off) {
1610                                 array[j] = array[j - stride];
1611                                 j -= stride;
1612                         }
1613                         array[j] = tmp;
1614                 }
1615         } while (stride > 1);
1616 }
1617
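/* Return the number of leading pages in @pg that form a single contiguous,
 * page-aligned region, i.e. the largest prefix the network can transfer as
 * one unfragmented RDMA. */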
1618 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1619 {
1620         int count = 1;
1621         int offset;
1622         int i = 0;
1623
1624         LASSERT (pages > 0);
1625         offset = pg[i]->off & ~CFS_PAGE_MASK;
1626
1627         for (;;) {
1628                 pages--;
1629                 if (pages == 0)         /* that's all */
1630                         return count;
1631
1632                 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1633                         return count;   /* doesn't end on page boundary */
1634
1635                 i++;
1636                 offset = pg[i]->off & ~CFS_PAGE_MASK;
1637                 if (offset != 0)        /* doesn't start on page boundary */
1638                         return count;
1639
1640                 count++;
1641         }
1642 }
1643
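/* Build an array of pointers into the caller's contiguous brw_page array so
 * the pages can be sorted and split into per-RPC chunks without copying the
 * pages themselves. */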
1644 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1645 {
1646         struct brw_page **ppga;
1647         int i;
1648
1649         OBD_ALLOC(ppga, sizeof(*ppga) * count);
1650         if (ppga == NULL)
1651                 return NULL;
1652
1653         for (i = 0; i < count; i++)
1654                 ppga[i] = pga + i;
1655         return ppga;
1656 }
1657
1658 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1659 {
1660         LASSERT(ppga != NULL);
1661         OBD_FREE(ppga, sizeof(*ppga) * count);
1662 }
1663
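/* Synchronous I/O entry point: sort the pages by offset and issue them in
 * chunks of at most cl_max_pages_per_rpc unfragmented pages, restoring the
 * saved obdo between chunks since each BRW clobbers it. */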
1664 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1665                    obd_count page_count, struct brw_page *pga,
1666                    struct obd_trans_info *oti)
1667 {
1668         struct obdo *saved_oa = NULL;
1669         struct brw_page **ppga, **orig;
1670         struct obd_import *imp = class_exp2cliimp(exp);
1671         struct client_obd *cli = &imp->imp_obd->u.cli;
1672         int rc, page_count_orig;
1673         ENTRY;
1674
1675         if (cmd & OBD_BRW_CHECK) {
1676                 /* The caller just wants to know if there's a chance that this
1677                  * I/O can succeed */
1678
1679                 if (imp == NULL || imp->imp_invalid)
1680                         RETURN(-EIO);
1681                 RETURN(0);
1682         }
1683
1684         /* test_brw with a failed create can trip this, maybe others. */
1685         LASSERT(cli->cl_max_pages_per_rpc);
1686
1687         rc = 0;
1688
1689         orig = ppga = osc_build_ppga(pga, page_count);
1690         if (ppga == NULL)
1691                 RETURN(-ENOMEM);
1692         page_count_orig = page_count;
1693
1694         sort_brw_pages(ppga, page_count);
1695         while (page_count) {
1696                 obd_count pages_per_brw;
1697
1698                 if (page_count > cli->cl_max_pages_per_rpc)
1699                         pages_per_brw = cli->cl_max_pages_per_rpc;
1700                 else
1701                         pages_per_brw = page_count;
1702
1703                 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1704
1705                 if (saved_oa != NULL) {
1706                         /* restore previously saved oa */
1707                         *oinfo->oi_oa = *saved_oa;
1708                 } else if (page_count > pages_per_brw) {
1709                         /* save a copy of oa (brw will clobber it) */
1710                         OBDO_ALLOC(saved_oa);
1711                         if (saved_oa == NULL)
1712                                 GOTO(out, rc = -ENOMEM);
1713                         *saved_oa = *oinfo->oi_oa;
1714                 }
1715
1716                 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1717                                       pages_per_brw, ppga, oinfo->oi_capa);
1718
1719                 if (rc != 0)
1720                         break;
1721
1722                 page_count -= pages_per_brw;
1723                 ppga += pages_per_brw;
1724         }
1725
1726 out:
1727         osc_release_ppga(orig, page_count_orig);
1728
1729         if (saved_oa != NULL)
1730                 OBDO_FREE(saved_oa);
1731
1732         RETURN(rc);
1733 }
1734
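/* Asynchronous counterpart of osc_brw(): splits the page array the same way,
 * but queues each chunk on @set via async_internal() instead of waiting. */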
1735 static int osc_brw_async(int cmd, struct obd_export *exp,
1736                          struct obd_info *oinfo, obd_count page_count,
1737                          struct brw_page *pga, struct obd_trans_info *oti,
1738                          struct ptlrpc_request_set *set)
1739 {
1740         struct brw_page **ppga, **orig;
1741         struct client_obd *cli = &exp->exp_obd->u.cli;
1742         int page_count_orig;
1743         int rc = 0;
1744         ENTRY;
1745
1746         if (cmd & OBD_BRW_CHECK) {
1747                 struct obd_import *imp = class_exp2cliimp(exp);
1748                 /* The caller just wants to know if there's a chance that this
1749                  * I/O can succeed */
1750
1751                 if (imp == NULL || imp->imp_invalid)
1752                         RETURN(-EIO);
1753                 RETURN(0);
1754         }
1755
1756         orig = ppga = osc_build_ppga(pga, page_count);
1757         if (ppga == NULL)
1758                 RETURN(-ENOMEM);
1759         page_count_orig = page_count;
1760
1761         sort_brw_pages(ppga, page_count);
1762         while (page_count) {
1763                 struct brw_page **copy;
1764                 obd_count pages_per_brw;
1765
1766                 pages_per_brw = min_t(obd_count, page_count,
1767                                       cli->cl_max_pages_per_rpc);
1768
1769                 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1770
1771                 /* use ppga directly only if a single RPC is going to fly */
1772                 if (pages_per_brw != page_count_orig || ppga != orig) {
1773                         OBD_ALLOC(copy, sizeof(*copy) * pages_per_brw);
1774                         if (copy == NULL)
1775                                 GOTO(out, rc = -ENOMEM);
1776                         memcpy(copy, ppga, sizeof(*copy) * pages_per_brw);
1777                 } else
1778                         copy = ppga;
1779
1780                 rc = async_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1781                                     pages_per_brw, copy, set, oinfo->oi_capa);
1782
1783                 if (rc != 0) {
1784                         if (copy != ppga)
1785                                 OBD_FREE(copy, sizeof(*copy) * pages_per_brw);
1786                         break;
1787                 }
1788                 if (copy == orig) {
1789                         /* we passed it to async_internal() which is
1790                          * now responsible for releasing memory */
1791                         orig = NULL;
1792                 }
1793
1794                 page_count -= pages_per_brw;
1795                 ppga += pages_per_brw;
1796         }
1797 out:
1798         if (orig)
1799                 osc_release_ppga(orig, page_count_orig);
1800         RETURN(rc);
1801 }
1802
1803 static void osc_check_rpcs(struct client_obd *cli);
1804
1805 /* The companion to osc_enter_cache(), called when @oap is no longer part of
1806  * the dirty accounting: either writeback completed, or a truncate happened
1807  * before writing started.  Must be called with the loi lock held. */
1808 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
1809                            int sent)
1810 {
1811         osc_release_write_grant(cli, &oap->oap_brw_page, sent);
1812 }
1813
1814
1815 /* This maintains the lists of pending pages to read/write for a given object
1816  * (lop).  This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
1817  * to quickly find objects that are ready to send an RPC. */
1818 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
1819                          int cmd)
1820 {
1821         int optimal;
1822         ENTRY;
1823
1824         if (lop->lop_num_pending == 0)
1825                 RETURN(0);
1826
1827         /* if we have an invalid import we want to drain the queued pages
1828          * by forcing them through rpcs that immediately fail and complete
1829          * the pages.  recovery relies on this to empty the queued pages
1830          * before canceling the locks and evicting down the llite pages */
1831         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1832                 RETURN(1);
1833
1834         /* stream rpcs in queue order as long as there is an urgent page
1835          * queued.  this is our cheap solution for good batching in the case
1836          * where writepage marks some random page in the middle of the file
1837          * as urgent because of, say, memory pressure */
1838         if (!list_empty(&lop->lop_urgent)) {
1839                 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
1840                 RETURN(1);
1841         }
1842         /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
1843         optimal = cli->cl_max_pages_per_rpc;
1844         if (cmd & OBD_BRW_WRITE) {
1845                 /* trigger a write rpc stream as long as there are dirtiers
1846                  * waiting for space.  as they're waiting, they're not going to
1847                  * create more pages to coalesce with what's waiting. */
1848                 if (!list_empty(&cli->cl_cache_waiters)) {
1849                         CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
1850                         RETURN(1);
1851                 }
1852                 /* +16 to avoid triggering rpcs that would want to include pages
1853                  * that are being queued but which can't be made ready until
1854                  * the queuer finishes with the page. this is a wart for
1855                  * llite::commit_write() */
1856                 optimal += 16;
1857         }
1858         if (lop->lop_num_pending >= optimal)
1859                 RETURN(1);
1860
1861         RETURN(0);
1862 }
1863
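/* Keep @item's membership of @list in sync with the boolean @should_be_on. */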
1864 static void on_list(struct list_head *item, struct list_head *list,
1865                     int should_be_on)
1866 {
1867         if (list_empty(item) && should_be_on)
1868                 list_add_tail(item, list);
1869         else if (!list_empty(item) && !should_be_on)
1870                 list_del_init(item);
1871 }
1872
1873 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
1874  * can find pages to build into rpcs quickly */
1875 static void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
1876 {
1877         on_list(&loi->loi_cli_item, &cli->cl_loi_ready_list,
1878                 lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE) ||
1879                 lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
1880
1881         on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
1882                 loi->loi_write_lop.lop_num_pending);
1883
1884         on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
1885                 loi->loi_read_lop.lop_num_pending);
1886 }
1887
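/* Adjust the pending-page count on the per-object lop and the client-wide
 * read or write total together. */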
1888 static void lop_update_pending(struct client_obd *cli,
1889                                struct loi_oap_pages *lop, int cmd, int delta)
1890 {
1891         lop->lop_num_pending += delta;
1892         if (cmd & OBD_BRW_WRITE)
1893                 cli->cl_pending_w_pages += delta;
1894         else
1895                 cli->cl_pending_r_pages += delta;
1896 }
1897
1898 /* this is called when a sync waiter receives an interruption.  Its job is to
1899  * get the caller woken as soon as possible.  If its page hasn't been put in an
1900  * rpc yet it can dequeue immediately.  Otherwise it has to mark the rpc as
1901  * desiring interruption which will forcefully complete the rpc once the rpc
1902  * has timed out */
1903 static void osc_occ_interrupted(struct oig_callback_context *occ)
1904 {
1905         struct osc_async_page *oap;
1906         struct loi_oap_pages *lop;
1907         struct lov_oinfo *loi;
1908         ENTRY;
1909
1910         /* XXX member_of() */
1911         oap = list_entry(occ, struct osc_async_page, oap_occ);
1912
1913         client_obd_list_lock(&oap->oap_cli->cl_loi_list_lock);
1914
1915         oap->oap_interrupted = 1;
1916
1917         /* ok, it's been put in an rpc. only one oap gets a request reference */
1918         if (oap->oap_request != NULL) {
1919                 ptlrpc_mark_interrupted(oap->oap_request);
1920                 ptlrpcd_wake(oap->oap_request);
1921                 GOTO(unlock, 0);
1922         }
1923
1924         /* we don't get interruption callbacks until osc_trigger_group_io()
1925          * has been called and put the sync oaps in the pending/urgent lists.*/
1926         if (!list_empty(&oap->oap_pending_item)) {
1927                 list_del_init(&oap->oap_pending_item);
1928                 list_del_init(&oap->oap_urgent_item);
1929
1930                 loi = oap->oap_loi;
1931                 lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
1932                         &loi->loi_write_lop : &loi->loi_read_lop;
1933                 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
1934                 loi_list_maint(oap->oap_cli, oap->oap_loi);
1935
1936                 oig_complete_one(oap->oap_oig, &oap->oap_occ, -EINTR);
1937                 oap->oap_oig = NULL;
1938         }
1939
1940 unlock:
1941         client_obd_list_unlock(&oap->oap_cli->cl_loi_list_lock);
1942 }
1943
1944 /* this is trying to propagate async writeback errors back up to the
1945  * application.  As an async write fails we record the error code for later if
1946  * the app does an fsync.  As long as errors persist we force future rpcs to be
1947  * sync so that the app can get a sync error and break the cycle of queueing
1948  * pages for which writeback will fail. */
1949 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
1950                            int rc)
1951 {
1952         if (rc) {
1953                 if (!ar->ar_rc)
1954                         ar->ar_rc = rc;
1955
1956                 ar->ar_force_sync = 1;
1957                 ar->ar_min_xid = ptlrpc_sample_next_xid();
1958                 return;
1959
1960         }
1961
1962         if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
1963                 ar->ar_force_sync = 0;
1964 }
1965
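/* Put @oap on its object's pending list (and the urgent list if flagged)
 * and bump the pending counters. */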
1966 static void osc_oap_to_pending(struct osc_async_page *oap)
1967 {
1968         struct loi_oap_pages *lop;
1969
1970         if (oap->oap_cmd & OBD_BRW_WRITE)
1971                 lop = &oap->oap_loi->loi_write_lop;
1972         else
1973                 lop = &oap->oap_loi->loi_read_lop;
1974
1975         if (oap->oap_async_flags & ASYNC_URGENT)
1976                 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
1977         list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
1978         lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
1979 }
1980
1981 /* this must be called holding the loi list lock to give coverage to exit_cache,
1982  * async_flag maintenance, and oap_request */
1983 static void osc_ap_completion(struct client_obd *cli, struct obdo *oa,
1984                               struct osc_async_page *oap, int sent, int rc)
1985 {
1986         __u64 xid = 0;
1987
1988         ENTRY;
1989         if (oap->oap_request != NULL) {
1990                 xid = ptlrpc_req_xid(oap->oap_request);
1991                 ptlrpc_req_finished(oap->oap_request);
1992                 oap->oap_request = NULL;
1993         }
1994
1995         oap->oap_async_flags = 0;
1996         oap->oap_interrupted = 0;
1997
1998         if (oap->oap_cmd & OBD_BRW_WRITE) {
1999                 osc_process_ar(&cli->cl_ar, xid, rc);
2000                 osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
2001         }
2002
2003         if (rc == 0 && oa != NULL) {
2004                 if (oa->o_valid & OBD_MD_FLBLOCKS)
2005                         oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
2006                 if (oa->o_valid & OBD_MD_FLMTIME)
2007                         oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
2008                 if (oa->o_valid & OBD_MD_FLATIME)
2009                         oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
2010                 if (oa->o_valid & OBD_MD_FLCTIME)
2011                         oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
2012         }
2013
2014         if (oap->oap_oig) {
2015                 osc_exit_cache(cli, oap, sent);
2016                 oig_complete_one(oap->oap_oig, &oap->oap_occ, rc);
2017                 oap->oap_oig = NULL;
2018                 EXIT;
2019                 return;
2020         }
2021
2022         rc = oap->oap_caller_ops->ap_completion(oap->oap_caller_data,
2023                                                 oap->oap_cmd, oa, rc);
2024
2025         /* ll_ap_completion (from llite) drops PG_locked, so a new
2026          * I/O on the page could start.  But the OSC calls it under the
2027          * loi list lock, so we can safely add the oap back to pending. */
2028         if (rc)
2029                 /* upper layer wants to leave the page on pending queue */
2030                 osc_oap_to_pending(oap);
2031         else
2032                 osc_exit_cache(cli, oap, sent);
2033         EXIT;
2034 }
2035
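/* Interpret callback for async BRW RPCs: finish the request, redo it on
 * recoverable errors, complete the attached oaps (or release grant for pages
 * sent via async_internal()), and kick off any RPCs that are now ready. */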
2036 static int brw_interpret(struct ptlrpc_request *req, void *data, int rc)
2037 {
2038         struct osc_brw_async_args *aa = data;
2039         struct client_obd *cli;
2040         ENTRY;
2041
2042         rc = osc_brw_fini_request(req, rc);
2043         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
2044         if (osc_recoverable_error(rc)) {
2045                 rc = osc_brw_redo_request(req, aa);
2046                 if (rc == 0)
2047                         RETURN(0);
2048         }
2049
2050         cli = aa->aa_cli;
2051
2052         client_obd_list_lock(&cli->cl_loi_list_lock);
2053
2054         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2055          * is called so we know whether to go to sync BRWs or wait for more
2056          * RPCs to complete */
2057         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2058                 cli->cl_w_in_flight--;
2059         else
2060                 cli->cl_r_in_flight--;
2061
2062         if (!list_empty(&aa->aa_oaps)) { /* from osc_send_oap_rpc() */
2063                 struct osc_async_page *oap, *tmp;
2064                 /* the caller may re-use the oap after the completion call so
2065                  * we need to clean it up a little */
2066                 list_for_each_entry_safe(oap, tmp, &aa->aa_oaps, oap_rpc_item) {
2067                         list_del_init(&oap->oap_rpc_item);
2068                         osc_ap_completion(cli, aa->aa_oa, oap, 1, rc);
2069                 }
2070                 OBDO_FREE(aa->aa_oa);
2071         } else { /* from async_internal() */
2072                 int i;
2073                 for (i = 0; i < aa->aa_page_count; i++)
2074                         osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
2075         }
2076         osc_wake_cache_waiters(cli);
2077         osc_check_rpcs(cli);
2078         client_obd_list_unlock(&cli->cl_loi_list_lock);
2079
2080         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2081         RETURN(rc);
2082 }
2083
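/* Assemble the oaps on @rpc_list into a single BRW request.  On success the
 * request's async args take ownership of the oaps; on failure the temporary
 * pga and obdo are freed and an ERR_PTR is returned. */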
2084 static struct ptlrpc_request *osc_build_req(struct client_obd *cli,
2085                                             struct list_head *rpc_list,
2086                                             int page_count, int cmd)
2087 {
2088         struct ptlrpc_request *req;
2089         struct brw_page **pga = NULL;
2090         struct osc_brw_async_args *aa;
2091         struct obdo *oa = NULL;
2092         struct obd_async_page_ops *ops = NULL;
2093         void *caller_data = NULL;
2094         struct obd_capa *ocapa;
2095         struct osc_async_page *oap;
2096         int i, rc;
2097
2098         ENTRY;
2099         LASSERT(!list_empty(rpc_list));
2100
2101         OBD_ALLOC(pga, sizeof(*pga) * page_count);
2102         if (pga == NULL)
2103                 RETURN(ERR_PTR(-ENOMEM));
2104
2105         OBDO_ALLOC(oa);
2106         if (oa == NULL)
2107                 GOTO(out, req = ERR_PTR(-ENOMEM));
2108
2109         i = 0;
2110         list_for_each_entry(oap, rpc_list, oap_rpc_item) {
2111                 if (ops == NULL) {
2112                         ops = oap->oap_caller_ops;
2113                         caller_data = oap->oap_caller_data;
2114                 }
2115                 pga[i] = &oap->oap_brw_page;
2116                 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2117                 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2118                        pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
2119                 i++;
2120         }
2121
2122         /* always get the data for the obdo for the rpc */
2123         LASSERT(ops != NULL);
2124         ops->ap_fill_obdo(caller_data, cmd, oa);
2125         ocapa = ops->ap_lookup_capa(caller_data, cmd);
2126
2127         sort_brw_pages(pga, page_count);
2128         rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2129                                   pga, &req, ocapa);
2130         capa_put(ocapa);
2131         if (rc != 0) {
2132                 CERROR("prep_req failed: %d\n", rc);
2133                 GOTO(out, req = ERR_PTR(rc));
2134         }
2135
2136         /* Need to update the timestamps after the request is built in case
2137          * we race with setattr (locally or in queue at OST).  If OST gets
2138          * later setattr before earlier BRW (as determined by the request xid),
2139          * the OST will not use BRW timestamps.  Sadly, there is no obvious
2140          * way to do this in a single call.  bug 10150 */
2141         ops->ap_update_obdo(caller_data, cmd, oa,
2142                             OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME);
2143
2144         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2145         aa = (struct osc_brw_async_args *)&req->rq_async_args;
2146         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
2147         list_splice(rpc_list, &aa->aa_oaps);
2148         CFS_INIT_LIST_HEAD(rpc_list);
2149
2150 out:
2151         if (IS_ERR(req)) {
2152                 if (oa)
2153                         OBDO_FREE(oa);
2154                 if (pga)
2155                         OBD_FREE(pga, sizeof(*pga) * page_count);
2156         }
2157         RETURN(req);
2158 }
2159
2160 /* the loi lock is held across this function but it's allowed to release
2161  * and reacquire it during its work */
2162 /**
2163  * Prepare pages for async I/O and put them in the send queue.
2164  *
2165  * \param cli client_obd that the pages belong to
2166  * \param loi object the pages belong to
2167  * \param cmd one of the OBD_BRW_* macros
2168  * \param lop the pending pages for \a cmd
2169  *
2170  * \return 1 if an RPC was sent, zero if there was nothing ready to send,
2171  * \return negative errno if building the RPC failed.
2172  */
2173 static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
2174                             int cmd, struct loi_oap_pages *lop)
2175 {
2176         struct ptlrpc_request *req;
2177         obd_count page_count = 0;
2178         struct osc_async_page *oap = NULL, *tmp;
2179         struct osc_brw_async_args *aa;
2180         struct obd_async_page_ops *ops;
2181         CFS_LIST_HEAD(rpc_list);
2182         unsigned int ending_offset;
2183         unsigned  starting_offset = 0;
2184         int srvlock = 0;
2185         ENTRY;
2186
2187         /* first we find the pages we're allowed to work with */
2188         list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
2189                                  oap_pending_item) {
2190                 ops = oap->oap_caller_ops;
2191
2192                 LASSERT(oap->oap_magic == OAP_MAGIC);
2193
2194                 if (page_count != 0 &&
2195                     srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) {
2196                         CDEBUG(D_PAGE, "SRVLOCK flag mismatch,"
2197                                " oap %p, page %p, srvlock %u\n",
2198                                oap, oap->oap_brw_page.pg, (unsigned)!srvlock);
2199                         break;
2200                 }
2201                 /* in llite being 'ready' equates to the page being locked
2202                  * until completion unlocks it.  commit_write submits a page
2203                  * as not ready because its unlock will happen unconditionally
2204                  * as the call returns.  if we race with commit_write giving
2205                  * us that page we don't want to create a hole in the page
2206                  * stream, so we stop and leave the rpc to be fired by
2207                  * another dirtier or kupdated interval (the not ready page
2208                  * will still be on the dirty list).  we could call in
2209                  * at the end of ll_file_write to process the queue again. */
2210                 if (!(oap->oap_async_flags & ASYNC_READY)) {
2211                         int rc = ops->ap_make_ready(oap->oap_caller_data, cmd);
2212                         if (rc < 0)
2213                                 CDEBUG(D_INODE, "oap %p page %p returned %d "
2214                                                 "instead of ready\n", oap,
2215                                                 oap->oap_page, rc);
2216                         switch (rc) {
2217                         case -EAGAIN:
2218                                 /* llite is telling us that the page is still
2219                                  * in commit_write and that we should try
2220                                  * and put it in an rpc again later.  we
2221                                  * break out of the loop so we don't create
2222                                  * a hole in the sequence of pages in the rpc
2223                                  * stream.*/
2224                                 oap = NULL;
2225                                 break;
2226                         case -EINTR:
2227                                 /* the io isn't needed.. tell the checks
2228                                  * below to complete the rpc with EINTR */
2229                                 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
2230                                 oap->oap_count = -EINTR;
2231                                 break;
2232                         case 0:
2233                                 oap->oap_async_flags |= ASYNC_READY;
2234                                 break;
2235                         default:
2236                                 LASSERTF(0, "oap %p page %p returned %d "
2237                                             "from make_ready\n", oap,
2238                                             oap->oap_page, rc);
2239                                 break;
2240                         }
2241                 }
2242                 if (oap == NULL)
2243                         break;
2244                 /*
2245                  * Page submitted for IO has to be locked. Either by
2246                  * ->ap_make_ready() or by higher layers.
2247                  */
2248 #if defined(__KERNEL__) && defined(__linux__)
2249                 if (!(PageLocked(oap->oap_page) &&
2250                       (CheckWriteback(oap->oap_page, cmd) || oap->oap_oig != NULL))) {
2251                         CDEBUG(D_PAGE, "page %p lost wb %lx/%x\n",
2252                                oap->oap_page, (long)oap->oap_page->flags, oap->oap_async_flags);
2253                         LBUG();
2254                 }
2255 #endif
2256                 /* If there is a gap at the start of this page, it can't merge
2257                  * with any previous page, so we'll hand the network a
2258                  * "fragmented" page array that it can't transfer in 1 RDMA */
2259                 if (page_count != 0 && oap->oap_page_off != 0)
2260                         break;
2261
2262                 /* take the page out of our book-keeping */
2263                 list_del_init(&oap->oap_pending_item);
2264                 lop_update_pending(cli, lop, cmd, -1);
2265                 list_del_init(&oap->oap_urgent_item);
2266
2267                 if (page_count == 0)
2268                         starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
2269                                           (PTLRPC_MAX_BRW_SIZE - 1);
2270
2271                 /* ask the caller for the size of the io as the rpc leaves. */
2272                 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE))
2273                         oap->oap_count =
2274                                 ops->ap_refresh_count(oap->oap_caller_data,cmd);
2275                 if (oap->oap_count <= 0) {
2276                         CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
2277                                oap->oap_count);
2278                         osc_ap_completion(cli, NULL, oap, 0, oap->oap_count);
2279                         continue;
2280                 }
2281
2282                 /* now put the page back in our accounting */
2283                 list_add_tail(&oap->oap_rpc_item, &rpc_list);
2284                 if (page_count == 0)
2285                         srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
2286                 if (++page_count >= cli->cl_max_pages_per_rpc)
2287                         break;
2288
2289                 /* End on a PTLRPC_MAX_BRW_SIZE boundary.  We want full-sized
2290                  * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
2291                  * have the same alignment as the initial writes that allocated
2292                  * extents on the server. */
2293                 ending_offset = (oap->oap_obj_off + oap->oap_page_off +
2294                                  oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
2295                 if (ending_offset == 0)
2296                         break;
2297
2298                 /* If there is a gap at the end of this page, it can't merge
2299                  * with any subsequent pages, so we'll hand the network a
2300                  * "fragmented" page array that it can't transfer in 1 RDMA */
2301                 if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
2302                         break;
2303         }
2304
2305         osc_wake_cache_waiters(cli);
2306
2307         if (page_count == 0)
2308                 RETURN(0);
2309
2310         loi_list_maint(cli, loi);
2311
2312         client_obd_list_unlock(&cli->cl_loi_list_lock);
2313
2314         req = osc_build_req(cli, &rpc_list, page_count, cmd);
2315         if (IS_ERR(req)) {
2316                 /* this should happen rarely and is pretty bad, it makes the
2317                  * pending list not follow the dirty order */
2318                 client_obd_list_lock(&cli->cl_loi_list_lock);
2319                 list_for_each_entry_safe(oap, tmp, &rpc_list, oap_rpc_item) {
2320                         list_del_init(&oap->oap_rpc_item);
2321
2322                         /* queued sync pages can be torn down while the pages
2323                          * were between the pending list and the rpc */
2324                         if (oap->oap_interrupted) {
2325                                 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
2326                                 osc_ap_completion(cli, NULL, oap, 0,
2327                                                   oap->oap_count);
2328                                 continue;
2329                         }
2330                         osc_ap_completion(cli, NULL, oap, 0, PTR_ERR(req));
2331                 }
2332                 loi_list_maint(cli, loi);
2333                 RETURN(PTR_ERR(req));
2334         }
2335
2336         aa = (struct osc_brw_async_args *)&req->rq_async_args;
2337
2338         if (cmd == OBD_BRW_READ) {
2339                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2340                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2341                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2342                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2343                 ptlrpc_lprocfs_brw(req, OST_READ, aa->aa_requested_nob);
2344         } else {
2345                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2346                 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
2347                                  cli->cl_w_in_flight);
2348                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2349                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2350                 ptlrpc_lprocfs_brw(req, OST_WRITE, aa->aa_requested_nob);
2351         }
2352
2353         client_obd_list_lock(&cli->cl_loi_list_lock);
2354
2355         if (cmd == OBD_BRW_READ)
2356                 cli->cl_r_in_flight++;
2357         else
2358                 cli->cl_w_in_flight++;
2359
2360         /* queued sync pages can be torn down while the pages
2361          * were between the pending list and the rpc */
2362         tmp = NULL;
2363         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2364                 /* only one oap gets a request reference */
2365                 if (tmp == NULL)
2366                         tmp = oap;
2367                 if (oap->oap_interrupted && !req->rq_intr) {
2368                         CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2369                                oap, req);
2370                         ptlrpc_mark_interrupted(req);
2371                 }
2372         }
2373         if (tmp != NULL)
2374                 tmp->oap_request = ptlrpc_request_addref(req);
2375
2376         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2377                   page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
2378
2379         req->rq_interpret_reply = brw_interpret;
2380         ptlrpcd_add_req(req);
2381         RETURN(1);
2382 }
2383
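/* Dump an object's pending/urgent read and write state along with a
 * caller-supplied message. */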
2384 #define LOI_DEBUG(LOI, STR, args...)                                     \
2385         CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR,           \
2386                !list_empty(&(LOI)->loi_cli_item),                        \
2387                (LOI)->loi_write_lop.lop_num_pending,                     \
2388                !list_empty(&(LOI)->loi_write_lop.lop_urgent),            \
2389                (LOI)->loi_read_lop.lop_num_pending,                      \
2390                !list_empty(&(LOI)->loi_read_lop.lop_urgent),             \
2391                args)
2392
2393 /* This is called by osc_check_rpcs() to find which objects have pages that
2394  * we could be sending.  These lists are maintained by lop_makes_rpc(). */
2395 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2396 {
2397         ENTRY;
2398         /* first return all objects which we already know to have
2399          * pages ready to be stuffed into rpcs */
2400         if (!list_empty(&cli->cl_loi_ready_list))
2401                 RETURN(list_entry(cli->cl_loi_ready_list.next,
2402                                   struct lov_oinfo, loi_cli_item));
2403
2404         /* then if we have cache waiters, return all objects with queued
2405          * writes.  This is especially important when many small files
2406          * have filled up the cache and not been fired into rpcs because
2407                  * they don't pass the nr_pending/object threshold */
2408         if (!list_empty(&cli->cl_cache_waiters) &&
2409             !list_empty(&cli->cl_loi_write_list))
2410                 RETURN(list_entry(cli->cl_loi_write_list.next,
2411                                   struct lov_oinfo, loi_write_item));
2412
2413         /* then return all queued objects when we have an invalid import
2414          * so that they get flushed */
2415         if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2416                 if (!list_empty(&cli->cl_loi_write_list))
2417                         RETURN(list_entry(cli->cl_loi_write_list.next,
2418                                           struct lov_oinfo, loi_write_item));
2419                 if (!list_empty(&cli->cl_loi_read_list))
2420                         RETURN(list_entry(cli->cl_loi_read_list.next,
2421                                           struct lov_oinfo, loi_read_item));
2422         }
2423         RETURN(NULL);
2424 }
2425
2426 /* called with the loi list lock held */
2427 static void osc_check_rpcs(struct client_obd *cli)
2428 {
2429         struct lov_oinfo *loi;
2430         int rc = 0, race_counter = 0;
2431         ENTRY;
2432
2433         while ((loi = osc_next_loi(cli)) != NULL) {
2434                 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
2435
2436                 if (rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight)
2437                         break;
2438
2439                 /* attempt some read/write balancing by alternating between
2440                  * reads and writes in an object.  The makes_rpc checks here
2441                  * would be redundant if we were getting read/write work items
2442                  * instead of objects.  we don't want send_oap_rpc to drain a
2443                  * partial read pending queue when we're given this object to
2444                  * do io on writes while there are cache waiters */
2445                 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
2446                         rc = osc_send_oap_rpc(cli, loi, OBD_BRW_WRITE,
2447                                               &loi->loi_write_lop);
2448                         if (rc < 0)
2449                                 break;
2450                         if (rc > 0)
2451                                 race_counter = 0;
2452                         else
2453                                 race_counter++;
2454                 }
2455                 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
2456                         rc = osc_send_oap_rpc(cli, loi, OBD_BRW_READ,
2457                                               &loi->loi_read_lop);
2458                         if (rc < 0)
2459                                 break;
2460                         if (rc > 0)
2461                                 race_counter = 0;
2462                         else
2463                                 race_counter++;
2464                 }
2465
2466                 /* attempt some inter-object balancing by issuing rpcs
2467                  * for each object in turn */
2468                 if (!list_empty(&loi->loi_cli_item))
2469                         list_del_init(&loi->loi_cli_item);
2470                 if (!list_empty(&loi->loi_write_item))
2471                         list_del_init(&loi->loi_write_item);
2472                 if (!list_empty(&loi->loi_read_item))
2473                         list_del_init(&loi->loi_read_item);
2474
2475                 loi_list_maint(cli, loi);
2476
2477                 /* send_oap_rpc fails with 0 when make_ready tells it to
2478                  * back off.  llite's make_ready does this when it tries
2479                  * to lock a page queued for write that is already locked.
2480                  * we want to try sending rpcs from many objects, but we
2481                  * don't want to spin failing with 0.  */
2482                 if (race_counter == 10)
2483                         break;
2484         }
2485         EXIT;
2486 }
2487
2488 /* we're trying to queue a page in the osc so we're subject to the
2489  * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2490  * If the osc's queued pages are already at that limit, then we want to sleep
2491  * until there is space in the osc's queue for us.  We also may be waiting for
2492  * write credits from the OST if there are RPCs in flight that may return some
2493  * before we fall back to sync writes.
2494  *
2495  * We need this to know our allocation was granted in the presence of signals */
2496 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2497 {
2498         int rc;
2499         ENTRY;
2500         client_obd_list_lock(&cli->cl_loi_list_lock);
2501         rc = list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2502         client_obd_list_unlock(&cli->cl_loi_list_lock);
2503         RETURN(rc);
2504 }
2505
2506 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
2507  * grant or cache space. */
2508 static int osc_enter_cache(struct client_obd *cli, struct lov_oinfo *loi,
2509                            struct osc_async_page *oap)
2510 {
2511         struct osc_cache_waiter ocw;
2512         struct l_wait_info lwi = { 0 };
2513
2514         ENTRY;
2515
2516         CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
2517                "grant: %lu\n", cli->cl_dirty, atomic_read(&obd_dirty_pages),
2518                cli->cl_dirty_max, obd_max_dirty_pages,
2519                cli->cl_lost_grant, cli->cl_avail_grant);
2520
2521         /* force the caller to try sync io.  this can jump the list
2522          * of queued writes and create a discontiguous rpc stream */
2523         if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
2524             loi->loi_ar.ar_force_sync)
2525                 RETURN(-EDQUOT);
2526
2527         /* Hopefully normal case - cache space and write credits available */
2528         if ((cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max) &&
2529             (atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages) &&
2530             (cli->cl_avail_grant >= CFS_PAGE_SIZE)) {
2531                 /* account for ourselves */
2532                 osc_consume_write_grant(cli, &oap->oap_brw_page);
2533                 RETURN(0);
2534         }
2535
2536         /* Make sure that there are write rpcs in flight to wait for.  This
2537          * is a little silly as this object may not have any pending rpcs,
2538          * but other objects sure might. */
2539         if (cli->cl_w_in_flight) {
2540                 list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
2541                 cfs_waitq_init(&ocw.ocw_waitq);
2542                 ocw.ocw_oap = oap;
2543                 ocw.ocw_rc = 0;
2544
2545                 loi_list_maint(cli, loi);
2546                 osc_check_rpcs(cli);
2547                 client_obd_list_unlock(&cli->cl_loi_list_lock);
2548
2549                 CDEBUG(D_CACHE, "sleeping for cache space\n");
2550                 l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
2551
2552                 client_obd_list_lock(&cli->cl_loi_list_lock);
2553                 if (!list_empty(&ocw.ocw_entry)) {
2554                         list_del(&ocw.ocw_entry);
2555                         RETURN(-EINTR);
2556                 }
2557                 RETURN(ocw.ocw_rc);
2558         }
2559
2560         RETURN(-EDQUOT);
2561 }
2562
2563 /**
2564  * Checks if requested extent lock is compatible with a lock under the page.
2565  *
2566  * Checks if the lock under \a page is compatible with a read or write lock
2567  * (specified by \a rw) for an extent [\a start , \a end].
2568  *
2569  * \param exp osc export
2570  * \param lsm striping information for the file
2571  * \param res osc_async_page placeholder
2572  * \param rw OBD_BRW_READ if requested for reading,
2573  *           OBD_BRW_WRITE if requested for writing
2574  * \param start start of the requested extent
2575  * \param end end of the requested extent
2576  * \param cookie transparent parameter for passing locking context
2577  *
2578  * \post result == 1, *cookie == context, appropriate lock is referenced or
2579  * \post result == 0
2580  *
2581  * \retval 1 owned lock is reused for the request
2582  * \retval 0 no lock reused for the request
2583  *
2584  * \see osc_release_short_lock
2585  */
2586 static int osc_reget_short_lock(struct obd_export *exp,
2587                                 struct lov_stripe_md *lsm,
2588                                 void **res, int rw,
2589                                 obd_off start, obd_off end,
2590                                 void **cookie)
2591 {
2592         struct osc_async_page *oap = *res;
2593         int rc;
2594
2595         ENTRY;
2596
2597         spin_lock(&oap->oap_lock);
2598         rc = ldlm_lock_fast_match(oap->oap_ldlm_lock, rw,
2599                                   start, end, cookie);
2600         spin_unlock(&oap->oap_lock);
2601
2602         RETURN(rc);
2603 }
2604
2605 /**
2606  * Releases a reference to a lock taken in a "fast" way.
2607  *
2608  * Releases a read or a write (specified by \a rw) lock
2609  * referenced by \a cookie.
2610  *
2611  * \param exp osc export
2612  * \param lsm striping information for the file
2613  * \param end end of the locked extent
2614  * \param rw OBD_BRW_READ if requested for reading,
2615  *           OBD_BRW_WRITE if requested for writing
2616  * \param cookie transparent parameter for passing locking context
2617  *
2618  * \post appropriate lock is dereferenced
2619  *
2620  * \see osc_reget_short_lock
2621  */
2622 static int osc_release_short_lock(struct obd_export *exp,
2623                                   struct lov_stripe_md *lsm, obd_off end,
2624                                   void *cookie, int rw)
2625 {
2626         ENTRY;
2627         ldlm_lock_fast_release(cookie, rw);
2628         /* no error could have happened at this layer */
2629         RETURN(0);
2630 }
2631
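/* Initialize the caller-provided osc_async_page for @page at @offset and,
 * unless @nocache is set, attach it to the extent lock covering the page.
 * When called with a NULL page, just report the (rounded) size of the
 * structure the caller must allocate. */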
2632 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2633                         struct lov_oinfo *loi, cfs_page_t *page,
2634                         obd_off offset, struct obd_async_page_ops *ops,
2635                         void *data, void **res, int nocache,
2636                         struct lustre_handle *lockh)
2637 {
2638         struct osc_async_page *oap;
2639         struct ldlm_res_id oid = {{0}};
2640         int rc = 0;
2641         ENTRY;
2642
2643         if (!page)
2644                 return size_round(sizeof(*oap));
2645
2646         oap = *res;
2647         oap->oap_magic = OAP_MAGIC;
2648         oap->oap_cli = &exp->exp_obd->u.cli;
2649         oap->oap_loi = loi;
2650
2651         oap->oap_caller_ops = ops;
2652         oap->oap_caller_data = data;
2653
2654         oap->oap_page = page;
2655         oap->oap_obj_off = offset;
2656
2657         CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2658         CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2659         CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2660         CFS_INIT_LIST_HEAD(&oap->oap_page_list);
2661
2662         oap->oap_occ.occ_interrupted = osc_occ_interrupted;
2663
2664         spin_lock_init(&oap->oap_lock);
2665
2666         /* If the page was marked as not cacheable, don't add it to any locks. */
2667         if (!nocache) {
2668                 oid.name[0] = loi->loi_id;
2669                 oid.name[2] = loi->loi_gr;
2670                 /* This is the only place where we can call cache_add_extent
2671                    without oap_lock: this page is locked now, and the lock we
2672                    are adding it to is referenced, so we cannot lose any
2673                    pages either. */
2674                 rc = cache_add_extent(oap->oap_cli->cl_cache, &oid, oap, lockh);
2675                 if (rc)
2676                         RETURN(rc);
2677         }
2678
2679         CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
2680         RETURN(0);
2681 }
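/*
 * Illustrative sketch (hypothetical caller): passing page == NULL above is
 * a size probe, so setting up an async page is a two-phase affair.  The
 * allocation step in the middle is pseudocode.
 */
#if 0
        void *cookie;
        int oap_size;

        oap_size = osc_prep_async_page(exp, NULL, NULL, NULL, 0, NULL,
                                       NULL, NULL, 0, NULL);
        /* cookie = <oap_size bytes allocated by the caller>; */
        rc = osc_prep_async_page(exp, lsm, loi, page, offset, ops,
                                 data, &cookie, 0, lockh);
#endif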
2682
2683 struct osc_async_page *oap_from_cookie(void *cookie)
2684 {
2685         struct osc_async_page *oap = cookie;
2686         if (oap->oap_magic != OAP_MAGIC)
2687                 return ERR_PTR(-EINVAL);
2688         return oap;
2689 }
2690
2691 static int osc_queue_async_io(struct obd_export *exp, struct lov_stripe_md *lsm,
2692                               struct lov_oinfo *loi, void *cookie,
2693                               int cmd, obd_off off, int count,
2694                               obd_flag brw_flags, enum async_flags async_flags)
2695 {
2696         struct client_obd *cli = &exp->exp_obd->u.cli;
2697         struct osc_async_page *oap;
2698         int rc = 0;
2699         ENTRY;
2700
2701         oap = oap_from_cookie(cookie);
2702         if (IS_ERR(oap))
2703                 RETURN(PTR_ERR(oap));
2704
2705         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2706                 RETURN(-EIO);
2707
2708         if (!list_empty(&oap->oap_pending_item) ||
2709             !list_empty(&oap->oap_urgent_item) ||
2710             !list_empty(&oap->oap_rpc_item))
2711                 RETURN(-EBUSY);
2712
2713         /* check if the file's owner/group is over quota */
2714 #ifdef HAVE_QUOTA_SUPPORT
2715         if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)){
2716                 struct obd_async_page_ops *ops;
2717                 struct obdo *oa;
2718
2719                 OBDO_ALLOC(oa);
2720                 if (oa == NULL)
2721                         RETURN(-ENOMEM);
2722
2723                 ops = oap->oap_caller_ops;
2724                 ops->ap_fill_obdo(oap->oap_caller_data, cmd, oa);
2725                 if (lquota_chkdq(quota_interface, cli, oa->o_uid, oa->o_gid) ==
2726                     NO_QUOTA)
2727                         rc = -EDQUOT;
2728
2729                 OBDO_FREE(oa);
2730                 if (rc)
2731                         RETURN(rc);
2732         }
2733 #endif
2734
2735         if (loi == NULL)
2736                 loi = lsm->lsm_oinfo[0];
2737
2738         client_obd_list_lock(&cli->cl_loi_list_lock);
2739
2740         oap->oap_cmd = cmd;
2741         oap->oap_page_off = off;
2742         oap->oap_count = count;
2743         oap->oap_brw_flags = brw_flags;
2744         oap->oap_async_flags = async_flags;
2745
2746         if (cmd & OBD_BRW_WRITE) {
2747                 rc = osc_enter_cache(cli, loi, oap);
2748                 if (rc) {
2749                         client_obd_list_unlock(&cli->cl_loi_list_lock);
2750                         RETURN(rc);
2751                 }
2752         }
2753
2754         osc_oap_to_pending(oap);
2755         loi_list_maint(cli, loi);
2756
2757         LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
2758                   cmd);
2759
2760         osc_check_rpcs(cli);
2761         client_obd_list_unlock(&cli->cl_loi_list_lock);
2762
2763         RETURN(0);
2764 }
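/*
 * Illustrative sketch (hypothetical caller): a queued write page can later
 * be promoted with osc_set_async_flags() (defined below); the offsets and
 * flags here are assumptions.
 */
#if 0
        rc = osc_queue_async_io(exp, lsm, loi, cookie, OBD_BRW_WRITE,
                                0, CFS_PAGE_SIZE, 0, 0);
        /* ... later, when the page must go out urgently ... */
        rc = osc_set_async_flags(exp, lsm, loi, cookie,
                                 ASYNC_READY | ASYNC_URGENT);
#endif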
2765
2766 /* aka (~was & now & flag), but this is clearer :) */
2767 #define SETTING(was, now, flag) (!(was & flag) && (now & flag))
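/*
 * Worked example (illustrative): SETTING() is true only for bits that are
 * newly turned on.
 *
 *   SETTING(0,           ASYNC_READY, ASYNC_READY) == 1   (0 -> 1: set)
 *   SETTING(ASYNC_READY, ASYNC_READY, ASYNC_READY) == 0   (already set)
 *   SETTING(ASYNC_READY, 0,           ASYNC_READY) == 0   (being cleared)
 */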
2768
2769 static int osc_set_async_flags(struct obd_export *exp,
2770                                struct lov_stripe_md *lsm,
2771                                struct lov_oinfo *loi, void *cookie,
2772                                obd_flag async_flags)
2773 {
2774         struct client_obd *cli = &exp->exp_obd->u.cli;
2775         struct loi_oap_pages *lop;
2776         struct osc_async_page *oap;
2777         int rc = 0;
2778         ENTRY;
2779
2780         oap = oap_from_cookie(cookie);
2781         if (IS_ERR(oap))
2782                 RETURN(PTR_ERR(oap));
2783
2784         /*
2785          * bug 7311: OST-side locking is only supported for liblustre for now
2786          * (and liblustre never calls obd_set_async_flags(). I hope.); a
2787          * generic implementation would have to handle the case where an
2788          * OST-locked page was picked up by, e.g., ->writepage().
2789          */
2790         LASSERT(!(oap->oap_brw_flags & OBD_BRW_SRVLOCK));
2791         LASSERT(!LIBLUSTRE_CLIENT); /* check that liblustre angels do fear to
2792                                      * tread here. */
2793
2794         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2795                 RETURN(-EIO);
2796
2797         if (loi == NULL)
2798                 loi = lsm->lsm_oinfo[0];
2799
2800         if (oap->oap_cmd & OBD_BRW_WRITE) {
2801                 lop = &loi->loi_write_lop;
2802         } else {
2803                 lop = &loi->loi_read_lop;
2804         }
2805
2806         client_obd_list_lock(&cli->cl_loi_list_lock);
2807
2808         if (list_empty(&oap->oap_pending_item))
2809                 GOTO(out, rc = -EINVAL);
2810
2811         if ((oap->oap_async_flags & async_flags) == async_flags)
2812                 GOTO(out, rc = 0);
2813
2814         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
2815                 oap->oap_async_flags |= ASYNC_READY;
2816
2817         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT)) {
2818                 if (list_empty(&oap->oap_rpc_item)) {
2819                         list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2820                         loi_list_maint(cli, loi);
2821                 }
2822         }
2823
2824         LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
2825                         oap->oap_async_flags);
2826 out:
2827         osc_check_rpcs(cli);
2828         client_obd_list_unlock(&cli->cl_loi_list_lock);
2829         RETURN(rc);
2830 }
2831
2832 static int osc_queue_group_io(struct obd_export *exp, struct lov_stripe_md *lsm,
2833                              struct lov_oinfo *loi,
2834                              struct obd_io_group *oig, void *cookie,
2835                              int cmd, obd_off off, int count,
2836                              obd_flag brw_flags,
2837                              obd_flag async_flags)
2838 {
2839         struct client_obd *cli = &exp->exp_obd->u.cli;
2840         struct osc_async_page *oap;
2841         struct loi_oap_pages *lop;
2842         int rc = 0;
2843         ENTRY;
2844
2845         oap = oap_from_cookie(cookie);
2846         if (IS_ERR(oap))
2847                 RETURN(PTR_ERR(oap));
2848
2849         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2850                 RETURN(-EIO);
2851
2852         if (!list_empty(&oap->oap_pending_item) ||
2853             !list_empty(&oap->oap_urgent_item) ||
2854             !list_empty(&oap->oap_rpc_item))
2855                 RETURN(-EBUSY);
2856
2857         if (loi == NULL)
2858                 loi = lsm->lsm_oinfo[0];
2859
2860         client_obd_list_lock(&cli->cl_loi_list_lock);
2861
2862         oap->oap_cmd = cmd;
2863         oap->oap_page_off = off;
2864         oap->oap_count = count;
2865         oap->oap_brw_flags = brw_flags;
2866         oap->oap_async_flags = async_flags;
2867
2868         if (cmd & OBD_BRW_WRITE)
2869                 lop = &loi->loi_write_lop;
2870         else
2871                 lop = &loi->loi_read_lop;
2872
2873         list_add_tail(&oap->oap_pending_item, &lop->lop_pending_group);
2874         if (oap->oap_async_flags & ASYNC_GROUP_SYNC) {
2875                 oap->oap_oig = oig;
2876                 rc = oig_add_one(oig, &oap->oap_occ);
2877         }
2878
2879         LOI_DEBUG(loi, "oap %p page %p on group pending: rc %d\n",
2880                   oap, oap->oap_page, rc);
2881
2882         client_obd_list_unlock(&cli->cl_loi_list_lock);
2883
2884         RETURN(rc);
2885 }
2886
2887 static void osc_group_to_pending(struct client_obd *cli, struct lov_oinfo *loi,
2888                                  struct loi_oap_pages *lop, int cmd)
2889 {
2890         struct list_head *pos, *tmp;
2891         struct osc_async_page *oap;
2892
2893         list_for_each_safe(pos, tmp, &lop->lop_pending_group) {
2894                 oap = list_entry(pos, struct osc_async_page, oap_pending_item);
2895                 list_del(&oap->oap_pending_item);
2896                 osc_oap_to_pending(oap);
2897         }
2898         loi_list_maint(cli, loi);
2899 }
2900
2901 static int osc_trigger_group_io(struct obd_export *exp,
2902                                 struct lov_stripe_md *lsm,
2903                                 struct lov_oinfo *loi,
2904                                 struct obd_io_group *oig)
2905 {
2906         struct client_obd *cli = &exp->exp_obd->u.cli;
2907         ENTRY;
2908
2909         if (loi == NULL)
2910                 loi = lsm->lsm_oinfo[0];
2911
2912         client_obd_list_lock(&cli->cl_loi_list_lock);
2913
2914         osc_group_to_pending(cli, loi, &loi->loi_write_lop, OBD_BRW_WRITE);
2915         osc_group_to_pending(cli, loi, &loi->loi_read_lop, OBD_BRW_READ);
2916
2917         osc_check_rpcs(cli);
2918         client_obd_list_unlock(&cli->cl_loi_list_lock);
2919
2920         RETURN(0);
2921 }
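/*
 * Illustrative sketch (hypothetical caller): group I/O is queued page by
 * page and then kicked off in one batch; oig_wait() is assumed to be the
 * obd_io_group wait primitive, and the loop bounds are assumptions.
 */
#if 0
        /* for each page in the group: */
        rc = osc_queue_group_io(exp, lsm, loi, oig, cookie, OBD_BRW_READ,
                                off, count, brw_flags, ASYNC_GROUP_SYNC);
        /* then start the whole group and wait for completion: */
        rc = osc_trigger_group_io(exp, lsm, loi, oig);
        rc = oig_wait(oig);
#endif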
2922
2923 static int osc_teardown_async_page(struct obd_export *exp,
2924                                    struct lov_stripe_md *lsm,
2925                                    struct lov_oinfo *loi, void *cookie)
2926 {
2927         struct client_obd *cli = &exp->exp_obd->u.cli;
2928         struct loi_oap_pages *lop;
2929         struct osc_async_page *oap;
2930         int rc = 0;
2931         ENTRY;
2932
2933         oap = oap_from_cookie(cookie);
2934         if (IS_ERR(oap))
2935                 RETURN(PTR_ERR(oap));
2936
2937         if (loi == NULL)
2938                 loi = lsm->lsm_oinfo[0];
2939
2940         if (oap->oap_cmd & OBD_BRW_WRITE) {
2941                 lop = &loi->loi_write_lop;
2942         } else {
2943                 lop = &loi->loi_read_lop;
2944         }
2945
2946         client_obd_list_lock(&cli->cl_loi_list_lock);
2947
2948         if (!list_empty(&oap->oap_rpc_item))
2949                 GOTO(out, rc = -EBUSY);
2950
2951         osc_exit_cache(cli, oap, 0);
2952         osc_wake_cache_waiters(cli);
2953
2954         if (!list_empty(&oap->oap_urgent_item)) {
2955                 list_del_init(&oap->oap_urgent_item);
2956                 oap->oap_async_flags &= ~ASYNC_URGENT;
2957         }
2958         if (!list_empty(&oap->oap_pending_item)) {
2959                 list_del_init(&oap->oap_pending_item);
2960                 lop_update_pending(cli, lop, oap->oap_cmd, -1);
2961         }
2962         loi_list_maint(cli, loi);
2963         cache_remove_extent(cli->cl_cache, oap);
2964
2965         LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
2966 out:
2967         client_obd_list_unlock(&cli->cl_loi_list_lock);
2968         RETURN(rc);
2969 }
2970
2971 int osc_extent_blocking_cb(struct ldlm_lock *lock,
2972                            struct ldlm_lock_desc *new, void *data,
2973                            int flag)
2974 {
2975         struct lustre_handle lockh = { 0 };
2976         int rc;
2977         ENTRY;
2978
2979         if ((unsigned long)data > 0 && (unsigned long)data < 0x1000) {
2980                 LDLM_ERROR(lock, "cancelling lock with bad data %p", data);
2981                 LBUG();
2982         }
2983
2984         switch (flag) {
2985         case LDLM_CB_BLOCKING:
2986                 ldlm_lock2handle(lock, &lockh);
2987                 rc = ldlm_cli_cancel(&lockh);
2988                 if (rc != ELDLM_OK)
2989                         CERROR("ldlm_cli_cancel failed: %d\n", rc);
2990                 break;
2991         case LDLM_CB_CANCELING: {
2992
2993                 ldlm_lock2handle(lock, &lockh);
2994                 /* This lock wasn't granted, don't try to do anything */
2995                 if (lock->l_req_mode != lock->l_granted_mode)
2996                         RETURN(0);
2997
2998                 cache_remove_lock(lock->l_conn_export->exp_obd->u.cli.cl_cache,
2999                                   &lockh);
3000
3001                 if (lock->l_conn_export->exp_obd->u.cli.cl_ext_lock_cancel_cb)
3002                         lock->l_conn_export->exp_obd->u.cli.cl_ext_lock_cancel_cb(
3003                                                           lock, new, data, flag);
3004                 break;
3005         }
3006         default:
3007                 LBUG();
3008         }
3009
3010         RETURN(0);
3011 }
3012 EXPORT_SYMBOL(osc_extent_blocking_cb);
3013
3014 static void osc_set_data_with_check(struct lustre_handle *lockh, void *data,
3015                                     int flags)
3016 {
3017         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
3018
3019         if (lock == NULL) {
3020                 CERROR("lockh %p, data %p - client evicted?\n", lockh, data);
3021                 return;
3022         }
3023         lock_res_and_lock(lock);
3024 #if defined (__KERNEL__) && defined (__linux__)
3025         /* Liang XXX: Darwin and Winnt checking should be added */
3026         if (lock->l_ast_data && lock->l_ast_data != data) {
3027                 struct inode *new_inode = data;
3028                 struct inode *old_inode = lock->l_ast_data;
3029                 if (!(old_inode->i_state & I_FREEING))
3030                         LDLM_ERROR(lock, "inconsistent l_ast_data found");
3031                 LASSERTF(old_inode->i_state & I_FREEING,
3032                          "Found existing inode %p/%lu/%u state %lu in lock: "
3033                          "setting data to %p/%lu/%u\n", old_inode,
3034                          old_inode->i_ino, old_inode->i_generation,
3035                          old_inode->i_state,
3036                          new_inode, new_inode->i_ino, new_inode->i_generation);
3037         }
3038 #endif
3039         lock->l_ast_data = data;
3040         lock->l_flags |= (flags & LDLM_FL_NO_LRU);
3041         unlock_res_and_lock(lock);
3042         LDLM_LOCK_PUT(lock);
3043 }
3044
3045 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
3046                              ldlm_iterator_t replace, void *data)
3047 {
3048         struct ldlm_res_id res_id = { .name = {0} };
3049         struct obd_device *obd = class_exp2obd(exp);
3050
3051         res_id.name[0] = lsm->lsm_object_id;
3052         res_id.name[2] = lsm->lsm_object_gr;
3053
3054         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
3055         return 0;
3056 }
3057
3058 static int osc_enqueue_fini(struct obd_device *obd, struct ptlrpc_request *req,
3059                             struct obd_info *oinfo, int intent, int rc)
3060 {
3061         ENTRY;
3062
3063         if (intent) {
3064                 /* The request was created before ldlm_cli_enqueue call. */
3065                 if (rc == ELDLM_LOCK_ABORTED) {
3066                         struct ldlm_reply *rep;
3067                         rep = req_capsule_server_get(&req->rq_pill,
3068                                                      &RMF_DLM_REP);
3069
3070                         LASSERT(rep != NULL);
3071                         if (rep->lock_policy_res1)
3072                                 rc = rep->lock_policy_res1;
3073                 }
3074         }
3075
3076         if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
3077                 CDEBUG(D_INODE, "got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
3078                        oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_size,
3079                        oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_blocks,
3080                        oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_mtime);
3081         }
3082
3083         if (!rc)
3084                 cache_add_lock(obd->u.cli.cl_cache, oinfo->oi_lockh);
3085
3086         /* Call the update callback. */
3087         rc = oinfo->oi_cb_up(oinfo, rc);
3088         RETURN(rc);
3089 }
3090
3091 static int osc_enqueue_interpret(struct ptlrpc_request *req,
3092                                  struct osc_enqueue_args *aa, int rc)
3093 {
3094         int intent = aa->oa_oi->oi_flags & LDLM_FL_HAS_INTENT;
3095         struct lov_stripe_md *lsm = aa->oa_oi->oi_md;
3096         struct ldlm_lock *lock;
3097
3098         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
3099          * be valid. */
3100         lock = ldlm_handle2lock(aa->oa_oi->oi_lockh);
3101
3102         /* Complete obtaining the lock procedure. */
3103         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
3104                                    aa->oa_ei->ei_mode,
3105                                    &aa->oa_oi->oi_flags,
3106                                    &lsm->lsm_oinfo[0]->loi_lvb,
3107                                    sizeof(lsm->lsm_oinfo[0]->loi_lvb),
3108                                    lustre_swab_ost_lvb,
3109                                    aa->oa_oi->oi_lockh, rc);
3110
3111         /* Complete osc stuff. */
3112         rc = osc_enqueue_fini(aa->oa_exp->exp_obd, req, aa->oa_oi, intent, rc);
3113
3114         /* Release the lock for async request. */
3115         if (lustre_handle_is_used(aa->oa_oi->oi_lockh) && rc == ELDLM_OK)
3116                 ldlm_lock_decref(aa->oa_oi->oi_lockh, aa->oa_ei->ei_mode);
3117
3118         LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
3119                  aa->oa_oi->oi_lockh, req, aa);
3120         LDLM_LOCK_PUT(lock);
3121         return rc;
3122 }
3123
3124 /* When enqueuing asynchronously, locks are not ordered, so we can obtain a
3125  * lock from the 2nd OSC before a lock from the 1st one. This does not deadlock
3126  * with other synchronous requests, but holding some locks while trying to
3127  * obtain others may take a considerable amount of time in case of OST failure;
3128  * and when other sync requests cannot get a lock released by a client, that
3129  * client is evicted from the cluster -- such scenarios make life difficult, so
3130  * we release locks just after they are obtained. */
3131 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
3132                        struct ldlm_enqueue_info *einfo,
3133                        struct ptlrpc_request_set *rqset)
3134 {
3135         struct ldlm_res_id res_id = { .name = {0} };
3136         struct obd_device *obd = exp->exp_obd;
3137         struct ptlrpc_request *req = NULL;
3138         int intent = oinfo->oi_flags & LDLM_FL_HAS_INTENT;
3139         ldlm_mode_t mode;
3140         int rc;
3141         ENTRY;
3142
3143         res_id.name[0] = oinfo->oi_md->lsm_object_id;
3144         res_id.name[2] = oinfo->oi_md->lsm_object_gr;
3145
3146         /* Filesystem lock extents are extended to page boundaries so that
3147          * dealing with the page cache is a little smoother.  */
3148         oinfo->oi_policy.l_extent.start -=
3149                 oinfo->oi_policy.l_extent.start & ~CFS_PAGE_MASK;
3150         oinfo->oi_policy.l_extent.end |= ~CFS_PAGE_MASK;
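        /*
         * Worked example (illustrative, assuming 4096-byte pages, i.e.
         * ~CFS_PAGE_MASK == 0xfff):
         *
         *   start = 5000:  5000 - (5000 & 0xfff) = 5000 - 904 = 4096
         *   end   = 6000:  6000 | 0xfff          = 8191
         *
         * so the extent [5000, 6000] widens to page-aligned [4096, 8191].
         */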
3151
3152         if (oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid == 0)
3153                 goto no_match;
3154
3155         /* Next, search for already existing extent locks that will cover us */
3156         /* If we're trying to read, we also search for an existing PW lock.  The
3157          * VFS and page cache already protect us locally, so lots of readers/
3158          * writers can share a single PW lock.
3159          *
3160          * There are problems with conversion deadlocks, so instead of
3161          * converting a read lock to a write lock, we'll just enqueue a new
3162          * one.
3163          *
3164          * At some point we should cancel the read lock instead of making them
3165          * send us a blocking callback, but there are problems with canceling
3166          * locks out from other users right now, too. */
3167         mode = einfo->ei_mode;
3168         if (einfo->ei_mode == LCK_PR)
3169                 mode |= LCK_PW;
3170         mode = ldlm_lock_match(obd->obd_namespace,
3171                                oinfo->oi_flags | LDLM_FL_LVB_READY, &res_id,
3172                                einfo->ei_type, &oinfo->oi_policy, mode,
3173                                oinfo->oi_lockh);
3174         if (mode) {
3175                 /* addref the lock only if not async requests and PW lock is
3176                  * matched whereas we asked for PR. */
3177                 if (!rqset && einfo->ei_mode != mode)
3178                         ldlm_lock_addref(oinfo->oi_lockh, LCK_PR);
3179                 osc_set_data_with_check(oinfo->oi_lockh, einfo->ei_cbdata,
3180                                         oinfo->oi_flags);
3181                 if (intent) {
3182                         /* I would like to be able to ASSERT here that rss <=
3183                          * kms, but I can't, for reasons which are explained in
3184                          * lov_enqueue() */
3185                 }
3186
3187                 /* We already have a lock, and it's referenced */
3188                 oinfo->oi_cb_up(oinfo, ELDLM_OK);
3189
3190                 /* For async requests, decref the lock. */
3191                 if (einfo->ei_mode != mode)
3192                         ldlm_lock_decref(oinfo->oi_lockh, LCK_PW);
3193                 else if (rqset)
3194                         ldlm_lock_decref(oinfo->oi_lockh, einfo->ei_mode);
3195
3196                 RETURN(ELDLM_OK);
3197         }
3198
3199  no_match:
3200         if (intent) {
3201                 CFS_LIST_HEAD(cancels);
3202                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3203                                            &RQF_LDLM_ENQUEUE_LVB);
3204                 if (req == NULL)
3205                         RETURN(-ENOMEM);
3206
3207                 rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
3208                 if (rc)
3209                         RETURN(rc);
3210
3211                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
3212                                      sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb));
3213                 ptlrpc_request_set_replen(req);
3214         }
3215
3216         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
3217         oinfo->oi_flags &= ~LDLM_FL_BLOCK_GRANTED;
3218
3219         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id,
3220                               &oinfo->oi_policy, &oinfo->oi_flags,
3221                               &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
3222                               sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb),
3223                               lustre_swab_ost_lvb, oinfo->oi_lockh,
3224                               rqset ? 1 : 0);
3225         if (rqset) {
3226                 if (!rc) {
3227                         struct osc_enqueue_args *aa;
3228                         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3229                         aa = (struct osc_enqueue_args *)&req->rq_async_args;
3230                         aa->oa_oi = oinfo;
3231                         aa->oa_ei = einfo;
3232                         aa->oa_exp = exp;
3233
3234                         req->rq_interpret_reply = osc_enqueue_interpret;
3235                         ptlrpc_set_add_req(rqset, req);
3236                 } else if (intent) {
3237                         ptlrpc_req_finished(req);
3238                 }
3239                 RETURN(rc);
3240         }
3241
3242         rc = osc_enqueue_fini(obd, req, oinfo, intent, rc);
3243         if (intent)
3244                 ptlrpc_req_finished(req);
3245
3246         RETURN(rc);
3247 }
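/*
 * Illustrative sketch (hypothetical caller): osc_enqueue() is synchronous
 * when rqset == NULL and asynchronous otherwise, completing through
 * osc_enqueue_interpret() above.  The variable names are assumptions.
 */
#if 0
        /* sync: oi_cb_up() has run by the time this returns */
        rc = osc_enqueue(exp, &oinfo, &einfo, NULL);

        /* async: queue it and reap completion via the request set */
        rc = osc_enqueue(exp, &oinfo, &einfo, rqset);
        if (rc == 0)
                rc = ptlrpc_set_wait(rqset);
#endif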
3248
3249 static int osc_match(struct obd_export *exp, struct lov_stripe_md *lsm,
3250                      __u32 type, ldlm_policy_data_t *policy, __u32 mode,
3251                      int *flags, void *data, struct lustre_handle *lockh)
3252 {
3253         struct ldlm_res_id res_id = { .name = {0} };
3254         struct obd_device *obd = exp->exp_obd;
3255         int lflags = *flags;
3256         ldlm_mode_t rc;
3257         ENTRY;
3258
3259         res_id.name[0] = lsm->lsm_object_id;
3260         res_id.name[2] = lsm->lsm_object_gr;
3261
3262         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
3263                 RETURN(-EIO);
3264
3265         /* Filesystem lock extents are extended to page boundaries so that
3266          * dealing with the page cache is a little smoother */
3267         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3268         policy->l_extent.end |= ~CFS_PAGE_MASK;
3269
3270         /* Next, search for already existing extent locks that will cover us */
3271         /* If we're trying to read, we also search for an existing PW lock.  The
3272          * VFS and page cache already protect us locally, so lots of readers/
3273          * writers can share a single PW lock. */
3274         rc = mode;
3275         if (mode == LCK_PR)
3276                 rc |= LCK_PW;
3277         rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY,
3278                              &res_id, type, policy, rc, lockh);
3279         if (rc) {
3280                 osc_set_data_with_check(lockh, data, lflags);
3281                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
3282                         ldlm_lock_addref(lockh, LCK_PR);
3283                         ldlm_lock_decref(lockh, LCK_PW);
3284                 }
3285         }
3286
3287         RETURN(rc);
3288 }
3289
3290 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
3291                       __u32 mode, struct lustre_handle *lockh)
3292 {
3293         ENTRY;
3294
3295         if (unlikely(mode == LCK_GROUP))
3296                 ldlm_lock_decref_and_cancel(lockh, mode);
3297         else
3298                 ldlm_lock_decref(lockh, mode);
3299
3300         RETURN(0);
3301 }
3302
3303 static int osc_cancel_unused(struct obd_export *exp,
3304                              struct lov_stripe_md *lsm, int flags,
3305                              void *opaque)
3306 {
3307         struct obd_device *obd = class_exp2obd(exp);
3308         struct ldlm_res_id res_id = { .name = {0} }, *resp = NULL;
3309
3310         if (lsm != NULL) {
3311                 res_id.name[0] = lsm->lsm_object_id;
3312                 res_id.name[2] = lsm->lsm_object_gr;
3313                 resp = &res_id;
3314         }
3315
3316         return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
3317 }
3318
3319 static int osc_join_lru(struct obd_export *exp,
3320                         struct lov_stripe_md *lsm, int join)
3321 {
3322         struct obd_device *obd = class_exp2obd(exp);
3323         struct ldlm_res_id res_id = { .name = {0} }, *resp = NULL;
3324
3325         if (lsm != NULL) {
3326                 res_id.name[0] = lsm->lsm_object_id;
3327                 res_id.name[2] = lsm->lsm_object_gr;
3328                 resp = &res_id;
3329         }
3330
3331         return ldlm_cli_join_lru(obd->obd_namespace, resp, join);
3332 }
3333
3334 static int osc_statfs_interpret(struct ptlrpc_request *req,
3335                                 struct osc_async_args *aa, int rc)
3336 {
3337         struct obd_statfs *msfs;
3338         ENTRY;
3339
3340         if (rc != 0)
3341                 GOTO(out, rc);
3342
3343         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3344         if (msfs == NULL) {
3345                 GOTO(out, rc = -EPROTO);
3346         }
3347
3348         *aa->aa_oi->oi_osfs = *msfs;
3349 out:
3350         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
3351         RETURN(rc);
3352 }
3353
3354 static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
3355                             __u64 max_age, struct ptlrpc_request_set *rqset)
3356 {
3357         struct ptlrpc_request *req;
3358         struct osc_async_args *aa;
3359         int                    rc;
3360         ENTRY;
3361
3362         /* We could possibly pass max_age in the request (as an absolute
3363          * timestamp or a "seconds.usec ago") so the target can avoid doing
3364          * extra calls into the filesystem if that isn't necessary (e.g.
3365          * during mount that would help a bit).  Having relative timestamps
3366          * is not so great if request processing is slow, while absolute
3367          * timestamps are not ideal because they need time synchronization. */
3368         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
3369         if (req == NULL)
3370                 RETURN(-ENOMEM);
3371
3372         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3373         if (rc) {
3374                 ptlrpc_request_free(req);
3375                 RETURN(rc);
3376         }
3377         ptlrpc_request_set_replen(req);
3378         req->rq_request_portal = OST_CREATE_PORTAL;
3379         ptlrpc_at_set_req_timeout(req);
3380
3381         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
3382                 /* procfs requests must not block waiting for the OST,
3383                  * to avoid deadlock */
3383                 req->rq_no_resend = 1;
3384                 req->rq_no_delay = 1;
3385         }
3386
3387         req->rq_interpret_reply = osc_statfs_interpret;
3388         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3389         aa = (struct osc_async_args *)&req->rq_async_args;
3390         aa->aa_oi = oinfo;
3391
3392         ptlrpc_set_add_req(rqset, req);
3393         RETURN(0);
3394 }
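/*
 * Illustrative sketch (hypothetical caller): the async statfs result is
 * delivered through oinfo->oi_cb_up from osc_statfs_interpret() above;
 * my_statfs_cb and the surrounding setup are assumptions.
 */
#if 0
        struct obd_info oinfo = { .oi_osfs = &osfs,
                                  .oi_cb_up = my_statfs_cb };

        rc = osc_statfs_async(obd, &oinfo, max_age, rqset);
        if (rc == 0)
                rc = ptlrpc_set_wait(rqset);
#endif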
3395
3396 static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
3397                       __u64 max_age, __u32 flags)
3398 {
3399         struct obd_statfs     *msfs;
3400         struct ptlrpc_request *req;
3401         struct obd_import     *imp = NULL;
3402         int rc;
3403         ENTRY;
3404
3405         /* Since the request might also come from lprocfs, we need to
3406          * sync this with client_disconnect_export() (bug 15684). */
3407         down_read(&obd->u.cli.cl_sem);
3408         if (obd->u.cli.cl_import)
3409                 imp = class_import_get(obd->u.cli.cl_import);
3410         up_read(&obd->u.cli.cl_sem);
3411         if (!imp)
3412                 RETURN(-ENODEV);
3413
3414         /* We could possibly pass max_age in the request (as an absolute
3415          * timestamp or a "seconds.usec ago") so the target can avoid doing
3416          * extra calls into the filesystem if that isn't necessary (e.g.
3417          * during mount that would help a bit).  Having relative timestamps
3418          * is not so great if request processing is slow, while absolute
3419          * timestamps are not ideal because they need time synchronization. */
3420         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
3421
3422         class_import_put(imp);
3423
3424         if (req == NULL)
3425                 RETURN(-ENOMEM);
3426
3427         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3428         if (rc) {
3429                 ptlrpc_request_free(req);
3430                 RETURN(rc);
3431         }
3432         ptlrpc_request_set_replen(req);
3433         req->rq_request_portal = OST_CREATE_PORTAL;
3434         ptlrpc_at_set_req_timeout(req);
3435
3436         if (flags & OBD_STATFS_NODELAY) {
3437                 /* procfs requests must not block waiting for the OST,
3438                  * to avoid deadlock */
3438                 req->rq_no_resend = 1;
3439                 req->rq_no_delay = 1;
3440         }
3441
3442         rc = ptlrpc_queue_wait(req);
3443         if (rc)
3444                 GOTO(out, rc);
3445
3446         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3447         if (msfs == NULL) {
3448                 GOTO(out, rc = -EPROTO);
3449         }
3450
3451         *osfs = *msfs;
3452
3453         EXIT;
3454  out:
3455         ptlrpc_req_finished(req);
3456         return rc;
3457 }
3458
3459 /* Retrieve object striping information.
3460  *
3461  * @lump is a pointer to a user-space struct with lmm_stripe_count indicating
3462  * the maximum number of OST indices which will fit in the user buffer.
3463  * lmm_magic must be LOV_USER_MAGIC (we only use 1 slot here).
3464  */
3465 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
3466 {
3467         struct lov_user_md lum, *lumk;
3468         int rc = 0, lum_size;
3469         ENTRY;
3470
3471         if (!lsm)
3472                 RETURN(-ENODATA);
3473
3474         if (copy_from_user(&lum, lump, sizeof(lum)))
3475                 RETURN(-EFAULT);
3476
3477         if (lum.lmm_magic != LOV_USER_MAGIC)
3478                 RETURN(-EINVAL);
3479
3480         if (lum.lmm_stripe_count > 0) {
3481                 lum_size = sizeof(lum) + sizeof(lum.lmm_objects[0]);
3482                 OBD_ALLOC(lumk, lum_size);
3483                 if (!lumk)
3484                         RETURN(-ENOMEM);
3485
3486                 lumk->lmm_objects[0].l_object_id = lsm->lsm_object_id;
3487                 lumk->lmm_objects[0].l_object_gr = lsm->lsm_object_gr;
3488         } else {
3489                 lum_size = sizeof(lum);
3490                 lumk = &lum;
3491         }
3492
3493         lumk->lmm_object_id = lsm->lsm_object_id;
3494         lumk->lmm_object_gr = lsm->lsm_object_gr;
3495         lumk->lmm_stripe_count = 1;
3496
3497         if (copy_to_user(lump, lumk, lum_size))
3498                 rc = -EFAULT;
3499
3500         if (lumk != &lum)
3501                 OBD_FREE(lumk, lum_size);
3502
3503         RETURN(rc);
3504 }
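/*
 * Illustrative sketch (hypothetical user-space caller): the buffer layout
 * mirrors the copy_to_user() above -- one lov_user_md followed by a single
 * OST entry; fd and the omitted malloc error handling are assumptions.
 */
#if 0
        struct lov_user_md *lump;
        size_t lum_size = sizeof(*lump) + sizeof(lump->lmm_objects[0]);

        lump = malloc(lum_size);
        lump->lmm_magic = LOV_USER_MAGIC;
        lump->lmm_stripe_count = 1;     /* room for one OST entry */
        ioctl(fd, LL_IOC_LOV_GETSTRIPE, lump);
#endif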
3505
3506
3507 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3508                          void *karg, void *uarg)
3509 {
3510         struct obd_device *obd = exp->exp_obd;
3511         struct obd_ioctl_data *data = karg;
3512         int err = 0;
3513         ENTRY;
3514
3515         if (!try_module_get(THIS_MODULE)) {
3516                 CERROR("Can't get module. Is it alive?\n");
3517                 return -EINVAL;
3518         }
3519         switch (cmd) {
3520         case OBD_IOC_LOV_GET_CONFIG: {
3521                 char *buf;
3522                 struct lov_desc *desc;
3523                 struct obd_uuid uuid;
3524
3525                 buf = NULL;
3526                 len = 0;
3527                 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
3528                         GOTO(out, err = -EINVAL);
3529
3530                 data = (struct obd_ioctl_data *)buf;
3531
3532                 if (sizeof(*desc) > data->ioc_inllen1) {
3533                         obd_ioctl_freedata(buf, len);
3534                         GOTO(out, err = -EINVAL);
3535                 }
3536
3537                 if (data->ioc_inllen2 < sizeof(uuid)) {
3538                         obd_ioctl_freedata(buf, len);
3539                         GOTO(out, err = -EINVAL);
3540                 }
3541
3542                 desc = (struct lov_desc *)data->ioc_inlbuf1;
3543                 desc->ld_tgt_count = 1;
3544                 desc->ld_active_tgt_count = 1;
3545                 desc->ld_default_stripe_count = 1;
3546                 desc->ld_default_stripe_size = 0;
3547                 desc->ld_default_stripe_offset = 0;
3548                 desc->ld_pattern = 0;
3549                 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
3550
3551                 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
3552
3553                 err = copy_to_user((void *)uarg, buf, len);
3554                 if (err)
3555                         err = -EFAULT;
3556                 obd_ioctl_freedata(buf, len);
3557                 GOTO(out, err);
3558         }
3559         case LL_IOC_LOV_SETSTRIPE:
3560                 err = obd_alloc_memmd(exp, karg);
3561                 if (err > 0)
3562                         err = 0;
3563                 GOTO(out, err);
3564         case LL_IOC_LOV_GETSTRIPE:
3565                 err = osc_getstripe(karg, uarg);
3566                 GOTO(out, err);
3567         case OBD_IOC_CLIENT_RECOVER:
3568                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
3569                                             data->ioc_inlbuf1);
3570                 if (err > 0)
3571                         err = 0;
3572                 GOTO(out, err);
3573         case IOC_OSC_SET_ACTIVE:
3574                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
3575                                                data->ioc_offset);
3576                 GOTO(out, err);
3577         case OBD_IOC_POLL_QUOTACHECK:
3578                 err = lquota_poll_check(quota_interface, exp,
3579                                         (struct if_quotacheck *)karg);
3580                 GOTO(out, err);
3581         default:
3582                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3583                        cmd, cfs_curproc_comm());
3584                 GOTO(out, err = -ENOTTY);
3585         }
3586 out:
3587         module_put(THIS_MODULE);
3588         return err;
3589 }
3590
3591 static int osc_get_info(struct obd_export *exp, obd_count keylen,
3592                         void *key, __u32 *vallen, void *val)
3593 {
3594         ENTRY;
3595         if (!vallen || !val)
3596                 RETURN(-EFAULT);
3597
3598         if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
3599                 __u32 *stripe = val;
3600                 *vallen = sizeof(*stripe);
3601                 *stripe = 0;
3602                 RETURN(0);
3603         } else if (KEY_IS(KEY_LAST_ID)) {
3604                 struct ptlrpc_request *req;
3605                 obd_id                *reply;
3606                 char                  *tmp;
3607                 int                    rc;
3608
3609                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3610                                            &RQF_OST_GET_INFO_LAST_ID);
3611                 if (req == NULL)
3612                         RETURN(-ENOMEM);
3613
3614                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3615                                      RCL_CLIENT, keylen);
3616                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3617                 if (rc) {
3618                         ptlrpc_request_free(req);
3619                         RETURN(rc);
3620                 }
3621
3622                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3623                 memcpy(tmp, key, keylen);
3624
3625                 ptlrpc_request_set_replen(req);
3626                 rc = ptlrpc_queue_wait(req);
3627                 if (rc)
3628                         GOTO(out, rc);
3629
3630                 reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
3631                 if (reply == NULL)
3632                         GOTO(out, rc = -EPROTO);
3633
3634                 *((obd_id *)val) = *reply;
3635         out:
3636                 ptlrpc_req_finished(req);
3637                 RETURN(rc);
3638         }
3639         RETURN(-EINVAL);
3640 }
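/*
 * Illustrative sketch (hypothetical caller): querying the last allocated
 * object id through the KEY_LAST_ID branch above; the keylen convention is
 * an assumption.
 */
#if 0
        obd_id last_id;
        __u32 vallen = sizeof(last_id);

        rc = obd_get_info(exp, strlen(KEY_LAST_ID) + 1, KEY_LAST_ID,
                          &vallen, &last_id);
#endif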
3641
3642 static int osc_setinfo_mds_conn_interpret(struct ptlrpc_request *req,
3643                                           void *aa, int rc)
3644 {
3645         struct llog_ctxt *ctxt;
3646         struct obd_import *imp = req->rq_import;
3647         ENTRY;
3648
3649         if (rc != 0)
3650                 RETURN(rc);
3651
3652         ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
3653         if (ctxt) {
3654                 /* rc is known to be 0 here: errors returned early above */
3655                 rc = llog_initiator_connect(ctxt);
3656                 llog_ctxt_put(ctxt);
3657         }
3658
3662         spin_lock(&imp->imp_lock);
3663         imp->imp_server_timeout = 1;
3664         imp->imp_pingable = 1;
3665         spin_unlock(&imp->imp_lock);
3666         CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
3667
3668         RETURN(rc);
3669 }
3670
3671 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
3672                               void *key, obd_count vallen, void *val,
3673                               struct ptlrpc_request_set *set)
3674 {
3675         struct ptlrpc_request *req;
3676         struct obd_device     *obd = exp->exp_obd;
3677         struct obd_import     *imp = class_exp2cliimp(exp);
3678         char                  *tmp;
3679         int                    rc;
3680         ENTRY;
3681
3682         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3683
3684         if (KEY_IS(KEY_NEXT_ID)) {
3685                 if (vallen != sizeof(obd_id))
3686                         RETURN(-ERANGE);
3687                 if (val == NULL)
3688                         RETURN(-EINVAL);
3689                 obd->u.cli.cl_oscc.oscc_next_id = *((obd_id*)val) + 1;
3690                 CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
3691                        exp->exp_obd->obd_name,
3692                        obd->u.cli.cl_oscc.oscc_next_id);
3693
3694                 RETURN(0);
3695         }
3696