Whamcloud - gitweb
branch: HEAD
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
5  *   Author Peter Braam <braam@clusterfs.com>
6  *
7  *   This file is part of the Lustre file system, http://www.lustre.org
8  *   Lustre is a trademark of Cluster File Systems, Inc.
9  *
10  *   You may have signed or agreed to another license before downloading
11  *   this software.  If so, you are bound by the terms and conditions
12  *   of that agreement, and the following does not apply to you.  See the
13  *   LICENSE file included with this distribution for more information.
14  *
15  *   If you did not agree to a different license, then this copy of Lustre
16  *   is open source software; you can redistribute it and/or modify it
17  *   under the terms of version 2 of the GNU General Public License as
18  *   published by the Free Software Foundation.
19  *
20  *   In either case, Lustre is distributed in the hope that it will be
21  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
22  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
23  *   license text for more details.
24  *
 *  For testing and management it is treated as an obd_device,
 *  although it does not export a full OBD method table (the
 *  requests are coming in over the wire, so object target modules
 *  do not have a full method table.)
29  *
30  */
31
32 #ifndef EXPORT_SYMTAB
33 # define EXPORT_SYMTAB
34 #endif
35 #define DEBUG_SUBSYSTEM S_OSC
36
37 #ifdef __KERNEL__
38 # include <libcfs/libcfs.h>
39 #else /* __KERNEL__ */
40 # include <liblustre.h>
41 #endif
42
43 #include <lustre_dlm.h>
44 #include <libcfs/kp30.h>
45 #include <lustre_net.h>
46 #include <lustre/lustre_user.h>
47 #include <obd_ost.h>
48 #include <obd_lov.h>
49
50 #ifdef  __CYGWIN__
51 # include <ctype.h>
52 #endif
53
54 #include <lustre_ha.h>
55 #include <lprocfs_status.h>
56 #include <lustre_log.h>
57 #include <lustre_debug.h>
58 #include <lustre_param.h>
59 #include "osc_internal.h"
60
61 static quota_interface_t *quota_interface = NULL;
62 extern quota_interface_t osc_quota_interface;
63
64 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
65
66 /* Pack OSC object metadata for disk storage (LE byte order). */
67 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
68                       struct lov_stripe_md *lsm)
69 {
70         int lmm_size;
71         ENTRY;
72
73         lmm_size = sizeof(**lmmp);
74         if (!lmmp)
75                 RETURN(lmm_size);
76
77         if (*lmmp && !lsm) {
78                 OBD_FREE(*lmmp, lmm_size);
79                 *lmmp = NULL;
80                 RETURN(0);
81         }
82
83         if (!*lmmp) {
84                 OBD_ALLOC(*lmmp, lmm_size);
85                 if (!*lmmp)
86                         RETURN(-ENOMEM);
87         }
88
89         if (lsm) {
90                 LASSERT(lsm->lsm_object_id);
91                 LASSERT(lsm->lsm_object_gr);
92                 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
93                 (*lmmp)->lmm_object_gr = cpu_to_le64(lsm->lsm_object_gr);
94         }
95
96         RETURN(lmm_size);
97 }
98
/* Unpack OSC object metadata from disk storage (LE byte order).
 *
 * Mirror of osc_packmd():
 *   lsmp == NULL            : return the in-memory lsm size only;
 *   *lsmp set, lmm == NULL  : free the previously unpacked lsm;
 *   otherwise               : (allocate and) fill *lsmp from @lmm.
 * Returns the lsm size on success, 0 after a free, -EINVAL on a bad
 * source buffer, -ENOMEM on allocation failure. */
static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
                        struct lov_mds_md *lmm, int lmm_bytes)
{
        int lsm_size;
        ENTRY;

        /* Validate the on-disk buffer before touching it. */
        if (lmm != NULL) {
                if (lmm_bytes < sizeof (*lmm)) {
                        CERROR("lov_mds_md too small: %d, need %d\n",
                               lmm_bytes, (int)sizeof(*lmm));
                        RETURN(-EINVAL);
                }
                /* XXX LOV_MAGIC etc check? */

                if (lmm->lmm_object_id == 0) {
                        CERROR("lov_mds_md: zero lmm_object_id\n");
                        RETURN(-EINVAL);
                }
        }

        /* The OSC is a single-stripe target, hence stripe count 1. */
        lsm_size = lov_stripe_md_size(1);
        if (lsmp == NULL)
                RETURN(lsm_size);

        /* Free request: release the oinfo first, then the lsm itself. */
        if (*lsmp != NULL && lmm == NULL) {
                OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                OBD_FREE(*lsmp, lsm_size);
                *lsmp = NULL;
                RETURN(0);
        }

        /* Allocate a fresh lsm plus its single oinfo if the caller didn't
         * supply one; unwind the lsm allocation if the oinfo fails. */
        if (*lsmp == NULL) {
                OBD_ALLOC(*lsmp, lsm_size);
                if (*lsmp == NULL)
                        RETURN(-ENOMEM);
                OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                if ((*lsmp)->lsm_oinfo[0] == NULL) {
                        OBD_FREE(*lsmp, lsm_size);
                        RETURN(-ENOMEM);
                }
                loi_init((*lsmp)->lsm_oinfo[0]);
        }

        if (lmm != NULL) {
                /* XXX zero *lsmp? */
                (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
                (*lsmp)->lsm_object_gr = le64_to_cpu (lmm->lmm_object_gr);
                LASSERT((*lsmp)->lsm_object_id);
                LASSERT((*lsmp)->lsm_object_gr);
        }

        (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;

        RETURN(lsm_size);
}
155
156 static inline void osc_pack_capa(struct ptlrpc_request *req,
157                                  struct ost_body *body, void *capa)
158 {
159         struct obd_capa *oc = (struct obd_capa *)capa;
160         struct lustre_capa *c;
161
162         if (!capa)
163                 return;
164
165         c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
166         LASSERT(c);
167         capa_cpy(c, oc);
168         body->oa.o_valid |= OBD_MD_FLOSSCAPA;
169         DEBUG_CAPA(D_SEC, c, "pack");
170 }
171
172 static inline void osc_pack_req_body(struct ptlrpc_request *req,
173                                      struct obd_info *oinfo)
174 {
175         struct ost_body *body;
176
177         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
178         LASSERT(body);
179
180         body->oa = *oinfo->oi_oa;
181         osc_pack_capa(req, body, oinfo->oi_capa);
182 }
183
184 static inline void osc_set_capa_size(struct ptlrpc_request *req,
185                                      const struct req_msg_field *field,
186                                      struct obd_capa *oc)
187 {
188         if (oc == NULL)
189                 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
190         else
191                 /* it is already calculated as sizeof struct obd_capa */
192                 ;
193 }
194
/* Reply handler for async OST_GETATTR: unpack (and byte-swap if needed)
 * the ost_body from the reply, copy the returned attributes into the
 * caller's obdo, then invoke the caller's oi_cb_up() callback with the
 * final status.  Returns the callback's return code. */
static int osc_getattr_interpret(struct ptlrpc_request *req,
                                 struct osc_async_args *aa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        /* Swabs the reply body in place when it came from an
         * opposite-endian peer; returns NULL on a malformed reply. */
        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body) {
                CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
                memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));

                /* This should really be sent by the OST */
                aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
                aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
        } else {
                CDEBUG(D_INFO, "can't unpack ost_body\n");
                rc = -EPROTO;
                /* Invalidate the obdo so callers don't use stale fields. */
                aa->aa_oi->oi_oa->o_valid = 0;
        }
out:
        /* Always notify the upper layer, success or failure. */
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        RETURN(rc);
}
222
223 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
224                              struct ptlrpc_request_set *set)
225 {
226         struct ptlrpc_request *req;
227         struct osc_async_args *aa;
228         int                    rc;
229         ENTRY;
230
231         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
232         if (req == NULL)
233                 RETURN(-ENOMEM);
234
235         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
236         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
237         if (rc) {
238                 ptlrpc_request_free(req);
239                 RETURN(rc);
240         }
241
242         osc_pack_req_body(req, oinfo);
243
244         ptlrpc_request_set_replen(req);
245         req->rq_interpret_reply = osc_getattr_interpret;
246
247         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
248         aa = (struct osc_async_args *)&req->rq_async_args;
249         aa->aa_oi = oinfo;
250
251         ptlrpc_set_add_req(set, req);
252         RETURN(0);
253 }
254
255 static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
256 {
257         struct ptlrpc_request *req;
258         struct ost_body       *body;
259         int                    rc;
260         ENTRY;
261
262         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
263         if (req == NULL)
264                 RETURN(-ENOMEM);
265
266         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
267         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
268         if (rc) {
269                 ptlrpc_request_free(req);
270                 RETURN(rc);
271         }
272
273         osc_pack_req_body(req, oinfo);
274
275         ptlrpc_request_set_replen(req);
276  
277         rc = ptlrpc_queue_wait(req);
278         if (rc)
279                 GOTO(out, rc);
280
281         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
282         if (body == NULL)
283                 GOTO(out, rc = -EPROTO);
284
285         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
286         *oinfo->oi_oa = body->oa;
287
288         /* This should really be sent by the OST */
289         oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
290         oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
291
292         EXIT;
293  out:
294         ptlrpc_req_finished(req);
295         return rc;
296 }
297
298 static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
299                        struct obd_trans_info *oti)
300 {
301         struct ptlrpc_request *req;
302         struct ost_body       *body;
303         int                    rc;
304         ENTRY;
305
306         LASSERT(!(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP) ||
307                                         oinfo->oi_oa->o_gr > 0);
308
309         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
310         if (req == NULL)
311                 RETURN(-ENOMEM);
312
313         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
314         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
315         if (rc) {
316                 ptlrpc_request_free(req);
317                 RETURN(rc);
318         }
319
320         osc_pack_req_body(req, oinfo);
321
322         ptlrpc_request_set_replen(req);
323  
324
325         rc = ptlrpc_queue_wait(req);
326         if (rc)
327                 GOTO(out, rc);
328
329         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
330         if (body == NULL)
331                 GOTO(out, rc = -EPROTO);
332
333         *oinfo->oi_oa = body->oa;
334
335         EXIT;
336 out:
337         ptlrpc_req_finished(req);
338         RETURN(rc);
339 }
340
341 static int osc_setattr_interpret(struct ptlrpc_request *req,
342                                  struct osc_async_args *aa, int rc)
343 {
344         struct ost_body *body;
345         ENTRY;
346
347         if (rc != 0)
348                 GOTO(out, rc);
349
350         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
351         if (body == NULL)
352                 GOTO(out, rc = -EPROTO);
353
354         *aa->aa_oi->oi_oa = body->oa;
355 out:
356         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
357         RETURN(rc);
358 }
359
360 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
361                              struct obd_trans_info *oti,
362                              struct ptlrpc_request_set *rqset)
363 {
364         struct ptlrpc_request *req;
365         struct osc_async_args *aa;
366         int                    rc;
367         ENTRY;
368
369         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
370         if (req == NULL)
371                 RETURN(-ENOMEM);
372
373         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
374         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
375         if (rc) {
376                 ptlrpc_request_free(req);
377                 RETURN(rc);
378         }
379
380         osc_pack_req_body(req, oinfo);
381
382         ptlrpc_request_set_replen(req);
383  
384         if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) {
385                 LASSERT(oti);
386                 *obdo_logcookie(oinfo->oi_oa) = *oti->oti_logcookies;
387         }
388
389         /* do mds to ost setattr asynchronouly */
390         if (!rqset) {
391                 /* Do not wait for response. */
392                 ptlrpcd_add_req(req);
393         } else {
394                 req->rq_interpret_reply = osc_setattr_interpret;
395
396                 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
397                 aa = (struct osc_async_args *)&req->rq_async_args;
398                 aa->aa_oi = oinfo;
399
400                 ptlrpc_set_add_req(rqset, req);
401         }
402
403         RETURN(0);
404 }
405
/* Create an object on the OST.  If *ea is NULL a single-stripe lsm is
 * allocated here and returned through *ea on success (and freed again on
 * failure).  On success the new object id/group are recorded in both @oa
 * and the lsm, and the unlink llog cookie (if any) is saved into @oti for
 * later cancellation.  Returns 0 or a negative errno. */
int osc_real_create(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md **ea, struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct lov_stripe_md  *lsm;
        int                    rc;
        ENTRY;

        LASSERT(oa);
        LASSERT(ea);

        /* Allocate a stripe md if the caller didn't supply one. */
        lsm = *ea;
        if (!lsm) {
                rc = obd_alloc_memmd(exp, &lsm);
                if (rc < 0)
                        RETURN(rc);
        }

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
        if (req == NULL)
                GOTO(out, rc = -ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
        if (rc) {
                ptlrpc_request_free(req);
                GOTO(out, rc);
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        body->oa = *oa;

        ptlrpc_request_set_replen(req);

        /* OBD_MD_FLINLINE with OBD_FL_DELORPHAN marks an orphan-deletion
         * request from MDS-OST integration. */
        if (oa->o_valid & OBD_MD_FLINLINE) {
                LASSERT((oa->o_valid & OBD_MD_FLFLAGS) &&
                        oa->o_flags == OBD_FL_DELORPHAN);
                DEBUG_REQ(D_HA, req,
                          "delorphan from OST integration");
                /* Don't resend the delorphan req */
                req->rq_no_resend = req->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out_req, rc = -EPROTO);

        *oa = body->oa;

        /* This should really be sent by the OST */
        oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oa->o_valid |= OBD_MD_FLBLKSZ;

        /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
         * have valid lsm_oinfo data structs, so don't go touching that.
         * This needs to be fixed in a big way.
         */
        lsm->lsm_object_id = oa->o_id;
        lsm->lsm_object_gr = oa->o_gr;
        *ea = lsm;

        if (oti != NULL) {
                oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);

                /* Stash the unlink llog cookie for the caller, allocating
                 * space for it on first use. */
                if (oa->o_valid & OBD_MD_FLCOOKIE) {
                        if (!oti->oti_logcookies)
                                oti_alloc_cookies(oti, 1);
                        *oti->oti_logcookies = *obdo_logcookie(oa);
                }
        }

        CDEBUG(D_HA, "transno: "LPD64"\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        /* Free the lsm only if it was allocated here (caller's *ea unset). */
        if (rc && !*ea)
                obd_free_memmd(exp, &lsm);
        RETURN(rc);
}
491
492 static int osc_punch_interpret(struct ptlrpc_request *req,
493                                struct osc_async_args *aa, int rc)
494 {
495         struct ost_body *body;
496         ENTRY;
497
498         if (rc != 0)
499                 GOTO(out, rc);
500
501         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
502         if (body == NULL)
503                 GOTO(out, rc = -EPROTO);
504
505         *aa->aa_oi->oi_oa = body->oa;
506 out:
507         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
508         RETURN(rc);
509 }
510
511 static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
512                      struct obd_trans_info *oti,
513                      struct ptlrpc_request_set *rqset)
514 {
515         struct ptlrpc_request *req;
516         struct osc_async_args *aa;
517         struct ost_body       *body;
518         int                    rc;
519         ENTRY;
520
521         if (!oinfo->oi_oa) {
522                 CDEBUG(D_INFO, "oa NULL\n");
523                 RETURN(-EINVAL);
524         }
525
526         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
527         if (req == NULL)
528                 RETURN(-ENOMEM);
529
530         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
531         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
532         if (rc) {
533                 ptlrpc_request_free(req);
534                 RETURN(rc);
535         }
536         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
537         osc_pack_req_body(req, oinfo);
538
539         /* overload the size and blocks fields in the oa with start/end */
540         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
541         LASSERT(body);
542         body->oa.o_size = oinfo->oi_policy.l_extent.start;
543         body->oa.o_blocks = oinfo->oi_policy.l_extent.end;
544         body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
545         ptlrpc_request_set_replen(req);
546
547
548         req->rq_interpret_reply = osc_punch_interpret;
549         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
550         aa = (struct osc_async_args *)&req->rq_async_args;
551         aa->aa_oi = oinfo;
552         ptlrpc_set_add_req(rqset, req);
553
554         RETURN(0);
555 }
556
557 static int osc_sync(struct obd_export *exp, struct obdo *oa,
558                     struct lov_stripe_md *md, obd_size start, obd_size end,
559                     void *capa)
560 {
561         struct ptlrpc_request *req;
562         struct ost_body       *body;
563         int                    rc;
564         ENTRY;
565
566         if (!oa) {
567                 CDEBUG(D_INFO, "oa NULL\n");
568                 RETURN(-EINVAL);
569         }
570
571         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
572         if (req == NULL)
573                 RETURN(-ENOMEM);
574
575         osc_set_capa_size(req, &RMF_CAPA1, capa);
576         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
577         if (rc) {
578                 ptlrpc_request_free(req);
579                 RETURN(rc);
580         }
581
582         /* overload the size and blocks fields in the oa with start/end */
583         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
584         LASSERT(body);
585         body->oa = *oa;
586         body->oa.o_size = start;
587         body->oa.o_blocks = end;
588         body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
589         osc_pack_capa(req, body, capa);
590
591         ptlrpc_request_set_replen(req);
592
593         rc = ptlrpc_queue_wait(req);
594         if (rc)
595                 GOTO(out, rc);
596
597         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
598         if (body == NULL)
599                 GOTO(out, rc = -EPROTO);
600
601         *oa = body->oa;
602
603         EXIT;
604  out:
605         ptlrpc_req_finished(req);
606         return rc;
607 }
608
609 /* Find and cancel locally locks matched by @mode in the resource found by
610  * @objid. Found locks are added into @cancel list. Returns the amount of
611  * locks added to @cancels list. */
612 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
613                                    struct list_head *cancels, ldlm_mode_t mode,
614                                    int lock_flags)
615 {
616         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
617         struct ldlm_res_id res_id = { .name = { oa->o_id, 0, oa->o_gr, 0 } };
618         struct ldlm_resource *res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
619         int count;
620         ENTRY;
621
622         if (res == NULL)
623                 RETURN(0);
624
625         count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
626                                            lock_flags, 0, NULL);
627         ldlm_resource_putref(res);
628         RETURN(count);
629 }
630
631 static int osc_destroy_interpret(struct ptlrpc_request *req, void *data,
632                                  int rc)
633 {
634         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
635
636         atomic_dec(&cli->cl_destroy_in_flight);
637         cfs_waitq_signal(&cli->cl_destroy_waitq);
638         return 0;
639 }
640
/* Try to reserve a slot for one more in-flight OST_DESTROY rpc.
 * Returns 1 when the increment stayed within cl_max_rpcs_in_flight (the
 * destroy may be sent now), 0 otherwise (the caller must wait on
 * cl_destroy_waitq).  The inc/dec pair is not atomic as a whole; the
 * second test detects a concurrent decrement between the two operations
 * and re-signals the waitqueue so no waiter is lost. */
static int osc_can_send_destroy(struct client_obd *cli)
{
        if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
            cli->cl_max_rpcs_in_flight) {
                /* The destroy request can be sent */
                return 1;
        }
        if (atomic_dec_return(&cli->cl_destroy_in_flight) <
            cli->cl_max_rpcs_in_flight) {
                /*
                 * The counter has been modified between the two atomic
                 * operations.
                 */
                cfs_waitq_signal(&cli->cl_destroy_waitq);
        }
        return 0;
}
658
/* Destroy requests can be async always on the client, and we don't even really
 * care about the return code since the client cannot do anything at all about
 * a destroy failure.
 * When the MDS is unlinking a filename, it saves the file objects into a
 * recovery llog, and these object records are cancelled when the OST reports
 * they were destroyed and sync'd to disk (i.e. transaction committed).
 * If the client dies, or the OST is down when the object should be destroyed,
 * the records are not cancelled, and when the OST reconnects to the MDS next,
 * it will retrieve the llog unlink logs and then sends the log cancellation
 * cookies to the MDS after committing destroy transactions. */
static int osc_destroy(struct obd_export *exp, struct obdo *oa,
                       struct lov_stripe_md *ea, struct obd_trans_info *oti,
                       struct obd_export *md_export)
{
        struct client_obd     *cli = &exp->exp_obd->u.cli;
        struct ptlrpc_request *req;
        struct ost_body       *body;
        CFS_LIST_HEAD(cancels);
        int rc, count;
        ENTRY;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        /* Cancel our PW locks on the object locally first; its data is
         * about to be destroyed anyway (LDLM_FL_DISCARD_DATA). */
        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
        if (req == NULL) {
                /* Release the references taken by the local cancel scan. */
                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                RETURN(-ENOMEM);
        }

        /* Piggy-back the early lock cancels on the destroy request. */
        rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY, 
                               0, &cancels, count);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        req->rq_interpret_reply = osc_destroy_interpret;

        /* Pass the unlink llog cookie along so the OST can cancel the
         * recovery record once the destroy commits. */
        if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
                memcpy(obdo_logcookie(oa), oti->oti_logcookies,
                       sizeof(*oti->oti_logcookies));
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        body->oa = *oa;

        ptlrpc_request_set_replen(req);

        /* Throttle concurrent destroys to cl_max_rpcs_in_flight. */
        if (!osc_can_send_destroy(cli)) {
                struct l_wait_info lwi = { 0 };

                /*
                 * Wait until the number of on-going destroy RPCs drops
                 * under max_rpc_in_flight
                 */
                l_wait_event_exclusive(cli->cl_destroy_waitq,
                                       osc_can_send_destroy(cli), &lwi);
        }

        /* Do not wait for response */
        ptlrpcd_add_req(req);
        RETURN(0);
}
728
/* Fill the dirty/grant accounting fields of @oa before a BRW write so the
 * server can see our cache state: o_dirty is what we currently cache,
 * o_undirty is how much more grant we could use, o_grant/o_dropped report
 * our available and lost grant. */
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;

        /* Caller must not have set these fields already. */
        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        client_obd_list_lock(&cli->cl_loi_list_lock);
        oa->o_dirty = cli->cl_dirty;
        if (cli->cl_dirty > cli->cl_dirty_max) {
                CERROR("dirty %lu > dirty_max %lu\n",
                       cli->cl_dirty, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else if (atomic_read(&obd_dirty_pages) > obd_max_dirty_pages) {
                CERROR("dirty %d > system dirty_max %d\n",
                       atomic_read(&obd_dirty_pages), obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
                /* A difference over 2GB indicates corrupted counters. */
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else {
                /* Normal case: we could absorb up to a full RPC pipeline
                 * (max_pages_per_rpc * (max_rpcs_in_flight + 1)) more. */
                long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
                                (cli->cl_max_rpcs_in_flight + 1);
                oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
        }
        oa->o_grant = cli->cl_avail_grant;
        oa->o_dropped = cli->cl_lost_grant;
        /* Lost grant is reported to the server once and then reset. */
        cli->cl_lost_grant = 0;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
}
763
764 /* caller must hold loi_list_lock */
765 static void osc_consume_write_grant(struct client_obd *cli,
766                                     struct brw_page *pga)
767 {
768         atomic_inc(&obd_dirty_pages);
769         cli->cl_dirty += CFS_PAGE_SIZE;
770         cli->cl_avail_grant -= CFS_PAGE_SIZE;
771         pga->flag |= OBD_BRW_FROM_GRANT;
772         CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
773                CFS_PAGE_SIZE, pga, pga->pg);
774         LASSERT(cli->cl_avail_grant >= 0);
775 }
776
/* the companion to osc_consume_write_grant, called when a brw has completed.
 * must be called with the loi lock held. */
static void osc_release_write_grant(struct client_obd *cli,
                                    struct brw_page *pga, int sent)
{
        /* OST filesystem block size; fall back to 4k when statfs data is
         * not (yet) available. */
        int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
        ENTRY;

        /* Nothing to release if this page never consumed grant. */
        if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
                EXIT;
                return;
        }

        pga->flag &= ~OBD_BRW_FROM_GRANT;
        atomic_dec(&obd_dirty_pages);
        cli->cl_dirty -= CFS_PAGE_SIZE;
        if (!sent) {
                /* Page dropped without being written: the whole page worth
                 * of grant is lost and must be reported back to the server. */
                cli->cl_lost_grant += CFS_PAGE_SIZE;
                CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
                       cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
        } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
                /* For short writes we shouldn't count parts of pages that
                 * span a whole block on the OST side, or our accounting goes
                 * wrong.  Should match the code in filter_grant_check. */
                int offset = pga->off & ~CFS_PAGE_MASK;
                int count = pga->count + (offset & (blocksize - 1));
                int end = (offset + pga->count) & (blocksize - 1);
                if (end)
                        count += blocksize - end;

                cli->cl_lost_grant += CFS_PAGE_SIZE - count;
                CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
                       CFS_PAGE_SIZE - count, cli->cl_lost_grant,
                       cli->cl_avail_grant, cli->cl_dirty);
        }

        EXIT;
}
815
816 static unsigned long rpcs_in_flight(struct client_obd *cli)
817 {
818         return cli->cl_r_in_flight + cli->cl_w_in_flight;
819 }
820
821 /* caller must hold loi_list_lock */
822 void osc_wake_cache_waiters(struct client_obd *cli)
823 {
824         struct list_head *l, *tmp;
825         struct osc_cache_waiter *ocw;
826
827         ENTRY;
828         list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
829                 /* if we can't dirty more, we must wait until some is written */
830                 if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
831                    (atomic_read(&obd_dirty_pages) + 1 > obd_max_dirty_pages)) {
832                         CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
833                                "osc max %ld, sys max %d\n", cli->cl_dirty,
834                                cli->cl_dirty_max, obd_max_dirty_pages);
835                         return;
836                 }
837
838                 /* if still dirty cache but no grant wait for pending RPCs that
839                  * may yet return us some grant before doing sync writes */
840                 if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
841                         CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
842                                cli->cl_w_in_flight);
843                         return;
844                 }
845
846                 ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
847                 list_del_init(&ocw->ocw_entry);
848                 if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
849                         /* no more RPCs in flight to return grant, do sync IO */
850                         ocw->ocw_rc = -EDQUOT;
851                         CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
852                 } else {
853                         osc_consume_write_grant(cli,
854                                                 &ocw->ocw_oap->oap_brw_page);
855                 }
856
857                 cfs_waitq_signal(&ocw->ocw_waitq);
858         }
859
860         EXIT;
861 }
862
/* Record the initial write grant handed out by the OST at connect time
 * (ocd_grant from the server's connect reply).  Must leave a non-negative
 * grant or the LASSERT fires. */
static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        /* cl_avail_grant is protected by cl_loi_list_lock */
        client_obd_list_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant = ocd->ocd_grant;
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld\n",
               cli->cl_avail_grant, cli->cl_lost_grant);
        LASSERT(cli->cl_avail_grant >= 0);
}
873
/* Absorb extra grant returned by the OST in a BRW reply body.  The grant
 * value in oa.o_grant is only trusted when OBD_MD_FLGRANT is set in
 * o_valid. */
static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
{
        client_obd_list_lock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
        if (body->oa.o_valid & OBD_MD_FLGRANT)
                cli->cl_avail_grant += body->oa.o_grant;
        /* waiters are woken in brw_interpret_oap */
        client_obd_list_unlock(&cli->cl_loi_list_lock);
}
883
884 /* We assume that the reason this OSC got a short read is because it read
885  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
886  * via the LOV, and it _knows_ it's reading inside the file, it's just that
887  * this stripe never got written at or beyond this stripe offset yet. */
888 static void handle_short_read(int nob_read, obd_count page_count,
889                               struct brw_page **pga)
890 {
891         char *ptr;
892         int i = 0;
893
894         /* skip bytes read OK */
895         while (nob_read > 0) {
896                 LASSERT (page_count > 0);
897
898                 if (pga[i]->count > nob_read) {
899                         /* EOF inside this page */
900                         ptr = cfs_kmap(pga[i]->pg) +
901                                 (pga[i]->off & ~CFS_PAGE_MASK);
902                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
903                         cfs_kunmap(pga[i]->pg);
904                         page_count--;
905                         i++;
906                         break;
907                 }
908
909                 nob_read -= pga[i]->count;
910                 page_count--;
911                 i++;
912         }
913
914         /* zero remaining pages */
915         while (page_count-- > 0) {
916                 ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
917                 memset(ptr, 0, pga[i]->count);
918                 cfs_kunmap(pga[i]->pg);
919                 i++;
920         }
921 }
922
923 static int check_write_rcs(struct ptlrpc_request *req,
924                            int requested_nob, int niocount,
925                            obd_count page_count, struct brw_page **pga)
926 {
927         int    *remote_rcs, i;
928
929         /* return error if any niobuf was in error */
930         remote_rcs = lustre_swab_repbuf(req, REQ_REC_OFF + 1,
931                                         sizeof(*remote_rcs) * niocount, NULL);
932         if (remote_rcs == NULL) {
933                 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
934                 return(-EPROTO);
935         }
936         if (lustre_msg_swabbed(req->rq_repmsg))
937                 for (i = 0; i < niocount; i++)
938                         __swab32s(&remote_rcs[i]);
939
940         for (i = 0; i < niocount; i++) {
941                 if (remote_rcs[i] < 0)
942                         return(remote_rcs[i]);
943
944                 if (remote_rcs[i] != 0) {
945                         CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
946                                 i, remote_rcs[i], req);
947                         return(-EPROTO);
948                 }
949         }
950
951         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
952                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
953                        requested_nob, req->rq_bulk->bd_nob_transferred);
954                 return(-EPROTO);
955         }
956
957         return (0);
958 }
959
960 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
961 {
962         if (p1->flag != p2->flag) {
963                 unsigned mask = ~OBD_BRW_FROM_GRANT;
964
965                 /* warn if we try to combine flags that we don't know to be
966                  * safe to combine */
967                 if ((p1->flag & mask) != (p2->flag & mask))
968                         CERROR("is it ok to have flags 0x%x and 0x%x in the "
969                                "same brw?\n", p1->flag, p2->flag);
970                 return 0;
971         }
972
973         return (p1->off + p1->count == p2->off);
974 }
975
976 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
977                                    struct brw_page **pga, int opc)
978 {
979         __u32 cksum = ~0;
980         int i = 0;
981
982         LASSERT (pg_count > 0);
983         while (nob > 0 && pg_count > 0) {
984                 unsigned char *ptr = cfs_kmap(pga[i]->pg);
985                 int off = pga[i]->off & ~CFS_PAGE_MASK;
986                 int count = pga[i]->count > nob ? nob : pga[i]->count;
987
988                 /* corrupt the data before we compute the checksum, to
989                  * simulate an OST->client data error */
990                 if (i == 0 && opc == OST_READ &&
991                     OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
992                         memcpy(ptr + off, "bad1", min(4, nob));
993                 cksum = crc32_le(cksum, ptr + off, count);
994                 cfs_kunmap(pga[i]->pg);
995                 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
996                                off, cksum);
997
998                 nob -= pga[i]->count;
999                 pg_count--;
1000                 i++;
1001         }
1002         /* For sending we only compute the wrong checksum instead
1003          * of corrupting the data so it is still correct on a redo */
1004         if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1005                 cksum++;
1006
1007         return cksum;
1008 }
1009
/* Build (but do not send) a bulk read/write request for the given page
 * array: allocates the ptlrpc request, the bulk descriptor, packs the
 * ost_body / ioobj / niobuf_remote buffers (merging contiguous pages with
 * compatible flags into single niobufs), optionally attaches a bulk
 * checksum, and stashes the osc_brw_async_args in rq_async_args for the
 * reply handler.  On success *reqp is set and 0 returned; on failure the
 * request is freed and a negative errno returned.
 * NOTE(review): pga is assumed sorted by ascending offset (the LASSERTF
 * below enforces strictly increasing pg->off); callers sort via
 * sort_brw_pages(). */
static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
                                struct lov_stripe_md *lsm, obd_count page_count,
                                struct brw_page **pga, 
                                struct ptlrpc_request **reqp,
                                struct obd_capa *ocapa)
{
        struct ptlrpc_request   *req;
        struct ptlrpc_bulk_desc *desc;
        struct ost_body         *body;
        struct obd_ioobj        *ioobj;
        struct niobuf_remote    *niobuf;
        int niocount, i, requested_nob, opc, rc;
        struct osc_brw_async_args *aa;
        struct req_capsule      *pill;

        ENTRY;
        /* fault-injection points for recoverable/fatal prep failures */
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
                RETURN(-ENOMEM); /* Recoverable */
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
                RETURN(-EINVAL); /* Fatal */

        /* writes draw from the import's request pool so that dirty pages
         * can still be flushed when memory is tight */
        if ((cmd & OBD_BRW_WRITE) != 0) {
                opc = OST_WRITE;
                req = ptlrpc_request_alloc_pool(cli->cl_import, 
                                                cli->cl_import->imp_rq_pool,
                                                &RQF_OST_BRW);
        } else {
                opc = OST_READ;
                req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW);
        }

        if (req == NULL)
                RETURN(-ENOMEM);

        /* count how many niobufs we need: adjacent mergeable pages share one */
        for (niocount = i = 1; i < page_count; i++) {
                if (!can_merge_pages(pga[i - 1], pga[i]))
                        niocount++;
        }

        pill = &req->rq_pill;
        req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
                             niocount * sizeof(*niobuf));
        osc_set_capa_size(req, &RMF_CAPA1, ocapa);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */

        if (opc == OST_WRITE)
                desc = ptlrpc_prep_bulk_imp(req, page_count,
                                            BULK_GET_SOURCE, OST_BULK_PORTAL);
        else
                desc = ptlrpc_prep_bulk_imp(req, page_count,
                                            BULK_PUT_SINK, OST_BULK_PORTAL);

        if (desc == NULL)
                GOTO(out, rc = -ENOMEM);
        /* NB request now owns desc and will free it when it gets freed */

        body = req_capsule_client_get(pill, &RMF_OST_BODY);
        ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        LASSERT(body && ioobj && niobuf);

        body->oa = *oa;

        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;
        osc_pack_capa(req, body, ocapa);
        LASSERT (page_count > 0);
        /* register each page with the bulk descriptor and fill the niobuf
         * array, coalescing contiguous mergeable pages */
        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                struct brw_page *pg = pga[i];
                struct brw_page *pg_prev = pga[i - 1];

                LASSERT(pg->count > 0);
                LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
                         "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
                         pg->off, pg->count);
#ifdef __linux__
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
                         " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
                         i, page_count,
                         pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
                         pg_prev->pg, page_private(pg_prev->pg),
                         pg_prev->pg->index, pg_prev->off);
#else
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u\n", i, page_count);
#endif
                /* all pages in one brw must agree on server-side locking */
                LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
                        (pg->flag & OBD_BRW_SRVLOCK));

                ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,
                                      pg->count);
                requested_nob += pg->count;

                if (i > 0 && can_merge_pages(pg_prev, pg)) {
                        niobuf--;
                        niobuf->len += pg->count;
                } else {
                        niobuf->offset = pg->off;
                        niobuf->len    = pg->count;
                        niobuf->flags  = pg->flag;
                }
        }

        /* sanity: we filled exactly niocount niobuf slots */
        LASSERT((void *)(niobuf - niocount) ==
                lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
                               niocount * sizeof(*niobuf)));
        osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);

        /* size[REQ_REC_OFF] still sizeof (*body) */
        if (opc == OST_WRITE) {
                /* only add our own checksum when the sec flavor isn't
                 * already hashing the bulk */
                if (unlikely(cli->cl_checksum) &&
                    req->rq_flvr.sf_bulk_hash == BULK_HASH_ALG_NULL) {
                        body->oa.o_valid |= OBD_MD_FLCKSUM;
                        body->oa.o_cksum = osc_checksum_bulk(requested_nob,
                                                             page_count, pga,
                                                             OST_WRITE);
                        CDEBUG(D_PAGE, "checksum at write origin: %x\n",
                               body->oa.o_cksum);
                        /* save this in 'oa', too, for later checking */
                        oa->o_valid |= OBD_MD_FLCKSUM;
                } else {
                        /* clear out the checksum flag, in case this is a
                         * resend but cl_checksum is no longer set. b=11238 */
                        oa->o_valid &= ~OBD_MD_FLCKSUM;
                }
                oa->o_cksum = body->oa.o_cksum;
                /* 1 RC per niobuf */
                req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_SERVER,
                                     sizeof(__u32) * niocount);
        } else {
                if (unlikely(cli->cl_checksum) &&
                    req->rq_flvr.sf_bulk_hash == BULK_HASH_ALG_NULL)
                        body->oa.o_valid |= OBD_MD_FLCKSUM;
                req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_SERVER, 0);
                /* 1 RC for the whole I/O */
        }
        ptlrpc_request_set_replen(req);

        /* stash the args the interpret/fini callbacks will need */
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = (struct osc_brw_async_args *)&req->rq_async_args;
        aa->aa_oa = oa;
        aa->aa_requested_nob = requested_nob;
        aa->aa_nio_count = niocount;
        aa->aa_page_count = page_count;
        aa->aa_resends = 0;
        aa->aa_ppga = pga;
        aa->aa_cli = cli;
        INIT_LIST_HEAD(&aa->aa_oaps);

        *reqp = req;
        RETURN(0);

 out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}
1173
1174 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1175                                 __u32 client_cksum, __u32 server_cksum,
1176                                 int nob, obd_count page_count,
1177                                 struct brw_page **pga)
1178 {
1179         __u32 new_cksum;
1180         char *msg;
1181
1182         if (server_cksum == client_cksum) {
1183                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1184                 return 0;
1185         }
1186
1187         new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE);
1188
1189         if (new_cksum == server_cksum)
1190                 msg = "changed on the client after we checksummed it - "
1191                       "likely false positive due to mmap IO (bug 11742)";
1192         else if (new_cksum == client_cksum)
1193                 msg = "changed in transit before arrival at OST";
1194         else
1195                 msg = "changed in transit AND doesn't match the original - "
1196                       "likely false positive due to mmap IO (bug 11742)";
1197
1198         LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inum "
1199                            LPU64"/"LPU64" object "LPU64"/"LPU64" extent "
1200                            "["LPU64"-"LPU64"]\n",
1201                            msg, libcfs_nid2str(peer->nid),
1202                            oa->o_valid & OBD_MD_FLFID ? oa->o_fid : (__u64)0,
1203                            oa->o_valid & OBD_MD_FLFID ? oa->o_generation : 
1204                                                         (__u64)0,
1205                            oa->o_id,
1206                            oa->o_valid & OBD_MD_FLGROUP ? oa->o_gr : (__u64)0,
1207                            pga[0]->off,
1208                            pga[page_count-1]->off + pga[page_count-1]->count - 1);
1209         CERROR("original client csum %x, server csum %x, client csum now %x\n",
1210                client_cksum, server_cksum, new_cksum);
1211         return 1;        
1212 }
1213
1214 /* Note rc enters this function as number of bytes transferred */
1215 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1216 {
1217         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1218         const lnet_process_id_t *peer =
1219                         &req->rq_import->imp_connection->c_peer;
1220         struct client_obd *cli = aa->aa_cli;
1221         struct ost_body *body;
1222         __u32 client_cksum = 0;
1223         ENTRY;
1224
1225         if (rc < 0 && rc != -EDQUOT)
1226                 RETURN(rc);
1227
1228         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1229         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
1230                                   lustre_swab_ost_body);
1231         if (body == NULL) {
1232                 CDEBUG(D_INFO, "Can't unpack body\n");
1233                 RETURN(-EPROTO);
1234         }
1235
1236         /* set/clear over quota flag for a uid/gid */
1237         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1238             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA))
1239                 lquota_setdq(quota_interface, cli, body->oa.o_uid,
1240                              body->oa.o_gid, body->oa.o_valid,
1241                              body->oa.o_flags);
1242
1243         if (rc < 0)
1244                 RETURN(rc);
1245
1246         if (unlikely(aa->aa_oa->o_valid & OBD_MD_FLCKSUM))
1247                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1248
1249         osc_update_grant(cli, body);
1250
1251         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1252                 if (rc > 0) {
1253                         CERROR("Unexpected +ve rc %d\n", rc);
1254                         RETURN(-EPROTO);
1255                 }
1256                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1257
1258                 if (unlikely((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) &&
1259                              client_cksum &&
1260                              check_write_checksum(&body->oa, peer, client_cksum,
1261                                                   body->oa.o_cksum,
1262                                                   aa->aa_requested_nob,
1263                                                   aa->aa_page_count,
1264                                                   aa->aa_ppga)))
1265                         RETURN(-EAGAIN);
1266
1267                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1268                         RETURN(-EAGAIN);
1269
1270                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1271                                      aa->aa_page_count, aa->aa_ppga);
1272                 GOTO(out, rc);
1273         }
1274
1275         /* The rest of this function executes only for OST_READs */
1276         if (rc > aa->aa_requested_nob) {
1277                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1278                        aa->aa_requested_nob);
1279                 RETURN(-EPROTO);
1280         }
1281
1282         if (rc != req->rq_bulk->bd_nob_transferred) {
1283                 CERROR ("Unexpected rc %d (%d transferred)\n",
1284                         rc, req->rq_bulk->bd_nob_transferred);
1285                 return (-EPROTO);
1286         }
1287
1288         if (rc < aa->aa_requested_nob)
1289                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1290
1291         if (sptlrpc_cli_unwrap_bulk_read(req, rc, aa->aa_page_count,
1292                                          aa->aa_ppga))
1293                 GOTO(out, rc = -EAGAIN);
1294
1295         if (unlikely(body->oa.o_valid & OBD_MD_FLCKSUM)) {
1296                 static int cksum_counter;
1297                 __u32      server_cksum = body->oa.o_cksum;
1298                 char      *via;
1299                 char      *router;
1300
1301                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1302                                                  aa->aa_ppga, OST_READ);
1303
1304                 if (peer->nid == req->rq_bulk->bd_sender) {
1305                         via = router = "";
1306                 } else {
1307                         via = " via ";
1308                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1309                 }
1310
1311                 if (server_cksum == ~0 && rc > 0) {
1312                         CERROR("Protocol error: server %s set the 'checksum' "
1313                                "bit, but didn't send a checksum.  Not fatal, "
1314                                "but please tell CFS.\n",
1315                                libcfs_nid2str(peer->nid));
1316                 } else if (server_cksum != client_cksum) {
1317                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1318                                            "%s%s%s inum "LPU64"/"LPU64" object "
1319                                            LPU64"/"LPU64" extent "
1320                                            "["LPU64"-"LPU64"]\n",
1321                                            req->rq_import->imp_obd->obd_name,
1322                                            libcfs_nid2str(peer->nid),
1323                                            via, router,
1324                                            body->oa.o_valid & OBD_MD_FLFID ?
1325                                                 body->oa.o_fid : (__u64)0,
1326                                            body->oa.o_valid & OBD_MD_FLFID ?
1327                                                 body->oa.o_generation :(__u64)0,
1328                                            body->oa.o_id,
1329                                            body->oa.o_valid & OBD_MD_FLGROUP ?
1330                                                 body->oa.o_gr : (__u64)0,
1331                                            aa->aa_ppga[0]->off,
1332                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1333                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1334                                                                         1);
1335                         CERROR("client %x, server %x\n",
1336                                client_cksum, server_cksum);
1337                         cksum_counter = 0;
1338                         aa->aa_oa->o_cksum = client_cksum;
1339                         rc = -EAGAIN;
1340                 } else {
1341                         cksum_counter++;
1342                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1343                         rc = 0;
1344                 }
1345         } else if (unlikely(client_cksum)) {
1346                 static int cksum_missed;
1347
1348                 cksum_missed++;
1349                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1350                         CERROR("Checksum %u requested from %s but not sent\n",
1351                                cksum_missed, libcfs_nid2str(peer->nid));
1352         } else {
1353                 rc = 0;
1354         }
1355 out:
1356         if (rc >= 0)
1357                 *aa->aa_oa = body->oa;
1358
1359         RETURN(rc);
1360 }
1361
/* Issue one synchronous bulk read/write (cmd is OBD_BRW_READ/WRITE) and
 * wait for the reply, transparently rebuilding and resending the request
 * on bulk timeout or other recoverable errors.  Backs off 'resends'
 * seconds between attempts and gives up (-EIO) once osc_should_resend()
 * says to stop. */
static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
                            struct lov_stripe_md *lsm,
                            obd_count page_count, struct brw_page **pga,
                            struct obd_capa *ocapa)
{
        struct ptlrpc_request *req;
        int                    rc;
        cfs_waitq_t            waitq;
        int                    resends = 0;
        struct l_wait_info     lwi;

        ENTRY;

        cfs_waitq_init(&waitq);

restart_bulk:
        /* a fresh request must be built for every (re)send */
        rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
                                  page_count, pga, &req, ocapa);
        if (rc != 0)
                return (rc);

        rc = ptlrpc_queue_wait(req);

        if (rc == -ETIMEDOUT && req->rq_resend) {
                DEBUG_REQ(D_HA, req,  "BULK TIMEOUT");
                ptlrpc_req_finished(req);
                goto restart_bulk;
        }

        rc = osc_brw_fini_request(req, rc);

        ptlrpc_req_finished(req);
        if (osc_recoverable_error(rc)) {
                resends++;
                if (!osc_should_resend(resends, &exp->exp_obd->u.cli)) {
                        CERROR("too many resend retries, returning error\n");
                        RETURN(-EIO);
                }

                /* sleep 'resends' seconds (linear backoff) before retrying;
                 * nobody signals waitq, so this always times out */
                lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
                l_wait_event(waitq, 0, &lwi);

                goto restart_bulk;
        }
        
        RETURN (rc);
}
1409
/* Rebuild a failed async BRW request and queue the replacement on the same
 * request set.  The new request takes over the page array and the oap list
 * from the old one; each oap's request reference is switched to the new
 * request.  Returns 0 on success, -EIO when out of retries, -EINTR if any
 * oap was interrupted, or the prep failure code. */
int osc_brw_redo_request(struct ptlrpc_request *request,
                         struct osc_brw_async_args *aa)
{
        struct ptlrpc_request *new_req;
        struct ptlrpc_request_set *set = request->rq_set;
        struct osc_brw_async_args *new_aa;
        struct osc_async_page *oap;
        int rc = 0;
        ENTRY;

        if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) {
                CERROR("too many resend retries, returning error\n");
                RETURN(-EIO);
        }
        
        DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
/*
        body = lustre_msg_buf(request->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        if (body->oa.o_valid & OBD_MD_FLOSSCAPA)
                ocapa = lustre_unpack_capa(request->rq_reqmsg,
                                           REQ_REC_OFF + 3);
*/
        rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
                                        OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
                                  aa->aa_cli, aa->aa_oa,
                                  NULL /* lsm unused by osc currently */,
                                  aa->aa_page_count, aa->aa_ppga, 
                                  &new_req, NULL /* ocapa */);
        if (rc)
                RETURN(rc);

        client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
   
        /* abort the redo if any page's wait was interrupted meanwhile */
        list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request != NULL) {
                        LASSERTF(request == oap->oap_request,
                                 "request %p != oap_request %p\n",
                                 request, oap->oap_request);
                        if (oap->oap_interrupted) {
                                client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
                                ptlrpc_req_finished(new_req);                        
                                RETURN(-EINTR);
                        }
                }
        }
        /* New request takes over pga and oaps from old request.
         * Note that copying a list_head doesn't work, need to move it... */
        aa->aa_resends++;
        new_req->rq_interpret_reply = request->rq_interpret_reply;
        new_req->rq_async_args = request->rq_async_args;
        /* delay the resend by aa_resends seconds (linear backoff) */
        new_req->rq_sent = CURRENT_SECONDS + aa->aa_resends;

        new_aa = (struct osc_brw_async_args *)&new_req->rq_async_args;

        INIT_LIST_HEAD(&new_aa->aa_oaps);
        list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
        INIT_LIST_HEAD(&aa->aa_oaps);

        /* repoint each oap's request reference at the replacement */
        list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request) {
                        ptlrpc_req_finished(oap->oap_request);
                        oap->oap_request = ptlrpc_request_addref(new_req);
                }
        }

        /* use ptlrpc_set_add_req is safe because interpret functions work 
         * in check_set context. only one way exist with access to request 
         * from different thread got -EINTR - this way protected with 
         * cl_loi_list_lock */
        ptlrpc_set_add_req(set, new_req);

        client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);

        DEBUG_REQ(D_INFO, new_req, "new request");
        RETURN(0);
}
1486
1487 static int brw_interpret(struct ptlrpc_request *req, void *data, int rc)
1488 {
1489         struct osc_brw_async_args *aa = data;
1490         int                        i;
1491         ENTRY;
1492
1493         rc = osc_brw_fini_request(req, rc);
1494         if (osc_recoverable_error(rc)) {
1495                 rc = osc_brw_redo_request(req, aa);
1496                 if (rc == 0)
1497                         RETURN(0);
1498         }
1499
1500         client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
1501         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
1502                 aa->aa_cli->cl_w_in_flight--;
1503         else
1504                 aa->aa_cli->cl_r_in_flight--;
1505         for (i = 0; i < aa->aa_page_count; i++)
1506                 osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
1507         client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1508
1509         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1510
1511         RETURN(rc);
1512 }
1513
1514 static int async_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1515                           struct lov_stripe_md *lsm, obd_count page_count,
1516                           struct brw_page **pga, struct ptlrpc_request_set *set,
1517                           struct obd_capa *ocapa)
1518 {
1519         struct ptlrpc_request     *req;
1520         struct client_obd         *cli = &exp->exp_obd->u.cli;
1521         int                        rc, i;
1522         struct osc_brw_async_args *aa;
1523         ENTRY;
1524
1525         /* Consume write credits even if doing a sync write -
1526          * otherwise we may run out of space on OST due to grant. */
1527         if (cmd == OBD_BRW_WRITE) {
1528                 spin_lock(&cli->cl_loi_list_lock);
1529                 for (i = 0; i < page_count; i++) {
1530                         if (cli->cl_avail_grant >= CFS_PAGE_SIZE)
1531                                 osc_consume_write_grant(cli, pga[i]);
1532                 }
1533                 spin_unlock(&cli->cl_loi_list_lock);
1534         }
1535
1536         rc = osc_brw_prep_request(cmd, cli, oa, lsm, page_count, pga,
1537                                   &req, ocapa);
1538
1539         aa = (struct osc_brw_async_args *)&req->rq_async_args;
1540         if (cmd == OBD_BRW_READ) {
1541                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
1542                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
1543                 ptlrpc_lprocfs_brw(req, OST_READ, aa->aa_requested_nob);
1544         } else {
1545                  lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
1546                 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
1547                                  cli->cl_w_in_flight);
1548                 ptlrpc_lprocfs_brw(req, OST_WRITE, aa->aa_requested_nob);
1549         }
1550
1551         if (rc == 0) {
1552                 req->rq_interpret_reply = brw_interpret;
1553                 ptlrpc_set_add_req(set, req);
1554                 client_obd_list_lock(&cli->cl_loi_list_lock);
1555                 if (cmd == OBD_BRW_READ)
1556                         cli->cl_r_in_flight++;
1557                 else
1558                         cli->cl_w_in_flight++;
1559                 client_obd_list_unlock(&cli->cl_loi_list_lock);
1560         } else if (cmd == OBD_BRW_WRITE) {
1561                 client_obd_list_lock(&cli->cl_loi_list_lock);
1562                 for (i = 0; i < page_count; i++)
1563                         osc_release_write_grant(cli, pga[i], 0);
1564                 client_obd_list_unlock(&cli->cl_loi_list_lock);
1565         }
1566         RETURN (rc);
1567 }
1568
/*
 * ugh, we want disk allocation on the target to happen in offset order.  we'll
 * follow Sedgewick's advice and stick to the dead simple shellsort -- it'll do
 * fine for our small page arrays and doesn't require allocation.  it's an
 * insertion sort that swaps elements that are strides apart, shrinking the
 * stride down until it's '1' and the array is sorted.
 */
1576 static void sort_brw_pages(struct brw_page **array, int num)
1577 {
1578         int stride, i, j;
1579         struct brw_page *tmp;
1580
1581         if (num == 1)
1582                 return;
1583         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1584                 ;
1585
1586         do {
1587                 stride /= 3;
1588                 for (i = stride ; i < num ; i++) {
1589                         tmp = array[i];
1590                         j = i;
1591                         while (j >= stride && array[j - stride]->off > tmp->off) {
1592                                 array[j] = array[j - stride];
1593                                 j -= stride;
1594                         }
1595                         array[j] = tmp;
1596                 }
1597         } while (stride > 1);
1598 }
1599
1600 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1601 {
1602         int count = 1;
1603         int offset;
1604         int i = 0;
1605
1606         LASSERT (pages > 0);
1607         offset = pg[i]->off & ~CFS_PAGE_MASK;
1608
1609         for (;;) {
1610                 pages--;
1611                 if (pages == 0)         /* that's all */
1612                         return count;
1613
1614                 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1615                         return count;   /* doesn't end on page boundary */
1616
1617                 i++;
1618                 offset = pg[i]->off & ~CFS_PAGE_MASK;
1619                 if (offset != 0)        /* doesn't start on page boundary */
1620                         return count;
1621
1622                 count++;
1623         }
1624 }
1625
1626 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1627 {
1628         struct brw_page **ppga;
1629         int i;
1630
1631         OBD_ALLOC(ppga, sizeof(*ppga) * count);
1632         if (ppga == NULL)
1633                 return NULL;
1634
1635         for (i = 0; i < count; i++)
1636                 ppga[i] = pga + i;
1637         return ppga;
1638 }
1639
/* Free a pointer array built by osc_build_ppga().  @count must be the same
 * element count originally passed to osc_build_ppga() so the freed size
 * matches the allocation. */
static void osc_release_ppga(struct brw_page **ppga, obd_count count)
{
        LASSERT(ppga != NULL);
        OBD_FREE(ppga, sizeof(*ppga) * count);
}
1645
/*
 * Synchronous bulk I/O entry point.
 *
 * Builds a pointer array over @pga, sorts it by file offset, then carves it
 * into chunks of at most cl_max_pages_per_rpc pages -- further trimmed by
 * max_unfragmented_pages() so each chunk goes out as a single unfragmented
 * transfer -- and sends each chunk synchronously via osc_brw_internal().
 *
 * With OBD_BRW_CHECK set in @cmd no I/O is performed; only the import's
 * validity is reported.  Returns 0 on success or a negative errno, stopping
 * at the first failed chunk.
 */
static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
                   obd_count page_count, struct brw_page *pga,
                   struct obd_trans_info *oti)
{
        struct obdo *saved_oa = NULL;
        struct brw_page **ppga, **orig;
        struct obd_import *imp = class_exp2cliimp(exp);
        struct client_obd *cli = &imp->imp_obd->u.cli;
        int rc, page_count_orig;
        ENTRY;

        if (cmd & OBD_BRW_CHECK) {
                /* The caller just wants to know if there's a chance that this
                 * I/O can succeed */

                if (imp == NULL || imp->imp_invalid)
                        RETURN(-EIO);
                RETURN(0);
        }

        /* test_brw with a failed create can trip this, maybe others. */
        LASSERT(cli->cl_max_pages_per_rpc);

        rc = 0;

        orig = ppga = osc_build_ppga(pga, page_count);
        if (ppga == NULL)
                RETURN(-ENOMEM);
        page_count_orig = page_count;

        sort_brw_pages(ppga, page_count);
        while (page_count) {
                obd_count pages_per_brw;

                if (page_count > cli->cl_max_pages_per_rpc)
                        pages_per_brw = cli->cl_max_pages_per_rpc;
                else
                        pages_per_brw = page_count;

                /* shrink the chunk so it is a single unfragmented transfer */
                pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);

                if (saved_oa != NULL) {
                        /* restore previously saved oa */
                        *oinfo->oi_oa = *saved_oa;
                } else if (page_count > pages_per_brw) {
                        /* save a copy of oa (brw will clobber it) */
                        OBDO_ALLOC(saved_oa);
                        if (saved_oa == NULL)
                                GOTO(out, rc = -ENOMEM);
                        *saved_oa = *oinfo->oi_oa;
                }

                rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
                                      pages_per_brw, ppga, oinfo->oi_capa);

                if (rc != 0)
                        break;

                /* advance to the next chunk of the sorted pointer array */
                page_count -= pages_per_brw;
                ppga += pages_per_brw;
        }

out:
        osc_release_ppga(orig, page_count_orig);

        if (saved_oa != NULL)
                OBDO_FREE(saved_oa);

        RETURN(rc);
}
1716
/*
 * Asynchronous counterpart of osc_brw(): queue one or more brw RPCs on @set.
 *
 * The page pointer array is sorted and split into unfragmented chunks just
 * as in osc_brw().  Ownership of each chunk's pointer array (a fresh copy,
 * or @orig itself when a single RPC covers everything) passes to
 * async_internal() on success, so on the way out we only free what was
 * never handed off.  Returns 0 on success or a negative errno.
 */
static int osc_brw_async(int cmd, struct obd_export *exp,
                         struct obd_info *oinfo, obd_count page_count,
                         struct brw_page *pga, struct obd_trans_info *oti,
                         struct ptlrpc_request_set *set)
{
        struct brw_page **ppga, **orig;
        struct client_obd *cli = &exp->exp_obd->u.cli;
        int page_count_orig;
        int rc = 0;
        ENTRY;

        if (cmd & OBD_BRW_CHECK) {
                struct obd_import *imp = class_exp2cliimp(exp);
                /* The caller just wants to know if there's a chance that this
                 * I/O can succeed */

                if (imp == NULL || imp->imp_invalid)
                        RETURN(-EIO);
                RETURN(0);
        }

        orig = ppga = osc_build_ppga(pga, page_count);
        if (ppga == NULL)
                RETURN(-ENOMEM);
        page_count_orig = page_count;

        sort_brw_pages(ppga, page_count);
        while (page_count) {
                struct brw_page **copy;
                obd_count pages_per_brw;

                pages_per_brw = min_t(obd_count, page_count,
                                      cli->cl_max_pages_per_rpc);

                pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);

                /* use ppga only if single RPC is going to fly */
                if (pages_per_brw != page_count_orig || ppga != orig) {
                        OBD_ALLOC(copy, sizeof(*copy) * pages_per_brw);
                        if (copy == NULL)
                                GOTO(out, rc = -ENOMEM);
                        memcpy(copy, ppga, sizeof(*copy) * pages_per_brw);
                } else
                        copy = ppga;

                rc = async_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
                                    pages_per_brw, copy, set, oinfo->oi_capa);

                if (rc != 0) {
                        /* the chunk copy was never handed off; free it here */
                        if (copy != ppga)
                                OBD_FREE(copy, sizeof(*copy) * pages_per_brw);
                        break;
                }
                if (copy == orig) {
                        /* we passed it to async_internal() which is
                         * now responsible for releasing memory */
                        orig = NULL;
                }

                page_count -= pages_per_brw;
                ppga += pages_per_brw;
        }
out:
        if (orig)
                osc_release_ppga(orig, page_count_orig);
        RETURN(rc);
}
1784
1785 static void osc_check_rpcs(struct client_obd *cli);
1786
/* The companion to osc_enter_cache(), called when @oap is no longer part of
 * the dirty accounting.  Writeback completes or truncate happens before
 * writing starts.  Must be called with the loi lock held. */
static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
                           int sent)
{
        /* simply drop the grant accounting for this page; @sent is passed
         * through to osc_release_write_grant() */
        osc_release_write_grant(cli, &oap->oap_brw_page, sent);
}
1795
1796
/* This maintains the lists of pending pages to read/write for a given object
 * (lop).  This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
 * to quickly find objects that are ready to send an RPC.  Returns non-zero
 * when an RPC should be built from the lop's queued pages. */
static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
                         int cmd)
{
        int optimal;
        ENTRY;

        /* nothing queued, nothing to send */
        if (lop->lop_num_pending == 0)
                RETURN(0);

        /* if we have an invalid import we want to drain the queued pages
         * by forcing them through rpcs that immediately fail and complete
         * the pages.  recovery relies on this to empty the queued pages
         * before canceling the locks and evicting down the llite pages */
        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
                RETURN(1);

        /* stream rpcs in queue order as long as there is an urgent page
         * queued.  this is our cheap solution for good batching in the case
         * where writepage marks some random page in the middle of the file
         * as urgent because of, say, memory pressure */
        if (!list_empty(&lop->lop_urgent)) {
                CDEBUG(D_CACHE, "urgent request forcing RPC\n");
                RETURN(1);
        }
        /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
        optimal = cli->cl_max_pages_per_rpc;
        if (cmd & OBD_BRW_WRITE) {
                /* trigger a write rpc stream as long as there are dirtiers
                 * waiting for space.  as they're waiting, they're not going to
                 * create more pages to coalesce with what's waiting.. */
                if (!list_empty(&cli->cl_cache_waiters)) {
                        CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
                        RETURN(1);
                }
                /* +16 to avoid triggering rpcs that would want to include pages
                 * that are being queued but which can't be made ready until
                 * the queuer finishes with the page. this is a wart for
                 * llite::commit_write() */
                optimal += 16;
        }
        if (lop->lop_num_pending >= optimal)
                RETURN(1);

        RETURN(0);
}
1845
/* Reconcile @item's membership of @list with the boolean @should_be_on:
 * add it to the tail when it should be listed but isn't, remove it when it
 * is listed but shouldn't be, and do nothing otherwise. */
static void on_list(struct list_head *item, struct list_head *list,
                    int should_be_on)
{
        int is_on = !list_empty(item);

        if (is_on == !!should_be_on)
                return;

        if (should_be_on)
                list_add_tail(item, list);
        else
                list_del_init(item);
}
1854
/* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
 * can find pages to build into rpcs quickly */
static void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
{
        /* ready list: objects with an RPC's worth of work queued */
        on_list(&loi->loi_cli_item, &cli->cl_loi_ready_list,
                lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE) ||
                lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));

        /* write/read lists: objects with any pending pages at all */
        on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
                loi->loi_write_lop.lop_num_pending);

        on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
                loi->loi_read_lop.lop_num_pending);
}
1869
/* Adjust the lop's pending page count by @delta (may be negative) and mirror
 * the change in the client-wide pending read or write page counter, chosen
 * by the OBD_BRW_WRITE bit of @cmd. */
static void lop_update_pending(struct client_obd *cli,
                               struct loi_oap_pages *lop, int cmd, int delta)
{
        lop->lop_num_pending += delta;
        if (cmd & OBD_BRW_WRITE)
                cli->cl_pending_w_pages += delta;
        else
                cli->cl_pending_r_pages += delta;
}
1879
/* this is called when a sync waiter receives an interruption.  Its job is to
 * get the caller woken as soon as possible.  If its page hasn't been put in an
 * rpc yet it can dequeue immediately.  Otherwise it has to mark the rpc as
 * desiring interruption which will forcefully complete the rpc once the rpc
 * has timed out */
static void osc_occ_interrupted(struct oig_callback_context *occ)
{
        struct osc_async_page *oap;
        struct loi_oap_pages *lop;
        struct lov_oinfo *loi;
        ENTRY;

        /* XXX member_of() */
        oap = list_entry(occ, struct osc_async_page, oap_occ);

        client_obd_list_lock(&oap->oap_cli->cl_loi_list_lock);

        oap->oap_interrupted = 1;

        /* ok, it's been put in an rpc. only one oap gets a request reference */
        if (oap->oap_request != NULL) {
                ptlrpc_mark_interrupted(oap->oap_request);
                ptlrpcd_wake(oap->oap_request);
                GOTO(unlock, 0);
        }

        /* we don't get interruption callbacks until osc_trigger_group_io()
         * has been called and put the sync oaps in the pending/urgent lists.*/
        if (!list_empty(&oap->oap_pending_item)) {
                /* pull the page out of the pending/urgent bookkeeping and
                 * complete it with -EINTR through the group io context */
                list_del_init(&oap->oap_pending_item);
                list_del_init(&oap->oap_urgent_item);

                loi = oap->oap_loi;
                lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
                        &loi->loi_write_lop : &loi->loi_read_lop;
                lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
                loi_list_maint(oap->oap_cli, oap->oap_loi);

                oig_complete_one(oap->oap_oig, &oap->oap_occ, -EINTR);
                oap->oap_oig = NULL;
        }

unlock:
        client_obd_list_unlock(&oap->oap_cli->cl_loi_list_lock);
}
1925
1926 /* this is trying to propogate async writeback errors back up to the
1927  * application.  As an async write fails we record the error code for later if
1928  * the app does an fsync.  As long as errors persist we force future rpcs to be
1929  * sync so that the app can get a sync error and break the cycle of queueing
1930  * pages for which writeback will fail. */
1931 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
1932                            int rc)
1933 {
1934         if (rc) {
1935                 if (!ar->ar_rc)
1936                         ar->ar_rc = rc;
1937
1938                 ar->ar_force_sync = 1;
1939                 ar->ar_min_xid = ptlrpc_sample_next_xid();
1940                 return;
1941
1942         }
1943
1944         if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
1945                 ar->ar_force_sync = 0;
1946 }
1947
1948 static void osc_oap_to_pending(struct osc_async_page *oap)
1949 {
1950         struct loi_oap_pages *lop;
1951
1952         if (oap->oap_cmd & OBD_BRW_WRITE)
1953                 lop = &oap->oap_loi->loi_write_lop;
1954         else
1955                 lop = &oap->oap_loi->loi_read_lop;
1956
1957         if (oap->oap_async_flags & ASYNC_URGENT)
1958                 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
1959         list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
1960         lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
1961 }
1962
/* this must be called holding the loi list lock to give coverage to exit_cache,
 * async_flag maintenance, and oap_request */
static void osc_ap_completion(struct client_obd *cli, struct obdo *oa,
                              struct osc_async_page *oap, int sent, int rc)
{
        __u64 xid = 0;

        ENTRY;
        /* drop our reference on the rpc, keeping its xid for the
         * async-error bookkeeping below */
        if (oap->oap_request != NULL) {
                xid = ptlrpc_req_xid(oap->oap_request);
                ptlrpc_req_finished(oap->oap_request);
                oap->oap_request = NULL;
        }

        oap->oap_async_flags = 0;
        oap->oap_interrupted = 0;

        if (oap->oap_cmd & OBD_BRW_WRITE) {
                /* record write errors both client-wide and per-object */
                osc_process_ar(&cli->cl_ar, xid, rc);
                osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
        }

        /* on success, refresh the cached lvb attributes the OST returned */
        if (rc == 0 && oa != NULL) {
                if (oa->o_valid & OBD_MD_FLBLOCKS)
                        oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
                if (oa->o_valid & OBD_MD_FLMTIME)
                        oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
                if (oa->o_valid & OBD_MD_FLATIME)
                        oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
                if (oa->o_valid & OBD_MD_FLCTIME)
                        oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
        }

        /* group-io pages complete through the oig instead of the caller ops */
        if (oap->oap_oig) {
                osc_exit_cache(cli, oap, sent);
                oig_complete_one(oap->oap_oig, &oap->oap_occ, rc);
                oap->oap_oig = NULL;
                EXIT;
                return;
        }

        rc = oap->oap_caller_ops->ap_completion(oap->oap_caller_data,
                                                oap->oap_cmd, oa, rc);

        /* ll_ap_completion (from llite) drops PG_locked. so, a new
         * I/O on the page could start, but OSC calls it under lock
         * and thus we can add oap back to pending safely */
        if (rc)
                /* upper layer wants to leave the page on pending queue */
                osc_oap_to_pending(oap);
        else
                osc_exit_cache(cli, oap, sent);
        EXIT;
}
2017
/* Completion handler for async page-cache brw rpcs: finalize the request,
 * resend recoverable failures, update the in-flight rpc accounting and
 * complete every oap that rode in this rpc, then free the async args. */
static int brw_interpret_oap(struct ptlrpc_request *req, void *data, int rc)
{
        struct osc_async_page *oap, *tmp;
        struct osc_brw_async_args *aa = data;
        struct client_obd *cli;
        ENTRY;

        rc = osc_brw_fini_request(req, rc);
        CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
        /* a successfully queued resend will call this interpreter again
         * when it completes, so there is nothing more to do here */
        if (osc_recoverable_error(rc)) {
                rc = osc_brw_redo_request(req, aa);
                if (rc == 0)
                        RETURN(0);
        }

        cli = aa->aa_cli;

        client_obd_list_lock(&cli->cl_loi_list_lock);

        /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
         * is called so we know whether to go to sync BRWs or wait for more
         * RPCs to complete */
        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
                cli->cl_w_in_flight--;
        else
                cli->cl_r_in_flight--;

        /* the caller may re-use the oap after the completion call so
         * we need to clean it up a little */
        list_for_each_entry_safe(oap, tmp, &aa->aa_oaps, oap_rpc_item) {
                list_del_init(&oap->oap_rpc_item);
                osc_ap_completion(cli, aa->aa_oa, oap, 1, rc);
        }

        /* the finished rpc frees a slot: wake waiters and try to launch
         * more rpcs while we still hold the list lock */
        osc_wake_cache_waiters(cli);
        osc_check_rpcs(cli);

        client_obd_list_unlock(&cli->cl_loi_list_lock);

        OBDO_FREE(aa->aa_oa);

        osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
        RETURN(rc);
}
2062
/* Assemble one brw rpc covering the @page_count oaps queued on @rpc_list.
 * On success the oaps are spliced onto the request's async args (aa_oaps)
 * and @rpc_list is reinitialized empty; on failure an ERR_PTR is returned,
 * the temporary pga array and obdo are freed, and @rpc_list is untouched. */
static struct ptlrpc_request *osc_build_req(struct client_obd *cli,
                                            struct list_head *rpc_list,
                                            int page_count, int cmd)
{
        struct ptlrpc_request *req;
        struct brw_page **pga = NULL;
        struct osc_brw_async_args *aa;
        struct obdo *oa = NULL;
        struct obd_async_page_ops *ops = NULL;
        void *caller_data = NULL;
        struct obd_capa *ocapa;
        struct osc_async_page *oap;
        int i, rc;

        ENTRY;
        LASSERT(!list_empty(rpc_list));

        OBD_ALLOC(pga, sizeof(*pga) * page_count);
        if (pga == NULL)
                RETURN(ERR_PTR(-ENOMEM));

        OBDO_ALLOC(oa);
        if (oa == NULL)
                GOTO(out, req = ERR_PTR(-ENOMEM));

        /* flatten the oaps into a brw_page pointer array for the request */
        i = 0;
        list_for_each_entry(oap, rpc_list, oap_rpc_item) {
                /* caller ops/data are taken from the first oap; presumably
                 * all oaps in one rpc share them -- TODO confirm */
                if (ops == NULL) {
                        ops = oap->oap_caller_ops;
                        caller_data = oap->oap_caller_data;
                }
                pga[i] = &oap->oap_brw_page;
                pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
                CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
                       pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
                i++;
        }

        /* always get the data for the obdo for the rpc */
        LASSERT(ops != NULL);
        ops->ap_fill_obdo(caller_data, cmd, oa);
        ocapa = ops->ap_lookup_capa(caller_data, cmd);

        sort_brw_pages(pga, page_count);
        rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
                                  pga, &req, ocapa);
        capa_put(ocapa);
        if (rc != 0) {
                CERROR("prep_req failed: %d\n", rc);
                GOTO(out, req = ERR_PTR(rc));
        }

        /* Need to update the timestamps after the request is built in case
         * we race with setattr (locally or in queue at OST).  If OST gets
         * later setattr before earlier BRW (as determined by the request xid),
         * the OST will not use BRW timestamps.  Sadly, there is no obvious
         * way to do this in a single call.  bug 10150 */
        ops->ap_update_obdo(caller_data, cmd, oa,
                            OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME);

        /* hand the oap list over to the request's async args */
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = (struct osc_brw_async_args *)&req->rq_async_args;
        INIT_LIST_HEAD(&aa->aa_oaps);
        list_splice(rpc_list, &aa->aa_oaps);
        INIT_LIST_HEAD(rpc_list);

out:
        if (IS_ERR(req)) {
                if (oa)
                        OBDO_FREE(oa);
                if (pga)
                        OBD_FREE(pga, sizeof(*pga) * page_count);
        }
        RETURN(req);
}
2138
/* the loi lock is held across this function but it's allowed to release
 * and reacquire it during its work.
 *
 * Pull as many ready pages as possible off @lop's pending list, build them
 * into one brw rpc and hand it to ptlrpcd.  Returns 1 when an rpc was sent,
 * 0 when no pages were ready, or a negative errno when building the rpc
 * failed (in which case the collected pages are completed with that error). */
static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
                            int cmd, struct loi_oap_pages *lop)
{
        struct ptlrpc_request *req;
        obd_count page_count = 0;
        struct osc_async_page *oap = NULL, *tmp;
        struct osc_brw_async_args *aa;
        struct obd_async_page_ops *ops;
        CFS_LIST_HEAD(rpc_list);
        unsigned int ending_offset;
        unsigned  starting_offset = 0;
        ENTRY;

        /* first we find the pages we're allowed to work with */
        list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
                                 oap_pending_item) {
                ops = oap->oap_caller_ops;

                LASSERT(oap->oap_magic == OAP_MAGIC);

                /* in llite being 'ready' equates to the page being locked
                 * until completion unlocks it.  commit_write submits a page
                 * as not ready because its unlock will happen unconditionally
                 * as the call returns.  if we race with commit_write giving
                 * us that page we don't want to create a hole in the page
                 * stream, so we stop and leave the rpc to be fired by
                 * another dirtier or kupdated interval (the not ready page
                 * will still be on the dirty list).  we could call in
                 * at the end of ll_file_write to process the queue again. */
                if (!(oap->oap_async_flags & ASYNC_READY)) {
                        int rc = ops->ap_make_ready(oap->oap_caller_data, cmd);
                        if (rc < 0)
                                CDEBUG(D_INODE, "oap %p page %p returned %d "
                                                "instead of ready\n", oap,
                                                oap->oap_page, rc);
                        switch (rc) {
                        case -EAGAIN:
                                /* llite is telling us that the page is still
                                 * in commit_write and that we should try
                                 * and put it in an rpc again later.  we
                                 * break out of the loop so we don't create
                                 * a hole in the sequence of pages in the rpc
                                 * stream.*/
                                oap = NULL;
                                break;
                        case -EINTR:
                                /* the io isn't needed.. tell the checks
                                 * below to complete the rpc with EINTR */
                                oap->oap_async_flags |= ASYNC_COUNT_STABLE;
                                oap->oap_count = -EINTR;
                                break;
                        case 0:
                                oap->oap_async_flags |= ASYNC_READY;
                                break;
                        default:
                                LASSERTF(0, "oap %p page %p returned %d "
                                            "from make_ready\n", oap,
                                            oap->oap_page, rc);
                                break;
                        }
                }
                if (oap == NULL)
                        break;
                /*
                 * Page submitted for IO has to be locked. Either by
                 * ->ap_make_ready() or by higher layers.
                 *
                 * XXX nikita: this assertion should be adjusted when lustre
                 * starts using PG_writeback for pages being written out.
                 */
#if defined(__KERNEL__) && defined(__linux__)
                LASSERT(PageLocked(oap->oap_page));
#endif
                /* If there is a gap at the start of this page, it can't merge
                 * with any previous page, so we'll hand the network a
                 * "fragmented" page array that it can't transfer in 1 RDMA */
                if (page_count != 0 && oap->oap_page_off != 0)
                        break;

                /* take the page out of our book-keeping */
                list_del_init(&oap->oap_pending_item);
                lop_update_pending(cli, lop, cmd, -1);
                list_del_init(&oap->oap_urgent_item);

                /* remember where the rpc starts for the offset histograms */
                if (page_count == 0)
                        starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
                                          (PTLRPC_MAX_BRW_SIZE - 1);

                /* ask the caller for the size of the io as the rpc leaves. */
                if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE))
                        oap->oap_count =
                                ops->ap_refresh_count(oap->oap_caller_data,cmd);
                if (oap->oap_count <= 0) {
                        CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
                               oap->oap_count);
                        osc_ap_completion(cli, NULL, oap, 0, oap->oap_count);
                        continue;
                }

                /* now put the page back in our accounting */
                list_add_tail(&oap->oap_rpc_item, &rpc_list);
                if (++page_count >= cli->cl_max_pages_per_rpc)
                        break;

                /* End on a PTLRPC_MAX_BRW_SIZE boundary.  We want full-sized
                 * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
                 * have the same alignment as the initial writes that allocated
                 * extents on the server. */
                ending_offset = (oap->oap_obj_off + oap->oap_page_off +
                                 oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
                if (ending_offset == 0)
                        break;

                /* If there is a gap at the end of this page, it can't merge
                 * with any subsequent pages, so we'll hand the network a
                 * "fragmented" page array that it can't transfer in 1 RDMA */
                if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
                        break;
        }

        osc_wake_cache_waiters(cli);

        if (page_count == 0)
                RETURN(0);

        loi_list_maint(cli, loi);

        /* drop the loi list lock while the rpc is allocated and built */
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        req = osc_build_req(cli, &rpc_list, page_count, cmd);
        if (IS_ERR(req)) {
                /* this should happen rarely and is pretty bad, it makes the
                 * pending list not follow the dirty order */
                client_obd_list_lock(&cli->cl_loi_list_lock);
                list_for_each_entry_safe(oap, tmp, &rpc_list, oap_rpc_item) {
                        list_del_init(&oap->oap_rpc_item);

                        /* queued sync pages can be torn down while the pages
                         * were between the pending list and the rpc */
                        if (oap->oap_interrupted) {
                                CDEBUG(D_INODE, "oap %p interrupted\n", oap);
                                osc_ap_completion(cli, NULL, oap, 0,
                                                  oap->oap_count);
                                continue;
                        }
                        osc_ap_completion(cli, NULL, oap, 0, PTR_ERR(req));
                }
                loi_list_maint(cli, loi);
                RETURN(PTR_ERR(req));
        }

        aa = (struct osc_brw_async_args *)&req->rq_async_args;

        /* update the read/write page-count, rpc and offset histograms */
        if (cmd == OBD_BRW_READ) {
                lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
                lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
                lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
                                      (starting_offset >> CFS_PAGE_SHIFT) + 1);
                ptlrpc_lprocfs_brw(req, OST_READ, aa->aa_requested_nob);
        } else {
                lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
                lprocfs_oh_tally(&cli->cl_write_rpc_hist,
                                 cli->cl_w_in_flight);
                lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
                                      (starting_offset >> CFS_PAGE_SHIFT) + 1);
                ptlrpc_lprocfs_brw(req, OST_WRITE, aa->aa_requested_nob);
        }

        client_obd_list_lock(&cli->cl_loi_list_lock);

        if (cmd == OBD_BRW_READ)
                cli->cl_r_in_flight++;
        else
                cli->cl_w_in_flight++;

        /* queued sync pages can be torn down while the pages
         * were between the pending list and the rpc */
        tmp = NULL;
        list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
                /* only one oap gets a request reference */
                if (tmp == NULL)
                        tmp = oap;
                if (oap->oap_interrupted && !req->rq_intr) {
                        CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
                               oap, req);
                        ptlrpc_mark_interrupted(req);
                }
        }
        if (tmp != NULL)
                tmp->oap_request = ptlrpc_request_addref(req);

        DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
                  page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);

        /* hand the rpc to ptlrpcd; brw_interpret_oap runs on completion */
        req->rq_interpret_reply = brw_interpret_oap;
        ptlrpcd_add_req(req);
        RETURN(1);
}
2339
/* Dump a one-line summary of an loi's rpc-readiness: whether it sits on
 * the client's ready list, plus the pending count and urgent-list state
 * of its write and read queues.  Callers append their own format string
 * and args.  (The old version had a stray trailing backslash after the
 * last line, silently gluing the following line into the macro.) */
#define LOI_DEBUG(LOI, STR, args...)                                     \
        CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR,           \
               !list_empty(&(LOI)->loi_cli_item),                        \
               (LOI)->loi_write_lop.lop_num_pending,                     \
               !list_empty(&(LOI)->loi_write_lop.lop_urgent),            \
               (LOI)->loi_read_lop.lop_num_pending,                      \
               !list_empty(&(LOI)->loi_read_lop.lop_urgent),             \
               args)

2349 /* This is called by osc_check_rpcs() to find which objects have pages that
2350  * we could be sending.  These lists are maintained by lop_makes_rpc(). */
2351 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2352 {
2353         ENTRY;
2354         /* first return all objects which we already know to have
2355          * pages ready to be stuffed into rpcs */
2356         if (!list_empty(&cli->cl_loi_ready_list))
2357                 RETURN(list_entry(cli->cl_loi_ready_list.next,
2358                                   struct lov_oinfo, loi_cli_item));
2359
2360         /* then if we have cache waiters, return all objects with queued
2361          * writes.  This is especially important when many small files
2362          * have filled up the cache and not been fired into rpcs because
2363          * they don't pass the nr_pending/object threshhold */
2364         if (!list_empty(&cli->cl_cache_waiters) &&
2365             !list_empty(&cli->cl_loi_write_list))
2366                 RETURN(list_entry(cli->cl_loi_write_list.next,
2367                                   struct lov_oinfo, loi_write_item));
2368
2369         /* then return all queued objects when we have an invalid import
2370          * so that they get flushed */
2371         if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2372                 if (!list_empty(&cli->cl_loi_write_list))
2373                         RETURN(list_entry(cli->cl_loi_write_list.next,
2374                                           struct lov_oinfo, loi_write_item));
2375                 if (!list_empty(&cli->cl_loi_read_list))
2376                         RETURN(list_entry(cli->cl_loi_read_list.next,
2377                                           struct lov_oinfo, loi_read_item));
2378         }
2379         RETURN(NULL);
2380 }
2381
2382 /* called with the loi list lock held */
2383 static void osc_check_rpcs(struct client_obd *cli)
2384 {
2385         struct lov_oinfo *loi;
2386         int rc = 0, race_counter = 0;
2387         ENTRY;
2388
2389         while ((loi = osc_next_loi(cli)) != NULL) {
2390                 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
2391
2392                 if (rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight)
2393                         break;
2394
2395                 /* attempt some read/write balancing by alternating between
2396                  * reads and writes in an object.  The makes_rpc checks here
2397                  * would be redundant if we were getting read/write work items
2398                  * instead of objects.  we don't want send_oap_rpc to drain a
2399                  * partial read pending queue when we're given this object to
2400                  * do io on writes while there are cache waiters */
2401                 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
2402                         rc = osc_send_oap_rpc(cli, loi, OBD_BRW_WRITE,
2403                                               &loi->loi_write_lop);
2404                         if (rc < 0)
2405                                 break;
2406                         if (rc > 0)
2407                                 race_counter = 0;
2408                         else
2409                                 race_counter++;
2410                 }
2411                 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
2412                         rc = osc_send_oap_rpc(cli, loi, OBD_BRW_READ,
2413                                               &loi->loi_read_lop);
2414                         if (rc < 0)
2415                                 break;
2416                         if (rc > 0)
2417                                 race_counter = 0;
2418                         else
2419                                 race_counter++;
2420                 }
2421
2422                 /* attempt some inter-object balancing by issueing rpcs
2423                  * for each object in turn */
2424                 if (!list_empty(&loi->loi_cli_item))
2425                         list_del_init(&loi->loi_cli_item);
2426                 if (!list_empty(&loi->loi_write_item))
2427                         list_del_init(&loi->loi_write_item);
2428                 if (!list_empty(&loi->loi_read_item))
2429                         list_del_init(&loi->loi_read_item);
2430
2431                 loi_list_maint(cli, loi);
2432
2433                 /* send_oap_rpc fails with 0 when make_ready tells it to
2434                  * back off.  llite's make_ready does this when it tries
2435                  * to lock a page queued for write that is already locked.
2436                  * we want to try sending rpcs from many objects, but we
2437                  * don't want to spin failing with 0.  */
2438                 if (race_counter == 10)
2439                         break;
2440         }
2441         EXIT;
2442 }
2443
2444 /* we're trying to queue a page in the osc so we're subject to the
2445  * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2446  * If the osc's queued pages are already at that limit, then we want to sleep
2447  * until there is space in the osc's queue for us.  We also may be waiting for
2448  * write credits from the OST if there are RPCs in flight that may return some
2449  * before we fall back to sync writes.
2450  *
2451  * We need this know our allocation was granted in the presence of signals */
2452 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2453 {
2454         int rc;
2455         ENTRY;
2456         client_obd_list_lock(&cli->cl_loi_list_lock);
2457         rc = list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2458         client_obd_list_unlock(&cli->cl_loi_list_lock);
2459         RETURN(rc);
2460 };
2461
/* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
 * grant or cache space.
 *
 * Returns 0 once the page's dirty/grant accounting has been charged,
 * -EDQUOT to make the caller fall back to sync io, or -EINTR if the
 * wait for cache space was interrupted. */
static int osc_enter_cache(struct client_obd *cli, struct lov_oinfo *loi,
                           struct osc_async_page *oap)
{
        struct osc_cache_waiter ocw;
        struct l_wait_info lwi = { 0 };

        ENTRY;

        CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
               "grant: %lu\n", cli->cl_dirty, atomic_read(&obd_dirty_pages),
               cli->cl_dirty_max, obd_max_dirty_pages,
               cli->cl_lost_grant, cli->cl_avail_grant);

        /* force the caller to try sync io.  this can jump the list
         * of queued writes and create a discontiguous rpc stream */
        if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
            loi->loi_ar.ar_force_sync)
                RETURN(-EDQUOT);

        /* Hopefully normal case - cache space and write credits available */
        if ((cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max) &&
            (atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages) &&
            (cli->cl_avail_grant >= CFS_PAGE_SIZE)) {
                /* account for ourselves */
                osc_consume_write_grant(cli, &oap->oap_brw_page);
                RETURN(0);
        }

        /* Make sure that there are write rpcs in flight to wait for.  This
         * is a little silly as this object may not have any pending but
         * other objects sure might. */
        if (cli->cl_w_in_flight) {
                /* register as a cache waiter and sleep until granted;
                 * the lock is dropped across the wait and retaken after */
                list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
                cfs_waitq_init(&ocw.ocw_waitq);
                ocw.ocw_oap = oap;
                ocw.ocw_rc = 0;

                /* kick queued work loose before sleeping so something is
                 * actually in flight to return grant to us */
                loi_list_maint(cli, loi);
                osc_check_rpcs(cli);
                client_obd_list_unlock(&cli->cl_loi_list_lock);

                CDEBUG(D_CACHE, "sleeping for cache space\n");
                l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);

                client_obd_list_lock(&cli->cl_loi_list_lock);
                /* if our entry is still linked we were never serviced --
                 * woken by a signal or by rpcs draining to zero */
                if (!list_empty(&ocw.ocw_entry)) {
                        list_del(&ocw.ocw_entry);
                        RETURN(-EINTR);
                }
                RETURN(ocw.ocw_rc);
        }

        RETURN(-EDQUOT);
}
2518
2519 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2520                         struct lov_oinfo *loi, cfs_page_t *page,
2521                         obd_off offset, struct obd_async_page_ops *ops,
2522                         void *data, void **res)
2523 {
2524         struct osc_async_page *oap;
2525         ENTRY;
2526
2527         if (!page)
2528                 return size_round(sizeof(*oap));
2529
2530         oap = *res;
2531         oap->oap_magic = OAP_MAGIC;
2532         oap->oap_cli = &exp->exp_obd->u.cli;
2533         oap->oap_loi = loi;
2534
2535         oap->oap_caller_ops = ops;
2536         oap->oap_caller_data = data;
2537
2538         oap->oap_page = page;
2539         oap->oap_obj_off = offset;
2540
2541         CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2542         CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2543         CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2544
2545         oap->oap_occ.occ_interrupted = osc_occ_interrupted;
2546
2547         CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
2548         RETURN(0);
2549 }
2550
2551 struct osc_async_page *oap_from_cookie(void *cookie)
2552 {
2553         struct osc_async_page *oap = cookie;
2554         if (oap->oap_magic != OAP_MAGIC)
2555                 return ERR_PTR(-EINVAL);
2556         return oap;
2557 };
2558
/* Queue a prepared async page for io.  Writes are charged against the
 * osc's dirty/grant cache (possibly sleeping in osc_enter_cache()) and,
 * on most platforms, checked against the owner's quota first.  On
 * success the page lands on its object's pending list and rpc
 * generation is kicked. */
static int osc_queue_async_io(struct obd_export *exp, struct lov_stripe_md *lsm,
                              struct lov_oinfo *loi, void *cookie,
                              int cmd, obd_off off, int count,
                              obd_flag brw_flags, enum async_flags async_flags)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct osc_async_page *oap;
        int rc = 0;
        ENTRY;

        oap = oap_from_cookie(cookie);
        if (IS_ERR(oap))
                RETURN(PTR_ERR(oap));

        /* no new io against a missing or invalidated import */
        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
                RETURN(-EIO);

        /* a page may only be queued once; membership on any of these
         * lists means it is already in play */
        if (!list_empty(&oap->oap_pending_item) ||
            !list_empty(&oap->oap_urgent_item) ||
            !list_empty(&oap->oap_rpc_item))
                RETURN(-EBUSY);

        /* check if the file's owner/group is over quota */
#ifdef HAVE_QUOTA_SUPPORT
        if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)){
                struct obd_async_page_ops *ops;
                struct obdo *oa;

                OBDO_ALLOC(oa);
                if (oa == NULL)
                        RETURN(-ENOMEM);

                /* have the caller fill in the obdo so we can read the
                 * owning uid/gid for the quota check */
                ops = oap->oap_caller_ops;
                ops->ap_fill_obdo(oap->oap_caller_data, cmd, oa);
                if (lquota_chkdq(quota_interface, cli, oa->o_uid, oa->o_gid) ==
                    NO_QUOTA)
                        rc = -EDQUOT;

                OBDO_FREE(oa);
                if (rc)
                        RETURN(rc);
        }
#endif

        if (loi == NULL)
                loi = lsm->lsm_oinfo[0];

        client_obd_list_lock(&cli->cl_loi_list_lock);

        oap->oap_cmd = cmd;
        oap->oap_page_off = off;
        oap->oap_count = count;
        oap->oap_brw_flags = brw_flags;
        oap->oap_async_flags = async_flags;

        /* writes must reserve cache/grant space; note osc_enter_cache()
         * may drop and retake the list lock while waiting */
        if (cmd & OBD_BRW_WRITE) {
                rc = osc_enter_cache(cli, loi, oap);
                if (rc) {
                        client_obd_list_unlock(&cli->cl_loi_list_lock);
                        RETURN(rc);
                }
        }

        osc_oap_to_pending(oap);
        loi_list_maint(cli, loi);

        LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
                  cmd);

        /* the new page may complete an rpc's worth of work */
        osc_check_rpcs(cli);
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        RETURN(0);
}
2633
/* True iff "flag" is being newly set: clear in "was", set in "now".
 * aka (~was & now & flag), but this is more clear :)
 * Arguments are fully parenthesized so expression args (e.g. a | b)
 * bind correctly against the & operator. */
#define SETTING(was, now, flag) (!((was) & (flag)) && ((now) & (flag)))
2636
/* Upgrade the async flags on an already-queued page.  Only ASYNC_READY
 * and ASYNC_URGENT are acted on here; a page becoming urgent that is
 * not yet claimed by an rpc is also added to its queue's urgent list. */
static int osc_set_async_flags(struct obd_export *exp,
                               struct lov_stripe_md *lsm,
                               struct lov_oinfo *loi, void *cookie,
                               obd_flag async_flags)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct loi_oap_pages *lop;
        struct osc_async_page *oap;
        int rc = 0;
        ENTRY;

        oap = oap_from_cookie(cookie);
        if (IS_ERR(oap))
                RETURN(PTR_ERR(oap));

        /*
         * bug 7311: OST-side locking is only supported for liblustre for now
         * (and liblustre never calls obd_set_async_flags(). I hope.), generic
         * implementation has to handle case where OST-locked page was picked
         * up by, e.g., ->writepage().
         */
        LASSERT(!(oap->oap_brw_flags & OBD_BRW_SRVLOCK));
        LASSERT(!LIBLUSTRE_CLIENT); /* check that liblustre angels do fear to
                                     * tread here. */

        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
                RETURN(-EIO);

        if (loi == NULL)
                loi = lsm->lsm_oinfo[0];

        /* the flags apply to whichever queue the page was queued on */
        if (oap->oap_cmd & OBD_BRW_WRITE) {
                lop = &loi->loi_write_lop;
        } else {
                lop = &loi->loi_read_lop;
        }

        client_obd_list_lock(&cli->cl_loi_list_lock);

        /* only pages still on the pending list can change flags */
        if (list_empty(&oap->oap_pending_item))
                GOTO(out, rc = -EINVAL);

        /* nothing to do if every requested flag is already set */
        if ((oap->oap_async_flags & async_flags) == async_flags)
                GOTO(out, rc = 0);

        if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
                oap->oap_async_flags |= ASYNC_READY;

        if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT)) {
                /* pages already claimed by an rpc are left alone */
                if (list_empty(&oap->oap_rpc_item)) {
                        list_add(&oap->oap_urgent_item, &lop->lop_urgent);
                        loi_list_maint(cli, loi);
                }
        }

        LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
                        oap->oap_async_flags);
out:
        /* new urgency may make an rpc worth sending; try before unlocking */
        osc_check_rpcs(cli);
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        RETURN(rc);
}
2699
/* Queue a page for group io: the page goes on its queue's
 * lop_pending_group list (not the regular pending list) and, for
 * ASYNC_GROUP_SYNC pages, registers with the obd_io_group.  No rpcs are
 * started here -- osc_trigger_group_io() later moves the group pages to
 * the pending list and kicks rpc generation. */
static int osc_queue_group_io(struct obd_export *exp, struct lov_stripe_md *lsm,
                             struct lov_oinfo *loi,
                             struct obd_io_group *oig, void *cookie,
                             int cmd, obd_off off, int count,
                             obd_flag brw_flags,
                             obd_flag async_flags)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct osc_async_page *oap;
        struct loi_oap_pages *lop;
        int rc = 0;
        ENTRY;

        oap = oap_from_cookie(cookie);
        if (IS_ERR(oap))
                RETURN(PTR_ERR(oap));

        /* no new io against a missing or invalidated import */
        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
                RETURN(-EIO);

        /* refuse a page that is already queued somewhere */
        if (!list_empty(&oap->oap_pending_item) ||
            !list_empty(&oap->oap_urgent_item) ||
            !list_empty(&oap->oap_rpc_item))
                RETURN(-EBUSY);

        if (loi == NULL)
                loi = lsm->lsm_oinfo[0];

        client_obd_list_lock(&cli->cl_loi_list_lock);

        oap->oap_cmd = cmd;
        oap->oap_page_off = off;
        oap->oap_count = count;
        oap->oap_brw_flags = brw_flags;
        oap->oap_async_flags = async_flags;

        if (cmd & OBD_BRW_WRITE)
                lop = &loi->loi_write_lop;
        else
                lop = &loi->loi_read_lop;

        list_add_tail(&oap->oap_pending_item, &lop->lop_pending_group);
        if (oap->oap_async_flags & ASYNC_GROUP_SYNC) {
                /* group-sync pages are tracked by the io group so
                 * completion can be reported to the group's waiter */
                oap->oap_oig = oig;
                rc = oig_add_one(oig, &oap->oap_occ);
        }

        LOI_DEBUG(loi, "oap %p page %p on group pending: rc %d\n",
                  oap, oap->oap_page, rc);

        client_obd_list_unlock(&cli->cl_loi_list_lock);

        RETURN(rc);
}
2754
2755 static void osc_group_to_pending(struct client_obd *cli, struct lov_oinfo *loi,
2756                                  struct loi_oap_pages *lop, int cmd)
2757 {
2758         struct list_head *pos, *tmp;
2759         struct osc_async_page *oap;
2760
2761         list_for_each_safe(pos, tmp, &lop->lop_pending_group) {
2762                 oap = list_entry(pos, struct osc_async_page, oap_pending_item);
2763                 list_del(&oap->oap_pending_item);
2764                 osc_oap_to_pending(oap);
2765         }
2766         loi_list_maint(cli, loi);
2767 }
2768
/* Fire a group io that was built up with osc_queue_group_io(): move all
 * queued group pages (writes and reads) onto the regular pending lists
 * and try to send rpcs.  Always returns 0. */
static int osc_trigger_group_io(struct obd_export *exp,
                                struct lov_stripe_md *lsm,
                                struct lov_oinfo *loi,
                                struct obd_io_group *oig)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        ENTRY;

        if (loi == NULL)
                loi = lsm->lsm_oinfo[0];

        client_obd_list_lock(&cli->cl_loi_list_lock);

        osc_group_to_pending(cli, loi, &loi->loi_write_lop, OBD_BRW_WRITE);
        osc_group_to_pending(cli, loi, &loi->loi_read_lop, OBD_BRW_READ);

        osc_check_rpcs(cli);
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        RETURN(0);
}
2790
/* Remove a page from the osc's queues before it makes it into an rpc.
 * Fails with -EBUSY if the page has already been claimed by an rpc;
 * otherwise releases its cache/grant accounting and unlinks it from the
 * urgent and pending lists. */
static int osc_teardown_async_page(struct obd_export *exp,
                                   struct lov_stripe_md *lsm,
                                   struct lov_oinfo *loi, void *cookie)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct loi_oap_pages *lop;
        struct osc_async_page *oap;
        int rc = 0;
        ENTRY;

        oap = oap_from_cookie(cookie);
        if (IS_ERR(oap))
                RETURN(PTR_ERR(oap));

        if (loi == NULL)
                loi = lsm->lsm_oinfo[0];

        /* the page lives on its command's read or write queue */
        if (oap->oap_cmd & OBD_BRW_WRITE) {
                lop = &loi->loi_write_lop;
        } else {
                lop = &loi->loi_read_lop;
        }

        client_obd_list_lock(&cli->cl_loi_list_lock);

        /* too late: the page already belongs to an rpc */
        if (!list_empty(&oap->oap_rpc_item))
                GOTO(out, rc = -EBUSY);

        /* give back this page's dirty/grant accounting and let anyone
         * sleeping in osc_enter_cache() have a go at the freed space */
        osc_exit_cache(cli, oap, 0);
        osc_wake_cache_waiters(cli);

        if (!list_empty(&oap->oap_urgent_item)) {
                list_del_init(&oap->oap_urgent_item);
                oap->oap_async_flags &= ~ASYNC_URGENT;
        }
        if (!list_empty(&oap->oap_pending_item)) {
                list_del_init(&oap->oap_pending_item);
                lop_update_pending(cli, lop, oap->oap_cmd, -1);
        }
        loi_list_maint(cli, loi);

        LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
out:
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        RETURN(rc);
}
2837
/* Attach "data" (on linux, an inode pointer) as the ast data of the
 * lock behind lockh, asserting that any different pre-existing ast data
 * is an inode being freed.  Also or's LDLM_FL_NO_LRU (if present in
 * flags) into the lock flags. */
static void osc_set_data_with_check(struct lustre_handle *lockh, void *data,
                                    int flags)
{
        struct ldlm_lock *lock = ldlm_handle2lock(lockh);

        /* a stale handle most likely means the lock went away with an
         * eviction; just complain and bail */
        if (lock == NULL) {
                CERROR("lockh %p, data %p - client evicted?\n", lockh, data);
                return;
        }
        lock_res_and_lock(lock);
#if defined (__KERNEL__) && defined (__linux__)
        /* Liang XXX: Darwin and Winnt checking should be added */
        if (lock->l_ast_data && lock->l_ast_data != data) {
                struct inode *new_inode = data;
                struct inode *old_inode = lock->l_ast_data;
                /* replacing live ast data is a bug; an inode in
                 * I_FREEING is the only acceptable stale occupant */
                if (!(old_inode->i_state & I_FREEING))
                        LDLM_ERROR(lock, "inconsistent l_ast_data found");
                LASSERTF(old_inode->i_state & I_FREEING,
                         "Found existing inode %p/%lu/%u state %lu in lock: "
                         "setting data to %p/%lu/%u\n", old_inode,
                         old_inode->i_ino, old_inode->i_generation,
                         old_inode->i_state,
                         new_inode, new_inode->i_ino, new_inode->i_generation);
        }
#endif
        lock->l_ast_data = data;
        lock->l_flags |= (flags & LDLM_FL_NO_LRU);
        unlock_res_and_lock(lock);
        LDLM_LOCK_PUT(lock);
}
2868
2869 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2870                              ldlm_iterator_t replace, void *data)
2871 {
2872         struct ldlm_res_id res_id = { .name = {0} };
2873         struct obd_device *obd = class_exp2obd(exp);
2874
2875         res_id.name[0] = lsm->lsm_object_id;
2876         res_id.name[2] = lsm->lsm_object_gr;
2877
2878         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2879         return 0;
2880 }
2881
/* Common completion path for osc_enqueue(): for intent enqueues pick up
 * the server's verdict from the dlm reply, log the lvb attributes we
 * received on success, then hand the result to the caller's update
 * callback (whose return value becomes ours). */
static int osc_enqueue_fini(struct ptlrpc_request *req, struct obd_info *oinfo,
                            int intent, int rc)
{
        ENTRY;

        if (intent) {
                /* The request was created before ldlm_cli_enqueue call. */
                if (rc == ELDLM_LOCK_ABORTED) {
                        struct ldlm_reply *rep;
                        rep = req_capsule_server_get(&req->rq_pill,
                                                     &RMF_DLM_REP);

                        /* the intent was serviced without granting a
                         * lock; the real result is lock_policy_res1 */
                        LASSERT(rep != NULL);
                        if (rep->lock_policy_res1)
                                rc = rep->lock_policy_res1;
                }
        }

        if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
                CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
                       oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_size,
                       oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_blocks,
                       oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_mtime);
        }

        /* Call the update callback. */
        rc = oinfo->oi_cb_up(oinfo, rc);
        RETURN(rc);
}
2911
/* ptlrpc interpret callback for asynchronous lock enqueues: finish the
 * ldlm side of the enqueue, run the common osc completion, then drop
 * the lock reference right away (async locks are released as soon as
 * they are obtained -- see the comment above osc_enqueue()). */
static int osc_enqueue_interpret(struct ptlrpc_request *req,
                                 struct osc_enqueue_args *aa, int rc)
{
        int intent = aa->oa_oi->oi_flags & LDLM_FL_HAS_INTENT;
        struct lov_stripe_md *lsm = aa->oa_oi->oi_md;
        struct ldlm_lock *lock;

        /* ldlm_cli_enqueue is holding a reference on the lock, so it must
         * be valid. */
        lock = ldlm_handle2lock(aa->oa_oi->oi_lockh);

        /* Complete obtaining the lock procedure. */
        rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
                                   aa->oa_ei->ei_mode,
                                   &aa->oa_oi->oi_flags,
                                   &lsm->lsm_oinfo[0]->loi_lvb,
                                   sizeof(lsm->lsm_oinfo[0]->loi_lvb),
                                   lustre_swab_ost_lvb,
                                   aa->oa_oi->oi_lockh, rc);

        /* Complete osc stuff. */
        rc = osc_enqueue_fini(req, aa->oa_oi, intent, rc);

        /* Release the lock for async request. */
        if (lustre_handle_is_used(aa->oa_oi->oi_lockh) && rc == ELDLM_OK)
                ldlm_lock_decref(aa->oa_oi->oi_lockh, aa->oa_ei->ei_mode);

        LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
                 aa->oa_oi->oi_lockh, req, aa);
        LDLM_LOCK_PUT(lock);
        return rc;
}
2944
2945 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
2946  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2947  * other synchronous requests, however keeping some locks and trying to obtain
2948  * others may take a considerable amount of time in a case of ost failure; and
2949  * when other sync requests do not get released lock from a client, the client
2950  * is excluded from the cluster -- such scenarious make the life difficult, so
2951  * release locks just after they are obtained. */
2952 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
2953                        struct ldlm_enqueue_info *einfo,
2954                        struct ptlrpc_request_set *rqset)
2955 {
2956         struct ldlm_res_id res_id = { .name = {0} };
2957         struct obd_device *obd = exp->exp_obd;
2958         struct ptlrpc_request *req = NULL;
2959         int intent = oinfo->oi_flags & LDLM_FL_HAS_INTENT;
2960         ldlm_mode_t mode;
2961         int rc;
2962         ENTRY;
2963
2964         res_id.name[0] = oinfo->oi_md->lsm_object_id;
2965         res_id.name[2] = oinfo->oi_md->lsm_object_gr;
2966
2967         /* Filesystem lock extents are extended to page boundaries so that
2968          * dealing with the page cache is a little smoother.  */
2969         oinfo->oi_policy.l_extent.start -=
2970                 oinfo->oi_policy.l_extent.start & ~CFS_PAGE_MASK;
2971         oinfo->oi_policy.l_extent.end |= ~CFS_PAGE_MASK;
2972
2973         if (oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid == 0)
2974                 goto no_match;
2975
2976         /* Next, search for already existing extent locks that will cover us */
2977         /* If we're trying to read, we also search for an existing PW lock.  The
2978          * VFS and page cache already protect us locally, so lots of readers/
2979          * writers can share a single PW lock.
2980          *
2981          * There are problems with conversion deadlocks, so instead of
2982          * converting a read lock to a write lock, we'll just enqueue a new
2983          * one.
2984          *
2985          * At some point we should cancel the read lock instead of making them
2986          * send us a blocking callback, but there are problems with canceling
2987          * locks out from other users right now, too. */
2988         mode = einfo->ei_mode;
2989         if (einfo->ei_mode == LCK_PR)
2990                 mode |= LCK_PW;
2991         mode = ldlm_lock_match(obd->obd_namespace,
2992                                oinfo->oi_flags | LDLM_FL_LVB_READY, &res_id,
2993                                einfo->ei_type, &oinfo->oi_policy, mode,
2994                                oinfo->oi_lockh);
2995         if (mode) {
2996                 /* addref the lock only if not async requests and PW lock is
2997                  * matched whereas we asked for PR. */
2998                 if (!rqset && einfo->ei_mode != mode)
2999                         ldlm_lock_addref(oinfo->oi_lockh, LCK_PR);
3000                 osc_set_data_with_check(oinfo->oi_lockh, einfo->ei_cbdata,
3001                                         oinfo->oi_flags);
3002                 if (intent) {
3003                         /* I would like to be able to ASSERT here that rss <=
3004                          * kms, but I can't, for reasons which are explained in
3005                          * lov_enqueue() */
3006                 }
3007
3008                 /* We already have a lock, and it's referenced */
3009                 oinfo->oi_cb_up(oinfo, ELDLM_OK);
3010
3011                 /* For async requests, decref the lock. */
3012                 if (einfo->ei_mode != mode)
3013                         ldlm_lock_decref(oinfo->oi_lockh, LCK_PW);
3014                 else if (rqset)
3015                         ldlm_lock_decref(oinfo->oi_lockh, einfo->ei_mode);
3016
3017                 RETURN(ELDLM_OK);
3018         }
3019
3020  no_match:
3021         if (intent) {
3022                 CFS_LIST_HEAD(cancels);
3023                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3024                                            &RQF_LDLM_ENQUEUE_LVB);
3025                 if (req == NULL)
3026                         RETURN(-ENOMEM);
3027
3028                 rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
3029                 if (rc)
3030                         RETURN(rc);
3031
3032                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
3033                                      sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb));
3034                 ptlrpc_request_set_replen(req);
3035         }
3036
3037         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
3038         oinfo->oi_flags &= ~LDLM_FL_BLOCK_GRANTED;
3039
3040         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id,
3041                               &oinfo->oi_policy, &oinfo->oi_flags,
3042                               &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
3043                               sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb),
3044                               lustre_swab_ost_lvb, oinfo->oi_lockh,
3045                               rqset ? 1 : 0);
3046         if (rqset) {
3047                 if (!rc) {
3048                         struct osc_enqueue_args *aa;
3049                         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3050                         aa = (struct osc_enqueue_args *)&req->rq_async_args;
3051                         aa->oa_oi = oinfo;
3052                         aa->oa_ei = einfo;
3053                         aa->oa_exp = exp;
3054
3055                         req->rq_interpret_reply = osc_enqueue_interpret;
3056                         ptlrpc_set_add_req(rqset, req);
3057                 } else if (intent) {
3058                         ptlrpc_req_finished(req);
3059                 }
3060                 RETURN(rc);
3061         }
3062
3063         rc = osc_enqueue_fini(req, oinfo, intent, rc);
3064         if (intent)
3065                 ptlrpc_req_finished(req);
3066
3067         RETURN(rc);
3068 }
3069
3070 static int osc_match(struct obd_export *exp, struct lov_stripe_md *lsm,
3071                      __u32 type, ldlm_policy_data_t *policy, __u32 mode,
3072                      int *flags, void *data, struct lustre_handle *lockh)
3073 {
3074         struct ldlm_res_id res_id = { .name = {0} };
3075         struct obd_device *obd = exp->exp_obd;
3076         int lflags = *flags;
3077         ldlm_mode_t rc;
3078         ENTRY;
3079
3080         res_id.name[0] = lsm->lsm_object_id;
3081         res_id.name[2] = lsm->lsm_object_gr;
3082
3083         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
3084                 RETURN(-EIO);
3085
3086         /* Filesystem lock extents are extended to page boundaries so that
3087          * dealing with the page cache is a little smoother */
3088         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3089         policy->l_extent.end |= ~CFS_PAGE_MASK;
3090
3091         /* Next, search for already existing extent locks that will cover us */
3092         /* If we're trying to read, we also search for an existing PW lock.  The
3093          * VFS and page cache already protect us locally, so lots of readers/
3094          * writers can share a single PW lock. */
3095         rc = mode;
3096         if (mode == LCK_PR)
3097                 rc |= LCK_PW;
3098         rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY,
3099                              &res_id, type, policy, rc, lockh);
3100         if (rc) {
3101                 osc_set_data_with_check(lockh, data, lflags);
3102                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
3103                         ldlm_lock_addref(lockh, LCK_PR);
3104                         ldlm_lock_decref(lockh, LCK_PW);
3105                 }
3106                 RETURN(rc);
3107         }
3108         RETURN(rc);
3109 }
3110
3111 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
3112                       __u32 mode, struct lustre_handle *lockh)
3113 {
3114         ENTRY;
3115
3116         if (unlikely(mode == LCK_GROUP))
3117                 ldlm_lock_decref_and_cancel(lockh, mode);
3118         else
3119                 ldlm_lock_decref(lockh, mode);
3120
3121         RETURN(0);
3122 }
3123
3124 static int osc_cancel_unused(struct obd_export *exp,
3125                              struct lov_stripe_md *lsm, int flags,
3126                              void *opaque)
3127 {
3128         struct obd_device *obd = class_exp2obd(exp);
3129         struct ldlm_res_id res_id = { .name = {0} }, *resp = NULL;
3130
3131         if (lsm != NULL) {
3132                 res_id.name[0] = lsm->lsm_object_id;
3133                 res_id.name[2] = lsm->lsm_object_gr;
3134                 resp = &res_id;
3135         }
3136
3137         return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
3138 }
3139
3140 static int osc_join_lru(struct obd_export *exp,
3141                         struct lov_stripe_md *lsm, int join)
3142 {
3143         struct obd_device *obd = class_exp2obd(exp);
3144         struct ldlm_res_id res_id = { .name = {0} }, *resp = NULL;
3145
3146         if (lsm != NULL) {
3147                 res_id.name[0] = lsm->lsm_object_id;
3148                 res_id.name[2] = lsm->lsm_object_gr;
3149                 resp = &res_id;
3150         }
3151
3152         return ldlm_cli_join_lru(obd->obd_namespace, resp, join);
3153 }
3154
3155 static int osc_statfs_interpret(struct ptlrpc_request *req,
3156                                 struct osc_async_args *aa, int rc)
3157 {
3158         struct obd_statfs *msfs;
3159         ENTRY;
3160
3161         if (rc != 0)
3162                 GOTO(out, rc);
3163
3164         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3165         if (msfs == NULL) {
3166                 GOTO(out, rc = -EPROTO);
3167         }
3168
3169         *aa->aa_oi->oi_osfs = *msfs;
3170 out:
3171         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
3172         RETURN(rc);
3173 }
3174
3175 static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
3176                             __u64 max_age, struct ptlrpc_request_set *rqset)
3177 {
3178         struct ptlrpc_request *req;
3179         struct osc_async_args *aa;
3180         int                    rc;
3181         ENTRY;
3182
3183         /* We could possibly pass max_age in the request (as an absolute
3184          * timestamp or a "seconds.usec ago") so the target can avoid doing
3185          * extra calls into the filesystem if that isn't necessary (e.g.
3186          * during mount that would help a bit).  Having relative timestamps
3187          * is not so great if request processing is slow, while absolute
3188          * timestamps are not ideal because they need time synchronization. */
3189         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
3190         if (req == NULL)
3191                 RETURN(-ENOMEM);
3192
3193         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3194         if (rc) {
3195                 ptlrpc_request_free(req);
3196                 RETURN(rc);
3197         }
3198         ptlrpc_request_set_replen(req);
3199         req->rq_request_portal = OST_CREATE_PORTAL; //XXX FIXME bug 249
3200
3201         req->rq_interpret_reply = osc_statfs_interpret;
3202         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3203         aa = (struct osc_async_args *)&req->rq_async_args;
3204         aa->aa_oi = oinfo;
3205
3206         ptlrpc_set_add_req(rqset, req);
3207         RETURN(0);
3208 }
3209
3210 static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
3211                       __u64 max_age)
3212 {
3213         struct obd_statfs     *msfs;
3214         struct ptlrpc_request *req;
3215         int rc;
3216         ENTRY;
3217
3218         /* We could possibly pass max_age in the request (as an absolute
3219          * timestamp or a "seconds.usec ago") so the target can avoid doing
3220          * extra calls into the filesystem if that isn't necessary (e.g.
3221          * during mount that would help a bit).  Having relative timestamps
3222          * is not so great if request processing is slow, while absolute
3223          * timestamps are not ideal because they need time synchronization. */
3224         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
3225         if (req == NULL)
3226                 RETURN(-ENOMEM);
3227
3228         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3229         if (rc) {
3230                 ptlrpc_request_free(req);
3231                 RETURN(rc);
3232         }
3233         ptlrpc_request_set_replen(req);
3234         req->rq_request_portal = OST_CREATE_PORTAL; //XXX FIXME bug 249
3235
3236         rc = ptlrpc_queue_wait(req);
3237         if (rc)
3238                 GOTO(out, rc);
3239
3240         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3241         if (msfs == NULL) {
3242                 GOTO(out, rc = -EPROTO);
3243         }
3244
3245         *osfs = *msfs;
3246
3247         EXIT;
3248  out:
3249         ptlrpc_req_finished(req);
3250         return rc;
3251 }
3252
3253 /* Retrieve object striping information.
3254  *
3255  * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
3256  * the maximum number of OST indices which will fit in the user buffer.
3257  * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
3258  */
/* Copy striping information for @lsm out to the user buffer @lump.
 * An OSC object always has exactly one stripe, so lmm_stripe_count is
 * reported as 1 regardless of what the caller's buffer can hold. */
static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
{
        struct lov_user_md lum, *lumk;
        int rc = 0, lum_size;
        ENTRY;

        if (!lsm)
                RETURN(-ENODATA);

        /* Read the caller's request header to learn the magic and how
         * many object entries its buffer has room for. */
        if (copy_from_user(&lum, lump, sizeof(lum)))
                RETURN(-EFAULT);

        if (lum.lmm_magic != LOV_USER_MAGIC)
                RETURN(-EINVAL);

        if (lum.lmm_stripe_count > 0) {
                /* Room for the header plus our single object entry. */
                lum_size = sizeof(lum) + sizeof(lum.lmm_objects[0]);
                OBD_ALLOC(lumk, lum_size);
                if (!lumk)
                        RETURN(-ENOMEM);

                /* NOTE(review): lumk is zero-filled by OBD_ALLOC and only
                 * the fields below are set, so lmm_magic and the other
                 * header fields the caller passed in come back as zero on
                 * this path -- confirm callers don't rely on them. */
                lumk->lmm_objects[0].l_object_id = lsm->lsm_object_id;
                lumk->lmm_objects[0].l_object_gr = lsm->lsm_object_gr;
        } else {
                /* No room for object entries: update the caller's header
                 * copy in place and write back just the header. */
                lum_size = sizeof(lum);
                lumk = &lum;
        }

        lumk->lmm_object_id = lsm->lsm_object_id;
        lumk->lmm_object_gr = lsm->lsm_object_gr;
        lumk->lmm_stripe_count = 1;

        if (copy_to_user(lump, lumk, lum_size))
                rc = -EFAULT;

        /* Only free if we took the allocation path above. */
        if (lumk != &lum)
                OBD_FREE(lumk, lum_size);

        RETURN(rc);
}
3299
3300
/* Handle OSC-level ioctls.
 *
 * A module reference is held for the duration of the call so the OSC
 * module cannot be unloaded while an ioctl is in flight.  Unknown
 * commands return -ENOTTY. */
static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                         void *karg, void *uarg)
{
        struct obd_device *obd = exp->exp_obd;
        struct obd_ioctl_data *data = karg;
        int err = 0;
        ENTRY;

        if (!try_module_get(THIS_MODULE)) {
                CERROR("Can't get module. Is it alive?");
                return -EINVAL;
        }
        switch (cmd) {
        case OBD_IOC_LOV_GET_CONFIG: {
                char *buf;
                struct lov_desc *desc;
                struct obd_uuid uuid;

                buf = NULL;
                len = 0;
                /* Pull the full ioctl payload in from user space. */
                if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
                        GOTO(out, err = -EINVAL);

                data = (struct obd_ioctl_data *)buf;

                /* Caller must provide room for one lov_desc ... */
                if (sizeof(*desc) > data->ioc_inllen1) {
                        obd_ioctl_freedata(buf, len);
                        GOTO(out, err = -EINVAL);
                }

                /* ... and one obd_uuid. */
                if (data->ioc_inllen2 < sizeof(uuid)) {
                        obd_ioctl_freedata(buf, len);
                        GOTO(out, err = -EINVAL);
                }

                /* An OSC presents itself as a degenerate "LOV" with
                 * exactly one target and trivial striping defaults. */
                desc = (struct lov_desc *)data->ioc_inlbuf1;
                desc->ld_tgt_count = 1;
                desc->ld_active_tgt_count = 1;
                desc->ld_default_stripe_count = 1;
                desc->ld_default_stripe_size = 0;
                desc->ld_default_stripe_offset = 0;
                desc->ld_pattern = 0;
                memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));

                memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));

                err = copy_to_user((void *)uarg, buf, len);
                if (err)
                        err = -EFAULT;
                obd_ioctl_freedata(buf, len);
                GOTO(out, err);
        }
        case LL_IOC_LOV_SETSTRIPE:
                /* obd_alloc_memmd returns the md size on success;
                 * callers of this ioctl only want 0/negative. */
                err = obd_alloc_memmd(exp, karg);
                if (err > 0)
                        err = 0;
                GOTO(out, err);
        case LL_IOC_LOV_GETSTRIPE:
                err = osc_getstripe(karg, uarg);
                GOTO(out, err);
        case OBD_IOC_CLIENT_RECOVER:
                err = ptlrpc_recover_import(obd->u.cli.cl_import,
                                            data->ioc_inlbuf1);
                if (err > 0)
                        err = 0;
                GOTO(out, err);
        case IOC_OSC_SET_ACTIVE:
                err = ptlrpc_set_import_active(obd->u.cli.cl_import,
                                               data->ioc_offset);
                GOTO(out, err);
        case OBD_IOC_POLL_QUOTACHECK:
                err = lquota_poll_check(quota_interface, exp,
                                        (struct if_quotacheck *)karg);
                GOTO(out, err);
        default:
                CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
                       cmd, cfs_curproc_comm());
                GOTO(out, err = -ENOTTY);
        }
out:
        module_put(THIS_MODULE);
        return err;
}
3384
/* Retrieve a named piece of information from the OSC/OST.
 *
 * KEY_LOCK_TO_STRIPE is answered locally (an OSC object has a single
 * stripe, index 0).  KEY_LAST_ID is fetched synchronously from the OST
 * via an OST_GET_INFO RPC.  Any other key yields -EINVAL. */
static int osc_get_info(struct obd_export *exp, obd_count keylen,
                        void *key, __u32 *vallen, void *val)
{
        ENTRY;
        if (!vallen || !val)
                RETURN(-EFAULT);

        if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
                __u32 *stripe = val;
                *vallen = sizeof(*stripe);
                /* Single-stripe object: always stripe 0. */
                *stripe = 0;
                RETURN(0);
        } else if (KEY_IS(KEY_LAST_ID)) {
                struct ptlrpc_request *req;
                obd_id                *reply;
                char                  *tmp;
                int                    rc;

                req = ptlrpc_request_alloc(class_exp2cliimp(exp),
                                           &RQF_OST_GET_INFO_LAST_ID);
                if (req == NULL)
                        RETURN(-ENOMEM);

                req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
                                     RCL_CLIENT, keylen);
                rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
                if (rc) {
                        ptlrpc_request_free(req);
                        RETURN(rc);
                }

                /* The request body is just the key name. */
                tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
                memcpy(tmp, key, keylen);

                ptlrpc_request_set_replen(req);
                rc = ptlrpc_queue_wait(req);
                if (rc)
                        GOTO(out, rc);

                reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
                if (reply == NULL)
                        GOTO(out, rc = -EPROTO);

                *((obd_id *)val) = *reply;
        out:
                ptlrpc_req_finished(req);
                RETURN(rc);
        }
        RETURN(-EINVAL);
}
3435
3436 static int osc_setinfo_mds_conn_interpret(struct ptlrpc_request *req,
3437                                           void *aa, int rc)
3438 {
3439         struct llog_ctxt *ctxt;
3440         struct obd_import *imp = req->rq_import;
3441         ENTRY;
3442
3443         if (rc != 0)
3444                 RETURN(rc);
3445
3446         ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
3447         if (ctxt) {
3448                 if (rc == 0)
3449                         rc = llog_initiator_connect(ctxt);
3450                 else
3451                         CERROR("cannot establish connection for "
3452                                "ctxt %p: %d\n", ctxt, rc);
3453         }
3454
3455         llog_ctxt_put(ctxt);
3456         spin_lock(&imp->imp_lock);
3457         imp->imp_server_timeout = 1;
3458         imp->imp_pingable = 1;
3459         spin_unlock(&imp->imp_lock);
3460         CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
3461
3462         RETURN(rc);
3463 }
3464
/* Set a named parameter on this OSC/OST.
 *
 * A handful of keys are handled entirely on the client (next object id,
 * "unlinked" grant flag, initial-recovery mode, checksums, security
 * context flush); anything else is packed into an OST_SET_INFO RPC and
 * queued on @set.  Returns 0 on success or a negative errno. */
static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
                              void *key, obd_count vallen, void *val,
                              struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *req;
        struct obd_device     *obd = exp->exp_obd;
        struct obd_import     *imp = class_exp2cliimp(exp);
        char                  *tmp;
        int                    rc;
        ENTRY;

        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);

        /* Local only: remember the next object id for precreation. */
        if (KEY_IS(KEY_NEXT_ID)) {
                if (vallen != sizeof(obd_id))
                        RETURN(-ERANGE);
                if (val == NULL)
                        RETURN(-EINVAL);
                obd->u.cli.cl_oscc.oscc_next_id = *((obd_id*)val) + 1;
                CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
                       exp->exp_obd->obd_name,
                       obd->u.cli.cl_oscc.oscc_next_id);

                RETURN(0);
        }

        /* Local only: an object was unlinked, so the OST may have space
         * again -- clear the no-space flag on the object creator. */
        if (KEY_IS("unlinked")) {
                struct osc_creator *oscc = &obd->u.cli.cl_oscc;
                spin_lock(&oscc->oscc_lock);
                oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
                spin_unlock(&oscc->oscc_lock);
                RETURN(0);
        }

        /* Local only: toggle initial-recovery behaviour on the import. */
        if (KEY_IS(KEY_INIT_RECOV)) {
                if (vallen != sizeof(int))
                        RETURN(-EINVAL);
                spin_lock(&imp->imp_lock);
                imp->imp_initial_recov = *(int *)val;
                spin_unlock(&imp->imp_lock);
                CDEBUG(D_HA, "%s: set imp_initial_recov = %d\n",
                       exp->exp_obd->obd_name,
                       imp->imp_initial_recov);
                RETURN(0);
        }

        /* Local only: enable/disable bulk data checksums. */
        if (KEY_IS("checksum")) {
                if (vallen != sizeof(int))
                        RETURN(-EINVAL);
                exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
                RETURN(0);
        }

        /* Local only: drop this import's security contexts. */
        if (KEY_IS(KEY_FLUSH_CTX)) {
                sptlrpc_import_flush_my_ctx(imp);
                RETURN(0);
        }

        /* Everything below goes over the wire, so a request set is
         * mandatory. */
        if (!set)
                RETURN(-EINVAL);

        /* We pass all other commands directly to OST. Since nobody calls osc
           methods directly and everybody is supposed to go through LOV, we
           assume lov checked invalid values for us.
           The only recognised values so far are evict_by_nid and mds_conn.
           Even if something bad goes through, we'd get a -EINVAL from OST
           anyway. */


        req = ptlrpc_request_alloc(imp, &RQF_OST_SET_INFO);
        if (req == NULL)
                RETURN(-ENOMEM);

        req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
                             RCL_CLIENT, keylen);
        req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
                             RCL_CLIENT, vallen);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        /* Pack key and value as the two request buffers. */
        tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
        memcpy(tmp, key, keylen);
        tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL);
        memcpy(tmp, val, vallen);

        /* MDS->OST connection setup: record the target group on the
         * object creator and hook the reply interpreter that connects
         * the llog originator context. */
        if (KEY_IS(KEY_MDS_CONN)) {
                struct osc_creator *oscc = &obd->u.cli.cl_oscc;

                oscc->oscc_oa.o_gr = (*(__u32 *)val);
                oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP;
                LASSERT(oscc->oscc_oa.o_gr > 0);
                req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
        }

        ptlrpc_request_set_replen(req);
        ptlrpc_set_add_req(set, req);
        ptlrpc_check_set(set);

        RETURN(0);
}
3568
3569
/* Log ops for the size-change replicator context: the OSC only needs
 * to cancel records on behalf of the originator. */
static struct llog_operations osc_size_repl_logops = {
        lop_cancel: llog_obd_repl_cancel
};
3573
/* Ops table for the MDS->OST originator context; filled in lazily the
 * first time osc_llog_init() runs (see below). */
static struct llog_operations osc_mds_ost_orig_logops;

/* Set up the two llog contexts this OSC uses on the MDS: the MDS->OST
 * originator context (catalog-backed) and the size-change replicator
 * context.  Returns 0 on success or the llog_setup() error. */
static int osc_llog_init(struct obd_device *obd, int group,
                         struct obd_device *tgt, int count,
                         struct llog_catid *catid, struct obd_uuid *uuid)
{
        int rc;
        ENTRY;
        LASSERT(group == OBD_LLOG_GROUP);
        spin_lock(&obd->obd_dev_lock);
        /* One-time initialization of the originator ops table, keyed off
         * whether lop_setup already points at llog_obd_origin_setup.
         * Done under obd_dev_lock so concurrent inits don't race. */
        if (osc_mds_ost_orig_logops.lop_setup != llog_obd_origin_setup) {
                osc_mds_ost_orig_logops = llog_lvfs_ops;
                osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
                osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
                osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
                osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
        }
        spin_unlock(&obd->obd_dev_lock);

        rc = llog_setup(obd, &obd->obd_olg, LLOG_MDS_OST_ORIG_CTXT, tgt, count,
                        &catid->lci_logid, &osc_mds_ost_orig_logops);
        if (rc) {
                CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
                GOTO (out, rc);
        }

        /* NOTE(review): if this second setup fails, the first context is
         * not torn down here -- presumably osc_llog_finish() handles it
         * on the cleanup path; confirm. */
        rc = llog_setup(obd, &obd->obd_olg, LLOG_SIZE_REPL_CTXT, tgt, count,
                        NULL, &osc_size_repl_logops);
        if (rc)
                CERROR("failed LLOG_SIZE_REPL_CTXT\n");
out:
        if (rc) {
                CERROR("osc '%s' tgt '%s' cnt %d catid %p rc=%d\n",
                       obd->obd_name, tgt->obd_name, count, catid, rc);
                CERROR("logid "LPX64":0x%x\n",
                       catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
        }
        RETURN(rc);
}
3612
3613 static int osc_llog_finish(struct obd_device *obd, int count)
3614 {
3615         struct llog_ctxt *ctxt;
3616         int rc = 0, rc2 = 0;
3617         ENTRY;
3618
3619         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3620         if (ctxt)
3621                 rc = llog_cleanup(ctxt);
3622
3623         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3624         if (ctxt)
3625                 rc2 = llog_cleanup(ctxt);
3626         if (!rc)
3627                 rc = rc2;
3628
3629         RETURN(rc);
3630 }
3631
3632 static int osc_reconnect(const struct lu_env *env,
3633                          struct obd_export *exp, struct obd_device *obd,
3634                          struct obd_uuid *cluuid,
3635                          struct obd_connect_data *data)
3636 {
3637         struct client_obd *cli = &obd->u.cli;
3638
3639         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3640                 long lost_grant;
3641
3642                 client_obd_list_lock(&cli->cl_loi_list_lock);
3643                 data->ocd_grant = cli->cl_avail_grant ?:
3644                                 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
3645                 lost_grant = cli->cl_lost_grant;
3646                 cli->cl_lost_grant = 0;
3647                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3648
3649                 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
3650                        "cl_lost_grant: %ld\n", data->ocd_grant,
3651                        cli->cl_avail_grant, lost_grant);
3652                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
3653                        " ocd_grant: %d\n", data->ocd_connect_flags,
3654                        data->ocd_version, data->ocd_grant);
3655         }
3656
3657         RETURN(0);
3658 }
3659
3660 static int osc_disconnect(struct obd_export *exp)
3661 {
3662         struct obd_device *obd = class_exp2obd(exp);
3663         struct llog_ctxt *ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3664         int rc;
3665
3666         if (obd->u.cli.cl_conn_count == 1)
3667                 /* flush any remaining cancel messages out to the target */
3668                 llog_sync(ctxt, exp);
3669
3670         llog_ctxt_put(ctxt);
3671
3672         rc = client_disconnect_export(exp);
3673         return rc;
3674 }
3675
/* React to import state changes: reset grants on disconnect, fail out
 * cached pages and purge local locks on invalidation, clear no-space
 * flags when the connection comes back, and apply connect data.  The
 * observer (LOV) is notified of activity transitions. */
static int osc_import_event(struct obd_device *obd,
                            struct obd_import *imp,
                            enum obd_import_event event)
{
        struct client_obd *cli;
        int rc = 0;

        ENTRY;
        LASSERT(imp->imp_obd == obd);

        switch (event) {
        case IMP_EVENT_DISCON: {
                /* Only do this on the MDS OSC's */
                if (imp->imp_server_timeout) {
                        struct osc_creator *oscc = &obd->u.cli.cl_oscc;

                        /* Stop object precreation until reconnected. */
                        spin_lock(&oscc->oscc_lock);
                        oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
                        spin_unlock(&oscc->oscc_lock);
                }
                /* Grant state is meaningless across a disconnect. */
                cli = &obd->u.cli;
                client_obd_list_lock(&cli->cl_loi_list_lock);
                cli->cl_avail_grant = 0;
                cli->cl_lost_grant = 0;
                client_obd_list_unlock(&cli->cl_loi_list_lock);
                break;
        }
        case IMP_EVENT_INACTIVE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
                break;
        }
        case IMP_EVENT_INVALIDATE: {
                struct ldlm_namespace *ns = obd->obd_namespace;

                /* Reset grants */
                cli = &obd->u.cli;
                client_obd_list_lock(&cli->cl_loi_list_lock);
                /* all pages go to failing rpcs due to the invalid import */
                osc_check_rpcs(cli);
                client_obd_list_unlock(&cli->cl_loi_list_lock);

                /* Drop all locks locally; the server no longer knows us. */
                ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);

                break;
        }
        case IMP_EVENT_ACTIVE: {
                /* Only do this on the MDS OSC's */
                if (imp->imp_server_timeout) {
                        struct osc_creator *oscc = &obd->u.cli.cl_oscc;

                        /* Reconnected: assume the OST may have space again. */
                        spin_lock(&oscc->oscc_lock);
                        oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
                        spin_unlock(&oscc->oscc_lock);
                }
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
                break;
        }
        case IMP_EVENT_OCD: {
                struct obd_connect_data *ocd = &imp->imp_connect_data;

                if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
                        osc_init_grant(&obd->u.cli, ocd);

                /* See bug 7198 */
                if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
                        imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;

                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
                break;
        }
        default:
                CERROR("Unknown import event %d\n", event);
                LBUG();
        }
        RETURN(rc);
}
3752
3753 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3754 {
3755         int rc;
3756         ENTRY;
3757
3758         ENTRY;
3759         rc = ptlrpcd_addref();
3760         if (rc)
3761                 RETURN(rc);
3762
3763         rc = client_obd_setup(obd, lcfg);
3764         if (rc) {
3765                 ptlrpcd_decref();
3766         } else {
3767                 struct lprocfs_static_vars lvars = { 0 };
3768                 struct client_obd *cli = &obd->u.cli;
3769
3770                 lprocfs_osc_init_vars(&lvars);
3771                 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
3772                         lproc_osc_attach_seqstat(obd);
3773                         sptlrpc_lprocfs_cliobd_attach(obd);
3774                         ptlrpc_lprocfs_register_obd(obd);
3775                 }
3776
3777                 oscc_init(obd);
3778                 /* We need to allocate a few requests more, because
3779                    brw_interpret_oap tries to create new requests before freeing
3780                    previous ones. Ideally we want to have 2x max_rpcs_in_flight
3781                    reserved, but I afraid that might be too much wasted RAM
3782                    in fact, so 2 is just my guess and still should work. */
3783                 cli->cl_import->imp_rq_pool =
3784                         ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
3785                                             OST_MAXREQSIZE,
3786                                             ptlrpc_add_rqs_to_pool);
3787         }
3788
3789         RETURN(rc);
3790 }
3791
/* Staged pre-cleanup of the OSC device, driven by obd_precleanup():
 * deactivate the import early, destroy a never-connected import when
 * exports are flushed, and tear down llog contexts before self-export
 * cleanup. */
static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
{
        int rc = 0;
        ENTRY;

        switch (stage) {
        case OBD_CLEANUP_EARLY: {
                struct obd_import *imp;
                imp = obd->u.cli.cl_import;
                CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
                /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
                ptlrpc_deactivate_import(imp);
                /* Stop pinging a target we are about to leave. */
                spin_lock(&imp->imp_lock);
                imp->imp_pingable = 0;
                spin_unlock(&imp->imp_lock);
                break;
        }
        case OBD_CLEANUP_EXPORTS: {
                /* If we set up but never connected, the
                   client import will not have been cleaned. */
                if (obd->u.cli.cl_import) {
                        struct obd_import *imp;
                        imp = obd->u.cli.cl_import;
                        CDEBUG(D_CONFIG, "%s: client import never connected\n",
                               obd->obd_name);
                        ptlrpc_invalidate_import(imp);
                        ptlrpc_free_rq_pool(imp->imp_rq_pool);
                        class_destroy_import(imp);
                        obd->u.cli.cl_import = NULL;
                }
                break;
        }
        case OBD_CLEANUP_SELF_EXP:
                rc = obd_llog_finish(obd, 0);
                if (rc != 0)
                        CERROR("failed to cleanup llogging subsystems\n");
                break;
        case OBD_CLEANUP_OBD:
                /* Nothing to do at this stage for an OSC.  NOTE(review):
                 * no default case -- any new cleanup stage falls through
                 * silently with rc = 0; confirm that is intended. */
                break;
        }
        RETURN(rc);
}
3834
/* Final teardown of an OSC device: unregister procfs, mark the object
 * creator as exiting (so no further precreates start), release quota
 * state, run generic client cleanup and drop the ptlrpcd reference
 * taken in osc_setup(). */
int osc_cleanup(struct obd_device *obd)
{
        struct osc_creator *oscc = &obd->u.cli.cl_oscc;
        int rc;

        ENTRY;
        ptlrpc_lprocfs_unregister_obd(obd);
        lprocfs_obd_cleanup(obd);

        spin_lock(&oscc->oscc_lock);
        oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING;
        oscc->oscc_flags |= OSCC_FLAG_EXITING;
        spin_unlock(&oscc->oscc_lock);

        /* free memory of osc quota cache */
        lquota_cleanup(quota_interface, obd);

        rc = client_obd_cleanup(obd);

        ptlrpcd_decref();
        RETURN(rc);
}
3857
3858 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
3859 {
3860         struct lustre_cfg *lcfg = buf;
3861         struct lprocfs_static_vars lvars = { 0 };
3862         int rc = 0;
3863
3864         lprocfs_osc_init_vars(&lvars);
3865
3866         switch (lcfg->lcfg_command) {
3867         case LCFG_SPTLRPC_CONF:
3868                 rc = sptlrpc_cliobd_process_config(obd, lcfg);
3869                 break;
3870         default:
3871                 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
3872                                               lcfg, obd);
3873                 break;
3874         }
3875
3876         return(rc);
3877 }
3878
/* OBD method table for the OSC device type, registered with the class
 * layer in osc_init().  Entries without an osc_* handler delegate to
 * the generic client_* implementations. */
struct obd_ops osc_obd_ops = {
        .o_owner                = THIS_MODULE,
        /* device lifecycle */
        .o_setup                = osc_setup,
        .o_precleanup           = osc_precleanup,
        .o_cleanup              = osc_cleanup,
        /* connection management (generic client import helpers) */
        .o_add_conn             = client_import_add_conn,
        .o_del_conn             = client_import_del_conn,
        .o_connect              = client_connect_import,
        .o_reconnect            = osc_reconnect,
        .o_disconnect           = osc_disconnect,
        /* statfs */
        .o_statfs               = osc_statfs,
        .o_statfs_async         = osc_statfs_async,
        /* striping metadata pack/unpack */
        .o_packmd               = osc_packmd,
        .o_unpackmd             = osc_unpackmd,
        /* object lifecycle and attributes */
        .o_precreate            = osc_precreate,
        .o_create               = osc_create,
        .o_destroy              = osc_destroy,
        .o_getattr              = osc_getattr,
        .o_getattr_async        = osc_getattr_async,
        .o_setattr              = osc_setattr,
        .o_setattr_async        = osc_setattr_async,
        /* bulk read/write paths */
        .o_brw                  = osc_brw,
        .o_brw_async            = osc_brw_async,
        .o_prep_async_page      = osc_prep_async_page,
        .o_queue_async_io       = osc_queue_async_io,
        .o_set_async_flags      = osc_set_async_flags,
        .o_queue_group_io       = osc_queue_group_io,
        .o_trigger_group_io     = osc_trigger_group_io,
        .o_teardown_async_page  = osc_teardown_async_page,
        .o_punch                = osc_punch,
        .o_sync                 = osc_sync,
        /* DLM lock handling */
        .o_enqueue              = osc_enqueue,
        .o_match                = osc_match,
        .o_change_cbdata        = osc_change_cbdata,
        .o_cancel               = osc_cancel,
        .o_cancel_unused        = osc_cancel_unused,
        .o_join_lru             = osc_join_lru,
        /* misc control and configuration */
        .o_iocontrol            = osc_iocontrol,
        .o_get_info             = osc_get_info,
        .o_set_info_async       = osc_set_info_async,
        .o_import_event         = osc_import_event,
        .o_llog_init            = osc_llog_init,
        .o_llog_finish          = osc_llog_finish,
        .o_process_config       = osc_process_config,
};
3924 int __init osc_init(void)
3925 {
3926         struct lprocfs_static_vars lvars = { 0 };
3927         int rc;
3928         ENTRY;
3929
3930         lprocfs_osc_init_vars(&lvars);
3931
3932         request_module("lquota");
3933         quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
3934         lquota_init(quota_interface);
3935         init_obd_quota_ops(quota_interface, &osc_obd_ops);
3936
3937         rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
3938                                  LUSTRE_OSC_NAME, NULL);
3939         if (rc) {
3940                 if (quota_interface)
3941                         PORTAL_SYMBOL_PUT(osc_quota_interface);
3942                 RETURN(rc);
3943         }
3944
3945         RETURN(rc);
3946 }
3947
#ifdef __KERNEL__
/* Module unload hook: tears down quota state and unregisters the OSC
 * device type, mirroring the setup done in osc_init().
 *
 * NOTE(review): the __exit attribute is deliberately commented out --
 * presumably the function must stay reachable outside the pure unload
 * path; confirm before restoring it. */
static void /*__exit*/ osc_exit(void)
{
        /* Release quota hooks, then drop the module reference taken
         * via PORTAL_SYMBOL_GET() in osc_init(). */
        lquota_exit(quota_interface);
        if (quota_interface)
                PORTAL_SYMBOL_PUT(osc_quota_interface);

        class_unregister_type(LUSTRE_OSC_NAME);
}

MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
MODULE_LICENSE("GPL");

/* libcfs portability macro wiring osc_init/osc_exit as the module's
 * init/exit entry points. */
cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);
#endif