Whamcloud - gitweb
- make HEAD from b_post_cmd3
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
5  *   Author Peter Braam <braam@clusterfs.com>
6  *
7  *   This file is part of the Lustre file system, http://www.lustre.org
8  *   Lustre is a trademark of Cluster File Systems, Inc.
9  *
10  *   You may have signed or agreed to another license before downloading
11  *   this software.  If so, you are bound by the terms and conditions
12  *   of that agreement, and the following does not apply to you.  See the
13  *   LICENSE file included with this distribution for more information.
14  *
15  *   If you did not agree to a different license, then this copy of Lustre
16  *   is open source software; you can redistribute it and/or modify it
17  *   under the terms of version 2 of the GNU General Public License as
18  *   published by the Free Software Foundation.
19  *
20  *   In either case, Lustre is distributed in the hope that it will be
21  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
22  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
23  *   license text for more details.
24  *
25  *  For testing and management it is treated as an obd_device,
26  *  although * it does not export a full OBD method table (the
27  *  requests are coming * in over the wire, so object target modules
28  *  do not have a full * method table.)
29  *
30  */
31
32 #ifndef EXPORT_SYMTAB
33 # define EXPORT_SYMTAB
34 #endif
35 #define DEBUG_SUBSYSTEM S_OSC
36
37 #ifdef __KERNEL__
38 # include <libcfs/libcfs.h>
39 #else /* __KERNEL__ */
40 # include <liblustre.h>
41 #endif
42
43 #include <lustre_dlm.h>
44 #include <libcfs/kp30.h>
45 #include <lustre_net.h>
46 #include <lustre/lustre_user.h>
47 #include <obd_ost.h>
48 #include <obd_lov.h>
49
50 #ifdef  __CYGWIN__
51 # include <ctype.h>
52 #endif
53
54 #include <lustre_ha.h>
55 #include <lprocfs_status.h>
56 #include <lustre_log.h>
57 #include <lustre_debug.h>
58 #include <lustre_param.h>
59 #include "osc_internal.h"
60
61 static quota_interface_t *quota_interface = NULL;
62 extern quota_interface_t osc_quota_interface;
63
64 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
65
66 /* Pack OSC object metadata for disk storage (LE byte order). */
67 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
68                       struct lov_stripe_md *lsm)
69 {
70         int lmm_size;
71         ENTRY;
72
73         lmm_size = sizeof(**lmmp);
74         if (!lmmp)
75                 RETURN(lmm_size);
76
77         if (*lmmp && !lsm) {
78                 OBD_FREE(*lmmp, lmm_size);
79                 *lmmp = NULL;
80                 RETURN(0);
81         }
82
83         if (!*lmmp) {
84                 OBD_ALLOC(*lmmp, lmm_size);
85                 if (!*lmmp)
86                         RETURN(-ENOMEM);
87         }
88
89         if (lsm) {
90                 LASSERT(lsm->lsm_object_id);
91                 LASSERT(lsm->lsm_object_gr);
92                 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
93                 (*lmmp)->lmm_object_gr = cpu_to_le64(lsm->lsm_object_gr);
94         }
95
96         RETURN(lmm_size);
97 }
98
99 /* Unpack OSC object metadata from disk storage (LE byte order). */
100 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
101                         struct lov_mds_md *lmm, int lmm_bytes)
102 {
103         int lsm_size;
104         ENTRY;
105
106         if (lmm != NULL) {
107                 if (lmm_bytes < sizeof (*lmm)) {
108                         CERROR("lov_mds_md too small: %d, need %d\n",
109                                lmm_bytes, (int)sizeof(*lmm));
110                         RETURN(-EINVAL);
111                 }
112                 /* XXX LOV_MAGIC etc check? */
113
114                 if (lmm->lmm_object_id == 0) {
115                         CERROR("lov_mds_md: zero lmm_object_id\n");
116                         RETURN(-EINVAL);
117                 }
118         }
119
120         lsm_size = lov_stripe_md_size(1);
121         if (lsmp == NULL)
122                 RETURN(lsm_size);
123
124         if (*lsmp != NULL && lmm == NULL) {
125                 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
126                 OBD_FREE(*lsmp, lsm_size);
127                 *lsmp = NULL;
128                 RETURN(0);
129         }
130
131         if (*lsmp == NULL) {
132                 OBD_ALLOC(*lsmp, lsm_size);
133                 if (*lsmp == NULL)
134                         RETURN(-ENOMEM);
135                 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
136                 if ((*lsmp)->lsm_oinfo[0] == NULL) {
137                         OBD_FREE(*lsmp, lsm_size);
138                         RETURN(-ENOMEM);
139                 }
140                 loi_init((*lsmp)->lsm_oinfo[0]);
141         }
142
143         if (lmm != NULL) {
144                 /* XXX zero *lsmp? */
145                 (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
146                 (*lsmp)->lsm_object_gr = le64_to_cpu (lmm->lmm_object_gr);
147                 LASSERT((*lsmp)->lsm_object_id);
148                 LASSERT((*lsmp)->lsm_object_gr);
149         }
150
151         (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
152
153         RETURN(lsm_size);
154 }
155
156 static inline void osc_pack_capa(struct ptlrpc_request *req, int offset,
157                                  struct ost_body *body, void *capa)
158 {
159         struct obd_capa *oc = (struct obd_capa *)capa;
160         struct lustre_capa *c;
161
162         if (!capa)
163                 return;
164
165         c = lustre_msg_buf(req->rq_reqmsg, offset, sizeof(*c));
166         LASSERT(c);
167         capa_cpy(c, oc);
168         body->oa.o_valid |= OBD_MD_FLOSSCAPA;
169         DEBUG_CAPA(D_SEC, c, "pack");
170 }
171
172 static inline void osc_pack_req_body(struct ptlrpc_request *req, int offset,
173                                      struct obd_info *oinfo)
174 {
175         struct ost_body *body;
176
177         body = lustre_msg_buf(req->rq_reqmsg, offset, sizeof(*body));
178         body->oa = *oinfo->oi_oa;
179         osc_pack_capa(req, offset + 1, body, oinfo->oi_capa);
180 }
181
182 static int osc_getattr_interpret(struct ptlrpc_request *req,
183                                  struct osc_async_args *aa, int rc)
184 {
185         struct ost_body *body;
186         ENTRY;
187
188         if (rc != 0)
189                 GOTO(out, rc);
190
191         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
192                                   lustre_swab_ost_body);
193         if (body) {
194                 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
195                 memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));
196
197                 /* This should really be sent by the OST */
198                 aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
199                 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
200         } else {
201                 CERROR("can't unpack ost_body\n");
202                 rc = -EPROTO;
203                 aa->aa_oi->oi_oa->o_valid = 0;
204         }
205 out:
206         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
207         RETURN(rc);
208 }
209
210 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
211                              struct ptlrpc_request_set *set)
212 {
213         struct ptlrpc_request *req;
214         struct ost_body *body;
215         int size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
216         struct osc_async_args *aa;
217         ENTRY;
218
219         size[REQ_REC_OFF + 1] = oinfo->oi_capa ? sizeof(struct lustre_capa) : 0;
220         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
221                               OST_GETATTR, 3, size,NULL);
222         if (!req)
223                 RETURN(-ENOMEM);
224
225         osc_pack_req_body(req, REQ_REC_OFF, oinfo);
226
227         ptlrpc_req_set_repsize(req, 2, size);
228         req->rq_interpret_reply = osc_getattr_interpret;
229
230         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
231         aa = (struct osc_async_args *)&req->rq_async_args;
232         aa->aa_oi = oinfo;
233
234         ptlrpc_set_add_req(set, req);
235         RETURN (0);
236 }
237
238 static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
239 {
240         struct ptlrpc_request *req;
241         struct ost_body *body;
242         int rc, size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
243         ENTRY;
244
245         size[REQ_REC_OFF + 1] = oinfo->oi_capa ? sizeof(struct lustre_capa) : 0;
246         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
247                               OST_GETATTR, 3, size, NULL);
248         if (!req)
249                 RETURN(-ENOMEM);
250
251         osc_pack_req_body(req, REQ_REC_OFF, oinfo);
252
253         ptlrpc_req_set_repsize(req, 2, size);
254
255         rc = ptlrpc_queue_wait(req);
256         if (rc) {
257                 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
258                 GOTO(out, rc);
259         }
260
261         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
262                                   lustre_swab_ost_body);
263         if (body == NULL) {
264                 CERROR ("can't unpack ost_body\n");
265                 GOTO (out, rc = -EPROTO);
266         }
267
268         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
269         *oinfo->oi_oa = body->oa;
270
271         /* This should really be sent by the OST */
272         oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
273         oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
274
275         EXIT;
276  out:
277         ptlrpc_req_finished(req);
278         return rc;
279 }
280
281 static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
282                        struct obd_trans_info *oti)
283 {
284         struct ptlrpc_request *req;
285         struct ost_body *body;
286         int rc, size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
287         ENTRY;
288
289         LASSERT(!(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP) ||
290                                         oinfo->oi_oa->o_gr > 0);
291         size[REQ_REC_OFF + 1] = oinfo->oi_capa ? sizeof(struct lustre_capa) : 0;
292         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
293                               OST_SETATTR, 3, size, NULL);
294         if (!req)
295                 RETURN(-ENOMEM);
296
297         osc_pack_req_body(req, REQ_REC_OFF, oinfo);
298
299         ptlrpc_req_set_repsize(req, 2, size);
300
301         rc = ptlrpc_queue_wait(req);
302         if (rc)
303                 GOTO(out, rc);
304
305         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
306                                   lustre_swab_ost_body);
307         if (body == NULL)
308                 GOTO(out, rc = -EPROTO);
309
310         *oinfo->oi_oa = body->oa;
311
312         EXIT;
313 out:
314         ptlrpc_req_finished(req);
315         RETURN(rc);
316 }
317
318 static int osc_setattr_interpret(struct ptlrpc_request *req,
319                                  struct osc_async_args *aa, int rc)
320 {
321         struct ost_body *body;
322         ENTRY;
323
324         if (rc != 0)
325                 GOTO(out, rc);
326
327         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
328                                   lustre_swab_ost_body);
329         if (body == NULL) {
330                 CERROR("can't unpack ost_body\n");
331                 GOTO(out, rc = -EPROTO);
332         }
333
334         *aa->aa_oi->oi_oa = body->oa;
335 out:
336         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
337         RETURN(rc);
338 }
339
340 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
341                              struct obd_trans_info *oti,
342                              struct ptlrpc_request_set *rqset)
343 {
344         struct ptlrpc_request *req;
345         int size[3] = { sizeof(struct ptlrpc_body), sizeof(struct ost_body) };
346         struct osc_async_args *aa;
347         ENTRY;
348
349         size[REQ_REC_OFF + 1] = oinfo->oi_capa ? sizeof(struct lustre_capa) : 0;
350         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
351                               OST_SETATTR, 3, size, NULL);
352         if (!req)
353                 RETURN(-ENOMEM);
354
355         osc_pack_req_body(req, REQ_REC_OFF, oinfo);
356         if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) {
357                 LASSERT(oti);
358                 *obdo_logcookie(oinfo->oi_oa) = *oti->oti_logcookies;
359         }
360
361         ptlrpc_req_set_repsize(req, 2, size);
362         /* do mds to ost setattr asynchronouly */
363         if (!rqset) {
364                 /* Do not wait for response. */
365                 ptlrpcd_add_req(req);
366         } else {
367                 req->rq_interpret_reply = osc_setattr_interpret;
368
369                 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
370                 aa = (struct osc_async_args *)&req->rq_async_args;
371                 aa->aa_oi = oinfo;
372
373                 ptlrpc_set_add_req(rqset, req);
374         }
375
376         RETURN(0);
377 }
378
379 int osc_real_create(struct obd_export *exp, struct obdo *oa,
380                     struct lov_stripe_md **ea, struct obd_trans_info *oti)
381 {
382         struct ptlrpc_request *req;
383         struct ost_body *body;
384         struct lov_stripe_md *lsm;
385         int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
386         ENTRY;
387
388         LASSERT(oa);
389         LASSERT(ea);
390
391         lsm = *ea;
392         if (!lsm) {
393                 rc = obd_alloc_memmd(exp, &lsm);
394                 if (rc < 0)
395                         RETURN(rc);
396         }
397
398         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
399                               OST_CREATE, 2, size, NULL);
400         if (!req)
401                 GOTO(out, rc = -ENOMEM);
402
403         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
404         body->oa = *oa;
405
406         ptlrpc_req_set_repsize(req, 2, size);
407         if (oa->o_valid & OBD_MD_FLINLINE) {
408                 LASSERT((oa->o_valid & OBD_MD_FLFLAGS) &&
409                         oa->o_flags == OBD_FL_DELORPHAN);
410                 DEBUG_REQ(D_HA, req,
411                           "delorphan from OST integration");
412                 /* Don't resend the delorphan req */
413                 req->rq_no_resend = req->rq_no_delay = 1;
414         }
415
416         rc = ptlrpc_queue_wait(req);
417         if (rc)
418                 GOTO(out_req, rc);
419
420         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
421                                   lustre_swab_ost_body);
422         if (body == NULL) {
423                 CERROR ("can't unpack ost_body\n");
424                 GOTO (out_req, rc = -EPROTO);
425         }
426
427         *oa = body->oa;
428
429         /* This should really be sent by the OST */
430         oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
431         oa->o_valid |= OBD_MD_FLBLKSZ;
432
433         /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
434          * have valid lsm_oinfo data structs, so don't go touching that.
435          * This needs to be fixed in a big way.
436          */
437         lsm->lsm_object_id = oa->o_id;
438         lsm->lsm_object_gr = oa->o_gr;
439         *ea = lsm;
440
441         if (oti != NULL) {
442                 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
443
444                 if (oa->o_valid & OBD_MD_FLCOOKIE) {
445                         if (!oti->oti_logcookies)
446                                 oti_alloc_cookies(oti, 1);
447                         *oti->oti_logcookies = *obdo_logcookie(oa);
448                 }
449         }
450
451         CDEBUG(D_HA, "transno: "LPD64"\n",
452                lustre_msg_get_transno(req->rq_repmsg));
453         EXIT;
454 out_req:
455         ptlrpc_req_finished(req);
456 out:
457         if (rc && !*ea)
458                 obd_free_memmd(exp, &lsm);
459         return rc;
460 }
461
462 static int osc_punch_interpret(struct ptlrpc_request *req,
463                                struct osc_async_args *aa, int rc)
464 {
465         struct ost_body *body;
466         ENTRY;
467
468         if (rc != 0)
469                 GOTO(out, rc);
470
471         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof (*body),
472                                   lustre_swab_ost_body);
473         if (body == NULL) {
474                 CERROR ("can't unpack ost_body\n");
475                 GOTO(out, rc = -EPROTO);
476         }
477
478         *aa->aa_oi->oi_oa = body->oa;
479 out:
480         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
481         RETURN(rc);
482 }
483
484 static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
485                      struct obd_trans_info *oti,
486                      struct ptlrpc_request_set *rqset)
487 {
488         struct ptlrpc_request *req;
489         struct osc_async_args *aa;
490         struct ost_body *body;
491         int size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
492         ENTRY;
493
494         if (!oinfo->oi_oa) {
495                 CERROR("oa NULL\n");
496                 RETURN(-EINVAL);
497         }
498
499         size[REQ_REC_OFF + 1] = oinfo->oi_capa? sizeof(struct lustre_capa) : 0;
500         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
501                               OST_PUNCH, 3, size, NULL);
502         if (!req)
503                 RETURN(-ENOMEM);
504
505         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
506
507         osc_pack_req_body(req, REQ_REC_OFF, oinfo);
508         /* overload the size and blocks fields in the oa with start/end */
509         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
510         body->oa.o_size = oinfo->oi_policy.l_extent.start;
511         body->oa.o_blocks = oinfo->oi_policy.l_extent.end;
512         body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
513
514         ptlrpc_req_set_repsize(req, 2, size);
515
516         req->rq_interpret_reply = osc_punch_interpret;
517         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
518         aa = (struct osc_async_args *)&req->rq_async_args;
519         aa->aa_oi = oinfo;
520         ptlrpc_set_add_req(rqset, req);
521
522         RETURN(0);
523 }
524
525 static int osc_sync(struct obd_export *exp, struct obdo *oa,
526                     struct lov_stripe_md *md, obd_size start, obd_size end,
527                     void *capa)
528 {
529         struct ptlrpc_request *req;
530         struct ost_body *body;
531         int rc, size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
532         ENTRY;
533
534         if (!oa) {
535                 CERROR("oa NULL\n");
536                 RETURN(-EINVAL);
537         }
538
539         size[REQ_REC_OFF + 1] = capa ? sizeof(struct lustre_capa) : 0;
540
541         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
542                               OST_SYNC, 3, size, NULL);
543         if (!req)
544                 RETURN(-ENOMEM);
545
546         /* overload the size and blocks fields in the oa with start/end */
547         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
548         body->oa = *oa;
549         body->oa.o_size = start;
550         body->oa.o_blocks = end;
551         body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
552
553         osc_pack_capa(req, REQ_REC_OFF + 1, body, capa);
554
555         ptlrpc_req_set_repsize(req, 2, size);
556
557         rc = ptlrpc_queue_wait(req);
558         if (rc)
559                 GOTO(out, rc);
560
561         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
562                                   lustre_swab_ost_body);
563         if (body == NULL) {
564                 CERROR ("can't unpack ost_body\n");
565                 GOTO (out, rc = -EPROTO);
566         }
567
568         *oa = body->oa;
569
570         EXIT;
571  out:
572         ptlrpc_req_finished(req);
573         return rc;
574 }
575
576 /* Destroy requests can be async always on the client, and we don't even really
577  * care about the return code since the client cannot do anything at all about
578  * a destroy failure.
579  * When the MDS is unlinking a filename, it saves the file objects into a
580  * recovery llog, and these object records are cancelled when the OST reports
581  * they were destroyed and sync'd to disk (i.e. transaction committed).
582  * If the client dies, or the OST is down when the object should be destroyed,
583  * the records are not cancelled, and when the OST reconnects to the MDS next,
584  * it will retrieve the llog unlink logs and then sends the log cancellation
585  * cookies to the MDS after committing destroy transactions. */
586 static int osc_destroy(struct obd_export *exp, struct obdo *oa,
587                        struct lov_stripe_md *ea, struct obd_trans_info *oti,
588                        struct obd_export *md_export)
589 {
590         struct ptlrpc_request *req;
591         struct ost_body *body;
592         int size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
593         ENTRY;
594
595         if (!oa) {
596                 CERROR("oa NULL\n");
597                 RETURN(-EINVAL);
598         }
599
600         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
601                               OST_DESTROY, 2, size, NULL);
602         if (!req)
603                 RETURN(-ENOMEM);
604
605         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
606
607         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
608         if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
609                 memcpy(obdo_logcookie(oa), oti->oti_logcookies,
610                        sizeof(*oti->oti_logcookies));
611         body->oa = *oa;
612
613         ptlrpc_req_set_repsize(req, 2, size);
614
615         ptlrpcd_add_req(req);
616         RETURN(0);
617 }
618
619 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
620                                 long writing_bytes)
621 {
622         obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
623
624         LASSERT(!(oa->o_valid & bits));
625
626         oa->o_valid |= bits;
627         client_obd_list_lock(&cli->cl_loi_list_lock);
628         oa->o_dirty = cli->cl_dirty;
629         if (cli->cl_dirty > cli->cl_dirty_max) {
630                 CERROR("dirty %lu > dirty_max %lu\n",
631                        cli->cl_dirty, cli->cl_dirty_max);
632                 oa->o_undirty = 0;
633         } else if (atomic_read(&obd_dirty_pages) > obd_max_dirty_pages) {
634                 CERROR("dirty %d > system dirty_max %d\n",
635                        atomic_read(&obd_dirty_pages), obd_max_dirty_pages);
636                 oa->o_undirty = 0;
637         } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
638                 CERROR("dirty %lu - dirty_max %lu too big???\n",
639                        cli->cl_dirty, cli->cl_dirty_max);
640                 oa->o_undirty = 0;
641         } else {
642                 long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
643                                 (cli->cl_max_rpcs_in_flight + 1);
644                 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
645         }
646         oa->o_grant = cli->cl_avail_grant;
647         oa->o_dropped = cli->cl_lost_grant;
648         cli->cl_lost_grant = 0;
649         client_obd_list_unlock(&cli->cl_loi_list_lock);
650         CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
651                oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
652 }
653
654 /* caller must hold loi_list_lock */
655 static void osc_consume_write_grant(struct client_obd *cli,
656                                     struct brw_page *pga)
657 {
658         atomic_inc(&obd_dirty_pages);
659         cli->cl_dirty += CFS_PAGE_SIZE;
660         cli->cl_avail_grant -= CFS_PAGE_SIZE;
661         pga->flag |= OBD_BRW_FROM_GRANT;
662         CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
663                CFS_PAGE_SIZE, pga, pga->pg);
664         LASSERT(cli->cl_avail_grant >= 0);
665 }
666
667 /* the companion to osc_consume_write_grant, called when a brw has completed.
668  * must be called with the loi lock held. */
669 static void osc_release_write_grant(struct client_obd *cli,
670                                     struct brw_page *pga, int sent)
671 {
672         int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
673         ENTRY;
674
675         if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
676                 EXIT;
677                 return;
678         }
679
680         pga->flag &= ~OBD_BRW_FROM_GRANT;
681         atomic_dec(&obd_dirty_pages);
682         cli->cl_dirty -= CFS_PAGE_SIZE;
683         if (!sent) {
684                 cli->cl_lost_grant += CFS_PAGE_SIZE;
685                 CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
686                        cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
687         } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
688                 /* For short writes we shouldn't count parts of pages that
689                  * span a whole block on the OST side, or our accounting goes
690                  * wrong.  Should match the code in filter_grant_check. */
691                 int offset = pga->off & ~CFS_PAGE_MASK;
692                 int count = pga->count + (offset & (blocksize - 1));
693                 int end = (offset + pga->count) & (blocksize - 1);
694                 if (end)
695                         count += blocksize - end;
696
697                 cli->cl_lost_grant += CFS_PAGE_SIZE - count;
698                 CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
699                        CFS_PAGE_SIZE - count, cli->cl_lost_grant,
700                        cli->cl_avail_grant, cli->cl_dirty);
701         }
702
703         EXIT;
704 }
705
706 static unsigned long rpcs_in_flight(struct client_obd *cli)
707 {
708         return cli->cl_r_in_flight + cli->cl_w_in_flight;
709 }
710
711 /* caller must hold loi_list_lock */
712 void osc_wake_cache_waiters(struct client_obd *cli)
713 {
714         struct list_head *l, *tmp;
715         struct osc_cache_waiter *ocw;
716
717         ENTRY;
718         list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
719                 /* if we can't dirty more, we must wait until some is written */
720                 if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
721                    (atomic_read(&obd_dirty_pages) + 1 > obd_max_dirty_pages)) {
722                         CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
723                                "osc max %ld, sys max %d\n", cli->cl_dirty,
724                                cli->cl_dirty_max, obd_max_dirty_pages);
725                         return;
726                 }
727
728                 /* if still dirty cache but no grant wait for pending RPCs that
729                  * may yet return us some grant before doing sync writes */
730                 if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
731                         CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
732                                cli->cl_w_in_flight);
733                         return;
734                 }
735
736                 ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
737                 list_del_init(&ocw->ocw_entry);
738                 if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
739                         /* no more RPCs in flight to return grant, do sync IO */
740                         ocw->ocw_rc = -EDQUOT;
741                         CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
742                 } else {
743                         osc_consume_write_grant(cli,
744                                                 &ocw->ocw_oap->oap_brw_page);
745                 }
746
747                 cfs_waitq_signal(&ocw->ocw_waitq);
748         }
749
750         EXIT;
751 }
752
753 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
754 {
755         client_obd_list_lock(&cli->cl_loi_list_lock);
756         cli->cl_avail_grant = ocd->ocd_grant;
757         client_obd_list_unlock(&cli->cl_loi_list_lock);
758
759         CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld\n",
760                cli->cl_avail_grant, cli->cl_lost_grant);
761         LASSERT(cli->cl_avail_grant >= 0);
762 }
763
764 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
765 {
766         client_obd_list_lock(&cli->cl_loi_list_lock);
767         CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
768         cli->cl_avail_grant += body->oa.o_grant;
769         /* waiters are woken in brw_interpret_oap */
770         client_obd_list_unlock(&cli->cl_loi_list_lock);
771 }
772
773 /* We assume that the reason this OSC got a short read is because it read
774  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
775  * via the LOV, and it _knows_ it's reading inside the file, it's just that
776  * this stripe never got written at or beyond this stripe offset yet. */
777 static void handle_short_read(int nob_read, obd_count page_count,
778                               struct brw_page **pga)
779 {
780         char *ptr;
781         int i = 0;
782
783         /* skip bytes read OK */
784         while (nob_read > 0) {
785                 LASSERT (page_count > 0);
786
787                 if (pga[i]->count > nob_read) {
788                         /* EOF inside this page */
789                         ptr = cfs_kmap(pga[i]->pg) +
790                                 (pga[i]->off & ~CFS_PAGE_MASK);
791                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
792                         cfs_kunmap(pga[i]->pg);
793                         page_count--;
794                         i++;
795                         break;
796                 }
797
798                 nob_read -= pga[i]->count;
799                 page_count--;
800                 i++;
801         }
802
803         /* zero remaining pages */
804         while (page_count-- > 0) {
805                 ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
806                 memset(ptr, 0, pga[i]->count);
807                 cfs_kunmap(pga[i]->pg);
808                 i++;
809         }
810 }
811
812 static int check_write_rcs(struct ptlrpc_request *req,
813                            int requested_nob, int niocount,
814                            obd_count page_count, struct brw_page **pga)
815 {
816         int    *remote_rcs, i;
817
818         /* return error if any niobuf was in error */
819         remote_rcs = lustre_swab_repbuf(req, REQ_REC_OFF + 1,
820                                         sizeof(*remote_rcs) * niocount, NULL);
821         if (remote_rcs == NULL) {
822                 CERROR("Missing/short RC vector on BRW_WRITE reply\n");
823                 return(-EPROTO);
824         }
825         if (lustre_msg_swabbed(req->rq_repmsg))
826                 for (i = 0; i < niocount; i++)
827                         __swab32s(&remote_rcs[i]);
828
829         for (i = 0; i < niocount; i++) {
830                 if (remote_rcs[i] < 0)
831                         return(remote_rcs[i]);
832
833                 if (remote_rcs[i] != 0) {
834                         CERROR("rc[%d] invalid (%d) req %p\n",
835                                 i, remote_rcs[i], req);
836                         return(-EPROTO);
837                 }
838         }
839
840         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
841                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
842                        requested_nob, req->rq_bulk->bd_nob_transferred);
843                 return(-EPROTO);
844         }
845
846         return (0);
847 }
848
849 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
850 {
851         if (p1->flag != p2->flag) {
852                 unsigned mask = ~OBD_BRW_FROM_GRANT;
853
854                 /* warn if we try to combine flags that we don't know to be
855                  * safe to combine */
856                 if ((p1->flag & mask) != (p2->flag & mask))
857                         CERROR("is it ok to have flags 0x%x and 0x%x in the "
858                                "same brw?\n", p1->flag, p2->flag);
859                 return 0;
860         }
861
862         return (p1->off + p1->count == p2->off);
863 }
864
865 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
866                                    struct brw_page **pga)
867 {
868         __u32 cksum = ~0;
869         int i = 0;
870
871         LASSERT (pg_count > 0);
872         while (nob > 0 && pg_count > 0) {
873                 char *ptr = cfs_kmap(pga[i]->pg);
874                 int off = pga[i]->off & ~CFS_PAGE_MASK;
875                 int count = pga[i]->count > nob ? nob : pga[i]->count;
876
877                 /* corrupt the data before we compute the checksum, to
878                  * simulate an OST->client data error */
879                 if (i == 0 &&
880                     OBD_FAIL_CHECK_ONCE(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
881                         memcpy(ptr + off, "bad1", min(4, nob));
882                 cksum = crc32_le(cksum, ptr + off, count);
883                 cfs_kunmap(pga[i]->pg);
884                 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
885                                off, cksum);
886
887                 nob -= pga[i]->count;
888                 pg_count--;
889                 i++;
890         }
891         /* For sending we only compute the wrong checksum instead
892          * of corrupting the data so it is still correct on a redo */
893         if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_OSC_CHECKSUM_SEND))
894                 cksum++;
895
896         return cksum;
897 }
898
899 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
900                                 struct lov_stripe_md *lsm, obd_count page_count,
901                                 struct brw_page **pga, 
902                                 struct ptlrpc_request **reqp,
903                                 struct obd_capa *ocapa)
904 {
905         struct ptlrpc_request   *req;
906         struct ptlrpc_bulk_desc *desc;
907         struct ost_body         *body;
908         struct obd_ioobj        *ioobj;
909         struct niobuf_remote    *niobuf;
910         int size[5] = { sizeof(struct ptlrpc_body), sizeof(*body) };
911         int niocount, i, requested_nob, opc, rc;
912         struct ptlrpc_request_pool *pool;
913         struct lustre_capa      *capa;
914         struct osc_brw_async_args *aa;
915
916         ENTRY;
917         if ((cmd & OBD_BRW_WRITE) != 0) {
918                 opc = OST_WRITE;
919                 pool = cli->cl_import->imp_rq_pool;
920         } else {
921                 opc = OST_READ;
922                 pool = NULL;
923         }
924
925         for (niocount = i = 1; i < page_count; i++) {
926                 if (!can_merge_pages(pga[i - 1], pga[i]))
927                         niocount++;
928         }
929
930         size[REQ_REC_OFF + 1] = sizeof(*ioobj);
931         size[REQ_REC_OFF + 2] = niocount * sizeof(*niobuf);
932         if (ocapa)
933                 size[REQ_REC_OFF + 3] = sizeof(*capa);
934
935         OBD_FAIL_RETURN(OBD_FAIL_OSC_BRW_PREP_REQ, -ENOMEM);
936         req = ptlrpc_prep_req_pool(cli->cl_import, LUSTRE_OST_VERSION, opc, 5,
937                                    size, NULL, pool, NULL);
938         if (req == NULL)
939                 RETURN (-ENOMEM);
940
941         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
942
943         if (opc == OST_WRITE)
944                 desc = ptlrpc_prep_bulk_imp (req, page_count,
945                                              BULK_GET_SOURCE, OST_BULK_PORTAL);
946         else
947                 desc = ptlrpc_prep_bulk_imp (req, page_count,
948                                              BULK_PUT_SINK, OST_BULK_PORTAL);
949         if (desc == NULL)
950                 GOTO(out, rc = -ENOMEM);
951         /* NB request now owns desc and will free it when it gets freed */
952
953         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
954         ioobj = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1, sizeof(*ioobj));
955         niobuf = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
956                                 niocount * sizeof(*niobuf));
957
958         body->oa = *oa;
959
960         obdo_to_ioobj(oa, ioobj);
961         ioobj->ioo_bufcnt = niocount;
962         if (ocapa) {
963                 capa = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 3,
964                                       sizeof(*capa));
965                 capa_cpy(capa, ocapa);
966                 body->oa.o_valid |= OBD_MD_FLOSSCAPA;
967         }
968
969         LASSERT (page_count > 0);
970         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
971                 struct brw_page *pg = pga[i];
972                 struct brw_page *pg_prev = pga[i - 1];
973
974                 LASSERT(pg->count > 0);
975                 LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
976                          "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
977                          pg->off, pg->count);
978 #ifdef __LINUX__
979                 LASSERTF(i == 0 || pg->off > pg_prev->off,
980                          "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
981                          " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
982                          i, page_count,
983                          pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
984                          pg_prev->pg, page_private(pg_prev->pg),
985                          pg_prev->pg->index, pg_prev->off);
986 #else
987                 LASSERTF(i == 0 || pg->off > pg_prev->off,
988                          "i %d p_c %u\n", i, page_count);
989 #endif
990                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
991                         (pg->flag & OBD_BRW_SRVLOCK));
992
993                 ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,
994                                       pg->count);
995                 requested_nob += pg->count;
996
997                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
998                         niobuf--;
999                         niobuf->len += pg->count;
1000                 } else {
1001                         niobuf->offset = pg->off;
1002                         niobuf->len    = pg->count;
1003                         niobuf->flags  = pg->flag;
1004                 }
1005         }
1006
1007         LASSERT((void *)(niobuf - niocount) ==
1008                 lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
1009                                niocount * sizeof(*niobuf)));
1010         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1011
1012         /* size[REQ_REC_OFF] still sizeof (*body) */
1013         if (opc == OST_WRITE) {
1014                 if (unlikely(cli->cl_checksum)) {
1015                         body->oa.o_valid |= OBD_MD_FLCKSUM;
1016                         body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1017                                                              page_count, pga);
1018                         CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1019                                body->oa.o_cksum);
1020                         /* save this in 'oa', too, for later checking */
1021                         oa->o_valid |= OBD_MD_FLCKSUM;
1022                 } else {
1023                         /* clear out the checksum flag, in case this is a
1024                          * resend but cl_checksum is no longer set. b=11238 */
1025                         oa->o_valid &= ~OBD_MD_FLCKSUM;
1026                 }
1027                 oa->o_cksum = body->oa.o_cksum;
1028                 /* 1 RC per niobuf */
1029                 size[REPLY_REC_OFF + 1] = sizeof(__u32) * niocount;
1030                 ptlrpc_req_set_repsize(req, 3, size);
1031         } else {
1032                 if (unlikely(cli->cl_checksum))
1033                         body->oa.o_valid |= OBD_MD_FLCKSUM;
1034                 /* 1 RC for the whole I/O */
1035                 ptlrpc_req_set_repsize(req, 2, size);
1036         }
1037
1038         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1039         aa = (struct osc_brw_async_args *)&req->rq_async_args;
1040         aa->aa_oa = oa;
1041         aa->aa_requested_nob = requested_nob;
1042         aa->aa_nio_count = niocount;
1043         aa->aa_page_count = page_count;
1044         aa->aa_retries = 5;     /*retry for checksum errors; lprocfs? */
1045         aa->aa_ppga = pga;
1046         aa->aa_cli = cli;
1047         INIT_LIST_HEAD(&aa->aa_oaps);
1048
1049         *reqp = req;
1050         RETURN (0);
1051
1052  out:
1053         ptlrpc_req_finished (req);
1054         RETURN (rc);
1055 }
1056
1057 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1058                                 __u32 client_cksum, __u32 server_cksum,
1059                                 int nob, obd_count page_count,
1060                                 struct brw_page **pga)
1061 {
1062         __u32 new_cksum;
1063         char *msg;
1064
1065         if (server_cksum == client_cksum) {
1066                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1067                 return 0;
1068         }
1069
1070         new_cksum = osc_checksum_bulk(nob, page_count, pga);
1071
1072         if (new_cksum == server_cksum)
1073                 msg = "changed on the client after we checksummed it - "
1074                       "likely false positive due to mmap IO (bug 11742)";
1075         else if (new_cksum == client_cksum)
1076                 msg = "changed in transit before arrival at OST";
1077         else
1078                 msg = "changed in transit AND doesn't match the original - "
1079                       "likely false positive due to mmap IO (bug 11742)";
1080
1081         LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inum "
1082                            LPU64"/"LPU64" object "LPU64"/"LPU64" extent "
1083                            "["LPU64"-"LPU64"]\n",
1084                            msg, libcfs_nid2str(peer->nid),
1085                            oa->o_valid & OBD_MD_FLFID ? oa->o_fid : (__u64)0,
1086                            oa->o_valid & OBD_MD_FLFID ? oa->o_generation : 
1087                                                         (__u64)0,
1088                            oa->o_id,
1089                            oa->o_valid & OBD_MD_FLGROUP ? oa->o_gr : (__u64)0,
1090                            pga[0]->off,
1091                            pga[page_count-1]->off + pga[page_count-1]->count - 1);
1092         CERROR("original client csum %x, server csum %x, client csum now %x\n",
1093                client_cksum, server_cksum, new_cksum);
1094         return 1;        
1095 }
1096
1097 /* Note rc enters this function as number of bytes transferred */
1098 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1099 {
1100         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1101         const lnet_process_id_t *peer =
1102                         &req->rq_import->imp_connection->c_peer;
1103         struct client_obd *cli = aa->aa_cli;
1104         struct ost_body *body;
1105         __u32 client_cksum = 0;
1106         ENTRY;
1107
1108         if (rc < 0 && rc != -EDQUOT)
1109                 RETURN(rc);
1110
1111         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1112         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
1113                                   lustre_swab_ost_body);
1114         if (body == NULL) {
1115                 CERROR ("Can't unpack body\n");
1116                 RETURN(-EPROTO);
1117         }
1118
1119         /* set/clear over quota flag for a uid/gid */
1120         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1121             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA))
1122                 lquota_setdq(quota_interface, cli, body->oa.o_uid,
1123                              body->oa.o_gid, body->oa.o_valid,
1124                              body->oa.o_flags);
1125
1126         if (rc < 0)
1127                 RETURN(rc);
1128
1129         if (unlikely(aa->aa_oa->o_valid & OBD_MD_FLCKSUM))
1130                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1131
1132         osc_update_grant(cli, body);
1133
1134         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1135                 if (rc > 0) {
1136                         CERROR ("Unexpected +ve rc %d\n", rc);
1137                         RETURN(-EPROTO);
1138                 }
1139                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1140
1141                 if (unlikely((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) &&
1142                              client_cksum &&
1143                              check_write_checksum(&body->oa, peer, client_cksum,
1144                                                   body->oa.o_cksum,
1145                                                   aa->aa_requested_nob,
1146                                                   aa->aa_page_count,
1147                                                   aa->aa_ppga)))
1148                         RETURN(-EAGAIN);
1149
1150                 sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk);
1151
1152                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1153                                      aa->aa_page_count, aa->aa_ppga);
1154                 GOTO(out, rc);
1155         }
1156
1157         /* The rest of this function executes only for OST_READs */
1158         if (rc > aa->aa_requested_nob) {
1159                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1160                        aa->aa_requested_nob);
1161                 RETURN(-EPROTO);
1162         }
1163
1164         if (rc != req->rq_bulk->bd_nob_transferred) {
1165                 CERROR ("Unexpected rc %d (%d transferred)\n",
1166                         rc, req->rq_bulk->bd_nob_transferred);
1167                 return (-EPROTO);
1168         }
1169
1170         if (rc < aa->aa_requested_nob)
1171                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1172
1173         if (unlikely(body->oa.o_valid & OBD_MD_FLCKSUM)) {
1174                 static int cksum_counter;
1175                 __u32      server_cksum = body->oa.o_cksum;
1176                 char      *via;
1177                 char      *router;
1178
1179                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1180                                                  aa->aa_ppga);
1181
1182                 if (peer->nid == req->rq_bulk->bd_sender) {
1183                         via = router = "";
1184                 } else {
1185                         via = " via ";
1186                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1187                 }
1188
1189                 if (server_cksum == ~0 && rc > 0) {
1190                         CERROR("Protocol error: server %s set the 'checksum' "
1191                                "bit, but didn't send a checksum.  Not fatal, "
1192                                "but please tell CFS.\n",
1193                                libcfs_nid2str(peer->nid));
1194                 } else if (server_cksum != client_cksum) {
1195                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1196                                            "%s%s%s inum "LPU64"/"LPU64" object "
1197                                            LPU64"/"LPU64" extent "
1198                                            "["LPU64"-"LPU64"]\n",
1199                                            req->rq_import->imp_obd->obd_name,
1200                                            libcfs_nid2str(peer->nid),
1201                                            via, router,
1202                                            body->oa.o_valid & OBD_MD_FLFID ?
1203                                                 body->oa.o_fid : (__u64)0,
1204                                            body->oa.o_valid & OBD_MD_FLFID ?
1205                                                 body->oa.o_generation :(__u64)0,
1206                                            body->oa.o_id,
1207                                            body->oa.o_valid & OBD_MD_FLGROUP ?
1208                                                 body->oa.o_gr : (__u64)0,
1209                                            aa->aa_ppga[0]->off,
1210                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1211                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1212                                                                         1);
1213                         CERROR("client %x, server %x\n",
1214                                client_cksum, server_cksum);
1215                         cksum_counter = 0;
1216                         aa->aa_oa->o_cksum = client_cksum;
1217                         rc = -EAGAIN;
1218                 } else {
1219                         cksum_counter++;
1220                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1221                         rc = 0;
1222                 }
1223         } else if (unlikely(client_cksum)) {
1224                 static int cksum_missed;
1225
1226                 cksum_missed++;
1227                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1228                         CERROR("Checksum %u requested from %s but not sent\n",
1229                                cksum_missed, libcfs_nid2str(peer->nid));
1230         } else {
1231                 rc = 0;
1232         }
1233 out:
1234         if (rc >= 0)
1235                 *aa->aa_oa = body->oa;
1236
1237         sptlrpc_cli_unwrap_bulk_read(req, rc, aa->aa_page_count, aa->aa_ppga);
1238
1239         RETURN(rc);
1240 }
1241
1242 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1243                             struct lov_stripe_md *lsm,
1244                             obd_count page_count, struct brw_page **pga,
1245                             struct obd_capa *ocapa)
1246 {
1247         struct ptlrpc_request *req;
1248         int                    rc, retries = 5; /* lprocfs? */
1249         ENTRY;
1250
1251 restart_bulk:
1252         rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1253                                   page_count, pga, &req, ocapa);
1254         if (rc != 0)
1255                 return (rc);
1256
1257         rc = ptlrpc_queue_wait(req);
1258
1259         if (rc == -ETIMEDOUT && req->rq_resend) {
1260                 DEBUG_REQ(D_HA, req,  "BULK TIMEOUT");
1261                 ptlrpc_req_finished(req);
1262                 goto restart_bulk;
1263         }
1264
1265         rc = osc_brw_fini_request(req, rc);
1266
1267         ptlrpc_req_finished(req);
1268         if (rc == -EAGAIN) {
1269                 if (retries-- > 0)
1270                         goto restart_bulk;
1271                 rc = -EIO;
1272         }
1273         RETURN (rc);
1274 }
1275
1276 int osc_brw_redo_request(struct ptlrpc_request *req,
1277                          struct osc_brw_async_args *aa)
1278 {
1279         struct ptlrpc_request *new_req;
1280         struct ptlrpc_request_set *set = req->rq_set;
1281         struct osc_brw_async_args *new_aa;
1282         struct osc_async_page *oap;
1283         int rc = 0;
1284         ENTRY;
1285
1286         if (aa->aa_retries-- <= 0) {
1287                 CERROR("too many checksum retries, returning error\n");
1288                 RETURN(-EIO);
1289         }
1290
1291         DEBUG_REQ(D_ERROR, req, "redo for checksum error");
1292         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1293                 if (oap->oap_request != NULL) {
1294                         LASSERTF(req == oap->oap_request,
1295                                  "request %p != oap_request %p\n",
1296                                  req, oap->oap_request);
1297                         if (oap->oap_interrupted) {
1298                                 ptlrpc_mark_interrupted(oap->oap_request);
1299                                 rc = -EINTR;
1300                                 break;
1301                         }
1302                 }
1303         }
1304         if (rc)
1305                 RETURN(rc);
1306         /* TODO-MERGE: and where to get ocapa?? */
1307         rc = osc_brw_prep_request(lustre_msg_get_opc(req->rq_reqmsg) ==
1308                                         OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1309                                   aa->aa_cli, aa->aa_oa,
1310                                   NULL /* lsm unused by osc currently */,
1311                                   aa->aa_page_count, aa->aa_ppga, &new_req,
1312                                   NULL /* ocapa */);
1313         if (rc)
1314                 RETURN(rc);
1315
1316         /* New request takes over pga and oaps from old request.
1317          * Note that copying a list_head doesn't work, need to move it... */
1318         new_req->rq_interpret_reply = req->rq_interpret_reply;
1319         new_req->rq_async_args = req->rq_async_args;
1320         new_aa = (struct osc_brw_async_args *)&new_req->rq_async_args;
1321         INIT_LIST_HEAD(&new_aa->aa_oaps);
1322         list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
1323         INIT_LIST_HEAD(&aa->aa_oaps);
1324
1325         list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1326                 if (oap->oap_request) {
1327                         ptlrpc_req_finished(oap->oap_request);
1328                         oap->oap_request = ptlrpc_request_addref(new_req);
1329                 }
1330         }
1331
1332         ptlrpc_set_add_req(set, new_req);
1333
1334         RETURN(0);
1335 }
1336
1337 static int brw_interpret(struct ptlrpc_request *req, void *data, int rc)
1338 {
1339         struct osc_brw_async_args *aa = data;
1340         int                        i;
1341         int                        nob = rc;
1342         ENTRY;
1343
1344         rc = osc_brw_fini_request(req, rc);
1345         if (rc == -EAGAIN) {
1346                 rc = osc_brw_redo_request(req, aa);
1347                 if (rc == 0)
1348                         RETURN(0);
1349         }
1350         if ((rc >= 0) && req->rq_set && req->rq_set->set_countp)
1351                 atomic_add(nob, (atomic_t *)req->rq_set->set_countp);
1352
1353         spin_lock(&aa->aa_cli->cl_loi_list_lock);
1354         for (i = 0; i < aa->aa_page_count; i++)
1355                 osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
1356         spin_unlock(&aa->aa_cli->cl_loi_list_lock);
1357
1358         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1359
1360         RETURN(rc);
1361 }
1362
1363 static int async_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1364                           struct lov_stripe_md *lsm, obd_count page_count,
1365                           struct brw_page **pga, struct ptlrpc_request_set *set,
1366                           struct obd_capa *ocapa)
1367 {
1368         struct ptlrpc_request     *req;
1369         struct client_obd         *cli = &exp->exp_obd->u.cli;
1370         int                        rc, i;
1371         ENTRY;
1372
1373         /* Consume write credits even if doing a sync write -
1374          * otherwise we may run out of space on OST due to grant. */
1375         if (cmd == OBD_BRW_WRITE) {
1376                 spin_lock(&cli->cl_loi_list_lock);
1377                 for (i = 0; i < page_count; i++) {
1378                         if (cli->cl_avail_grant >= CFS_PAGE_SIZE)
1379                                 osc_consume_write_grant(cli, pga[i]);
1380                 }
1381                 spin_unlock(&cli->cl_loi_list_lock);
1382         }
1383
1384         rc = osc_brw_prep_request(cmd, cli, oa, lsm, page_count, pga,
1385                                   &req, ocapa);
1386         if (rc == 0) {
1387                 req->rq_interpret_reply = brw_interpret;
1388                 ptlrpc_set_add_req(set, req);
1389         } else if (cmd == OBD_BRW_WRITE) {
1390                 spin_lock(&cli->cl_loi_list_lock);
1391                 for (i = 0; i < page_count; i++)
1392                         osc_release_write_grant(cli, pga[i], 0);
1393                 spin_unlock(&cli->cl_loi_list_lock);
1394         }
1395         RETURN (rc);
1396 }
1397
1398 /*
1399  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1400  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1401  * fine for our small page arrays and doesn't require allocation.  its an
1402  * insertion sort that swaps elements that are strides apart, shrinking the
1403  * stride down until its '1' and the array is sorted.
1404  */
1405 static void sort_brw_pages(struct brw_page **array, int num)
1406 {
1407         int stride, i, j;
1408         struct brw_page *tmp;
1409
1410         if (num == 1)
1411                 return;
1412         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1413                 ;
1414
1415         do {
1416                 stride /= 3;
1417                 for (i = stride ; i < num ; i++) {
1418                         tmp = array[i];
1419                         j = i;
1420                         while (j >= stride && array[j - stride]->off > tmp->off) {
1421                                 array[j] = array[j - stride];
1422                                 j -= stride;
1423                         }
1424                         array[j] = tmp;
1425                 }
1426         } while (stride > 1);
1427 }
1428
1429 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1430 {
1431         int count = 1;
1432         int offset;
1433         int i = 0;
1434
1435         LASSERT (pages > 0);
1436         offset = pg[i]->off & ~CFS_PAGE_MASK;
1437
1438         for (;;) {
1439                 pages--;
1440                 if (pages == 0)         /* that's all */
1441                         return count;
1442
1443                 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1444                         return count;   /* doesn't end on page boundary */
1445
1446                 i++;
1447                 offset = pg[i]->off & ~CFS_PAGE_MASK;
1448                 if (offset != 0)        /* doesn't start on page boundary */
1449                         return count;
1450
1451                 count++;
1452         }
1453 }
1454
1455 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1456 {
1457         struct brw_page **ppga;
1458         int i;
1459
1460         OBD_ALLOC(ppga, sizeof(*ppga) * count);
1461         if (ppga == NULL)
1462                 return NULL;
1463
1464         for (i = 0; i < count; i++)
1465                 ppga[i] = pga + i;
1466         return ppga;
1467 }
1468
1469 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1470 {
1471         LASSERT(ppga != NULL);
1472         OBD_FREE(ppga, sizeof(*ppga) * count);
1473 }
1474
1475 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1476                    obd_count page_count, struct brw_page *pga,
1477                    struct obd_trans_info *oti)
1478 {
1479         struct obdo *saved_oa = NULL;
1480         struct brw_page **ppga, **orig;
1481         struct obd_import *imp = class_exp2cliimp(exp);
1482         struct client_obd *cli = &imp->imp_obd->u.cli;
1483         int rc, page_count_orig;
1484         ENTRY;
1485
1486         if (cmd & OBD_BRW_CHECK) {
1487                 /* The caller just wants to know if there's a chance that this
1488                  * I/O can succeed */
1489
1490                 if (imp == NULL || imp->imp_invalid)
1491                         RETURN(-EIO);
1492                 RETURN(0);
1493         }
1494
1495         /* test_brw with a failed create can trip this, maybe others. */
1496         LASSERT(cli->cl_max_pages_per_rpc);
1497
1498         rc = 0;
1499
1500         orig = ppga = osc_build_ppga(pga, page_count);
1501         if (ppga == NULL)
1502                 RETURN(-ENOMEM);
1503         page_count_orig = page_count;
1504
1505         sort_brw_pages(ppga, page_count);
1506         while (page_count) {
1507                 obd_count pages_per_brw;
1508
1509                 if (page_count > cli->cl_max_pages_per_rpc)
1510                         pages_per_brw = cli->cl_max_pages_per_rpc;
1511                 else
1512                         pages_per_brw = page_count;
1513
1514                 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1515
1516                 if (saved_oa != NULL) {
1517                         /* restore previously saved oa */
1518                         *oinfo->oi_oa = *saved_oa;
1519                 } else if (page_count > pages_per_brw) {
1520                         /* save a copy of oa (brw will clobber it) */
1521                         OBDO_ALLOC(saved_oa);
1522                         if (saved_oa == NULL)
1523                                 GOTO(out, rc = -ENOMEM);
1524                         *saved_oa = *oinfo->oi_oa;
1525                 }
1526
1527                 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1528                                       pages_per_brw, ppga, oinfo->oi_capa);
1529
1530                 if (rc != 0)
1531                         break;
1532
1533                 page_count -= pages_per_brw;
1534                 ppga += pages_per_brw;
1535         }
1536
1537 out:
1538         osc_release_ppga(orig, page_count_orig);
1539
1540         if (saved_oa != NULL)
1541                 OBDO_FREE(saved_oa);
1542
1543         RETURN(rc);
1544 }
1545
1546 static int osc_brw_async(int cmd, struct obd_export *exp,
1547                          struct obd_info *oinfo, obd_count page_count,
1548                          struct brw_page *pga, struct obd_trans_info *oti,
1549                          struct ptlrpc_request_set *set)
1550 {
1551         struct brw_page **ppga, **orig;
1552         struct client_obd *cli = &exp->exp_obd->u.cli;
1553         int page_count_orig;
1554         int rc = 0;
1555         ENTRY;
1556
1557         if (cmd & OBD_BRW_CHECK) {
1558                 struct obd_import *imp = class_exp2cliimp(exp);
1559                 /* The caller just wants to know if there's a chance that this
1560                  * I/O can succeed */
1561
1562                 if (imp == NULL || imp->imp_invalid)
1563                         RETURN(-EIO);
1564                 RETURN(0);
1565         }
1566
1567         orig = ppga = osc_build_ppga(pga, page_count);
1568         if (ppga == NULL)
1569                 RETURN(-ENOMEM);
1570         page_count_orig = page_count;
1571
1572         sort_brw_pages(ppga, page_count);
1573         while (page_count) {
1574                 struct brw_page **copy;
1575                 obd_count pages_per_brw;
1576
1577                 pages_per_brw = min_t(obd_count, page_count,
1578                                       cli->cl_max_pages_per_rpc);
1579
1580                 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1581
1582                 /* use ppga only if single RPC is going to fly */
1583                 if (pages_per_brw != page_count_orig || ppga != orig) {
1584                         OBD_ALLOC(copy, sizeof(*copy) * pages_per_brw);
1585                         if (copy == NULL)
1586                                 GOTO(out, rc = -ENOMEM);
1587                         memcpy(copy, ppga, sizeof(*copy) * pages_per_brw);
1588                 } else
1589                         copy = ppga;
1590
1591                 rc = async_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1592                                     pages_per_brw, copy, set, oinfo->oi_capa);
1593
1594                 if (rc != 0) {
1595                         if (copy != ppga)
1596                                 OBD_FREE(copy, sizeof(*copy) * pages_per_brw);
1597                         break;
1598                 }
1599                 if (copy == orig) {
1600                         /* we passed it to async_internal() which is
1601                          * now responsible for releasing memory */
1602                         orig = NULL;
1603                 }
1604
1605                 page_count -= pages_per_brw;
1606                 ppga += pages_per_brw;
1607         }
1608 out:
1609         if (orig)
1610                 osc_release_ppga(orig, page_count_orig);
1611         RETURN(rc);
1612 }
1613
1614 static void osc_check_rpcs(struct client_obd *cli);
1615
1616 /* The companion to osc_enter_cache(), called when @oap is no longer part of
1617  * the dirty accounting.  Writeback completes or truncate happens before
1618  * writing starts.  Must be called with the loi lock held. */
1619 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
1620                            int sent)
1621 {
1622         osc_release_write_grant(cli, &oap->oap_brw_page, sent);
1623 }
1624
1625
1626 /* This maintains the lists of pending pages to read/write for a given object
1627  * (lop).  This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
1628  * to quickly find objects that are ready to send an RPC. */
1629 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
1630                          int cmd)
1631 {
1632         int optimal;
1633         ENTRY;
1634
1635         if (lop->lop_num_pending == 0)
1636                 RETURN(0);
1637
1638         /* if we have an invalid import we want to drain the queued pages
1639          * by forcing them through rpcs that immediately fail and complete
1640          * the pages.  recovery relies on this to empty the queued pages
1641          * before canceling the locks and evicting down the llite pages */
1642         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1643                 RETURN(1);
1644
1645         /* stream rpcs in queue order as long as as there is an urgent page
1646          * queued.  this is our cheap solution for good batching in the case
1647          * where writepage marks some random page in the middle of the file
1648          * as urgent because of, say, memory pressure */
1649         if (!list_empty(&lop->lop_urgent)) {
1650                 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
1651                 RETURN(1);
1652         }
1653         /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
1654         optimal = cli->cl_max_pages_per_rpc;
1655         if (cmd & OBD_BRW_WRITE) {
1656                 /* trigger a write rpc stream as long as there are dirtiers
1657                  * waiting for space.  as they're waiting, they're not going to
1658                  * create more pages to coallesce with what's waiting.. */
1659                 if (!list_empty(&cli->cl_cache_waiters)) {
1660                         CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
1661                         RETURN(1);
1662                 }
1663                 /* +16 to avoid triggering rpcs that would want to include pages
1664                  * that are being queued but which can't be made ready until
1665                  * the queuer finishes with the page. this is a wart for
1666                  * llite::commit_write() */
1667                 optimal += 16;
1668         }
1669         if (lop->lop_num_pending >= optimal)
1670                 RETURN(1);
1671
1672         RETURN(0);
1673 }
1674
1675 static void on_list(struct list_head *item, struct list_head *list,
1676                     int should_be_on)
1677 {
1678         if (list_empty(item) && should_be_on)
1679                 list_add_tail(item, list);
1680         else if (!list_empty(item) && !should_be_on)
1681                 list_del_init(item);
1682 }
1683
1684 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
1685  * can find pages to build into rpcs quickly */
1686 static void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
1687 {
1688         on_list(&loi->loi_cli_item, &cli->cl_loi_ready_list,
1689                 lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE) ||
1690                 lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
1691
1692         on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
1693                 loi->loi_write_lop.lop_num_pending);
1694
1695         on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
1696                 loi->loi_read_lop.lop_num_pending);
1697 }
1698
1699 static void lop_update_pending(struct client_obd *cli,
1700                                struct loi_oap_pages *lop, int cmd, int delta)
1701 {
1702         lop->lop_num_pending += delta;
1703         if (cmd & OBD_BRW_WRITE)
1704                 cli->cl_pending_w_pages += delta;
1705         else
1706                 cli->cl_pending_r_pages += delta;
1707 }
1708
1709 /* this is called when a sync waiter receives an interruption.  Its job is to
1710  * get the caller woken as soon as possible.  If its page hasn't been put in an
1711  * rpc yet it can dequeue immediately.  Otherwise it has to mark the rpc as
1712  * desiring interruption which will forcefully complete the rpc once the rpc
1713  * has timed out */
1714 static void osc_occ_interrupted(struct oig_callback_context *occ)
1715 {
1716         struct osc_async_page *oap;
1717         struct loi_oap_pages *lop;
1718         struct lov_oinfo *loi;
1719         ENTRY;
1720
1721         /* XXX member_of() */
1722         oap = list_entry(occ, struct osc_async_page, oap_occ);
1723
1724         client_obd_list_lock(&oap->oap_cli->cl_loi_list_lock);
1725
1726         oap->oap_interrupted = 1;
1727
1728         /* ok, it's been put in an rpc. only one oap gets a request reference */
1729         if (oap->oap_request != NULL) {
1730                 ptlrpc_mark_interrupted(oap->oap_request);
1731                 ptlrpcd_wake(oap->oap_request);
1732                 GOTO(unlock, 0);
1733         }
1734
1735         /* we don't get interruption callbacks until osc_trigger_group_io()
1736          * has been called and put the sync oaps in the pending/urgent lists.*/
1737         if (!list_empty(&oap->oap_pending_item)) {
1738                 list_del_init(&oap->oap_pending_item);
1739                 list_del_init(&oap->oap_urgent_item);
1740
1741                 loi = oap->oap_loi;
1742                 lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
1743                         &loi->loi_write_lop : &loi->loi_read_lop;
1744                 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
1745                 loi_list_maint(oap->oap_cli, oap->oap_loi);
1746
1747                 oig_complete_one(oap->oap_oig, &oap->oap_occ, -EINTR);
1748                 oap->oap_oig = NULL;
1749         }
1750
1751 unlock:
1752         client_obd_list_unlock(&oap->oap_cli->cl_loi_list_lock);
1753 }
1754
1755 /* this is trying to propogate async writeback errors back up to the
1756  * application.  As an async write fails we record the error code for later if
1757  * the app does an fsync.  As long as errors persist we force future rpcs to be
1758  * sync so that the app can get a sync error and break the cycle of queueing
1759  * pages for which writeback will fail. */
1760 static void osc_process_ar(struct osc_async_rc *ar, struct ptlrpc_request *req,
1761                            int rc)
1762 {
1763         if (rc) {
1764                 if (!ar->ar_rc)
1765                         ar->ar_rc = rc;
1766
1767                 ar->ar_force_sync = 1;
1768                 ar->ar_min_xid = ptlrpc_sample_next_xid();
1769                 return;
1770
1771         }
1772
1773         if (ar->ar_force_sync && req && (ptlrpc_req_xid(req) >= ar->ar_min_xid))
1774                 ar->ar_force_sync = 0;
1775 }
1776
1777 static void osc_oap_to_pending(struct osc_async_page *oap)
1778 {
1779         struct loi_oap_pages *lop;
1780
1781         if (oap->oap_cmd & OBD_BRW_WRITE)
1782                 lop = &oap->oap_loi->loi_write_lop;
1783         else
1784                 lop = &oap->oap_loi->loi_read_lop;
1785
1786         if (oap->oap_async_flags & ASYNC_URGENT)
1787                 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
1788         list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
1789         lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
1790 }
1791
1792 /* this must be called holding the loi list lock to give coverage to exit_cache,
1793  * async_flag maintenance, and oap_request */
1794 static void osc_ap_completion(struct client_obd *cli, struct obdo *oa,
1795                               struct osc_async_page *oap, int sent, int rc)
1796 {
1797         ENTRY;
1798         oap->oap_async_flags = 0;
1799         oap->oap_interrupted = 0;
1800
1801         if (oap->oap_cmd & OBD_BRW_WRITE) {
1802                 osc_process_ar(&cli->cl_ar, oap->oap_request, rc);
1803                 osc_process_ar(&oap->oap_loi->loi_ar, oap->oap_request, rc);
1804         }
1805
1806         if (oap->oap_request != NULL) {
1807                 ptlrpc_req_finished(oap->oap_request);
1808                 oap->oap_request = NULL;
1809         }
1810
1811         if (rc == 0 && oa != NULL) {
1812                 if (oa->o_valid & OBD_MD_FLBLOCKS)
1813                         oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
1814                 if (oa->o_valid & OBD_MD_FLMTIME)
1815                         oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
1816                 if (oa->o_valid & OBD_MD_FLATIME)
1817                         oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
1818                 if (oa->o_valid & OBD_MD_FLCTIME)
1819                         oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
1820         }
1821
1822         if (oap->oap_oig) {
1823                 osc_exit_cache(cli, oap, sent);
1824                 oig_complete_one(oap->oap_oig, &oap->oap_occ, rc);
1825                 oap->oap_oig = NULL;
1826                 EXIT;
1827                 return;
1828         }
1829
1830         rc = oap->oap_caller_ops->ap_completion(oap->oap_caller_data,
1831                                                 oap->oap_cmd, oa, rc);
1832
1833         /* ll_ap_completion (from llite) drops PG_locked. so, a new
1834          * I/O on the page could start, but OSC calls it under lock
1835          * and thus we can add oap back to pending safely */
1836         if (rc)
1837                 /* upper layer wants to leave the page on pending queue */
1838                 osc_oap_to_pending(oap);
1839         else
1840                 osc_exit_cache(cli, oap, sent);
1841         EXIT;
1842 }
1843
1844 static int brw_interpret_oap(struct ptlrpc_request *req, void *data, int rc)
1845 {
1846         struct osc_async_page *oap, *tmp;
1847         struct osc_brw_async_args *aa = data;
1848         struct client_obd *cli;
1849         ENTRY;
1850
1851         rc = osc_brw_fini_request(req, rc);
1852         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1853         if (rc == -EAGAIN) {
1854                 rc = osc_brw_redo_request(req, aa);
1855                 if (rc == 0)
1856                         RETURN(0);
1857                 GOTO(out, rc);
1858         }
1859
1860         cli = aa->aa_cli;
1861
1862         client_obd_list_lock(&cli->cl_loi_list_lock);
1863
1864         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1865          * is called so we know whether to go to sync BRWs or wait for more
1866          * RPCs to complete */
1867         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
1868                 cli->cl_w_in_flight--;
1869         else
1870                 cli->cl_r_in_flight--;
1871
1872         /* the caller may re-use the oap after the completion call so
1873          * we need to clean it up a little */
1874         list_for_each_entry_safe(oap, tmp, &aa->aa_oaps, oap_rpc_item) {
1875                 list_del_init(&oap->oap_rpc_item);
1876                 osc_ap_completion(cli, aa->aa_oa, oap, 1, rc);
1877         }
1878
1879         osc_wake_cache_waiters(cli);
1880         osc_check_rpcs(cli);
1881
1882         client_obd_list_unlock(&cli->cl_loi_list_lock);
1883
1884         OBDO_FREE(aa->aa_oa);
1885         rc = 0;
1886 out:
1887         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1888         RETURN(rc);
1889 }
1890
1891 static struct ptlrpc_request *osc_build_req(struct client_obd *cli,
1892                                             struct list_head *rpc_list,
1893                                             int page_count, int cmd)
1894 {
1895         struct ptlrpc_request *req;
1896         struct brw_page **pga = NULL;
1897         struct osc_brw_async_args *aa;
1898         struct obdo *oa = NULL;
1899         struct obd_async_page_ops *ops = NULL;
1900         void *caller_data = NULL;
1901         struct obd_capa *ocapa;
1902         struct osc_async_page *oap;
1903         int i, rc;
1904
1905         ENTRY;
1906         LASSERT(!list_empty(rpc_list));
1907
1908         OBD_ALLOC(pga, sizeof(*pga) * page_count);
1909         if (pga == NULL)
1910                 RETURN(ERR_PTR(-ENOMEM));
1911
1912         OBDO_ALLOC(oa);
1913         if (oa == NULL)
1914                 GOTO(out, req = ERR_PTR(-ENOMEM));
1915
1916         i = 0;
1917         list_for_each_entry(oap, rpc_list, oap_rpc_item) {
1918                 if (ops == NULL) {
1919                         ops = oap->oap_caller_ops;
1920                         caller_data = oap->oap_caller_data;
1921                 }
1922                 pga[i] = &oap->oap_brw_page;
1923                 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
1924                 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
1925                        pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
1926                 i++;
1927         }
1928
1929         /* always get the data for the obdo for the rpc */
1930         LASSERT(ops != NULL);
1931         ops->ap_fill_obdo(caller_data, cmd, oa);
1932         ocapa = ops->ap_lookup_capa(caller_data, cmd);
1933
1934         sort_brw_pages(pga, page_count);
1935         rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
1936                                   pga, &req, ocapa);
1937         capa_put(ocapa);
1938         if (rc != 0) {
1939                 CERROR("prep_req failed: %d\n", rc);
1940                 GOTO(out, req = ERR_PTR(rc));
1941         }
1942
1943         /* Need to update the timestamps after the request is built in case
1944          * we race with setattr (locally or in queue at OST).  If OST gets
1945          * later setattr before earlier BRW (as determined by the request xid),
1946          * the OST will not use BRW timestamps.  Sadly, there is no obvious
1947          * way to do this in a single call.  bug 10150 */
1948         ops->ap_update_obdo(caller_data, cmd, oa,
1949                             OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME);
1950
1951         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1952         aa = (struct osc_brw_async_args *)&req->rq_async_args;
1953         INIT_LIST_HEAD(&aa->aa_oaps);
1954         list_splice(rpc_list, &aa->aa_oaps);
1955         INIT_LIST_HEAD(rpc_list);
1956
1957 out:
1958         if (IS_ERR(req)) {
1959                 if (oa)
1960                         OBDO_FREE(oa);
1961                 if (pga)
1962                         OBD_FREE(pga, sizeof(*pga) * page_count);
1963         }
1964         RETURN(req);
1965 }
1966
1967 /* the loi lock is held across this function but it's allowed to release
1968  * and reacquire it during its work */
1969 static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
1970                             int cmd, struct loi_oap_pages *lop)
1971 {
1972         struct ptlrpc_request *req;
1973         obd_count page_count = 0;
1974         struct osc_async_page *oap = NULL, *tmp;
1975         struct osc_brw_async_args *aa;
1976         struct obd_async_page_ops *ops;
1977         CFS_LIST_HEAD(rpc_list);
1978         unsigned int ending_offset;
1979         unsigned  starting_offset = 0;
1980         ENTRY;
1981
1982         /* first we find the pages we're allowed to work with */
1983         list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
1984                                  oap_pending_item) {
1985                 ops = oap->oap_caller_ops;
1986
1987                 LASSERT(oap->oap_magic == OAP_MAGIC);
1988
1989                 /* in llite being 'ready' equates to the page being locked
1990                  * until completion unlocks it.  commit_write submits a page
1991                  * as not ready because its unlock will happen unconditionally
1992                  * as the call returns.  if we race with commit_write giving
1993                  * us that page we dont' want to create a hole in the page
1994                  * stream, so we stop and leave the rpc to be fired by
1995                  * another dirtier or kupdated interval (the not ready page
1996                  * will still be on the dirty list).  we could call in
1997                  * at the end of ll_file_write to process the queue again. */
1998                 if (!(oap->oap_async_flags & ASYNC_READY)) {
1999                         int rc = ops->ap_make_ready(oap->oap_caller_data, cmd);
2000                         if (rc < 0)
2001                                 CDEBUG(D_INODE, "oap %p page %p returned %d "
2002                                                 "instead of ready\n", oap,
2003                                                 oap->oap_page, rc);
2004                         switch (rc) {
2005                         case -EAGAIN:
2006                                 /* llite is telling us that the page is still
2007                                  * in commit_write and that we should try
2008                                  * and put it in an rpc again later.  we
2009                                  * break out of the loop so we don't create
2010                                  * a hole in the sequence of pages in the rpc
2011                                  * stream.*/
2012                                 oap = NULL;
2013                                 break;
2014                         case -EINTR:
2015                                 /* the io isn't needed.. tell the checks
2016                                  * below to complete the rpc with EINTR */
2017                                 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
2018                                 oap->oap_count = -EINTR;
2019                                 break;
2020                         case 0:
2021                                 oap->oap_async_flags |= ASYNC_READY;
2022                                 break;
2023                         default:
2024                                 LASSERTF(0, "oap %p page %p returned %d "
2025                                             "from make_ready\n", oap,
2026                                             oap->oap_page, rc);
2027                                 break;
2028                         }
2029                 }
2030                 if (oap == NULL)
2031                         break;
2032                 /*
2033                  * Page submitted for IO has to be locked. Either by
2034                  * ->ap_make_ready() or by higher layers.
2035                  *
2036                  * XXX nikita: this assertion should be adjusted when lustre
2037                  * starts using PG_writeback for pages being written out.
2038                  */
2039 #if defined(__KERNEL__) && defined(__LINUX__)
2040                 LASSERT(PageLocked(oap->oap_page));
2041 #endif
2042                 /* If there is a gap at the start of this page, it can't merge
2043                  * with any previous page, so we'll hand the network a
2044                  * "fragmented" page array that it can't transfer in 1 RDMA */
2045                 if (page_count != 0 && oap->oap_page_off != 0)
2046                         break;
2047
2048                 /* take the page out of our book-keeping */
2049                 list_del_init(&oap->oap_pending_item);
2050                 lop_update_pending(cli, lop, cmd, -1);
2051                 list_del_init(&oap->oap_urgent_item);
2052
2053                 if (page_count == 0)
2054                         starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
2055                                           (PTLRPC_MAX_BRW_SIZE - 1);
2056
2057                 /* ask the caller for the size of the io as the rpc leaves. */
2058                 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE))
2059                         oap->oap_count =
2060                                 ops->ap_refresh_count(oap->oap_caller_data,cmd);
2061                 if (oap->oap_count <= 0) {
2062                         CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
2063                                oap->oap_count);
2064                         osc_ap_completion(cli, NULL, oap, 0, oap->oap_count);
2065                         continue;
2066                 }
2067
2068                 /* now put the page back in our accounting */
2069                 list_add_tail(&oap->oap_rpc_item, &rpc_list);
2070                 if (++page_count >= cli->cl_max_pages_per_rpc)
2071                         break;
2072
2073                 /* End on a PTLRPC_MAX_BRW_SIZE boundary.  We want full-sized
2074                  * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
2075                  * have the same alignment as the initial writes that allocated
2076                  * extents on the server. */
2077                 ending_offset = (oap->oap_obj_off + oap->oap_page_off +
2078                                  oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
2079                 if (ending_offset == 0)
2080                         break;
2081
2082                 /* If there is a gap at the end of this page, it can't merge
2083                  * with any subsequent pages, so we'll hand the network a
2084                  * "fragmented" page array that it can't transfer in 1 RDMA */
2085                 if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
2086                         break;
2087         }
2088
2089         osc_wake_cache_waiters(cli);
2090
2091         if (page_count == 0)
2092                 RETURN(0);
2093
2094         loi_list_maint(cli, loi);
2095
2096         client_obd_list_unlock(&cli->cl_loi_list_lock);
2097
2098         req = osc_build_req(cli, &rpc_list, page_count, cmd);
2099         if (IS_ERR(req)) {
2100                 /* this should happen rarely and is pretty bad, it makes the
2101                  * pending list not follow the dirty order */
2102                 client_obd_list_lock(&cli->cl_loi_list_lock);
2103                 list_for_each_entry_safe(oap, tmp, &rpc_list, oap_rpc_item) {
2104                         list_del_init(&oap->oap_rpc_item);
2105
2106                         /* queued sync pages can be torn down while the pages
2107                          * were between the pending list and the rpc */
2108                         if (oap->oap_interrupted) {
2109                                 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
2110                                 osc_ap_completion(cli, NULL, oap, 0,
2111                                                   oap->oap_count);
2112                                 continue;
2113                         }
2114                         osc_ap_completion(cli, NULL, oap, 0, PTR_ERR(req));
2115                 }
2116                 loi_list_maint(cli, loi);
2117                 RETURN(PTR_ERR(req));
2118         }
2119
2120         aa = (struct osc_brw_async_args *)&req->rq_async_args;
2121
2122         if (cmd == OBD_BRW_READ) {
2123                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2124                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2125                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2126                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2127                 ptlrpc_lprocfs_brw(req, OST_READ, aa->aa_requested_nob);
2128         } else {
2129                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2130                 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
2131                                  cli->cl_w_in_flight);
2132                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2133                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2134                 ptlrpc_lprocfs_brw(req, OST_WRITE, aa->aa_requested_nob);
2135         }
2136
2137         client_obd_list_lock(&cli->cl_loi_list_lock);
2138
2139         if (cmd == OBD_BRW_READ)
2140                 cli->cl_r_in_flight++;
2141         else
2142                 cli->cl_w_in_flight++;
2143
2144         /* queued sync pages can be torn down while the pages
2145          * were between the pending list and the rpc */
2146         tmp = NULL;
2147         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2148                 /* only one oap gets a request reference */
2149                 if (tmp == NULL)
2150                         tmp = oap;
2151                 if (oap->oap_interrupted && !req->rq_intr) {
2152                         CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2153                                oap, req);
2154                         ptlrpc_mark_interrupted(req);
2155                 }
2156         }
2157         if (tmp != NULL)
2158                 tmp->oap_request = ptlrpc_request_addref(req);
2159
2160         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2161                   page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
2162
2163         req->rq_interpret_reply = brw_interpret_oap;
2164         ptlrpcd_add_req(req);
2165         RETURN(1);
2166 }
2167
2168 #define LOI_DEBUG(LOI, STR, args...)                                     \
2169         CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR,           \
2170                !list_empty(&(LOI)->loi_cli_item),                        \
2171                (LOI)->loi_write_lop.lop_num_pending,                     \
2172                !list_empty(&(LOI)->loi_write_lop.lop_urgent),            \
2173                (LOI)->loi_read_lop.lop_num_pending,                      \
2174                !list_empty(&(LOI)->loi_read_lop.lop_urgent),             \
2175                args)                                                     \
2176
2177 /* This is called by osc_check_rpcs() to find which objects have pages that
2178  * we could be sending.  These lists are maintained by lop_makes_rpc(). */
2179 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2180 {
2181         ENTRY;
2182         /* first return all objects which we already know to have
2183          * pages ready to be stuffed into rpcs */
2184         if (!list_empty(&cli->cl_loi_ready_list))
2185                 RETURN(list_entry(cli->cl_loi_ready_list.next,
2186                                   struct lov_oinfo, loi_cli_item));
2187
2188         /* then if we have cache waiters, return all objects with queued
2189          * writes.  This is especially important when many small files
2190          * have filled up the cache and not been fired into rpcs because
2191          * they don't pass the nr_pending/object threshhold */
2192         if (!list_empty(&cli->cl_cache_waiters) &&
2193             !list_empty(&cli->cl_loi_write_list))
2194                 RETURN(list_entry(cli->cl_loi_write_list.next,
2195                                   struct lov_oinfo, loi_write_item));
2196
2197         /* then return all queued objects when we have an invalid import
2198          * so that they get flushed */
2199         if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2200                 if (!list_empty(&cli->cl_loi_write_list))
2201                         RETURN(list_entry(cli->cl_loi_write_list.next,
2202                                           struct lov_oinfo, loi_write_item));
2203                 if (!list_empty(&cli->cl_loi_read_list))
2204                         RETURN(list_entry(cli->cl_loi_read_list.next,
2205                                           struct lov_oinfo, loi_read_item));
2206         }
2207         RETURN(NULL);
2208 }
2209
2210 /* called with the loi list lock held */
2211 static void osc_check_rpcs(struct client_obd *cli)
2212 {
2213         struct lov_oinfo *loi;
2214         int rc = 0, race_counter = 0;
2215         ENTRY;
2216
2217         while ((loi = osc_next_loi(cli)) != NULL) {
2218                 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
2219
2220                 if (rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight)
2221                         break;
2222
2223                 /* attempt some read/write balancing by alternating between
2224                  * reads and writes in an object.  The makes_rpc checks here
2225                  * would be redundant if we were getting read/write work items
2226                  * instead of objects.  we don't want send_oap_rpc to drain a
2227                  * partial read pending queue when we're given this object to
2228                  * do io on writes while there are cache waiters */
2229                 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
2230                         rc = osc_send_oap_rpc(cli, loi, OBD_BRW_WRITE,
2231                                               &loi->loi_write_lop);
2232                         if (rc < 0)
2233                                 break;
2234                         if (rc > 0)
2235                                 race_counter = 0;
2236                         else
2237                                 race_counter++;
2238                 }
2239                 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
2240                         rc = osc_send_oap_rpc(cli, loi, OBD_BRW_READ,
2241                                               &loi->loi_read_lop);
2242                         if (rc < 0)
2243                                 break;
2244                         if (rc > 0)
2245                                 race_counter = 0;
2246                         else
2247                                 race_counter++;
2248                 }
2249
2250                 /* attempt some inter-object balancing by issueing rpcs
2251                  * for each object in turn */
2252                 if (!list_empty(&loi->loi_cli_item))
2253                         list_del_init(&loi->loi_cli_item);
2254                 if (!list_empty(&loi->loi_write_item))
2255                         list_del_init(&loi->loi_write_item);
2256                 if (!list_empty(&loi->loi_read_item))
2257                         list_del_init(&loi->loi_read_item);
2258
2259                 loi_list_maint(cli, loi);
2260
2261                 /* send_oap_rpc fails with 0 when make_ready tells it to
2262                  * back off.  llite's make_ready does this when it tries
2263                  * to lock a page queued for write that is already locked.
2264                  * we want to try sending rpcs from many objects, but we
2265                  * don't want to spin failing with 0.  */
2266                 if (race_counter == 10)
2267                         break;
2268         }
2269         EXIT;
2270 }
2271
2272 /* we're trying to queue a page in the osc so we're subject to the
2273  * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2274  * If the osc's queued pages are already at that limit, then we want to sleep
2275  * until there is space in the osc's queue for us.  We also may be waiting for
2276  * write credits from the OST if there are RPCs in flight that may return some
2277  * before we fall back to sync writes.
2278  *
2279  * We need this know our allocation was granted in the presence of signals */
2280 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2281 {
2282         int rc;
2283         ENTRY;
2284         client_obd_list_lock(&cli->cl_loi_list_lock);
2285         rc = list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2286         client_obd_list_unlock(&cli->cl_loi_list_lock);
2287         RETURN(rc);
2288 };
2289
2290 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
2291  * grant or cache space. */
2292 static int osc_enter_cache(struct client_obd *cli, struct lov_oinfo *loi,
2293                            struct osc_async_page *oap)
2294 {
2295         struct osc_cache_waiter ocw;
2296         struct l_wait_info lwi = { 0 };
2297
2298         ENTRY;
2299
2300         CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
2301                "grant: %lu\n", cli->cl_dirty, atomic_read(&obd_dirty_pages),
2302                cli->cl_dirty_max, obd_max_dirty_pages,
2303                cli->cl_lost_grant, cli->cl_avail_grant);
2304
2305         /* force the caller to try sync io.  this can jump the list
2306          * of queued writes and create a discontiguous rpc stream */
2307         if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
2308             loi->loi_ar.ar_force_sync)
2309                 RETURN(-EDQUOT);
2310
2311         /* Hopefully normal case - cache space and write credits available */
2312         if ((cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max) &&
2313             (atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages) &&
2314             (cli->cl_avail_grant >= CFS_PAGE_SIZE)) {
2315                 /* account for ourselves */
2316                 osc_consume_write_grant(cli, &oap->oap_brw_page);
2317                 RETURN(0);
2318         }
2319
2320         /* Make sure that there are write rpcs in flight to wait for.  This
2321          * is a little silly as this object may not have any pending but
2322          * other objects sure might. */
2323         if (cli->cl_w_in_flight) {
2324                 list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
2325                 cfs_waitq_init(&ocw.ocw_waitq);
2326                 ocw.ocw_oap = oap;
2327                 ocw.ocw_rc = 0;
2328
2329                 loi_list_maint(cli, loi);
2330                 osc_check_rpcs(cli);
2331                 client_obd_list_unlock(&cli->cl_loi_list_lock);
2332
2333                 CDEBUG(D_CACHE, "sleeping for cache space\n");
2334                 l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
2335
2336                 client_obd_list_lock(&cli->cl_loi_list_lock);
2337                 if (!list_empty(&ocw.ocw_entry)) {
2338                         list_del(&ocw.ocw_entry);
2339                         RETURN(-EINTR);
2340                 }
2341                 RETURN(ocw.ocw_rc);
2342         }
2343
2344         RETURN(-EDQUOT);
2345 }
2346
2347 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2348                         struct lov_oinfo *loi, cfs_page_t *page,
2349                         obd_off offset, struct obd_async_page_ops *ops,
2350                         void *data, void **res)
2351 {
2352         struct osc_async_page *oap;
2353         ENTRY;
2354
2355         if (!page)
2356                 return size_round(sizeof(*oap));
2357
2358         oap = *res;
2359         oap->oap_magic = OAP_MAGIC;
2360         oap->oap_cli = &exp->exp_obd->u.cli;
2361         oap->oap_loi = loi;
2362
2363         oap->oap_caller_ops = ops;
2364         oap->oap_caller_data = data;
2365
2366         oap->oap_page = page;
2367         oap->oap_obj_off = offset;
2368
2369         CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2370         CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2371         CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2372
2373         oap->oap_occ.occ_interrupted = osc_occ_interrupted;
2374
2375         CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
2376         RETURN(0);
2377 }
2378
2379 struct osc_async_page *oap_from_cookie(void *cookie)
2380 {
2381         struct osc_async_page *oap = cookie;
2382         if (oap->oap_magic != OAP_MAGIC)
2383                 return ERR_PTR(-EINVAL);
2384         return oap;
2385 };
2386
2387 static int osc_queue_async_io(struct obd_export *exp, struct lov_stripe_md *lsm,
2388                               struct lov_oinfo *loi, void *cookie,
2389                               int cmd, obd_off off, int count,
2390                               obd_flag brw_flags, enum async_flags async_flags)
2391 {
2392         struct client_obd *cli = &exp->exp_obd->u.cli;
2393         struct osc_async_page *oap;
2394         int rc = 0;
2395         ENTRY;
2396
2397         oap = oap_from_cookie(cookie);
2398         if (IS_ERR(oap))
2399                 RETURN(PTR_ERR(oap));
2400
2401         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2402                 RETURN(-EIO);
2403
2404         if (!list_empty(&oap->oap_pending_item) ||
2405             !list_empty(&oap->oap_urgent_item) ||
2406             !list_empty(&oap->oap_rpc_item))
2407                 RETURN(-EBUSY);
2408
2409         /* check if the file's owner/group is over quota */
2410 #ifdef HAVE_QUOTA_SUPPORT
2411         if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)){
2412                 struct obd_async_page_ops *ops;
2413                 struct obdo *oa;
2414
2415                 OBDO_ALLOC(oa);
2416                 if (oa == NULL)
2417                         RETURN(-ENOMEM);
2418
2419                 ops = oap->oap_caller_ops;
2420                 ops->ap_fill_obdo(oap->oap_caller_data, cmd, oa);
2421                 if (lquota_chkdq(quota_interface, cli, oa->o_uid, oa->o_gid) ==
2422                     NO_QUOTA)
2423                         rc = -EDQUOT;
2424
2425                 OBDO_FREE(oa);
2426                 if (rc)
2427                         RETURN(rc);
2428         }
2429 #endif
2430
2431         if (loi == NULL)
2432                 loi = lsm->lsm_oinfo[0];
2433
2434         client_obd_list_lock(&cli->cl_loi_list_lock);
2435
2436         oap->oap_cmd = cmd;
2437         oap->oap_page_off = off;
2438         oap->oap_count = count;
2439         oap->oap_brw_flags = brw_flags;
2440         oap->oap_async_flags = async_flags;
2441
2442         if (cmd & OBD_BRW_WRITE) {
2443                 rc = osc_enter_cache(cli, loi, oap);
2444                 if (rc) {
2445                         client_obd_list_unlock(&cli->cl_loi_list_lock);
2446                         RETURN(rc);
2447                 }
2448         }
2449
2450         osc_oap_to_pending(oap);
2451         loi_list_maint(cli, loi);
2452
2453         LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
2454                   cmd);
2455
2456         osc_check_rpcs(cli);
2457         client_obd_list_unlock(&cli->cl_loi_list_lock);
2458
2459         RETURN(0);
2460 }
2461
2462 /* aka (~was & now & flag), but this is more clear :) */
2463 #define SETTING(was, now, flag) (!(was & flag) && (now & flag))
2464
2465 static int osc_set_async_flags(struct obd_export *exp,
2466                                struct lov_stripe_md *lsm,
2467                                struct lov_oinfo *loi, void *cookie,
2468                                obd_flag async_flags)
2469 {
2470         struct client_obd *cli = &exp->exp_obd->u.cli;
2471         struct loi_oap_pages *lop;
2472         struct osc_async_page *oap;
2473         int rc = 0;
2474         ENTRY;
2475
2476         oap = oap_from_cookie(cookie);
2477         if (IS_ERR(oap))
2478                 RETURN(PTR_ERR(oap));
2479
2480         /*
2481          * bug 7311: OST-side locking is only supported for liblustre for now
2482          * (and liblustre never calls obd_set_async_flags(). I hope.), generic
2483          * implementation has to handle case where OST-locked page was picked
2484          * up by, e.g., ->writepage().
2485          */
2486         LASSERT(!(oap->oap_brw_flags & OBD_BRW_SRVLOCK));
2487         LASSERT(!LIBLUSTRE_CLIENT); /* check that liblustre angels do fear to
2488                                      * tread here. */
2489
2490         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2491                 RETURN(-EIO);
2492
2493         if (loi == NULL)
2494                 loi = lsm->lsm_oinfo[0];
2495
2496         if (oap->oap_cmd & OBD_BRW_WRITE) {
2497                 lop = &loi->loi_write_lop;
2498         } else {
2499                 lop = &loi->loi_read_lop;
2500         }
2501
2502         client_obd_list_lock(&cli->cl_loi_list_lock);
2503
2504         if (list_empty(&oap->oap_pending_item))
2505                 GOTO(out, rc = -EINVAL);
2506
2507         if ((oap->oap_async_flags & async_flags) == async_flags)
2508                 GOTO(out, rc = 0);
2509
2510         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
2511                 oap->oap_async_flags |= ASYNC_READY;
2512
2513         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT)) {
2514                 if (list_empty(&oap->oap_rpc_item)) {
2515                         list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2516                         loi_list_maint(cli, loi);
2517                 }
2518         }
2519
2520         LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
2521                         oap->oap_async_flags);
2522 out:
2523         osc_check_rpcs(cli);
2524         client_obd_list_unlock(&cli->cl_loi_list_lock);
2525         RETURN(rc);
2526 }
2527
2528 static int osc_queue_group_io(struct obd_export *exp, struct lov_stripe_md *lsm,
2529                              struct lov_oinfo *loi,
2530                              struct obd_io_group *oig, void *cookie,
2531                              int cmd, obd_off off, int count,
2532                              obd_flag brw_flags,
2533                              obd_flag async_flags)
2534 {
2535         struct client_obd *cli = &exp->exp_obd->u.cli;
2536         struct osc_async_page *oap;
2537         struct loi_oap_pages *lop;
2538         int rc = 0;
2539         ENTRY;
2540
2541         oap = oap_from_cookie(cookie);
2542         if (IS_ERR(oap))
2543                 RETURN(PTR_ERR(oap));
2544
2545         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2546                 RETURN(-EIO);
2547
2548         if (!list_empty(&oap->oap_pending_item) ||
2549             !list_empty(&oap->oap_urgent_item) ||
2550             !list_empty(&oap->oap_rpc_item))
2551                 RETURN(-EBUSY);
2552
2553         if (loi == NULL)
2554                 loi = lsm->lsm_oinfo[0];
2555
2556         client_obd_list_lock(&cli->cl_loi_list_lock);
2557
2558         oap->oap_cmd = cmd;
2559         oap->oap_page_off = off;
2560         oap->oap_count = count;
2561         oap->oap_brw_flags = brw_flags;
2562         oap->oap_async_flags = async_flags;
2563
2564         if (cmd & OBD_BRW_WRITE)
2565                 lop = &loi->loi_write_lop;
2566         else
2567                 lop = &loi->loi_read_lop;
2568
2569         list_add_tail(&oap->oap_pending_item, &lop->lop_pending_group);
2570         if (oap->oap_async_flags & ASYNC_GROUP_SYNC) {
2571                 oap->oap_oig = oig;
2572                 rc = oig_add_one(oig, &oap->oap_occ);
2573         }
2574
2575         LOI_DEBUG(loi, "oap %p page %p on group pending: rc %d\n",
2576                   oap, oap->oap_page, rc);
2577
2578         client_obd_list_unlock(&cli->cl_loi_list_lock);
2579
2580         RETURN(rc);
2581 }
2582
2583 static void osc_group_to_pending(struct client_obd *cli, struct lov_oinfo *loi,
2584                                  struct loi_oap_pages *lop, int cmd)
2585 {
2586         struct list_head *pos, *tmp;
2587         struct osc_async_page *oap;
2588
2589         list_for_each_safe(pos, tmp, &lop->lop_pending_group) {
2590                 oap = list_entry(pos, struct osc_async_page, oap_pending_item);
2591                 list_del(&oap->oap_pending_item);
2592                 osc_oap_to_pending(oap);
2593         }
2594         loi_list_maint(cli, loi);
2595 }
2596
2597 static int osc_trigger_group_io(struct obd_export *exp,
2598                                 struct lov_stripe_md *lsm,
2599                                 struct lov_oinfo *loi,
2600                                 struct obd_io_group *oig)
2601 {
2602         struct client_obd *cli = &exp->exp_obd->u.cli;
2603         ENTRY;
2604
2605         if (loi == NULL)
2606                 loi = lsm->lsm_oinfo[0];
2607
2608         client_obd_list_lock(&cli->cl_loi_list_lock);
2609
2610         osc_group_to_pending(cli, loi, &loi->loi_write_lop, OBD_BRW_WRITE);
2611         osc_group_to_pending(cli, loi, &loi->loi_read_lop, OBD_BRW_READ);
2612
2613         osc_check_rpcs(cli);
2614         client_obd_list_unlock(&cli->cl_loi_list_lock);
2615
2616         RETURN(0);
2617 }
2618
2619 static int osc_teardown_async_page(struct obd_export *exp,
2620                                    struct lov_stripe_md *lsm,
2621                                    struct lov_oinfo *loi, void *cookie)
2622 {
2623         struct client_obd *cli = &exp->exp_obd->u.cli;
2624         struct loi_oap_pages *lop;
2625         struct osc_async_page *oap;
2626         int rc = 0;
2627         ENTRY;
2628
2629         oap = oap_from_cookie(cookie);
2630         if (IS_ERR(oap))
2631                 RETURN(PTR_ERR(oap));
2632
2633         if (loi == NULL)
2634                 loi = lsm->lsm_oinfo[0];
2635
2636         if (oap->oap_cmd & OBD_BRW_WRITE) {
2637                 lop = &loi->loi_write_lop;
2638         } else {
2639                 lop = &loi->loi_read_lop;
2640         }
2641
2642         client_obd_list_lock(&cli->cl_loi_list_lock);
2643
2644         if (!list_empty(&oap->oap_rpc_item))
2645                 GOTO(out, rc = -EBUSY);
2646
2647         osc_exit_cache(cli, oap, 0);
2648         osc_wake_cache_waiters(cli);
2649
2650         if (!list_empty(&oap->oap_urgent_item)) {
2651                 list_del_init(&oap->oap_urgent_item);
2652                 oap->oap_async_flags &= ~ASYNC_URGENT;
2653         }
2654         if (!list_empty(&oap->oap_pending_item)) {
2655                 list_del_init(&oap->oap_pending_item);
2656                 lop_update_pending(cli, lop, oap->oap_cmd, -1);
2657         }
2658         loi_list_maint(cli, loi);
2659
2660         LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
2661 out:
2662         client_obd_list_unlock(&cli->cl_loi_list_lock);
2663         RETURN(rc);
2664 }
2665
2666 static void osc_set_data_with_check(struct lustre_handle *lockh, void *data,
2667                                     int flags)
2668 {
2669         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2670
2671         if (lock == NULL) {
2672                 CERROR("lockh %p, data %p - client evicted?\n", lockh, data);
2673                 return;
2674         }
2675         lock_res_and_lock(lock);
2676 #ifdef __KERNEL__
2677 #ifdef __LINUX__
2678         /* Liang XXX: Darwin and Winnt checking should be added */
2679         if (lock->l_ast_data && lock->l_ast_data != data) {
2680                 struct inode *new_inode = data;
2681                 struct inode *old_inode = lock->l_ast_data;
2682                 if (!(old_inode->i_state & I_FREEING))
2683                         LDLM_ERROR(lock, "inconsistent l_ast_data found");
2684                 LASSERTF(old_inode->i_state & I_FREEING,
2685                          "Found existing inode %p/%lu/%u state %lu in lock: "
2686                          "setting data to %p/%lu/%u\n", old_inode,
2687                          old_inode->i_ino, old_inode->i_generation,
2688                          old_inode->i_state,
2689                          new_inode, new_inode->i_ino, new_inode->i_generation);
2690         }
2691 #endif
2692 #endif
2693         lock->l_ast_data = data;
2694         lock->l_flags |= (flags & LDLM_FL_NO_LRU);
2695         unlock_res_and_lock(lock);
2696         LDLM_LOCK_PUT(lock);
2697 }
2698
2699 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2700                              ldlm_iterator_t replace, void *data)
2701 {
2702         struct ldlm_res_id res_id = { .name = {0} };
2703         struct obd_device *obd = class_exp2obd(exp);
2704
2705         res_id.name[0] = lsm->lsm_object_id;
2706         res_id.name[2] = lsm->lsm_object_gr;
2707
2708         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2709         return 0;
2710 }
2711
2712 static int osc_enqueue_fini(struct ptlrpc_request *req, struct obd_info *oinfo,
2713                             int intent, int rc)
2714 {
2715         ENTRY;
2716
2717         if (intent) {
2718                 /* The request was created before ldlm_cli_enqueue call. */
2719                 if (rc == ELDLM_LOCK_ABORTED) {
2720                         struct ldlm_reply *rep;
2721
2722                         /* swabbed by ldlm_cli_enqueue() */
2723                         LASSERT_REPSWABBED(req, DLM_LOCKREPLY_OFF);
2724                         rep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF,
2725                                              sizeof(*rep));
2726                         LASSERT(rep != NULL);
2727                         if (rep->lock_policy_res1)
2728                                 rc = rep->lock_policy_res1;
2729                 }
2730         }
2731
2732         if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
2733                 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
2734                        oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_size,
2735                        oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_blocks,
2736                        oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_mtime);
2737         }
2738
2739         /* Call the update callback. */
2740         rc = oinfo->oi_cb_up(oinfo, rc);
2741         RETURN(rc);
2742 }
2743
2744 static int osc_enqueue_interpret(struct ptlrpc_request *req,
2745                                  struct osc_enqueue_args *aa, int rc)
2746 {
2747         int intent = aa->oa_ei->ei_flags & LDLM_FL_HAS_INTENT;
2748         struct lov_stripe_md *lsm = aa->oa_oi->oi_md;
2749         struct ldlm_lock *lock;
2750
2751         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2752          * be valid. */
2753         lock = ldlm_handle2lock(aa->oa_oi->oi_lockh);
2754
2755         /* Complete obtaining the lock procedure. */
2756         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
2757                                    aa->oa_ei->ei_mode,
2758                                    &aa->oa_ei->ei_flags,
2759                                    &lsm->lsm_oinfo[0]->loi_lvb,
2760                                    sizeof(lsm->lsm_oinfo[0]->loi_lvb),
2761                                    lustre_swab_ost_lvb,
2762                                    aa->oa_oi->oi_lockh, rc);
2763
2764         /* Complete osc stuff. */
2765         rc = osc_enqueue_fini(req, aa->oa_oi, intent, rc);
2766
2767         /* Release the lock for async request. */
2768         if (lustre_handle_is_used(aa->oa_oi->oi_lockh) && rc == ELDLM_OK)
2769                 ldlm_lock_decref(aa->oa_oi->oi_lockh, aa->oa_ei->ei_mode);
2770
2771         LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
2772                  aa->oa_oi->oi_lockh, req, aa);
2773         LDLM_LOCK_PUT(lock);
2774         return rc;
2775 }
2776
2777 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
2778  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2779  * other synchronous requests, however keeping some locks and trying to obtain
2780  * others may take a considerable amount of time in a case of ost failure; and
2781  * when other sync requests do not get released lock from a client, the client
2782  * is excluded from the cluster -- such scenarious make the life difficult, so
2783  * release locks just after they are obtained. */
2784 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
2785                        struct obd_enqueue_info *einfo)
2786 {
2787         struct ldlm_res_id res_id = { .name = {0} };
2788         struct obd_device *obd = exp->exp_obd;
2789         struct ldlm_reply *rep;
2790         struct ptlrpc_request *req = NULL;
2791         int intent = einfo->ei_flags & LDLM_FL_HAS_INTENT;
2792         int rc;
2793         ENTRY;
2794
2795         res_id.name[0] = oinfo->oi_md->lsm_object_id;
2796         res_id.name[2] = oinfo->oi_md->lsm_object_gr;
2797
2798         /* Filesystem lock extents are extended to page boundaries so that
2799          * dealing with the page cache is a little smoother.  */
2800         oinfo->oi_policy.l_extent.start -=
2801                 oinfo->oi_policy.l_extent.start & ~CFS_PAGE_MASK;
2802         oinfo->oi_policy.l_extent.end |= ~CFS_PAGE_MASK;
2803
2804         if (oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid == 0)
2805                 goto no_match;
2806
2807         /* Next, search for already existing extent locks that will cover us */
2808         rc = ldlm_lock_match(obd->obd_namespace,
2809                              einfo->ei_flags | LDLM_FL_LVB_READY, &res_id,
2810                              einfo->ei_type, &oinfo->oi_policy, einfo->ei_mode,
2811                              oinfo->oi_lockh);
2812         if (rc == 1) {
2813                 osc_set_data_with_check(oinfo->oi_lockh, einfo->ei_cbdata,
2814                                         einfo->ei_flags);
2815                 if (intent) {
2816                         /* I would like to be able to ASSERT here that rss <=
2817                          * kms, but I can't, for reasons which are explained in
2818                          * lov_enqueue() */
2819                 }
2820
2821                 /* We already have a lock, and it's referenced */
2822                 oinfo->oi_cb_up(oinfo, ELDLM_OK);
2823
2824                 /* For async requests, decref the lock. */
2825                 if (einfo->ei_rqset)
2826                         ldlm_lock_decref(oinfo->oi_lockh, einfo->ei_mode);
2827
2828                 RETURN(ELDLM_OK);
2829         }
2830
2831         /* If we're trying to read, we also search for an existing PW lock.  The
2832          * VFS and page cache already protect us locally, so lots of readers/
2833          * writers can share a single PW lock.
2834          *
2835          * There are problems with conversion deadlocks, so instead of
2836          * converting a read lock to a write lock, we'll just enqueue a new
2837          * one.
2838          *
2839          * At some point we should cancel the read lock instead of making them
2840          * send us a blocking callback, but there are problems with canceling
2841          * locks out from other users right now, too. */
2842
2843         if (einfo->ei_mode == LCK_PR) {
2844                 rc = ldlm_lock_match(obd->obd_namespace,
2845                                      einfo->ei_flags | LDLM_FL_LVB_READY,
2846                                      &res_id, einfo->ei_type, &oinfo->oi_policy,
2847                                      LCK_PW, oinfo->oi_lockh);
2848                 if (rc == 1) {
2849                         /* FIXME: This is not incredibly elegant, but it might
2850                          * be more elegant than adding another parameter to
2851                          * lock_match.  I want a second opinion. */
2852                         /* addref the lock only if not async requests. */
2853                         if (!einfo->ei_rqset)
2854                                 ldlm_lock_addref(oinfo->oi_lockh, LCK_PR);
2855                         osc_set_data_with_check(oinfo->oi_lockh,
2856                                                 einfo->ei_cbdata,
2857                                                 einfo->ei_flags);
2858                         oinfo->oi_cb_up(oinfo, ELDLM_OK);
2859                         ldlm_lock_decref(oinfo->oi_lockh, LCK_PW);
2860                         RETURN(ELDLM_OK);
2861                 }
2862         }
2863
2864  no_match:
2865         if (intent) {
2866                 int size[3] = {
2867                         [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
2868                         [DLM_LOCKREQ_OFF]     = sizeof(struct ldlm_request) };
2869
2870                 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_DLM_VERSION,
2871                                       LDLM_ENQUEUE, 2, size, NULL);
2872                 if (req == NULL)
2873                         RETURN(-ENOMEM);
2874
2875                 size[DLM_LOCKREPLY_OFF] = sizeof(*rep);
2876                 size[DLM_REPLY_REC_OFF] =
2877                         sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb);
2878                 ptlrpc_req_set_repsize(req, 3, size);
2879         }
2880
2881         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2882         einfo->ei_flags &= ~LDLM_FL_BLOCK_GRANTED;
2883
2884         rc = ldlm_cli_enqueue(exp, &req, &res_id, einfo->ei_type,
2885                               &oinfo->oi_policy, einfo->ei_mode,
2886                               &einfo->ei_flags, einfo->ei_cb_bl,
2887                               einfo->ei_cb_cp, einfo->ei_cb_gl,
2888                               einfo->ei_cbdata,
2889                               &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
2890                               sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb),
2891                               lustre_swab_ost_lvb, oinfo->oi_lockh,
2892                               einfo->ei_rqset ? 1 : 0);
2893         if (einfo->ei_rqset) {
2894                 if (!rc) {
2895                         struct osc_enqueue_args *aa;
2896                         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2897                         aa = (struct osc_enqueue_args *)&req->rq_async_args;
2898                         aa->oa_oi = oinfo;
2899                         aa->oa_ei = einfo;
2900                         aa->oa_exp = exp;
2901
2902                         req->rq_interpret_reply = osc_enqueue_interpret;
2903                         ptlrpc_set_add_req(einfo->ei_rqset, req);
2904                 } else if (intent) {
2905                         ptlrpc_req_finished(req);
2906                 }
2907                 RETURN(rc);
2908         }
2909
2910         rc = osc_enqueue_fini(req, oinfo, intent, rc);
2911         if (intent)
2912                 ptlrpc_req_finished(req);
2913
2914         RETURN(rc);
2915 }
2916
2917 static int osc_match(struct obd_export *exp, struct lov_stripe_md *lsm,
2918                      __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2919                      int *flags, void *data, struct lustre_handle *lockh)
2920 {
2921         struct ldlm_res_id res_id = { .name = {0} };
2922         struct obd_device *obd = exp->exp_obd;
2923         int rc;
2924         int lflags = *flags;
2925         ENTRY;
2926
2927         res_id.name[0] = lsm->lsm_object_id;
2928         res_id.name[2] = lsm->lsm_object_gr;
2929
2930         OBD_FAIL_RETURN(OBD_FAIL_OSC_MATCH, -EIO);
2931
2932         /* Filesystem lock extents are extended to page boundaries so that
2933          * dealing with the page cache is a little smoother */
2934         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2935         policy->l_extent.end |= ~CFS_PAGE_MASK;
2936
2937         /* Next, search for already existing extent locks that will cover us */
2938         rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY,
2939                              &res_id, type, policy, mode, lockh);
2940         if (rc) {
2941                 //if (!(*flags & LDLM_FL_TEST_LOCK))
2942                         osc_set_data_with_check(lockh, data, lflags);
2943                 RETURN(rc);
2944         }
2945         /* If we're trying to read, we also search for an existing PW lock.  The
2946          * VFS and page cache already protect us locally, so lots of readers/
2947          * writers can share a single PW lock. */
2948         if (mode == LCK_PR) {
2949                 rc = ldlm_lock_match(obd->obd_namespace,
2950                                      lflags | LDLM_FL_LVB_READY, &res_id,
2951                                      type, policy, LCK_PW, lockh);
2952                 if (rc == 1 && !(lflags & LDLM_FL_TEST_LOCK)) {
2953                         /* FIXME: This is not incredibly elegant, but it might
2954                          * be more elegant than adding another parameter to
2955                          * lock_match.  I want a second opinion. */
2956                         osc_set_data_with_check(lockh, data, lflags);
2957                         ldlm_lock_addref(lockh, LCK_PR);
2958                         ldlm_lock_decref(lockh, LCK_PW);
2959                 }
2960         }
2961         RETURN(rc);
2962 }
2963
2964 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
2965                       __u32 mode, struct lustre_handle *lockh)
2966 {
2967         ENTRY;
2968
2969         if (unlikely(mode == LCK_GROUP))
2970                 ldlm_lock_decref_and_cancel(lockh, mode);
2971         else
2972                 ldlm_lock_decref(lockh, mode);
2973
2974         RETURN(0);
2975 }
2976
2977 static int osc_cancel_unused(struct obd_export *exp,
2978                              struct lov_stripe_md *lsm, int flags,
2979                              void *opaque)
2980 {
2981         struct obd_device *obd = class_exp2obd(exp);
2982         struct ldlm_res_id res_id = { .name = {0} }, *resp = NULL;
2983
2984         if (lsm != NULL) {
2985                 res_id.name[0] = lsm->lsm_object_id;
2986                 res_id.name[2] = lsm->lsm_object_gr;
2987                 resp = &res_id;
2988         }
2989
2990         return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags,
2991                                       opaque);
2992 }
2993
2994 static int osc_join_lru(struct obd_export *exp,
2995                         struct lov_stripe_md *lsm, int join)
2996 {
2997         struct obd_device *obd = class_exp2obd(exp);
2998         struct ldlm_res_id res_id = { .name = {0} }, *resp = NULL;
2999
3000         if (lsm != NULL) {
3001                 res_id.name[0] = lsm->lsm_object_id;
3002                 res_id.name[2] = lsm->lsm_object_gr;
3003                 resp = &res_id;
3004         }
3005
3006         return ldlm_cli_join_lru(obd->obd_namespace, resp, join);
3007 }
3008
3009 static int osc_statfs_interpret(struct ptlrpc_request *req,
3010                                 struct osc_async_args *aa, int rc)
3011 {
3012         struct obd_statfs *msfs;
3013         ENTRY;
3014
3015         if (rc != 0)
3016                 GOTO(out, rc);
3017
3018         msfs = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*msfs),
3019                                   lustre_swab_obd_statfs);
3020         if (msfs == NULL) {
3021                 CERROR("Can't unpack obd_statfs\n");
3022                 GOTO(out, rc = -EPROTO);
3023         }
3024
3025         memcpy(aa->aa_oi->oi_osfs, msfs, sizeof(*msfs));
3026 out:
3027         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
3028         RETURN(rc);
3029 }
3030
3031 static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
3032                             __u64 max_age, struct ptlrpc_request_set *rqset)
3033 {
3034         struct ptlrpc_request *req;
3035         struct osc_async_args *aa;
3036         int size[2] = { sizeof(struct ptlrpc_body), sizeof(*oinfo->oi_osfs) };
3037         ENTRY;
3038
3039         /* We could possibly pass max_age in the request (as an absolute
3040          * timestamp or a "seconds.usec ago") so the target can avoid doing
3041          * extra calls into the filesystem if that isn't necessary (e.g.
3042          * during mount that would help a bit).  Having relative timestamps
3043          * is not so great if request processing is slow, while absolute
3044          * timestamps are not ideal because they need time synchronization. */
3045         req = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_OST_VERSION,
3046                               OST_STATFS, 1, NULL, NULL);
3047         if (!req)
3048                 RETURN(-ENOMEM);
3049
3050         ptlrpc_req_set_repsize(req, 2, size);
3051         req->rq_request_portal = OST_CREATE_PORTAL; //XXX FIXME bug 249
3052
3053         req->rq_interpret_reply = osc_statfs_interpret;
3054         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3055         aa = (struct osc_async_args *)&req->rq_async_args;
3056         aa->aa_oi = oinfo;
3057
3058         ptlrpc_set_add_req(rqset, req);
3059         RETURN(0);
3060 }
3061
3062 static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
3063                       __u64 max_age)
3064 {
3065         struct obd_statfs *msfs;
3066         struct ptlrpc_request *req;
3067         int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*osfs) };
3068         ENTRY;
3069
3070         /* We could possibly pass max_age in the request (as an absolute
3071          * timestamp or a "seconds.usec ago") so the target can avoid doing
3072          * extra calls into the filesystem if that isn't necessary (e.g.
3073          * during mount that would help a bit).  Having relative timestamps
3074          * is not so great if request processing is slow, while absolute
3075          * timestamps are not ideal because they need time synchronization. */
3076         req = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_OST_VERSION,
3077                               OST_STATFS, 1, NULL, NULL);
3078         if (!req)
3079                 RETURN(-ENOMEM);
3080
3081         ptlrpc_req_set_repsize(req, 2, size);
3082         req->rq_request_portal = OST_CREATE_PORTAL; //XXX FIXME bug 249
3083
3084         rc = ptlrpc_queue_wait(req);
3085         if (rc)
3086                 GOTO(out, rc);
3087
3088         msfs = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*msfs),
3089                                   lustre_swab_obd_statfs);
3090         if (msfs == NULL) {
3091                 CERROR("Can't unpack obd_statfs\n");
3092                 GOTO(out, rc = -EPROTO);
3093         }
3094
3095         memcpy(osfs, msfs, sizeof(*osfs));
3096
3097         EXIT;
3098  out:
3099         ptlrpc_req_finished(req);
3100         return rc;
3101 }
3102
3103 /* Retrieve object striping information.
3104  *
3105  * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
3106  * the maximum number of OST indices which will fit in the user buffer.
3107  * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
3108  */
3109 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
3110 {
3111         struct lov_user_md lum, *lumk;
3112         int rc = 0, lum_size;
3113         ENTRY;
3114
3115         if (!lsm)
3116                 RETURN(-ENODATA);
3117
3118         if (copy_from_user(&lum, lump, sizeof(lum)))
3119                 RETURN(-EFAULT);
3120
3121         if (lum.lmm_magic != LOV_USER_MAGIC)
3122                 RETURN(-EINVAL);
3123
3124         if (lum.lmm_stripe_count > 0) {
3125                 lum_size = sizeof(lum) + sizeof(lum.lmm_objects[0]);
3126                 OBD_ALLOC(lumk, lum_size);
3127                 if (!lumk)
3128                         RETURN(-ENOMEM);
3129
3130                 lumk->lmm_objects[0].l_object_id = lsm->lsm_object_id;
3131                 lumk->lmm_objects[0].l_object_gr = lsm->lsm_object_gr;
3132         } else {
3133                 lum_size = sizeof(lum);
3134                 lumk = &lum;
3135         }
3136
3137         lumk->lmm_object_id = lsm->lsm_object_id;
3138         lumk->lmm_object_gr = lsm->lsm_object_gr;
3139         lumk->lmm_stripe_count = 1;
3140
3141         if (copy_to_user(lump, lumk, lum_size))
3142                 rc = -EFAULT;
3143
3144         if (lumk != &lum)
3145                 OBD_FREE(lumk, lum_size);
3146
3147         RETURN(rc);
3148 }
3149
3150
3151 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3152                          void *karg, void *uarg)
3153 {
3154         struct obd_device *obd = exp->exp_obd;
3155         struct obd_ioctl_data *data = karg;
3156         int err = 0;
3157         ENTRY;
3158
3159 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
3160         MOD_INC_USE_COUNT;
3161 #else
3162         if (!try_module_get(THIS_MODULE)) {
3163                 CERROR("Can't get module. Is it alive?");
3164                 return -EINVAL;
3165         }
3166 #endif
3167         switch (cmd) {
3168         case OBD_IOC_LOV_GET_CONFIG: {
3169                 char *buf;
3170                 struct lov_desc *desc;
3171                 struct obd_uuid uuid;
3172
3173                 buf = NULL;
3174                 len = 0;
3175                 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
3176                         GOTO(out, err = -EINVAL);
3177
3178                 data = (struct obd_ioctl_data *)buf;
3179
3180                 if (sizeof(*desc) > data->ioc_inllen1) {
3181                         obd_ioctl_freedata(buf, len);
3182                         GOTO(out, err = -EINVAL);
3183                 }
3184
3185                 if (data->ioc_inllen2 < sizeof(uuid)) {
3186                         obd_ioctl_freedata(buf, len);
3187                         GOTO(out, err = -EINVAL);
3188                 }
3189
3190                 desc = (struct lov_desc *)data->ioc_inlbuf1;
3191                 desc->ld_tgt_count = 1;
3192                 desc->ld_active_tgt_count = 1;
3193                 desc->ld_default_stripe_count = 1;
3194                 desc->ld_default_stripe_size = 0;
3195                 desc->ld_default_stripe_offset = 0;
3196                 desc->ld_pattern = 0;
3197                 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
3198
3199                 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
3200
3201                 err = copy_to_user((void *)uarg, buf, len);
3202                 if (err)
3203                         err = -EFAULT;
3204                 obd_ioctl_freedata(buf, len);
3205                 GOTO(out, err);
3206         }
3207         case LL_IOC_LOV_SETSTRIPE:
3208                 err = obd_alloc_memmd(exp, karg);
3209                 if (err > 0)
3210                         err = 0;
3211                 GOTO(out, err);
3212         case LL_IOC_LOV_GETSTRIPE:
3213                 err = osc_getstripe(karg, uarg);
3214                 GOTO(out, err);
3215         case OBD_IOC_CLIENT_RECOVER:
3216                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
3217                                             data->ioc_inlbuf1);
3218                 if (err > 0)
3219                         err = 0;
3220                 GOTO(out, err);
3221         case IOC_OSC_SET_ACTIVE:
3222                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
3223                                                data->ioc_offset);
3224                 GOTO(out, err);
3225         case OBD_IOC_POLL_QUOTACHECK:
3226                 err = lquota_poll_check(quota_interface, exp,
3227                                         (struct if_quotacheck *)karg);
3228                 GOTO(out, err);
3229         default:
3230                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3231                        cmd, cfs_curproc_comm());
3232                 GOTO(out, err = -ENOTTY);
3233         }
3234 out:
3235 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
3236         MOD_DEC_USE_COUNT;
3237 #else
3238         module_put(THIS_MODULE);
3239 #endif
3240         return err;
3241 }
3242
3243 static int osc_get_info(struct obd_export *exp, obd_count keylen,
3244                         void *key, __u32 *vallen, void *val)
3245 {
3246         ENTRY;
3247         if (!vallen || !val)
3248                 RETURN(-EFAULT);
3249
3250         if (keylen > strlen("lock_to_stripe") &&
3251             strcmp(key, "lock_to_stripe") == 0) {
3252                 __u32 *stripe = val;
3253                 *vallen = sizeof(*stripe);
3254                 *stripe = 0;
3255                 RETURN(0);
3256         } else if (keylen >= strlen("last_id") && strcmp(key, "last_id") == 0) {
3257                 struct ptlrpc_request *req;
3258                 obd_id *reply;
3259                 char *bufs[2] = { NULL, key };
3260                 int rc, size[2] = { sizeof(struct ptlrpc_body), keylen };
3261
3262                 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
3263                                       OST_GET_INFO, 2, size, bufs);
3264                 if (req == NULL)
3265                         RETURN(-ENOMEM);
3266
3267                 size[REPLY_REC_OFF] = *vallen;
3268                 ptlrpc_req_set_repsize(req, 2, size);
3269                 rc = ptlrpc_queue_wait(req);
3270                 if (rc)
3271                         GOTO(out, rc);
3272
3273                 reply = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*reply),
3274                                            lustre_swab_ost_last_id);
3275                 if (reply == NULL) {
3276                         CERROR("Can't unpack OST last ID\n");
3277                         GOTO(out, rc = -EPROTO);
3278                 }
3279                 *((obd_id *)val) = *reply;
3280         out:
3281                 ptlrpc_req_finished(req);
3282                 RETURN(rc);
3283         }
3284         RETURN(-EINVAL);
3285 }
3286
3287 static int osc_setinfo_mds_conn_interpret(struct ptlrpc_request *req,
3288                                           void *aa, int rc)
3289 {
3290         struct llog_ctxt *ctxt;
3291         struct obd_import *imp = req->rq_import;
3292         ENTRY;
3293
3294         if (rc != 0)
3295                 RETURN(rc);
3296
3297         ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
3298         if (ctxt) {
3299                 if (rc == 0)
3300                         rc = llog_initiator_connect(ctxt);
3301                 else
3302                         CERROR("cannot establish connection for "
3303                                "ctxt %p: %d\n", ctxt, rc);
3304         }
3305
3306         spin_lock(&imp->imp_lock);
3307         imp->imp_server_timeout = 1;
3308         imp->imp_pingable = 1;
3309         spin_unlock(&imp->imp_lock);
3310         CDEBUG(D_HA, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
3311
3312         RETURN(rc);
3313 }
3314
3315 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
3316                               void *key, obd_count vallen, void *val,
3317                               struct ptlrpc_request_set *set)
3318 {
3319         struct ptlrpc_request *req;
3320         struct obd_device  *obd = exp->exp_obd;
3321         struct obd_import *imp = class_exp2cliimp(exp);
3322         int size[3] = { sizeof(struct ptlrpc_body), keylen, vallen };
3323         char *bufs[3] = { NULL, key, val };
3324         ENTRY;
3325
3326         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3327
3328         if (KEY_IS(KEY_NEXT_ID)) {
3329                 if (vallen != sizeof(obd_id))
3330                         RETURN(-EINVAL);
3331                 obd->u.cli.cl_oscc.oscc_next_id = *((obd_id*)val) + 1;
3332                 CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
3333                        exp->exp_obd->obd_name,
3334                        obd->u.cli.cl_oscc.oscc_next_id);
3335
3336                 RETURN(0);
3337         }
3338
3339         if (KEY_IS("unlinked")) {
3340                 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3341                 spin_lock(&oscc->oscc_lock);
3342                 oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
3343                 spin_unlock(&oscc->oscc_lock);
3344                 RETURN(0);
3345         }
3346
3347         if (KEY_IS(KEY_INIT_RECOV)) {
3348                 if (vallen != sizeof(int))
3349                         RETURN(-EINVAL);
3350                 spin_lock(&imp->imp_lock);
3351                 imp->imp_initial_recov = *(int *)val;
3352                 spin_unlock(&imp->imp_lock);
3353                 CDEBUG(D_HA, "%s: set imp_initial_recov = %d\n",
3354                        exp->exp_obd->obd_name,
3355                        imp->imp_initial_recov);
3356                 RETURN(0);
3357         }
3358
3359         if (KEY_IS("checksum")) {
3360                 if (vallen != sizeof(int))
3361                         RETURN(-EINVAL);
3362                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3363                 RETURN(0);
3364         }
3365
3366         if (KEY_IS(KEY_FLUSH_CTX)) {
3367                 sptlrpc_import_flush_my_ctx(imp);
3368                 RETURN(0);
3369         }
3370
3371         if (!set)
3372                 RETURN(-EINVAL);
3373
3374         /* We pass all other commands directly to OST. Since nobody calls osc
3375            methods directly and everybody is supposed to go through LOV, we
3376            assume lov checked invalid values for us.
3377            The only recognised values so far are evict_by_nid and mds_conn.
3378            Even if something bad goes through, we'd get a -EINVAL from OST
3379            anyway. */
3380
3381         req = ptlrpc_prep_req(imp, LUSTRE_OST_VERSION, OST_SET_INFO, 3, size,
3382                               bufs);
3383         if (req == NULL)
3384                 RETURN(-ENOMEM);
3385
3386         if (KEY_IS("mds_conn")) {
3387                 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3388
3389                 oscc->oscc_oa.o_gr = (*(__u32 *)val);
3390                 oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP;
3391                 LASSERT(oscc->oscc_oa.o_gr > 0);
3392                 req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
3393         }
3394
3395         ptlrpc_req_set_repsize(req, 1, NULL);
3396         ptlrpc_set_add_req(set, req);
3397         ptlrpc_check_set(set);
3398
3399         RETURN(0);
3400 }
3401
3402
3403 static struct llog_operations osc_size_repl_logops = {
3404         lop_cancel: llog_obd_repl_cancel
3405 };
3406
3407 static struct llog_operations osc_mds_ost_orig_logops;
3408 static int osc_llog_init(struct obd_device *obd, struct obd_llogs *llogs,
3409                          struct obd_device *tgt, int count,
3410                          struct llog_catid *catid, struct obd_uuid *uuid)
3411 {
3412         int rc;
3413         ENTRY;
3414
3415         spin_lock(&obd->obd_dev_lock);
3416         if (osc_mds_ost_orig_logops.lop_setup != llog_obd_origin_setup) {
3417                 osc_mds_ost_orig_logops = llog_lvfs_ops;
3418                 osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
3419                 osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
3420                 osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
3421                 osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
3422         }
3423         spin_unlock(&obd->obd_dev_lock);
3424
3425         rc = llog_setup(obd, llogs, LLOG_MDS_OST_ORIG_CTXT, tgt, count,
3426                         &catid->lci_logid, &osc_mds_ost_orig_logops);
3427         if (rc) {
3428                 CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
3429                 GOTO (out, rc);
3430         }
3431
3432         rc = llog_setup(obd, llogs, LLOG_SIZE_REPL_CTXT, tgt, count, NULL,
3433                         &osc_size_repl_logops);
3434         if (rc)
3435                 CERROR("failed LLOG_SIZE_REPL_CTXT\n");
3436 out:
3437         if (rc) {
3438                 CERROR("osc '%s' tgt '%s' cnt %d catid %p rc=%d\n",
3439                        obd->obd_name, tgt->obd_name, count, catid, rc);
3440                 CERROR("logid "LPX64":0x%x\n",
3441                        catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
3442         }
3443         RETURN(rc);
3444 }
3445
3446 static int osc_llog_finish(struct obd_device *obd, int count)
3447 {
3448         struct llog_ctxt *ctxt;
3449         int rc = 0, rc2 = 0;
3450         ENTRY;
3451
3452         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3453         if (ctxt)
3454                 rc = llog_cleanup(ctxt);
3455
3456         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3457         if (ctxt)
3458                 rc2 = llog_cleanup(ctxt);
3459         if (!rc)
3460                 rc = rc2;
3461
3462         RETURN(rc);
3463 }
3464
3465 static int osc_reconnect(struct obd_export *exp, struct obd_device *obd,
3466                          struct obd_uuid *cluuid,
3467                          struct obd_connect_data *data)
3468 {
3469         struct client_obd *cli = &obd->u.cli;
3470
3471         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3472                 long lost_grant;
3473
3474                 client_obd_list_lock(&cli->cl_loi_list_lock);
3475                 data->ocd_grant = cli->cl_avail_grant ?:
3476                                 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
3477                 lost_grant = cli->cl_lost_grant;
3478                 cli->cl_lost_grant = 0;
3479                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3480
3481                 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
3482                        "cl_lost_grant: %ld\n", data->ocd_grant,
3483                        cli->cl_avail_grant, lost_grant);
3484                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
3485                        " ocd_grant: %d\n", data->ocd_connect_flags,
3486                        data->ocd_version, data->ocd_grant);
3487         }
3488
3489         RETURN(0);
3490 }
3491
3492 static int osc_disconnect(struct obd_export *exp)
3493 {
3494         struct obd_device *obd = class_exp2obd(exp);
3495         struct llog_ctxt *ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3496         int rc;
3497
3498         if (obd->u.cli.cl_conn_count == 1)
3499                 /* flush any remaining cancel messages out to the target */
3500                 llog_sync(ctxt, exp);
3501
3502         rc = client_disconnect_export(exp);
3503         return rc;
3504 }
3505
3506 static int osc_import_event(struct obd_device *obd,
3507                             struct obd_import *imp,
3508                             enum obd_import_event event)
3509 {
3510         struct client_obd *cli;
3511         int rc = 0;
3512
3513         ENTRY;
3514         LASSERT(imp->imp_obd == obd);
3515
3516         switch (event) {
3517         case IMP_EVENT_DISCON: {
3518                 /* Only do this on the MDS OSC's */
3519                 if (imp->imp_server_timeout) {
3520                         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3521
3522                         spin_lock(&oscc->oscc_lock);
3523                         oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
3524                         spin_unlock(&oscc->oscc_lock);
3525                 }
3526                 cli = &obd->u.cli;
3527                 client_obd_list_lock(&cli->cl_loi_list_lock);
3528                 cli->cl_avail_grant = 0;
3529                 cli->cl_lost_grant = 0;
3530                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3531                 break;
3532         }
3533         case IMP_EVENT_INACTIVE: {
3534                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
3535                 break;
3536         }
3537         case IMP_EVENT_INVALIDATE: {
3538                 struct ldlm_namespace *ns = obd->obd_namespace;
3539
3540                 /* Reset grants */
3541                 cli = &obd->u.cli;
3542                 client_obd_list_lock(&cli->cl_loi_list_lock);
3543                 /* all pages go to failing rpcs due to the invalid import */
3544                 osc_check_rpcs(cli);
3545                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3546
3547                 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3548
3549                 break;
3550         }
3551         case IMP_EVENT_ACTIVE: {
3552                 /* Only do this on the MDS OSC's */
3553                 if (imp->imp_server_timeout) {
3554                         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3555
3556                         spin_lock(&oscc->oscc_lock);
3557                         oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
3558                         spin_unlock(&oscc->oscc_lock);
3559                 }
3560                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
3561                 break;
3562         }
3563         case IMP_EVENT_OCD: {
3564                 struct obd_connect_data *ocd = &imp->imp_connect_data;
3565
3566                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3567                         osc_init_grant(&obd->u.cli, ocd);
3568
3569                 /* See bug 7198 */
3570                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3571                         imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
3572
3573                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
3574                 break;
3575         }
3576         default:
3577                 CERROR("Unknown import event %d\n", event);
3578                 LBUG();
3579         }
3580         RETURN(rc);
3581 }
3582
3583 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3584 {
3585         int rc;
3586         ENTRY;
3587
3588         ENTRY;
3589         rc = ptlrpcd_addref();
3590         if (rc)
3591                 RETURN(rc);
3592
3593         rc = client_obd_setup(obd, lcfg);
3594         if (rc) {
3595                 ptlrpcd_decref();
3596         } else {
3597                 struct lprocfs_static_vars lvars;
3598                 struct client_obd *cli = &obd->u.cli;
3599
3600                 lprocfs_init_vars(osc, &lvars);
3601                 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
3602                         lproc_osc_attach_seqstat(obd);
3603                         ptlrpc_lprocfs_register_obd(obd);
3604                 }
3605
3606                 oscc_init(obd);
3607                 /* We need to allocate a few requests more, because
3608                    brw_interpret_oap tries to create new requests before freeing
3609                    previous ones. Ideally we want to have 2x max_rpcs_in_flight
3610                    reserved, but I afraid that might be too much wasted RAM
3611                    in fact, so 2 is just my guess and still should work. */
3612                 cli->cl_import->imp_rq_pool =
3613                         ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
3614                                             OST_MAXREQSIZE,
3615                                             ptlrpc_add_rqs_to_pool);
3616         }
3617
3618         RETURN(rc);
3619 }
3620
3621 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
3622 {
3623         int rc = 0;
3624         ENTRY;
3625
3626         switch (stage) {
3627         case OBD_CLEANUP_EARLY: {
3628                 struct obd_import *imp;
3629                 imp = obd->u.cli.cl_import;
3630                 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
3631                 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
3632                 ptlrpc_deactivate_import(imp);
3633                 spin_lock(&imp->imp_lock);
3634                 imp->imp_pingable = 0;
3635                 spin_unlock(&imp->imp_lock);
3636                 break;
3637         }
3638         case OBD_CLEANUP_EXPORTS: {
3639                 /* If we set up but never connected, the
3640                    client import will not have been cleaned. */
3641                 if (obd->u.cli.cl_import) {
3642                         struct obd_import *imp;
3643                         imp = obd->u.cli.cl_import;
3644                         CDEBUG(D_CONFIG, "%s: client import never connected\n",
3645                                obd->obd_name);
3646                         ptlrpc_invalidate_import(imp);
3647                         ptlrpc_free_rq_pool(imp->imp_rq_pool);
3648                         class_destroy_import(imp);
3649                         obd->u.cli.cl_import = NULL;
3650                 }
3651                 break;
3652         }
3653         case OBD_CLEANUP_SELF_EXP:
3654                 rc = obd_llog_finish(obd, 0);
3655                 if (rc != 0)
3656                         CERROR("failed to cleanup llogging subsystems\n");
3657                 break;
3658         case OBD_CLEANUP_OBD:
3659                 break;
3660         }
3661         RETURN(rc);
3662 }
3663
3664 int osc_cleanup(struct obd_device *obd)
3665 {
3666         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3667         int rc;
3668
3669         ENTRY;
3670         ptlrpc_lprocfs_unregister_obd(obd);
3671         lprocfs_obd_cleanup(obd);
3672
3673         spin_lock(&oscc->oscc_lock);
3674         oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING;
3675         oscc->oscc_flags |= OSCC_FLAG_EXITING;
3676         spin_unlock(&oscc->oscc_lock);
3677
3678         /* free memory of osc quota cache */
3679         lquota_cleanup(quota_interface, obd);
3680
3681         rc = client_obd_cleanup(obd);
3682
3683         ptlrpcd_decref();
3684         RETURN(rc);
3685 }
3686
3687 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
3688 {
3689         struct lustre_cfg *lcfg = buf;
3690         struct lprocfs_static_vars lvars;
3691         int rc = 0;
3692
3693         lprocfs_init_vars(osc, &lvars);
3694
3695         rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars, lcfg, obd);
3696         return(rc);
3697 }
3698
3699 struct obd_ops osc_obd_ops = {
3700         .o_owner                = THIS_MODULE,
3701         .o_setup                = osc_setup,
3702         .o_precleanup           = osc_precleanup,
3703         .o_cleanup              = osc_cleanup,
3704         .o_add_conn             = client_import_add_conn,
3705         .o_del_conn             = client_import_del_conn,
3706         .o_connect              = client_connect_import,
3707         .o_reconnect            = osc_reconnect,
3708         .o_disconnect           = osc_disconnect,
3709         .o_statfs               = osc_statfs,
3710         .o_statfs_async         = osc_statfs_async,
3711         .o_packmd               = osc_packmd,
3712         .o_unpackmd             = osc_unpackmd,
3713         .o_create               = osc_create,
3714         .o_destroy              = osc_destroy,
3715         .o_getattr              = osc_getattr,
3716         .o_getattr_async        = osc_getattr_async,
3717         .o_setattr              = osc_setattr,
3718         .o_setattr_async        = osc_setattr_async,
3719         .o_brw                  = osc_brw,
3720         .o_brw_async            = osc_brw_async,
3721         .o_prep_async_page      = osc_prep_async_page,
3722         .o_queue_async_io       = osc_queue_async_io,
3723         .o_set_async_flags      = osc_set_async_flags,
3724         .o_queue_group_io       = osc_queue_group_io,
3725         .o_trigger_group_io     = osc_trigger_group_io,
3726         .o_teardown_async_page  = osc_teardown_async_page,
3727         .o_punch                = osc_punch,
3728         .o_sync                 = osc_sync,
3729         .o_enqueue              = osc_enqueue,
3730         .o_match                = osc_match,
3731         .o_change_cbdata        = osc_change_cbdata,
3732         .o_cancel               = osc_cancel,
3733         .o_cancel_unused        = osc_cancel_unused,
3734         .o_join_lru             = osc_join_lru,
3735         .o_iocontrol            = osc_iocontrol,
3736         .o_get_info             = osc_get_info,
3737         .o_set_info_async       = osc_set_info_async,
3738         .o_import_event         = osc_import_event,
3739         .o_llog_init            = osc_llog_init,
3740         .o_llog_finish          = osc_llog_finish,
3741         .o_process_config       = osc_process_config,
3742 };
3743
3744 int __init osc_init(void)
3745 {
3746         struct lprocfs_static_vars lvars;
3747         int rc;
3748         ENTRY;
3749
3750         lprocfs_init_vars(osc, &lvars);
3751
3752         request_module("lquota");
3753         quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
3754         lquota_init(quota_interface);
3755         init_obd_quota_ops(quota_interface, &osc_obd_ops);
3756
3757         rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
3758                                  LUSTRE_OSC_NAME, NULL);
3759         if (rc) {
3760                 if (quota_interface)
3761                         PORTAL_SYMBOL_PUT(osc_quota_interface);
3762                 RETURN(rc);
3763         }
3764
3765         RETURN(rc);
3766 }
3767
3768 #ifdef __KERNEL__
3769 static void /*__exit*/ osc_exit(void)
3770 {
3771         lquota_exit(quota_interface);
3772         if (quota_interface)
3773                 PORTAL_SYMBOL_PUT(osc_quota_interface);
3774
3775         class_unregister_type(LUSTRE_OSC_NAME);
3776 }
3777
3778 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
3779 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3780 MODULE_LICENSE("GPL");
3781
3782 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);
3783 #endif