Whamcloud - gitweb
use generic LIST_HEAD macros instead of linux specific.
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
5  *   Author Peter Braam <braam@clusterfs.com>
6  *
7  *   This file is part of the Lustre file system, http://www.lustre.org
8  *   Lustre is a trademark of Cluster File Systems, Inc.
9  *
10  *   You may have signed or agreed to another license before downloading
11  *   this software.  If so, you are bound by the terms and conditions
12  *   of that agreement, and the following does not apply to you.  See the
13  *   LICENSE file included with this distribution for more information.
14  *
15  *   If you did not agree to a different license, then this copy of Lustre
16  *   is open source software; you can redistribute it and/or modify it
17  *   under the terms of version 2 of the GNU General Public License as
18  *   published by the Free Software Foundation.
19  *
20  *   In either case, Lustre is distributed in the hope that it will be
21  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
22  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
23  *   license text for more details.
24  *
25  *  For testing and management it is treated as an obd_device,
26  *  although it does not export a full OBD method table (the
27  *  requests are coming in over the wire, so object target modules
28  *  do not have a full method table.)
29  *
30  */
31
32 #ifndef EXPORT_SYMTAB
33 # define EXPORT_SYMTAB
34 #endif
35 #define DEBUG_SUBSYSTEM S_OSC
36
37 #ifdef __KERNEL__
38 # include <libcfs/libcfs.h>
39 #else /* __KERNEL__ */
40 # include <liblustre.h>
41 #endif
42
43 #include <lustre_dlm.h>
44 #include <libcfs/kp30.h>
45 #include <lustre_net.h>
46 #include <lustre/lustre_user.h>
47 #include <obd_cksum.h>
48 #include <obd_ost.h>
49 #include <obd_lov.h>
50
51 #ifdef  __CYGWIN__
52 # include <ctype.h>
53 #endif
54
55 #include <lustre_ha.h>
56 #include <lprocfs_status.h>
57 #include <lustre_log.h>
58 #include <lustre_debug.h>
59 #include <lustre_param.h>
60 #include "osc_internal.h"
61
/* File-local pointer to a quota_interface_t, initialised to NULL.
 * NOTE(review): nothing in this part of the file assigns it; presumably it
 * is set during module setup -- confirm. */
62 static quota_interface_t *quota_interface = NULL;
/* Declared extern here; the definition is not in this part of the file. */
63 extern quota_interface_t osc_quota_interface;
64
/* Forward declaration; the definition is not in this part of the file. */
65 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
66
67 /* Pack OSC object metadata for disk storage (LE byte order). */
68 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
69                       struct lov_stripe_md *lsm)
70 {
71         int lmm_size;
72         ENTRY;
73
74         lmm_size = sizeof(**lmmp);
75         if (!lmmp)
76                 RETURN(lmm_size);
77
78         if (*lmmp && !lsm) {
79                 OBD_FREE(*lmmp, lmm_size);
80                 *lmmp = NULL;
81                 RETURN(0);
82         }
83
84         if (!*lmmp) {
85                 OBD_ALLOC(*lmmp, lmm_size);
86                 if (!*lmmp)
87                         RETURN(-ENOMEM);
88         }
89
90         if (lsm) {
91                 LASSERT(lsm->lsm_object_id);
92                 LASSERT(lsm->lsm_object_gr);
93                 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
94                 (*lmmp)->lmm_object_gr = cpu_to_le64(lsm->lsm_object_gr);
95         }
96
97         RETURN(lmm_size);
98 }
99
100 /* Unpack OSC object metadata from disk storage (LE byte order). */
101 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
102                         struct lov_mds_md *lmm, int lmm_bytes)
103 {
104         int lsm_size;
105         ENTRY;
106
107         if (lmm != NULL) {
108                 if (lmm_bytes < sizeof (*lmm)) {
109                         CERROR("lov_mds_md too small: %d, need %d\n",
110                                lmm_bytes, (int)sizeof(*lmm));
111                         RETURN(-EINVAL);
112                 }
113                 /* XXX LOV_MAGIC etc check? */
114
115                 if (lmm->lmm_object_id == 0) {
116                         CERROR("lov_mds_md: zero lmm_object_id\n");
117                         RETURN(-EINVAL);
118                 }
119         }
120
121         lsm_size = lov_stripe_md_size(1);
122         if (lsmp == NULL)
123                 RETURN(lsm_size);
124
125         if (*lsmp != NULL && lmm == NULL) {
126                 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
127                 OBD_FREE(*lsmp, lsm_size);
128                 *lsmp = NULL;
129                 RETURN(0);
130         }
131
132         if (*lsmp == NULL) {
133                 OBD_ALLOC(*lsmp, lsm_size);
134                 if (*lsmp == NULL)
135                         RETURN(-ENOMEM);
136                 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
137                 if ((*lsmp)->lsm_oinfo[0] == NULL) {
138                         OBD_FREE(*lsmp, lsm_size);
139                         RETURN(-ENOMEM);
140                 }
141                 loi_init((*lsmp)->lsm_oinfo[0]);
142         }
143
144         if (lmm != NULL) {
145                 /* XXX zero *lsmp? */
146                 (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
147                 (*lsmp)->lsm_object_gr = le64_to_cpu (lmm->lmm_object_gr);
148                 LASSERT((*lsmp)->lsm_object_id);
149                 LASSERT((*lsmp)->lsm_object_gr);
150         }
151
152         (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
153
154         RETURN(lsm_size);
155 }
156
157 static inline void osc_pack_capa(struct ptlrpc_request *req,
158                                  struct ost_body *body, void *capa)
159 {
160         struct obd_capa *oc = (struct obd_capa *)capa;
161         struct lustre_capa *c;
162
163         if (!capa)
164                 return;
165
166         c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
167         LASSERT(c);
168         capa_cpy(c, oc);
169         body->oa.o_valid |= OBD_MD_FLOSSCAPA;
170         DEBUG_CAPA(D_SEC, c, "pack");
171 }
172
173 static inline void osc_pack_req_body(struct ptlrpc_request *req,
174                                      struct obd_info *oinfo)
175 {
176         struct ost_body *body;
177
178         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
179         LASSERT(body);
180
181         body->oa = *oinfo->oi_oa;
182         osc_pack_capa(req, body, oinfo->oi_capa);
183 }
184
185 static inline void osc_set_capa_size(struct ptlrpc_request *req,
186                                      const struct req_msg_field *field,
187                                      struct obd_capa *oc)
188 {
189         if (oc == NULL)
190                 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
191         else
192                 /* it is already calculated as sizeof struct obd_capa */
193                 ;
194 }
195
196 static int osc_getattr_interpret(struct ptlrpc_request *req,
197                                  struct osc_async_args *aa, int rc)
198 {
199         struct ost_body *body;
200         ENTRY;
201
202         if (rc != 0)
203                 GOTO(out, rc);
204
205         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
206                                   lustre_swab_ost_body);
207         if (body) {
208                 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
209                 memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));
210
211                 /* This should really be sent by the OST */
212                 aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
213                 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
214         } else {
215                 CDEBUG(D_INFO, "can't unpack ost_body\n");
216                 rc = -EPROTO;
217                 aa->aa_oi->oi_oa->o_valid = 0;
218         }
219 out:
220         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
221         RETURN(rc);
222 }
223
224 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
225                              struct ptlrpc_request_set *set)
226 {
227         struct ptlrpc_request *req;
228         struct osc_async_args *aa;
229         int                    rc;
230         ENTRY;
231
232         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
233         if (req == NULL)
234                 RETURN(-ENOMEM);
235
236         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
237         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
238         if (rc) {
239                 ptlrpc_request_free(req);
240                 RETURN(rc);
241         }
242
243         osc_pack_req_body(req, oinfo);
244
245         ptlrpc_request_set_replen(req);
246         req->rq_interpret_reply = osc_getattr_interpret;
247
248         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
249         aa = (struct osc_async_args *)&req->rq_async_args;
250         aa->aa_oi = oinfo;
251
252         ptlrpc_set_add_req(set, req);
253         RETURN(0);
254 }
255
256 static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
257 {
258         struct ptlrpc_request *req;
259         struct ost_body       *body;
260         int                    rc;
261         ENTRY;
262
263         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
264         if (req == NULL)
265                 RETURN(-ENOMEM);
266
267         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
268         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
269         if (rc) {
270                 ptlrpc_request_free(req);
271                 RETURN(rc);
272         }
273
274         osc_pack_req_body(req, oinfo);
275
276         ptlrpc_request_set_replen(req);
277  
278         rc = ptlrpc_queue_wait(req);
279         if (rc)
280                 GOTO(out, rc);
281
282         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
283         if (body == NULL)
284                 GOTO(out, rc = -EPROTO);
285
286         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
287         *oinfo->oi_oa = body->oa;
288
289         /* This should really be sent by the OST */
290         oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
291         oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
292
293         EXIT;
294  out:
295         ptlrpc_req_finished(req);
296         return rc;
297 }
298
299 static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
300                        struct obd_trans_info *oti)
301 {
302         struct ptlrpc_request *req;
303         struct ost_body       *body;
304         int                    rc;
305         ENTRY;
306
307         LASSERT(!(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP) ||
308                                         oinfo->oi_oa->o_gr > 0);
309
310         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
311         if (req == NULL)
312                 RETURN(-ENOMEM);
313
314         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
315         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
316         if (rc) {
317                 ptlrpc_request_free(req);
318                 RETURN(rc);
319         }
320
321         osc_pack_req_body(req, oinfo);
322
323         ptlrpc_request_set_replen(req);
324  
325
326         rc = ptlrpc_queue_wait(req);
327         if (rc)
328                 GOTO(out, rc);
329
330         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
331         if (body == NULL)
332                 GOTO(out, rc = -EPROTO);
333
334         *oinfo->oi_oa = body->oa;
335
336         EXIT;
337 out:
338         ptlrpc_req_finished(req);
339         RETURN(rc);
340 }
341
342 static int osc_setattr_interpret(struct ptlrpc_request *req,
343                                  struct osc_async_args *aa, int rc)
344 {
345         struct ost_body *body;
346         ENTRY;
347
348         if (rc != 0)
349                 GOTO(out, rc);
350
351         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
352         if (body == NULL)
353                 GOTO(out, rc = -EPROTO);
354
355         *aa->aa_oi->oi_oa = body->oa;
356 out:
357         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
358         RETURN(rc);
359 }
360
361 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
362                              struct obd_trans_info *oti,
363                              struct ptlrpc_request_set *rqset)
364 {
365         struct ptlrpc_request *req;
366         struct osc_async_args *aa;
367         int                    rc;
368         ENTRY;
369
370         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
371         if (req == NULL)
372                 RETURN(-ENOMEM);
373
374         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
375         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
376         if (rc) {
377                 ptlrpc_request_free(req);
378                 RETURN(rc);
379         }
380
381         osc_pack_req_body(req, oinfo);
382
383         ptlrpc_request_set_replen(req);
384  
385         if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) {
386                 LASSERT(oti);
387                 *obdo_logcookie(oinfo->oi_oa) = *oti->oti_logcookies;
388         }
389
390         /* do mds to ost setattr asynchronouly */
391         if (!rqset) {
392                 /* Do not wait for response. */
393                 ptlrpcd_add_req(req);
394         } else {
395                 req->rq_interpret_reply = osc_setattr_interpret;
396
397                 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
398                 aa = (struct osc_async_args *)&req->rq_async_args;
399                 aa->aa_oi = oinfo;
400
401                 ptlrpc_set_add_req(rqset, req);
402         }
403
404         RETURN(0);
405 }
406
407 int osc_real_create(struct obd_export *exp, struct obdo *oa,
408                     struct lov_stripe_md **ea, struct obd_trans_info *oti)
409 {
410         struct ptlrpc_request *req;
411         struct ost_body       *body;
412         struct lov_stripe_md  *lsm;
413         int                    rc;
414         ENTRY;
415
416         LASSERT(oa);
417         LASSERT(ea);
418
419         lsm = *ea;
420         if (!lsm) {
421                 rc = obd_alloc_memmd(exp, &lsm);
422                 if (rc < 0)
423                         RETURN(rc);
424         }
425
426         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
427         if (req == NULL)
428                 GOTO(out, rc = -ENOMEM);
429
430         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
431         if (rc) {
432                 ptlrpc_request_free(req);
433                 GOTO(out, rc);
434         }
435
436         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
437         LASSERT(body);
438         body->oa = *oa;
439
440         ptlrpc_request_set_replen(req);
441
442         if (oa->o_valid & OBD_MD_FLINLINE) {
443                 LASSERT((oa->o_valid & OBD_MD_FLFLAGS) &&
444                         oa->o_flags == OBD_FL_DELORPHAN);
445                 DEBUG_REQ(D_HA, req,
446                           "delorphan from OST integration");
447                 /* Don't resend the delorphan req */
448                 req->rq_no_resend = req->rq_no_delay = 1;
449         }
450
451         rc = ptlrpc_queue_wait(req);
452         if (rc)
453                 GOTO(out_req, rc);
454
455         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
456         if (body == NULL)
457                 GOTO(out_req, rc = -EPROTO);
458
459         *oa = body->oa;
460
461         /* This should really be sent by the OST */
462         oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
463         oa->o_valid |= OBD_MD_FLBLKSZ;
464
465         /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
466          * have valid lsm_oinfo data structs, so don't go touching that.
467          * This needs to be fixed in a big way.
468          */
469         lsm->lsm_object_id = oa->o_id;
470         lsm->lsm_object_gr = oa->o_gr;
471         *ea = lsm;
472
473         if (oti != NULL) {
474                 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
475
476                 if (oa->o_valid & OBD_MD_FLCOOKIE) {
477                         if (!oti->oti_logcookies)
478                                 oti_alloc_cookies(oti, 1);
479                         *oti->oti_logcookies = *obdo_logcookie(oa);
480                 }
481         }
482
483         CDEBUG(D_HA, "transno: "LPD64"\n",
484                lustre_msg_get_transno(req->rq_repmsg));
485 out_req:
486         ptlrpc_req_finished(req);
487 out:
488         if (rc && !*ea)
489                 obd_free_memmd(exp, &lsm);
490         RETURN(rc);
491 }
492
493 static int osc_punch_interpret(struct ptlrpc_request *req,
494                                struct osc_async_args *aa, int rc)
495 {
496         struct ost_body *body;
497         ENTRY;
498
499         if (rc != 0)
500                 GOTO(out, rc);
501
502         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
503         if (body == NULL)
504                 GOTO(out, rc = -EPROTO);
505
506         *aa->aa_oi->oi_oa = body->oa;
507 out:
508         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
509         RETURN(rc);
510 }
511
512 static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
513                      struct obd_trans_info *oti,
514                      struct ptlrpc_request_set *rqset)
515 {
516         struct ptlrpc_request *req;
517         struct osc_async_args *aa;
518         struct ost_body       *body;
519         int                    rc;
520         ENTRY;
521
522         if (!oinfo->oi_oa) {
523                 CDEBUG(D_INFO, "oa NULL\n");
524                 RETURN(-EINVAL);
525         }
526
527         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
528         if (req == NULL)
529                 RETURN(-ENOMEM);
530
531         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
532         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
533         if (rc) {
534                 ptlrpc_request_free(req);
535                 RETURN(rc);
536         }
537         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
538         osc_pack_req_body(req, oinfo);
539
540         /* overload the size and blocks fields in the oa with start/end */
541         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
542         LASSERT(body);
543         body->oa.o_size = oinfo->oi_policy.l_extent.start;
544         body->oa.o_blocks = oinfo->oi_policy.l_extent.end;
545         body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
546         ptlrpc_request_set_replen(req);
547
548
549         req->rq_interpret_reply = osc_punch_interpret;
550         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
551         aa = (struct osc_async_args *)&req->rq_async_args;
552         aa->aa_oi = oinfo;
553         ptlrpc_set_add_req(rqset, req);
554
555         RETURN(0);
556 }
557
558 static int osc_sync(struct obd_export *exp, struct obdo *oa,
559                     struct lov_stripe_md *md, obd_size start, obd_size end,
560                     void *capa)
561 {
562         struct ptlrpc_request *req;
563         struct ost_body       *body;
564         int                    rc;
565         ENTRY;
566
567         if (!oa) {
568                 CDEBUG(D_INFO, "oa NULL\n");
569                 RETURN(-EINVAL);
570         }
571
572         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
573         if (req == NULL)
574                 RETURN(-ENOMEM);
575
576         osc_set_capa_size(req, &RMF_CAPA1, capa);
577         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
578         if (rc) {
579                 ptlrpc_request_free(req);
580                 RETURN(rc);
581         }
582
583         /* overload the size and blocks fields in the oa with start/end */
584         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
585         LASSERT(body);
586         body->oa = *oa;
587         body->oa.o_size = start;
588         body->oa.o_blocks = end;
589         body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
590         osc_pack_capa(req, body, capa);
591
592         ptlrpc_request_set_replen(req);
593
594         rc = ptlrpc_queue_wait(req);
595         if (rc)
596                 GOTO(out, rc);
597
598         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
599         if (body == NULL)
600                 GOTO(out, rc = -EPROTO);
601
602         *oa = body->oa;
603
604         EXIT;
605  out:
606         ptlrpc_req_finished(req);
607         return rc;
608 }
609
610 /* Find and cancel locally locks matched by @mode in the resource found by
611  * @objid. Found locks are added into @cancel list. Returns the amount of
612  * locks added to @cancels list. */
613 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
614                                    struct list_head *cancels, ldlm_mode_t mode,
615                                    int lock_flags)
616 {
617         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
618         struct ldlm_res_id res_id = { .name = { oa->o_id, 0, oa->o_gr, 0 } };
619         struct ldlm_resource *res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
620         int count;
621         ENTRY;
622
623         if (res == NULL)
624                 RETURN(0);
625
626         count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
627                                            lock_flags, 0, NULL);
628         ldlm_resource_putref(res);
629         RETURN(count);
630 }
631
632 static int osc_destroy_interpret(struct ptlrpc_request *req, void *data,
633                                  int rc)
634 {
635         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
636
637         atomic_dec(&cli->cl_destroy_in_flight);
638         cfs_waitq_signal(&cli->cl_destroy_waitq);
639         return 0;
640 }
641
642 static int osc_can_send_destroy(struct client_obd *cli)
643 {
644         if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
645             cli->cl_max_rpcs_in_flight) {
646                 /* The destroy request can be sent */
647                 return 1;
648         }
649         if (atomic_dec_return(&cli->cl_destroy_in_flight) <
650             cli->cl_max_rpcs_in_flight) {
651                 /*
652                  * The counter has been modified between the two atomic
653                  * operations.
654                  */
655                 cfs_waitq_signal(&cli->cl_destroy_waitq);
656         }
657         return 0;
658 }
659
660 /* Destroy requests can be async always on the client, and we don't even really
661  * care about the return code since the client cannot do anything at all about
662  * a destroy failure.
663  * When the MDS is unlinking a filename, it saves the file objects into a
664  * recovery llog, and these object records are cancelled when the OST reports
665  * they were destroyed and sync'd to disk (i.e. transaction committed).
666  * If the client dies, or the OST is down when the object should be destroyed,
667  * the records are not cancelled, and when the OST reconnects to the MDS next,
668  * it will retrieve the llog unlink logs and then sends the log cancellation
669  * cookies to the MDS after committing destroy transactions. */
670 static int osc_destroy(struct obd_export *exp, struct obdo *oa,
671                        struct lov_stripe_md *ea, struct obd_trans_info *oti,
672                        struct obd_export *md_export)
673 {
674         struct client_obd     *cli = &exp->exp_obd->u.cli;
675         struct ptlrpc_request *req;
676         struct ost_body       *body;
677         CFS_LIST_HEAD(cancels);
678         int rc, count;
679         ENTRY;
680
681         if (!oa) {
682                 CDEBUG(D_INFO, "oa NULL\n");
683                 RETURN(-EINVAL);
684         }
685
686         count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
687                                         LDLM_FL_DISCARD_DATA);
688
689         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
690         if (req == NULL) {
691                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
692                 RETURN(-ENOMEM);
693         }
694
695         rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY, 
696                                0, &cancels, count);
697         if (rc) {
698                 ptlrpc_request_free(req);
699                 RETURN(rc);
700         }
701
702         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
703         req->rq_interpret_reply = osc_destroy_interpret;
704
705         if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
706                 memcpy(obdo_logcookie(oa), oti->oti_logcookies,
707                        sizeof(*oti->oti_logcookies));
708         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
709         LASSERT(body);
710         body->oa = *oa;
711
712         ptlrpc_request_set_replen(req);
713
714         if (!osc_can_send_destroy(cli)) {
715                 struct l_wait_info lwi = { 0 };
716
717                 /*
718                  * Wait until the number of on-going destroy RPCs drops
719                  * under max_rpc_in_flight
720                  */
721                 l_wait_event_exclusive(cli->cl_destroy_waitq,
722                                        osc_can_send_destroy(cli), &lwi);
723         }
724
725         /* Do not wait for response */
726         ptlrpcd_add_req(req);
727         RETURN(0);
728 }
729
730 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
731                                 long writing_bytes)
732 {
733         obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
734
735         LASSERT(!(oa->o_valid & bits));
736
737         oa->o_valid |= bits;
738         client_obd_list_lock(&cli->cl_loi_list_lock);
739         oa->o_dirty = cli->cl_dirty;
740         if (cli->cl_dirty > cli->cl_dirty_max) {
741                 CERROR("dirty %lu > dirty_max %lu\n",
742                        cli->cl_dirty, cli->cl_dirty_max);
743                 oa->o_undirty = 0;
744         } else if (atomic_read(&obd_dirty_pages) > obd_max_dirty_pages) {
745                 CERROR("dirty %d > system dirty_max %d\n",
746                        atomic_read(&obd_dirty_pages), obd_max_dirty_pages);
747                 oa->o_undirty = 0;
748         } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
749                 CERROR("dirty %lu - dirty_max %lu too big???\n",
750                        cli->cl_dirty, cli->cl_dirty_max);
751                 oa->o_undirty = 0;
752         } else {
753                 long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
754                                 (cli->cl_max_rpcs_in_flight + 1);
755                 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
756         }
757         oa->o_grant = cli->cl_avail_grant;
758         oa->o_dropped = cli->cl_lost_grant;
759         cli->cl_lost_grant = 0;
760         client_obd_list_unlock(&cli->cl_loi_list_lock);
761         CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
762                oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
763 }
764
765 /* caller must hold loi_list_lock */
766 static void osc_consume_write_grant(struct client_obd *cli,
767                                     struct brw_page *pga)
768 {
769         atomic_inc(&obd_dirty_pages);
770         cli->cl_dirty += CFS_PAGE_SIZE;
771         cli->cl_avail_grant -= CFS_PAGE_SIZE;
772         pga->flag |= OBD_BRW_FROM_GRANT;
773         CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
774                CFS_PAGE_SIZE, pga, pga->pg);
775         LASSERT(cli->cl_avail_grant >= 0);
776 }
777
778 /* the companion to osc_consume_write_grant, called when a brw has completed.
779  * must be called with the loi lock held. */
780 static void osc_release_write_grant(struct client_obd *cli,
781                                     struct brw_page *pga, int sent)
782 {
783         int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
784         ENTRY;
785
786         if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
787                 EXIT;
788                 return;
789         }
790
791         pga->flag &= ~OBD_BRW_FROM_GRANT;
792         atomic_dec(&obd_dirty_pages);
793         cli->cl_dirty -= CFS_PAGE_SIZE;
794         if (!sent) {
795                 cli->cl_lost_grant += CFS_PAGE_SIZE;
796                 CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
797                        cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
798         } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
799                 /* For short writes we shouldn't count parts of pages that
800                  * span a whole block on the OST side, or our accounting goes
801                  * wrong.  Should match the code in filter_grant_check. */
802                 int offset = pga->off & ~CFS_PAGE_MASK;
803                 int count = pga->count + (offset & (blocksize - 1));
804                 int end = (offset + pga->count) & (blocksize - 1);
805                 if (end)
806                         count += blocksize - end;
807
808                 cli->cl_lost_grant += CFS_PAGE_SIZE - count;
809                 CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
810                        CFS_PAGE_SIZE - count, cli->cl_lost_grant,
811                        cli->cl_avail_grant, cli->cl_dirty);
812         }
813
814         EXIT;
815 }
816
817 static unsigned long rpcs_in_flight(struct client_obd *cli)
818 {
819         return cli->cl_r_in_flight + cli->cl_w_in_flight;
820 }
821
822 /* caller must hold loi_list_lock */
823 void osc_wake_cache_waiters(struct client_obd *cli)
824 {
825         struct list_head *l, *tmp;
826         struct osc_cache_waiter *ocw;
827
828         ENTRY;
829         list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
830                 /* if we can't dirty more, we must wait until some is written */
831                 if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
832                    (atomic_read(&obd_dirty_pages) + 1 > obd_max_dirty_pages)) {
833                         CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
834                                "osc max %ld, sys max %d\n", cli->cl_dirty,
835                                cli->cl_dirty_max, obd_max_dirty_pages);
836                         return;
837                 }
838
839                 /* if still dirty cache but no grant wait for pending RPCs that
840                  * may yet return us some grant before doing sync writes */
841                 if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
842                         CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
843                                cli->cl_w_in_flight);
844                         return;
845                 }
846
847                 ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
848                 list_del_init(&ocw->ocw_entry);
849                 if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
850                         /* no more RPCs in flight to return grant, do sync IO */
851                         ocw->ocw_rc = -EDQUOT;
852                         CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
853                 } else {
854                         osc_consume_write_grant(cli,
855                                                 &ocw->ocw_oap->oap_brw_page);
856                 }
857
858                 cfs_waitq_signal(&ocw->ocw_waitq);
859         }
860
861         EXIT;
862 }
863
864 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
865 {
866         client_obd_list_lock(&cli->cl_loi_list_lock);
867         cli->cl_avail_grant = ocd->ocd_grant;
868         client_obd_list_unlock(&cli->cl_loi_list_lock);
869
870         CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld\n",
871                cli->cl_avail_grant, cli->cl_lost_grant);
872         LASSERT(cli->cl_avail_grant >= 0);
873 }
874
875 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
876 {
877         client_obd_list_lock(&cli->cl_loi_list_lock);
878         CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
879         if (body->oa.o_valid & OBD_MD_FLGRANT)
880                 cli->cl_avail_grant += body->oa.o_grant;
881         /* waiters are woken in brw_interpret_oap */
882         client_obd_list_unlock(&cli->cl_loi_list_lock);
883 }
884
885 /* We assume that the reason this OSC got a short read is because it read
886  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
887  * via the LOV, and it _knows_ it's reading inside the file, it's just that
888  * this stripe never got written at or beyond this stripe offset yet. */
889 static void handle_short_read(int nob_read, obd_count page_count,
890                               struct brw_page **pga)
891 {
892         char *ptr;
893         int i = 0;
894
895         /* skip bytes read OK */
896         while (nob_read > 0) {
897                 LASSERT (page_count > 0);
898
899                 if (pga[i]->count > nob_read) {
900                         /* EOF inside this page */
901                         ptr = cfs_kmap(pga[i]->pg) +
902                                 (pga[i]->off & ~CFS_PAGE_MASK);
903                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
904                         cfs_kunmap(pga[i]->pg);
905                         page_count--;
906                         i++;
907                         break;
908                 }
909
910                 nob_read -= pga[i]->count;
911                 page_count--;
912                 i++;
913         }
914
915         /* zero remaining pages */
916         while (page_count-- > 0) {
917                 ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
918                 memset(ptr, 0, pga[i]->count);
919                 cfs_kunmap(pga[i]->pg);
920                 i++;
921         }
922 }
923
924 static int check_write_rcs(struct ptlrpc_request *req,
925                            int requested_nob, int niocount,
926                            obd_count page_count, struct brw_page **pga)
927 {
928         int    *remote_rcs, i;
929
930         /* return error if any niobuf was in error */
931         remote_rcs = lustre_swab_repbuf(req, REQ_REC_OFF + 1,
932                                         sizeof(*remote_rcs) * niocount, NULL);
933         if (remote_rcs == NULL) {
934                 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
935                 return(-EPROTO);
936         }
937         if (lustre_msg_swabbed(req->rq_repmsg))
938                 for (i = 0; i < niocount; i++)
939                         __swab32s(&remote_rcs[i]);
940
941         for (i = 0; i < niocount; i++) {
942                 if (remote_rcs[i] < 0)
943                         return(remote_rcs[i]);
944
945                 if (remote_rcs[i] != 0) {
946                         CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
947                                 i, remote_rcs[i], req);
948                         return(-EPROTO);
949                 }
950         }
951
952         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
953                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
954                        requested_nob, req->rq_bulk->bd_nob_transferred);
955                 return(-EPROTO);
956         }
957
958         return (0);
959 }
960
961 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
962 {
963         if (p1->flag != p2->flag) {
964                 unsigned mask = ~OBD_BRW_FROM_GRANT;
965
966                 /* warn if we try to combine flags that we don't know to be
967                  * safe to combine */
968                 if ((p1->flag & mask) != (p2->flag & mask))
969                         CERROR("is it ok to have flags 0x%x and 0x%x in the "
970                                "same brw?\n", p1->flag, p2->flag);
971                 return 0;
972         }
973
974         return (p1->off + p1->count == p2->off);
975 }
976
977 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
978                                    struct brw_page **pga, int opc,
979                                    cksum_type_t cksum_type)
980 {
981         __u32 cksum;
982         int i = 0;
983
984         LASSERT (pg_count > 0);
985         cksum = init_checksum(cksum_type);
986         while (nob > 0 && pg_count > 0) {
987                 unsigned char *ptr = cfs_kmap(pga[i]->pg);
988                 int off = pga[i]->off & ~CFS_PAGE_MASK;
989                 int count = pga[i]->count > nob ? nob : pga[i]->count;
990
991                 /* corrupt the data before we compute the checksum, to
992                  * simulate an OST->client data error */
993                 if (i == 0 && opc == OST_READ &&
994                     OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
995                         memcpy(ptr + off, "bad1", min(4, nob));
996                 cksum = compute_checksum(cksum, ptr + off, count, cksum_type);
997                 cfs_kunmap(pga[i]->pg);
998                 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
999                                off, cksum);
1000
1001                 nob -= pga[i]->count;
1002                 pg_count--;
1003                 i++;
1004         }
1005         /* For sending we only compute the wrong checksum instead
1006          * of corrupting the data so it is still correct on a redo */
1007         if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1008                 cksum++;
1009
1010         return cksum;
1011 }
1012
1013 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
1014                                 struct lov_stripe_md *lsm, obd_count page_count,
1015                                 struct brw_page **pga, 
1016                                 struct ptlrpc_request **reqp,
1017                                 struct obd_capa *ocapa)
1018 {
1019         struct ptlrpc_request   *req;
1020         struct ptlrpc_bulk_desc *desc;
1021         struct ost_body         *body;
1022         struct obd_ioobj        *ioobj;
1023         struct niobuf_remote    *niobuf;
1024         int niocount, i, requested_nob, opc, rc;
1025         struct osc_brw_async_args *aa;
1026         struct req_capsule      *pill;
1027
1028         ENTRY;
1029         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1030                 RETURN(-ENOMEM); /* Recoverable */
1031         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1032                 RETURN(-EINVAL); /* Fatal */
1033
1034         if ((cmd & OBD_BRW_WRITE) != 0) {
1035                 opc = OST_WRITE;
1036                 req = ptlrpc_request_alloc_pool(cli->cl_import, 
1037                                                 cli->cl_import->imp_rq_pool,
1038                                                 &RQF_OST_BRW);
1039         } else {
1040                 opc = OST_READ;
1041                 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW);
1042         }
1043
1044         if (req == NULL)
1045                 RETURN(-ENOMEM);
1046
1047         for (niocount = i = 1; i < page_count; i++) {
1048                 if (!can_merge_pages(pga[i - 1], pga[i]))
1049                         niocount++;
1050         }
1051
1052         pill = &req->rq_pill;
1053         req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1054                              niocount * sizeof(*niobuf));
1055         osc_set_capa_size(req, &RMF_CAPA1, ocapa);
1056
1057         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1058         if (rc) {
1059                 ptlrpc_request_free(req);
1060                 RETURN(rc);
1061         }
1062         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1063
1064         if (opc == OST_WRITE)
1065                 desc = ptlrpc_prep_bulk_imp(req, page_count,
1066                                             BULK_GET_SOURCE, OST_BULK_PORTAL);
1067         else
1068                 desc = ptlrpc_prep_bulk_imp(req, page_count,
1069                                             BULK_PUT_SINK, OST_BULK_PORTAL);
1070
1071         if (desc == NULL)
1072                 GOTO(out, rc = -ENOMEM);
1073         /* NB request now owns desc and will free it when it gets freed */
1074
1075         body = req_capsule_client_get(pill, &RMF_OST_BODY);
1076         ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1077         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1078         LASSERT(body && ioobj && niobuf);
1079
1080         body->oa = *oa;
1081
1082         obdo_to_ioobj(oa, ioobj);
1083         ioobj->ioo_bufcnt = niocount;
1084         osc_pack_capa(req, body, ocapa);
1085         LASSERT (page_count > 0);
1086         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1087                 struct brw_page *pg = pga[i];
1088                 struct brw_page *pg_prev = pga[i - 1];
1089
1090                 LASSERT(pg->count > 0);
1091                 LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
1092                          "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
1093                          pg->off, pg->count);
1094 #ifdef __linux__
1095                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1096                          "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1097                          " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1098                          i, page_count,
1099                          pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1100                          pg_prev->pg, page_private(pg_prev->pg),
1101                          pg_prev->pg->index, pg_prev->off);
1102 #else
1103                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1104                          "i %d p_c %u\n", i, page_count);
1105 #endif
1106                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1107                         (pg->flag & OBD_BRW_SRVLOCK));
1108
1109                 ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,
1110                                       pg->count);
1111                 requested_nob += pg->count;
1112
1113                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1114                         niobuf--;
1115                         niobuf->len += pg->count;
1116                 } else {
1117                         niobuf->offset = pg->off;
1118                         niobuf->len    = pg->count;
1119                         niobuf->flags  = pg->flag;
1120                 }
1121         }
1122
1123         LASSERT((void *)(niobuf - niocount) ==
1124                 lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
1125                                niocount * sizeof(*niobuf)));
1126         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1127
1128         /* size[REQ_REC_OFF] still sizeof (*body) */
1129         if (opc == OST_WRITE) {
1130                 if (unlikely(cli->cl_checksum) &&
1131                     req->rq_flvr.sf_bulk_hash == BULK_HASH_ALG_NULL) {
1132                         /* store cl_cksum_type in a local variable since
1133                          * it can be changed via lprocfs */
1134                         cksum_type_t cksum_type = cli->cl_cksum_type;
1135
1136                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1137                                 oa->o_flags = body->oa.o_flags = 0;
1138                         body->oa.o_flags |= cksum_type_pack(cksum_type);
1139                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1140                         body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1141                                                              page_count, pga,
1142                                                              OST_WRITE,
1143                                                              cksum_type);
1144                         CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1145                                body->oa.o_cksum);
1146                         /* save this in 'oa', too, for later checking */
1147                         oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1148                         oa->o_flags |= cksum_type_pack(cksum_type);
1149                 } else {
1150                         /* clear out the checksum flag, in case this is a
1151                          * resend but cl_checksum is no longer set. b=11238 */
1152                         oa->o_valid &= ~OBD_MD_FLCKSUM;
1153                 }
1154                 oa->o_cksum = body->oa.o_cksum;
1155                 /* 1 RC per niobuf */
1156                 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_SERVER,
1157                                      sizeof(__u32) * niocount);
1158         } else {
1159                 if (unlikely(cli->cl_checksum) &&
1160                     req->rq_flvr.sf_bulk_hash == BULK_HASH_ALG_NULL) {
1161                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1162                                 body->oa.o_flags = 0;
1163                         body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1164                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1165                 }
1166                 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_SERVER, 0);
1167                 /* 1 RC for the whole I/O */
1168         }
1169         ptlrpc_request_set_replen(req);
1170
1171         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1172         aa = (struct osc_brw_async_args *)&req->rq_async_args;
1173         aa->aa_oa = oa;
1174         aa->aa_requested_nob = requested_nob;
1175         aa->aa_nio_count = niocount;
1176         aa->aa_page_count = page_count;
1177         aa->aa_resends = 0;
1178         aa->aa_ppga = pga;
1179         aa->aa_cli = cli;
1180         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1181
1182         *reqp = req;
1183         RETURN(0);
1184
1185  out:
1186         ptlrpc_req_finished(req);
1187         RETURN(rc);
1188 }
1189
1190 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1191                                 __u32 client_cksum, __u32 server_cksum, int nob,
1192                                 obd_count page_count, struct brw_page **pga,
1193                                 cksum_type_t client_cksum_type)
1194 {
1195         __u32 new_cksum;
1196         char *msg;
1197         cksum_type_t cksum_type;
1198
1199         if (server_cksum == client_cksum) {
1200                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1201                 return 0;
1202         }
1203
1204         if (oa->o_valid & OBD_MD_FLFLAGS)
1205                 cksum_type = cksum_type_unpack(oa->o_flags);
1206         else
1207                 cksum_type = OBD_CKSUM_CRC32;
1208
1209         new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1210                                       cksum_type);
1211
1212         if (cksum_type != client_cksum_type)
1213                 msg = "the server did not use the checksum type specified in "
1214                       "the original request - likely a protocol problem";
1215         else if (new_cksum == server_cksum)
1216                 msg = "changed on the client after we checksummed it - "
1217                       "likely false positive due to mmap IO (bug 11742)";
1218         else if (new_cksum == client_cksum)
1219                 msg = "changed in transit before arrival at OST";
1220         else
1221                 msg = "changed in transit AND doesn't match the original - "
1222                       "likely false positive due to mmap IO (bug 11742)";
1223
1224         LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inum "
1225                            LPU64"/"LPU64" object "LPU64"/"LPU64" extent "
1226                            "["LPU64"-"LPU64"]\n",
1227                            msg, libcfs_nid2str(peer->nid),
1228                            oa->o_valid & OBD_MD_FLFID ? oa->o_fid : (__u64)0,
1229                            oa->o_valid & OBD_MD_FLFID ? oa->o_generation : 
1230                                                         (__u64)0,
1231                            oa->o_id,
1232                            oa->o_valid & OBD_MD_FLGROUP ? oa->o_gr : (__u64)0,
1233                            pga[0]->off,
1234                            pga[page_count-1]->off + pga[page_count-1]->count - 1);
1235         CERROR("original client csum %x (type %x), server csum %x (type %x), "
1236                "client csum now %x\n", client_cksum, client_cksum_type,
1237                server_cksum, cksum_type, new_cksum);
1238         return 1;        
1239 }
1240
1241 /* Note rc enters this function as number of bytes transferred */
1242 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1243 {
1244         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1245         const lnet_process_id_t *peer =
1246                         &req->rq_import->imp_connection->c_peer;
1247         struct client_obd *cli = aa->aa_cli;
1248         struct ost_body *body;
1249         __u32 client_cksum = 0;
1250         ENTRY;
1251
1252         if (rc < 0 && rc != -EDQUOT)
1253                 RETURN(rc);
1254
1255         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1256         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
1257                                   lustre_swab_ost_body);
1258         if (body == NULL) {
1259                 CDEBUG(D_INFO, "Can't unpack body\n");
1260                 RETURN(-EPROTO);
1261         }
1262
1263         /* set/clear over quota flag for a uid/gid */
1264         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1265             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA))
1266                 lquota_setdq(quota_interface, cli, body->oa.o_uid,
1267                              body->oa.o_gid, body->oa.o_valid,
1268                              body->oa.o_flags);
1269
1270         if (rc < 0)
1271                 RETURN(rc);
1272
1273         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1274                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1275
1276         osc_update_grant(cli, body);
1277
1278         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1279                 if (rc > 0) {
1280                         CERROR("Unexpected +ve rc %d\n", rc);
1281                         RETURN(-EPROTO);
1282                 }
1283                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1284
1285                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1286                     check_write_checksum(&body->oa, peer, client_cksum,
1287                                          body->oa.o_cksum, aa->aa_requested_nob,
1288                                          aa->aa_page_count, aa->aa_ppga,
1289                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1290                         RETURN(-EAGAIN);
1291
1292                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1293                         RETURN(-EAGAIN);
1294
1295                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1296                                      aa->aa_page_count, aa->aa_ppga);
1297                 GOTO(out, rc);
1298         }
1299
1300         /* The rest of this function executes only for OST_READs */
1301         if (rc > aa->aa_requested_nob) {
1302                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1303                        aa->aa_requested_nob);
1304                 RETURN(-EPROTO);
1305         }
1306
1307         if (rc != req->rq_bulk->bd_nob_transferred) {
1308                 CERROR ("Unexpected rc %d (%d transferred)\n",
1309                         rc, req->rq_bulk->bd_nob_transferred);
1310                 return (-EPROTO);
1311         }
1312
1313         if (rc < aa->aa_requested_nob)
1314                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1315
1316         if (sptlrpc_cli_unwrap_bulk_read(req, rc, aa->aa_page_count,
1317                                          aa->aa_ppga))
1318                 GOTO(out, rc = -EAGAIN);
1319
1320         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1321                 static int cksum_counter;
1322                 __u32      server_cksum = body->oa.o_cksum;
1323                 char      *via;
1324                 char      *router;
1325                 cksum_type_t cksum_type;
1326
1327                 if (body->oa.o_valid & OBD_MD_FLFLAGS)
1328                         cksum_type = cksum_type_unpack(body->oa.o_flags);
1329                 else
1330                         cksum_type = OBD_CKSUM_CRC32;
1331                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1332                                                  aa->aa_ppga, OST_READ,
1333                                                  cksum_type);
1334
1335                 if (peer->nid == req->rq_bulk->bd_sender) {
1336                         via = router = "";
1337                 } else {
1338                         via = " via ";
1339                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1340                 }
1341
1342                 if (server_cksum == ~0 && rc > 0) {
1343                         CERROR("Protocol error: server %s set the 'checksum' "
1344                                "bit, but didn't send a checksum.  Not fatal, "
1345                                "but please tell CFS.\n",
1346                                libcfs_nid2str(peer->nid));
1347                 } else if (server_cksum != client_cksum) {
1348                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1349                                            "%s%s%s inum "LPU64"/"LPU64" object "
1350                                            LPU64"/"LPU64" extent "
1351                                            "["LPU64"-"LPU64"]\n",
1352                                            req->rq_import->imp_obd->obd_name,
1353                                            libcfs_nid2str(peer->nid),
1354                                            via, router,
1355                                            body->oa.o_valid & OBD_MD_FLFID ?
1356                                                 body->oa.o_fid : (__u64)0,
1357                                            body->oa.o_valid & OBD_MD_FLFID ?
1358                                                 body->oa.o_generation :(__u64)0,
1359                                            body->oa.o_id,
1360                                            body->oa.o_valid & OBD_MD_FLGROUP ?
1361                                                 body->oa.o_gr : (__u64)0,
1362                                            aa->aa_ppga[0]->off,
1363                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1364                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1365                                                                         1);
1366                         CERROR("client %x, server %x, cksum_type %x\n",
1367                                client_cksum, server_cksum, cksum_type);
1368                         cksum_counter = 0;
1369                         aa->aa_oa->o_cksum = client_cksum;
1370                         rc = -EAGAIN;
1371                 } else {
1372                         cksum_counter++;
1373                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1374                         rc = 0;
1375                 }
1376         } else if (unlikely(client_cksum)) {
1377                 static int cksum_missed;
1378
1379                 cksum_missed++;
1380                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1381                         CERROR("Checksum %u requested from %s but not sent\n",
1382                                cksum_missed, libcfs_nid2str(peer->nid));
1383         } else {
1384                 rc = 0;
1385         }
1386 out:
1387         if (rc >= 0)
1388                 *aa->aa_oa = body->oa;
1389
1390         RETURN(rc);
1391 }
1392
1393 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1394                             struct lov_stripe_md *lsm,
1395                             obd_count page_count, struct brw_page **pga,
1396                             struct obd_capa *ocapa)
1397 {
1398         struct ptlrpc_request *req;
1399         int                    rc;
1400         cfs_waitq_t            waitq;
1401         int                    resends = 0;
1402         struct l_wait_info     lwi;
1403
1404         ENTRY;
1405
1406         cfs_waitq_init(&waitq);
1407
1408 restart_bulk:
1409         rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1410                                   page_count, pga, &req, ocapa);
1411         if (rc != 0)
1412                 return (rc);
1413
1414         rc = ptlrpc_queue_wait(req);
1415
1416         if (rc == -ETIMEDOUT && req->rq_resend) {
1417                 DEBUG_REQ(D_HA, req,  "BULK TIMEOUT");
1418                 ptlrpc_req_finished(req);
1419                 goto restart_bulk;
1420         }
1421
1422         rc = osc_brw_fini_request(req, rc);
1423
1424         ptlrpc_req_finished(req);
1425         if (osc_recoverable_error(rc)) {
1426                 resends++;
1427                 if (!osc_should_resend(resends, &exp->exp_obd->u.cli)) {
1428                         CERROR("too many resend retries, returning error\n");
1429                         RETURN(-EIO);
1430                 }
1431
1432                 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
1433                 l_wait_event(waitq, 0, &lwi);
1434
1435                 goto restart_bulk;
1436         }
1437         
1438         RETURN (rc);
1439 }
1440
1441 int osc_brw_redo_request(struct ptlrpc_request *request,
1442                          struct osc_brw_async_args *aa)
1443 {
1444         struct ptlrpc_request *new_req;
1445         struct ptlrpc_request_set *set = request->rq_set;
1446         struct osc_brw_async_args *new_aa;
1447         struct osc_async_page *oap;
1448         int rc = 0;
1449         ENTRY;
1450
1451         if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) {
1452                 CERROR("too many resend retries, returning error\n");
1453                 RETURN(-EIO);
1454         }
1455
1456         DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
1457 /*
1458         body = lustre_msg_buf(request->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
1459         if (body->oa.o_valid & OBD_MD_FLOSSCAPA)
1460                 ocapa = lustre_unpack_capa(request->rq_reqmsg,
1461                                            REQ_REC_OFF + 3);
1462 */
1463         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1464                                         OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1465                                   aa->aa_cli, aa->aa_oa,
1466                                   NULL /* lsm unused by osc currently */,
1467                                   aa->aa_page_count, aa->aa_ppga, 
1468                                   &new_req, NULL /* ocapa */);
1469         if (rc)
1470                 RETURN(rc);
1471
1472         client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
1473
1474         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1475                 if (oap->oap_request != NULL) {
1476                         LASSERTF(request == oap->oap_request,
1477                                  "request %p != oap_request %p\n",
1478                                  request, oap->oap_request);
1479                         if (oap->oap_interrupted) {
1480                                 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1481                                 ptlrpc_req_finished(new_req);
1482                                 RETURN(-EINTR);
1483                         }
1484                 }
1485         }
1486         /* New request takes over pga and oaps from old request.
1487          * Note that copying a list_head doesn't work, need to move it... */
1488         aa->aa_resends++;
1489         new_req->rq_interpret_reply = request->rq_interpret_reply;
1490         new_req->rq_async_args = request->rq_async_args;
1491         new_req->rq_sent = CURRENT_SECONDS + aa->aa_resends;
1492
1493         new_aa = (struct osc_brw_async_args *)&new_req->rq_async_args;
1494
1495         CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
1496         list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
1497         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1498
1499         list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1500                 if (oap->oap_request) {
1501                         ptlrpc_req_finished(oap->oap_request);
1502                         oap->oap_request = ptlrpc_request_addref(new_req);
1503                 }
1504         }
1505
1506         /* use ptlrpc_set_add_req is safe because interpret functions work 
1507          * in check_set context. only one way exist with access to request 
1508          * from different thread got -EINTR - this way protected with 
1509          * cl_loi_list_lock */
1510         ptlrpc_set_add_req(set, new_req);
1511
1512         client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1513
1514         DEBUG_REQ(D_INFO, new_req, "new request");
1515         RETURN(0);
1516 }
1517
1518 static int brw_interpret(struct ptlrpc_request *req, void *data, int rc)
1519 {
1520         struct osc_brw_async_args *aa = data;
1521         int                        i;
1522         ENTRY;
1523
1524         rc = osc_brw_fini_request(req, rc);
1525         if (osc_recoverable_error(rc)) {
1526                 rc = osc_brw_redo_request(req, aa);
1527                 if (rc == 0)
1528                         RETURN(0);
1529         }
1530
1531         client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
1532         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
1533                 aa->aa_cli->cl_w_in_flight--;
1534         else
1535                 aa->aa_cli->cl_r_in_flight--;
1536         for (i = 0; i < aa->aa_page_count; i++)
1537                 osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
1538         client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1539
1540         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1541
1542         RETURN(rc);
1543 }
1544
1545 static int async_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1546                           struct lov_stripe_md *lsm, obd_count page_count,
1547                           struct brw_page **pga, struct ptlrpc_request_set *set,
1548                           struct obd_capa *ocapa)
1549 {
1550         struct ptlrpc_request     *req;
1551         struct client_obd         *cli = &exp->exp_obd->u.cli;
1552         int                        rc, i;
1553         struct osc_brw_async_args *aa;
1554         ENTRY;
1555
1556         /* Consume write credits even if doing a sync write -
1557          * otherwise we may run out of space on OST due to grant. */
1558         if (cmd == OBD_BRW_WRITE) {
1559                 spin_lock(&cli->cl_loi_list_lock);
1560                 for (i = 0; i < page_count; i++) {
1561                         if (cli->cl_avail_grant >= CFS_PAGE_SIZE)
1562                                 osc_consume_write_grant(cli, pga[i]);
1563                 }
1564                 spin_unlock(&cli->cl_loi_list_lock);
1565         }
1566
1567         rc = osc_brw_prep_request(cmd, cli, oa, lsm, page_count, pga,
1568                                   &req, ocapa);
1569
1570         aa = (struct osc_brw_async_args *)&req->rq_async_args;
1571         if (cmd == OBD_BRW_READ) {
1572                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
1573                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
1574                 ptlrpc_lprocfs_brw(req, OST_READ, aa->aa_requested_nob);
1575         } else {
1576                  lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
1577                 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
1578                                  cli->cl_w_in_flight);
1579                 ptlrpc_lprocfs_brw(req, OST_WRITE, aa->aa_requested_nob);
1580         }
1581
1582         if (rc == 0) {
1583                 req->rq_interpret_reply = brw_interpret;
1584                 ptlrpc_set_add_req(set, req);
1585                 client_obd_list_lock(&cli->cl_loi_list_lock);
1586                 if (cmd == OBD_BRW_READ)
1587                         cli->cl_r_in_flight++;
1588                 else
1589                         cli->cl_w_in_flight++;
1590                 client_obd_list_unlock(&cli->cl_loi_list_lock);
1591         } else if (cmd == OBD_BRW_WRITE) {
1592                 client_obd_list_lock(&cli->cl_loi_list_lock);
1593                 for (i = 0; i < page_count; i++)
1594                         osc_release_write_grant(cli, pga[i], 0);
1595                 client_obd_list_unlock(&cli->cl_loi_list_lock);
1596         }
1597         RETURN (rc);
1598 }
1599
1600 /*
1601  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1602  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1603  * fine for our small page arrays and doesn't require allocation.  its an
1604  * insertion sort that swaps elements that are strides apart, shrinking the
1605  * stride down until its '1' and the array is sorted.
1606  */
1607 static void sort_brw_pages(struct brw_page **array, int num)
1608 {
1609         int stride, i, j;
1610         struct brw_page *tmp;
1611
1612         if (num == 1)
1613                 return;
1614         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1615                 ;
1616
1617         do {
1618                 stride /= 3;
1619                 for (i = stride ; i < num ; i++) {
1620                         tmp = array[i];
1621                         j = i;
1622                         while (j >= stride && array[j - stride]->off > tmp->off) {
1623                                 array[j] = array[j - stride];
1624                                 j -= stride;
1625                         }
1626                         array[j] = tmp;
1627                 }
1628         } while (stride > 1);
1629 }
1630
1631 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1632 {
1633         int count = 1;
1634         int offset;
1635         int i = 0;
1636
1637         LASSERT (pages > 0);
1638         offset = pg[i]->off & ~CFS_PAGE_MASK;
1639
1640         for (;;) {
1641                 pages--;
1642                 if (pages == 0)         /* that's all */
1643                         return count;
1644
1645                 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1646                         return count;   /* doesn't end on page boundary */
1647
1648                 i++;
1649                 offset = pg[i]->off & ~CFS_PAGE_MASK;
1650                 if (offset != 0)        /* doesn't start on page boundary */
1651                         return count;
1652
1653                 count++;
1654         }
1655 }
1656
1657 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1658 {
1659         struct brw_page **ppga;
1660         int i;
1661
1662         OBD_ALLOC(ppga, sizeof(*ppga) * count);
1663         if (ppga == NULL)
1664                 return NULL;
1665
1666         for (i = 0; i < count; i++)
1667                 ppga[i] = pga + i;
1668         return ppga;
1669 }
1670
1671 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1672 {
1673         LASSERT(ppga != NULL);
1674         OBD_FREE(ppga, sizeof(*ppga) * count);
1675 }
1676
1677 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1678                    obd_count page_count, struct brw_page *pga,
1679                    struct obd_trans_info *oti)
1680 {
1681         struct obdo *saved_oa = NULL;
1682         struct brw_page **ppga, **orig;
1683         struct obd_import *imp = class_exp2cliimp(exp);
1684         struct client_obd *cli = &imp->imp_obd->u.cli;
1685         int rc, page_count_orig;
1686         ENTRY;
1687
1688         if (cmd & OBD_BRW_CHECK) {
1689                 /* The caller just wants to know if there's a chance that this
1690                  * I/O can succeed */
1691
1692                 if (imp == NULL || imp->imp_invalid)
1693                         RETURN(-EIO);
1694                 RETURN(0);
1695         }
1696
1697         /* test_brw with a failed create can trip this, maybe others. */
1698         LASSERT(cli->cl_max_pages_per_rpc);
1699
1700         rc = 0;
1701
1702         orig = ppga = osc_build_ppga(pga, page_count);
1703         if (ppga == NULL)
1704                 RETURN(-ENOMEM);
1705         page_count_orig = page_count;
1706
1707         sort_brw_pages(ppga, page_count);
1708         while (page_count) {
1709                 obd_count pages_per_brw;
1710
1711                 if (page_count > cli->cl_max_pages_per_rpc)
1712                         pages_per_brw = cli->cl_max_pages_per_rpc;
1713                 else
1714                         pages_per_brw = page_count;
1715
1716                 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1717
1718                 if (saved_oa != NULL) {
1719                         /* restore previously saved oa */
1720                         *oinfo->oi_oa = *saved_oa;
1721                 } else if (page_count > pages_per_brw) {
1722                         /* save a copy of oa (brw will clobber it) */
1723                         OBDO_ALLOC(saved_oa);
1724                         if (saved_oa == NULL)
1725                                 GOTO(out, rc = -ENOMEM);
1726                         *saved_oa = *oinfo->oi_oa;
1727                 }
1728
1729                 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1730                                       pages_per_brw, ppga, oinfo->oi_capa);
1731
1732                 if (rc != 0)
1733                         break;
1734
1735                 page_count -= pages_per_brw;
1736                 ppga += pages_per_brw;
1737         }
1738
1739 out:
1740         osc_release_ppga(orig, page_count_orig);
1741
1742         if (saved_oa != NULL)
1743                 OBDO_FREE(saved_oa);
1744
1745         RETURN(rc);
1746 }
1747
1748 static int osc_brw_async(int cmd, struct obd_export *exp,
1749                          struct obd_info *oinfo, obd_count page_count,
1750                          struct brw_page *pga, struct obd_trans_info *oti,
1751                          struct ptlrpc_request_set *set)
1752 {
1753         struct brw_page **ppga, **orig;
1754         struct client_obd *cli = &exp->exp_obd->u.cli;
1755         int page_count_orig;
1756         int rc = 0;
1757         ENTRY;
1758
1759         if (cmd & OBD_BRW_CHECK) {
1760                 struct obd_import *imp = class_exp2cliimp(exp);
1761                 /* The caller just wants to know if there's a chance that this
1762                  * I/O can succeed */
1763
1764                 if (imp == NULL || imp->imp_invalid)
1765                         RETURN(-EIO);
1766                 RETURN(0);
1767         }
1768
1769         orig = ppga = osc_build_ppga(pga, page_count);
1770         if (ppga == NULL)
1771                 RETURN(-ENOMEM);
1772         page_count_orig = page_count;
1773
1774         sort_brw_pages(ppga, page_count);
1775         while (page_count) {
1776                 struct brw_page **copy;
1777                 obd_count pages_per_brw;
1778
1779                 pages_per_brw = min_t(obd_count, page_count,
1780                                       cli->cl_max_pages_per_rpc);
1781
1782                 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1783
1784                 /* use ppga only if single RPC is going to fly */
1785                 if (pages_per_brw != page_count_orig || ppga != orig) {
1786                         OBD_ALLOC(copy, sizeof(*copy) * pages_per_brw);
1787                         if (copy == NULL)
1788                                 GOTO(out, rc = -ENOMEM);
1789                         memcpy(copy, ppga, sizeof(*copy) * pages_per_brw);
1790                 } else
1791                         copy = ppga;
1792
1793                 rc = async_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1794                                     pages_per_brw, copy, set, oinfo->oi_capa);
1795
1796                 if (rc != 0) {
1797                         if (copy != ppga)
1798                                 OBD_FREE(copy, sizeof(*copy) * pages_per_brw);
1799                         break;
1800                 }
1801                 if (copy == orig) {
1802                         /* we passed it to async_internal() which is
1803                          * now responsible for releasing memory */
1804                         orig = NULL;
1805                 }
1806
1807                 page_count -= pages_per_brw;
1808                 ppga += pages_per_brw;
1809         }
1810 out:
1811         if (orig)
1812                 osc_release_ppga(orig, page_count_orig);
1813         RETURN(rc);
1814 }
1815
1816 static void osc_check_rpcs(struct client_obd *cli);
1817
1818 /* The companion to osc_enter_cache(), called when @oap is no longer part of
1819  * the dirty accounting.  Writeback completes or truncate happens before
1820  * writing starts.  Must be called with the loi lock held. */
1821 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
1822                            int sent)
1823 {
1824         osc_release_write_grant(cli, &oap->oap_brw_page, sent);
1825 }
1826
1827
1828 /* This maintains the lists of pending pages to read/write for a given object
1829  * (lop).  This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
1830  * to quickly find objects that are ready to send an RPC. */
1831 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
1832                          int cmd)
1833 {
1834         int optimal;
1835         ENTRY;
1836
1837         if (lop->lop_num_pending == 0)
1838                 RETURN(0);
1839
1840         /* if we have an invalid import we want to drain the queued pages
1841          * by forcing them through rpcs that immediately fail and complete
1842          * the pages.  recovery relies on this to empty the queued pages
1843          * before canceling the locks and evicting down the llite pages */
1844         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1845                 RETURN(1);
1846
1847         /* stream rpcs in queue order as long as as there is an urgent page
1848          * queued.  this is our cheap solution for good batching in the case
1849          * where writepage marks some random page in the middle of the file
1850          * as urgent because of, say, memory pressure */
1851         if (!list_empty(&lop->lop_urgent)) {
1852                 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
1853                 RETURN(1);
1854         }
1855         /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
1856         optimal = cli->cl_max_pages_per_rpc;
1857         if (cmd & OBD_BRW_WRITE) {
1858                 /* trigger a write rpc stream as long as there are dirtiers
1859                  * waiting for space.  as they're waiting, they're not going to
1860                  * create more pages to coallesce with what's waiting.. */
1861                 if (!list_empty(&cli->cl_cache_waiters)) {
1862                         CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
1863                         RETURN(1);
1864                 }
1865                 /* +16 to avoid triggering rpcs that would want to include pages
1866                  * that are being queued but which can't be made ready until
1867                  * the queuer finishes with the page. this is a wart for
1868                  * llite::commit_write() */
1869                 optimal += 16;
1870         }
1871         if (lop->lop_num_pending >= optimal)
1872                 RETURN(1);
1873
1874         RETURN(0);
1875 }
1876
/* Make the list membership of @item match @should_be_on: append it to the
 * tail of @list if it is unlinked and should be listed, unlink and
 * re-initialise it if it is linked and should not be.  Nothing happens when
 * the current state already matches. */
static void on_list(struct list_head *item, struct list_head *list,
                    int should_be_on)
{
        int linked = !list_empty(item);

        if (should_be_on && !linked)
                list_add_tail(item, list);
        else if (!should_be_on && linked)
                list_del_init(item);
}
1885
1886 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
1887  * can find pages to build into rpcs quickly */
1888 static void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
1889 {
1890         on_list(&loi->loi_cli_item, &cli->cl_loi_ready_list,
1891                 lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE) ||
1892                 lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
1893
1894         on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
1895                 loi->loi_write_lop.lop_num_pending);
1896
1897         on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
1898                 loi->loi_read_lop.lop_num_pending);
1899 }
1900
1901 static void lop_update_pending(struct client_obd *cli,
1902                                struct loi_oap_pages *lop, int cmd, int delta)
1903 {
1904         lop->lop_num_pending += delta;
1905         if (cmd & OBD_BRW_WRITE)
1906                 cli->cl_pending_w_pages += delta;
1907         else
1908                 cli->cl_pending_r_pages += delta;
1909 }
1910
1911 /* this is called when a sync waiter receives an interruption.  Its job is to
1912  * get the caller woken as soon as possible.  If its page hasn't been put in an
1913  * rpc yet it can dequeue immediately.  Otherwise it has to mark the rpc as
1914  * desiring interruption which will forcefully complete the rpc once the rpc
1915  * has timed out */
1916 static void osc_occ_interrupted(struct oig_callback_context *occ)
1917 {
1918         struct osc_async_page *oap;
1919         struct loi_oap_pages *lop;
1920         struct lov_oinfo *loi;
1921         ENTRY;
1922
1923         /* XXX member_of() */
1924         oap = list_entry(occ, struct osc_async_page, oap_occ);
1925
1926         client_obd_list_lock(&oap->oap_cli->cl_loi_list_lock);
1927
1928         oap->oap_interrupted = 1;
1929
1930         /* ok, it's been put in an rpc. only one oap gets a request reference */
1931         if (oap->oap_request != NULL) {
1932                 ptlrpc_mark_interrupted(oap->oap_request);
1933                 ptlrpcd_wake(oap->oap_request);
1934                 GOTO(unlock, 0);
1935         }
1936
1937         /* we don't get interruption callbacks until osc_trigger_group_io()
1938          * has been called and put the sync oaps in the pending/urgent lists.*/
1939         if (!list_empty(&oap->oap_pending_item)) {
1940                 list_del_init(&oap->oap_pending_item);
1941                 list_del_init(&oap->oap_urgent_item);
1942
1943                 loi = oap->oap_loi;
1944                 lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
1945                         &loi->loi_write_lop : &loi->loi_read_lop;
1946                 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
1947                 loi_list_maint(oap->oap_cli, oap->oap_loi);
1948
1949                 oig_complete_one(oap->oap_oig, &oap->oap_occ, -EINTR);
1950                 oap->oap_oig = NULL;
1951         }
1952
1953 unlock:
1954         client_obd_list_unlock(&oap->oap_cli->cl_loi_list_lock);
1955 }
1956
1957 /* this is trying to propogate async writeback errors back up to the
1958  * application.  As an async write fails we record the error code for later if
1959  * the app does an fsync.  As long as errors persist we force future rpcs to be
1960  * sync so that the app can get a sync error and break the cycle of queueing
1961  * pages for which writeback will fail. */
1962 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
1963                            int rc)
1964 {
1965         if (rc) {
1966                 if (!ar->ar_rc)
1967                         ar->ar_rc = rc;
1968
1969                 ar->ar_force_sync = 1;
1970                 ar->ar_min_xid = ptlrpc_sample_next_xid();
1971                 return;
1972
1973         }
1974
1975         if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
1976                 ar->ar_force_sync = 0;
1977 }
1978
1979 static void osc_oap_to_pending(struct osc_async_page *oap)
1980 {
1981         struct loi_oap_pages *lop;
1982
1983         if (oap->oap_cmd & OBD_BRW_WRITE)
1984                 lop = &oap->oap_loi->loi_write_lop;
1985         else
1986                 lop = &oap->oap_loi->loi_read_lop;
1987
1988         if (oap->oap_async_flags & ASYNC_URGENT)
1989                 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
1990         list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
1991         lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
1992 }
1993
1994 /* this must be called holding the loi list lock to give coverage to exit_cache,
1995  * async_flag maintenance, and oap_request */
1996 static void osc_ap_completion(struct client_obd *cli, struct obdo *oa,
1997                               struct osc_async_page *oap, int sent, int rc)
1998 {
1999         __u64 xid = 0;
2000
2001         ENTRY;
2002         if (oap->oap_request != NULL) {
2003                 xid = ptlrpc_req_xid(oap->oap_request);
2004                 ptlrpc_req_finished(oap->oap_request);
2005                 oap->oap_request = NULL;
2006         }
2007
2008         oap->oap_async_flags = 0;
2009         oap->oap_interrupted = 0;
2010
2011         if (oap->oap_cmd & OBD_BRW_WRITE) {
2012                 osc_process_ar(&cli->cl_ar, xid, rc);
2013                 osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
2014         }
2015
2016         if (rc == 0 && oa != NULL) {
2017                 if (oa->o_valid & OBD_MD_FLBLOCKS)
2018                         oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
2019                 if (oa->o_valid & OBD_MD_FLMTIME)
2020                         oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
2021                 if (oa->o_valid & OBD_MD_FLATIME)
2022                         oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
2023                 if (oa->o_valid & OBD_MD_FLCTIME)
2024                         oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
2025         }
2026
2027         if (oap->oap_oig) {
2028                 osc_exit_cache(cli, oap, sent);
2029                 oig_complete_one(oap->oap_oig, &oap->oap_occ, rc);
2030                 oap->oap_oig = NULL;
2031                 EXIT;
2032                 return;
2033         }
2034
2035         rc = oap->oap_caller_ops->ap_completion(oap->oap_caller_data,
2036                                                 oap->oap_cmd, oa, rc);
2037
2038         /* ll_ap_completion (from llite) drops PG_locked. so, a new
2039          * I/O on the page could start, but OSC calls it under lock
2040          * and thus we can add oap back to pending safely */
2041         if (rc)
2042                 /* upper layer wants to leave the page on pending queue */
2043                 osc_oap_to_pending(oap);
2044         else
2045                 osc_exit_cache(cli, oap, sent);
2046         EXIT;
2047 }
2048
2049 static int brw_interpret_oap(struct ptlrpc_request *req, void *data, int rc)
2050 {
2051         struct osc_async_page *oap, *tmp;
2052         struct osc_brw_async_args *aa = data;
2053         struct client_obd *cli;
2054         ENTRY;
2055
2056         rc = osc_brw_fini_request(req, rc);
2057         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
2058         if (osc_recoverable_error(rc)) {
2059                 rc = osc_brw_redo_request(req, aa);
2060                 if (rc == 0)
2061                         RETURN(0);
2062         }
2063
2064         cli = aa->aa_cli;
2065
2066         client_obd_list_lock(&cli->cl_loi_list_lock);
2067
2068         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2069          * is called so we know whether to go to sync BRWs or wait for more
2070          * RPCs to complete */
2071         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2072                 cli->cl_w_in_flight--;
2073         else
2074                 cli->cl_r_in_flight--;
2075
2076         /* the caller may re-use the oap after the completion call so
2077          * we need to clean it up a little */
2078         list_for_each_entry_safe(oap, tmp, &aa->aa_oaps, oap_rpc_item) {
2079                 list_del_init(&oap->oap_rpc_item);
2080                 osc_ap_completion(cli, aa->aa_oa, oap, 1, rc);
2081         }
2082
2083         osc_wake_cache_waiters(cli);
2084         osc_check_rpcs(cli);
2085
2086         client_obd_list_unlock(&cli->cl_loi_list_lock);
2087
2088         OBDO_FREE(aa->aa_oa);
2089         
2090         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2091         RETURN(rc);
2092 }
2093
2094 static struct ptlrpc_request *osc_build_req(struct client_obd *cli,
2095                                             struct list_head *rpc_list,
2096                                             int page_count, int cmd)
2097 {
2098         struct ptlrpc_request *req;
2099         struct brw_page **pga = NULL;
2100         struct osc_brw_async_args *aa;
2101         struct obdo *oa = NULL;
2102         struct obd_async_page_ops *ops = NULL;
2103         void *caller_data = NULL;
2104         struct obd_capa *ocapa;
2105         struct osc_async_page *oap;
2106         int i, rc;
2107
2108         ENTRY;
2109         LASSERT(!list_empty(rpc_list));
2110
2111         OBD_ALLOC(pga, sizeof(*pga) * page_count);
2112         if (pga == NULL)
2113                 RETURN(ERR_PTR(-ENOMEM));
2114
2115         OBDO_ALLOC(oa);
2116         if (oa == NULL)
2117                 GOTO(out, req = ERR_PTR(-ENOMEM));
2118
2119         i = 0;
2120         list_for_each_entry(oap, rpc_list, oap_rpc_item) {
2121                 if (ops == NULL) {
2122                         ops = oap->oap_caller_ops;
2123                         caller_data = oap->oap_caller_data;
2124                 }
2125                 pga[i] = &oap->oap_brw_page;
2126                 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2127                 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2128                        pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
2129                 i++;
2130         }
2131
2132         /* always get the data for the obdo for the rpc */
2133         LASSERT(ops != NULL);
2134         ops->ap_fill_obdo(caller_data, cmd, oa);
2135         ocapa = ops->ap_lookup_capa(caller_data, cmd);
2136
2137         sort_brw_pages(pga, page_count);
2138         rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2139                                   pga, &req, ocapa);
2140         capa_put(ocapa);
2141         if (rc != 0) {
2142                 CERROR("prep_req failed: %d\n", rc);
2143                 GOTO(out, req = ERR_PTR(rc));
2144         }
2145
2146         /* Need to update the timestamps after the request is built in case
2147          * we race with setattr (locally or in queue at OST).  If OST gets
2148          * later setattr before earlier BRW (as determined by the request xid),
2149          * the OST will not use BRW timestamps.  Sadly, there is no obvious
2150          * way to do this in a single call.  bug 10150 */
2151         ops->ap_update_obdo(caller_data, cmd, oa,
2152                             OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME);
2153
2154         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2155         aa = (struct osc_brw_async_args *)&req->rq_async_args;
2156         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
2157         list_splice(rpc_list, &aa->aa_oaps);
2158         CFS_INIT_LIST_HEAD(rpc_list);
2159
2160 out:
2161         if (IS_ERR(req)) {
2162                 if (oa)
2163                         OBDO_FREE(oa);
2164                 if (pga)
2165                         OBD_FREE(pga, sizeof(*pga) * page_count);
2166         }
2167         RETURN(req);
2168 }
2169
2170 /* the loi lock is held across this function but it's allowed to release
2171  * and reacquire it during its work */
2172 static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
2173                             int cmd, struct loi_oap_pages *lop)
2174 {
2175         struct ptlrpc_request *req;
2176         obd_count page_count = 0;
2177         struct osc_async_page *oap = NULL, *tmp;
2178         struct osc_brw_async_args *aa;
2179         struct obd_async_page_ops *ops;
2180         CFS_LIST_HEAD(rpc_list);
2181         unsigned int ending_offset;
2182         unsigned  starting_offset = 0;
2183         ENTRY;
2184
2185         /* first we find the pages we're allowed to work with */
2186         list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
2187                                  oap_pending_item) {
2188                 ops = oap->oap_caller_ops;
2189
2190                 LASSERT(oap->oap_magic == OAP_MAGIC);
2191
2192                 /* in llite being 'ready' equates to the page being locked
2193                  * until completion unlocks it.  commit_write submits a page
2194                  * as not ready because its unlock will happen unconditionally
2195                  * as the call returns.  if we race with commit_write giving
2196                  * us that page we dont' want to create a hole in the page
2197                  * stream, so we stop and leave the rpc to be fired by
2198                  * another dirtier or kupdated interval (the not ready page
2199                  * will still be on the dirty list).  we could call in
2200                  * at the end of ll_file_write to process the queue again. */
2201                 if (!(oap->oap_async_flags & ASYNC_READY)) {
2202                         int rc = ops->ap_make_ready(oap->oap_caller_data, cmd);
2203                         if (rc < 0)
2204                                 CDEBUG(D_INODE, "oap %p page %p returned %d "
2205                                                 "instead of ready\n", oap,
2206                                                 oap->oap_page, rc);
2207                         switch (rc) {
2208                         case -EAGAIN:
2209                                 /* llite is telling us that the page is still
2210                                  * in commit_write and that we should try
2211                                  * and put it in an rpc again later.  we
2212                                  * break out of the loop so we don't create
2213                                  * a hole in the sequence of pages in the rpc
2214                                  * stream.*/
2215                                 oap = NULL;
2216                                 break;
2217                         case -EINTR:
2218                                 /* the io isn't needed.. tell the checks
2219                                  * below to complete the rpc with EINTR */
2220                                 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
2221                                 oap->oap_count = -EINTR;
2222                                 break;
2223                         case 0:
2224                                 oap->oap_async_flags |= ASYNC_READY;
2225                                 break;
2226                         default:
2227                                 LASSERTF(0, "oap %p page %p returned %d "
2228                                             "from make_ready\n", oap,
2229                                             oap->oap_page, rc);
2230                                 break;
2231                         }
2232                 }
2233                 if (oap == NULL)
2234                         break;
2235                 /*
2236                  * Page submitted for IO has to be locked. Either by
2237                  * ->ap_make_ready() or by higher layers.
2238                  *
2239                  * XXX nikita: this assertion should be adjusted when lustre
2240                  * starts using PG_writeback for pages being written out.
2241                  */
2242 #if defined(__KERNEL__) && defined(__linux__)
2243                 LASSERT(PageLocked(oap->oap_page));
2244 #endif
2245                 /* If there is a gap at the start of this page, it can't merge
2246                  * with any previous page, so we'll hand the network a
2247                  * "fragmented" page array that it can't transfer in 1 RDMA */
2248                 if (page_count != 0 && oap->oap_page_off != 0)
2249                         break;
2250
2251                 /* take the page out of our book-keeping */
2252                 list_del_init(&oap->oap_pending_item);
2253                 lop_update_pending(cli, lop, cmd, -1);
2254                 list_del_init(&oap->oap_urgent_item);
2255
2256                 if (page_count == 0)
2257                         starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
2258                                           (PTLRPC_MAX_BRW_SIZE - 1);
2259
2260                 /* ask the caller for the size of the io as the rpc leaves. */
2261                 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE))
2262                         oap->oap_count =
2263                                 ops->ap_refresh_count(oap->oap_caller_data,cmd);
2264                 if (oap->oap_count <= 0) {
2265                         CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
2266                                oap->oap_count);
2267                         osc_ap_completion(cli, NULL, oap, 0, oap->oap_count);
2268                         continue;
2269                 }
2270
2271                 /* now put the page back in our accounting */
2272                 list_add_tail(&oap->oap_rpc_item, &rpc_list);
2273                 if (++page_count >= cli->cl_max_pages_per_rpc)
2274                         break;
2275
2276                 /* End on a PTLRPC_MAX_BRW_SIZE boundary.  We want full-sized
2277                  * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
2278                  * have the same alignment as the initial writes that allocated
2279                  * extents on the server. */
2280                 ending_offset = (oap->oap_obj_off + oap->oap_page_off +
2281                                  oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
2282                 if (ending_offset == 0)
2283                         break;
2284
2285                 /* If there is a gap at the end of this page, it can't merge
2286                  * with any subsequent pages, so we'll hand the network a
2287                  * "fragmented" page array that it can't transfer in 1 RDMA */
2288                 if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
2289                         break;
2290         }
2291
2292         osc_wake_cache_waiters(cli);
2293
2294         if (page_count == 0)
2295                 RETURN(0);
2296
2297         loi_list_maint(cli, loi);
2298
2299         client_obd_list_unlock(&cli->cl_loi_list_lock);
2300
2301         req = osc_build_req(cli, &rpc_list, page_count, cmd);
2302         if (IS_ERR(req)) {
2303                 /* this should happen rarely and is pretty bad, it makes the
2304                  * pending list not follow the dirty order */
2305                 client_obd_list_lock(&cli->cl_loi_list_lock);
2306                 list_for_each_entry_safe(oap, tmp, &rpc_list, oap_rpc_item) {
2307                         list_del_init(&oap->oap_rpc_item);
2308
2309                         /* queued sync pages can be torn down while the pages
2310                          * were between the pending list and the rpc */
2311                         if (oap->oap_interrupted) {
2312                                 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
2313                                 osc_ap_completion(cli, NULL, oap, 0,
2314                                                   oap->oap_count);
2315                                 continue;
2316                         }
2317                         osc_ap_completion(cli, NULL, oap, 0, PTR_ERR(req));
2318                 }
2319                 loi_list_maint(cli, loi);
2320                 RETURN(PTR_ERR(req));
2321         }
2322
2323         aa = (struct osc_brw_async_args *)&req->rq_async_args;
2324
2325         if (cmd == OBD_BRW_READ) {
2326                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2327                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2328                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2329                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2330                 ptlrpc_lprocfs_brw(req, OST_READ, aa->aa_requested_nob);
2331         } else {
2332                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2333                 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
2334                                  cli->cl_w_in_flight);
2335                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2336                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2337                 ptlrpc_lprocfs_brw(req, OST_WRITE, aa->aa_requested_nob);
2338         }
2339
2340         client_obd_list_lock(&cli->cl_loi_list_lock);
2341
2342         if (cmd == OBD_BRW_READ)
2343                 cli->cl_r_in_flight++;
2344         else
2345                 cli->cl_w_in_flight++;
2346
2347         /* queued sync pages can be torn down while the pages
2348          * were between the pending list and the rpc */
2349         tmp = NULL;
2350         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2351                 /* only one oap gets a request reference */
2352                 if (tmp == NULL)
2353                         tmp = oap;
2354                 if (oap->oap_interrupted && !req->rq_intr) {
2355                         CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2356                                oap, req);
2357                         ptlrpc_mark_interrupted(req);
2358                 }
2359         }
2360         if (tmp != NULL)
2361                 tmp->oap_request = ptlrpc_request_addref(req);
2362
2363         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2364                   page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
2365
2366         req->rq_interpret_reply = brw_interpret_oap;
2367         ptlrpcd_add_req(req);
2368         RETURN(1);
2369 }
2370
/* Debug helper: logs, at D_INODE level, whether LOI is linked through
 * loi_cli_item (printed as "ready"), then for its write and read queues the
 * lop_num_pending count and whether the lop_urgent list is non-empty,
 * followed by the caller-supplied format STR and its arguments.
 * NOTE(review): args is expanded without ##, so presumably every use has to
 * pass at least one argument after STR -- confirm. */
2371 #define LOI_DEBUG(LOI, STR, args...)                                     \
2372         CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR,           \
2373                !list_empty(&(LOI)->loi_cli_item),                        \
2374                (LOI)->loi_write_lop.lop_num_pending,                     \
2375                !list_empty(&(LOI)->loi_write_lop.lop_urgent),            \
2376                (LOI)->loi_read_lop.lop_num_pending,                      \
2377                !list_empty(&(LOI)->loi_read_lop.lop_urgent),             \
2378                args)                                                     \
2379
2380 /* This is called by osc_check_rpcs() to find which objects have pages that
2381  * we could be sending.  These lists are maintained by lop_makes_rpc().
      *
      * Selection order: the head of cl_loi_ready_list; else, when there are
      * cache waiters, the head of cl_loi_write_list; else, when the import is
      * missing or invalid, the head of cl_loi_write_list and then the head of
      * cl_loi_read_list.  Returns NULL when none of these apply.  The object
      * that is returned is not unlinked from any list here.
      *
      * NOTE(review): no locking is done in this function; presumably the
      * caller holds cl_loi_list_lock, as osc_check_rpcs() states -- confirm. */
2382 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2383 {
2384         ENTRY;
2385         /* first return all objects which we already know to have
2386          * pages ready to be stuffed into rpcs */
2387         if (!list_empty(&cli->cl_loi_ready_list))
2388                 RETURN(list_entry(cli->cl_loi_ready_list.next,
2389                                   struct lov_oinfo, loi_cli_item));
2390
2391         /* then if we have cache waiters, return all objects with queued
2392          * writes.  This is especially important when many small files
2393          * have filled up the cache and not been fired into rpcs because
2394          * they don't pass the nr_pending/object threshold */
2395         if (!list_empty(&cli->cl_cache_waiters) &&
2396             !list_empty(&cli->cl_loi_write_list))
2397                 RETURN(list_entry(cli->cl_loi_write_list.next,
2398                                   struct lov_oinfo, loi_write_item));
2399
2400         /* then return all queued objects when we have an invalid import
2401          * so that they get flushed */
2402         if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2403                 if (!list_empty(&cli->cl_loi_write_list))
2404                         RETURN(list_entry(cli->cl_loi_write_list.next,
2405                                           struct lov_oinfo, loi_write_item));
2406                 if (!list_empty(&cli->cl_loi_read_list))
2407                         RETURN(list_entry(cli->cl_loi_read_list.next,
2408                                           struct lov_oinfo, loi_read_item));
2409         }
2410         RETURN(NULL);
2411 }
2412
2413 /* called with the loi list lock held
      *
      * Repeatedly takes the next candidate object from osc_next_loi() and, for
      * each queue that lop_makes_rpc() approves, calls osc_send_oap_rpc() once
      * for writes and once for reads.  The object is then unlinked from the
      * ready/write/read client lists and loi_list_maint() is called for it.
      * The loop ends when osc_next_loi() returns NULL, when rpcs_in_flight()
      * reaches cl_max_rpcs_in_flight, when a send returns a negative value, or
      * when race_counter equals 10. */
2414 static void osc_check_rpcs(struct client_obd *cli)
2415 {
2416         struct lov_oinfo *loi;
2417         int rc = 0, race_counter = 0;
2418         ENTRY;
2419
2420         while ((loi = osc_next_loi(cli)) != NULL) {
2421                 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
2422
2423                 if (rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight)
2424                         break;
2425
2426                 /* attempt some read/write balancing by alternating between
2427                  * reads and writes in an object.  The makes_rpc checks here
2428                  * would be redundant if we were getting read/write work items
2429                  * instead of objects.  we don't want send_oap_rpc to drain a
2430                  * partial read pending queue when we're given this object to
2431                  * do io on writes while there are cache waiters */
2432                 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
2433                         rc = osc_send_oap_rpc(cli, loi, OBD_BRW_WRITE,
2434                                               &loi->loi_write_lop);
2435                         if (rc < 0)
2436                                 break;
                              /* a positive rc resets the back-off count, a
                               * zero rc advances it */
2437                         if (rc > 0)
2438                                 race_counter = 0;
2439                         else
2440                                 race_counter++;
2441                 }
2442                 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
2443                         rc = osc_send_oap_rpc(cli, loi, OBD_BRW_READ,
2444                                               &loi->loi_read_lop);
2445                         if (rc < 0)
2446                                 break;
2447                         if (rc > 0)
2448                                 race_counter = 0;
2449                         else
2450                                 race_counter++;
2451                 }
2452
2453                 /* attempt some inter-object balancing by issuing rpcs
2454                  * for each object in turn */
2455                 if (!list_empty(&loi->loi_cli_item))
2456                         list_del_init(&loi->loi_cli_item);
2457                 if (!list_empty(&loi->loi_write_item))
2458                         list_del_init(&loi->loi_write_item);
2459                 if (!list_empty(&loi->loi_read_item))
2460                         list_del_init(&loi->loi_read_item);
2461
2462                 loi_list_maint(cli, loi);
2463
2464                 /* send_oap_rpc fails with 0 when make_ready tells it to
2465                  * back off.  llite's make_ready does this when it tries
2466                  * to lock a page queued for write that is already locked.
2467                  * we want to try sending rpcs from many objects, but we
2468                  * don't want to spin failing with 0.
                      * NOTE(review): race_counter can grow by two in one pass
                      * (write and read both returning 0), so it may step over
                      * 10 without ever matching this equality test; a >=
                      * comparison would be more robust. */
2469                 if (race_counter == 10)
2470                         break;
2471         }
2472         EXIT;
2473 }
2474
2475 /* we're trying to queue a page in the osc so we're subject to the
2476  * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2477  * If the osc's queued pages are already at that limit, then we want to sleep
2478  * until there is space in the osc's queue for us.  We also may be waiting for
2479  * write credits from the OST if there are RPCs in flight that may return some
2480  * before we fall back to sync writes.
2481  *
2482  * We need this to know our allocation was granted in the presence of signals.
      *
      * Wait condition used by osc_enter_cache(): returns non-zero when
      * ocw->ocw_entry is an empty list head (presumably because the waker
      * unlinked it -- confirm) or when rpcs_in_flight() is 0.  The check is
      * made with cl_loi_list_lock taken and released inside this function.
      * NOTE(review): the semicolon after the closing brace is redundant. */
2483 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2484 {
2485         int rc;
2486         ENTRY;
2487         client_obd_list_lock(&cli->cl_loi_list_lock);
2488         rc = list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2489         client_obd_list_unlock(&cli->cl_loi_list_lock);
2490         RETURN(rc);
2491 };
2492
2493 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
2494  * grant or cache space.
      *
      * Returns 0 on the fast path, after charging oap through
      * osc_consume_write_grant().  Returns -EDQUOT when the caller should try
      * sync io instead: cl_dirty_max is below one page, either ar_force_sync
      * flag is set, or there are no write RPCs in flight to wait on.  After a
      * wait it returns -EINTR if ocw is still linked on cl_cache_waiters, and
      * otherwise ocw.ocw_rc, which starts at 0 and is presumably filled in by
      * whoever unlinks the waiter -- confirm.  The lock is held again on every
      * return path. */
2495 static int osc_enter_cache(struct client_obd *cli, struct lov_oinfo *loi,
2496                            struct osc_async_page *oap)
2497 {
2498         struct osc_cache_waiter ocw;
2499         struct l_wait_info lwi = { 0 };
2500
2501         ENTRY;
2502
2503         CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
2504                "grant: %lu\n", cli->cl_dirty, atomic_read(&obd_dirty_pages),
2505                cli->cl_dirty_max, obd_max_dirty_pages,
2506                cli->cl_lost_grant, cli->cl_avail_grant);
2507
2508         /* force the caller to try sync io.  this can jump the list
2509          * of queued writes and create a discontiguous rpc stream */
2510         if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
2511             loi->loi_ar.ar_force_sync)
2512                 RETURN(-EDQUOT);
2513
2514         /* Hopefully normal case - cache space and write credits available */
2515         if ((cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max) &&
2516             (atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages) &&
2517             (cli->cl_avail_grant >= CFS_PAGE_SIZE)) {
2518                 /* account for ourselves */
2519                 osc_consume_write_grant(cli, &oap->oap_brw_page);
2520                 RETURN(0);
2521         }
2522
2523         /* Make sure that there are write rpcs in flight to wait for.  This
2524          * is a little silly as this object may not have any pending but
2525          * other objects sure might. */
2526         if (cli->cl_w_in_flight) {
              /* ocw lives on this stack frame; it is linked while the
               * list lock is still held and must be off the list again
               * before any return below */
2527                 list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
2528                 cfs_waitq_init(&ocw.ocw_waitq);
2529                 ocw.ocw_oap = oap;
2530                 ocw.ocw_rc = 0;
2531
2532                 loi_list_maint(cli, loi);
2533                 osc_check_rpcs(cli);
2534                 client_obd_list_unlock(&cli->cl_loi_list_lock);
2535
2536                 CDEBUG(D_CACHE, "sleeping for cache space\n");
2537                 l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
2538
2539                 client_obd_list_lock(&cli->cl_loi_list_lock);
              /* still linked: nobody granted the request, so unlink
               * ourselves and report the wait as interrupted */
2540                 if (!list_empty(&ocw.ocw_entry)) {
2541                         list_del(&ocw.ocw_entry);
2542                         RETURN(-EINTR);
2543                 }
2544                 RETURN(ocw.ocw_rc);
2545         }
2546
2547         RETURN(-EDQUOT);
2548 }
2549
/* Two-mode helper.  With a NULL page it only reports
 * size_round(sizeof(struct osc_async_page)), presumably so the caller can
 * size its allocation -- confirm.  Otherwise it initializes the
 * osc_async_page that *res points at: magic, owning client_obd, loi, caller
 * ops and data, page and object offset, three empty list heads, and the
 * occ_interrupted callback, then returns 0.  lsm is not used here.
 * NOTE(review): the NULL-page path uses a plain return after ENTRY rather
 * than RETURN(). */
2550 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2551                         struct lov_oinfo *loi, cfs_page_t *page,
2552                         obd_off offset, struct obd_async_page_ops *ops,
2553                         void *data, void **res)
2554 {
2555         struct osc_async_page *oap;
2556         ENTRY;
2557
2558         if (!page)
2559                 return size_round(sizeof(*oap));
2560
      /* the storage is supplied by the caller through *res */
2561         oap = *res;
2562         oap->oap_magic = OAP_MAGIC;
2563         oap->oap_cli = &exp->exp_obd->u.cli;
2564         oap->oap_loi = loi;
2565
2566         oap->oap_caller_ops = ops;
2567         oap->oap_caller_data = data;
2568
2569         oap->oap_page = page;
2570         oap->oap_obj_off = offset;
2571
2572         CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2573         CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2574         CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2575
2576         oap->oap_occ.occ_interrupted = osc_occ_interrupted;
2577
2578         CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
2579         RETURN(0);
2580 }
2581
/* Convert an opaque cookie back into its osc_async_page.  The only check is
 * on the oap_magic field: a mismatch yields ERR_PTR(-EINVAL), which the
 * callers in this file test with IS_ERR().  cookie is dereferenced without
 * a NULL check.
 * NOTE(review): the semicolon after the closing brace is redundant. */
2582 struct osc_async_page *oap_from_cookie(void *cookie)
2583 {
2584         struct osc_async_page *oap = cookie;
2585         if (oap->oap_magic != OAP_MAGIC)
2586                 return ERR_PTR(-EINVAL);
2587         return oap;
2588 };
2589
/* Queue one page, identified by cookie, for asynchronous io on loi (or on
 * lsm->lsm_oinfo[0] when loi is NULL).
 *
 * Returns 0 after the oap has been handed to osc_oap_to_pending() and
 * osc_check_rpcs() has run.  Errors: the PTR_ERR of a bad cookie; -EIO when
 * the import is missing or invalid; -EBUSY when the oap is already linked on
 * a pending, urgent or rpc list; -ENOMEM or -EDQUOT from the quota check
 * (HAVE_QUOTA_SUPPORT builds only, for writes without OBD_BRW_NOQUOTA); or
 * the non-zero result of osc_enter_cache() for writes. */
2590 static int osc_queue_async_io(struct obd_export *exp, struct lov_stripe_md *lsm,
2591                               struct lov_oinfo *loi, void *cookie,
2592                               int cmd, obd_off off, int count,
2593                               obd_flag brw_flags, enum async_flags async_flags)
2594 {
2595         struct client_obd *cli = &exp->exp_obd->u.cli;
2596         struct osc_async_page *oap;
2597         int rc = 0;
2598         ENTRY;
2599
2600         oap = oap_from_cookie(cookie);
2601         if (IS_ERR(oap))
2602                 RETURN(PTR_ERR(oap));
2603
2604         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2605                 RETURN(-EIO);
2606
      /* NOTE(review): these list checks run before cl_loi_list_lock is
       * taken below */
2607         if (!list_empty(&oap->oap_pending_item) ||
2608             !list_empty(&oap->oap_urgent_item) ||
2609             !list_empty(&oap->oap_rpc_item))
2610                 RETURN(-EBUSY);
2611
2612         /* check if the file's owner/group is over quota */
2613 #ifdef HAVE_QUOTA_SUPPORT
2614         if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)){
2615                 struct obd_async_page_ops *ops;
2616                 struct obdo *oa;
2617
2618                 OBDO_ALLOC(oa);
2619                 if (oa == NULL)
2620                         RETURN(-ENOMEM);
2621
              /* the temporary obdo is only needed for the uid/gid that
               * ap_fill_obdo() stores in it */
2622                 ops = oap->oap_caller_ops;
2623                 ops->ap_fill_obdo(oap->oap_caller_data, cmd, oa);
2624                 if (lquota_chkdq(quota_interface, cli, oa->o_uid, oa->o_gid) ==
2625                     NO_QUOTA)
2626                         rc = -EDQUOT;
2627
2628                 OBDO_FREE(oa);
2629                 if (rc)
2630                         RETURN(rc);
2631         }
2632 #endif
2633
2634         if (loi == NULL)
2635                 loi = lsm->lsm_oinfo[0];
2636
2637         client_obd_list_lock(&cli->cl_loi_list_lock);
2638
2639         oap->oap_cmd = cmd;
2640         oap->oap_page_off = off;
2641         oap->oap_count = count;
2642         oap->oap_brw_flags = brw_flags;
2643         oap->oap_async_flags = async_flags;
2644
      /* writes must first get through osc_enter_cache(), which may drop
       * and retake the list lock while it waits */
2645         if (cmd & OBD_BRW_WRITE) {
2646                 rc = osc_enter_cache(cli, loi, oap);
2647                 if (rc) {
2648                         client_obd_list_unlock(&cli->cl_loi_list_lock);
2649                         RETURN(rc);
2650                 }
2651         }
2652
2653         osc_oap_to_pending(oap);
2654         loi_list_maint(cli, loi);
2655
2656         LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
2657                   cmd);
2658
2659         osc_check_rpcs(cli);
2660         client_obd_list_unlock(&cli->cl_loi_list_lock);
2661
2662         RETURN(0);
2663 }
2664
2665 /* aka (~was & now & flag), but this is more clear :)
      * True when flag is clear in was and set in now.
      * NOTE(review): the parameters are not parenthesized in the expansion, so
      * an argument such as A | B would bind incorrectly; the uses visible in
      * this file pass plain identifiers and member accesses only. */
2666 #define SETTING(was, now, flag) (!(was & flag) && (now & flag))
2667
/* Raise ASYNC_READY and/or ASYNC_URGENT on a page that is already pending.
 *
 * Returns the PTR_ERR of a bad cookie, -EIO for a missing or invalid import,
 * -EINVAL if the oap is not linked on a pending list, and 0 otherwise
 * (including when every requested flag was already set).  osc_check_rpcs()
 * runs on the -EINVAL path as well, because both outcomes pass through the
 * out label. */
2668 static int osc_set_async_flags(struct obd_export *exp,
2669                                struct lov_stripe_md *lsm,
2670                                struct lov_oinfo *loi, void *cookie,
2671                                obd_flag async_flags)
2672 {
2673         struct client_obd *cli = &exp->exp_obd->u.cli;
2674         struct loi_oap_pages *lop;
2675         struct osc_async_page *oap;
2676         int rc = 0;
2677         ENTRY;
2678
2679         oap = oap_from_cookie(cookie);
2680         if (IS_ERR(oap))
2681                 RETURN(PTR_ERR(oap));
2682
2683         /*
2684          * bug 7311: OST-side locking is only supported for liblustre for now
2685          * (and liblustre never calls obd_set_async_flags(). I hope.), generic
2686          * implementation has to handle case where OST-locked page was picked
2687          * up by, e.g., ->writepage().
2688          */
2689         LASSERT(!(oap->oap_brw_flags & OBD_BRW_SRVLOCK));
2690         LASSERT(!LIBLUSTRE_CLIENT); /* check that liblustre angels do fear to
2691                                      * tread here. */
2692
2693         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2694                 RETURN(-EIO);
2695
2696         if (loi == NULL)
2697                 loi = lsm->lsm_oinfo[0];
2698
      /* pick the per-object queue that matches the command recorded in
       * the oap when it was queued */
2699         if (oap->oap_cmd & OBD_BRW_WRITE) {
2700                 lop = &loi->loi_write_lop;
2701         } else {
2702                 lop = &loi->loi_read_lop;
2703         }
2704
2705         client_obd_list_lock(&cli->cl_loi_list_lock);
2706
2707         if (list_empty(&oap->oap_pending_item))
2708                 GOTO(out, rc = -EINVAL);
2709
      /* nothing to do when every requested flag is already set */
2710         if ((oap->oap_async_flags & async_flags) == async_flags)
2711                 GOTO(out, rc = 0);
2712
2713         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
2714                 oap->oap_async_flags |= ASYNC_READY;
2715
      /* NOTE(review): this branch links the oap on lop_urgent but, unlike
       * the ASYNC_READY case above, does not set ASYNC_URGENT in
       * oap_async_flags -- confirm this is intended */
2716         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT)) {
2717                 if (list_empty(&oap->oap_rpc_item)) {
2718                         list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2719                         loi_list_maint(cli, loi);
2720                 }
2721         }
2722
2723         LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
2724                         oap->oap_async_flags);
2725 out:
2726         osc_check_rpcs(cli);
2727         client_obd_list_unlock(&cli->cl_loi_list_lock);
2728         RETURN(rc);
2729 }
2730
/* Park one page, identified by cookie, on the lop_pending_group list of loi
 * (lsm->lsm_oinfo[0] when loi is NULL).  No RPC is attempted here;
 * osc_trigger_group_io() is the visible function that later moves such pages
 * on through osc_group_to_pending().
 *
 * Returns the PTR_ERR of a bad cookie, -EIO for a missing or invalid import,
 * -EBUSY when the oap is already linked on a pending, urgent or rpc list,
 * and otherwise the result of oig_add_one() when ASYNC_GROUP_SYNC is set, or
 * 0 when it is not. */
2731 static int osc_queue_group_io(struct obd_export *exp, struct lov_stripe_md *lsm,
2732                              struct lov_oinfo *loi,
2733                              struct obd_io_group *oig, void *cookie,
2734                              int cmd, obd_off off, int count,
2735                              obd_flag brw_flags,
2736                              obd_flag async_flags)
2737 {
2738         struct client_obd *cli = &exp->exp_obd->u.cli;
2739         struct osc_async_page *oap;
2740         struct loi_oap_pages *lop;
2741         int rc = 0;
2742         ENTRY;
2743
2744         oap = oap_from_cookie(cookie);
2745         if (IS_ERR(oap))
2746                 RETURN(PTR_ERR(oap));
2747
2748         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2749                 RETURN(-EIO);
2750
2751         if (!list_empty(&oap->oap_pending_item) ||
2752             !list_empty(&oap->oap_urgent_item) ||
2753             !list_empty(&oap->oap_rpc_item))
2754                 RETURN(-EBUSY);
2755
2756         if (loi == NULL)
2757                 loi = lsm->lsm_oinfo[0];
2758
2759         client_obd_list_lock(&cli->cl_loi_list_lock);
2760
2761         oap->oap_cmd = cmd;
2762         oap->oap_page_off = off;
2763         oap->oap_count = count;
2764         oap->oap_brw_flags = brw_flags;
2765         oap->oap_async_flags = async_flags;
2766
2767         if (cmd & OBD_BRW_WRITE)
2768                 lop = &loi->loi_write_lop;
2769         else
2770                 lop = &loi->loi_read_lop;
2771
      /* NOTE(review): if oig_add_one() fails below, the oap is left linked
       * on lop_pending_group while the error is returned -- confirm the
       * caller tears it down */
2772         list_add_tail(&oap->oap_pending_item, &lop->lop_pending_group);
2773         if (oap->oap_async_flags & ASYNC_GROUP_SYNC) {
2774                 oap->oap_oig = oig;
2775                 rc = oig_add_one(oig, &oap->oap_occ);
2776         }
2777
2778         LOI_DEBUG(loi, "oap %p page %p on group pending: rc %d\n",
2779                   oap, oap->oap_page, rc);
2780
2781         client_obd_list_unlock(&cli->cl_loi_list_lock);
2782
2783         RETURN(rc);
2784 }
2785
/* Drain lop->lop_pending_group: each oap is unlinked from the group list and
 * handed to osc_oap_to_pending(), then loi_list_maint() is run once for loi.
 * The visible caller, osc_trigger_group_io(), holds cl_loi_list_lock around
 * this call.  cmd is not used here. */
2786 static void osc_group_to_pending(struct client_obd *cli, struct lov_oinfo *loi,
2787                                  struct loi_oap_pages *lop, int cmd)
2788 {
2789         struct list_head *pos, *tmp;
2790         struct osc_async_page *oap;
2791
      /* the _safe iterator is needed because every entry is removed from
       * the list being walked */
2792         list_for_each_safe(pos, tmp, &lop->lop_pending_group) {
2793                 oap = list_entry(pos, struct osc_async_page, oap_pending_item);
2794                 list_del(&oap->oap_pending_item);
2795                 osc_oap_to_pending(oap);
2796         }
2797         loi_list_maint(cli, loi);
2798 }
2799
/* Release the pages parked on the write and read lop_pending_group lists of
 * loi (lsm->lsm_oinfo[0] when loi is NULL) through osc_group_to_pending(),
 * then run osc_check_rpcs(), all under cl_loi_list_lock.  Always returns 0.
 * oig is not used here. */
2800 static int osc_trigger_group_io(struct obd_export *exp,
2801                                 struct lov_stripe_md *lsm,
2802                                 struct lov_oinfo *loi,
2803                                 struct obd_io_group *oig)
2804 {
2805         struct client_obd *cli = &exp->exp_obd->u.cli;
2806         ENTRY;
2807
2808         if (loi == NULL)
2809                 loi = lsm->lsm_oinfo[0];
2810
2811         client_obd_list_lock(&cli->cl_loi_list_lock);
2812
2813         osc_group_to_pending(cli, loi, &loi->loi_write_lop, OBD_BRW_WRITE);
2814         osc_group_to_pending(cli, loi, &loi->loi_read_lop, OBD_BRW_READ);
2815
2816         osc_check_rpcs(cli);
2817         client_obd_list_unlock(&cli->cl_loi_list_lock);
2818
2819         RETURN(0);
2820 }
2821
/* Detach a page, identified by cookie, from the osc queues of loi
 * (lsm->lsm_oinfo[0] when loi is NULL).
 *
 * Returns the PTR_ERR of a bad cookie, -EBUSY when the oap is currently
 * linked through oap_rpc_item, and 0 after osc_exit_cache() and
 * osc_wake_cache_waiters() have been called and the oap has been unlinked
 * from the urgent and pending lists it was on. */
2822 static int osc_teardown_async_page(struct obd_export *exp,
2823                                    struct lov_stripe_md *lsm,
2824                                    struct lov_oinfo *loi, void *cookie)
2825 {
2826         struct client_obd *cli = &exp->exp_obd->u.cli;
2827         struct loi_oap_pages *lop;
2828         struct osc_async_page *oap;
2829         int rc = 0;
2830         ENTRY;
2831
2832         oap = oap_from_cookie(cookie);
2833         if (IS_ERR(oap))
2834                 RETURN(PTR_ERR(oap));
2835
2836         if (loi == NULL)
2837                 loi = lsm->lsm_oinfo[0];
2838
2839         if (oap->oap_cmd & OBD_BRW_WRITE) {
2840                 lop = &loi->loi_write_lop;
2841         } else {
2842                 lop = &loi->loi_read_lop;
2843         }
2844
2845         client_obd_list_lock(&cli->cl_loi_list_lock);
2846
      /* a page that is linked on an rpc list cannot be torn down here */
2847         if (!list_empty(&oap->oap_rpc_item))
2848                 GOTO(out, rc = -EBUSY);
2849
2850         osc_exit_cache(cli, oap, 0);
2851         osc_wake_cache_waiters(cli);
2852
2853         if (!list_empty(&oap->oap_urgent_item)) {
2854                 list_del_init(&oap->oap_urgent_item);
2855                 oap->oap_async_flags &= ~ASYNC_URGENT;
2856         }
      /* keep the pending count of the queue in step with the unlink */
2857         if (!list_empty(&oap->oap_pending_item)) {
2858                 list_del_init(&oap->oap_pending_item);
2859                 lop_update_pending(cli, lop, oap->oap_cmd, -1);
2860         }
2861         loi_list_maint(cli, loi);
2862
2863         LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
2864 out:
2865         client_obd_list_unlock(&cli->cl_loi_list_lock);
2866         RETURN(rc);
2867 }
2868
/* Store data as the l_ast_data of the lock behind lockh and OR the
 * LDLM_FL_NO_LRU bit of flags into l_flags, both under lock_res_and_lock().
 * If the handle does not resolve to a lock, an error is logged and nothing
 * else is done.  On Linux kernel builds, replacing a different, already set
 * l_ast_data is tolerated only when the old inode has I_FREEING set;
 * otherwise LDLM_ERROR is logged and the LASSERTF fires. */
2869 static void osc_set_data_with_check(struct lustre_handle *lockh, void *data,
2870                                     int flags)
2871 {
2872         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2873
2874         if (lock == NULL) {
2875                 CERROR("lockh %p, data %p - client evicted?\n", lockh, data);
2876                 return;
2877         }
2878         lock_res_and_lock(lock);
2879 #if defined (__KERNEL__) && defined (__linux__)
2880         /* Liang XXX: Darwin and Winnt checking should be added */
2881         if (lock->l_ast_data && lock->l_ast_data != data) {
              /* both pointers are treated as struct inode here */
2882                 struct inode *new_inode = data;
2883                 struct inode *old_inode = lock->l_ast_data;
2884                 if (!(old_inode->i_state & I_FREEING))
2885                         LDLM_ERROR(lock, "inconsistent l_ast_data found");
2886                 LASSERTF(old_inode->i_state & I_FREEING,
2887                          "Found existing inode %p/%lu/%u state %lu in lock: "
2888                          "setting data to %p/%lu/%u\n", old_inode,
2889                          old_inode->i_ino, old_inode->i_generation,
2890                          old_inode->i_state,
2891                          new_inode, new_inode->i_ino, new_inode->i_generation);
2892         }
2893 #endif
2894         lock->l_ast_data = data;
2895         lock->l_flags |= (flags & LDLM_FL_NO_LRU);
2896         unlock_res_and_lock(lock);
      /* drop the reference obtained from ldlm_handle2lock() above */
2897         LDLM_LOCK_PUT(lock);
2898 }
2899
/* Call ldlm_resource_iterate() with the replace iterator and data for the
 * resource named by lsm in the namespace of this export's obd device.  The
 * resource id uses lsm_object_id in name[0] and lsm_object_gr in name[2];
 * the other slots stay 0.  Always returns 0; any result of
 * ldlm_resource_iterate() is discarded. */
2900 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2901                              ldlm_iterator_t replace, void *data)
2902 {
2903         struct ldlm_res_id res_id = { .name = {0} };
2904         struct obd_device *obd = class_exp2obd(exp);
2905
2906         res_id.name[0] = lsm->lsm_object_id;
2907         res_id.name[2] = lsm->lsm_object_gr;
2908
2909         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2910         return 0;
2911 }
2912
/* Common completion step for an enqueue.  For an intent enqueue that came
 * back as ELDLM_LOCK_ABORTED, a non-zero lock_policy_res1 from the DLM reply
 * replaces rc.  When rc is 0, or an intent enqueue still reports
 * ELDLM_LOCK_ABORTED, the lvb size, blocks and mtime of lsm_oinfo[0] are
 * logged.  Finally rc is passed to oinfo->oi_cb_up() and the value that
 * callback returns becomes the return value. */
2913 static int osc_enqueue_fini(struct ptlrpc_request *req, struct obd_info *oinfo,
2914                             int intent, int rc)
2915 {
2916         ENTRY;
2917
2918         if (intent) {
2919                 /* The request was created before ldlm_cli_enqueue call. */
2920                 if (rc == ELDLM_LOCK_ABORTED) {
2921                         struct ldlm_reply *rep;
2922                         rep = req_capsule_server_get(&req->rq_pill,
2923                                                      &RMF_DLM_REP);
2924
2925                         LASSERT(rep != NULL);
2926                         if (rep->lock_policy_res1)
2927                                 rc = rep->lock_policy_res1;
2928                 }
2929         }
2930
2931         if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
2932                 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
2933                        oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_size,
2934                        oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_blocks,
2935                        oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_mtime);
2936         }
2937
2938         /* Call the update callback. */
2939         rc = oinfo->oi_cb_up(oinfo, rc);
2940         RETURN(rc);
2941 }
2942
/* Reply handler for an enqueue; aa supplies the export, obd_info and enqueue
 * info to work on (presumably saved by osc_enqueue() -- confirm).  It
 * finishes the DLM side with ldlm_cli_enqueue_fini(), writing the lvb of
 * lsm_oinfo[0], runs osc_enqueue_fini(), and calls ldlm_lock_decref() when
 * the handle is in use and the final rc is ELDLM_OK.  Returns the rc
 * produced by osc_enqueue_fini().
 * NOTE(review): the assertion that the lock lookup succeeded only runs after
 * the handle has already been used by the calls above. */
2943 static int osc_enqueue_interpret(struct ptlrpc_request *req,
2944                                  struct osc_enqueue_args *aa, int rc)
2945 {
2946         int intent = aa->oa_oi->oi_flags & LDLM_FL_HAS_INTENT;
2947         struct lov_stripe_md *lsm = aa->oa_oi->oi_md;
2948         struct ldlm_lock *lock;
2949
2950         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2951          * be valid. */
2952         lock = ldlm_handle2lock(aa->oa_oi->oi_lockh);
2953
2954         /* Complete obtaining the lock procedure. */
2955         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
2956                                    aa->oa_ei->ei_mode,
2957                                    &aa->oa_oi->oi_flags,
2958                                    &lsm->lsm_oinfo[0]->loi_lvb,
2959                                    sizeof(lsm->lsm_oinfo[0]->loi_lvb),
2960                                    lustre_swab_ost_lvb,
2961                                    aa->oa_oi->oi_lockh, rc);
2962
2963         /* Complete osc stuff. */
2964         rc = osc_enqueue_fini(req, aa->oa_oi, intent, rc);
2965
2966         /* Release the lock for async request. */
2967         if (lustre_handle_is_used(aa->oa_oi->oi_lockh) && rc == ELDLM_OK)
2968                 ldlm_lock_decref(aa->oa_oi->oi_lockh, aa->oa_ei->ei_mode);
2969
2970         LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
2971                  aa->oa_oi->oi_lockh, req, aa);
      /* drop the reference obtained from ldlm_handle2lock() above */
2972         LDLM_LOCK_PUT(lock);
2973         return rc;
2974 }
2975
2976 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
2977  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2978  * other synchronous requests, however keeping some locks and trying to obtain
2979  * others may take a considerable amount of time in a case of ost failure; and
2980  * when other sync requests do not get released lock from a client, the client
2981  * is excluded from the cluster -- such scenarios make life difficult, so
2982  * release locks just after they are obtained. */
2983 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
2984                        struct ldlm_enqueue_info *einfo,
2985                        struct ptlrpc_request_set *rqset)
2986 {
2987         struct ldlm_res_id res_id = { .name = {0} };
2988         struct obd_device *obd = exp->exp_obd;
2989         struct ptlrpc_request *req = NULL;
2990         int intent = oinfo->oi_flags & LDLM_FL_HAS_INTENT;
2991         ldlm_mode_t mode;
2992         int rc;
2993         ENTRY;
2994
2995         res_id.name[0] = oinfo->oi_md->lsm_object_id;
2996         res_id.name[2] = oinfo->oi_md->lsm_object_gr;
2997
2998         /* Filesystem lock extents are extended to page boundaries so that
2999          * dealing with the page cache is a little smoother.  */
3000         oinfo->oi_policy.l_extent.start -=
3001                 oinfo->oi_policy.l_extent.start & ~CFS_PAGE_MASK;
3002         oinfo->oi_policy.l_extent.end |= ~CFS_PAGE_MASK;
3003
3004         if (oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid == 0)
3005                 goto no_match;
3006
3007         /* Next, search for already existing extent locks that will cover us */
3008         /* If we're trying to read, we also search for an existing PW lock.  The
3009          * VFS and page cache already protect us locally, so lots of readers/
3010          * writers can share a single PW lock.
3011          *
3012          * There are problems with conversion deadlocks, so instead of
3013          * converting a read lock to a write lock, we'll just enqueue a new
3014          * one.
3015          *
3016          * At some point we should cancel the read lock instead of making them
3017          * send us a blocking callback, but there are problems with canceling
3018          * locks out from other users right now, too. */
3019         mode = einfo->ei_mode;
3020         if (einfo->ei_mode == LCK_PR)
3021                 mode |= LCK_PW;
3022         mode = ldlm_lock_match(obd->obd_namespace,
3023                                oinfo->oi_flags | LDLM_FL_LVB_READY, &res_id,
3024                                einfo->ei_type, &oinfo->oi_policy, mode,
3025                                oinfo->oi_lockh);
3026         if (mode) {
3027                 /* addref the lock only if not async requests and PW lock is
3028                  * matched whereas we asked for PR. */
3029                 if (!rqset && einfo->ei_mode != mode)
3030                         ldlm_lock_addref(oinfo->oi_lockh, LCK_PR);
3031                 osc_set_data_with_check(oinfo->oi_lockh, einfo->ei_cbdata,
3032                                         oinfo->oi_flags);
3033                 if (intent) {
3034                         /* I would like to be able to ASSERT here that rss <=
3035                          * kms, but I can't, for reasons which are explained in
3036                          * lov_enqueue() */
3037                 }
3038
3039                 /* We already have a lock, and it's referenced */
3040                 oinfo->oi_cb_up(oinfo, ELDLM_OK);
3041
3042                 /* For async requests, decref the lock. */
3043                 if (einfo->ei_mode != mode)
3044                         ldlm_lock_decref(oinfo->oi_lockh, LCK_PW);
3045                 else if (rqset)
3046                         ldlm_lock_decref(oinfo->oi_lockh, einfo->ei_mode);
3047
3048                 RETURN(ELDLM_OK);
3049         }
3050
3051  no_match:
3052         if (intent) {
3053                 CFS_LIST_HEAD(cancels);
3054                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3055                                            &RQF_LDLM_ENQUEUE_LVB);
3056                 if (req == NULL)
3057                         RETURN(-ENOMEM);
3058
3059                 rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
3060                 if (rc)
3061                         RETURN(rc);
3062
3063                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
3064                                      sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb));
3065                 ptlrpc_request_set_replen(req);
3066         }
3067
3068         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
3069         oinfo->oi_flags &= ~LDLM_FL_BLOCK_GRANTED;
3070
3071         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id,
3072                               &oinfo->oi_policy, &oinfo->oi_flags,
3073                               &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
3074                               sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb),
3075                               lustre_swab_ost_lvb, oinfo->oi_lockh,
3076                               rqset ? 1 : 0);
3077         if (rqset) {
3078                 if (!rc) {
3079                         struct osc_enqueue_args *aa;
3080                         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3081                         aa = (struct osc_enqueue_args *)&req->rq_async_args;
3082                         aa->oa_oi = oinfo;
3083                         aa->oa_ei = einfo;
3084                         aa->oa_exp = exp;
3085
3086                         req->rq_interpret_reply = osc_enqueue_interpret;
3087                         ptlrpc_set_add_req(rqset, req);
3088                 } else if (intent) {
3089                         ptlrpc_req_finished(req);
3090                 }
3091                 RETURN(rc);
3092         }
3093
3094         rc = osc_enqueue_fini(req, oinfo, intent, rc);
3095         if (intent)
3096                 ptlrpc_req_finished(req);
3097
3098         RETURN(rc);
3099 }
3100
3101 static int osc_match(struct obd_export *exp, struct lov_stripe_md *lsm,
3102                      __u32 type, ldlm_policy_data_t *policy, __u32 mode,
3103                      int *flags, void *data, struct lustre_handle *lockh)
3104 {
3105         struct ldlm_res_id res_id = { .name = {0} };
3106         struct obd_device *obd = exp->exp_obd;
3107         int lflags = *flags;
3108         ldlm_mode_t rc;
3109         ENTRY;
3110
3111         res_id.name[0] = lsm->lsm_object_id;
3112         res_id.name[2] = lsm->lsm_object_gr;
3113
3114         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
3115                 RETURN(-EIO);
3116
3117         /* Filesystem lock extents are extended to page boundaries so that
3118          * dealing with the page cache is a little smoother */
3119         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3120         policy->l_extent.end |= ~CFS_PAGE_MASK;
3121
3122         /* Next, search for already existing extent locks that will cover us */
3123         /* If we're trying to read, we also search for an existing PW lock.  The
3124          * VFS and page cache already protect us locally, so lots of readers/
3125          * writers can share a single PW lock. */
3126         rc = mode;
3127         if (mode == LCK_PR)
3128                 rc |= LCK_PW;
3129         rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY,
3130                              &res_id, type, policy, rc, lockh);
3131         if (rc) {
3132                 osc_set_data_with_check(lockh, data, lflags);
3133                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
3134                         ldlm_lock_addref(lockh, LCK_PR);
3135                         ldlm_lock_decref(lockh, LCK_PW);
3136                 }
3137                 RETURN(rc);
3138         }
3139         RETURN(rc);
3140 }
3141
3142 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
3143                       __u32 mode, struct lustre_handle *lockh)
3144 {
3145         ENTRY;
3146
3147         if (unlikely(mode == LCK_GROUP))
3148                 ldlm_lock_decref_and_cancel(lockh, mode);
3149         else
3150                 ldlm_lock_decref(lockh, mode);
3151
3152         RETURN(0);
3153 }
3154
3155 static int osc_cancel_unused(struct obd_export *exp,
3156                              struct lov_stripe_md *lsm, int flags,
3157                              void *opaque)
3158 {
3159         struct obd_device *obd = class_exp2obd(exp);
3160         struct ldlm_res_id res_id = { .name = {0} }, *resp = NULL;
3161
3162         if (lsm != NULL) {
3163                 res_id.name[0] = lsm->lsm_object_id;
3164                 res_id.name[2] = lsm->lsm_object_gr;
3165                 resp = &res_id;
3166         }
3167
3168         return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
3169 }
3170
3171 static int osc_join_lru(struct obd_export *exp,
3172                         struct lov_stripe_md *lsm, int join)
3173 {
3174         struct obd_device *obd = class_exp2obd(exp);
3175         struct ldlm_res_id res_id = { .name = {0} }, *resp = NULL;
3176
3177         if (lsm != NULL) {
3178                 res_id.name[0] = lsm->lsm_object_id;
3179                 res_id.name[2] = lsm->lsm_object_gr;
3180                 resp = &res_id;
3181         }
3182
3183         return ldlm_cli_join_lru(obd->obd_namespace, resp, join);
3184 }
3185
3186 static int osc_statfs_interpret(struct ptlrpc_request *req,
3187                                 struct osc_async_args *aa, int rc)
3188 {
3189         struct obd_statfs *msfs;
3190         ENTRY;
3191
3192         if (rc != 0)
3193                 GOTO(out, rc);
3194
3195         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3196         if (msfs == NULL) {
3197                 GOTO(out, rc = -EPROTO);
3198         }
3199
3200         *aa->aa_oi->oi_osfs = *msfs;
3201 out:
3202         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
3203         RETURN(rc);
3204 }
3205
3206 static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
3207                             __u64 max_age, struct ptlrpc_request_set *rqset)
3208 {
3209         struct ptlrpc_request *req;
3210         struct osc_async_args *aa;
3211         int                    rc;
3212         ENTRY;
3213
3214         /* We could possibly pass max_age in the request (as an absolute
3215          * timestamp or a "seconds.usec ago") so the target can avoid doing
3216          * extra calls into the filesystem if that isn't necessary (e.g.
3217          * during mount that would help a bit).  Having relative timestamps
3218          * is not so great if request processing is slow, while absolute
3219          * timestamps are not ideal because they need time synchronization. */
3220         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
3221         if (req == NULL)
3222                 RETURN(-ENOMEM);
3223
3224         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3225         if (rc) {
3226                 ptlrpc_request_free(req);
3227                 RETURN(rc);
3228         }
3229         ptlrpc_request_set_replen(req);
3230         req->rq_request_portal = OST_CREATE_PORTAL; //XXX FIXME bug 249
3231         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
3232                 /* procfs requests not want stat in wait for avoid deadlock */
3233                 req->rq_no_resend = 1;
3234                 req->rq_no_delay = 1;
3235         }
3236
3237         req->rq_interpret_reply = osc_statfs_interpret;
3238         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3239         aa = (struct osc_async_args *)&req->rq_async_args;
3240         aa->aa_oi = oinfo;
3241
3242         ptlrpc_set_add_req(rqset, req);
3243         RETURN(0);
3244 }
3245
3246 static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
3247                       __u64 max_age, __u32 flags)
3248 {
3249         struct obd_statfs     *msfs;
3250         struct ptlrpc_request *req;
3251         int rc;
3252         ENTRY;
3253
3254         /* We could possibly pass max_age in the request (as an absolute
3255          * timestamp or a "seconds.usec ago") so the target can avoid doing
3256          * extra calls into the filesystem if that isn't necessary (e.g.
3257          * during mount that would help a bit).  Having relative timestamps
3258          * is not so great if request processing is slow, while absolute
3259          * timestamps are not ideal because they need time synchronization. */
3260         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
3261         if (req == NULL)
3262                 RETURN(-ENOMEM);
3263
3264         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3265         if (rc) {
3266                 ptlrpc_request_free(req);
3267                 RETURN(rc);
3268         }
3269         ptlrpc_request_set_replen(req);
3270         req->rq_request_portal = OST_CREATE_PORTAL; //XXX FIXME bug 249
3271
3272         if (flags & OBD_STATFS_NODELAY) {
3273                 /* procfs requests not want stat in wait for avoid deadlock */
3274                 req->rq_no_resend = 1;
3275                 req->rq_no_delay = 1;
3276         }
3277
3278         rc = ptlrpc_queue_wait(req);
3279         if (rc)
3280                 GOTO(out, rc);
3281
3282         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3283         if (msfs == NULL) {
3284                 GOTO(out, rc = -EPROTO);
3285         }
3286
3287         *osfs = *msfs;
3288
3289         EXIT;
3290  out:
3291         ptlrpc_req_finished(req);
3292         return rc;
3293 }
3294
3295 /* Retrieve object striping information.
3296  *
3297  * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
3298  * the maximum number of OST indices which will fit in the user buffer.
3299  * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
3300  */
3301 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
3302 {
3303         struct lov_user_md lum, *lumk;
3304         int rc = 0, lum_size;
3305         ENTRY;
3306
3307         if (!lsm)
3308                 RETURN(-ENODATA);
3309
3310         if (copy_from_user(&lum, lump, sizeof(lum)))
3311                 RETURN(-EFAULT);
3312
3313         if (lum.lmm_magic != LOV_USER_MAGIC)
3314                 RETURN(-EINVAL);
3315
3316         if (lum.lmm_stripe_count > 0) {
3317                 lum_size = sizeof(lum) + sizeof(lum.lmm_objects[0]);
3318                 OBD_ALLOC(lumk, lum_size);
3319                 if (!lumk)
3320                         RETURN(-ENOMEM);
3321
3322                 lumk->lmm_objects[0].l_object_id = lsm->lsm_object_id;
3323                 lumk->lmm_objects[0].l_object_gr = lsm->lsm_object_gr;
3324         } else {
3325                 lum_size = sizeof(lum);
3326                 lumk = &lum;
3327         }
3328
3329         lumk->lmm_object_id = lsm->lsm_object_id;
3330         lumk->lmm_object_gr = lsm->lsm_object_gr;
3331         lumk->lmm_stripe_count = 1;
3332
3333         if (copy_to_user(lump, lumk, lum_size))
3334                 rc = -EFAULT;
3335
3336         if (lumk != &lum)
3337                 OBD_FREE(lumk, lum_size);
3338
3339         RETURN(rc);
3340 }
3341
3342
3343 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3344                          void *karg, void *uarg)
3345 {
3346         struct obd_device *obd = exp->exp_obd;
3347         struct obd_ioctl_data *data = karg;
3348         int err = 0;
3349         ENTRY;
3350
3351         if (!try_module_get(THIS_MODULE)) {
3352                 CERROR("Can't get module. Is it alive?");
3353                 return -EINVAL;
3354         }
3355         switch (cmd) {
3356         case OBD_IOC_LOV_GET_CONFIG: {
3357                 char *buf;
3358                 struct lov_desc *desc;
3359                 struct obd_uuid uuid;
3360
3361                 buf = NULL;
3362                 len = 0;
3363                 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
3364                         GOTO(out, err = -EINVAL);
3365
3366                 data = (struct obd_ioctl_data *)buf;
3367
3368                 if (sizeof(*desc) > data->ioc_inllen1) {
3369                         obd_ioctl_freedata(buf, len);
3370                         GOTO(out, err = -EINVAL);
3371                 }
3372
3373                 if (data->ioc_inllen2 < sizeof(uuid)) {
3374                         obd_ioctl_freedata(buf, len);
3375                         GOTO(out, err = -EINVAL);
3376                 }
3377
3378                 desc = (struct lov_desc *)data->ioc_inlbuf1;
3379                 desc->ld_tgt_count = 1;
3380                 desc->ld_active_tgt_count = 1;
3381                 desc->ld_default_stripe_count = 1;
3382                 desc->ld_default_stripe_size = 0;
3383                 desc->ld_default_stripe_offset = 0;
3384                 desc->ld_pattern = 0;
3385                 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
3386
3387                 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
3388
3389                 err = copy_to_user((void *)uarg, buf, len);
3390                 if (err)
3391                         err = -EFAULT;
3392                 obd_ioctl_freedata(buf, len);
3393                 GOTO(out, err);
3394         }
3395         case LL_IOC_LOV_SETSTRIPE:
3396                 err = obd_alloc_memmd(exp, karg);
3397                 if (err > 0)
3398                         err = 0;
3399                 GOTO(out, err);
3400         case LL_IOC_LOV_GETSTRIPE:
3401                 err = osc_getstripe(karg, uarg);
3402                 GOTO(out, err);
3403         case OBD_IOC_CLIENT_RECOVER:
3404                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
3405                                             data->ioc_inlbuf1);
3406                 if (err > 0)
3407                         err = 0;
3408                 GOTO(out, err);
3409         case IOC_OSC_SET_ACTIVE:
3410                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
3411                                                data->ioc_offset);
3412                 GOTO(out, err);
3413         case OBD_IOC_POLL_QUOTACHECK:
3414                 err = lquota_poll_check(quota_interface, exp,
3415                                         (struct if_quotacheck *)karg);
3416                 GOTO(out, err);
3417         default:
3418                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3419                        cmd, cfs_curproc_comm());
3420                 GOTO(out, err = -ENOTTY);
3421         }
3422 out:
3423         module_put(THIS_MODULE);
3424         return err;
3425 }
3426
3427 static int osc_get_info(struct obd_export *exp, obd_count keylen,
3428                         void *key, __u32 *vallen, void *val)
3429 {
3430         ENTRY;
3431         if (!vallen || !val)
3432                 RETURN(-EFAULT);
3433
3434         if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
3435                 __u32 *stripe = val;
3436                 *vallen = sizeof(*stripe);
3437                 *stripe = 0;
3438                 RETURN(0);
3439         } else if (KEY_IS(KEY_LAST_ID)) {
3440                 struct ptlrpc_request *req;
3441                 obd_id                *reply;
3442                 char                  *tmp;
3443                 int                    rc;
3444
3445                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3446                                            &RQF_OST_GET_INFO_LAST_ID);
3447                 if (req == NULL)
3448                         RETURN(-ENOMEM);
3449
3450                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3451                                      RCL_CLIENT, keylen);
3452                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3453                 if (rc) {
3454                         ptlrpc_request_free(req);
3455                         RETURN(rc);
3456                 }
3457
3458                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3459                 memcpy(tmp, key, keylen);
3460
3461                 ptlrpc_request_set_replen(req);
3462                 rc = ptlrpc_queue_wait(req);
3463                 if (rc)
3464                         GOTO(out, rc);
3465
3466                 reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
3467                 if (reply == NULL)
3468                         GOTO(out, rc = -EPROTO);
3469
3470                 *((obd_id *)val) = *reply;
3471         out:
3472                 ptlrpc_req_finished(req);
3473                 RETURN(rc);
3474         }
3475         RETURN(-EINVAL);
3476 }
3477
3478 static int osc_setinfo_mds_conn_interpret(struct ptlrpc_request *req,
3479                                           void *aa, int rc)
3480 {
3481         struct llog_ctxt *ctxt;
3482         struct obd_import *imp = req->rq_import;
3483         ENTRY;
3484
3485         if (rc != 0)
3486                 RETURN(rc);
3487
3488         ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
3489         if (ctxt) {
3490                 if (rc == 0)
3491                         rc = llog_initiator_connect(ctxt);
3492                 else
3493                         CERROR("cannot establish connection for "
3494                                "ctxt %p: %d\n", ctxt, rc);
3495         }
3496
3497         llog_ctxt_put(ctxt);
3498         spin_lock(&imp->imp_lock);
3499         imp->imp_server_timeout = 1;
3500         imp->imp_pingable = 1;
3501         spin_unlock(&imp->imp_lock);
3502         CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
3503
3504         RETURN(rc);
3505 }
3506
3507 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
3508                               void *key, obd_count vallen, void *val,
3509                               struct ptlrpc_request_set *set)
3510 {
3511         struct ptlrpc_request *req;
3512         struct obd_device     *obd = exp->exp_obd;
3513         struct obd_import     *imp = class_exp2cliimp(exp);
3514         char                  *tmp;
3515         int                    rc;
3516         ENTRY;
3517
3518         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3519
3520         if (KEY_IS(KEY_NEXT_ID)) {
3521                 if (vallen != sizeof(obd_id))
3522                         RETURN(-ERANGE);
3523                 if (val == NULL)
3524                         RETURN(-EINVAL);
3525                 obd->u.cli.cl_oscc.oscc_next_id = *((obd_id*)val) + 1;
3526                 CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
3527                        exp->exp_obd->obd_name,
3528                        obd->u.cli.cl_oscc.oscc_next_id);
3529
3530                 RETURN(0);
3531         }
3532
3533         if (KEY_IS("unlinked")) {
3534                 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3535                 spin_lock(&oscc->oscc_lock);
3536                 oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
3537                 spin_unlock(&oscc->oscc_lock);
3538                 RETURN(0);
3539         }
3540
3541         if (KEY_IS(KEY_INIT_RECOV)) {
3542                 if (vallen != sizeof(int))
3543                         RETURN(-EINVAL);
3544                 spin_lock(&imp->imp_lock);
3545                 imp->imp_initial_recov = *(int *)val;
3546                 spin_unlock(&imp->imp_lock);
3547                 CDEBUG(D_HA, "%s: set imp_initial_recov = %d\n",
3548                        exp->exp_obd->obd_name,
3549                        imp->imp_initial_recov);
3550                 RETURN(0);
3551         }
3552
3553         if (KEY_IS("checksum")) {
3554                 if (vallen != sizeof(int))
3555                         RETURN(-EINVAL);
3556                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3557                 RETURN(0);
3558         }
3559
3560         if (KEY_IS(KEY_FLUSH_CTX)) {
3561                 sptlrpc_import_flush_my_ctx(imp);
3562                 RETURN(0);
3563         }
3564
3565         if (!set)
3566                 RETURN(-EINVAL);
3567
3568         /* We pass all other commands directly to OST. Since nobody calls osc
3569            methods directly and everybody is supposed to go through LOV, we
3570            assume lov checked invalid values for us.
3571            The only recognised values so far are evict_by_nid and mds_conn.
3572            Even if something bad goes through, we'd get a -EINVAL from OST
3573            anyway. */
3574
3575
3576         req = ptlrpc_request_alloc(imp, &RQF_OST_SET_INFO);
3577         if (req == NULL)
3578                 RETURN(-ENOMEM);
3579
3580         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3581                              RCL_CLIENT, keylen);
3582         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
3583                              RCL_CLIENT, vallen);
3584         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
3585         if (rc) {
3586                 ptlrpc_request_free(req);
3587                 RETURN(rc);
3588         }
3589
3590         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3591         memcpy(tmp, key, keylen);
3592         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL);
3593         memcpy(tmp, val, vallen);
3594
3595         if (KEY_IS(KEY_MDS_CONN)) {
3596                 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3597
3598                 oscc->oscc_oa.o_gr = (*(__u32 *)val);
3599                 oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP;
3600                 LASSERT(oscc->oscc_oa.o_gr > 0);
3601                 req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
3602         }
3603
3604         ptlrpc_request_set_replen(req);
3605         ptlrpc_set_add_req(set, req);
3606         ptlrpc_check_set(set);
3607
3608         RETURN(0);
3609 }
3610
3611
3612 static struct llog_operations osc_size_repl_logops = {
3613         lop_cancel: llog_obd_repl_cancel
3614 };
3615
3616 static struct llog_operations osc_mds_ost_orig_logops;
3617 static int osc_llog_init(struct obd_device *obd, int group,
3618                          struct obd_device *tgt, int count,
3619                          struct llog_catid *catid, struct obd_uuid *uuid)
3620 {
3621         int rc;
3622         ENTRY;
3623         LASSERT(group == OBD_LLOG_GROUP);
3624         spin_lock(&obd->obd_dev_lock);
3625         if (osc_mds_ost_orig_logops.lop_setup != llog_obd_origin_setup) {
3626                 osc_mds_ost_orig_logops = llog_lvfs_ops;
3627                 osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
3628                 osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
3629                 osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
3630                 osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
3631         }
3632         spin_unlock(&obd->obd_dev_lock);
3633
3634         rc = llog_setup(obd, &obd->obd_olg, LLOG_MDS_OST_ORIG_CTXT, tgt, count,
3635                         &catid->lci_logid, &osc_mds_ost_orig_logops);
3636         if (rc) {
3637                 CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
3638                 GOTO (out, rc);
3639         }
3640
3641         rc = llog_setup(obd, &obd->obd_olg, LLOG_SIZE_REPL_CTXT, tgt, count,
3642                         NULL, &osc_size_repl_logops);
3643         if (rc)
3644                 CERROR("failed LLOG_SIZE_REPL_CTXT\n");
3645 out:
3646         if (rc) {
3647                 CERROR("osc '%s' tgt '%s' cnt %d catid %p rc=%d\n",
3648                        obd->obd_name, tgt->obd_name, count, catid, rc);
3649                 CERROR("logid "LPX64":0x%x\n",
3650                        catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
3651         }
3652         RETURN(rc);
3653 }
3654
3655 static int osc_llog_finish(struct obd_device *obd, int count)
3656 {
3657         struct llog_ctxt *ctxt;
3658         int rc = 0, rc2 = 0;
3659         ENTRY;
3660
3661         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3662         if (ctxt)
3663                 rc = llog_cleanup(ctxt);
3664
3665         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3666         if (ctxt)
3667                 rc2 = llog_cleanup(ctxt);
3668         if (!rc)
3669                 rc = rc2;
3670
3671         RETURN(rc);
3672 }
3673
3674 static int osc_reconnect(const struct lu_env *env,
3675                          struct obd_export *exp, struct obd_device *obd,
3676                          struct obd_uuid *cluuid,
3677                          struct obd_connect_data *data)
3678 {
3679         struct client_obd *cli = &obd->u.cli;
3680
3681         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3682                 long lost_grant;
3683
3684                 client_obd_list_lock(&cli->cl_loi_list_lock);
3685                 data->ocd_grant = cli->cl_avail_grant ?:
3686                                 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
3687                 lost_grant = cli->cl_lost_grant;
3688                 cli->cl_lost_grant = 0;
3689                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3690
3691                 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
3692                        "cl_lost_grant: %ld\n", data->ocd_grant,
3693                        cli->cl_avail_grant, lost_grant);
3694                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
3695                        " ocd_grant: %d\n", data->ocd_connect_flags,
3696                        data->ocd_version, data->ocd_grant);
3697         }
3698
3699         RETURN(0);
3700 }
3701
3702 static int osc_disconnect(struct obd_export *exp)
3703 {
3704         struct obd_device *obd = class_exp2obd(exp);
3705         struct llog_ctxt *ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3706         int rc;
3707
3708         if (obd->u.cli.cl_conn_count == 1)
3709                 /* flush any remaining cancel messages out to the target */
3710                 llog_sync(ctxt, exp);
3711
3712         llog_ctxt_put(ctxt);
3713
3714         rc = client_disconnect_export(exp);
3715         return rc;
3716 }
3717
3718 static int osc_import_event(struct obd_device *obd,
3719                             struct obd_import *imp,
3720                             enum obd_import_event event)
3721 {
3722         struct client_obd *cli;
3723         int rc = 0;
3724
3725         ENTRY;
3726         LASSERT(imp->imp_obd == obd);
3727
3728         switch (event) {
3729         case IMP_EVENT_DISCON: {
3730                 /* Only do this on the MDS OSC's */
3731                 if (imp->imp_server_timeout) {
3732                         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3733
3734                         spin_lock(&oscc->oscc_lock);
3735                         oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
3736                         spin_unlock(&oscc->oscc_lock);
3737                 }
3738                 cli = &obd->u.cli;
3739                 client_obd_list_lock(&cli->cl_loi_list_lock);
3740                 cli->cl_avail_grant = 0;
3741                 cli->cl_lost_grant = 0;
3742                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3743                 break;
3744         }
3745         case IMP_EVENT_INACTIVE: {
3746                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
3747                 break;
3748         }
3749         case IMP_EVENT_INVALIDATE: {
3750                 struct ldlm_namespace *ns = obd->obd_namespace;
3751
3752                 /* Reset grants */
3753                 cli = &obd->u.cli;
3754                 client_obd_list_lock(&cli->cl_loi_list_lock);
3755                 /* all pages go to failing rpcs due to the invalid import */
3756                 osc_check_rpcs(cli);
3757                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3758
3759                 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3760
3761                 break;
3762         }
3763         case IMP_EVENT_ACTIVE: {
3764                 /* Only do this on the MDS OSC's */
3765                 if (imp->imp_server_timeout) {
3766                         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3767
3768                         spin_lock(&oscc->oscc_lock);
3769                         oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
3770                         spin_unlock(&oscc->oscc_lock);
3771                 }
3772                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
3773                 break;
3774         }
3775         case IMP_EVENT_OCD: {
3776                 struct obd_connect_data *ocd = &imp->imp_connect_data;
3777
3778                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3779                         osc_init_grant(&obd->u.cli, ocd);
3780
3781                 /* See bug 7198 */
3782                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3783                         imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
3784
3785                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
3786                 break;
3787         }
3788         default:
3789                 CERROR("Unknown import event %d\n", event);
3790                 LBUG();
3791         }
3792         RETURN(rc);
3793 }
3794
3795 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3796 {
3797         int rc;
3798         ENTRY;
3799
3800         ENTRY;
3801         rc = ptlrpcd_addref();
3802         if (rc)
3803                 RETURN(rc);
3804
3805         rc = client_obd_setup(obd, lcfg);
3806         if (rc) {
3807                 ptlrpcd_decref();
3808         } else {
3809                 struct lprocfs_static_vars lvars = { 0 };
3810                 struct client_obd *cli = &obd->u.cli;
3811
3812                 lprocfs_osc_init_vars(&lvars);
3813                 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
3814                         lproc_osc_attach_seqstat(obd);
3815                         sptlrpc_lprocfs_cliobd_attach(obd);
3816                         ptlrpc_lprocfs_register_obd(obd);
3817                 }
3818
3819                 oscc_init(obd);
3820                 /* We need to allocate a few requests more, because
3821                    brw_interpret_oap tries to create new requests before freeing
3822                    previous ones. Ideally we want to have 2x max_rpcs_in_flight
3823                    reserved, but I afraid that might be too much wasted RAM
3824                    in fact, so 2 is just my guess and still should work. */
3825                 cli->cl_import->imp_rq_pool =
3826                         ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
3827                                             OST_MAXREQSIZE,
3828                                             ptlrpc_add_rqs_to_pool);
3829         }
3830
3831         RETURN(rc);
3832 }
3833
3834 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
3835 {
3836         int rc = 0;
3837         ENTRY;
3838
3839         switch (stage) {
3840         case OBD_CLEANUP_EARLY: {
3841                 struct obd_import *imp;
3842                 imp = obd->u.cli.cl_import;
3843                 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
3844                 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
3845                 ptlrpc_deactivate_import(imp);
3846                 spin_lock(&imp->imp_lock);
3847                 imp->imp_pingable = 0;
3848                 spin_unlock(&imp->imp_lock);
3849                 break;
3850         }
3851         case OBD_CLEANUP_EXPORTS: {
3852                 /* If we set up but never connected, the
3853                    client import will not have been cleaned. */
3854                 if (obd->u.cli.cl_import) {
3855                         struct obd_import *imp;
3856                         imp = obd->u.cli.cl_import;
3857                         CDEBUG(D_CONFIG, "%s: client import never connected\n",
3858                                obd->obd_name);
3859                         ptlrpc_invalidate_import(imp);
3860                         ptlrpc_free_rq_pool(imp->imp_rq_pool);
3861                         class_destroy_import(imp);
3862                         obd->u.cli.cl_import = NULL;
3863                 }
3864                 break;
3865         }
3866         case OBD_CLEANUP_SELF_EXP:
3867                 rc = obd_llog_finish(obd, 0);
3868                 if (rc != 0)
3869                         CERROR("failed to cleanup llogging subsystems\n");
3870                 break;
3871         case OBD_CLEANUP_OBD:
3872                 break;
3873         }
3874         RETURN(rc);
3875 }
3876
3877 int osc_cleanup(struct obd_device *obd)
3878 {
3879         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3880         int rc;
3881
3882         ENTRY;
3883         ptlrpc_lprocfs_unregister_obd(obd);
3884         lprocfs_obd_cleanup(obd);
3885
3886         spin_lock(&oscc->oscc_lock);
3887         oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING;
3888         oscc->oscc_flags |= OSCC_FLAG_EXITING;
3889         spin_unlock(&oscc->oscc_lock);
3890
3891         /* free memory of osc quota cache */
3892         lquota_cleanup(quota_interface, obd);
3893
3894         rc = client_obd_cleanup(obd);
3895
3896         ptlrpcd_decref();
3897         RETURN(rc);
3898 }
3899
3900 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
3901 {
3902         struct lustre_cfg *lcfg = buf;
3903         struct lprocfs_static_vars lvars = { 0 };
3904         int rc = 0;
3905
3906         lprocfs_osc_init_vars(&lvars);
3907
3908         switch (lcfg->lcfg_command) {
3909         case LCFG_SPTLRPC_CONF:
3910                 rc = sptlrpc_cliobd_process_config(obd, lcfg);
3911                 break;
3912         default:
3913                 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
3914                                               lcfg, obd);
3915                 break;
3916         }
3917
3918         return(rc);
3919 }
3920
3921 struct obd_ops osc_obd_ops = {
3922         .o_owner                = THIS_MODULE,
3923         .o_setup                = osc_setup,
3924         .o_precleanup           = osc_precleanup,
3925         .o_cleanup              = osc_cleanup,
3926         .o_add_conn             = client_import_add_conn,
3927         .o_del_conn             = client_import_del_conn,
3928         .o_connect              = client_connect_import,
3929         .o_reconnect            = osc_reconnect,
3930         .o_disconnect           = osc_disconnect,
3931         .o_statfs               = osc_statfs,
3932         .o_statfs_async         = osc_statfs_async,
3933         .o_packmd               = osc_packmd,
3934         .o_unpackmd             = osc_unpackmd,
3935         .o_precreate            = osc_precreate,
3936         .o_create               = osc_create,
3937         .o_destroy              = osc_destroy,
3938         .o_getattr              = osc_getattr,
3939         .o_getattr_async        = osc_getattr_async,
3940         .o_setattr              = osc_setattr,
3941         .o_setattr_async        = osc_setattr_async,
3942         .o_brw                  = osc_brw,
3943         .o_brw_async            = osc_brw_async,
3944         .o_prep_async_page      = osc_prep_async_page,
3945         .o_queue_async_io       = osc_queue_async_io,
3946         .o_set_async_flags      = osc_set_async_flags,
3947         .o_queue_group_io       = osc_queue_group_io,
3948         .o_trigger_group_io     = osc_trigger_group_io,
3949         .o_teardown_async_page  = osc_teardown_async_page,
3950         .o_punch                = osc_punch,
3951         .o_sync                 = osc_sync,
3952         .o_enqueue              = osc_enqueue,
3953         .o_match                = osc_match,
3954         .o_change_cbdata        = osc_change_cbdata,
3955         .o_cancel               = osc_cancel,
3956         .o_cancel_unused        = osc_cancel_unused,
3957         .o_join_lru             = osc_join_lru,
3958         .o_iocontrol            = osc_iocontrol,
3959         .o_get_info             = osc_get_info,
3960         .o_set_info_async       = osc_set_info_async,
3961         .o_import_event         = osc_import_event,
3962         .o_llog_init            = osc_llog_init,
3963         .o_llog_finish          = osc_llog_finish,
3964         .o_process_config       = osc_process_config,
3965 };
3966 int __init osc_init(void)
3967 {
3968         struct lprocfs_static_vars lvars = { 0 };
3969         int rc;
3970         ENTRY;
3971
3972         lprocfs_osc_init_vars(&lvars);
3973
3974         request_module("lquota");
3975         quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
3976         lquota_init(quota_interface);
3977         init_obd_quota_ops(quota_interface, &osc_obd_ops);
3978
3979         rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
3980                                  LUSTRE_OSC_NAME, NULL);
3981         if (rc) {
3982                 if (quota_interface)
3983                         PORTAL_SYMBOL_PUT(osc_quota_interface);
3984                 RETURN(rc);
3985         }
3986
3987         RETURN(rc);
3988 }
3989
#ifdef __KERNEL__
/*
 * Module unload: shut down the quota interface, release its symbol when one
 * is held, then unregister the OSC obd type.
 */
static void /*__exit*/ osc_exit(void)
{
        lquota_exit(quota_interface);

        if (quota_interface != NULL)
                PORTAL_SYMBOL_PUT(osc_quota_interface);

        class_unregister_type(LUSTRE_OSC_NAME);
}

/* kernel module metadata and entry points */
MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
MODULE_LICENSE("GPL");

cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);
#endif /* __KERNEL__ */