/* Whamcloud gitweb export: lustre/osc/osc_request.c (bug b=14149) */
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
5  *   Author Peter Braam <braam@clusterfs.com>
6  *
7  *   This file is part of the Lustre file system, http://www.lustre.org
8  *   Lustre is a trademark of Cluster File Systems, Inc.
9  *
10  *   You may have signed or agreed to another license before downloading
11  *   this software.  If so, you are bound by the terms and conditions
12  *   of that agreement, and the following does not apply to you.  See the
13  *   LICENSE file included with this distribution for more information.
14  *
15  *   If you did not agree to a different license, then this copy of Lustre
16  *   is open source software; you can redistribute it and/or modify it
17  *   under the terms of version 2 of the GNU General Public License as
18  *   published by the Free Software Foundation.
19  *
20  *   In either case, Lustre is distributed in the hope that it will be
21  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
22  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
23  *   license text for more details.
24  *
 *  For testing and management it is treated as an obd_device,
 *  although it does not export a full OBD method table (the
 *  requests are coming in over the wire, so object target modules
 *  do not have a full method table.)
29  *
30  */
31
32 #ifndef EXPORT_SYMTAB
33 # define EXPORT_SYMTAB
34 #endif
35 #define DEBUG_SUBSYSTEM S_OSC
36
37 #ifdef __KERNEL__
38 # include <libcfs/libcfs.h>
39 #else /* __KERNEL__ */
40 # include <liblustre.h>
41 #endif
42
43 #include <lustre_dlm.h>
44 #include <libcfs/kp30.h>
45 #include <lustre_net.h>
46 #include <lustre/lustre_user.h>
47 #include <obd_ost.h>
48 #include <obd_lov.h>
49
50 #ifdef  __CYGWIN__
51 # include <ctype.h>
52 #endif
53
54 #include <lustre_ha.h>
55 #include <lprocfs_status.h>
56 #include <lustre_log.h>
57 #include <lustre_debug.h>
58 #include <lustre_param.h>
59 #include "osc_internal.h"
60
61 static quota_interface_t *quota_interface = NULL;
62 extern quota_interface_t osc_quota_interface;
63
64 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
65
66 /* Pack OSC object metadata for disk storage (LE byte order). */
67 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
68                       struct lov_stripe_md *lsm)
69 {
70         int lmm_size;
71         ENTRY;
72
73         lmm_size = sizeof(**lmmp);
74         if (!lmmp)
75                 RETURN(lmm_size);
76
77         if (*lmmp && !lsm) {
78                 OBD_FREE(*lmmp, lmm_size);
79                 *lmmp = NULL;
80                 RETURN(0);
81         }
82
83         if (!*lmmp) {
84                 OBD_ALLOC(*lmmp, lmm_size);
85                 if (!*lmmp)
86                         RETURN(-ENOMEM);
87         }
88
89         if (lsm) {
90                 LASSERT(lsm->lsm_object_id);
91                 LASSERT(lsm->lsm_object_gr);
92                 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
93                 (*lmmp)->lmm_object_gr = cpu_to_le64(lsm->lsm_object_gr);
94         }
95
96         RETURN(lmm_size);
97 }
98
/* Unpack OSC object metadata from disk storage (LE byte order).
 *
 * Tri-state contract mirroring obd_unpackmd():
 *   - lsmp == NULL: only report the in-memory lsm size required;
 *   - *lsmp set and lmm == NULL: free a previously unpacked lsm;
 *   - otherwise allocate *lsmp (if needed) and fill it from @lmm.
 * Returns the lsm size on success, 0 after a free, negative errno on
 * error. */
static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
                        struct lov_mds_md *lmm, int lmm_bytes)
{
        int lsm_size;
        ENTRY;

        if (lmm != NULL) {
                /* Validate the on-disk buffer before touching it. */
                if (lmm_bytes < sizeof (*lmm)) {
                        CERROR("lov_mds_md too small: %d, need %d\n",
                               lmm_bytes, (int)sizeof(*lmm));
                        RETURN(-EINVAL);
                }
                /* XXX LOV_MAGIC etc check? */

                if (lmm->lmm_object_id == 0) {
                        CERROR("lov_mds_md: zero lmm_object_id\n");
                        RETURN(-EINVAL);
                }
        }

        /* An OSC object always has exactly one stripe. */
        lsm_size = lov_stripe_md_size(1);
        if (lsmp == NULL)
                RETURN(lsm_size);

        if (*lsmp != NULL && lmm == NULL) {
                /* Caller is releasing a previously unpacked lsm:
                 * free the oinfo first, then the lsm itself. */
                OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                OBD_FREE(*lsmp, lsm_size);
                *lsmp = NULL;
                RETURN(0);
        }

        if (*lsmp == NULL) {
                OBD_ALLOC(*lsmp, lsm_size);
                if (*lsmp == NULL)
                        RETURN(-ENOMEM);
                OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                if ((*lsmp)->lsm_oinfo[0] == NULL) {
                        /* Undo the lsm allocation on partial failure. */
                        OBD_FREE(*lsmp, lsm_size);
                        RETURN(-ENOMEM);
                }
                loi_init((*lsmp)->lsm_oinfo[0]);
        }

        if (lmm != NULL) {
                /* XXX zero *lsmp? */
                (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
                (*lsmp)->lsm_object_gr = le64_to_cpu (lmm->lmm_object_gr);
                LASSERT((*lsmp)->lsm_object_id);
                LASSERT((*lsmp)->lsm_object_gr);
        }

        (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;

        RETURN(lsm_size);
}
155
156 static inline void osc_pack_capa(struct ptlrpc_request *req,
157                                  struct ost_body *body, void *capa)
158 {
159         struct obd_capa *oc = (struct obd_capa *)capa;
160         struct lustre_capa *c;
161
162         if (!capa)
163                 return;
164
165         c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
166         LASSERT(c);
167         capa_cpy(c, oc);
168         body->oa.o_valid |= OBD_MD_FLOSSCAPA;
169         DEBUG_CAPA(D_SEC, c, "pack");
170 }
171
172 static inline void osc_pack_req_body(struct ptlrpc_request *req,
173                                      struct obd_info *oinfo)
174 {
175         struct ost_body *body;
176
177         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
178         LASSERT(body);
179
180         body->oa = *oinfo->oi_oa;
181         osc_pack_capa(req, body, oinfo->oi_capa);
182 }
183
184 static inline void osc_set_capa_size(struct ptlrpc_request *req,
185                                      const struct req_msg_field *field,
186                                      struct obd_capa *oc)
187 {
188         if (oc == NULL)
189                 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
190         else
191                 /* it is already calculated as sizeof struct obd_capa */
192                 ;
193 }
194
195 static int osc_getattr_interpret(struct ptlrpc_request *req,
196                                  struct osc_async_args *aa, int rc)
197 {
198         struct ost_body *body;
199         ENTRY;
200
201         if (rc != 0)
202                 GOTO(out, rc);
203
204         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
205                                   lustre_swab_ost_body);
206         if (body) {
207                 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
208                 memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));
209
210                 /* This should really be sent by the OST */
211                 aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
212                 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
213         } else {
214                 CDEBUG(D_INFO, "can't unpack ost_body\n");
215                 rc = -EPROTO;
216                 aa->aa_oi->oi_oa->o_valid = 0;
217         }
218 out:
219         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
220         RETURN(rc);
221 }
222
223 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
224                              struct ptlrpc_request_set *set)
225 {
226         struct ptlrpc_request *req;
227         struct osc_async_args *aa;
228         int                    rc;
229         ENTRY;
230
231         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
232         if (req == NULL)
233                 RETURN(-ENOMEM);
234
235         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
236         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
237         if (rc) {
238                 ptlrpc_request_free(req);
239                 RETURN(rc);
240         }
241
242         osc_pack_req_body(req, oinfo);
243
244         ptlrpc_request_set_replen(req);
245         req->rq_interpret_reply = osc_getattr_interpret;
246
247         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
248         aa = (struct osc_async_args *)&req->rq_async_args;
249         aa->aa_oi = oinfo;
250
251         ptlrpc_set_add_req(set, req);
252         RETURN(0);
253 }
254
255 static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
256 {
257         struct ptlrpc_request *req;
258         struct ost_body       *body;
259         int                    rc;
260         ENTRY;
261
262         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
263         if (req == NULL)
264                 RETURN(-ENOMEM);
265
266         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
267         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
268         if (rc) {
269                 ptlrpc_request_free(req);
270                 RETURN(rc);
271         }
272
273         osc_pack_req_body(req, oinfo);
274
275         ptlrpc_request_set_replen(req);
276  
277         rc = ptlrpc_queue_wait(req);
278         if (rc)
279                 GOTO(out, rc);
280
281         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
282         if (body == NULL)
283                 GOTO(out, rc = -EPROTO);
284
285         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
286         *oinfo->oi_oa = body->oa;
287
288         /* This should really be sent by the OST */
289         oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
290         oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
291
292         EXIT;
293  out:
294         ptlrpc_req_finished(req);
295         return rc;
296 }
297
298 static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
299                        struct obd_trans_info *oti)
300 {
301         struct ptlrpc_request *req;
302         struct ost_body       *body;
303         int                    rc;
304         ENTRY;
305
306         LASSERT(!(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP) ||
307                                         oinfo->oi_oa->o_gr > 0);
308
309         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
310         if (req == NULL)
311                 RETURN(-ENOMEM);
312
313         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
314         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
315         if (rc) {
316                 ptlrpc_request_free(req);
317                 RETURN(rc);
318         }
319
320         osc_pack_req_body(req, oinfo);
321
322         ptlrpc_request_set_replen(req);
323  
324
325         rc = ptlrpc_queue_wait(req);
326         if (rc)
327                 GOTO(out, rc);
328
329         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
330         if (body == NULL)
331                 GOTO(out, rc = -EPROTO);
332
333         *oinfo->oi_oa = body->oa;
334
335         EXIT;
336 out:
337         ptlrpc_req_finished(req);
338         RETURN(rc);
339 }
340
341 static int osc_setattr_interpret(struct ptlrpc_request *req,
342                                  struct osc_async_args *aa, int rc)
343 {
344         struct ost_body *body;
345         ENTRY;
346
347         if (rc != 0)
348                 GOTO(out, rc);
349
350         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
351         if (body == NULL)
352                 GOTO(out, rc = -EPROTO);
353
354         *aa->aa_oi->oi_oa = body->oa;
355 out:
356         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
357         RETURN(rc);
358 }
359
360 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
361                              struct obd_trans_info *oti,
362                              struct ptlrpc_request_set *rqset)
363 {
364         struct ptlrpc_request *req;
365         struct osc_async_args *aa;
366         int                    rc;
367         ENTRY;
368
369         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
370         if (req == NULL)
371                 RETURN(-ENOMEM);
372
373         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
374         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
375         if (rc) {
376                 ptlrpc_request_free(req);
377                 RETURN(rc);
378         }
379
380         osc_pack_req_body(req, oinfo);
381
382         ptlrpc_request_set_replen(req);
383  
384         if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) {
385                 LASSERT(oti);
386                 *obdo_logcookie(oinfo->oi_oa) = *oti->oti_logcookies;
387         }
388
389         /* do mds to ost setattr asynchronouly */
390         if (!rqset) {
391                 /* Do not wait for response. */
392                 ptlrpcd_add_req(req);
393         } else {
394                 req->rq_interpret_reply = osc_setattr_interpret;
395
396                 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
397                 aa = (struct osc_async_args *)&req->rq_async_args;
398                 aa->aa_oi = oinfo;
399
400                 ptlrpc_set_add_req(rqset, req);
401         }
402
403         RETURN(0);
404 }
405
/* Create one object on the OST.  @oa supplies the requested attributes;
 * on success the new object's id/group are copied back into @oa and
 * into the single-stripe md returned through @ea.  If *ea is NULL a new
 * lsm is allocated here (and freed again if the create fails).
 * Returns 0 on success, negative errno on failure. */
int osc_real_create(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md **ea, struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct lov_stripe_md  *lsm;
        int                    rc;
        ENTRY;

        LASSERT(oa);
        LASSERT(ea);

        lsm = *ea;
        if (!lsm) {
                /* Caller supplied no striping md; build one for this
                 * single OST object. */
                rc = obd_alloc_memmd(exp, &lsm);
                if (rc < 0)
                        RETURN(rc);
        }

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
        if (req == NULL)
                GOTO(out, rc = -ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
        if (rc) {
                ptlrpc_request_free(req);
                GOTO(out, rc);
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        body->oa = *oa;

        ptlrpc_request_set_replen(req);

        if (oa->o_valid & OBD_MD_FLINLINE) {
                /* OBD_MD_FLINLINE here marks an orphan-deletion create
                 * issued during MDS/OST recovery integration. */
                LASSERT((oa->o_valid & OBD_MD_FLFLAGS) &&
                        oa->o_flags == OBD_FL_DELORPHAN);
                DEBUG_REQ(D_HA, req,
                          "delorphan from OST integration");
                /* Don't resend the delorphan req */
                req->rq_no_resend = req->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out_req, rc = -EPROTO);

        *oa = body->oa;

        /* This should really be sent by the OST */
        oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oa->o_valid |= OBD_MD_FLBLKSZ;

        /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
         * have valid lsm_oinfo data structs, so don't go touching that.
         * This needs to be fixed in a big way.
         */
        lsm->lsm_object_id = oa->o_id;
        lsm->lsm_object_gr = oa->o_gr;
        *ea = lsm;

        if (oti != NULL) {
                oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);

                if (oa->o_valid & OBD_MD_FLCOOKIE) {
                        /* Record the unlink-log cookie for the MDS. */
                        if (!oti->oti_logcookies)
                                oti_alloc_cookies(oti, 1);
                        *oti->oti_logcookies = *obdo_logcookie(oa);
                }
        }

        CDEBUG(D_HA, "transno: "LPD64"\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        /* Only free the lsm we allocated ourselves (when *ea was NULL). */
        if (rc && !*ea)
                obd_free_memmd(exp, &lsm);
        RETURN(rc);
}
491
492 static int osc_punch_interpret(struct ptlrpc_request *req,
493                                struct osc_async_args *aa, int rc)
494 {
495         struct ost_body *body;
496         ENTRY;
497
498         if (rc != 0)
499                 GOTO(out, rc);
500
501         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
502         if (body == NULL)
503                 GOTO(out, rc = -EPROTO);
504
505         *aa->aa_oi->oi_oa = body->oa;
506 out:
507         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
508         RETURN(rc);
509 }
510
/* Punch (truncate) an object extent: queue an asynchronous OST_PUNCH
 * on @rqset.  The extent comes from oinfo->oi_policy and the reply is
 * delivered to oinfo->oi_cb_up() via osc_punch_interpret().
 * Returns 0 once queued, negative errno on setup failure. */
static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
                     struct obd_trans_info *oti,
                     struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        if (!oinfo->oi_oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        osc_pack_req_body(req, oinfo);

        /* overload the size and blocks fields in the oa with start/end;
         * note this re-fetches the same wire buffer osc_pack_req_body()
         * just filled, so the edits land in the request. */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        body->oa.o_size = oinfo->oi_policy.l_extent.start;
        body->oa.o_blocks = oinfo->oi_policy.l_extent.end;
        body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
        ptlrpc_request_set_replen(req);


        req->rq_interpret_reply = osc_punch_interpret;
        CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = (struct osc_async_args *)&req->rq_async_args;
        aa->aa_oi = oinfo;
        ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}
556
557 static int osc_sync(struct obd_export *exp, struct obdo *oa,
558                     struct lov_stripe_md *md, obd_size start, obd_size end,
559                     void *capa)
560 {
561         struct ptlrpc_request *req;
562         struct ost_body       *body;
563         int                    rc;
564         ENTRY;
565
566         if (!oa) {
567                 CDEBUG(D_INFO, "oa NULL\n");
568                 RETURN(-EINVAL);
569         }
570
571         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
572         if (req == NULL)
573                 RETURN(-ENOMEM);
574
575         osc_set_capa_size(req, &RMF_CAPA1, capa);
576         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
577         if (rc) {
578                 ptlrpc_request_free(req);
579                 RETURN(rc);
580         }
581
582         /* overload the size and blocks fields in the oa with start/end */
583         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
584         LASSERT(body);
585         body->oa = *oa;
586         body->oa.o_size = start;
587         body->oa.o_blocks = end;
588         body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
589         osc_pack_capa(req, body, capa);
590
591         ptlrpc_request_set_replen(req);
592
593         rc = ptlrpc_queue_wait(req);
594         if (rc)
595                 GOTO(out, rc);
596
597         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
598         if (body == NULL)
599                 GOTO(out, rc = -EPROTO);
600
601         *oa = body->oa;
602
603         EXIT;
604  out:
605         ptlrpc_req_finished(req);
606         return rc;
607 }
608
609 /* Find and cancel locally locks matched by @mode in the resource found by
610  * @objid. Found locks are added into @cancel list. Returns the amount of
611  * locks added to @cancels list. */
612 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
613                                    struct list_head *cancels, ldlm_mode_t mode,
614                                    int lock_flags)
615 {
616         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
617         struct ldlm_res_id res_id = { .name = { oa->o_id, 0, oa->o_gr, 0 } };
618         struct ldlm_resource *res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
619         int count;
620         ENTRY;
621
622         if (res == NULL)
623                 RETURN(0);
624
625         count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
626                                            lock_flags, 0, NULL);
627         ldlm_resource_putref(res);
628         RETURN(count);
629 }
630
631 static int osc_destroy_interpret(struct ptlrpc_request *req, void *data,
632                                  int rc)
633 {
634         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
635
636         atomic_dec(&cli->cl_destroy_in_flight);
637         cfs_waitq_signal(&cli->cl_destroy_waitq);
638         return 0;
639 }
640
/* Try to reserve an in-flight slot for one destroy RPC.
 * Returns 1 if the RPC may be sent now (slot taken), 0 if the caller
 * must wait on cl_destroy_waitq.  Also used as the wakeup predicate in
 * osc_destroy().  The inc-then-maybe-dec dance below is deliberately
 * order-sensitive: it must stay race-free against concurrent callers
 * and against osc_destroy_interpret() decrementing the counter. */
static int osc_can_send_destroy(struct client_obd *cli)
{
        if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
            cli->cl_max_rpcs_in_flight) {
                /* The destroy request can be sent */
                return 1;
        }
        /* Over the limit: back out our increment.  If the counter fell
         * below the limit in between, another slot opened up, so wake a
         * waiter rather than let the wakeup be lost. */
        if (atomic_dec_return(&cli->cl_destroy_in_flight) <
            cli->cl_max_rpcs_in_flight) {
                /*
                 * The counter has been modified between the two atomic
                 * operations.
                 */
                cfs_waitq_signal(&cli->cl_destroy_waitq);
        }
        return 0;
}
658
/* Destroy requests can be async always on the client, and we don't even really
 * care about the return code since the client cannot do anything at all about
 * a destroy failure.
 * When the MDS is unlinking a filename, it saves the file objects into a
 * recovery llog, and these object records are cancelled when the OST reports
 * they were destroyed and sync'd to disk (i.e. transaction committed).
 * If the client dies, or the OST is down when the object should be destroyed,
 * the records are not cancelled, and when the OST reconnects to the MDS next,
 * it will retrieve the llog unlink logs and then sends the log cancellation
 * cookies to the MDS after committing destroy transactions. */
static int osc_destroy(struct obd_export *exp, struct obdo *oa,
                       struct lov_stripe_md *ea, struct obd_trans_info *oti,
                       struct obd_export *md_export)
{
        struct client_obd     *cli = &exp->exp_obd->u.cli;
        struct ptlrpc_request *req;
        struct ost_body       *body;
        CFS_LIST_HEAD(cancels);
        int rc, count;
        ENTRY;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        /* Cancel any local PW locks on the object first (early lock
         * cancellation); DISCARD_DATA avoids flushing dirty pages for
         * an object that is about to be destroyed anyway. */
        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
        if (req == NULL) {
                /* On failure we still own the cancel list; drop the refs. */
                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                RETURN(-ENOMEM);
        }

        /* Piggy-back the collected cancels onto the destroy request. */
        rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY, 
                               0, &cancels, count);
        if (rc) {
                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        req->rq_interpret_reply = osc_destroy_interpret;

        /* Copy the llog cookie into the obdo before it is packed below. */
        if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
                memcpy(obdo_logcookie(oa), oti->oti_logcookies,
                       sizeof(*oti->oti_logcookies));
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        body->oa = *oa;

        ptlrpc_request_set_replen(req);

        if (!osc_can_send_destroy(cli)) {
                struct l_wait_info lwi = { 0 };

                /*
                 * Wait until the number of on-going destroy RPCs drops
                 * under max_rpc_in_flight
                 */
                l_wait_event_exclusive(cli->cl_destroy_waitq,
                                       osc_can_send_destroy(cli), &lwi);
        }

        /* Do not wait for response */
        ptlrpcd_add_req(req);
        RETURN(0);
}
729
/* Report this client's dirty-cache and grant state to the OST by
 * filling the o_dirty/o_undirty/o_grant/o_dropped fields of @oa.
 * o_undirty advertises how much more this client could dirty; it is
 * forced to 0 whenever the accounting looks inconsistent. */
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;

        /* The caller must not have filled these fields already. */
        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        client_obd_list_lock(&cli->cl_loi_list_lock);
        oa->o_dirty = cli->cl_dirty;
        if (cli->cl_dirty > cli->cl_dirty_max) {
                CERROR("dirty %lu > dirty_max %lu\n",
                       cli->cl_dirty, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else if (atomic_read(&obd_dirty_pages) > obd_max_dirty_pages) {
                CERROR("dirty %d > system dirty_max %d\n",
                       atomic_read(&obd_dirty_pages), obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
                /* Guard against a difference that would overflow the
                 * 32-bit o_undirty on the wire. */
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else {
                /* Advertise headroom for up to max_rpcs_in_flight + 1
                 * full-sized RPCs, but at least dirty_max. */
                long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
                                (cli->cl_max_rpcs_in_flight + 1);
                oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
        }
        oa->o_grant = cli->cl_avail_grant;
        oa->o_dropped = cli->cl_lost_grant;
        /* Lost grant has now been reported; reset the counter. */
        cli->cl_lost_grant = 0;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
}
764
/* Charge one page's worth of dirty accounting and grant to @cli for
 * @pga, and mark the page as covered by grant (OBD_BRW_FROM_GRANT).
 * Caller must hold cl_loi_list_lock. */
static void osc_consume_write_grant(struct client_obd *cli,
                                    struct brw_page *pga)
{
        atomic_inc(&obd_dirty_pages);
        cli->cl_dirty += CFS_PAGE_SIZE;
        cli->cl_avail_grant -= CFS_PAGE_SIZE;
        pga->flag |= OBD_BRW_FROM_GRANT;
        CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
               CFS_PAGE_SIZE, pga, pga->pg);
        /* The caller is expected to check grant before consuming it. */
        LASSERT(cli->cl_avail_grant >= 0);
}
777
/* The companion to osc_consume_write_grant, called when a brw has
 * completed: undo the dirty accounting for @pga and track any grant
 * that was effectively lost.  @sent tells whether the page actually
 * went to the OST.  Must be called with the loi lock held. */
static void osc_release_write_grant(struct client_obd *cli,
                                    struct brw_page *pga, int sent)
{
        /* Server block size, defaulting to 4096 if statfs hasn't
         * reported one yet. */
        int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
        ENTRY;

        if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
                /* This page never consumed grant; nothing to release. */
                EXIT;
                return;
        }

        pga->flag &= ~OBD_BRW_FROM_GRANT;
        atomic_dec(&obd_dirty_pages);
        cli->cl_dirty -= CFS_PAGE_SIZE;
        if (!sent) {
                /* Page dropped without being written: the whole page's
                 * grant is lost until the OST re-issues it. */
                cli->cl_lost_grant += CFS_PAGE_SIZE;
                CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
                       cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
        } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
                /* For short writes we shouldn't count parts of pages that
                 * span a whole block on the OST side, or our accounting goes
                 * wrong.  Should match the code in filter_grant_check. */
                int offset = pga->off & ~CFS_PAGE_MASK;
                int count = pga->count + (offset & (blocksize - 1));
                int end = (offset + pga->count) & (blocksize - 1);
                if (end)
                        count += blocksize - end;

                cli->cl_lost_grant += CFS_PAGE_SIZE - count;
                CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
                       CFS_PAGE_SIZE - count, cli->cl_lost_grant,
                       cli->cl_avail_grant, cli->cl_dirty);
        }

        EXIT;
}
816
/* Total BRW RPCs (reads plus writes) currently in flight for @cli. */
static unsigned long rpcs_in_flight(struct client_obd *cli)
{
        return cli->cl_r_in_flight + cli->cl_w_in_flight;
}
821
/* Wake threads blocked waiting for cache/grant room.  Each waiter is
 * either granted a page of write grant (osc_consume_write_grant) or,
 * when no grant can be expected, released with ocw_rc = -EDQUOT so it
 * falls back to sync IO.  Caller must hold loi_list_lock. */
void osc_wake_cache_waiters(struct client_obd *cli)
{
        struct list_head *l, *tmp;
        struct osc_cache_waiter *ocw;

        ENTRY;
        list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
                /* if we can't dirty more, we must wait until some is written */
                if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
                   (atomic_read(&obd_dirty_pages) + 1 > obd_max_dirty_pages)) {
                        CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
                               "osc max %ld, sys max %d\n", cli->cl_dirty,
                               cli->cl_dirty_max, obd_max_dirty_pages);
                        /* Leave remaining waiters queued; nothing more
                         * can be handed out right now. */
                        return;
                }

                /* if still dirty cache but no grant wait for pending RPCs that
                 * may yet return us some grant before doing sync writes */
                if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
                               cli->cl_w_in_flight);
                        return;
                }

                ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
                list_del_init(&ocw->ocw_entry);
                if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        /* no more RPCs in flight to return grant, do sync IO */
                        ocw->ocw_rc = -EDQUOT;
                        CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
                } else {
                        osc_consume_write_grant(cli,
                                                &ocw->ocw_oap->oap_brw_page);
                }

                cfs_waitq_signal(&ocw->ocw_waitq);
        }

        EXIT;
}
863
/* Install the initial grant the server advertised at connect time
 * (ocd->ocd_grant) as this client's available grant.  Called once per
 * (re)connect; cl_avail_grant must never go negative. */
static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        client_obd_list_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant = ocd->ocd_grant;
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld\n",
               cli->cl_avail_grant, cli->cl_lost_grant);
        LASSERT(cli->cl_avail_grant >= 0);
}
874
/* Fold grant returned by the server in a BRW reply (body->oa.o_grant)
 * into the client's available grant.  The o_grant field is only trusted
 * when the server set OBD_MD_FLGRANT in o_valid. */
static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
{
        client_obd_list_lock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
        if (body->oa.o_valid & OBD_MD_FLGRANT)
                cli->cl_avail_grant += body->oa.o_grant;
        /* waiters are woken in brw_interpret_oap */
        client_obd_list_unlock(&cli->cl_loi_list_lock);
}
884
885 /* We assume that the reason this OSC got a short read is because it read
886  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
887  * via the LOV, and it _knows_ it's reading inside the file, it's just that
888  * this stripe never got written at or beyond this stripe offset yet. */
889 static void handle_short_read(int nob_read, obd_count page_count,
890                               struct brw_page **pga)
891 {
892         char *ptr;
893         int i = 0;
894
895         /* skip bytes read OK */
896         while (nob_read > 0) {
897                 LASSERT (page_count > 0);
898
899                 if (pga[i]->count > nob_read) {
900                         /* EOF inside this page */
901                         ptr = cfs_kmap(pga[i]->pg) +
902                                 (pga[i]->off & ~CFS_PAGE_MASK);
903                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
904                         cfs_kunmap(pga[i]->pg);
905                         page_count--;
906                         i++;
907                         break;
908                 }
909
910                 nob_read -= pga[i]->count;
911                 page_count--;
912                 i++;
913         }
914
915         /* zero remaining pages */
916         while (page_count-- > 0) {
917                 ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
918                 memset(ptr, 0, pga[i]->count);
919                 cfs_kunmap(pga[i]->pg);
920                 i++;
921         }
922 }
923
924 static int check_write_rcs(struct ptlrpc_request *req,
925                            int requested_nob, int niocount,
926                            obd_count page_count, struct brw_page **pga)
927 {
928         int    *remote_rcs, i;
929
930         /* return error if any niobuf was in error */
931         remote_rcs = lustre_swab_repbuf(req, REQ_REC_OFF + 1,
932                                         sizeof(*remote_rcs) * niocount, NULL);
933         if (remote_rcs == NULL) {
934                 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
935                 return(-EPROTO);
936         }
937         if (lustre_msg_swabbed(req->rq_repmsg))
938                 for (i = 0; i < niocount; i++)
939                         __swab32s(&remote_rcs[i]);
940
941         for (i = 0; i < niocount; i++) {
942                 if (remote_rcs[i] < 0)
943                         return(remote_rcs[i]);
944
945                 if (remote_rcs[i] != 0) {
946                         CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
947                                 i, remote_rcs[i], req);
948                         return(-EPROTO);
949                 }
950         }
951
952         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
953                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
954                        requested_nob, req->rq_bulk->bd_nob_transferred);
955                 return(-EPROTO);
956         }
957
958         return (0);
959 }
960
961 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
962 {
963         if (p1->flag != p2->flag) {
964                 unsigned mask = ~OBD_BRW_FROM_GRANT;
965
966                 /* warn if we try to combine flags that we don't know to be
967                  * safe to combine */
968                 if ((p1->flag & mask) != (p2->flag & mask))
969                         CERROR("is it ok to have flags 0x%x and 0x%x in the "
970                                "same brw?\n", p1->flag, p2->flag);
971                 return 0;
972         }
973
974         return (p1->off + p1->count == p2->off);
975 }
976
977 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
978                                    struct brw_page **pga, int opc)
979 {
980         __u32 cksum = ~0;
981         int i = 0;
982
983         LASSERT (pg_count > 0);
984         while (nob > 0 && pg_count > 0) {
985                 unsigned char *ptr = cfs_kmap(pga[i]->pg);
986                 int off = pga[i]->off & ~CFS_PAGE_MASK;
987                 int count = pga[i]->count > nob ? nob : pga[i]->count;
988
989                 /* corrupt the data before we compute the checksum, to
990                  * simulate an OST->client data error */
991                 if (i == 0 && opc == OST_READ &&
992                     OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
993                         memcpy(ptr + off, "bad1", min(4, nob));
994                 cksum = crc32_le(cksum, ptr + off, count);
995                 cfs_kunmap(pga[i]->pg);
996                 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
997                                off, cksum);
998
999                 nob -= pga[i]->count;
1000                 pg_count--;
1001                 i++;
1002         }
1003         /* For sending we only compute the wrong checksum instead
1004          * of corrupting the data so it is still correct on a redo */
1005         if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1006                 cksum++;
1007
1008         return cksum;
1009 }
1010
1011 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
1012                                 struct lov_stripe_md *lsm, obd_count page_count,
1013                                 struct brw_page **pga, 
1014                                 struct ptlrpc_request **reqp,
1015                                 struct obd_capa *ocapa)
1016 {
1017         struct ptlrpc_request   *req;
1018         struct ptlrpc_bulk_desc *desc;
1019         struct ost_body         *body;
1020         struct obd_ioobj        *ioobj;
1021         struct niobuf_remote    *niobuf;
1022         int niocount, i, requested_nob, opc, rc;
1023         struct osc_brw_async_args *aa;
1024         struct req_capsule      *pill;
1025
1026         ENTRY;
1027         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1028                 RETURN(-ENOMEM); /* Recoverable */
1029         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1030                 RETURN(-EINVAL); /* Fatal */
1031
1032         if ((cmd & OBD_BRW_WRITE) != 0) {
1033                 opc = OST_WRITE;
1034                 req = ptlrpc_request_alloc_pool(cli->cl_import, 
1035                                                 cli->cl_import->imp_rq_pool,
1036                                                 &RQF_OST_BRW);
1037         } else {
1038                 opc = OST_READ;
1039                 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW);
1040         }
1041
1042         if (req == NULL)
1043                 RETURN(-ENOMEM);
1044
1045         for (niocount = i = 1; i < page_count; i++) {
1046                 if (!can_merge_pages(pga[i - 1], pga[i]))
1047                         niocount++;
1048         }
1049
1050         pill = &req->rq_pill;
1051         req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1052                              niocount * sizeof(*niobuf));
1053         osc_set_capa_size(req, &RMF_CAPA1, ocapa);
1054
1055         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1056         if (rc) {
1057                 ptlrpc_request_free(req);
1058                 RETURN(rc);
1059         }
1060         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1061
1062         if (opc == OST_WRITE)
1063                 desc = ptlrpc_prep_bulk_imp(req, page_count,
1064                                             BULK_GET_SOURCE, OST_BULK_PORTAL);
1065         else
1066                 desc = ptlrpc_prep_bulk_imp(req, page_count,
1067                                             BULK_PUT_SINK, OST_BULK_PORTAL);
1068
1069         if (desc == NULL)
1070                 GOTO(out, rc = -ENOMEM);
1071         /* NB request now owns desc and will free it when it gets freed */
1072
1073         body = req_capsule_client_get(pill, &RMF_OST_BODY);
1074         ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1075         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1076         LASSERT(body && ioobj && niobuf);
1077
1078         body->oa = *oa;
1079
1080         obdo_to_ioobj(oa, ioobj);
1081         ioobj->ioo_bufcnt = niocount;
1082         osc_pack_capa(req, body, ocapa);
1083         LASSERT (page_count > 0);
1084         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1085                 struct brw_page *pg = pga[i];
1086                 struct brw_page *pg_prev = pga[i - 1];
1087
1088                 LASSERT(pg->count > 0);
1089                 LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
1090                          "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
1091                          pg->off, pg->count);
1092 #ifdef __linux__
1093                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1094                          "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1095                          " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1096                          i, page_count,
1097                          pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1098                          pg_prev->pg, page_private(pg_prev->pg),
1099                          pg_prev->pg->index, pg_prev->off);
1100 #else
1101                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1102                          "i %d p_c %u\n", i, page_count);
1103 #endif
1104                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1105                         (pg->flag & OBD_BRW_SRVLOCK));
1106
1107                 ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,
1108                                       pg->count);
1109                 requested_nob += pg->count;
1110
1111                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1112                         niobuf--;
1113                         niobuf->len += pg->count;
1114                 } else {
1115                         niobuf->offset = pg->off;
1116                         niobuf->len    = pg->count;
1117                         niobuf->flags  = pg->flag;
1118                 }
1119         }
1120
1121         LASSERT((void *)(niobuf - niocount) ==
1122                 lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
1123                                niocount * sizeof(*niobuf)));
1124         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1125
1126         /* size[REQ_REC_OFF] still sizeof (*body) */
1127         if (opc == OST_WRITE) {
1128                 if (unlikely(cli->cl_checksum) &&
1129                     req->rq_flvr.sf_bulk_csum == BULK_CSUM_ALG_NULL) {
1130                         body->oa.o_valid |= OBD_MD_FLCKSUM;
1131                         body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1132                                                              page_count, pga,
1133                                                              OST_WRITE);
1134                         CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1135                                body->oa.o_cksum);
1136                         /* save this in 'oa', too, for later checking */
1137                         oa->o_valid |= OBD_MD_FLCKSUM;
1138                 } else {
1139                         /* clear out the checksum flag, in case this is a
1140                          * resend but cl_checksum is no longer set. b=11238 */
1141                         oa->o_valid &= ~OBD_MD_FLCKSUM;
1142                 }
1143                 oa->o_cksum = body->oa.o_cksum;
1144                 /* 1 RC per niobuf */
1145                 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_SERVER,
1146                                      sizeof(__u32) * niocount);
1147         } else {
1148                 if (unlikely(cli->cl_checksum) &&
1149                     req->rq_flvr.sf_bulk_csum == BULK_CSUM_ALG_NULL)
1150                         body->oa.o_valid |= OBD_MD_FLCKSUM;
1151                 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_SERVER, 0);
1152                 /* 1 RC for the whole I/O */
1153         }
1154         ptlrpc_request_set_replen(req);
1155
1156         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1157         aa = (struct osc_brw_async_args *)&req->rq_async_args;
1158         aa->aa_oa = oa;
1159         aa->aa_requested_nob = requested_nob;
1160         aa->aa_nio_count = niocount;
1161         aa->aa_page_count = page_count;
1162         aa->aa_resends = 0;
1163         aa->aa_ppga = pga;
1164         aa->aa_cli = cli;
1165         INIT_LIST_HEAD(&aa->aa_oaps);
1166
1167         *reqp = req;
1168         RETURN(0);
1169
1170  out:
1171         ptlrpc_req_finished(req);
1172         RETURN(rc);
1173 }
1174
/* Compare the server's write checksum against the one the client sent.
 * Returns 0 if they match.  On mismatch, recompute the checksum over the
 * (still-resident) pages to distinguish where the corruption happened,
 * log it loudly, and return 1 so the caller can retry the write. */
static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
                                __u32 client_cksum, __u32 server_cksum,
                                int nob, obd_count page_count,
                                struct brw_page **pga)
{
        __u32 new_cksum;
        char *msg;

        if (server_cksum == client_cksum) {
                CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
                return 0;
        }

        /* re-checksum the pages as they are now to localize the corruption */
        new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE);

        if (new_cksum == server_cksum)
                msg = "changed on the client after we checksummed it - "
                      "likely false positive due to mmap IO (bug 11742)";
        else if (new_cksum == client_cksum)
                msg = "changed in transit before arrival at OST";
        else
                msg = "changed in transit AND doesn't match the original - "
                      "likely false positive due to mmap IO (bug 11742)";

        LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inum "
                           LPU64"/"LPU64" object "LPU64"/"LPU64" extent "
                           "["LPU64"-"LPU64"]\n",
                           msg, libcfs_nid2str(peer->nid),
                           oa->o_valid & OBD_MD_FLFID ? oa->o_fid : (__u64)0,
                           oa->o_valid & OBD_MD_FLFID ? oa->o_generation :
                                                        (__u64)0,
                           oa->o_id,
                           oa->o_valid & OBD_MD_FLGROUP ? oa->o_gr : (__u64)0,
                           pga[0]->off,
                           pga[page_count-1]->off + pga[page_count-1]->count - 1);
        CERROR("original client csum %x, server csum %x, client csum now %x\n",
               client_cksum, server_cksum, new_cksum);
        return 1;
}
1214
/* Note rc enters this function as number of bytes transferred */
/* Finish a BRW RPC: unpack and validate the reply, update quota flags
 * and grant, verify bulk checksums, and for short reads zero the unread
 * tail of the page array.  Returns 0 or bytes-independent status: a
 * negative errno on failure, -EAGAIN when a checksum mismatch makes the
 * transfer worth retrying. */
static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
{
        struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
        const lnet_process_id_t *peer =
                        &req->rq_import->imp_connection->c_peer;
        struct client_obd *cli = aa->aa_cli;
        struct ost_body *body;
        __u32 client_cksum = 0;
        ENTRY;

        /* -EDQUOT still carries a reply we must process (quota flags) */
        if (rc < 0 && rc != -EDQUOT)
                RETURN(rc);

        LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body == NULL) {
                CDEBUG(D_INFO, "Can't unpack body\n");
                RETURN(-EPROTO);
        }

        /* set/clear over quota flag for a uid/gid */
        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
            body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA))
                lquota_setdq(quota_interface, cli, body->oa.o_uid,
                             body->oa.o_gid, body->oa.o_valid,
                             body->oa.o_flags);

        if (rc < 0)
                RETURN(rc);

        if (unlikely(aa->aa_oa->o_valid & OBD_MD_FLCKSUM))
                client_cksum = aa->aa_oa->o_cksum; /* save for later */

        osc_update_grant(cli, body);

        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
                /* writes report success as rc == 0; bytes come via niobuf rcs */
                if (rc > 0) {
                        CERROR("Unexpected +ve rc %d\n", rc);
                        RETURN(-EPROTO);
                }
                LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);

                if (unlikely((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) &&
                             client_cksum &&
                             check_write_checksum(&body->oa, peer, client_cksum,
                                                  body->oa.o_cksum,
                                                  aa->aa_requested_nob,
                                                  aa->aa_page_count,
                                                  aa->aa_ppga)))
                        RETURN(-EAGAIN);

                if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
                        RETURN(-EAGAIN);

                rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
                                     aa->aa_page_count, aa->aa_ppga);
                GOTO(out, rc);
        }

        /* The rest of this function executes only for OST_READs */
        if (rc > aa->aa_requested_nob) {
                CERROR("Unexpected rc %d (%d requested)\n", rc,
                       aa->aa_requested_nob);
                RETURN(-EPROTO);
        }

        if (rc != req->rq_bulk->bd_nob_transferred) {
                /* NOTE(review): bare return here skips the EXIT tracing that
                 * RETURN() performs everywhere else in this function */
                CERROR ("Unexpected rc %d (%d transferred)\n",
                        rc, req->rq_bulk->bd_nob_transferred);
                return (-EPROTO);
        }

        /* short read: zero the tail of the page array (sparse stripe) */
        if (rc < aa->aa_requested_nob)
                handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);

        if (sptlrpc_cli_unwrap_bulk_read(req, rc, aa->aa_page_count,
                                         aa->aa_ppga))
                GOTO(out, rc = -EAGAIN);

        if (unlikely(body->oa.o_valid & OBD_MD_FLCKSUM)) {
                static int cksum_counter;
                __u32      server_cksum = body->oa.o_cksum;
                char      *via;
                char      *router;

                client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
                                                 aa->aa_ppga, OST_READ);

                /* identify the last-hop router, if the data was forwarded */
                if (peer->nid == req->rq_bulk->bd_sender) {
                        via = router = "";
                } else {
                        via = " via ";
                        router = libcfs_nid2str(req->rq_bulk->bd_sender);
                }

                if (server_cksum == ~0 && rc > 0) {
                        CERROR("Protocol error: server %s set the 'checksum' "
                               "bit, but didn't send a checksum.  Not fatal, "
                               "but please tell CFS.\n",
                               libcfs_nid2str(peer->nid));
                } else if (server_cksum != client_cksum) {
                        LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
                                           "%s%s%s inum "LPU64"/"LPU64" object "
                                           LPU64"/"LPU64" extent "
                                           "["LPU64"-"LPU64"]\n",
                                           req->rq_import->imp_obd->obd_name,
                                           libcfs_nid2str(peer->nid),
                                           via, router,
                                           body->oa.o_valid & OBD_MD_FLFID ?
                                                body->oa.o_fid : (__u64)0,
                                           body->oa.o_valid & OBD_MD_FLFID ?
                                                body->oa.o_generation :(__u64)0,
                                           body->oa.o_id,
                                           body->oa.o_valid & OBD_MD_FLGROUP ?
                                                body->oa.o_gr : (__u64)0,
                                           aa->aa_ppga[0]->off,
                                           aa->aa_ppga[aa->aa_page_count-1]->off +
                                           aa->aa_ppga[aa->aa_page_count-1]->count -
                                                                        1);
                        CERROR("client %x, server %x\n",
                               client_cksum, server_cksum);
                        cksum_counter = 0;
                        aa->aa_oa->o_cksum = client_cksum;
                        rc = -EAGAIN;
                } else {
                        cksum_counter++;
                        CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
                        rc = 0;
                }
        } else if (unlikely(client_cksum)) {
                static int cksum_missed;

                cksum_missed++;
                /* log only when cksum_missed is a power of two (rate limit) */
                if ((cksum_missed & (-cksum_missed)) == cksum_missed)
                        CERROR("Checksum %u requested from %s but not sent\n",
                               cksum_missed, libcfs_nid2str(peer->nid));
        } else {
                rc = 0;
        }
out:
        /* propagate the server's obdo (size, blocks, grant...) to the caller */
        if (rc >= 0)
                *aa->aa_oa = body->oa;

        RETURN(rc);
}
1362
/* Synchronous BRW: build the RPC, queue it and wait for the reply.
 * Bulk timeouts with rq_resend set are retried immediately; other
 * recoverable errors are retried with a growing 'resends'-second delay,
 * up to the limit enforced by osc_should_resend(). */
static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
                            struct lov_stripe_md *lsm,
                            obd_count page_count, struct brw_page **pga,
                            struct obd_capa *ocapa)
{
        struct ptlrpc_request *req;
        int                    rc;
        cfs_waitq_t            waitq;         /* private waitq for backoff sleeps */
        int                    resends = 0;   /* retry attempts so far */
        struct l_wait_info     lwi;

        ENTRY;

        cfs_waitq_init(&waitq);

restart_bulk:
        rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
                                  page_count, pga, &req, ocapa);
        if (rc != 0)
                return (rc);

        rc = ptlrpc_queue_wait(req);

        if (rc == -ETIMEDOUT && req->rq_resend) {
                DEBUG_REQ(D_HA, req,  "BULK TIMEOUT");
                ptlrpc_req_finished(req);
                goto restart_bulk;
        }

        rc = osc_brw_fini_request(req, rc);

        ptlrpc_req_finished(req);
        if (osc_recoverable_error(rc)) {
                resends++;
                if (!osc_should_resend(resends, &exp->exp_obd->u.cli)) {
                        CERROR("too many resend retries, returning error\n");
                        RETURN(-EIO);
                }

                /* back off for 'resends' seconds before rebuilding the RPC */
                lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
                l_wait_event(waitq, 0, &lwi);

                goto restart_bulk;
        }

        RETURN (rc);
}
1410
/* Rebuild and requeue a BRW RPC after a recoverable error.
 *
 * Builds a fresh request from the async args of the failed one, then
 * transfers ownership of the page array and the oap list to the new
 * request under cl_loi_list_lock, and adds it to the original request
 * set.  Returns 0 when the redo was queued, -EIO when the resend limit
 * is exceeded, or -EINTR if a waiter on one of the oaps was interrupted. */
int osc_brw_redo_request(struct ptlrpc_request *request,
                         struct osc_brw_async_args *aa)
{
        struct ptlrpc_request *new_req;
        struct ptlrpc_request_set *set = request->rq_set;
        struct osc_brw_async_args *new_aa;
        struct osc_async_page *oap;
        int rc = 0;
        ENTRY;

        if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) {
                CERROR("too many resend retries, returning error\n");
                RETURN(-EIO);
        }

        DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
/*
        body = lustre_msg_buf(request->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        if (body->oa.o_valid & OBD_MD_FLOSSCAPA)
                ocapa = lustre_unpack_capa(request->rq_reqmsg,
                                           REQ_REC_OFF + 3);
*/
        rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
                                        OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
                                  aa->aa_cli, aa->aa_oa,
                                  NULL /* lsm unused by osc currently */,
                                  aa->aa_page_count, aa->aa_ppga,
                                  &new_req, NULL /* ocapa */);
        if (rc)
                RETURN(rc);

        client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);

        /* bail out before taking over the oaps if any waiter was interrupted */
        list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request != NULL) {
                        LASSERTF(request == oap->oap_request,
                                 "request %p != oap_request %p\n",
                                 request, oap->oap_request);
                        if (oap->oap_interrupted) {
                                client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
                                ptlrpc_req_finished(new_req);
                                RETURN(-EINTR);
                        }
                }
        }
        /* New request takes over pga and oaps from old request.
         * Note that copying a list_head doesn't work, need to move it... */
        aa->aa_resends++;
        new_req->rq_interpret_reply = request->rq_interpret_reply;
        new_req->rq_async_args = request->rq_async_args;
        /* delay the resend: don't send before rq_sent */
        new_req->rq_sent = CURRENT_SECONDS + aa->aa_resends;

        new_aa = (struct osc_brw_async_args *)&new_req->rq_async_args;

        INIT_LIST_HEAD(&new_aa->aa_oaps);
        list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
        INIT_LIST_HEAD(&aa->aa_oaps);

        /* repoint each oap's request reference at the new RPC */
        list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request) {
                        ptlrpc_req_finished(oap->oap_request);
                        oap->oap_request = ptlrpc_request_addref(new_req);
                }
        }

        /* use ptlrpc_set_add_req is safe because interpret functions work
         * in check_set context. only one way exist with access to request
         * from different thread got -EINTR - this way protected with
         * cl_loi_list_lock */
        ptlrpc_set_add_req(set, new_req);

        client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);

        DEBUG_REQ(D_INFO, new_req, "new request");
        RETURN(0);
}
1487
/* Interpret callback for async BRW RPCs.
 * Finishes the request; on a recoverable error tries to requeue it via
 * osc_brw_redo_request() (in which case the redo owns the pages/oaps and
 * we return immediately).  Otherwise drops the per-direction in-flight
 * count, releases the write grant held for every page, and frees the
 * page array. */
static int brw_interpret(struct ptlrpc_request *req, void *data, int rc)
{
        struct osc_brw_async_args *aa = data;
        int                        i;
        ENTRY;

        rc = osc_brw_fini_request(req, rc);
        if (osc_recoverable_error(rc)) {
                rc = osc_brw_redo_request(req, aa);
                if (rc == 0)
                        RETURN(0); /* redo queued; it took over ppga/oaps */
        }

        client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
                aa->aa_cli->cl_w_in_flight--;
        else
                aa->aa_cli->cl_r_in_flight--;
        for (i = 0; i < aa->aa_page_count; i++)
                osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
        client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);

        osc_release_ppga(aa->aa_ppga, aa->aa_page_count);

        RETURN(rc);
}
1514
1515 static int async_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1516                           struct lov_stripe_md *lsm, obd_count page_count,
1517                           struct brw_page **pga, struct ptlrpc_request_set *set,
1518                           struct obd_capa *ocapa)
1519 {
1520         struct ptlrpc_request     *req;
1521         struct client_obd         *cli = &exp->exp_obd->u.cli;
1522         int                        rc, i;
1523         struct osc_brw_async_args *aa;
1524         ENTRY;
1525
1526         /* Consume write credits even if doing a sync write -
1527          * otherwise we may run out of space on OST due to grant. */
1528         if (cmd == OBD_BRW_WRITE) {
1529                 spin_lock(&cli->cl_loi_list_lock);
1530                 for (i = 0; i < page_count; i++) {
1531                         if (cli->cl_avail_grant >= CFS_PAGE_SIZE)
1532                                 osc_consume_write_grant(cli, pga[i]);
1533                 }
1534                 spin_unlock(&cli->cl_loi_list_lock);
1535         }
1536
1537         rc = osc_brw_prep_request(cmd, cli, oa, lsm, page_count, pga,
1538                                   &req, ocapa);
1539
1540         aa = (struct osc_brw_async_args *)&req->rq_async_args;
1541         if (cmd == OBD_BRW_READ) {
1542                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
1543                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
1544                 ptlrpc_lprocfs_brw(req, OST_READ, aa->aa_requested_nob);
1545         } else {
1546                  lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
1547                 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
1548                                  cli->cl_w_in_flight);
1549                 ptlrpc_lprocfs_brw(req, OST_WRITE, aa->aa_requested_nob);
1550         }
1551
1552         if (rc == 0) {
1553                 req->rq_interpret_reply = brw_interpret;
1554                 ptlrpc_set_add_req(set, req);
1555                 client_obd_list_lock(&cli->cl_loi_list_lock);
1556                 if (cmd == OBD_BRW_READ)
1557                         cli->cl_r_in_flight++;
1558                 else
1559                         cli->cl_w_in_flight++;
1560                 client_obd_list_unlock(&cli->cl_loi_list_lock);
1561         } else if (cmd == OBD_BRW_WRITE) {
1562                 client_obd_list_lock(&cli->cl_loi_list_lock);
1563                 for (i = 0; i < page_count; i++)
1564                         osc_release_write_grant(cli, pga[i], 0);
1565                 client_obd_list_unlock(&cli->cl_loi_list_lock);
1566         }
1567         RETURN (rc);
1568 }
1569
1570 /*
1571  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1572  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1573  * fine for our small page arrays and doesn't require allocation.  its an
1574  * insertion sort that swaps elements that are strides apart, shrinking the
1575  * stride down until its '1' and the array is sorted.
1576  */
1577 static void sort_brw_pages(struct brw_page **array, int num)
1578 {
1579         int stride, i, j;
1580         struct brw_page *tmp;
1581
1582         if (num == 1)
1583                 return;
1584         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1585                 ;
1586
1587         do {
1588                 stride /= 3;
1589                 for (i = stride ; i < num ; i++) {
1590                         tmp = array[i];
1591                         j = i;
1592                         while (j >= stride && array[j - stride]->off > tmp->off) {
1593                                 array[j] = array[j - stride];
1594                                 j -= stride;
1595                         }
1596                         array[j] = tmp;
1597                 }
1598         } while (stride > 1);
1599 }
1600
1601 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1602 {
1603         int count = 1;
1604         int offset;
1605         int i = 0;
1606
1607         LASSERT (pages > 0);
1608         offset = pg[i]->off & ~CFS_PAGE_MASK;
1609
1610         for (;;) {
1611                 pages--;
1612                 if (pages == 0)         /* that's all */
1613                         return count;
1614
1615                 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1616                         return count;   /* doesn't end on page boundary */
1617
1618                 i++;
1619                 offset = pg[i]->off & ~CFS_PAGE_MASK;
1620                 if (offset != 0)        /* doesn't start on page boundary */
1621                         return count;
1622
1623                 count++;
1624         }
1625 }
1626
1627 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1628 {
1629         struct brw_page **ppga;
1630         int i;
1631
1632         OBD_ALLOC(ppga, sizeof(*ppga) * count);
1633         if (ppga == NULL)
1634                 return NULL;
1635
1636         for (i = 0; i < count; i++)
1637                 ppga[i] = pga + i;
1638         return ppga;
1639 }
1640
1641 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1642 {
1643         LASSERT(ppga != NULL);
1644         OBD_FREE(ppga, sizeof(*ppga) * count);
1645 }
1646
1647 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1648                    obd_count page_count, struct brw_page *pga,
1649                    struct obd_trans_info *oti)
1650 {
1651         struct obdo *saved_oa = NULL;
1652         struct brw_page **ppga, **orig;
1653         struct obd_import *imp = class_exp2cliimp(exp);
1654         struct client_obd *cli = &imp->imp_obd->u.cli;
1655         int rc, page_count_orig;
1656         ENTRY;
1657
1658         if (cmd & OBD_BRW_CHECK) {
1659                 /* The caller just wants to know if there's a chance that this
1660                  * I/O can succeed */
1661
1662                 if (imp == NULL || imp->imp_invalid)
1663                         RETURN(-EIO);
1664                 RETURN(0);
1665         }
1666
1667         /* test_brw with a failed create can trip this, maybe others. */
1668         LASSERT(cli->cl_max_pages_per_rpc);
1669
1670         rc = 0;
1671
1672         orig = ppga = osc_build_ppga(pga, page_count);
1673         if (ppga == NULL)
1674                 RETURN(-ENOMEM);
1675         page_count_orig = page_count;
1676
1677         sort_brw_pages(ppga, page_count);
1678         while (page_count) {
1679                 obd_count pages_per_brw;
1680
1681                 if (page_count > cli->cl_max_pages_per_rpc)
1682                         pages_per_brw = cli->cl_max_pages_per_rpc;
1683                 else
1684                         pages_per_brw = page_count;
1685
1686                 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1687
1688                 if (saved_oa != NULL) {
1689                         /* restore previously saved oa */
1690                         *oinfo->oi_oa = *saved_oa;
1691                 } else if (page_count > pages_per_brw) {
1692                         /* save a copy of oa (brw will clobber it) */
1693                         OBDO_ALLOC(saved_oa);
1694                         if (saved_oa == NULL)
1695                                 GOTO(out, rc = -ENOMEM);
1696                         *saved_oa = *oinfo->oi_oa;
1697                 }
1698
1699                 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1700                                       pages_per_brw, ppga, oinfo->oi_capa);
1701
1702                 if (rc != 0)
1703                         break;
1704
1705                 page_count -= pages_per_brw;
1706                 ppga += pages_per_brw;
1707         }
1708
1709 out:
1710         osc_release_ppga(orig, page_count_orig);
1711
1712         if (saved_oa != NULL)
1713                 OBDO_FREE(saved_oa);
1714
1715         RETURN(rc);
1716 }
1717
/* Asynchronous counterpart of osc_brw(): queues one or more bulk RPCs on
 * @set instead of waiting for them.  Ownership of the page-pointer arrays
 * is subtle: async_internal() takes over whatever array it is handed, so
 * @orig is only freed here if it was never passed down.  Returns 0 on
 * success or a negative errno. */
static int osc_brw_async(int cmd, struct obd_export *exp,
                         struct obd_info *oinfo, obd_count page_count,
                         struct brw_page *pga, struct obd_trans_info *oti,
                         struct ptlrpc_request_set *set)
{
        struct brw_page **ppga, **orig;
        struct client_obd *cli = &exp->exp_obd->u.cli;
        int page_count_orig;
        int rc = 0;
        ENTRY;

        if (cmd & OBD_BRW_CHECK) {
                struct obd_import *imp = class_exp2cliimp(exp);
                /* The caller just wants to know if there's a chance that this
                 * I/O can succeed */

                if (imp == NULL || imp->imp_invalid)
                        RETURN(-EIO);
                RETURN(0);
        }

        orig = ppga = osc_build_ppga(pga, page_count);
        if (ppga == NULL)
                RETURN(-ENOMEM);
        page_count_orig = page_count;

        sort_brw_pages(ppga, page_count);
        while (page_count) {
                struct brw_page **copy;
                obd_count pages_per_brw;

                pages_per_brw = min_t(obd_count, page_count,
                                      cli->cl_max_pages_per_rpc);

                /* trim the slice so it never spans a fragmented page run */
                pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);

                /* use ppga only if single RPC is going to fly */
                if (pages_per_brw != page_count_orig || ppga != orig) {
                        /* multiple RPCs: each gets a private copy of its
                         * slice because async_internal() frees its array */
                        OBD_ALLOC(copy, sizeof(*copy) * pages_per_brw);
                        if (copy == NULL)
                                GOTO(out, rc = -ENOMEM);
                        memcpy(copy, ppga, sizeof(*copy) * pages_per_brw);
                } else
                        copy = ppga;

                rc = async_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
                                    pages_per_brw, copy, set, oinfo->oi_capa);

                if (rc != 0) {
                        /* async_internal() did not take ownership on error;
                         * only free private copies -- orig is freed at out: */
                        if (copy != ppga)
                                OBD_FREE(copy, sizeof(*copy) * pages_per_brw);
                        break;
                }
                if (copy == orig) {
                        /* we passed it to async_internal() which is
                         * now responsible for releasing memory */
                        orig = NULL;
                }

                page_count -= pages_per_brw;
                ppga += pages_per_brw;
        }
out:
        if (orig)
                osc_release_ppga(orig, page_count_orig);
        RETURN(rc);
}
1785
1786 static void osc_check_rpcs(struct client_obd *cli);
1787
/* The companion to osc_enter_cache(), called when @oap is no longer part of
 * the dirty accounting.  Writeback completes or truncate happens before
 * writing starts.  Must be called with the loi lock held.
 * @sent is forwarded to osc_release_write_grant() to indicate whether the
 * page actually made it to the wire. */
static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
                           int sent)
{
        osc_release_write_grant(cli, &oap->oap_brw_page, sent);
}
1796
1797
1798 /* This maintains the lists of pending pages to read/write for a given object
1799  * (lop).  This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
1800  * to quickly find objects that are ready to send an RPC. */
1801 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
1802                          int cmd)
1803 {
1804         int optimal;
1805         ENTRY;
1806
1807         if (lop->lop_num_pending == 0)
1808                 RETURN(0);
1809
1810         /* if we have an invalid import we want to drain the queued pages
1811          * by forcing them through rpcs that immediately fail and complete
1812          * the pages.  recovery relies on this to empty the queued pages
1813          * before canceling the locks and evicting down the llite pages */
1814         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1815                 RETURN(1);
1816
1817         /* stream rpcs in queue order as long as as there is an urgent page
1818          * queued.  this is our cheap solution for good batching in the case
1819          * where writepage marks some random page in the middle of the file
1820          * as urgent because of, say, memory pressure */
1821         if (!list_empty(&lop->lop_urgent)) {
1822                 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
1823                 RETURN(1);
1824         }
1825         /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
1826         optimal = cli->cl_max_pages_per_rpc;
1827         if (cmd & OBD_BRW_WRITE) {
1828                 /* trigger a write rpc stream as long as there are dirtiers
1829                  * waiting for space.  as they're waiting, they're not going to
1830                  * create more pages to coallesce with what's waiting.. */
1831                 if (!list_empty(&cli->cl_cache_waiters)) {
1832                         CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
1833                         RETURN(1);
1834                 }
1835                 /* +16 to avoid triggering rpcs that would want to include pages
1836                  * that are being queued but which can't be made ready until
1837                  * the queuer finishes with the page. this is a wart for
1838                  * llite::commit_write() */
1839                 optimal += 16;
1840         }
1841         if (lop->lop_num_pending >= optimal)
1842                 RETURN(1);
1843
1844         RETURN(0);
1845 }
1846
/* Reconcile @item's membership of @list with the boolean @should_be_on:
 * add it at the tail when it should be listed but isn't, remove it when
 * it shouldn't be but is, and otherwise leave it alone. */
static void on_list(struct list_head *item, struct list_head *list,
                    int should_be_on)
{
        int on_now = !list_empty(item);

        if (should_be_on && !on_now)
                list_add_tail(item, list);
        else if (!should_be_on && on_now)
                list_del_init(item);
}
1855
/* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
 * can find pages to build into rpcs quickly:
 *   - cl_loi_ready_list: lois whose read or write queue would make an RPC
 *   - cl_loi_write_list: lois with any pending write pages
 *   - cl_loi_read_list:  lois with any pending read pages */
static void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
{
        on_list(&loi->loi_cli_item, &cli->cl_loi_ready_list,
                lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE) ||
                lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));

        on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
                loi->loi_write_lop.lop_num_pending);

        on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
                loi->loi_read_lop.lop_num_pending);
}
1870
/* Adjust the per-lop pending-page count by @delta (positive when queueing,
 * negative when removing) and mirror the change into the client-wide
 * read or write pending counter selected by @cmd. */
static void lop_update_pending(struct client_obd *cli,
                               struct loi_oap_pages *lop, int cmd, int delta)
{
        lop->lop_num_pending += delta;
        if (cmd & OBD_BRW_WRITE)
                cli->cl_pending_w_pages += delta;
        else
                cli->cl_pending_r_pages += delta;
}
1880
/* this is called when a sync waiter receives an interruption.  Its job is to
 * get the caller woken as soon as possible.  If its page hasn't been put in an
 * rpc yet it can dequeue immediately.  Otherwise it has to mark the rpc as
 * desiring interruption which will forcefully complete the rpc once the rpc
 * has timed out */
static void osc_occ_interrupted(struct oig_callback_context *occ)
{
        struct osc_async_page *oap;
        struct loi_oap_pages *lop;
        struct lov_oinfo *loi;
        ENTRY;

        /* XXX member_of() */
        oap = list_entry(occ, struct osc_async_page, oap_occ);

        /* everything below (list surgery, pending accounting, oig
         * completion) must happen under the client's loi list lock */
        client_obd_list_lock(&oap->oap_cli->cl_loi_list_lock);

        oap->oap_interrupted = 1;

        /* ok, it's been put in an rpc. only one oap gets a request reference */
        if (oap->oap_request != NULL) {
                ptlrpc_mark_interrupted(oap->oap_request);
                ptlrpcd_wake(oap->oap_request);
                GOTO(unlock, 0);
        }

        /* we don't get interruption callbacks until osc_trigger_group_io()
         * has been called and put the sync oaps in the pending/urgent lists.*/
        if (!list_empty(&oap->oap_pending_item)) {
                /* still only queued: dequeue it, fix the pending counters and
                 * list invariants, and complete the group I/O slot with
                 * -EINTR right away */
                list_del_init(&oap->oap_pending_item);
                list_del_init(&oap->oap_urgent_item);

                loi = oap->oap_loi;
                lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
                        &loi->loi_write_lop : &loi->loi_read_lop;
                lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
                loi_list_maint(oap->oap_cli, oap->oap_loi);

                oig_complete_one(oap->oap_oig, &oap->oap_occ, -EINTR);
                oap->oap_oig = NULL;
        }

unlock:
        client_obd_list_unlock(&oap->oap_cli->cl_loi_list_lock);
}
1926
1927 /* this is trying to propogate async writeback errors back up to the
1928  * application.  As an async write fails we record the error code for later if
1929  * the app does an fsync.  As long as errors persist we force future rpcs to be
1930  * sync so that the app can get a sync error and break the cycle of queueing
1931  * pages for which writeback will fail. */
1932 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
1933                            int rc)
1934 {
1935         if (rc) {
1936                 if (!ar->ar_rc)
1937                         ar->ar_rc = rc;
1938
1939                 ar->ar_force_sync = 1;
1940                 ar->ar_min_xid = ptlrpc_sample_next_xid();
1941                 return;
1942
1943         }
1944
1945         if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
1946                 ar->ar_force_sync = 0;
1947 }
1948
1949 static void osc_oap_to_pending(struct osc_async_page *oap)
1950 {
1951         struct loi_oap_pages *lop;
1952
1953         if (oap->oap_cmd & OBD_BRW_WRITE)
1954                 lop = &oap->oap_loi->loi_write_lop;
1955         else
1956                 lop = &oap->oap_loi->loi_read_lop;
1957
1958         if (oap->oap_async_flags & ASYNC_URGENT)
1959                 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
1960         list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
1961         lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
1962 }
1963
/* this must be called holding the loi list lock to give coverage to exit_cache,
 * async_flag maintenance, and oap_request.
 * Completes one async page: drops the request reference, records write
 * errors for fsync, copies fresh attributes from @oa into the loi lvb,
 * then either finishes the group I/O slot or calls the caller's
 * ap_completion hook (which may requeue the page on failure). */
static void osc_ap_completion(struct client_obd *cli, struct obdo *oa,
                              struct osc_async_page *oap, int sent, int rc)
{
        __u64 xid = 0;

        ENTRY;
        if (oap->oap_request != NULL) {
                /* remember the xid before dropping the reference so the
                 * async-rc bookkeeping below can still use it */
                xid = ptlrpc_req_xid(oap->oap_request);
                ptlrpc_req_finished(oap->oap_request);
                oap->oap_request = NULL;
        }

        oap->oap_async_flags = 0;
        oap->oap_interrupted = 0;

        if (oap->oap_cmd & OBD_BRW_WRITE) {
                /* track write errors both client-wide and per-object */
                osc_process_ar(&cli->cl_ar, xid, rc);
                osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
        }

        if (rc == 0 && oa != NULL) {
                /* refresh the cached attributes the reply carried */
                if (oa->o_valid & OBD_MD_FLBLOCKS)
                        oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
                if (oa->o_valid & OBD_MD_FLMTIME)
                        oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
                if (oa->o_valid & OBD_MD_FLATIME)
                        oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
                if (oa->o_valid & OBD_MD_FLCTIME)
                        oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
        }

        if (oap->oap_oig) {
                /* group (sync) I/O path: release the grant and signal the
                 * oig waiter; no caller completion callback in this case */
                osc_exit_cache(cli, oap, sent);
                oig_complete_one(oap->oap_oig, &oap->oap_occ, rc);
                oap->oap_oig = NULL;
                EXIT;
                return;
        }

        rc = oap->oap_caller_ops->ap_completion(oap->oap_caller_data,
                                                oap->oap_cmd, oa, rc);

        /* ll_ap_completion (from llite) drops PG_locked. so, a new
         * I/O on the page could start, but OSC calls it under lock
         * and thus we can add oap back to pending safely */
        if (rc)
                /* upper layer wants to leave the page on pending queue */
                osc_oap_to_pending(oap);
        else
                osc_exit_cache(cli, oap, sent);
        EXIT;
}
2018
/* Interpret callback for an async bulk RPC built from oaps.  Finishes the
 * request, retries recoverable errors via osc_brw_redo_request(), then --
 * under the loi list lock -- drops the in-flight count, completes every
 * oap in the RPC, wakes cache waiters and kicks off any newly-ready RPCs.
 * Finally frees the obdo and page array owned by the async args. */
static int brw_interpret_oap(struct ptlrpc_request *req, void *data, int rc)
{
        struct osc_async_page *oap, *tmp;
        struct osc_brw_async_args *aa = data;
        struct client_obd *cli;
        ENTRY;

        rc = osc_brw_fini_request(req, rc);
        CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
        if (osc_recoverable_error(rc)) {
                /* resend path: a successful redo keeps ownership of aa, so
                 * skip all the completion/cleanup below */
                rc = osc_brw_redo_request(req, aa);
                if (rc == 0)
                        RETURN(0);
        }

        cli = aa->aa_cli;

        client_obd_list_lock(&cli->cl_loi_list_lock);

        /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
         * is called so we know whether to go to sync BRWs or wait for more
         * RPCs to complete */
        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
                cli->cl_w_in_flight--;
        else
                cli->cl_r_in_flight--;

        /* the caller may re-use the oap after the completion call so
         * we need to clean it up a little */
        list_for_each_entry_safe(oap, tmp, &aa->aa_oaps, oap_rpc_item) {
                list_del_init(&oap->oap_rpc_item);
                osc_ap_completion(cli, aa->aa_oa, oap, 1, rc);
        }

        osc_wake_cache_waiters(cli);
        osc_check_rpcs(cli);

        client_obd_list_unlock(&cli->cl_loi_list_lock);

        OBDO_FREE(aa->aa_oa);

        osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
        RETURN(rc);
}
2063
/* Build a bulk RPC from the oaps on @rpc_list: allocate a page-pointer
 * array and an obdo, fill the obdo and capability via the caller ops,
 * sort the pages and prep the request, then move the oaps onto the
 * request's async args.  On success @rpc_list is left empty and the
 * request owns the pga/oa; on failure they are freed and an ERR_PTR is
 * returned. */
static struct ptlrpc_request *osc_build_req(struct client_obd *cli,
                                            struct list_head *rpc_list,
                                            int page_count, int cmd)
{
        struct ptlrpc_request *req;
        struct brw_page **pga = NULL;
        struct osc_brw_async_args *aa;
        struct obdo *oa = NULL;
        struct obd_async_page_ops *ops = NULL;
        void *caller_data = NULL;
        struct obd_capa *ocapa;
        struct osc_async_page *oap;
        int i, rc;

        ENTRY;
        LASSERT(!list_empty(rpc_list));

        OBD_ALLOC(pga, sizeof(*pga) * page_count);
        if (pga == NULL)
                RETURN(ERR_PTR(-ENOMEM));

        OBDO_ALLOC(oa);
        if (oa == NULL)
                GOTO(out, req = ERR_PTR(-ENOMEM));

        /* point pga at each oap's brw_page, computing absolute file offsets;
         * the caller ops/data are taken from the first oap seen */
        i = 0;
        list_for_each_entry(oap, rpc_list, oap_rpc_item) {
                if (ops == NULL) {
                        ops = oap->oap_caller_ops;
                        caller_data = oap->oap_caller_data;
                }
                pga[i] = &oap->oap_brw_page;
                pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
                CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
                       pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
                i++;
        }

        /* always get the data for the obdo for the rpc */
        LASSERT(ops != NULL);
        ops->ap_fill_obdo(caller_data, cmd, oa);
        ocapa = ops->ap_lookup_capa(caller_data, cmd);

        sort_brw_pages(pga, page_count);
        rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
                                  pga, &req, ocapa);
        capa_put(ocapa);
        if (rc != 0) {
                CERROR("prep_req failed: %d\n", rc);
                GOTO(out, req = ERR_PTR(rc));
        }

        /* Need to update the timestamps after the request is built in case
         * we race with setattr (locally or in queue at OST).  If OST gets
         * later setattr before earlier BRW (as determined by the request xid),
         * the OST will not use BRW timestamps.  Sadly, there is no obvious
         * way to do this in a single call.  bug 10150 */
        ops->ap_update_obdo(caller_data, cmd, oa,
                            OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME);

        /* hand the oaps over to the request's embedded async args */
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = (struct osc_brw_async_args *)&req->rq_async_args;
        INIT_LIST_HEAD(&aa->aa_oaps);
        list_splice(rpc_list, &aa->aa_oaps);
        INIT_LIST_HEAD(rpc_list);

out:
        if (IS_ERR(req)) {
                if (oa)
                        OBDO_FREE(oa);
                if (pga)
                        OBD_FREE(pga, sizeof(*pga) * page_count);
        }
        RETURN(req);
}
2139
/* the loi lock is held across this function but it's allowed to release
 * and reacquire it during its work.
 * Scans @lop's pending list, readies pages, collects an RPC's worth of
 * contiguous pages, builds and sends the request via ptlrpcd.
 * Returns 1 when an RPC was sent, 0 when no pages were ready, or a
 * negative errno when building the request failed. */
static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
                            int cmd, struct loi_oap_pages *lop)
{
        struct ptlrpc_request *req;
        obd_count page_count = 0;
        struct osc_async_page *oap = NULL, *tmp;
        struct osc_brw_async_args *aa;
        struct obd_async_page_ops *ops;
        CFS_LIST_HEAD(rpc_list);
        unsigned int ending_offset;
        unsigned  starting_offset = 0;
        ENTRY;

        /* first we find the pages we're allowed to work with */
        list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
                                 oap_pending_item) {
                ops = oap->oap_caller_ops;

                LASSERT(oap->oap_magic == OAP_MAGIC);

                /* in llite being 'ready' equates to the page being locked
                 * until completion unlocks it.  commit_write submits a page
                 * as not ready because its unlock will happen unconditionally
                 * as the call returns.  if we race with commit_write giving
                 * us that page we dont' want to create a hole in the page
                 * stream, so we stop and leave the rpc to be fired by
                 * another dirtier or kupdated interval (the not ready page
                 * will still be on the dirty list).  we could call in
                 * at the end of ll_file_write to process the queue again. */
                if (!(oap->oap_async_flags & ASYNC_READY)) {
                        int rc = ops->ap_make_ready(oap->oap_caller_data, cmd);
                        if (rc < 0)
                                CDEBUG(D_INODE, "oap %p page %p returned %d "
                                                "instead of ready\n", oap,
                                                oap->oap_page, rc);
                        switch (rc) {
                        case -EAGAIN:
                                /* llite is telling us that the page is still
                                 * in commit_write and that we should try
                                 * and put it in an rpc again later.  we
                                 * break out of the loop so we don't create
                                 * a hole in the sequence of pages in the rpc
                                 * stream.*/
                                oap = NULL;
                                break;
                        case -EINTR:
                                /* the io isn't needed.. tell the checks
                                 * below to complete the rpc with EINTR */
                                oap->oap_async_flags |= ASYNC_COUNT_STABLE;
                                oap->oap_count = -EINTR;
                                break;
                        case 0:
                                oap->oap_async_flags |= ASYNC_READY;
                                break;
                        default:
                                LASSERTF(0, "oap %p page %p returned %d "
                                            "from make_ready\n", oap,
                                            oap->oap_page, rc);
                                break;
                        }
                }
                if (oap == NULL)
                        break;
                /*
                 * Page submitted for IO has to be locked. Either by
                 * ->ap_make_ready() or by higher layers.
                 *
                 * XXX nikita: this assertion should be adjusted when lustre
                 * starts using PG_writeback for pages being written out.
                 */
#if defined(__KERNEL__) && defined(__linux__)
                LASSERT(PageLocked(oap->oap_page));
#endif
                /* If there is a gap at the start of this page, it can't merge
                 * with any previous page, so we'll hand the network a
                 * "fragmented" page array that it can't transfer in 1 RDMA */
                if (page_count != 0 && oap->oap_page_off != 0)
                        break;

                /* take the page out of our book-keeping */
                list_del_init(&oap->oap_pending_item);
                lop_update_pending(cli, lop, cmd, -1);
                list_del_init(&oap->oap_urgent_item);

                /* remember where the RPC starts, for the lproc offset hist */
                if (page_count == 0)
                        starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
                                          (PTLRPC_MAX_BRW_SIZE - 1);

                /* ask the caller for the size of the io as the rpc leaves. */
                if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE))
                        oap->oap_count =
                                ops->ap_refresh_count(oap->oap_caller_data,cmd);
                if (oap->oap_count <= 0) {
                        /* nothing left to transfer for this page -- complete
                         * it now rather than sending an empty chunk */
                        CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
                               oap->oap_count);
                        osc_ap_completion(cli, NULL, oap, 0, oap->oap_count);
                        continue;
                }

                /* now put the page back in our accounting */
                list_add_tail(&oap->oap_rpc_item, &rpc_list);
                if (++page_count >= cli->cl_max_pages_per_rpc)
                        break;

                /* End on a PTLRPC_MAX_BRW_SIZE boundary.  We want full-sized
                 * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
                 * have the same alignment as the initial writes that allocated
                 * extents on the server. */
                ending_offset = (oap->oap_obj_off + oap->oap_page_off +
                                 oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
                if (ending_offset == 0)
                        break;

                /* If there is a gap at the end of this page, it can't merge
                 * with any subsequent pages, so we'll hand the network a
                 * "fragmented" page array that it can't transfer in 1 RDMA */
                if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
                        break;
        }

        osc_wake_cache_waiters(cli);

        if (page_count == 0)
                RETURN(0);

        loi_list_maint(cli, loi);

        /* drop the loi list lock while building the request; the pages are
         * safely parked on our private rpc_list meanwhile */
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        req = osc_build_req(cli, &rpc_list, page_count, cmd);
        if (IS_ERR(req)) {
                /* this should happen rarely and is pretty bad, it makes the
                 * pending list not follow the dirty order */
                client_obd_list_lock(&cli->cl_loi_list_lock);
                list_for_each_entry_safe(oap, tmp, &rpc_list, oap_rpc_item) {
                        list_del_init(&oap->oap_rpc_item);

                        /* queued sync pages can be torn down while the pages
                         * were between the pending list and the rpc */
                        if (oap->oap_interrupted) {
                                CDEBUG(D_INODE, "oap %p interrupted\n", oap);
                                osc_ap_completion(cli, NULL, oap, 0,
                                                  oap->oap_count);
                                continue;
                        }
                        osc_ap_completion(cli, NULL, oap, 0, PTR_ERR(req));
                }
                loi_list_maint(cli, loi);
                RETURN(PTR_ERR(req));
        }

        aa = (struct osc_brw_async_args *)&req->rq_async_args;

        /* lproc accounting: page/rpc/offset histograms per direction */
        if (cmd == OBD_BRW_READ) {
                lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
                lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
                lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
                                      (starting_offset >> CFS_PAGE_SHIFT) + 1);
                ptlrpc_lprocfs_brw(req, OST_READ, aa->aa_requested_nob);
        } else {
                lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
                lprocfs_oh_tally(&cli->cl_write_rpc_hist,
                                 cli->cl_w_in_flight);
                lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
                                      (starting_offset >> CFS_PAGE_SHIFT) + 1);
                ptlrpc_lprocfs_brw(req, OST_WRITE, aa->aa_requested_nob);
        }

        client_obd_list_lock(&cli->cl_loi_list_lock);

        if (cmd == OBD_BRW_READ)
                cli->cl_r_in_flight++;
        else
                cli->cl_w_in_flight++;

        /* queued sync pages can be torn down while the pages
         * were between the pending list and the rpc */
        tmp = NULL;
        list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
                /* only one oap gets a request reference */
                if (tmp == NULL)
                        tmp = oap;
                if (oap->oap_interrupted && !req->rq_intr) {
                        CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
                               oap, req);
                        ptlrpc_mark_interrupted(req);
                }
        }
        if (tmp != NULL)
                tmp->oap_request = ptlrpc_request_addref(req);

        DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
                  page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);

        req->rq_interpret_reply = brw_interpret_oap;
        ptlrpcd_add_req(req);
        RETURN(1);
}
2340
/* Log an object's rpc-readiness state: whether it is on the ready list,
 * plus pending/urgent counts for both its write and read queues.
 * Fix: removed the stray trailing '\' after "args)" which silently
 * continued the macro onto the following (blank) line. */
#define LOI_DEBUG(LOI, STR, args...)                                     \
        CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR,           \
               !list_empty(&(LOI)->loi_cli_item),                        \
               (LOI)->loi_write_lop.lop_num_pending,                     \
               !list_empty(&(LOI)->loi_write_lop.lop_urgent),            \
               (LOI)->loi_read_lop.lop_num_pending,                      \
               !list_empty(&(LOI)->loi_read_lop.lop_urgent),             \
               args)
2350 /* This is called by osc_check_rpcs() to find which objects have pages that
2351  * we could be sending.  These lists are maintained by lop_makes_rpc(). */
2352 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2353 {
2354         ENTRY;
2355         /* first return all objects which we already know to have
2356          * pages ready to be stuffed into rpcs */
2357         if (!list_empty(&cli->cl_loi_ready_list))
2358                 RETURN(list_entry(cli->cl_loi_ready_list.next,
2359                                   struct lov_oinfo, loi_cli_item));
2360
2361         /* then if we have cache waiters, return all objects with queued
2362          * writes.  This is especially important when many small files
2363          * have filled up the cache and not been fired into rpcs because
2364          * they don't pass the nr_pending/object threshhold */
2365         if (!list_empty(&cli->cl_cache_waiters) &&
2366             !list_empty(&cli->cl_loi_write_list))
2367                 RETURN(list_entry(cli->cl_loi_write_list.next,
2368                                   struct lov_oinfo, loi_write_item));
2369
2370         /* then return all queued objects when we have an invalid import
2371          * so that they get flushed */
2372         if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2373                 if (!list_empty(&cli->cl_loi_write_list))
2374                         RETURN(list_entry(cli->cl_loi_write_list.next,
2375                                           struct lov_oinfo, loi_write_item));
2376                 if (!list_empty(&cli->cl_loi_read_list))
2377                         RETURN(list_entry(cli->cl_loi_read_list.next,
2378                                           struct lov_oinfo, loi_read_item));
2379         }
2380         RETURN(NULL);
2381 }
2382
/* Drive the rpc-generation engine: while objects have pages ready (per
 * osc_next_loi()) and we are under the max-rpcs-in-flight limit, try to
 * build and send read and write rpcs for each object in turn.
 * Called with the loi list lock held. */
static void osc_check_rpcs(struct client_obd *cli)
{
        struct lov_oinfo *loi;
        int rc = 0, race_counter = 0;
        ENTRY;

        while ((loi = osc_next_loi(cli)) != NULL) {
                LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));

                /* stop once the rpc pipeline to this OST is full */
                if (rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight)
                        break;

                /* attempt some read/write balancing by alternating between
                 * reads and writes in an object.  The makes_rpc checks here
                 * would be redundant if we were getting read/write work items
                 * instead of objects.  we don't want send_oap_rpc to drain a
                 * partial read pending queue when we're given this object to
                 * do io on writes while there are cache waiters */
                if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
                        rc = osc_send_oap_rpc(cli, loi, OBD_BRW_WRITE,
                                              &loi->loi_write_lop);
                        if (rc < 0)
                                break;
                        if (rc > 0)
                                race_counter = 0;
                        else
                                race_counter++;
                }
                if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
                        rc = osc_send_oap_rpc(cli, loi, OBD_BRW_READ,
                                              &loi->loi_read_lop);
                        if (rc < 0)
                                break;
                        if (rc > 0)
                                race_counter = 0;
                        else
                                race_counter++;
                }

                /* attempt some inter-object balancing by issuing rpcs
                 * for each object in turn */
                if (!list_empty(&loi->loi_cli_item))
                        list_del_init(&loi->loi_cli_item);
                if (!list_empty(&loi->loi_write_item))
                        list_del_init(&loi->loi_write_item);
                if (!list_empty(&loi->loi_read_item))
                        list_del_init(&loi->loi_read_item);

                loi_list_maint(cli, loi);

                /* send_oap_rpc fails with 0 when make_ready tells it to
                 * back off.  llite's make_ready does this when it tries
                 * to lock a page queued for write that is already locked.
                 * we want to try sending rpcs from many objects, but we
                 * don't want to spin failing with 0.  */
                if (race_counter == 10)
                        break;
        }
        EXIT;
}
2444
2445 /* we're trying to queue a page in the osc so we're subject to the
2446  * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2447  * If the osc's queued pages are already at that limit, then we want to sleep
2448  * until there is space in the osc's queue for us.  We also may be waiting for
2449  * write credits from the OST if there are RPCs in flight that may return some
2450  * before we fall back to sync writes.
2451  *
 * We need this to know whether our allocation was granted in the presence
 * of signals */
2453 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2454 {
2455         int rc;
2456         ENTRY;
2457         client_obd_list_lock(&cli->cl_loi_list_lock);
2458         rc = list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2459         client_obd_list_unlock(&cli->cl_loi_list_lock);
2460         RETURN(rc);
2461 };
2462
/* Reserve dirty-cache space and a page of write grant for @oap, sleeping
 * until space frees up if necessary.
 * Caller must hold loi_list_lock - we drop/regain it if we need to wait for
 * grant or cache space. */
static int osc_enter_cache(struct client_obd *cli, struct lov_oinfo *loi,
                           struct osc_async_page *oap)
{
        struct osc_cache_waiter ocw;
        struct l_wait_info lwi = { 0 };

        ENTRY;

        CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
               "grant: %lu\n", cli->cl_dirty, atomic_read(&obd_dirty_pages),
               cli->cl_dirty_max, obd_max_dirty_pages,
               cli->cl_lost_grant, cli->cl_avail_grant);

        /* force the caller to try sync io.  this can jump the list
         * of queued writes and create a discontiguous rpc stream */
        if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
            loi->loi_ar.ar_force_sync)
                RETURN(-EDQUOT);

        /* Hopefully normal case - cache space and write credits available */
        if ((cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max) &&
            (atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages) &&
            (cli->cl_avail_grant >= CFS_PAGE_SIZE)) {
                /* account for ourselves */
                osc_consume_write_grant(cli, &oap->oap_brw_page);
                RETURN(0);
        }

        /* Make sure that there are write rpcs in flight to wait for.  This
         * is a little silly as this object may not have any pending but
         * other objects sure might. */
        if (cli->cl_w_in_flight) {
                list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
                cfs_waitq_init(&ocw.ocw_waitq);
                ocw.ocw_oap = oap;
                ocw.ocw_rc = 0;

                /* kick rpcs that may return grant, then sleep outside the
                 * lock until osc_wake_cache_waiters() grants us space */
                loi_list_maint(cli, loi);
                osc_check_rpcs(cli);
                client_obd_list_unlock(&cli->cl_loi_list_lock);

                CDEBUG(D_CACHE, "sleeping for cache space\n");
                l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);

                client_obd_list_lock(&cli->cl_loi_list_lock);
                /* still queued means we woke up without being granted space
                 * (e.g. no rpcs left in flight); remove ourselves */
                if (!list_empty(&ocw.ocw_entry)) {
                        list_del(&ocw.ocw_entry);
                        RETURN(-EINTR);
                }
                RETURN(ocw.ocw_rc);
        }

        RETURN(-EDQUOT);
}
2519
2520 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2521                         struct lov_oinfo *loi, cfs_page_t *page,
2522                         obd_off offset, struct obd_async_page_ops *ops,
2523                         void *data, void **res)
2524 {
2525         struct osc_async_page *oap;
2526         ENTRY;
2527
2528         if (!page)
2529                 return size_round(sizeof(*oap));
2530
2531         oap = *res;
2532         oap->oap_magic = OAP_MAGIC;
2533         oap->oap_cli = &exp->exp_obd->u.cli;
2534         oap->oap_loi = loi;
2535
2536         oap->oap_caller_ops = ops;
2537         oap->oap_caller_data = data;
2538
2539         oap->oap_page = page;
2540         oap->oap_obj_off = offset;
2541
2542         CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2543         CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2544         CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2545
2546         oap->oap_occ.occ_interrupted = osc_occ_interrupted;
2547
2548         CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
2549         RETURN(0);
2550 }
2551
2552 struct osc_async_page *oap_from_cookie(void *cookie)
2553 {
2554         struct osc_async_page *oap = cookie;
2555         if (oap->oap_magic != OAP_MAGIC)
2556                 return ERR_PTR(-EINVAL);
2557         return oap;
2558 };
2559
/* Queue a page for async io under an object.  Writes are charged against
 * the osc's dirty cache (and, if configured, checked against quota) before
 * being accepted.  Returns 0 on success, -EBUSY if the page is already
 * queued, -EIO for a dead import, or an admission error. */
static int osc_queue_async_io(struct obd_export *exp, struct lov_stripe_md *lsm,
                              struct lov_oinfo *loi, void *cookie,
                              int cmd, obd_off off, int count,
                              obd_flag brw_flags, enum async_flags async_flags)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct osc_async_page *oap;
        int rc = 0;
        ENTRY;

        oap = oap_from_cookie(cookie);
        if (IS_ERR(oap))
                RETURN(PTR_ERR(oap));

        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
                RETURN(-EIO);

        /* a page already on any pending/urgent/rpc list is in use */
        if (!list_empty(&oap->oap_pending_item) ||
            !list_empty(&oap->oap_urgent_item) ||
            !list_empty(&oap->oap_rpc_item))
                RETURN(-EBUSY);

        /* check if the file's owner/group is over quota */
#ifdef HAVE_QUOTA_SUPPORT
        if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)){
                struct obd_async_page_ops *ops;
                struct obdo *oa;

                OBDO_ALLOC(oa);
                if (oa == NULL)
                        RETURN(-ENOMEM);

                /* ask the page's owner to fill in uid/gid for the check */
                ops = oap->oap_caller_ops;
                ops->ap_fill_obdo(oap->oap_caller_data, cmd, oa);
                if (lquota_chkdq(quota_interface, cli, oa->o_uid, oa->o_gid) ==
                    NO_QUOTA)
                        rc = -EDQUOT;

                OBDO_FREE(oa);
                if (rc)
                        RETURN(rc);
        }
#endif

        if (loi == NULL)
                loi = lsm->lsm_oinfo[0];

        client_obd_list_lock(&cli->cl_loi_list_lock);

        oap->oap_cmd = cmd;
        oap->oap_page_off = off;
        oap->oap_count = count;
        oap->oap_brw_flags = brw_flags;
        oap->oap_async_flags = async_flags;

        if (cmd & OBD_BRW_WRITE) {
                /* may drop and retake the list lock while waiting for space */
                rc = osc_enter_cache(cli, loi, oap);
                if (rc) {
                        client_obd_list_unlock(&cli->cl_loi_list_lock);
                        RETURN(rc);
                }
        }

        osc_oap_to_pending(oap);
        loi_list_maint(cli, loi);

        LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
                  cmd);

        /* see whether the new page makes an rpc worth sending now */
        osc_check_rpcs(cli);
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        RETURN(0);
}
2634
/* True iff `flag' is being newly set: clear in `was' and set in `now'.
 * (aka (~was & now & flag), but this is more clear :)
 * Fix: arguments are now parenthesized; the old expansion `was & flag'
 * mis-associated for compound arguments such as `A | B' because `&'
 * binds tighter than `|'. */
#define SETTING(was, now, flag) (!((was) & (flag)) && ((now) & (flag)))
2637
/* Add flags to a queued page's async flags.  Newly setting ASYNC_URGENT
 * moves the page onto its lop urgent list so an rpc is built sooner.
 * Returns -EINVAL if the page is not on a pending list, -EIO for a dead
 * import. */
static int osc_set_async_flags(struct obd_export *exp,
                               struct lov_stripe_md *lsm,
                               struct lov_oinfo *loi, void *cookie,
                               obd_flag async_flags)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct loi_oap_pages *lop;
        struct osc_async_page *oap;
        int rc = 0;
        ENTRY;

        oap = oap_from_cookie(cookie);
        if (IS_ERR(oap))
                RETURN(PTR_ERR(oap));

        /*
         * bug 7311: OST-side locking is only supported for liblustre for now
         * (and liblustre never calls obd_set_async_flags(). I hope.), generic
         * implementation has to handle case where OST-locked page was picked
         * up by, e.g., ->writepage().
         */
        LASSERT(!(oap->oap_brw_flags & OBD_BRW_SRVLOCK));
        LASSERT(!LIBLUSTRE_CLIENT); /* check that liblustre angels do fear to
                                     * tread here. */

        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
                RETURN(-EIO);

        if (loi == NULL)
                loi = lsm->lsm_oinfo[0];

        if (oap->oap_cmd & OBD_BRW_WRITE) {
                lop = &loi->loi_write_lop;
        } else {
                lop = &loi->loi_read_lop;
        }

        client_obd_list_lock(&cli->cl_loi_list_lock);

        if (list_empty(&oap->oap_pending_item))
                GOTO(out, rc = -EINVAL);

        /* nothing to do if all requested flags are already set */
        if ((oap->oap_async_flags & async_flags) == async_flags)
                GOTO(out, rc = 0);

        if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
                oap->oap_async_flags |= ASYNC_READY;

        if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT)) {
                /* pages already in an rpc are not re-queued as urgent.
                 * NOTE(review): the ASYNC_URGENT bit itself is never set
                 * here - urgency appears to be tracked via list membership
                 * only; confirm this is intended. */
                if (list_empty(&oap->oap_rpc_item)) {
                        list_add(&oap->oap_urgent_item, &lop->lop_urgent);
                        loi_list_maint(cli, loi);
                }
        }

        LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
                        oap->oap_async_flags);
out:
        osc_check_rpcs(cli);
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        RETURN(rc);
}
2700
/* Queue a page for group (sync) io.  Pages go onto the lop pending-group
 * list rather than the regular pending list; osc_trigger_group_io() later
 * moves them over.  ASYNC_GROUP_SYNC pages also register with the io group
 * so their completion can be waited on collectively. */
static int osc_queue_group_io(struct obd_export *exp, struct lov_stripe_md *lsm,
                             struct lov_oinfo *loi,
                             struct obd_io_group *oig, void *cookie,
                             int cmd, obd_off off, int count,
                             obd_flag brw_flags,
                             obd_flag async_flags)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct osc_async_page *oap;
        struct loi_oap_pages *lop;
        int rc = 0;
        ENTRY;

        oap = oap_from_cookie(cookie);
        if (IS_ERR(oap))
                RETURN(PTR_ERR(oap));

        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
                RETURN(-EIO);

        /* a page already on any pending/urgent/rpc list is in use */
        if (!list_empty(&oap->oap_pending_item) ||
            !list_empty(&oap->oap_urgent_item) ||
            !list_empty(&oap->oap_rpc_item))
                RETURN(-EBUSY);

        if (loi == NULL)
                loi = lsm->lsm_oinfo[0];

        client_obd_list_lock(&cli->cl_loi_list_lock);

        oap->oap_cmd = cmd;
        oap->oap_page_off = off;
        oap->oap_count = count;
        oap->oap_brw_flags = brw_flags;
        oap->oap_async_flags = async_flags;

        if (cmd & OBD_BRW_WRITE)
                lop = &loi->loi_write_lop;
        else
                lop = &loi->loi_read_lop;

        list_add_tail(&oap->oap_pending_item, &lop->lop_pending_group);
        if (oap->oap_async_flags & ASYNC_GROUP_SYNC) {
                oap->oap_oig = oig;
                rc = oig_add_one(oig, &oap->oap_occ);
        }

        LOI_DEBUG(loi, "oap %p page %p on group pending: rc %d\n",
                  oap, oap->oap_page, rc);

        client_obd_list_unlock(&cli->cl_loi_list_lock);

        RETURN(rc);
}
2755
2756 static void osc_group_to_pending(struct client_obd *cli, struct lov_oinfo *loi,
2757                                  struct loi_oap_pages *lop, int cmd)
2758 {
2759         struct list_head *pos, *tmp;
2760         struct osc_async_page *oap;
2761
2762         list_for_each_safe(pos, tmp, &lop->lop_pending_group) {
2763                 oap = list_entry(pos, struct osc_async_page, oap_pending_item);
2764                 list_del(&oap->oap_pending_item);
2765                 osc_oap_to_pending(oap);
2766         }
2767         loi_list_maint(cli, loi);
2768 }
2769
/* Kick off io for a group: move all of the object's group-pending pages
 * (both read and write) onto the regular pending lists and poke the rpc
 * engine.  Always returns 0. */
static int osc_trigger_group_io(struct obd_export *exp,
                                struct lov_stripe_md *lsm,
                                struct lov_oinfo *loi,
                                struct obd_io_group *oig)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        ENTRY;

        if (loi == NULL)
                loi = lsm->lsm_oinfo[0];

        client_obd_list_lock(&cli->cl_loi_list_lock);

        osc_group_to_pending(cli, loi, &loi->loi_write_lop, OBD_BRW_WRITE);
        osc_group_to_pending(cli, loi, &loi->loi_read_lop, OBD_BRW_READ);

        osc_check_rpcs(cli);
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        RETURN(0);
}
2791
/* Remove a page from all osc queues before its owner frees it.  Fails
 * with -EBUSY if the page is currently part of an rpc; otherwise releases
 * its cache/grant accounting and drops it from the pending/urgent lists. */
static int osc_teardown_async_page(struct obd_export *exp,
                                   struct lov_stripe_md *lsm,
                                   struct lov_oinfo *loi, void *cookie)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct loi_oap_pages *lop;
        struct osc_async_page *oap;
        int rc = 0;
        ENTRY;

        oap = oap_from_cookie(cookie);
        if (IS_ERR(oap))
                RETURN(PTR_ERR(oap));

        if (loi == NULL)
                loi = lsm->lsm_oinfo[0];

        if (oap->oap_cmd & OBD_BRW_WRITE) {
                lop = &loi->loi_write_lop;
        } else {
                lop = &loi->loi_read_lop;
        }

        client_obd_list_lock(&cli->cl_loi_list_lock);

        /* can't tear down a page an rpc is still using */
        if (!list_empty(&oap->oap_rpc_item))
                GOTO(out, rc = -EBUSY);

        /* give back any dirty/grant accounting this page held and let
         * writers blocked in osc_enter_cache() retry */
        osc_exit_cache(cli, oap, 0);
        osc_wake_cache_waiters(cli);

        if (!list_empty(&oap->oap_urgent_item)) {
                list_del_init(&oap->oap_urgent_item);
                oap->oap_async_flags &= ~ASYNC_URGENT;
        }
        if (!list_empty(&oap->oap_pending_item)) {
                list_del_init(&oap->oap_pending_item);
                lop_update_pending(cli, lop, oap->oap_cmd, -1);
        }
        loi_list_maint(cli, loi);

        LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
out:
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        RETURN(rc);
}
2838
/* Attach @data (the inode) as the lock's l_ast_data, sanity-checking that
 * any existing l_ast_data refers to the same inode or one that is being
 * freed.  @flags may carry LDLM_FL_NO_LRU to keep the lock off the LRU. */
static void osc_set_data_with_check(struct lustre_handle *lockh, void *data,
                                    int flags)
{
        struct ldlm_lock *lock = ldlm_handle2lock(lockh);

        if (lock == NULL) {
                CERROR("lockh %p, data %p - client evicted?\n", lockh, data);
                return;
        }
        lock_res_and_lock(lock);
#if defined (__KERNEL__) && defined (__linux__)
        /* Liang XXX: Darwin and Winnt checking should be added */
        if (lock->l_ast_data && lock->l_ast_data != data) {
                struct inode *new_inode = data;
                struct inode *old_inode = lock->l_ast_data;
                /* a different inode on the lock is only acceptable if the
                 * old one is already being torn down */
                if (!(old_inode->i_state & I_FREEING))
                        LDLM_ERROR(lock, "inconsistent l_ast_data found");
                LASSERTF(old_inode->i_state & I_FREEING,
                         "Found existing inode %p/%lu/%u state %lu in lock: "
                         "setting data to %p/%lu/%u\n", old_inode,
                         old_inode->i_ino, old_inode->i_generation,
                         old_inode->i_state,
                         new_inode, new_inode->i_ino, new_inode->i_generation);
        }
#endif
        lock->l_ast_data = data;
        lock->l_flags |= (flags & LDLM_FL_NO_LRU);
        unlock_res_and_lock(lock);
        LDLM_LOCK_PUT(lock);
}
2869
2870 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2871                              ldlm_iterator_t replace, void *data)
2872 {
2873         struct ldlm_res_id res_id = { .name = {0} };
2874         struct obd_device *obd = class_exp2obd(exp);
2875
2876         res_id.name[0] = lsm->lsm_object_id;
2877         res_id.name[2] = lsm->lsm_object_gr;
2878
2879         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2880         return 0;
2881 }
2882
/* Common post-enqueue processing shared by the sync and async enqueue
 * paths: extract the real result from an intent reply, log the returned
 * lvb, and invoke the caller's update callback with the final rc. */
static int osc_enqueue_fini(struct ptlrpc_request *req, struct obd_info *oinfo,
                            int intent, int rc)
{
        ENTRY;

        if (intent) {
                /* The request was created before ldlm_cli_enqueue call. */
                if (rc == ELDLM_LOCK_ABORTED) {
                        struct ldlm_reply *rep;
                        rep = req_capsule_server_get(&req->rq_pill,
                                                     &RMF_DLM_REP);

                        LASSERT(rep != NULL);
                        /* an aborted intent carries the real disposition */
                        if (rep->lock_policy_res1)
                                rc = rep->lock_policy_res1;
                }
        }

        if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
                CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
                       oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_size,
                       oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_blocks,
                       oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_mtime);
        }

        /* Call the update callback. */
        rc = oinfo->oi_cb_up(oinfo, rc);
        RETURN(rc);
}
2912
/* Async enqueue completion callback: finish the ldlm enqueue, run the
 * common osc_enqueue_fini() path, and drop the lock reference that was
 * held on behalf of the async request. */
static int osc_enqueue_interpret(struct ptlrpc_request *req,
                                 struct osc_enqueue_args *aa, int rc)
{
        int intent = aa->oa_oi->oi_flags & LDLM_FL_HAS_INTENT;
        struct lov_stripe_md *lsm = aa->oa_oi->oi_md;
        struct ldlm_lock *lock;

        /* ldlm_cli_enqueue is holding a reference on the lock, so it must
         * be valid. */
        lock = ldlm_handle2lock(aa->oa_oi->oi_lockh);

        /* Complete obtaining the lock procedure. */
        rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
                                   aa->oa_ei->ei_mode,
                                   &aa->oa_oi->oi_flags,
                                   &lsm->lsm_oinfo[0]->loi_lvb,
                                   sizeof(lsm->lsm_oinfo[0]->loi_lvb),
                                   lustre_swab_ost_lvb,
                                   aa->oa_oi->oi_lockh, rc);

        /* Complete osc stuff. */
        rc = osc_enqueue_fini(req, aa->oa_oi, intent, rc);

        /* Release the lock for async request. */
        if (lustre_handle_is_used(aa->oa_oi->oi_lockh) && rc == ELDLM_OK)
                ldlm_lock_decref(aa->oa_oi->oi_lockh, aa->oa_ei->ei_mode);

        LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
                 aa->oa_oi->oi_lockh, req, aa);
        LDLM_LOCK_PUT(lock);
        return rc;
}
2945
2946 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
2947  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2948  * other synchronous requests, however keeping some locks and trying to obtain
2949  * others may take a considerable amount of time in a case of ost failure; and
2950  * when other sync requests do not get released lock from a client, the client
2951  * is excluded from the cluster -- such scenarious make the life difficult, so
2952  * release locks just after they are obtained. */
2953 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
2954                        struct ldlm_enqueue_info *einfo,
2955                        struct ptlrpc_request_set *rqset)
2956 {
2957         struct ldlm_res_id res_id = { .name = {0} };
2958         struct obd_device *obd = exp->exp_obd;
2959         struct ptlrpc_request *req = NULL;
2960         int intent = oinfo->oi_flags & LDLM_FL_HAS_INTENT;
2961         ldlm_mode_t mode;
2962         int rc;
2963         ENTRY;
2964
2965         res_id.name[0] = oinfo->oi_md->lsm_object_id;
2966         res_id.name[2] = oinfo->oi_md->lsm_object_gr;
2967
2968         /* Filesystem lock extents are extended to page boundaries so that
2969          * dealing with the page cache is a little smoother.  */
2970         oinfo->oi_policy.l_extent.start -=
2971                 oinfo->oi_policy.l_extent.start & ~CFS_PAGE_MASK;
2972         oinfo->oi_policy.l_extent.end |= ~CFS_PAGE_MASK;
2973
2974         if (oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid == 0)
2975                 goto no_match;
2976
2977         /* Next, search for already existing extent locks that will cover us */
2978         /* If we're trying to read, we also search for an existing PW lock.  The
2979          * VFS and page cache already protect us locally, so lots of readers/
2980          * writers can share a single PW lock.
2981          *
2982          * There are problems with conversion deadlocks, so instead of
2983          * converting a read lock to a write lock, we'll just enqueue a new
2984          * one.
2985          *
2986          * At some point we should cancel the read lock instead of making them
2987          * send us a blocking callback, but there are problems with canceling
2988          * locks out from other users right now, too. */
2989         mode = einfo->ei_mode;
2990         if (einfo->ei_mode == LCK_PR)
2991                 mode |= LCK_PW;
2992         mode = ldlm_lock_match(obd->obd_namespace,
2993                                oinfo->oi_flags | LDLM_FL_LVB_READY, &res_id,
2994                                einfo->ei_type, &oinfo->oi_policy, mode,
2995                                oinfo->oi_lockh);
2996         if (mode) {
2997                 /* addref the lock only if not async requests and PW lock is
2998                  * matched whereas we asked for PR. */
2999                 if (!rqset && einfo->ei_mode != mode)
3000                         ldlm_lock_addref(oinfo->oi_lockh, LCK_PR);
3001                 osc_set_data_with_check(oinfo->oi_lockh, einfo->ei_cbdata,
3002                                         oinfo->oi_flags);
3003                 if (intent) {
3004                         /* I would like to be able to ASSERT here that rss <=
3005                          * kms, but I can't, for reasons which are explained in
3006                          * lov_enqueue() */
3007                 }
3008
3009                 /* We already have a lock, and it's referenced */
3010                 oinfo->oi_cb_up(oinfo, ELDLM_OK);
3011
3012                 /* For async requests, decref the lock. */
3013                 if (einfo->ei_mode != mode)
3014                         ldlm_lock_decref(oinfo->oi_lockh, LCK_PW);
3015                 else if (rqset)
3016                         ldlm_lock_decref(oinfo->oi_lockh, einfo->ei_mode);
3017
3018                 RETURN(ELDLM_OK);
3019         }
3020
3021  no_match:
3022         if (intent) {
3023                 CFS_LIST_HEAD(cancels);
3024                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3025                                            &RQF_LDLM_ENQUEUE_LVB);
3026                 if (req == NULL)
3027                         RETURN(-ENOMEM);
3028
3029                 rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
3030                 if (rc)
3031                         RETURN(rc);
3032
3033                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
3034                                      sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb));
3035                 ptlrpc_request_set_replen(req);
3036         }
3037
3038         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
3039         oinfo->oi_flags &= ~LDLM_FL_BLOCK_GRANTED;
3040
3041         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id,
3042                               &oinfo->oi_policy, &oinfo->oi_flags,
3043                               &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
3044                               sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb),
3045                               lustre_swab_ost_lvb, oinfo->oi_lockh,
3046                               rqset ? 1 : 0);
3047         if (rqset) {
3048                 if (!rc) {
3049                         struct osc_enqueue_args *aa;
3050                         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3051                         aa = (struct osc_enqueue_args *)&req->rq_async_args;
3052                         aa->oa_oi = oinfo;
3053                         aa->oa_ei = einfo;
3054                         aa->oa_exp = exp;
3055
3056                         req->rq_interpret_reply = osc_enqueue_interpret;
3057                         ptlrpc_set_add_req(rqset, req);
3058                 } else if (intent) {
3059                         ptlrpc_req_finished(req);
3060                 }
3061                 RETURN(rc);
3062         }
3063
3064         rc = osc_enqueue_fini(req, oinfo, intent, rc);
3065         if (intent)
3066                 ptlrpc_req_finished(req);
3067
3068         RETURN(rc);
3069 }
3070
3071 static int osc_match(struct obd_export *exp, struct lov_stripe_md *lsm,
3072                      __u32 type, ldlm_policy_data_t *policy, __u32 mode,
3073                      int *flags, void *data, struct lustre_handle *lockh)
3074 {
3075         struct ldlm_res_id res_id = { .name = {0} };
3076         struct obd_device *obd = exp->exp_obd;
3077         int lflags = *flags;
3078         ldlm_mode_t rc;
3079         ENTRY;
3080
3081         res_id.name[0] = lsm->lsm_object_id;
3082         res_id.name[2] = lsm->lsm_object_gr;
3083
3084         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
3085                 RETURN(-EIO);
3086
3087         /* Filesystem lock extents are extended to page boundaries so that
3088          * dealing with the page cache is a little smoother */
3089         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3090         policy->l_extent.end |= ~CFS_PAGE_MASK;
3091
3092         /* Next, search for already existing extent locks that will cover us */
3093         /* If we're trying to read, we also search for an existing PW lock.  The
3094          * VFS and page cache already protect us locally, so lots of readers/
3095          * writers can share a single PW lock. */
3096         rc = mode;
3097         if (mode == LCK_PR)
3098                 rc |= LCK_PW;
3099         rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY,
3100                              &res_id, type, policy, rc, lockh);
3101         if (rc) {
3102                 osc_set_data_with_check(lockh, data, lflags);
3103                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
3104                         ldlm_lock_addref(lockh, LCK_PR);
3105                         ldlm_lock_decref(lockh, LCK_PW);
3106                 }
3107                 RETURN(rc);
3108         }
3109         RETURN(rc);
3110 }
3111
3112 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
3113                       __u32 mode, struct lustre_handle *lockh)
3114 {
3115         ENTRY;
3116
3117         if (unlikely(mode == LCK_GROUP))
3118                 ldlm_lock_decref_and_cancel(lockh, mode);
3119         else
3120                 ldlm_lock_decref(lockh, mode);
3121
3122         RETURN(0);
3123 }
3124
3125 static int osc_cancel_unused(struct obd_export *exp,
3126                              struct lov_stripe_md *lsm, int flags,
3127                              void *opaque)
3128 {
3129         struct obd_device *obd = class_exp2obd(exp);
3130         struct ldlm_res_id res_id = { .name = {0} }, *resp = NULL;
3131
3132         if (lsm != NULL) {
3133                 res_id.name[0] = lsm->lsm_object_id;
3134                 res_id.name[2] = lsm->lsm_object_gr;
3135                 resp = &res_id;
3136         }
3137
3138         return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
3139 }
3140
3141 static int osc_join_lru(struct obd_export *exp,
3142                         struct lov_stripe_md *lsm, int join)
3143 {
3144         struct obd_device *obd = class_exp2obd(exp);
3145         struct ldlm_res_id res_id = { .name = {0} }, *resp = NULL;
3146
3147         if (lsm != NULL) {
3148                 res_id.name[0] = lsm->lsm_object_id;
3149                 res_id.name[2] = lsm->lsm_object_gr;
3150                 resp = &res_id;
3151         }
3152
3153         return ldlm_cli_join_lru(obd->obd_namespace, resp, join);
3154 }
3155
3156 static int osc_statfs_interpret(struct ptlrpc_request *req,
3157                                 struct osc_async_args *aa, int rc)
3158 {
3159         struct obd_statfs *msfs;
3160         ENTRY;
3161
3162         if (rc != 0)
3163                 GOTO(out, rc);
3164
3165         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3166         if (msfs == NULL) {
3167                 GOTO(out, rc = -EPROTO);
3168         }
3169
3170         *aa->aa_oi->oi_osfs = *msfs;
3171 out:
3172         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
3173         RETURN(rc);
3174 }
3175
3176 static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
3177                             __u64 max_age, struct ptlrpc_request_set *rqset)
3178 {
3179         struct ptlrpc_request *req;
3180         struct osc_async_args *aa;
3181         int                    rc;
3182         ENTRY;
3183
3184         /* We could possibly pass max_age in the request (as an absolute
3185          * timestamp or a "seconds.usec ago") so the target can avoid doing
3186          * extra calls into the filesystem if that isn't necessary (e.g.
3187          * during mount that would help a bit).  Having relative timestamps
3188          * is not so great if request processing is slow, while absolute
3189          * timestamps are not ideal because they need time synchronization. */
3190         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
3191         if (req == NULL)
3192                 RETURN(-ENOMEM);
3193
3194         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3195         if (rc) {
3196                 ptlrpc_request_free(req);
3197                 RETURN(rc);
3198         }
3199         ptlrpc_request_set_replen(req);
3200         req->rq_request_portal = OST_CREATE_PORTAL; //XXX FIXME bug 249
3201
3202         req->rq_interpret_reply = osc_statfs_interpret;
3203         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3204         aa = (struct osc_async_args *)&req->rq_async_args;
3205         aa->aa_oi = oinfo;
3206
3207         ptlrpc_set_add_req(rqset, req);
3208         RETURN(0);
3209 }
3210
3211 static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
3212                       __u64 max_age)
3213 {
3214         struct obd_statfs     *msfs;
3215         struct ptlrpc_request *req;
3216         int rc;
3217         ENTRY;
3218
3219         /* We could possibly pass max_age in the request (as an absolute
3220          * timestamp or a "seconds.usec ago") so the target can avoid doing
3221          * extra calls into the filesystem if that isn't necessary (e.g.
3222          * during mount that would help a bit).  Having relative timestamps
3223          * is not so great if request processing is slow, while absolute
3224          * timestamps are not ideal because they need time synchronization. */
3225         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
3226         if (req == NULL)
3227                 RETURN(-ENOMEM);
3228
3229         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3230         if (rc) {
3231                 ptlrpc_request_free(req);
3232                 RETURN(rc);
3233         }
3234         ptlrpc_request_set_replen(req);
3235         req->rq_request_portal = OST_CREATE_PORTAL; //XXX FIXME bug 249
3236
3237         rc = ptlrpc_queue_wait(req);
3238         if (rc)
3239                 GOTO(out, rc);
3240
3241         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3242         if (msfs == NULL) {
3243                 GOTO(out, rc = -EPROTO);
3244         }
3245
3246         *osfs = *msfs;
3247
3248         EXIT;
3249  out:
3250         ptlrpc_req_finished(req);
3251         return rc;
3252 }
3253
/* Retrieve object striping information.
 *
 * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
 * the maximum number of OST indices which will fit in the user buffer.
 * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
 */
static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
{
        struct lov_user_md lum, *lumk;
        int rc = 0, lum_size;
        ENTRY;

        if (!lsm)
                RETURN(-ENODATA);

        /* Fetch the caller's request header to learn how many stripe
         * slots the user buffer can hold. */
        if (copy_from_user(&lum, lump, sizeof(lum)))
                RETURN(-EFAULT);

        if (lum.lmm_magic != LOV_USER_MAGIC)
                RETURN(-EINVAL);

        /* A standalone OSC has exactly one object, so at most one
         * lov_user_ost_data slot is ever filled in. */
        if (lum.lmm_stripe_count > 0) {
                lum_size = sizeof(lum) + sizeof(lum.lmm_objects[0]);
                OBD_ALLOC(lumk, lum_size);
                if (!lumk)
                        RETURN(-ENOMEM);

                lumk->lmm_objects[0].l_object_id = lsm->lsm_object_id;
                lumk->lmm_objects[0].l_object_gr = lsm->lsm_object_gr;
        } else {
                /* Caller asked for the header only; reuse the stack copy. */
                lum_size = sizeof(lum);
                lumk = &lum;
        }

        lumk->lmm_object_id = lsm->lsm_object_id;
        lumk->lmm_object_gr = lsm->lsm_object_gr;
        lumk->lmm_stripe_count = 1;

        if (copy_to_user(lump, lumk, lum_size))
                rc = -EFAULT;

        if (lumk != &lum)
                OBD_FREE(lumk, lum_size);

        RETURN(rc);
}
3300
3301
/* ioctl dispatcher for the OSC device.  @karg is the kernel-space copy of
 * the ioctl data, @uarg the original user pointer.  All handlers exit via
 * the out label so the module reference taken below is always dropped. */
static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                         void *karg, void *uarg)
{
        struct obd_device *obd = exp->exp_obd;
        struct obd_ioctl_data *data = karg;
        int err = 0;
        ENTRY;

        /* Pin the module so it cannot be unloaded while a handler runs. */
        if (!try_module_get(THIS_MODULE)) {
                CERROR("Can't get module. Is it alive?");
                return -EINVAL;
        }
        switch (cmd) {
        case OBD_IOC_LOV_GET_CONFIG: {
                char *buf;
                struct lov_desc *desc;
                struct obd_uuid uuid;

                buf = NULL;
                len = 0;
                /* Pull the variable-sized payload in from user space;
                 * obd_ioctl_getdata() allocates buf and sets len. */
                if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
                        GOTO(out, err = -EINVAL);

                data = (struct obd_ioctl_data *)buf;

                /* Validate the caller's inline reply buffers before
                 * writing a lov_desc and a uuid into them. */
                if (sizeof(*desc) > data->ioc_inllen1) {
                        obd_ioctl_freedata(buf, len);
                        GOTO(out, err = -EINVAL);
                }

                if (data->ioc_inllen2 < sizeof(uuid)) {
                        obd_ioctl_freedata(buf, len);
                        GOTO(out, err = -EINVAL);
                }

                /* A bare OSC presents itself as a one-target, one-stripe
                 * LOV so generic tools work unmodified. */
                desc = (struct lov_desc *)data->ioc_inlbuf1;
                desc->ld_tgt_count = 1;
                desc->ld_active_tgt_count = 1;
                desc->ld_default_stripe_count = 1;
                desc->ld_default_stripe_size = 0;
                desc->ld_default_stripe_offset = 0;
                desc->ld_pattern = 0;
                memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));

                memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));

                err = copy_to_user((void *)uarg, buf, len);
                if (err)
                        err = -EFAULT;
                obd_ioctl_freedata(buf, len);
                GOTO(out, err);
        }
        case LL_IOC_LOV_SETSTRIPE:
                /* obd_alloc_memmd() returns the md size on success;
                 * normalize positive values to 0 for the caller. */
                err = obd_alloc_memmd(exp, karg);
                if (err > 0)
                        err = 0;
                GOTO(out, err);
        case LL_IOC_LOV_GETSTRIPE:
                err = osc_getstripe(karg, uarg);
                GOTO(out, err);
        case OBD_IOC_CLIENT_RECOVER:
                err = ptlrpc_recover_import(obd->u.cli.cl_import,
                                            data->ioc_inlbuf1);
                if (err > 0)
                        err = 0;
                GOTO(out, err);
        case IOC_OSC_SET_ACTIVE:
                err = ptlrpc_set_import_active(obd->u.cli.cl_import,
                                               data->ioc_offset);
                GOTO(out, err);
        case OBD_IOC_POLL_QUOTACHECK:
                err = lquota_poll_check(quota_interface, exp,
                                        (struct if_quotacheck *)karg);
                GOTO(out, err);
        default:
                CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
                       cmd, cfs_curproc_comm());
                GOTO(out, err = -ENOTTY);
        }
out:
        module_put(THIS_MODULE);
        return err;
}
3385
/* obd_get_info handler.  Supports two keys: "lock_to_stripe" (answered
 * locally — a standalone OSC has a single stripe, so the answer is always
 * stripe 0) and "last_id" (a synchronous OST_GET_INFO RPC asking the OST
 * for the last object id it allocated). */
static int osc_get_info(struct obd_export *exp, obd_count keylen,
                        void *key, __u32 *vallen, void *val)
{
        ENTRY;
        if (!vallen || !val)
                RETURN(-EFAULT);

        if (KEY_IS("lock_to_stripe")) {
                __u32 *stripe = val;
                *vallen = sizeof(*stripe);
                *stripe = 0;
                RETURN(0);
        } else if (KEY_IS("last_id")) {
                struct ptlrpc_request *req;
                obd_id                *reply;
                char                  *tmp;
                int                    rc;

                req = ptlrpc_request_alloc(class_exp2cliimp(exp),
                                           &RQF_OST_GET_INFO);
                if (req == NULL)
                        RETURN(-ENOMEM);

                /* Size the request's key field before packing. */
                req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
                                     RCL_CLIENT, keylen);
                rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
                if (rc) {
                        ptlrpc_request_free(req);
                        RETURN(rc);
                }

                tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
                LASSERT(tmp);
                memcpy(tmp, key, keylen);

                /* Reply buffer is sized by the caller's *vallen. */
                req_capsule_set_size(&req->rq_pill, &RMF_OBD_ID,
                                     RCL_SERVER, *vallen);
                ptlrpc_request_set_replen(req);
                rc = ptlrpc_queue_wait(req);
                if (rc)
                        GOTO(out, rc);

                reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
                if (reply == NULL)
                        GOTO(out, rc = -EPROTO);

                *((obd_id *)val) = *reply;
        out:
                ptlrpc_req_finished(req);
                RETURN(rc);
        }
        RETURN(-EINVAL);
}
3439
3440 static int osc_setinfo_mds_conn_interpret(struct ptlrpc_request *req,
3441                                           void *aa, int rc)
3442 {
3443         struct llog_ctxt *ctxt;
3444         struct obd_import *imp = req->rq_import;
3445         ENTRY;
3446
3447         if (rc != 0)
3448                 RETURN(rc);
3449
3450         ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
3451         if (ctxt) {
3452                 if (rc == 0)
3453                         rc = llog_initiator_connect(ctxt);
3454                 else
3455                         CERROR("cannot establish connection for "
3456                                "ctxt %p: %d\n", ctxt, rc);
3457         }
3458
3459         spin_lock(&imp->imp_lock);
3460         imp->imp_server_timeout = 1;
3461         imp->imp_pingable = 1;
3462         spin_unlock(&imp->imp_lock);
3463         CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
3464
3465         RETURN(rc);
3466 }
3467
/* obd_set_info_async handler.  Several keys are handled locally (next id,
 * unlinked, init recov, checksum, flush ctx); anything else is forwarded
 * to the OST as an OST_SET_INFO RPC queued on @set. */
static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
                              void *key, obd_count vallen, void *val,
                              struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *req;
        struct obd_device     *obd = exp->exp_obd;
        struct obd_import     *imp = class_exp2cliimp(exp);
        char                  *tmp;
        int                    rc;
        ENTRY;

        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);

        /* MDS tells us the last object id it knows; precreation resumes
         * from the next one. */
        if (KEY_IS(KEY_NEXT_ID)) {
                if (vallen != sizeof(obd_id))
                        RETURN(-EINVAL);
                obd->u.cli.cl_oscc.oscc_next_id = *((obd_id*)val) + 1;
                CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
                       exp->exp_obd->obd_name,
                       obd->u.cli.cl_oscc.oscc_next_id);

                RETURN(0);
        }

        /* Orphans were cleaned up: the OST may have space again, so clear
         * the creator's no-space flag. */
        if (KEY_IS("unlinked")) {
                struct osc_creator *oscc = &obd->u.cli.cl_oscc;
                spin_lock(&oscc->oscc_lock);
                oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
                spin_unlock(&oscc->oscc_lock);
                RETURN(0);
        }

        if (KEY_IS(KEY_INIT_RECOV)) {
                if (vallen != sizeof(int))
                        RETURN(-EINVAL);
                spin_lock(&imp->imp_lock);
                imp->imp_initial_recov = *(int *)val;
                spin_unlock(&imp->imp_lock);
                CDEBUG(D_HA, "%s: set imp_initial_recov = %d\n",
                       exp->exp_obd->obd_name,
                       imp->imp_initial_recov);
                RETURN(0);
        }

        /* Toggle bulk data checksumming for this client. */
        if (KEY_IS("checksum")) {
                if (vallen != sizeof(int))
                        RETURN(-EINVAL);
                exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
                RETURN(0);
        }

        if (KEY_IS(KEY_FLUSH_CTX)) {
                sptlrpc_import_flush_my_ctx(imp);
                RETURN(0);
        }

        /* All remaining keys are sent to the OST, which requires a
         * request set to queue the RPC on. */
        if (!set)
                RETURN(-EINVAL);

        /* We pass all other commands directly to OST. Since nobody calls osc
           methods directly and everybody is supposed to go through LOV, we
           assume lov checked invalid values for us.
           The only recognised values so far are evict_by_nid and mds_conn.
           Even if something bad goes through, we'd get a -EINVAL from OST
           anyway. */


        req = ptlrpc_request_alloc(imp, &RQF_OST_SET_INFO);
        if (req == NULL)
                RETURN(-ENOMEM);

        /* Size the key and value fields before packing, then copy the
         * caller's buffers into the request. */
        req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
                             RCL_CLIENT, keylen);
        req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
                             RCL_CLIENT, vallen);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
        memcpy(tmp, key, keylen);
        tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL);
        memcpy(tmp, val, vallen);

        /* KEY_MDS_CONN additionally records the MDS group in the object
         * creator and arranges llog/ping setup on reply. */
        if (KEY_IS(KEY_MDS_CONN)) {
                struct osc_creator *oscc = &obd->u.cli.cl_oscc;

                oscc->oscc_oa.o_gr = (*(__u32 *)val);
                oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP;
                LASSERT(oscc->oscc_oa.o_gr > 0);
                req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
        }

        ptlrpc_request_set_replen(req);
        ptlrpc_set_add_req(set, req);
        ptlrpc_check_set(set);

        RETURN(0);
}
3569
3570
3571 static struct llog_operations osc_size_repl_logops = {
3572         lop_cancel: llog_obd_repl_cancel
3573 };
3574
/* Origin-side llog ops; filled in lazily by osc_llog_init() below. */
static struct llog_operations osc_mds_ost_orig_logops;
/* Set up the two llog contexts used by an (MDS-side) OSC: the MDS->OST
 * origin context (catalog-backed) and the size-replication context
 * (cancel-only, no catalog). */
static int osc_llog_init(struct obd_device *obd, struct obd_llogs *llogs,
                         struct obd_device *tgt, int count,
                         struct llog_catid *catid, struct obd_uuid *uuid)
{
        int rc;
        ENTRY;

        /* One-time, lock-protected initialization of the origin log ops:
         * start from llog_lvfs_ops and override the origin-side entry
         * points.  lop_setup doubles as the "already initialized" flag. */
        spin_lock(&obd->obd_dev_lock);
        if (osc_mds_ost_orig_logops.lop_setup != llog_obd_origin_setup) {
                osc_mds_ost_orig_logops = llog_lvfs_ops;
                osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
                osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
                osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
                osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
        }
        spin_unlock(&obd->obd_dev_lock);

        rc = llog_setup(obd, llogs, LLOG_MDS_OST_ORIG_CTXT, tgt, count,
                        &catid->lci_logid, &osc_mds_ost_orig_logops);
        if (rc) {
                CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
                GOTO (out, rc);
        }

        rc = llog_setup(obd, llogs, LLOG_SIZE_REPL_CTXT, tgt, count, NULL,
                        &osc_size_repl_logops);
        if (rc)
                CERROR("failed LLOG_SIZE_REPL_CTXT\n");
out:
        /* On any failure, dump the full identification of the failed
         * setup to aid debugging. */
        if (rc) {
                CERROR("osc '%s' tgt '%s' cnt %d catid %p rc=%d\n",
                       obd->obd_name, tgt->obd_name, count, catid, rc);
                CERROR("logid "LPX64":0x%x\n",
                       catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
        }
        RETURN(rc);
}
3613
3614 static int osc_llog_finish(struct obd_device *obd, int count)
3615 {
3616         struct llog_ctxt *ctxt;
3617         int rc = 0, rc2 = 0;
3618         ENTRY;
3619
3620         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3621         if (ctxt)
3622                 rc = llog_cleanup(ctxt);
3623
3624         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3625         if (ctxt)
3626                 rc2 = llog_cleanup(ctxt);
3627         if (!rc)
3628                 rc = rc2;
3629
3630         RETURN(rc);
3631 }
3632
3633 static int osc_reconnect(const struct lu_env *env,
3634                          struct obd_export *exp, struct obd_device *obd,
3635                          struct obd_uuid *cluuid,
3636                          struct obd_connect_data *data)
3637 {
3638         struct client_obd *cli = &obd->u.cli;
3639
3640         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3641                 long lost_grant;
3642
3643                 client_obd_list_lock(&cli->cl_loi_list_lock);
3644                 data->ocd_grant = cli->cl_avail_grant ?:
3645                                 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
3646                 lost_grant = cli->cl_lost_grant;
3647                 cli->cl_lost_grant = 0;
3648                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3649
3650                 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
3651                        "cl_lost_grant: %ld\n", data->ocd_grant,
3652                        cli->cl_avail_grant, lost_grant);
3653                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
3654                        " ocd_grant: %d\n", data->ocd_connect_flags,
3655                        data->ocd_version, data->ocd_grant);
3656         }
3657
3658         RETURN(0);
3659 }
3660
/* Disconnect from the OST.  On the last connection reference, push any
 * queued size-replication cancel records out to the target first.
 *
 * NOTE(review): the ctxt obtained via llog_get_context() is never
 * released here — confirm whether this llog API variant refcounts
 * contexts and needs a matching put. */
static int osc_disconnect(struct obd_export *exp)
{
        struct obd_device *obd = class_exp2obd(exp);
        struct llog_ctxt *ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
        int rc;

        if (obd->u.cli.cl_conn_count == 1)
                /* flush any remaining cancel messages out to the target */
                llog_sync(ctxt, exp);

        rc = client_disconnect_export(exp);
        return rc;
}
3674
/* Import state-change dispatcher: adjust grants, object-creator flags
 * and the lock namespace as the connection to the OST comes and goes,
 * and forward notifications to the observer (typically LOV). */
static int osc_import_event(struct obd_device *obd,
                            struct obd_import *imp,
                            enum obd_import_event event)
{
        struct client_obd *cli;
        int rc = 0;

        ENTRY;
        LASSERT(imp->imp_obd == obd);

        switch (event) {
        case IMP_EVENT_DISCON: {
                /* Only do this on the MDS OSC's */
                if (imp->imp_server_timeout) {
                        struct osc_creator *oscc = &obd->u.cli.cl_oscc;

                        spin_lock(&oscc->oscc_lock);
                        oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
                        spin_unlock(&oscc->oscc_lock);
                }
                /* Grants are void across a disconnect; they will be
                 * re-negotiated on reconnect (see osc_reconnect()). */
                cli = &obd->u.cli;
                client_obd_list_lock(&cli->cl_loi_list_lock);
                cli->cl_avail_grant = 0;
                cli->cl_lost_grant = 0;
                client_obd_list_unlock(&cli->cl_loi_list_lock);
                break;
        }
        case IMP_EVENT_INACTIVE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
                break;
        }
        case IMP_EVENT_INVALIDATE: {
                struct ldlm_namespace *ns = obd->obd_namespace;

                /* Reset grants */
                cli = &obd->u.cli;
                client_obd_list_lock(&cli->cl_loi_list_lock);
                /* all pages go to failing rpcs due to the invalid import */
                osc_check_rpcs(cli);
                client_obd_list_unlock(&cli->cl_loi_list_lock);

                /* Drop cached locks locally; the server side is gone. */
                ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);

                break;
        }
        case IMP_EVENT_ACTIVE: {
                /* Only do this on the MDS OSC's */
                if (imp->imp_server_timeout) {
                        struct osc_creator *oscc = &obd->u.cli.cl_oscc;

                        spin_lock(&oscc->oscc_lock);
                        oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
                        spin_unlock(&oscc->oscc_lock);
                }
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
                break;
        }
        case IMP_EVENT_OCD: {
                struct obd_connect_data *ocd = &imp->imp_connect_data;

                if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
                        osc_init_grant(&obd->u.cli, ocd);

                /* See bug 7198 */
                if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
                        imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;

                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
                break;
        }
        default:
                /* Unknown events indicate a programming error upstream. */
                CERROR("Unknown import event %d\n", event);
                LBUG();
        }
        RETURN(rc);
}
3751
3752 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3753 {
3754         int rc;
3755         ENTRY;
3756
3757         ENTRY;
3758         rc = ptlrpcd_addref();
3759         if (rc)
3760                 RETURN(rc);
3761
3762         rc = client_obd_setup(obd, lcfg);
3763         if (rc) {
3764                 ptlrpcd_decref();
3765         } else {
3766                 struct lprocfs_static_vars lvars = { 0 };
3767                 struct client_obd *cli = &obd->u.cli;
3768
3769                 lprocfs_osc_init_vars(&lvars);
3770                 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
3771                         lproc_osc_attach_seqstat(obd);
3772                         sptlrpc_lprocfs_cliobd_attach(obd);
3773                         ptlrpc_lprocfs_register_obd(obd);
3774                 }
3775
3776                 oscc_init(obd);
3777                 /* We need to allocate a few requests more, because
3778                    brw_interpret_oap tries to create new requests before freeing
3779                    previous ones. Ideally we want to have 2x max_rpcs_in_flight
3780                    reserved, but I afraid that might be too much wasted RAM
3781                    in fact, so 2 is just my guess and still should work. */
3782                 cli->cl_import->imp_rq_pool =
3783                         ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
3784                                             OST_MAXREQSIZE,
3785                                             ptlrpc_add_rqs_to_pool);
3786         }
3787
3788         RETURN(rc);
3789 }
3790
/* Staged pre-cleanup: deactivate the import early, destroy it when
 * exports go away, and tear down llog contexts at self-export stage. */
static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
{
        int rc = 0;
        ENTRY;

        switch (stage) {
        case OBD_CLEANUP_EARLY: {
                struct obd_import *imp;
                imp = obd->u.cli.cl_import;
                CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
                /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
                ptlrpc_deactivate_import(imp);
                spin_lock(&imp->imp_lock);
                imp->imp_pingable = 0;
                spin_unlock(&imp->imp_lock);
                break;
        }
        case OBD_CLEANUP_EXPORTS: {
                /* If we set up but never connected, the
                   client import will not have been cleaned. */
                if (obd->u.cli.cl_import) {
                        struct obd_import *imp;
                        imp = obd->u.cli.cl_import;
                        /* NOTE(review): this message also prints when the
                         * import did connect and is simply being destroyed
                         * here — confirm intent. */
                        CDEBUG(D_CONFIG, "%s: client import never connected\n",
                               obd->obd_name);
                        ptlrpc_invalidate_import(imp);
                        ptlrpc_free_rq_pool(imp->imp_rq_pool);
                        class_destroy_import(imp);
                        obd->u.cli.cl_import = NULL;
                }
                break;
        }
        case OBD_CLEANUP_SELF_EXP:
                rc = obd_llog_finish(obd, 0);
                if (rc != 0)
                        CERROR("failed to cleanup llogging subsystems\n");
                break;
        case OBD_CLEANUP_OBD:
                break;
        }
        RETURN(rc);
}
3833
/* Final device cleanup: unregister procfs, mark the object creator as
 * exiting, release the quota cache, run generic client cleanup and drop
 * the ptlrpcd reference taken in osc_setup().  Order matters here. */
int osc_cleanup(struct obd_device *obd)
{
        struct osc_creator *oscc = &obd->u.cli.cl_oscc;
        int rc;

        ENTRY;
        ptlrpc_lprocfs_unregister_obd(obd);
        lprocfs_obd_cleanup(obd);

        /* Flip the creator out of recovery and into exiting so no new
         * object precreation is attempted during teardown. */
        spin_lock(&oscc->oscc_lock);
        oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING;
        oscc->oscc_flags |= OSCC_FLAG_EXITING;
        spin_unlock(&oscc->oscc_lock);

        /* free memory of osc quota cache */
        lquota_cleanup(quota_interface, obd);

        rc = client_obd_cleanup(obd);

        ptlrpcd_decref();
        RETURN(rc);
}
3856
3857 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
3858 {
3859         struct lustre_cfg *lcfg = buf;
3860         struct lprocfs_static_vars lvars = { 0 };
3861         int rc = 0;
3862
3863         lprocfs_osc_init_vars(&lvars);
3864
3865         switch (lcfg->lcfg_command) {
3866         case LCFG_SPTLRPC_CONF:
3867                 rc = sptlrpc_cliobd_process_config(obd, lcfg);
3868                 break;
3869         default:
3870                 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
3871                                               lcfg, obd);
3872                 break;
3873         }
3874
3875         return(rc);
3876 }
3877
/* Method table exported by the OSC: maps the generic obd operations onto
 * the OSC implementations (and onto the shared client_* helpers for
 * connection management).  Quota methods are patched in at module init
 * via init_obd_quota_ops(). */
struct obd_ops osc_obd_ops = {
        .o_owner                = THIS_MODULE,
        .o_setup                = osc_setup,
        .o_precleanup           = osc_precleanup,
        .o_cleanup              = osc_cleanup,
        .o_add_conn             = client_import_add_conn,
        .o_del_conn             = client_import_del_conn,
        .o_connect              = client_connect_import,
        .o_reconnect            = osc_reconnect,
        .o_disconnect           = osc_disconnect,
        .o_statfs               = osc_statfs,
        .o_statfs_async         = osc_statfs_async,
        .o_packmd               = osc_packmd,
        .o_unpackmd             = osc_unpackmd,
        .o_precreate            = osc_precreate,
        .o_create               = osc_create,
        .o_destroy              = osc_destroy,
        .o_getattr              = osc_getattr,
        .o_getattr_async        = osc_getattr_async,
        .o_setattr              = osc_setattr,
        .o_setattr_async        = osc_setattr_async,
        .o_brw                  = osc_brw,
        .o_brw_async            = osc_brw_async,
        .o_prep_async_page      = osc_prep_async_page,
        .o_queue_async_io       = osc_queue_async_io,
        .o_set_async_flags      = osc_set_async_flags,
        .o_queue_group_io       = osc_queue_group_io,
        .o_trigger_group_io     = osc_trigger_group_io,
        .o_teardown_async_page  = osc_teardown_async_page,
        .o_punch                = osc_punch,
        .o_sync                 = osc_sync,
        .o_enqueue              = osc_enqueue,
        .o_match                = osc_match,
        .o_change_cbdata        = osc_change_cbdata,
        .o_cancel               = osc_cancel,
        .o_cancel_unused        = osc_cancel_unused,
        .o_join_lru             = osc_join_lru,
        .o_iocontrol            = osc_iocontrol,
        .o_get_info             = osc_get_info,
        .o_set_info_async       = osc_set_info_async,
        .o_import_event         = osc_import_event,
        .o_llog_init            = osc_llog_init,
        .o_llog_finish          = osc_llog_finish,
        .o_process_config       = osc_process_config,
};
3923 int __init osc_init(void)
3924 {
3925         struct lprocfs_static_vars lvars = { 0 };
3926         int rc;
3927         ENTRY;
3928
3929         lprocfs_osc_init_vars(&lvars);
3930
3931         request_module("lquota");
3932         quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
3933         lquota_init(quota_interface);
3934         init_obd_quota_ops(quota_interface, &osc_obd_ops);
3935
3936         rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
3937                                  LUSTRE_OSC_NAME, NULL);
3938         if (rc) {
3939                 if (quota_interface)
3940                         PORTAL_SYMBOL_PUT(osc_quota_interface);
3941                 RETURN(rc);
3942         }
3943
3944         RETURN(rc);
3945 }
3946
#ifdef __KERNEL__
/* Module unload hook: shut down the quota interface, drop the lquota
 * symbol reference, then unregister the OSC obd type — the reverse of
 * osc_init().  NOTE(review): __exit is deliberately commented out,
 * presumably so the symbol stays available outside the exit section —
 * confirm before restoring the annotation. */
static void /*__exit*/ osc_exit(void)
{
        lquota_exit(quota_interface);
        if (quota_interface)
                PORTAL_SYMBOL_PUT(osc_quota_interface);

        class_unregister_type(LUSTRE_OSC_NAME);
}

MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
MODULE_LICENSE("GPL");

/* Registers osc_init/osc_exit as the module entry and exit points. */
cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);
#endif