Whamcloud - gitweb
Branch b1_6
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
5  *   Author Peter Braam <braam@clusterfs.com>
6  *
7  *   This file is part of the Lustre file system, http://www.lustre.org
8  *   Lustre is a trademark of Cluster File Systems, Inc.
9  *
10  *   You may have signed or agreed to another license before downloading
11  *   this software.  If so, you are bound by the terms and conditions
12  *   of that agreement, and the following does not apply to you.  See the
13  *   LICENSE file included with this distribution for more information.
14  *
15  *   If you did not agree to a different license, then this copy of Lustre
16  *   is open source software; you can redistribute it and/or modify it
17  *   under the terms of version 2 of the GNU General Public License as
18  *   published by the Free Software Foundation.
19  *
20  *   In either case, Lustre is distributed in the hope that it will be
21  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
22  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
23  *   license text for more details.
24  *
25  *  For testing and management it is treated as an obd_device,
26  *  although it does not export a full OBD method table (the
27  *  requests are coming in over the wire, so object target modules
28  *  do not have a full method table.)
29  *
30  */
31
32 #ifndef EXPORT_SYMTAB
33 # define EXPORT_SYMTAB
34 #endif
35 #define DEBUG_SUBSYSTEM S_OSC
36
37 #ifdef __KERNEL__
38 # include <libcfs/libcfs.h>
39 #else /* __KERNEL__ */
40 # include <liblustre.h>
41 #endif
42
43 # include <lustre_dlm.h>
44 #include <libcfs/kp30.h>
45 #include <lustre_net.h>
46 #include <lustre/lustre_user.h>
47 #include <obd_ost.h>
48 #include <obd_lov.h>
49
50 #ifdef  __CYGWIN__
51 # include <ctype.h>
52 #endif
53
54 #include <lustre_ha.h>
55 #include <lprocfs_status.h>
56 #include <lustre_log.h>
57 #include <lustre_debug.h>
58 #include <lustre_param.h>
59 #include "osc_internal.h"
60
61 static quota_interface_t *quota_interface = NULL;
62 extern quota_interface_t osc_quota_interface;
63
64 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
65
66 static quota_interface_t *quota_interface;
67 extern quota_interface_t osc_quota_interface;
68
69 /* by default 10s */
70 atomic_t osc_resend_time; 
71
72 /* Pack OSC object metadata for disk storage (LE byte order). */
73 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
74                       struct lov_stripe_md *lsm)
75 {
76         int lmm_size;
77         ENTRY;
78
79         lmm_size = sizeof(**lmmp);
80         if (!lmmp)
81                 RETURN(lmm_size);
82
83         if (*lmmp && !lsm) {
84                 OBD_FREE(*lmmp, lmm_size);
85                 *lmmp = NULL;
86                 RETURN(0);
87         }
88
89         if (!*lmmp) {
90                 OBD_ALLOC(*lmmp, lmm_size);
91                 if (!*lmmp)
92                         RETURN(-ENOMEM);
93         }
94
95         if (lsm) {
96                 LASSERT(lsm->lsm_object_id);
97                 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
98         }
99
100         RETURN(lmm_size);
101 }
102
103 /* Unpack OSC object metadata from disk storage (LE byte order). */
104 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
105                         struct lov_mds_md *lmm, int lmm_bytes)
106 {
107         int lsm_size;
108         ENTRY;
109
110         if (lmm != NULL) {
111                 if (lmm_bytes < sizeof (*lmm)) {
112                         CERROR("lov_mds_md too small: %d, need %d\n",
113                                lmm_bytes, (int)sizeof(*lmm));
114                         RETURN(-EINVAL);
115                 }
116                 /* XXX LOV_MAGIC etc check? */
117
118                 if (lmm->lmm_object_id == 0) {
119                         CERROR("lov_mds_md: zero lmm_object_id\n");
120                         RETURN(-EINVAL);
121                 }
122         }
123
124         lsm_size = lov_stripe_md_size(1);
125         if (lsmp == NULL)
126                 RETURN(lsm_size);
127
128         if (*lsmp != NULL && lmm == NULL) {
129                 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
130                 OBD_FREE(*lsmp, lsm_size);
131                 *lsmp = NULL;
132                 RETURN(0);
133         }
134
135         if (*lsmp == NULL) {
136                 OBD_ALLOC(*lsmp, lsm_size);
137                 if (*lsmp == NULL)
138                         RETURN(-ENOMEM);
139                 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
140                 if ((*lsmp)->lsm_oinfo[0] == NULL) {
141                         OBD_FREE(*lsmp, lsm_size);
142                         RETURN(-ENOMEM);
143                 }
144                 loi_init((*lsmp)->lsm_oinfo[0]);
145         }
146
147         if (lmm != NULL) {
148                 /* XXX zero *lsmp? */
149                 (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
150                 LASSERT((*lsmp)->lsm_object_id);
151         }
152
153         (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
154
155         RETURN(lsm_size);
156 }
157
158 static int osc_getattr_interpret(struct ptlrpc_request *req,
159                                  struct osc_async_args *aa, int rc)
160 {
161         struct ost_body *body;
162         ENTRY;
163
164         if (rc != 0)
165                 GOTO(out, rc);
166
167         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
168                                   lustre_swab_ost_body);
169         if (body) {
170                 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
171                 memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));
172
173                 /* This should really be sent by the OST */
174                 aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
175                 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
176         } else {
177                 CERROR("can't unpack ost_body\n");
178                 rc = -EPROTO;
179                 aa->aa_oi->oi_oa->o_valid = 0;
180         }
181 out:
182         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
183         RETURN(rc);
184 }
185
186 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
187                              struct ptlrpc_request_set *set)
188 {
189         struct ptlrpc_request *req;
190         struct ost_body *body;
191         int size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
192         struct osc_async_args *aa;
193         ENTRY;
194
195         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
196                               OST_GETATTR, 2, size,NULL);
197         if (!req)
198                 RETURN(-ENOMEM);
199
200         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
201         memcpy(&body->oa, oinfo->oi_oa, sizeof(*oinfo->oi_oa));
202
203         ptlrpc_req_set_repsize(req, 2, size);
204         req->rq_interpret_reply = osc_getattr_interpret;
205
206         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
207         aa = (struct osc_async_args *)&req->rq_async_args;
208         aa->aa_oi = oinfo;
209
210         ptlrpc_set_add_req(set, req);
211         RETURN (0);
212 }
213
214 static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
215 {
216         struct ptlrpc_request *req;
217         struct ost_body *body;
218         int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
219         ENTRY;
220
221         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
222                               OST_GETATTR, 2, size, NULL);
223         if (!req)
224                 RETURN(-ENOMEM);
225
226         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
227         memcpy(&body->oa, oinfo->oi_oa, sizeof(*oinfo->oi_oa));
228
229         ptlrpc_req_set_repsize(req, 2, size);
230
231         rc = ptlrpc_queue_wait(req);
232         if (rc) {
233                 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
234                 GOTO(out, rc);
235         }
236
237         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
238                                   lustre_swab_ost_body);
239         if (body == NULL) {
240                 CERROR ("can't unpack ost_body\n");
241                 GOTO (out, rc = -EPROTO);
242         }
243
244         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
245         memcpy(oinfo->oi_oa, &body->oa, sizeof(*oinfo->oi_oa));
246
247         /* This should really be sent by the OST */
248         oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
249         oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
250
251         EXIT;
252  out:
253         ptlrpc_req_finished(req);
254         return rc;
255 }
256
257 static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
258                        struct obd_trans_info *oti)
259 {
260         struct ptlrpc_request *req;
261         struct ost_body *body;
262         int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
263         ENTRY;
264
265         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
266                               OST_SETATTR, 2, size, NULL);
267         if (!req)
268                 RETURN(-ENOMEM);
269
270         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
271         memcpy(&body->oa, oinfo->oi_oa, sizeof(*oinfo->oi_oa));
272
273         ptlrpc_req_set_repsize(req, 2, size);
274
275         rc = ptlrpc_queue_wait(req);
276         if (rc)
277                 GOTO(out, rc);
278
279         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
280                                   lustre_swab_ost_body);
281         if (body == NULL)
282                 GOTO(out, rc = -EPROTO);
283
284         memcpy(oinfo->oi_oa, &body->oa, sizeof(*oinfo->oi_oa));
285
286         EXIT;
287 out:
288         ptlrpc_req_finished(req);
289         RETURN(rc);
290 }
291
292 static int osc_setattr_interpret(struct ptlrpc_request *req,
293                                  struct osc_async_args *aa, int rc)
294 {
295         struct ost_body *body;
296         ENTRY;
297
298         if (rc != 0)
299                 GOTO(out, rc);
300
301         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
302                                   lustre_swab_ost_body);
303         if (body == NULL) {
304                 CERROR("can't unpack ost_body\n");
305                 GOTO(out, rc = -EPROTO);
306         }
307
308         memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));
309 out:
310         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
311         RETURN(rc);
312 }
313
314 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
315                              struct obd_trans_info *oti,
316                              struct ptlrpc_request_set *rqset)
317 {
318         struct ptlrpc_request *req;
319         struct ost_body *body;
320         int size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
321         struct osc_async_args *aa;
322         ENTRY;
323
324         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
325                               OST_SETATTR, 2, size, NULL);
326         if (!req)
327                 RETURN(-ENOMEM);
328
329         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
330
331         if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) {
332                 LASSERT(oti);
333                 memcpy(obdo_logcookie(oinfo->oi_oa), oti->oti_logcookies,
334                        sizeof(*oti->oti_logcookies));
335         }
336
337         memcpy(&body->oa, oinfo->oi_oa, sizeof(*oinfo->oi_oa));
338         ptlrpc_req_set_repsize(req, 2, size);
339         /* do mds to ost setattr asynchronouly */
340         if (!rqset) {
341                 /* Do not wait for response. */
342                 ptlrpcd_add_req(req);
343         } else {
344                 req->rq_interpret_reply = osc_setattr_interpret;
345
346                 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
347                 aa = (struct osc_async_args *)&req->rq_async_args;
348                 aa->aa_oi = oinfo;
349
350                 ptlrpc_set_add_req(rqset, req);
351         }
352
353         RETURN(0);
354 }
355
356 int osc_real_create(struct obd_export *exp, struct obdo *oa,
357                     struct lov_stripe_md **ea, struct obd_trans_info *oti)
358 {
359         struct ptlrpc_request *req;
360         struct ost_body *body;
361         struct lov_stripe_md *lsm;
362         int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
363         ENTRY;
364
365         LASSERT(oa);
366         LASSERT(ea);
367
368         lsm = *ea;
369         if (!lsm) {
370                 rc = obd_alloc_memmd(exp, &lsm);
371                 if (rc < 0)
372                         RETURN(rc);
373         }
374
375         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
376                               OST_CREATE, 2, size, NULL);
377         if (!req)
378                 GOTO(out, rc = -ENOMEM);
379
380         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
381         memcpy(&body->oa, oa, sizeof(body->oa));
382
383         ptlrpc_req_set_repsize(req, 2, size);
384         if (oa->o_valid & OBD_MD_FLINLINE) {
385                 LASSERT((oa->o_valid & OBD_MD_FLFLAGS) &&
386                         oa->o_flags == OBD_FL_DELORPHAN);
387                 DEBUG_REQ(D_HA, req,
388                           "delorphan from OST integration");
389                 /* Don't resend the delorphan req */
390                 req->rq_no_resend = req->rq_no_delay = 1;
391         }
392
393         rc = ptlrpc_queue_wait(req);
394         if (rc)
395                 GOTO(out_req, rc);
396
397         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
398                                   lustre_swab_ost_body);
399         if (body == NULL) {
400                 CERROR ("can't unpack ost_body\n");
401                 GOTO (out_req, rc = -EPROTO);
402         }
403
404         memcpy(oa, &body->oa, sizeof(*oa));
405
406         /* This should really be sent by the OST */
407         oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
408         oa->o_valid |= OBD_MD_FLBLKSZ;
409
410         /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
411          * have valid lsm_oinfo data structs, so don't go touching that.
412          * This needs to be fixed in a big way.
413          */
414         lsm->lsm_object_id = oa->o_id;
415         *ea = lsm;
416
417         if (oti != NULL) {
418                 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
419
420                 if (oa->o_valid & OBD_MD_FLCOOKIE) {
421                         if (!oti->oti_logcookies)
422                                 oti_alloc_cookies(oti, 1);
423                         memcpy(oti->oti_logcookies, obdo_logcookie(oa),
424                                sizeof(oti->oti_onecookie));
425                 }
426         }
427
428         CDEBUG(D_HA, "transno: "LPD64"\n",
429                lustre_msg_get_transno(req->rq_repmsg));
430 out_req:
431         ptlrpc_req_finished(req);
432 out:
433         if (rc && !*ea)
434                 obd_free_memmd(exp, &lsm);
435         RETURN(rc);
436 }
437
438 static int osc_punch_interpret(struct ptlrpc_request *req,
439                                struct osc_async_args *aa, int rc)
440 {
441         struct ost_body *body;
442         ENTRY;
443
444         if (rc != 0)
445                 GOTO(out, rc);
446
447         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof (*body),
448                                   lustre_swab_ost_body);
449         if (body == NULL) {
450                 CERROR ("can't unpack ost_body\n");
451                 GOTO(out, rc = -EPROTO);
452         }
453
454         memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));
455 out:
456         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
457         RETURN(rc);
458 }
459
460 static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
461                      struct obd_trans_info *oti,
462                      struct ptlrpc_request_set *rqset)
463 {
464         struct ptlrpc_request *req;
465         struct osc_async_args *aa;
466         struct ost_body *body;
467         int size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
468         ENTRY;
469
470         if (!oinfo->oi_oa) {
471                 CERROR("oa NULL\n");
472                 RETURN(-EINVAL);
473         }
474
475         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
476                               OST_PUNCH, 2, size, NULL);
477         if (!req)
478                 RETURN(-ENOMEM);
479
480         req->rq_request_portal = OST_IO_PORTAL;         /* bug 7198 */
481
482         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
483         memcpy(&body->oa, oinfo->oi_oa, sizeof(*oinfo->oi_oa));
484
485         /* overload the size and blocks fields in the oa with start/end */
486         body->oa.o_size = oinfo->oi_policy.l_extent.start;
487         body->oa.o_blocks = oinfo->oi_policy.l_extent.end;
488         body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
489
490         ptlrpc_req_set_repsize(req, 2, size);
491
492         req->rq_interpret_reply = osc_punch_interpret;
493         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
494         aa = (struct osc_async_args *)&req->rq_async_args;
495         aa->aa_oi = oinfo;
496         ptlrpc_set_add_req(rqset, req);
497
498         RETURN(0);
499 }
500
501 static int osc_sync(struct obd_export *exp, struct obdo *oa,
502                     struct lov_stripe_md *md, obd_size start, obd_size end)
503 {
504         struct ptlrpc_request *req;
505         struct ost_body *body;
506         int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
507         ENTRY;
508
509         if (!oa) {
510                 CERROR("oa NULL\n");
511                 RETURN(-EINVAL);
512         }
513
514         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
515                               OST_SYNC, 2, size, NULL);
516         if (!req)
517                 RETURN(-ENOMEM);
518
519         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
520         memcpy(&body->oa, oa, sizeof(*oa));
521
522         /* overload the size and blocks fields in the oa with start/end */
523         body->oa.o_size = start;
524         body->oa.o_blocks = end;
525         body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
526
527         ptlrpc_req_set_repsize(req, 2, size);
528
529         rc = ptlrpc_queue_wait(req);
530         if (rc)
531                 GOTO(out, rc);
532
533         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
534                                   lustre_swab_ost_body);
535         if (body == NULL) {
536                 CERROR ("can't unpack ost_body\n");
537                 GOTO (out, rc = -EPROTO);
538         }
539
540         memcpy(oa, &body->oa, sizeof(*oa));
541
542         EXIT;
543  out:
544         ptlrpc_req_finished(req);
545         return rc;
546 }
547
548 /* Find and cancel locally locks matched by @mode in the resource found by
549  * @objid. Found locks are added into @cancel list. Returns the amount of
550  * locks added to @cancels list. */
551 static int osc_resource_get_unused(struct obd_export *exp, __u64 objid,
552                                    struct list_head *cancels, ldlm_mode_t mode,
553                                    int lock_flags)
554 {
555         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
556         struct ldlm_res_id res_id = { .name = { objid } };
557         struct ldlm_resource *res = ldlm_resource_get(ns, NULL, res_id, 0, 0);
558         int count;
559         ENTRY;
560
561         if (res == NULL)
562                 RETURN(0);
563
564         count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
565                                            lock_flags, 0, NULL);
566         ldlm_resource_putref(res);
567         RETURN(count);
568 }
569
570 /* Destroy requests can be async always on the client, and we don't even really
571  * care about the return code since the client cannot do anything at all about
572  * a destroy failure.
573  * When the MDS is unlinking a filename, it saves the file objects into a
574  * recovery llog, and these object records are cancelled when the OST reports
575  * they were destroyed and sync'd to disk (i.e. transaction committed).
576  * If the client dies, or the OST is down when the object should be destroyed,
577  * the records are not cancelled, and when the OST reconnects to the MDS next,
578  * it will retrieve the llog unlink logs and then sends the log cancellation
579  * cookies to the MDS after committing destroy transactions. */
580 static int osc_destroy(struct obd_export *exp, struct obdo *oa,
581                        struct lov_stripe_md *ea, struct obd_trans_info *oti,
582                        struct obd_export *md_export)
583 {
584         CFS_LIST_HEAD(cancels);
585         struct ptlrpc_request *req;
586         struct ost_body *body;
587         int size[3] = { sizeof(struct ptlrpc_body), sizeof(*body), 0 };
588         int count, bufcount = 2;
589         ENTRY;
590
591         if (!oa) {
592                 CERROR("oa NULL\n");
593                 RETURN(-EINVAL);
594         }
595
596         count = osc_resource_get_unused(exp, oa->o_id, &cancels, LCK_PW,
597                                         LDLM_FL_DISCARD_DATA);
598         if (exp_connect_cancelset(exp) && count) {
599                 bufcount = 3;
600                 size[REQ_REC_OFF + 1] = ldlm_request_bufsize(count,OST_DESTROY);
601         }
602         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
603                               OST_DESTROY, bufcount, size, NULL);
604         if (exp_connect_cancelset(exp) && req)
605                 ldlm_cli_cancel_list(&cancels, count, req, REQ_REC_OFF + 1);
606         else
607                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
608
609         if (!req)
610                 RETURN(-ENOMEM);
611
612         req->rq_request_portal = OST_IO_PORTAL;         /* bug 7198 */
613
614         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
615
616         if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE) {
617                 memcpy(obdo_logcookie(oa), oti->oti_logcookies,
618                        sizeof(*oti->oti_logcookies));
619         }
620
621         memcpy(&body->oa, oa, sizeof(*oa));
622         ptlrpc_req_set_repsize(req, 2, size);
623
624         ptlrpcd_add_req(req);
625         RETURN(0);
626 }
627
628 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
629                                 long writing_bytes)
630 {
631         obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
632
633         LASSERT(!(oa->o_valid & bits));
634
635         oa->o_valid |= bits;
636         client_obd_list_lock(&cli->cl_loi_list_lock);
637         oa->o_dirty = cli->cl_dirty;
638         if (cli->cl_dirty > cli->cl_dirty_max) {
639                 CERROR("dirty %lu > dirty_max %lu\n",
640                        cli->cl_dirty, cli->cl_dirty_max);
641                 oa->o_undirty = 0;
642         } else if (atomic_read(&obd_dirty_pages) > obd_max_dirty_pages) {
643                 CERROR("dirty %d > system dirty_max %d\n",
644                        atomic_read(&obd_dirty_pages), obd_max_dirty_pages);
645                 oa->o_undirty = 0;
646         } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
647                 CERROR("dirty %lu - dirty_max %lu too big???\n",
648                        cli->cl_dirty, cli->cl_dirty_max);
649                 oa->o_undirty = 0;
650         } else {
651                 long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
652                                 (cli->cl_max_rpcs_in_flight + 1);
653                 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
654         }
655         oa->o_grant = cli->cl_avail_grant;
656         oa->o_dropped = cli->cl_lost_grant;
657         cli->cl_lost_grant = 0;
658         client_obd_list_unlock(&cli->cl_loi_list_lock);
659         CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
660                oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
661 }
662
663 /* caller must hold loi_list_lock */
664 static void osc_consume_write_grant(struct client_obd *cli,struct brw_page *pga)
665 {
666         atomic_inc(&obd_dirty_pages);
667         cli->cl_dirty += CFS_PAGE_SIZE;
668         cli->cl_avail_grant -= CFS_PAGE_SIZE;
669         pga->flag |= OBD_BRW_FROM_GRANT;
670         CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
671                CFS_PAGE_SIZE, pga, pga->pg);
672         LASSERT(cli->cl_avail_grant >= 0);
673 }
674
675 /* the companion to osc_consume_write_grant, called when a brw has completed.
676  * must be called with the loi lock held. */
677 static void osc_release_write_grant(struct client_obd *cli,
678                                     struct brw_page *pga, int sent)
679 {
680         int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
681         ENTRY;
682
683         if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
684                 EXIT;
685                 return;
686         }
687
688         pga->flag &= ~OBD_BRW_FROM_GRANT;
689         atomic_dec(&obd_dirty_pages);
690         cli->cl_dirty -= CFS_PAGE_SIZE;
691         if (!sent) {
692                 cli->cl_lost_grant += CFS_PAGE_SIZE;
693                 CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
694                        cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
695         } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
696                 /* For short writes we shouldn't count parts of pages that
697                  * span a whole block on the OST side, or our accounting goes
698                  * wrong.  Should match the code in filter_grant_check. */
699                 int offset = pga->off & ~CFS_PAGE_MASK;
700                 int count = pga->count + (offset & (blocksize - 1));
701                 int end = (offset + pga->count) & (blocksize - 1);
702                 if (end)
703                         count += blocksize - end;
704
705                 cli->cl_lost_grant += CFS_PAGE_SIZE - count;
706                 CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
707                        CFS_PAGE_SIZE - count, cli->cl_lost_grant,
708                        cli->cl_avail_grant, cli->cl_dirty);
709         }
710
711         EXIT;
712 }
713
714 static unsigned long rpcs_in_flight(struct client_obd *cli)
715 {
716         return cli->cl_r_in_flight + cli->cl_w_in_flight;
717 }
718
719 /* caller must hold loi_list_lock */
720 void osc_wake_cache_waiters(struct client_obd *cli)
721 {
722         struct list_head *l, *tmp;
723         struct osc_cache_waiter *ocw;
724
725         ENTRY;
726         list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
727                 /* if we can't dirty more, we must wait until some is written */
728                 if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
729                    ((atomic_read(&obd_dirty_pages)+1)>(obd_max_dirty_pages))) {
730                         CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
731                                "osc max %ld, sys max %d\n", cli->cl_dirty,
732                                cli->cl_dirty_max, obd_max_dirty_pages);
733                         return;
734                 }
735
736                 /* if still dirty cache but no grant wait for pending RPCs that
737                  * may yet return us some grant before doing sync writes */
738                 if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
739                         CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
740                                cli->cl_w_in_flight);
741                         return;
742                 }
743
744                 ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
745                 list_del_init(&ocw->ocw_entry);
746                 if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
747                         /* no more RPCs in flight to return grant, do sync IO */
748                         ocw->ocw_rc = -EDQUOT;
749                         CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
750                 } else {
751                         osc_consume_write_grant(cli,
752                                                 &ocw->ocw_oap->oap_brw_page);
753                 }
754
755                 cfs_waitq_signal(&ocw->ocw_waitq);
756         }
757
758         EXIT;
759 }
760
761 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
762 {
763         client_obd_list_lock(&cli->cl_loi_list_lock);
764         cli->cl_avail_grant = ocd->ocd_grant;
765         client_obd_list_unlock(&cli->cl_loi_list_lock);
766
767         CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld\n",
768                cli->cl_avail_grant, cli->cl_lost_grant);
769         LASSERT(cli->cl_avail_grant >= 0);
770 }
771
772 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
773 {
774         client_obd_list_lock(&cli->cl_loi_list_lock);
775         CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
776         if (body->oa.o_valid & OBD_MD_FLGRANT)
777                 cli->cl_avail_grant += body->oa.o_grant;
778         /* waiters are woken in brw_interpret_oap */
779         client_obd_list_unlock(&cli->cl_loi_list_lock);
780 }
781
782 /* We assume that the reason this OSC got a short read is because it read
783  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
784  * via the LOV, and it _knows_ it's reading inside the file, it's just that
785  * this stripe never got written at or beyond this stripe offset yet. */
786 static void handle_short_read(int nob_read, obd_count page_count,
787                               struct brw_page **pga)
788 {
789         char *ptr;
790         int i = 0;
791
792         /* skip bytes read OK */
793         while (nob_read > 0) {
794                 LASSERT (page_count > 0);
795
796                 if (pga[i]->count > nob_read) {
797                         /* EOF inside this page */
798                         ptr = cfs_kmap(pga[i]->pg) + 
799                                 (pga[i]->off & ~CFS_PAGE_MASK);
800                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
801                         cfs_kunmap(pga[i]->pg);
802                         page_count--;
803                         i++;
804                         break;
805                 }
806
807                 nob_read -= pga[i]->count;
808                 page_count--;
809                 i++;
810         }
811
812         /* zero remaining pages */
813         while (page_count-- > 0) {
814                 ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
815                 memset(ptr, 0, pga[i]->count);
816                 cfs_kunmap(pga[i]->pg);
817                 i++;
818         }
819 }
820
821 static int check_write_rcs(struct ptlrpc_request *req,
822                            int requested_nob, int niocount,
823                            obd_count page_count, struct brw_page **pga)
824 {
825         int    *remote_rcs, i;
826
827         /* return error if any niobuf was in error */
828         remote_rcs = lustre_swab_repbuf(req, REQ_REC_OFF + 1,
829                                         sizeof(*remote_rcs) * niocount, NULL);
830         if (remote_rcs == NULL) {
831                 CERROR("Missing/short RC vector on BRW_WRITE reply\n");
832                 return(-EPROTO);
833         }
834         if (lustre_msg_swabbed(req->rq_repmsg))
835                 for (i = 0; i < niocount; i++)
836                         __swab32s(&remote_rcs[i]);
837
838         for (i = 0; i < niocount; i++) {
839                 if (remote_rcs[i] < 0)
840                         return(remote_rcs[i]);
841
842                 if (remote_rcs[i] != 0) {
843                         CERROR("rc[%d] invalid (%d) req %p\n",
844                                 i, remote_rcs[i], req);
845                         return(-EPROTO);
846                 }
847         }
848
849         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
850                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
851                        requested_nob, req->rq_bulk->bd_nob_transferred);
852                 return(-EPROTO);
853         }
854
855         return (0);
856 }
857
858 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
859 {
860         if (p1->flag != p2->flag) {
861                 unsigned mask = ~OBD_BRW_FROM_GRANT;
862
863                 /* warn if we try to combine flags that we don't know to be
864                  * safe to combine */
865                 if ((p1->flag & mask) != (p2->flag & mask))
866                         CERROR("is it ok to have flags 0x%x and 0x%x in the "
867                                "same brw?\n", p1->flag, p2->flag);
868                 return 0;
869         }
870
871         return (p1->off + p1->count == p2->off);
872 }
873
874 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
875                                    struct brw_page **pga, int opc)
876 {
877         __u32 cksum = ~0;
878         int i = 0;
879
880         LASSERT (pg_count > 0);
881         while (nob > 0 && pg_count > 0) {
882                 char *ptr = cfs_kmap(pga[i]->pg);
883                 int off = pga[i]->off & ~CFS_PAGE_MASK;
884                 int count = pga[i]->count > nob ? nob : pga[i]->count;
885
886                 /* corrupt the data before we compute the checksum, to
887                  * simulate an OST->client data error */
888                 if (i == 0 && opc == OST_READ &&
889                     OBD_FAIL_CHECK_ONCE(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
890                         memcpy(ptr + off, "bad1", min(4, nob));
891                 cksum = crc32_le(cksum, ptr + off, count);
892                 cfs_kunmap(pga[i]->pg);
893                 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
894                                off, cksum);
895
896                 nob -= pga[i]->count;
897                 pg_count--;
898                 i++;
899         }
900         /* For sending we only compute the wrong checksum instead
901          * of corrupting the data so it is still correct on a redo */
902         if (opc == OST_WRITE && OBD_FAIL_CHECK_ONCE(OBD_FAIL_OSC_CHECKSUM_SEND))
903                 cksum++;
904
905         return cksum;
906 }
907
908 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
909                                 struct lov_stripe_md *lsm, obd_count page_count,
910                                 struct brw_page **pga,
911                                 struct ptlrpc_request **reqp)
912 {
913         struct ptlrpc_request   *req;
914         struct ptlrpc_bulk_desc *desc;
915         struct ost_body         *body;
916         struct obd_ioobj        *ioobj;
917         struct niobuf_remote    *niobuf;
918         int size[4] = { sizeof(struct ptlrpc_body), sizeof(*body) };
919         int niocount, i, requested_nob, opc, rc;
920         struct ptlrpc_request_pool *pool;
921         struct osc_brw_async_args *aa;
922         struct brw_page *pg_prev;
923
924         ENTRY;
925         OBD_FAIL_RETURN(OBD_FAIL_OSC_BRW_PREP_REQ, -ENOMEM); /* Recoverable */
926         OBD_FAIL_RETURN(OBD_FAIL_OSC_BRW_PREP_REQ2, -EINVAL); /* Fatal */
927
928         opc = ((cmd & OBD_BRW_WRITE) != 0) ? OST_WRITE : OST_READ;
929         pool = ((cmd & OBD_BRW_WRITE) != 0) ? cli->cl_import->imp_rq_pool :NULL;
930
931         for (niocount = i = 1; i < page_count; i++) {
932                 if (!can_merge_pages(pga[i - 1], pga[i]))
933                         niocount++;
934         }
935
936         size[REQ_REC_OFF + 1] = sizeof(*ioobj);
937         size[REQ_REC_OFF + 2] = niocount * sizeof(*niobuf);
938
939         req = ptlrpc_prep_req_pool(cli->cl_import, LUSTRE_OST_VERSION, opc, 4, size,
940                                    NULL, pool);
941         if (req == NULL)
942                 RETURN (-ENOMEM);
943
944         req->rq_request_portal = OST_IO_PORTAL;         /* bug 7198 */
945
946         if (opc == OST_WRITE)
947                 desc = ptlrpc_prep_bulk_imp (req, page_count,
948                                              BULK_GET_SOURCE, OST_BULK_PORTAL);
949         else
950                 desc = ptlrpc_prep_bulk_imp (req, page_count,
951                                              BULK_PUT_SINK, OST_BULK_PORTAL);
952         if (desc == NULL)
953                 GOTO(out, rc = -ENOMEM);
954         /* NB request now owns desc and will free it when it gets freed */
955
956         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
957         ioobj = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1, sizeof(*ioobj));
958         niobuf = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
959                                 niocount * sizeof(*niobuf));
960
961         memcpy(&body->oa, oa, sizeof(*oa));
962
963         obdo_to_ioobj(oa, ioobj);
964         ioobj->ioo_bufcnt = niocount;
965
966         LASSERT (page_count > 0);
967         pg_prev = pga[0];
968         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
969                 struct brw_page *pg = pga[i];
970
971                 LASSERT(pg->count > 0);
972                 LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
973                          "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
974                          pg->off, pg->count);
975 #ifdef __LINUX__
976                 LASSERTF(i == 0 || pg->off > pg_prev->off,
977                          "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
978                          " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
979                          i, page_count,
980                          pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
981                          pg_prev->pg, page_private(pg_prev->pg),
982                          pg_prev->pg->index, pg_prev->off);
983 #else
984                 LASSERTF(i == 0 || pg->off > pg_prev->off,
985                          "i %d p_c %u\n", i, page_count);
986 #endif
987                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
988                         (pg->flag & OBD_BRW_SRVLOCK));
989
990                 ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,
991                                       pg->count);
992                 requested_nob += pg->count;
993
994                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
995                         niobuf--;
996                         niobuf->len += pg->count;
997                 } else {
998                         niobuf->offset = pg->off;
999                         niobuf->len    = pg->count;
1000                         niobuf->flags  = pg->flag;
1001                 }
1002                 pg_prev = pg;
1003         }
1004
1005         LASSERTF((void *)(niobuf - niocount) ==
1006                 lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
1007                                niocount * sizeof(*niobuf)),
1008                 "want %p - real %p\n", lustre_msg_buf(req->rq_reqmsg, 
1009                 REQ_REC_OFF + 2, niocount * sizeof(*niobuf)), 
1010                 (void *)(niobuf - niocount));
1011
1012         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1013
1014         /* size[REQ_REC_OFF] still sizeof (*body) */
1015         if (opc == OST_WRITE) {
1016                 if (unlikely(cli->cl_checksum)) {
1017                         body->oa.o_valid |= OBD_MD_FLCKSUM;
1018                         body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1019                                                              page_count, pga,
1020                                                              OST_WRITE);
1021                         CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1022                                body->oa.o_cksum);
1023                         /* save this in 'oa', too, for later checking */
1024                         oa->o_valid |= OBD_MD_FLCKSUM;
1025                 } else {
1026                         /* clear out the checksum flag, in case this is a
1027                          * resend but cl_checksum is no longer set. b=11238 */
1028                         oa->o_valid &= ~OBD_MD_FLCKSUM;
1029                 }
1030                 oa->o_cksum = body->oa.o_cksum;
1031                 /* 1 RC per niobuf */
1032                 size[REPLY_REC_OFF + 1] = sizeof(__u32) * niocount;
1033                 ptlrpc_req_set_repsize(req, 3, size);
1034         } else {
1035                 if (unlikely(cli->cl_checksum))
1036                         body->oa.o_valid |= OBD_MD_FLCKSUM;
1037                 /* 1 RC for the whole I/O */
1038                 ptlrpc_req_set_repsize(req, 2, size);
1039         }
1040
1041         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1042         aa = (struct osc_brw_async_args *)&req->rq_async_args;
1043         aa->aa_oa = oa;
1044         aa->aa_requested_nob = requested_nob;
1045         aa->aa_nio_count = niocount;
1046         aa->aa_page_count = page_count;
1047         aa->aa_resends = 0;
1048         aa->aa_ppga = pga;
1049         aa->aa_cli = cli;
1050         INIT_LIST_HEAD(&aa->aa_oaps);
1051
1052         *reqp = req;
1053         RETURN (0);
1054
1055  out:
1056         ptlrpc_req_finished (req);
1057         RETURN (rc);
1058 }
1059
1060 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1061                                  __u32 client_cksum, __u32 server_cksum, int nob,
1062                                  obd_count page_count, struct brw_page **pga)
1063 {
1064         __u32 new_cksum;
1065         char *msg;
1066
1067         if (server_cksum == client_cksum) {
1068                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1069                 return 0;
1070         }
1071
1072         new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE);
1073
1074         if (new_cksum == server_cksum)
1075                 msg = "changed on the client after we checksummed it - "
1076                       "likely false positive due to mmap IO (bug 11742)";
1077         else if (new_cksum == client_cksum)
1078                 msg = "changed in transit before arrival at OST";
1079         else
1080                 msg = "changed in transit AND doesn't match the original - "
1081                       "likely false positive due to mmap IO (bug 11742)";
1082
1083         LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inum "
1084                            LPU64"/"LPU64" object "LPU64"/"LPU64" extent "
1085                            "["LPU64"-"LPU64"]\n",
1086                            msg, libcfs_nid2str(peer->nid),
1087                            oa->o_valid & OBD_MD_FLFID ? oa->o_fid : (__u64)0,
1088                            oa->o_valid & OBD_MD_FLFID ? oa->o_generation : 
1089                                                         (__u64)0,
1090                            oa->o_id,
1091                            oa->o_valid & OBD_MD_FLGROUP ? oa->o_gr : (__u64)0,
1092                            pga[0]->off,
1093                            pga[page_count-1]->off + pga[page_count-1]->count - 1);
1094         CERROR("original client csum %x, server csum %x, client csum now %x\n",
1095                client_cksum, server_cksum, new_cksum);
1096
1097         return 1;
1098 }
1099
1100 /* Note rc enters this function as number of bytes transferred */
1101 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1102 {
1103         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1104         const lnet_process_id_t *peer =
1105                         &req->rq_import->imp_connection->c_peer;
1106         struct client_obd *cli = aa->aa_cli;
1107         struct ost_body *body;
1108         __u32 client_cksum = 0;
1109         ENTRY;
1110
1111         if (rc < 0 && rc != -EDQUOT)
1112                 RETURN(rc);
1113
1114         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1115         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
1116                                   lustre_swab_ost_body);
1117         if (body == NULL) {
1118                 CERROR ("Can't unpack body\n");
1119                 RETURN(-EPROTO);
1120         }
1121
1122         /* set/clear over quota flag for a uid/gid */
1123         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1124             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA))
1125                 lquota_setdq(quota_interface, cli, body->oa.o_uid,
1126                              body->oa.o_gid, body->oa.o_valid,
1127                              body->oa.o_flags);
1128
1129         if (rc < 0)
1130                 RETURN(rc);
1131
1132         if (unlikely(aa->aa_oa->o_valid & OBD_MD_FLCKSUM))
1133                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1134
1135         osc_update_grant(cli, body);
1136
1137         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1138                 if (rc > 0) {
1139                         CERROR ("Unexpected +ve rc %d\n", rc);
1140                         RETURN(-EPROTO);
1141                 }
1142                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1143
1144                 if (unlikely((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) &&
1145                              client_cksum &&
1146                              check_write_checksum(&body->oa, peer, client_cksum,
1147                                                   body->oa.o_cksum,
1148                                                   aa->aa_requested_nob,
1149                                                   aa->aa_page_count,
1150                                                   aa->aa_ppga)))
1151                         RETURN(-EAGAIN);
1152
1153                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1154                                      aa->aa_page_count, aa->aa_ppga);
1155                 GOTO(out, rc);
1156         }
1157
1158         /* The rest of this function executes only for OST_READs */
1159         if (rc > aa->aa_requested_nob) {
1160                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1161                        aa->aa_requested_nob);
1162                 RETURN(-EPROTO);
1163         }
1164
1165         if (rc != req->rq_bulk->bd_nob_transferred) {
1166                 CERROR ("Unexpected rc %d (%d transferred)\n",
1167                         rc, req->rq_bulk->bd_nob_transferred);
1168                 return (-EPROTO);
1169         }
1170
1171         if (rc < aa->aa_requested_nob)
1172                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1173
1174         if (unlikely(body->oa.o_valid & OBD_MD_FLCKSUM)) {
1175                 static int cksum_counter;
1176                 __u32      server_cksum = body->oa.o_cksum;
1177                 char      *via;
1178                 char      *router;
1179
1180                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1181                                                  aa->aa_ppga, OST_READ);
1182
1183                 if (peer->nid == req->rq_bulk->bd_sender) {
1184                         via = router = "";
1185                 } else {
1186                         via = " via ";
1187                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1188                 }
1189                 
1190                 if (server_cksum == ~0 && rc > 0) {
1191                         CERROR("Protocol error: server %s set the 'checksum' "
1192                                "bit, but didn't send a checksum.  Not fatal, "
1193                                "but please tell CFS.\n",
1194                                libcfs_nid2str(peer->nid));
1195                 } else if (server_cksum != client_cksum) {
1196                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1197                                            "%s%s%s inum "LPU64"/"LPU64" object "
1198                                            LPU64"/"LPU64" extent "
1199                                            "["LPU64"-"LPU64"]\n",
1200                                            req->rq_import->imp_obd->obd_name,
1201                                            libcfs_nid2str(peer->nid),
1202                                            via, router,
1203                                            body->oa.o_valid & OBD_MD_FLFID ?
1204                                                 body->oa.o_fid : (__u64)0,
1205                                            body->oa.o_valid & OBD_MD_FLFID ?
1206                                                 body->oa.o_generation :(__u64)0,
1207                                            body->oa.o_id,
1208                                            body->oa.o_valid & OBD_MD_FLGROUP ?
1209                                                 body->oa.o_gr : (__u64)0,
1210                                            aa->aa_ppga[0]->off,
1211                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1212                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1213                                                                         1);
1214                         CERROR("client %x, server %x\n",
1215                                client_cksum, server_cksum);
1216                         cksum_counter = 0;
1217                         aa->aa_oa->o_cksum = client_cksum;
1218                         rc = -EAGAIN;
1219                 } else {
1220                         cksum_counter++;
1221                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1222                         rc = 0;
1223                 }
1224         } else if (unlikely(client_cksum)) {
1225                 static int cksum_missed;
1226
1227                 cksum_missed++;
1228                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1229                         CERROR("Checksum %u requested from %s but not sent\n",
1230                                cksum_missed, libcfs_nid2str(peer->nid));
1231         } else {
1232                 rc = 0;
1233         }
1234 out:
1235         if (rc >= 0)
1236                 memcpy(aa->aa_oa, &body->oa, sizeof(*aa->aa_oa));
1237
1238         RETURN(rc);
1239 }
1240
1241 static int osc_brw_internal(int cmd, struct obd_export *exp,struct obdo *oa,
1242                             struct lov_stripe_md *lsm,
1243                             obd_count page_count, struct brw_page **pga)
1244 {
1245         struct ptlrpc_request *request;
1246         int                    rc;
1247         cfs_waitq_t            waitq;
1248         int                    resends = 0;
1249         struct l_wait_info     lwi;
1250
1251         ENTRY;
1252         init_waitqueue_head(&waitq);
1253
1254 restart_bulk:
1255         rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1256                                   page_count, pga, &request);
1257         if (rc != 0)
1258                 return (rc);
1259
1260         rc = ptlrpc_queue_wait(request);
1261
1262         if (rc == -ETIMEDOUT && request->rq_resend) {
1263                 DEBUG_REQ(D_HA, request,  "BULK TIMEOUT");
1264                 ptlrpc_req_finished(request);
1265                 goto restart_bulk;
1266         }
1267
1268         rc = osc_brw_fini_request(request, rc);
1269
1270         ptlrpc_req_finished(request);
1271         if (osc_recoverable_error(rc)) {
1272                 resends++;
1273                 if (!osc_should_resend(resends, &exp->exp_obd->u.cli)) {
1274                         CERROR("too many resend retries, returning error\n");
1275                         RETURN(-EIO);
1276                 }
1277                 
1278                 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
1279                 l_wait_event(waitq, 0, &lwi);
1280
1281                 goto restart_bulk;
1282         }
1283         RETURN(rc);
1284 }
1285
1286 int osc_brw_redo_request(struct ptlrpc_request *request,
1287                          struct osc_brw_async_args *aa)
1288 {
1289         struct ptlrpc_request *new_req;
1290         struct ptlrpc_request_set *set = request->rq_set;
1291         struct osc_brw_async_args *new_aa;
1292         struct osc_async_page *oap;
1293         int rc = 0;
1294         ENTRY;
1295
1296         if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) {
1297                 CERROR("too many resend retries, returning error\n");
1298                 RETURN(-EIO);
1299         }
1300         
1301         DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
1302
1303         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1304                                         OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1305                                   aa->aa_cli, aa->aa_oa,
1306                                   NULL /* lsm unused by osc currently */,
1307                                   aa->aa_page_count, aa->aa_ppga, &new_req);
1308         if (rc)
1309                 RETURN(rc);
1310
1311         client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
1312    
1313         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1314                 if (oap->oap_request != NULL) {
1315                         LASSERTF(request == oap->oap_request,
1316                                  "request %p != oap_request %p\n",
1317                                  request, oap->oap_request);
1318                         if (oap->oap_interrupted) {
1319                                 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1320                                 ptlrpc_req_finished(new_req);                        
1321                                 RETURN(-EINTR);
1322                         }
1323                 }
1324         }
1325         /* New request takes over pga and oaps from old request.
1326          * Note that copying a list_head doesn't work, need to move it... */
1327         aa->aa_resends++;
1328         new_req->rq_interpret_reply = request->rq_interpret_reply;
1329         new_req->rq_async_args = request->rq_async_args;
1330         new_req->rq_sent = CURRENT_SECONDS + aa->aa_resends;
1331
1332         new_aa = (struct osc_brw_async_args *)&new_req->rq_async_args;
1333
1334         INIT_LIST_HEAD(&new_aa->aa_oaps);
1335         list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
1336         INIT_LIST_HEAD(&aa->aa_oaps);
1337
1338         list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1339                 if (oap->oap_request) {
1340                         ptlrpc_req_finished(oap->oap_request);
1341                         oap->oap_request = ptlrpc_request_addref(new_req);
1342                 }
1343         }
1344         client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1345
1346         DEBUG_REQ(D_INFO, new_req, "new request");
1347
1348         ptlrpc_set_add_req(set, new_req);
1349
1350         RETURN(0);
1351 }
1352
1353 static int brw_interpret(struct ptlrpc_request *request, void *data, int rc)
1354 {
1355         struct osc_brw_async_args *aa = data;
1356         int                        i;
1357         int                        nob = rc;
1358         ENTRY;
1359
1360         rc = osc_brw_fini_request(request, rc);
1361         CDEBUG(D_INODE, "request %p aa %p rc %d\n", request, aa, rc);   
1362         if (osc_recoverable_error(rc)) {
1363                 rc = osc_brw_redo_request(request, aa);
1364                 if (rc == 0)
1365                         RETURN(0);
1366         }
1367         if ((rc >= 0) && request->rq_set && request->rq_set->set_countp)
1368                 atomic_add(nob, (atomic_t *)request->rq_set->set_countp);
1369         client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
1370         if (lustre_msg_get_opc(request->rq_reqmsg) == OST_WRITE)
1371                 aa->aa_cli->cl_w_in_flight--;
1372         else
1373                 aa->aa_cli->cl_r_in_flight--;
1374
1375         for (i = 0; i < aa->aa_page_count; i++)
1376                 osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
1377         client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1378         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1379
1380         RETURN(rc);
1381 }
1382
1383 static int async_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1384                           struct lov_stripe_md *lsm, obd_count page_count,
1385                           struct brw_page **pga, struct ptlrpc_request_set *set)
1386 {
1387         struct ptlrpc_request     *request;
1388         struct client_obd         *cli = &exp->exp_obd->u.cli;
1389         int                        rc, i;
1390         struct osc_brw_async_args *aa;
1391         ENTRY;
1392
1393         /* Consume write credits even if doing a sync write -
1394          * otherwise we may run out of space on OST due to grant. */
1395         if (cmd == OBD_BRW_WRITE) {
1396                 client_obd_list_lock(&cli->cl_loi_list_lock);
1397                 for (i = 0; i < page_count; i++) {
1398                         if (cli->cl_avail_grant >= CFS_PAGE_SIZE)
1399                                 osc_consume_write_grant(cli, pga[i]);
1400                 }
1401                 client_obd_list_unlock(&cli->cl_loi_list_lock);
1402         }
1403
1404         rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1405                                   page_count, pga, &request);
1406
1407         aa = (struct osc_brw_async_args *)&request->rq_async_args;
1408         if (cmd == OBD_BRW_READ) {
1409                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
1410                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
1411                 ptlrpc_lprocfs_brw(request, OST_READ, aa->aa_requested_nob);
1412         } else {
1413                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
1414                 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
1415                                  cli->cl_w_in_flight);
1416                 ptlrpc_lprocfs_brw(request, OST_WRITE, aa->aa_requested_nob);
1417         }
1418
1419         if (rc == 0) {
1420                 request->rq_interpret_reply = brw_interpret;
1421                 ptlrpc_set_add_req(set, request);
1422                 client_obd_list_lock(&cli->cl_loi_list_lock);
1423                 if (cmd == OBD_BRW_READ)
1424                         cli->cl_r_in_flight++;
1425                 else
1426                         cli->cl_w_in_flight++;
1427                 client_obd_list_unlock(&cli->cl_loi_list_lock);
1428         } else if (cmd == OBD_BRW_WRITE) {
1429                 client_obd_list_lock(&cli->cl_loi_list_lock);
1430                 for (i = 0; i < page_count; i++)
1431                         osc_release_write_grant(cli, pga[i], 0);
1432                 client_obd_list_unlock(&cli->cl_loi_list_lock);
1433         }
1434
1435         RETURN (rc);
1436 }
1437
1438 /*
1439  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1440  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1441  * fine for our small page arrays and doesn't require allocation.  its an
1442  * insertion sort that swaps elements that are strides apart, shrinking the
1443  * stride down until its '1' and the array is sorted.
1444  */
1445 static void sort_brw_pages(struct brw_page **array, int num)
1446 {
1447         int stride, i, j;
1448         struct brw_page *tmp;
1449
1450         if (num == 1)
1451                 return;
1452         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1453                 ;
1454
1455         do {
1456                 stride /= 3;
1457                 for (i = stride ; i < num ; i++) {
1458                         tmp = array[i];
1459                         j = i;
1460                         while (j >= stride && array[j-stride]->off > tmp->off) {
1461                                 array[j] = array[j - stride];
1462                                 j -= stride;
1463                         }
1464                         array[j] = tmp;
1465                 }
1466         } while (stride > 1);
1467 }
1468
1469 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1470 {
1471         int count = 1;
1472         int offset;
1473         int i = 0;
1474
1475         LASSERT (pages > 0);
1476         offset = pg[i]->off & (~CFS_PAGE_MASK);
1477
1478         for (;;) {
1479                 pages--;
1480                 if (pages == 0)         /* that's all */
1481                         return count;
1482
1483                 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1484                         return count;   /* doesn't end on page boundary */
1485
1486                 i++;
1487                 offset = pg[i]->off & (~CFS_PAGE_MASK);
1488                 if (offset != 0)        /* doesn't start on page boundary */
1489                         return count;
1490
1491                 count++;
1492         }
1493 }
1494
1495 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1496 {
1497         struct brw_page **ppga;
1498         int i;
1499
1500         OBD_ALLOC(ppga, sizeof(*ppga) * count);
1501         if (ppga == NULL)
1502                 return NULL;
1503
1504         for (i = 0; i < count; i++)
1505                 ppga[i] = pga + i;
1506         return ppga;
1507 }
1508
1509 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1510 {
1511         LASSERT(ppga != NULL);
1512         OBD_FREE(ppga, sizeof(*ppga) * count);
1513 }
1514
1515 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1516                    obd_count page_count, struct brw_page *pga,
1517                    struct obd_trans_info *oti)
1518 {
1519         struct obdo *saved_oa = NULL;
1520         struct brw_page **ppga, **orig;
1521         struct obd_import *imp = class_exp2cliimp(exp);
1522         struct client_obd *cli = &imp->imp_obd->u.cli;
1523         int rc, page_count_orig;
1524         ENTRY;
1525
1526         if (cmd & OBD_BRW_CHECK) {
1527                 /* The caller just wants to know if there's a chance that this
1528                  * I/O can succeed */
1529
1530                 if (imp == NULL || imp->imp_invalid)
1531                         RETURN(-EIO);
1532                 RETURN(0);
1533         }
1534
1535         /* test_brw with a failed create can trip this, maybe others. */
1536         LASSERT(cli->cl_max_pages_per_rpc);
1537
1538         rc = 0;
1539
1540         orig = ppga = osc_build_ppga(pga, page_count);
1541         if (ppga == NULL)
1542                 RETURN(-ENOMEM);
1543         page_count_orig = page_count;
1544
1545         sort_brw_pages(ppga, page_count);
1546         while (page_count) {
1547                 obd_count pages_per_brw;
1548
1549                 if (page_count > cli->cl_max_pages_per_rpc)
1550                         pages_per_brw = cli->cl_max_pages_per_rpc;
1551                 else
1552                         pages_per_brw = page_count;
1553
1554                 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1555
1556                 if (saved_oa != NULL) {
1557                         /* restore previously saved oa */
1558                         *oinfo->oi_oa = *saved_oa;
1559                 } else if (page_count > pages_per_brw) {
1560                         /* save a copy of oa (brw will clobber it) */
1561                         OBDO_ALLOC(saved_oa);
1562                         if (saved_oa == NULL)
1563                                 GOTO(out, rc = -ENOMEM);
1564                         *saved_oa = *oinfo->oi_oa;
1565                 }
1566
1567                 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1568                                       pages_per_brw, ppga);
1569
1570                 if (rc != 0)
1571                         break;
1572
1573                 page_count -= pages_per_brw;
1574                 ppga += pages_per_brw;
1575         }
1576
1577 out:
1578         osc_release_ppga(orig, page_count_orig);
1579
1580         if (saved_oa != NULL)
1581                 OBDO_FREE(saved_oa);
1582
1583         RETURN(rc);
1584 }
1585
1586 static int osc_brw_async(int cmd, struct obd_export *exp,
1587                          struct obd_info *oinfo, obd_count page_count,
1588                          struct brw_page *pga, struct obd_trans_info *oti,
1589                          struct ptlrpc_request_set *set)
1590 {
1591         struct brw_page **ppga, **orig;
1592         int page_count_orig;
1593         int rc = 0;
1594         ENTRY;
1595
1596         if (cmd & OBD_BRW_CHECK) {
1597                 /* The caller just wants to know if there's a chance that this
1598                  * I/O can succeed */
1599                 struct obd_import *imp = class_exp2cliimp(exp);
1600
1601                 if (imp == NULL || imp->imp_invalid)
1602                         RETURN(-EIO);
1603                 RETURN(0);
1604         }
1605
1606         orig = ppga = osc_build_ppga(pga, page_count);
1607         if (ppga == NULL)
1608                 RETURN(-ENOMEM);
1609         page_count_orig = page_count;
1610
1611         sort_brw_pages(ppga, page_count);
1612         while (page_count) {
1613                 struct brw_page **copy;
1614                 obd_count pages_per_brw;
1615
1616                 pages_per_brw = min_t(obd_count, page_count,
1617                     class_exp2cliimp(exp)->imp_obd->u.cli.cl_max_pages_per_rpc);
1618
1619                 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1620
1621                 /* use ppga only if single RPC is going to fly */
1622                 if (pages_per_brw != page_count_orig || ppga != orig) {
1623                         OBD_ALLOC(copy, pages_per_brw * sizeof(*copy));
1624                         if (copy == NULL)
1625                                 GOTO(out, rc = -ENOMEM);
1626                         memcpy(copy, ppga, pages_per_brw * sizeof(*copy));
1627                 } else
1628                         copy = ppga;
1629
1630                 rc = async_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1631                                     pages_per_brw, copy, set);
1632
1633                 if (rc != 0) {
1634                         if (copy != ppga)
1635                                 OBD_FREE(copy, pages_per_brw * sizeof(*copy));
1636                         break;
1637                 }
1638
1639                 if (copy == orig) {
1640                         /* we passed it to async_internal() which is
1641                          * now responsible for releasing memory */
1642                         orig = NULL;
1643                 }
1644
1645                 page_count -= pages_per_brw;
1646                 ppga += pages_per_brw;
1647         }
1648 out:
1649         if (orig)
1650                 osc_release_ppga(orig, page_count_orig);
1651         RETURN(rc);
1652 }
1653
1654 static void osc_check_rpcs(struct client_obd *cli);
1655
1656 /* The companion to osc_enter_cache(), called when @oap is no longer part of
1657  * the dirty accounting.  Writeback completes or truncate happens before
1658  * writing starts.  Must be called with the loi lock held. */
1659 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
1660                            int sent)
1661 {
1662         osc_release_write_grant(cli, &oap->oap_brw_page, sent);
1663 }
1664
1665 /* This maintains the lists of pending pages to read/write for a given object
1666  * (lop).  This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
1667  * to quickly find objects that are ready to send an RPC. */
1668 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
1669                          int cmd)
1670 {
1671         int optimal;
1672         ENTRY;
1673
1674         if (lop->lop_num_pending == 0)
1675                 RETURN(0);
1676
1677         /* if we have an invalid import we want to drain the queued pages
1678          * by forcing them through rpcs that immediately fail and complete
1679          * the pages.  recovery relies on this to empty the queued pages
1680          * before canceling the locks and evicting down the llite pages */
1681         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1682                 RETURN(1);
1683
1684         /* stream rpcs in queue order as long as as there is an urgent page
1685          * queued.  this is our cheap solution for good batching in the case
1686          * where writepage marks some random page in the middle of the file
1687          * as urgent because of, say, memory pressure */
1688         if (!list_empty(&lop->lop_urgent)) {
1689                 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
1690                 RETURN(1);
1691         }
1692
1693         /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
1694         optimal = cli->cl_max_pages_per_rpc;
1695         if (cmd & OBD_BRW_WRITE) {
1696                 /* trigger a write rpc stream as long as there are dirtiers
1697                  * waiting for space.  as they're waiting, they're not going to
1698                  * create more pages to coallesce with what's waiting.. */
1699                 if (!list_empty(&cli->cl_cache_waiters)) {
1700                         CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
1701                         RETURN(1);
1702                 }
1703
1704                 /* +16 to avoid triggering rpcs that would want to include pages
1705                  * that are being queued but which can't be made ready until
1706                  * the queuer finishes with the page. this is a wart for
1707                  * llite::commit_write() */
1708                 optimal += 16;
1709         }
1710         if (lop->lop_num_pending >= optimal)
1711                 RETURN(1);
1712
1713         RETURN(0);
1714 }
1715
/* Make the list membership of @item match @should_be_on: link it at the
 * tail of @list if it is wanted but currently unlinked, unlink it if it is
 * linked but no longer wanted, and leave it alone otherwise. */
static void on_list(struct list_head *item, struct list_head *list,
                    int should_be_on)
{
        int linked = !list_empty(item);

        if (should_be_on) {
                if (!linked)
                        list_add_tail(item, list);
        } else if (linked) {
                list_del_init(item);
        }
}
1724
1725 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
1726  * can find pages to build into rpcs quickly */
1727 static void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
1728 {
1729         on_list(&loi->loi_cli_item, &cli->cl_loi_ready_list,
1730                 lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE) ||
1731                 lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
1732
1733         on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
1734                 loi->loi_write_lop.lop_num_pending);
1735
1736         on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
1737                 loi->loi_read_lop.lop_num_pending);
1738 }
1739
1740 static void lop_update_pending(struct client_obd *cli,
1741                                struct loi_oap_pages *lop, int cmd, int delta)
1742 {
1743         lop->lop_num_pending += delta;
1744         if (cmd & OBD_BRW_WRITE)
1745                 cli->cl_pending_w_pages += delta;
1746         else
1747                 cli->cl_pending_r_pages += delta;
1748 }
1749
1750 /* this is called when a sync waiter receives an interruption.  Its job is to
1751  * get the caller woken as soon as possible.  If its page hasn't been put in an
1752  * rpc yet it can dequeue immediately.  Otherwise it has to mark the rpc as
1753  * desiring interruption which will forcefully complete the rpc once the rpc
1754  * has timed out */
1755 static void osc_occ_interrupted(struct oig_callback_context *occ)
1756 {
1757         struct osc_async_page *oap;
1758         struct loi_oap_pages *lop;
1759         struct lov_oinfo *loi;
1760         ENTRY;
1761
1762         /* XXX member_of() */
1763         oap = list_entry(occ, struct osc_async_page, oap_occ);
1764
1765         client_obd_list_lock(&oap->oap_cli->cl_loi_list_lock);
1766
1767         oap->oap_interrupted = 1;
1768
1769         /* ok, it's been put in an rpc. only one oap gets a request reference */
1770         if (oap->oap_request != NULL) {
1771                 ptlrpc_mark_interrupted(oap->oap_request);
1772                 ptlrpcd_wake(oap->oap_request);
1773                 GOTO(unlock, 0);
1774         }
1775
1776         /* we don't get interruption callbacks until osc_trigger_group_io()
1777          * has been called and put the sync oaps in the pending/urgent lists.*/
1778         if (!list_empty(&oap->oap_pending_item)) {
1779                 list_del_init(&oap->oap_pending_item);
1780                 list_del_init(&oap->oap_urgent_item);
1781
1782                 loi = oap->oap_loi;
1783                 lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
1784                         &loi->loi_write_lop : &loi->loi_read_lop;
1785                 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
1786                 loi_list_maint(oap->oap_cli, oap->oap_loi);
1787
1788                 oig_complete_one(oap->oap_oig, &oap->oap_occ, -EINTR);
1789                 oap->oap_oig = NULL;
1790         }
1791
1792 unlock:
1793         client_obd_list_unlock(&oap->oap_cli->cl_loi_list_lock);
1794 }
1795
1796 /* this is trying to propogate async writeback errors back up to the
1797  * application.  As an async write fails we record the error code for later if
1798  * the app does an fsync.  As long as errors persist we force future rpcs to be
1799  * sync so that the app can get a sync error and break the cycle of queueing
1800  * pages for which writeback will fail. */
1801 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
1802                            int rc)
1803 {
1804         if (rc) {
1805                 if (!ar->ar_rc)
1806                         ar->ar_rc = rc;
1807
1808                 ar->ar_force_sync = 1;
1809                 ar->ar_min_xid = ptlrpc_sample_next_xid();
1810                 return;
1811
1812         }
1813
1814         if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
1815                 ar->ar_force_sync = 0;
1816 }
1817
1818 static void osc_oap_to_pending(struct osc_async_page *oap)
1819 {
1820         struct loi_oap_pages *lop;
1821
1822         if (oap->oap_cmd & OBD_BRW_WRITE)
1823                 lop = &oap->oap_loi->loi_write_lop;
1824         else
1825                 lop = &oap->oap_loi->loi_read_lop;
1826
1827         if (oap->oap_async_flags & ASYNC_URGENT)
1828                 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
1829         list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
1830         lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
1831 }
1832
1833 /* this must be called holding the loi list lock to give coverage to exit_cache,
1834  * async_flag maintenance, and oap_request */
1835 static void osc_ap_completion(struct client_obd *cli, struct obdo *oa,
1836                               struct osc_async_page *oap, int sent, int rc)
1837 {
1838         __u64 xid = 0;
1839
1840         ENTRY;
1841         if (oap->oap_request != NULL) {
1842                 xid = ptlrpc_req_xid(oap->oap_request);
1843                 ptlrpc_req_finished(oap->oap_request);
1844                 oap->oap_request = NULL;
1845         }
1846
1847         oap->oap_async_flags = 0;
1848         oap->oap_interrupted = 0;
1849
1850         if (oap->oap_cmd & OBD_BRW_WRITE) {
1851                 osc_process_ar(&cli->cl_ar, xid, rc);
1852                 osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
1853         }
1854
1855         if (rc == 0 && oa != NULL) {
1856                 if (oa->o_valid & OBD_MD_FLBLOCKS)
1857                         oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
1858                 if (oa->o_valid & OBD_MD_FLMTIME)
1859                         oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
1860                 if (oa->o_valid & OBD_MD_FLATIME)
1861                         oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
1862                 if (oa->o_valid & OBD_MD_FLCTIME)
1863                         oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
1864         }
1865
1866         if (oap->oap_oig) {
1867                 osc_exit_cache(cli, oap, sent);
1868                 oig_complete_one(oap->oap_oig, &oap->oap_occ, rc);
1869                 oap->oap_oig = NULL;
1870                 EXIT;
1871                 return;
1872         }
1873
1874         rc = oap->oap_caller_ops->ap_completion(oap->oap_caller_data,
1875                                                 oap->oap_cmd, oa, rc);
1876
1877         /* ll_ap_completion (from llite) drops PG_locked. so, a new
1878          * I/O on the page could start, but OSC calls it under lock
1879          * and thus we can add oap back to pending safely */
1880         if (rc)
1881                 /* upper layer wants to leave the page on pending queue */
1882                 osc_oap_to_pending(oap);
1883         else
1884                 osc_exit_cache(cli, oap, sent);
1885         EXIT;
1886 }
1887
1888 static int brw_interpret_oap(struct ptlrpc_request *request, void *data, int rc)
1889 {
1890         struct osc_brw_async_args *aa = data;
1891         struct osc_async_page *oap, *tmp;
1892         struct client_obd *cli;
1893         ENTRY;
1894
1895         rc = osc_brw_fini_request(request, rc);
1896         CDEBUG(D_INODE, "request %p aa %p rc %d\n", request, aa, rc);
1897
1898         if (osc_recoverable_error(rc)) {
1899                 rc = osc_brw_redo_request(request, aa);
1900                 if (rc == 0)
1901                         RETURN(0);
1902         }
1903
1904         cli = aa->aa_cli;
1905         client_obd_list_lock(&cli->cl_loi_list_lock);
1906         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1907          * is called so we know whether to go to sync BRWs or wait for more
1908          * RPCs to complete */
1909         if (lustre_msg_get_opc(request->rq_reqmsg) == OST_WRITE)
1910                 cli->cl_w_in_flight--;
1911         else
1912                 cli->cl_r_in_flight--;
1913
1914         /* the caller may re-use the oap after the completion call so
1915          * we need to clean it up a little */
1916         list_for_each_entry_safe(oap, tmp, &aa->aa_oaps, oap_rpc_item) {
1917                 list_del_init(&oap->oap_rpc_item);
1918                 osc_ap_completion(cli, aa->aa_oa, oap, 1, rc);
1919         }
1920
1921         osc_wake_cache_waiters(cli);
1922         osc_check_rpcs(cli);
1923         client_obd_list_unlock(&cli->cl_loi_list_lock);
1924
1925         OBDO_FREE(aa->aa_oa);
1926
1927         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1928         RETURN(rc);
1929 }
1930
1931 static struct ptlrpc_request *osc_build_req(struct client_obd *cli,
1932                                             struct list_head *rpc_list,
1933                                             int page_count, int cmd)
1934 {
1935         struct ptlrpc_request *req;
1936         struct brw_page **pga = NULL;
1937         struct osc_brw_async_args *aa;
1938         struct obdo *oa = NULL;
1939         struct obd_async_page_ops *ops = NULL;
1940         void *caller_data = NULL;
1941         struct osc_async_page *oap;
1942         int i, rc;
1943
1944         ENTRY;
1945         LASSERT(!list_empty(rpc_list));
1946
1947         OBD_ALLOC(pga, sizeof(*pga) * page_count);
1948         if (pga == NULL)
1949                 RETURN(ERR_PTR(-ENOMEM));
1950
1951         OBDO_ALLOC(oa);
1952         if (oa == NULL)
1953                 GOTO(out, req = ERR_PTR(-ENOMEM));
1954
1955         i = 0;
1956         list_for_each_entry(oap, rpc_list, oap_rpc_item) {
1957                 if (ops == NULL) {
1958                         ops = oap->oap_caller_ops;
1959                         caller_data = oap->oap_caller_data;
1960                 }
1961                 pga[i] = &oap->oap_brw_page;
1962                 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
1963                 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
1964                        pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
1965                 i++;
1966         }
1967
1968         /* always get the data for the obdo for the rpc */
1969         LASSERT(ops != NULL);
1970         ops->ap_fill_obdo(caller_data, cmd, oa);
1971
1972         sort_brw_pages(pga, page_count);
1973         rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count, pga, &req);
1974         if (rc != 0) {
1975                 CERROR("prep_req failed: %d\n", rc);
1976                 GOTO(out, req = ERR_PTR(rc));
1977         }
1978
1979         /* Need to update the timestamps after the request is built in case
1980          * we race with setattr (locally or in queue at OST).  If OST gets
1981          * later setattr before earlier BRW (as determined by the request xid),
1982          * the OST will not use BRW timestamps.  Sadly, there is no obvious
1983          * way to do this in a single call.  bug 10150 */
1984         ops->ap_update_obdo(caller_data, cmd, oa,
1985                             OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME);
1986
1987         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1988         aa = (struct osc_brw_async_args *)&req->rq_async_args;
1989         INIT_LIST_HEAD(&aa->aa_oaps);
1990         list_splice(rpc_list, &aa->aa_oaps);
1991         INIT_LIST_HEAD(rpc_list);
1992
1993 out:
1994         if (IS_ERR(req)) {
1995                 if (oa)
1996                         OBDO_FREE(oa);
1997                 if (pga)
1998                         OBD_FREE(pga, sizeof(*pga) * page_count);
1999         }
2000         RETURN(req);
2001 }
2002
2003 /* the loi lock is held across this function but it's allowed to release
2004  * and reacquire it during its work */
2005 static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
2006                             int cmd, struct loi_oap_pages *lop)
2007 {
2008         struct ptlrpc_request *req;
2009         obd_count page_count = 0;
2010         struct osc_async_page *oap = NULL, *tmp;
2011         struct osc_brw_async_args *aa;
2012         struct obd_async_page_ops *ops;
2013         CFS_LIST_HEAD(rpc_list);
2014         unsigned int ending_offset;
2015         unsigned  starting_offset = 0;
2016         int srvlock = 0;
2017         ENTRY;
2018
2019         /* first we find the pages we're allowed to work with */
2020         list_for_each_entry_safe(oap, tmp, &lop->lop_pending, oap_pending_item){
2021                 ops = oap->oap_caller_ops;
2022
2023                 LASSERT(oap->oap_magic == OAP_MAGIC);
2024
2025                 if (page_count != 0 &&
2026                     srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) {
2027                         CDEBUG(D_PAGE, "SRVLOCK flag mismatch,"
2028                                " oap %p, page %p, srvlock %u\n",
2029                                oap, oap->oap_brw_page.pg, (unsigned)!srvlock);
2030                         break;
2031                 }
2032                 /* in llite being 'ready' equates to the page being locked
2033                  * until completion unlocks it.  commit_write submits a page
2034                  * as not ready because its unlock will happen unconditionally
2035                  * as the call returns.  if we race with commit_write giving
2036                  * us that page we dont' want to create a hole in the page
2037                  * stream, so we stop and leave the rpc to be fired by
2038                  * another dirtier or kupdated interval (the not ready page
2039                  * will still be on the dirty list).  we could call in
2040                  * at the end of ll_file_write to process the queue again. */
2041                 if (!(oap->oap_async_flags & ASYNC_READY)) {
2042                         int rc = ops->ap_make_ready(oap->oap_caller_data, cmd);
2043                         if (rc < 0)
2044                                 CDEBUG(D_INODE, "oap %p page %p returned %d "
2045                                                 "instead of ready\n", oap,
2046                                                 oap->oap_page, rc);
2047                         switch (rc) {
2048                         case -EAGAIN:
2049                                 /* llite is telling us that the page is still
2050                                  * in commit_write and that we should try
2051                                  * and put it in an rpc again later.  we
2052                                  * break out of the loop so we don't create
2053                                  * a hole in the sequence of pages in the rpc
2054                                  * stream.*/
2055                                 oap = NULL;
2056                                 break;
2057                         case -EINTR:
2058                                 /* the io isn't needed.. tell the checks
2059                                  * below to complete the rpc with EINTR */
2060                                 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
2061                                 oap->oap_count = -EINTR;
2062                                 break;
2063                         case 0:
2064                                 oap->oap_async_flags |= ASYNC_READY;
2065                                 break;
2066                         default:
2067                                 LASSERTF(0, "oap %p page %p returned %d "
2068                                             "from make_ready\n", oap,
2069                                             oap->oap_page, rc);
2070                                 break;
2071                         }
2072                 }
2073                 if (oap == NULL)
2074                         break;
2075                 /*
2076                  * Page submitted for IO has to be locked. Either by
2077                  * ->ap_make_ready() or by higher layers.
2078                  *
2079                  * XXX nikita: this assertion should be adjusted when lustre
2080                  * starts using PG_writeback for pages being written out.
2081                  */
2082 #if defined(__KERNEL__) && defined(__LINUX__)
2083                 LASSERT(PageLocked(oap->oap_page));
2084 #endif
2085                 /* If there is a gap at the start of this page, it can't merge
2086                  * with any previous page, so we'll hand the network a
2087                  * "fragmented" page array that it can't transfer in 1 RDMA */
2088                 if (page_count != 0 && oap->oap_page_off != 0)
2089                         break;
2090
2091                 /* take the page out of our book-keeping */
2092                 list_del_init(&oap->oap_pending_item);
2093                 lop_update_pending(cli, lop, cmd, -1);
2094                 list_del_init(&oap->oap_urgent_item);
2095
2096                 if (page_count == 0)
2097                         starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
2098                                           (PTLRPC_MAX_BRW_SIZE - 1);
2099
2100                 /* ask the caller for the size of the io as the rpc leaves. */
2101                 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE))
2102                         oap->oap_count =
2103                                 ops->ap_refresh_count(oap->oap_caller_data,cmd);
2104                 if (oap->oap_count <= 0) {
2105                         CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
2106                                oap->oap_count);
2107                         osc_ap_completion(cli, NULL, oap, 0, oap->oap_count);
2108                         continue;
2109                 }
2110
2111                 /* now put the page back in our accounting */
2112                 list_add_tail(&oap->oap_rpc_item, &rpc_list);
2113                 if (page_count == 0)
2114                         srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
2115                 if (++page_count >= cli->cl_max_pages_per_rpc)
2116                         break;
2117
2118                 /* End on a PTLRPC_MAX_BRW_SIZE boundary.  We want full-sized
2119                  * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
2120                  * have the same alignment as the initial writes that allocated
2121                  * extents on the server. */
2122                 ending_offset = (oap->oap_obj_off + oap->oap_page_off +
2123                                  oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
2124                 if (ending_offset == 0)
2125                         break;
2126
2127                 /* If there is a gap at the end of this page, it can't merge
2128                  * with any subsequent pages, so we'll hand the network a
2129                  * "fragmented" page array that it can't transfer in 1 RDMA */
2130                 if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
2131                         break;
2132         }
2133
2134         osc_wake_cache_waiters(cli);
2135
2136         if (page_count == 0)
2137                 RETURN(0);
2138
2139         loi_list_maint(cli, loi);
2140
2141         client_obd_list_unlock(&cli->cl_loi_list_lock);
2142
2143         req = osc_build_req(cli, &rpc_list, page_count, cmd);
2144         if (IS_ERR(req)) {
2145                 /* this should happen rarely and is pretty bad, it makes the
2146                  * pending list not follow the dirty order */
2147                 client_obd_list_lock(&cli->cl_loi_list_lock);
2148                 list_for_each_entry_safe(oap, tmp, &rpc_list, oap_rpc_item) {
2149                         list_del_init(&oap->oap_rpc_item);
2150
2151                         /* queued sync pages can be torn down while the pages
2152                          * were between the pending list and the rpc */
2153                         if (oap->oap_interrupted) {
2154                                 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
2155                                 osc_ap_completion(cli, NULL, oap, 0,
2156                                                   oap->oap_count);
2157                                 continue;
2158                         }
2159                         osc_ap_completion(cli, NULL, oap, 0, PTR_ERR(req));
2160                 }
2161                 loi_list_maint(cli, loi);
2162                 RETURN(PTR_ERR(req));
2163         }
2164
2165         aa = (struct osc_brw_async_args *)&req->rq_async_args;
2166         if (cmd == OBD_BRW_READ) {
2167                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2168                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2169                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2170                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2171                 ptlrpc_lprocfs_brw(req, OST_READ, aa->aa_requested_nob);
2172         } else {
2173                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2174                 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
2175                                  cli->cl_w_in_flight);
2176                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2177                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2178                 ptlrpc_lprocfs_brw(req, OST_WRITE, aa->aa_requested_nob);
2179         }
2180
2181         client_obd_list_lock(&cli->cl_loi_list_lock);
2182
2183         if (cmd == OBD_BRW_READ)
2184                 cli->cl_r_in_flight++;
2185         else
2186                 cli->cl_w_in_flight++;
2187
2188         /* queued sync pages can be torn down while the pages
2189          * were between the pending list and the rpc */
2190         tmp = NULL;
2191         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2192                 /* only one oap gets a request reference */
2193                 if (tmp == NULL)
2194                         tmp = oap;
2195                 if (oap->oap_interrupted && !req->rq_intr) {
2196                         CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2197                                oap, req);
2198                         ptlrpc_mark_interrupted(req);
2199                 }
2200         }
2201         if (tmp != NULL)
2202                 tmp->oap_request = ptlrpc_request_addref(req);
2203
2204         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2205                   page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
2206
2207         req->rq_interpret_reply = brw_interpret_oap;
2208         ptlrpcd_add_req(req);
2209         RETURN(1);
2210 }
2211
/* Log the queue state of @LOI at D_INODE level: whether it is on the ready
 * list, then pending-count:urgent-flag for its write and read queues,
 * followed by the caller's printf-style @STR and its arguments.  At least
 * one argument has to follow @STR.
 *
 * The last line of the definition deliberately has no line continuation:
 * a trailing backslash there would splice whatever line follows into the
 * macro body. */
#define LOI_DEBUG(LOI, STR, args...)                                     \
        CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR,           \
               !list_empty(&(LOI)->loi_cli_item),                        \
               (LOI)->loi_write_lop.lop_num_pending,                     \
               !list_empty(&(LOI)->loi_write_lop.lop_urgent),            \
               (LOI)->loi_read_lop.lop_num_pending,                      \
               !list_empty(&(LOI)->loi_read_lop.lop_urgent),             \
               args)
2220
2221 /* This is called by osc_check_rpcs() to find which objects have pages that
2222  * we could be sending.  These lists are maintained by lop_makes_rpc(). */
2223 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2224 {
2225         ENTRY;
2226         /* first return all objects which we already know to have
2227          * pages ready to be stuffed into rpcs */
2228         if (!list_empty(&cli->cl_loi_ready_list))
2229                 RETURN(list_entry(cli->cl_loi_ready_list.next,
2230                                   struct lov_oinfo, loi_cli_item));
2231
2232         /* then if we have cache waiters, return all objects with queued
2233          * writes.  This is especially important when many small files
2234          * have filled up the cache and not been fired into rpcs because
2235          * they don't pass the nr_pending/object threshhold */
2236         if (!list_empty(&cli->cl_cache_waiters) &&
2237             !list_empty(&cli->cl_loi_write_list))
2238                 RETURN(list_entry(cli->cl_loi_write_list.next,
2239                                   struct lov_oinfo, loi_write_item));
2240
2241         /* then return all queued objects when we have an invalid import
2242          * so that they get flushed */
2243         if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2244                 if (!list_empty(&cli->cl_loi_write_list))
2245                         RETURN(list_entry(cli->cl_loi_write_list.next,
2246                                           struct lov_oinfo, loi_write_item));
2247                 if (!list_empty(&cli->cl_loi_read_list))
2248                         RETURN(list_entry(cli->cl_loi_read_list.next,
2249                                           struct lov_oinfo, loi_read_item));
2250         }
2251         RETURN(NULL);
2252 }
2253
2254 /* called with the loi list lock held */
2255 static void osc_check_rpcs(struct client_obd *cli)
2256 {
2257         struct lov_oinfo *loi;
2258         int rc = 0, race_counter = 0;
2259         ENTRY;
2260
2261         while ((loi = osc_next_loi(cli)) != NULL) {
2262                 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
2263
2264                 if (rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight)
2265                         break;
2266
2267                 /* attempt some read/write balancing by alternating between
2268                  * reads and writes in an object.  The makes_rpc checks here
2269                  * would be redundant if we were getting read/write work items
2270                  * instead of objects.  we don't want send_oap_rpc to drain a
2271                  * partial read pending queue when we're given this object to
2272                  * do io on writes while there are cache waiters */
2273                 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
2274                         rc = osc_send_oap_rpc(cli, loi, OBD_BRW_WRITE,
2275                                               &loi->loi_write_lop);
2276                         if (rc < 0)
2277                                 break;
2278                         if (rc > 0)
2279                                 race_counter = 0;
2280                         else
2281                                 race_counter++;
2282                 }
2283                 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
2284                         rc = osc_send_oap_rpc(cli, loi, OBD_BRW_READ,
2285                                               &loi->loi_read_lop);
2286                         if (rc < 0)
2287                                 break;
2288                         if (rc > 0)
2289                                 race_counter = 0;
2290                         else
2291                                 race_counter++;
2292                 }
2293
2294                 /* attempt some inter-object balancing by issueing rpcs
2295                  * for each object in turn */
2296                 if (!list_empty(&loi->loi_cli_item))
2297                         list_del_init(&loi->loi_cli_item);
2298                 if (!list_empty(&loi->loi_write_item))
2299                         list_del_init(&loi->loi_write_item);
2300                 if (!list_empty(&loi->loi_read_item))
2301                         list_del_init(&loi->loi_read_item);
2302
2303                 loi_list_maint(cli, loi);
2304
2305                 /* send_oap_rpc fails with 0 when make_ready tells it to
2306                  * back off.  llite's make_ready does this when it tries
2307                  * to lock a page queued for write that is already locked.
2308                  * we want to try sending rpcs from many objects, but we
2309                  * don't want to spin failing with 0.  */
2310                 if (race_counter == 10)
2311                         break;
2312         }
2313         EXIT;
2314 }
2315
2316 /* we're trying to queue a page in the osc so we're subject to the
2317  * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2318  * If the osc's queued pages are already at that limit, then we want to sleep
2319  * until there is space in the osc's queue for us.  We also may be waiting for
2320  * write credits from the OST if there are RPCs in flight that may return some
2321  * before we fall back to sync writes.
2322  *
2323  * We need this know our allocation was granted in the presence of signals */
2324 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2325 {
2326         int rc;
2327         ENTRY;
2328         client_obd_list_lock(&cli->cl_loi_list_lock);
2329         rc = list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2330         client_obd_list_unlock(&cli->cl_loi_list_lock);
2331         RETURN(rc);
2332 };
2333
2334 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
2335  * grant or cache space. */
2336 static int osc_enter_cache(struct client_obd *cli, struct lov_oinfo *loi,
2337                            struct osc_async_page *oap)
2338 {
2339         struct osc_cache_waiter ocw;
2340         struct l_wait_info lwi = { 0 };
2341         ENTRY;
2342
2343         CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
2344                "grant: %lu\n", cli->cl_dirty, atomic_read(&obd_dirty_pages),
2345                cli->cl_dirty_max, obd_max_dirty_pages,
2346                cli->cl_lost_grant, cli->cl_avail_grant);
2347
2348         /* force the caller to try sync io.  this can jump the list
2349          * of queued writes and create a discontiguous rpc stream */
2350         if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
2351             loi->loi_ar.ar_force_sync)
2352                 RETURN(-EDQUOT);
2353
2354         /* Hopefully normal case - cache space and write credits available */
2355         if ((cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max) &&
2356             (atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages) &&
2357             (cli->cl_avail_grant >= CFS_PAGE_SIZE)) {
2358                 /* account for ourselves */
2359                 osc_consume_write_grant(cli, &oap->oap_brw_page);
2360                 RETURN(0);
2361         }
2362
2363         /* Make sure that there are write rpcs in flight to wait for.  This
2364          * is a little silly as this object may not have any pending but
2365          * other objects sure might. */
2366         if (cli->cl_w_in_flight) {
2367                 list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
2368                 cfs_waitq_init(&ocw.ocw_waitq);
2369                 ocw.ocw_oap = oap;
2370                 ocw.ocw_rc = 0;
2371
2372                 loi_list_maint(cli, loi);
2373                 osc_check_rpcs(cli);
2374                 client_obd_list_unlock(&cli->cl_loi_list_lock);
2375
2376                 CDEBUG(D_CACHE, "sleeping for cache space\n");
2377                 l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
2378
2379                 client_obd_list_lock(&cli->cl_loi_list_lock);
2380                 if (!list_empty(&ocw.ocw_entry)) {
2381                         list_del(&ocw.ocw_entry);
2382                         RETURN(-EINTR);
2383                 }
2384                 RETURN(ocw.ocw_rc);
2385         }
2386
2387         RETURN(-EDQUOT);
2388 }
2389
2390 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2391                         struct lov_oinfo *loi, cfs_page_t *page,
2392                         obd_off offset, struct obd_async_page_ops *ops,
2393                         void *data, void **res)
2394 {
2395         struct osc_async_page *oap;
2396         ENTRY;
2397
2398         if (!page)
2399                 return size_round(sizeof(*oap));
2400
2401         oap = *res;
2402         oap->oap_magic = OAP_MAGIC;
2403         oap->oap_cli = &exp->exp_obd->u.cli;
2404         oap->oap_loi = loi;
2405
2406         oap->oap_caller_ops = ops;
2407         oap->oap_caller_data = data;
2408
2409         oap->oap_page = page;
2410         oap->oap_obj_off = offset;
2411
2412         CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2413         CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2414         CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2415
2416         oap->oap_occ.occ_interrupted = osc_occ_interrupted;
2417
2418         CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
2419         RETURN(0);
2420 }
2421
2422 struct osc_async_page *oap_from_cookie(void *cookie)
2423 {
2424         struct osc_async_page *oap = cookie;
2425         if (oap->oap_magic != OAP_MAGIC)
2426                 return ERR_PTR(-EINVAL);
2427         return oap;
2428 };
2429
2430 static int osc_queue_async_io(struct obd_export *exp, struct lov_stripe_md *lsm,
2431                               struct lov_oinfo *loi, void *cookie,
2432                               int cmd, obd_off off, int count,
2433                               obd_flag brw_flags, enum async_flags async_flags)
2434 {
2435         struct client_obd *cli = &exp->exp_obd->u.cli;
2436         struct osc_async_page *oap;
2437         int rc = 0;
2438         ENTRY;
2439
2440         oap = oap_from_cookie(cookie);
2441         if (IS_ERR(oap))
2442                 RETURN(PTR_ERR(oap));
2443
2444         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2445                 RETURN(-EIO);
2446
2447         if (!list_empty(&oap->oap_pending_item) ||
2448             !list_empty(&oap->oap_urgent_item) ||
2449             !list_empty(&oap->oap_rpc_item))
2450                 RETURN(-EBUSY);
2451
2452         /* check if the file's owner/group is over quota */
2453 #ifdef HAVE_QUOTA_SUPPORT
2454         if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)){
2455                 struct obd_async_page_ops *ops;
2456                 struct obdo *oa;
2457
2458                 OBDO_ALLOC(oa);
2459                 if (oa == NULL)
2460                         RETURN(-ENOMEM);
2461
2462                 ops = oap->oap_caller_ops;
2463                 ops->ap_fill_obdo(oap->oap_caller_data, cmd, oa);
2464                 if (lquota_chkdq(quota_interface, cli, oa->o_uid, oa->o_gid) ==
2465                     NO_QUOTA)
2466                         rc = -EDQUOT;
2467
2468                 OBDO_FREE(oa);
2469                 if (rc)
2470                         RETURN(rc);
2471         }
2472 #endif
2473
2474         if (loi == NULL)
2475                 loi = lsm->lsm_oinfo[0];
2476
2477         client_obd_list_lock(&cli->cl_loi_list_lock);
2478
2479         oap->oap_cmd = cmd;
2480         oap->oap_page_off = off;
2481         oap->oap_count = count;
2482         oap->oap_brw_flags = brw_flags;
2483         oap->oap_async_flags = async_flags;
2484
2485         if (cmd & OBD_BRW_WRITE) {
2486                 rc = osc_enter_cache(cli, loi, oap);
2487                 if (rc) {
2488                         client_obd_list_unlock(&cli->cl_loi_list_lock);
2489                         RETURN(rc);
2490                 }
2491         }
2492
2493         osc_oap_to_pending(oap);
2494         loi_list_maint(cli, loi);
2495
2496         LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
2497                   cmd);
2498
2499         osc_check_rpcs(cli);
2500         client_obd_list_unlock(&cli->cl_loi_list_lock);
2501
2502         RETURN(0);
2503 }
2504
/* True when 'flag' is clear in 'was' and set in 'now', i.e. the bit is being
 * newly turned on: aka (~was & now & flag), but this is more clear :)
 * Every argument is parenthesized so that compound expressions such as
 * SETTING(a, b, X | Y) evaluate as intended. */
#define SETTING(was, now, flag) (!((was) & (flag)) && ((now) & (flag)))
2507
2508 static int osc_set_async_flags(struct obd_export *exp,
2509                                struct lov_stripe_md *lsm,
2510                                struct lov_oinfo *loi, void *cookie,
2511                                obd_flag async_flags)
2512 {
2513         struct client_obd *cli = &exp->exp_obd->u.cli;
2514         struct loi_oap_pages *lop;
2515         struct osc_async_page *oap;
2516         int rc = 0;
2517         ENTRY;
2518
2519         oap = oap_from_cookie(cookie);
2520         if (IS_ERR(oap))
2521                 RETURN(PTR_ERR(oap));
2522
2523         /*
2524          * bug 7311: OST-side locking is only supported for liblustre for now
2525          * (and liblustre never calls obd_set_async_flags(). I hope.), generic
2526          * implementation has to handle case where OST-locked page was picked
2527          * up by, e.g., ->writepage().
2528          */
2529         LASSERT(!(oap->oap_brw_flags & OBD_BRW_SRVLOCK));
2530         LASSERT(!LIBLUSTRE_CLIENT); /* check that liblustre angels do fear to
2531                                      * tread here. */
2532
2533         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2534                 RETURN(-EIO);
2535
2536         if (loi == NULL)
2537                 loi = lsm->lsm_oinfo[0];
2538
2539         if (oap->oap_cmd & OBD_BRW_WRITE) {
2540                 lop = &loi->loi_write_lop;
2541         } else {
2542                 lop = &loi->loi_read_lop;
2543         }
2544
2545         client_obd_list_lock(&cli->cl_loi_list_lock);
2546
2547         if (list_empty(&oap->oap_pending_item))
2548                 GOTO(out, rc = -EINVAL);
2549
2550         if ((oap->oap_async_flags & async_flags) == async_flags)
2551                 GOTO(out, rc = 0);
2552
2553         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
2554                 oap->oap_async_flags |= ASYNC_READY;
2555
2556         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT)) {
2557                 if (list_empty(&oap->oap_rpc_item)) {
2558                         list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2559                         loi_list_maint(cli, loi);
2560                 }
2561         }
2562
2563         LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
2564                         oap->oap_async_flags);
2565 out:
2566         osc_check_rpcs(cli);
2567         client_obd_list_unlock(&cli->cl_loi_list_lock);
2568         RETURN(rc);
2569 }
2570
2571 static int osc_queue_group_io(struct obd_export *exp, struct lov_stripe_md *lsm,
2572                              struct lov_oinfo *loi,
2573                              struct obd_io_group *oig, void *cookie,
2574                              int cmd, obd_off off, int count,
2575                              obd_flag brw_flags,
2576                              obd_flag async_flags)
2577 {
2578         struct client_obd *cli = &exp->exp_obd->u.cli;
2579         struct osc_async_page *oap;
2580         struct loi_oap_pages *lop;
2581         int rc = 0;
2582         ENTRY;
2583
2584         oap = oap_from_cookie(cookie);
2585         if (IS_ERR(oap))
2586                 RETURN(PTR_ERR(oap));
2587
2588         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2589                 RETURN(-EIO);
2590
2591         if (!list_empty(&oap->oap_pending_item) ||
2592             !list_empty(&oap->oap_urgent_item) ||
2593             !list_empty(&oap->oap_rpc_item))
2594                 RETURN(-EBUSY);
2595
2596         if (loi == NULL)
2597                 loi = lsm->lsm_oinfo[0];
2598
2599         client_obd_list_lock(&cli->cl_loi_list_lock);
2600
2601         oap->oap_cmd = cmd;
2602         oap->oap_page_off = off;
2603         oap->oap_count = count;
2604         oap->oap_brw_flags = brw_flags;
2605         oap->oap_async_flags = async_flags;
2606
2607         if (cmd & OBD_BRW_WRITE)
2608                 lop = &loi->loi_write_lop;
2609         else
2610                 lop = &loi->loi_read_lop;
2611
2612         list_add_tail(&oap->oap_pending_item, &lop->lop_pending_group);
2613         if (oap->oap_async_flags & ASYNC_GROUP_SYNC) {
2614                 oap->oap_oig = oig;
2615                 rc = oig_add_one(oig, &oap->oap_occ);
2616         }
2617
2618         LOI_DEBUG(loi, "oap %p page %p on group pending: rc %d\n",
2619                   oap, oap->oap_page, rc);
2620
2621         client_obd_list_unlock(&cli->cl_loi_list_lock);
2622
2623         RETURN(rc);
2624 }
2625
2626 static void osc_group_to_pending(struct client_obd *cli, struct lov_oinfo *loi,
2627                                  struct loi_oap_pages *lop, int cmd)
2628 {
2629         struct list_head *pos, *tmp;
2630         struct osc_async_page *oap;
2631
2632         list_for_each_safe(pos, tmp, &lop->lop_pending_group) {
2633                 oap = list_entry(pos, struct osc_async_page, oap_pending_item);
2634                 list_del(&oap->oap_pending_item);
2635                 osc_oap_to_pending(oap);
2636         }
2637         loi_list_maint(cli, loi);
2638 }
2639
2640 static int osc_trigger_group_io(struct obd_export *exp,
2641                                 struct lov_stripe_md *lsm,
2642                                 struct lov_oinfo *loi,
2643                                 struct obd_io_group *oig)
2644 {
2645         struct client_obd *cli = &exp->exp_obd->u.cli;
2646         ENTRY;
2647
2648         if (loi == NULL)
2649                 loi = lsm->lsm_oinfo[0];
2650
2651         client_obd_list_lock(&cli->cl_loi_list_lock);
2652
2653         osc_group_to_pending(cli, loi, &loi->loi_write_lop, OBD_BRW_WRITE);
2654         osc_group_to_pending(cli, loi, &loi->loi_read_lop, OBD_BRW_READ);
2655
2656         osc_check_rpcs(cli);
2657         client_obd_list_unlock(&cli->cl_loi_list_lock);
2658
2659         RETURN(0);
2660 }
2661
2662 static int osc_teardown_async_page(struct obd_export *exp,
2663                                    struct lov_stripe_md *lsm,
2664                                    struct lov_oinfo *loi, void *cookie)
2665 {
2666         struct client_obd *cli = &exp->exp_obd->u.cli;
2667         struct loi_oap_pages *lop;
2668         struct osc_async_page *oap;
2669         int rc = 0;
2670         ENTRY;
2671
2672         oap = oap_from_cookie(cookie);
2673         if (IS_ERR(oap))
2674                 RETURN(PTR_ERR(oap));
2675
2676         if (loi == NULL)
2677                 loi = lsm->lsm_oinfo[0];
2678
2679         if (oap->oap_cmd & OBD_BRW_WRITE) {
2680                 lop = &loi->loi_write_lop;
2681         } else {
2682                 lop = &loi->loi_read_lop;
2683         }
2684
2685         client_obd_list_lock(&cli->cl_loi_list_lock);
2686
2687         if (!list_empty(&oap->oap_rpc_item))
2688                 GOTO(out, rc = -EBUSY);
2689
2690         osc_exit_cache(cli, oap, 0);
2691         osc_wake_cache_waiters(cli);
2692
2693         if (!list_empty(&oap->oap_urgent_item)) {
2694                 list_del_init(&oap->oap_urgent_item);
2695                 oap->oap_async_flags &= ~ASYNC_URGENT;
2696         }
2697         if (!list_empty(&oap->oap_pending_item)) {
2698                 list_del_init(&oap->oap_pending_item);
2699                 lop_update_pending(cli, lop, oap->oap_cmd, -1);
2700         }
2701         loi_list_maint(cli, loi);
2702
2703         LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
2704 out:
2705         client_obd_list_unlock(&cli->cl_loi_list_lock);
2706         RETURN(rc);
2707 }
2708
2709 static void osc_set_data_with_check(struct lustre_handle *lockh, void *data,
2710                                     int flags)
2711 {
2712         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2713
2714         if (lock == NULL) {
2715                 CERROR("lockh %p, data %p - client evicted?\n", lockh, data);
2716                 return;
2717         }
2718         lock_res_and_lock(lock);
2719 #ifdef __KERNEL__
2720 #ifdef __LINUX__
2721         /* Liang XXX: Darwin and Winnt checking should be added */
2722         if (lock->l_ast_data && lock->l_ast_data != data) {
2723                 struct inode *new_inode = data;
2724                 struct inode *old_inode = lock->l_ast_data;
2725                 if (!(old_inode->i_state & I_FREEING))
2726                         LDLM_ERROR(lock, "inconsistent l_ast_data found");
2727                 LASSERTF(old_inode->i_state & I_FREEING,
2728                          "Found existing inode %p/%lu/%u state %lu in lock: "
2729                          "setting data to %p/%lu/%u\n", old_inode,
2730                          old_inode->i_ino, old_inode->i_generation,
2731                          old_inode->i_state,
2732                          new_inode, new_inode->i_ino, new_inode->i_generation);
2733         }
2734 #endif
2735 #endif
2736         lock->l_ast_data = data;
2737         lock->l_flags |= (flags & LDLM_FL_NO_LRU);
2738         unlock_res_and_lock(lock);
2739         LDLM_LOCK_PUT(lock);
2740 }
2741
2742 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2743                              ldlm_iterator_t replace, void *data)
2744 {
2745         struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
2746         struct obd_device *obd = class_exp2obd(exp);
2747
2748         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2749         return 0;
2750 }
2751
2752 static int osc_enqueue_fini(struct ptlrpc_request *req, struct obd_info *oinfo,
2753                             int intent, int rc)
2754 {
2755         ENTRY;
2756
2757         if (intent) {
2758                 /* The request was created before ldlm_cli_enqueue call. */
2759                 if (rc == ELDLM_LOCK_ABORTED) {
2760                         struct ldlm_reply *rep;
2761
2762                         /* swabbed by ldlm_cli_enqueue() */
2763                         LASSERT_REPSWABBED(req, DLM_LOCKREPLY_OFF);
2764                         rep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF,
2765                                              sizeof(*rep));
2766                         LASSERT(rep != NULL);
2767                         if (rep->lock_policy_res1)
2768                                 rc = rep->lock_policy_res1;
2769                 }
2770         }
2771
2772         if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
2773                 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
2774                        oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_size,
2775                        oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_blocks,
2776                        oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_mtime);
2777         }
2778
2779         /* Call the update callback. */
2780         rc = oinfo->oi_cb_up(oinfo, rc);
2781         RETURN(rc);
2782 }
2783
2784 static int osc_enqueue_interpret(struct ptlrpc_request *req,
2785                                  struct osc_enqueue_args *aa, int rc)
2786 {
2787         int intent = aa->oa_oi->oi_flags & LDLM_FL_HAS_INTENT;
2788         struct lov_stripe_md *lsm = aa->oa_oi->oi_md;
2789         struct ldlm_lock *lock;
2790
2791         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2792          * be valid. */
2793         lock = ldlm_handle2lock(aa->oa_oi->oi_lockh);
2794
2795         /* Complete obtaining the lock procedure. */
2796         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
2797                                    aa->oa_ei->ei_mode,
2798                                    &aa->oa_oi->oi_flags,
2799                                    &lsm->lsm_oinfo[0]->loi_lvb,
2800                                    sizeof(lsm->lsm_oinfo[0]->loi_lvb),
2801                                    lustre_swab_ost_lvb,
2802                                    aa->oa_oi->oi_lockh, rc);
2803
2804         /* Complete osc stuff. */
2805         rc = osc_enqueue_fini(req, aa->oa_oi, intent, rc);
2806
2807         /* Release the lock for async request. */
2808         if (lustre_handle_is_used(aa->oa_oi->oi_lockh) && rc == ELDLM_OK)
2809                 ldlm_lock_decref(aa->oa_oi->oi_lockh, aa->oa_ei->ei_mode);
2810
2811         LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
2812                  aa->oa_oi->oi_lockh, req, aa);
2813         LDLM_LOCK_PUT(lock);
2814         return rc;
2815 }
2816
2817 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
2818  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2819  * other synchronous requests, however keeping some locks and trying to obtain
2820  * others may take a considerable amount of time in a case of ost failure; and
2821  * when other sync requests do not get released lock from a client, the client
2822  * is excluded from the cluster -- such scenarious make the life difficult, so
2823  * release locks just after they are obtained. */
2824 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
2825                        struct ldlm_enqueue_info *einfo,
2826                        struct ptlrpc_request_set *rqset)
2827 {
2828         struct ldlm_res_id res_id = { .name = {oinfo->oi_md->lsm_object_id} };
2829         struct obd_device *obd = exp->exp_obd;
2830         struct ldlm_reply *rep;
2831         struct ptlrpc_request *req = NULL;
2832         int intent = oinfo->oi_flags & LDLM_FL_HAS_INTENT;
2833         int rc;
2834         ENTRY;
2835
2836         /* Filesystem lock extents are extended to page boundaries so that
2837          * dealing with the page cache is a little smoother.  */
2838         oinfo->oi_policy.l_extent.start -=
2839                 oinfo->oi_policy.l_extent.start & ~CFS_PAGE_MASK;
2840         oinfo->oi_policy.l_extent.end |= ~CFS_PAGE_MASK;
2841
2842         if (oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid == 0)
2843                 goto no_match;
2844
2845         /* Next, search for already existing extent locks that will cover us */
2846         rc = ldlm_lock_match(obd->obd_namespace,
2847                              oinfo->oi_flags | LDLM_FL_LVB_READY, &res_id,
2848                              einfo->ei_type, &oinfo->oi_policy, einfo->ei_mode,
2849                              oinfo->oi_lockh);
2850         if (rc == 1) {
2851                 osc_set_data_with_check(oinfo->oi_lockh, einfo->ei_cbdata,
2852                                         oinfo->oi_flags);
2853                 if (intent) {
2854                         /* I would like to be able to ASSERT here that rss <=
2855                          * kms, but I can't, for reasons which are explained in
2856                          * lov_enqueue() */
2857                 }
2858
2859                 /* We already have a lock, and it's referenced */
2860                 oinfo->oi_cb_up(oinfo, ELDLM_OK);
2861
2862                 /* For async requests, decref the lock. */
2863                 if (rqset)
2864                         ldlm_lock_decref(oinfo->oi_lockh, einfo->ei_mode);
2865
2866                 RETURN(ELDLM_OK);
2867         }
2868
2869         /* If we're trying to read, we also search for an existing PW lock.  The
2870          * VFS and page cache already protect us locally, so lots of readers/
2871          * writers can share a single PW lock.
2872          *
2873          * There are problems with conversion deadlocks, so instead of
2874          * converting a read lock to a write lock, we'll just enqueue a new
2875          * one.
2876          *
2877          * At some point we should cancel the read lock instead of making them
2878          * send us a blocking callback, but there are problems with canceling
2879          * locks out from other users right now, too. */
2880
2881         if (einfo->ei_mode == LCK_PR) {
2882                 rc = ldlm_lock_match(obd->obd_namespace,
2883                                      oinfo->oi_flags | LDLM_FL_LVB_READY,
2884                                      &res_id, einfo->ei_type, &oinfo->oi_policy,
2885                                      LCK_PW, oinfo->oi_lockh);
2886                 if (rc == 1) {
2887                         /* FIXME: This is not incredibly elegant, but it might
2888                          * be more elegant than adding another parameter to
2889                          * lock_match.  I want a second opinion. */
2890                         /* addref the lock only if not async requests. */
2891                         if (!rqset)
2892                                 ldlm_lock_addref(oinfo->oi_lockh, LCK_PR);
2893                         osc_set_data_with_check(oinfo->oi_lockh,
2894                                                 einfo->ei_cbdata,
2895                                                 oinfo->oi_flags);
2896                         oinfo->oi_cb_up(oinfo, ELDLM_OK);
2897                         ldlm_lock_decref(oinfo->oi_lockh, LCK_PW);
2898                         RETURN(ELDLM_OK);
2899                 }
2900         }
2901
2902  no_match:
2903         if (intent) {
2904                 int size[3] = {
2905                         [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
2906                         [DLM_LOCKREQ_OFF]     = sizeof(struct ldlm_request),
2907                         [DLM_LOCKREQ_OFF + 1] = 0 };
2908
2909                 req = ldlm_prep_enqueue_req(exp, 2, size, NULL, 0);
2910                 if (req == NULL)
2911                         RETURN(-ENOMEM);
2912
2913                 size[DLM_LOCKREPLY_OFF] = sizeof(*rep);
2914                 size[DLM_REPLY_REC_OFF] = 
2915                         sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb);
2916                 ptlrpc_req_set_repsize(req, 3, size);
2917         }
2918
2919         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2920         oinfo->oi_flags &= ~LDLM_FL_BLOCK_GRANTED;
2921
2922         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id,
2923                               &oinfo->oi_policy, &oinfo->oi_flags,
2924                               &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
2925                               sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb),
2926                               lustre_swab_ost_lvb, oinfo->oi_lockh,
2927                               rqset ? 1 : 0);
2928         if (rqset) {
2929                 if (!rc) {
2930                         struct osc_enqueue_args *aa;
2931                         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2932                         aa = (struct osc_enqueue_args *)&req->rq_async_args;
2933                         aa->oa_oi = oinfo;
2934                         aa->oa_ei = einfo;
2935                         aa->oa_exp = exp;
2936
2937                         req->rq_interpret_reply = osc_enqueue_interpret;
2938                         ptlrpc_set_add_req(rqset, req);
2939                 } else if (intent) {
2940                         ptlrpc_req_finished(req);
2941                 }
2942                 RETURN(rc);
2943         }
2944
2945         rc = osc_enqueue_fini(req, oinfo, intent, rc);
2946         if (intent)
2947                 ptlrpc_req_finished(req);
2948
2949         RETURN(rc);
2950 }
2951
2952 static int osc_match(struct obd_export *exp, struct lov_stripe_md *lsm,
2953                      __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2954                      int *flags, void *data, struct lustre_handle *lockh)
2955 {
2956         struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
2957         struct obd_device *obd = exp->exp_obd;
2958         int rc;
2959         int lflags = *flags;
2960         ENTRY;
2961
2962         OBD_FAIL_RETURN(OBD_FAIL_OSC_MATCH, -EIO);
2963
2964         /* Filesystem lock extents are extended to page boundaries so that
2965          * dealing with the page cache is a little smoother */
2966         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2967         policy->l_extent.end |= ~CFS_PAGE_MASK;
2968
2969         /* Next, search for already existing extent locks that will cover us */
2970         rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY, &res_id, type,
2971                              policy, mode, lockh);
2972         if (rc) {
2973                 //if (!(*flags & LDLM_FL_TEST_LOCK))
2974                         osc_set_data_with_check(lockh, data, lflags);
2975                 RETURN(rc);
2976         }
2977         /* If we're trying to read, we also search for an existing PW lock.  The
2978          * VFS and page cache already protect us locally, so lots of readers/
2979          * writers can share a single PW lock. */
2980         if (mode == LCK_PR) {
2981                 rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY, 
2982                                      &res_id, type,
2983                                      policy, LCK_PW, lockh);
2984                 if (rc == 1 && !(lflags & LDLM_FL_TEST_LOCK)) {
2985                         /* FIXME: This is not incredibly elegant, but it might
2986                          * be more elegant than adding another parameter to
2987                          * lock_match.  I want a second opinion. */
2988                         osc_set_data_with_check(lockh, data, lflags);
2989                         ldlm_lock_addref(lockh, LCK_PR);
2990                         ldlm_lock_decref(lockh, LCK_PW);
2991                 }
2992         }
2993         RETURN(rc);
2994 }
2995
2996 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
2997                       __u32 mode, struct lustre_handle *lockh)
2998 {
2999         ENTRY;
3000
3001         if (unlikely(mode == LCK_GROUP))
3002                 ldlm_lock_decref_and_cancel(lockh, mode);
3003         else
3004                 ldlm_lock_decref(lockh, mode);
3005
3006         RETURN(0);
3007 }
3008
3009 static int osc_cancel_unused(struct obd_export *exp,
3010                              struct lov_stripe_md *lsm, int flags, void *opaque)
3011 {
3012         struct obd_device *obd = class_exp2obd(exp);
3013         struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
3014
3015         return ldlm_cli_cancel_unused(obd->obd_namespace, &res_id, flags,
3016                                       opaque);
3017 }
3018
3019 static int osc_join_lru(struct obd_export *exp,
3020                         struct lov_stripe_md *lsm, int join)
3021 {
3022         struct obd_device *obd = class_exp2obd(exp);
3023         struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
3024
3025         return ldlm_cli_join_lru(obd->obd_namespace, &res_id, join);
3026 }
3027
3028 static int osc_statfs_interpret(struct ptlrpc_request *req,
3029                                 struct osc_async_args *aa, int rc)
3030 {
3031         struct obd_statfs *msfs;
3032         ENTRY;
3033
3034         if (rc != 0)
3035                 GOTO(out, rc);
3036
3037         msfs = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*msfs),
3038                                   lustre_swab_obd_statfs);
3039         if (msfs == NULL) {
3040                 CERROR("Can't unpack obd_statfs\n");
3041                 GOTO(out, rc = -EPROTO);
3042         }
3043
3044         memcpy(aa->aa_oi->oi_osfs, msfs, sizeof(*msfs));
3045 out:
3046         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
3047         RETURN(rc);
3048 }
3049
3050 static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
3051                             __u64 max_age, struct ptlrpc_request_set *rqset)
3052 {
3053         struct ptlrpc_request *req;
3054         struct osc_async_args *aa;
3055         int size[2] = { sizeof(struct ptlrpc_body), sizeof(*oinfo->oi_osfs) };
3056         ENTRY;
3057
3058         /* We could possibly pass max_age in the request (as an absolute
3059          * timestamp or a "seconds.usec ago") so the target can avoid doing
3060          * extra calls into the filesystem if that isn't necessary (e.g.
3061          * during mount that would help a bit).  Having relative timestamps
3062          * is not so great if request processing is slow, while absolute
3063          * timestamps are not ideal because they need time synchronization. */
3064         req = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_OST_VERSION,
3065                               OST_STATFS, 1, NULL, NULL);
3066         if (!req)
3067                 RETURN(-ENOMEM);
3068
3069         ptlrpc_req_set_repsize(req, 2, size);
3070         req->rq_request_portal = OST_CREATE_PORTAL; //XXX FIXME bug 249
3071
3072         req->rq_interpret_reply = osc_statfs_interpret;
3073         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
3074         aa = (struct osc_async_args *)&req->rq_async_args;
3075         aa->aa_oi = oinfo;
3076
3077         ptlrpc_set_add_req(rqset, req);
3078         RETURN(0);
3079 }
3080
3081 static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
3082                       __u64 max_age)
3083 {
3084         struct obd_statfs *msfs;
3085         struct ptlrpc_request *req;
3086         int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*osfs) };
3087         ENTRY;
3088
3089         /* We could possibly pass max_age in the request (as an absolute
3090          * timestamp or a "seconds.usec ago") so the target can avoid doing
3091          * extra calls into the filesystem if that isn't necessary (e.g.
3092          * during mount that would help a bit).  Having relative timestamps
3093          * is not so great if request processing is slow, while absolute
3094          * timestamps are not ideal because they need time synchronization. */
3095         req = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_OST_VERSION,
3096                               OST_STATFS, 1, NULL, NULL);
3097         if (!req)
3098                 RETURN(-ENOMEM);
3099
3100         ptlrpc_req_set_repsize(req, 2, size);
3101         req->rq_request_portal = OST_CREATE_PORTAL; //XXX FIXME bug 249
3102
3103         rc = ptlrpc_queue_wait(req);
3104         if (rc)
3105                 GOTO(out, rc);
3106
3107         msfs = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*msfs),
3108                                   lustre_swab_obd_statfs);
3109         if (msfs == NULL) {
3110                 CERROR("Can't unpack obd_statfs\n");
3111                 GOTO(out, rc = -EPROTO);
3112         }
3113
3114         memcpy(osfs, msfs, sizeof(*osfs));
3115
3116         EXIT;
3117  out:
3118         ptlrpc_req_finished(req);
3119         return rc;
3120 }
3121
3122 /* Retrieve object striping information.
3123  *
3124  * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
3125  * the maximum number of OST indices which will fit in the user buffer.
3126  * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
3127  */
3128 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
3129 {
3130         struct lov_user_md lum, *lumk;
3131         int rc = 0, lum_size;
3132         ENTRY;
3133
3134         if (!lsm)
3135                 RETURN(-ENODATA);
3136
3137         if (copy_from_user(&lum, lump, sizeof(lum)))
3138                 RETURN(-EFAULT);
3139
3140         if (lum.lmm_magic != LOV_USER_MAGIC)
3141                 RETURN(-EINVAL);
3142
3143         if (lum.lmm_stripe_count > 0) {
3144                 lum_size = sizeof(lum) + sizeof(lum.lmm_objects[0]);
3145                 OBD_ALLOC(lumk, lum_size);
3146                 if (!lumk)
3147                         RETURN(-ENOMEM);
3148
3149                 lumk->lmm_objects[0].l_object_id = lsm->lsm_object_id;
3150         } else {
3151                 lum_size = sizeof(lum);
3152                 lumk = &lum;
3153         }
3154
3155         lumk->lmm_object_id = lsm->lsm_object_id;
3156         lumk->lmm_stripe_count = 1;
3157
3158         if (copy_to_user(lump, lumk, lum_size))
3159                 rc = -EFAULT;
3160
3161         if (lumk != &lum)
3162                 OBD_FREE(lumk, lum_size);
3163
3164         RETURN(rc);
3165 }
3166
3167
3168 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3169                          void *karg, void *uarg)
3170 {
3171         struct obd_device *obd = exp->exp_obd;
3172         struct obd_ioctl_data *data = karg;
3173         int err = 0;
3174         ENTRY;
3175
3176 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
3177         MOD_INC_USE_COUNT;
3178 #else
3179         if (!try_module_get(THIS_MODULE)) {
3180                 CERROR("Can't get module. Is it alive?");
3181                 return -EINVAL;
3182         }
3183 #endif
3184         switch (cmd) {
3185         case OBD_IOC_LOV_GET_CONFIG: {
3186                 char *buf;
3187                 struct lov_desc *desc;
3188                 struct obd_uuid uuid;
3189
3190                 buf = NULL;
3191                 len = 0;
3192                 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
3193                         GOTO(out, err = -EINVAL);
3194
3195                 data = (struct obd_ioctl_data *)buf;
3196
3197                 if (sizeof(*desc) > data->ioc_inllen1) {
3198                         obd_ioctl_freedata(buf, len);
3199                         GOTO(out, err = -EINVAL);
3200                 }
3201
3202                 if (data->ioc_inllen2 < sizeof(uuid)) {
3203                         obd_ioctl_freedata(buf, len);
3204                         GOTO(out, err = -EINVAL);
3205                 }
3206
3207                 desc = (struct lov_desc *)data->ioc_inlbuf1;
3208                 desc->ld_tgt_count = 1;
3209                 desc->ld_active_tgt_count = 1;
3210                 desc->ld_default_stripe_count = 1;
3211                 desc->ld_default_stripe_size = 0;
3212                 desc->ld_default_stripe_offset = 0;
3213                 desc->ld_pattern = 0;
3214                 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
3215
3216                 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
3217
3218                 err = copy_to_user((void *)uarg, buf, len);
3219                 if (err)
3220                         err = -EFAULT;
3221                 obd_ioctl_freedata(buf, len);
3222                 GOTO(out, err);
3223         }
3224         case LL_IOC_LOV_SETSTRIPE:
3225                 err = obd_alloc_memmd(exp, karg);
3226                 if (err > 0)
3227                         err = 0;
3228                 GOTO(out, err);
3229         case LL_IOC_LOV_GETSTRIPE:
3230                 err = osc_getstripe(karg, uarg);
3231                 GOTO(out, err);
3232         case OBD_IOC_CLIENT_RECOVER:
3233                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
3234                                             data->ioc_inlbuf1);
3235                 if (err > 0)
3236                         err = 0;
3237                 GOTO(out, err);
3238         case IOC_OSC_SET_ACTIVE:
3239                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
3240                                                data->ioc_offset);
3241                 GOTO(out, err);
3242         case OBD_IOC_POLL_QUOTACHECK:
3243                 err = lquota_poll_check(quota_interface, exp,
3244                                         (struct if_quotacheck *)karg);
3245                 GOTO(out, err);
3246         case OBD_IOC_DESTROY: {
3247                 struct obdo            *oa;
3248
3249                 if (!capable (CAP_SYS_ADMIN))
3250                         GOTO (out, err = -EPERM);
3251                 oa = &data->ioc_obdo1;
3252                 oa->o_valid |= OBD_MD_FLGROUP;
3253
3254                 err = osc_destroy(exp, oa, NULL, NULL, NULL);
3255                 GOTO(out, err);
3256         }
3257         default:
3258                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3259                        cmd, cfs_curproc_comm());
3260                 GOTO(out, err = -ENOTTY);
3261         }
3262 out:
3263 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
3264         MOD_DEC_USE_COUNT;
3265 #else
3266         module_put(THIS_MODULE);
3267 #endif
3268         return err;
3269 }
3270
3271 static int osc_get_info(struct obd_export *exp, obd_count keylen,
3272                         void *key, __u32 *vallen, void *val)
3273 {
3274         ENTRY;
3275         if (!vallen || !val)
3276                 RETURN(-EFAULT);
3277
3278         if (keylen > strlen("lock_to_stripe") &&
3279             strcmp(key, "lock_to_stripe") == 0) {
3280                 __u32 *stripe = val;
3281                 *vallen = sizeof(*stripe);
3282                 *stripe = 0;
3283                 RETURN(0);
3284         } else if (keylen >= strlen("last_id") && strcmp(key, "last_id") == 0) {
3285                 struct ptlrpc_request *req;
3286                 obd_id *reply;
3287                 char *bufs[2] = { NULL, key };
3288                 int rc, size[2] = { sizeof(struct ptlrpc_body), keylen };
3289
3290                 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
3291                                       OST_GET_INFO, 2, size, bufs);
3292                 if (req == NULL)
3293                         RETURN(-ENOMEM);
3294
3295                 size[REPLY_REC_OFF] = *vallen;
3296                 ptlrpc_req_set_repsize(req, 2, size);
3297                 rc = ptlrpc_queue_wait(req);
3298                 if (rc)
3299                         GOTO(out, rc);
3300
3301                 reply = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*reply),
3302                                            lustre_swab_ost_last_id);
3303                 if (reply == NULL) {
3304                         CERROR("Can't unpack OST last ID\n");
3305                         GOTO(out, rc = -EPROTO);
3306                 }
3307                 *((obd_id *)val) = *reply;
3308         out:
3309                 ptlrpc_req_finished(req);
3310                 RETURN(rc);
3311         }
3312         RETURN(-EINVAL);
3313 }
3314
3315 static int osc_setinfo_mds_conn_interpret(struct ptlrpc_request *req,
3316                                           void *aa, int rc)
3317 {
3318         struct llog_ctxt *ctxt;
3319         struct obd_import *imp = req->rq_import;
3320         ENTRY;
3321
3322         if (rc != 0)
3323                 RETURN(rc);
3324
3325         ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
3326         if (ctxt) {
3327                 if (rc == 0)
3328                         rc = llog_initiator_connect(ctxt);
3329                 else
3330                         CERROR("cannot establish connection for "
3331                                "ctxt %p: %d\n", ctxt, rc);
3332         }
3333
3334         llog_ctxt_put(ctxt);
3335         spin_lock(&imp->imp_lock);
3336         imp->imp_server_timeout = 1;
3337         imp->imp_pingable = 1;
3338         spin_unlock(&imp->imp_lock);
3339         CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
3340
3341         RETURN(rc);
3342 }
3343
3344 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
3345                               void *key, obd_count vallen, void *val,
3346                               struct ptlrpc_request_set *set)
3347 {
3348         struct ptlrpc_request *req;
3349         struct obd_device  *obd = exp->exp_obd;
3350         struct obd_import *imp = class_exp2cliimp(exp);
3351         int size[3] = { sizeof(struct ptlrpc_body), keylen, vallen };
3352         char *bufs[3] = { NULL, key, val };
3353         ENTRY;
3354
3355         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3356
3357         if (KEY_IS(KEY_NEXT_ID)) {
3358                 if (vallen != sizeof(obd_id))
3359                         RETURN(-EINVAL);
3360                 obd->u.cli.cl_oscc.oscc_next_id = *((obd_id*)val) + 1;
3361                 CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
3362                        exp->exp_obd->obd_name,
3363                        obd->u.cli.cl_oscc.oscc_next_id);
3364
3365                 RETURN(0);
3366         }
3367
3368         if (KEY_IS("unlinked")) {
3369                 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3370                 spin_lock(&oscc->oscc_lock);
3371                 oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
3372                 spin_unlock(&oscc->oscc_lock);
3373                 RETURN(0);
3374         }
3375
3376         if (KEY_IS(KEY_INIT_RECOV)) {
3377                 if (vallen != sizeof(int))
3378                         RETURN(-EINVAL);
3379                 spin_lock(&imp->imp_lock);
3380                 imp->imp_initial_recov = *(int *)val;
3381                 spin_unlock(&imp->imp_lock);
3382                 CDEBUG(D_HA, "%s: set imp_initial_recov = %d\n",
3383                        exp->exp_obd->obd_name,
3384                        imp->imp_initial_recov);
3385                 RETURN(0);
3386         }
3387
3388         if (KEY_IS("checksum")) {
3389                 if (vallen != sizeof(int))
3390                         RETURN(-EINVAL);
3391                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3392                 RETURN(0);
3393         }
3394
3395         if (!set)
3396                 RETURN(-EINVAL);
3397
3398         /* We pass all other commands directly to OST. Since nobody calls osc
3399            methods directly and everybody is supposed to go through LOV, we
3400            assume lov checked invalid values for us.
3401            The only recognised values so far are evict_by_nid and mds_conn.
3402            Even if something bad goes through, we'd get a -EINVAL from OST
3403            anyway. */
3404
3405         req = ptlrpc_prep_req(imp, LUSTRE_OST_VERSION, OST_SET_INFO, 3, size,
3406                               bufs);
3407         if (req == NULL)
3408                 RETURN(-ENOMEM);
3409
3410         if (KEY_IS(KEY_MDS_CONN))
3411                 req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
3412
3413         ptlrpc_req_set_repsize(req, 1, NULL);
3414         ptlrpc_set_add_req(set, req);
3415         ptlrpc_check_set(set);
3416
3417         RETURN(0);
3418 }
3419
3420
3421 static struct llog_operations osc_size_repl_logops = {
3422         lop_cancel: llog_obd_repl_cancel
3423 };
3424
3425 static struct llog_operations osc_mds_ost_orig_logops;
3426 static int osc_llog_init(struct obd_device *obd, struct obd_device *tgt,
3427                          int count, struct llog_catid *catid, 
3428                          struct obd_uuid *uuid)
3429 {
3430         int rc;
3431         ENTRY;
3432
3433         spin_lock(&obd->obd_dev_lock);
3434         if (osc_mds_ost_orig_logops.lop_setup != llog_obd_origin_setup) {
3435                 osc_mds_ost_orig_logops = llog_lvfs_ops;
3436                 osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
3437                 osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
3438                 osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
3439                 osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
3440         }
3441         spin_unlock(&obd->obd_dev_lock);
3442
3443         rc = llog_setup(obd, LLOG_MDS_OST_ORIG_CTXT, tgt, count,
3444                         &catid->lci_logid, &osc_mds_ost_orig_logops);
3445         if (rc) {
3446                 CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
3447                 GOTO (out, rc);
3448         }
3449
3450         rc = llog_setup(obd, LLOG_SIZE_REPL_CTXT, tgt, count, NULL,
3451                         &osc_size_repl_logops);
3452         if (rc) 
3453                 CERROR("failed LLOG_SIZE_REPL_CTXT\n");
3454 out:
3455         if (rc) {
3456                 CERROR("osc '%s' tgt '%s' cnt %d catid %p rc=%d\n", 
3457                        obd->obd_name, tgt->obd_name, count, catid, rc);
3458                 CERROR("logid "LPX64":0x%x\n",
3459                        catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
3460         }
3461         RETURN(rc);
3462 }
3463
3464 static int osc_llog_finish(struct obd_device *obd, int count)
3465 {
3466         struct llog_ctxt *ctxt;
3467         int rc = 0, rc2 = 0;
3468         ENTRY;
3469
3470         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3471         if (ctxt)
3472                 rc = llog_cleanup(ctxt);
3473
3474         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3475         if (ctxt)
3476                 rc2 = llog_cleanup(ctxt);
3477         if (!rc)
3478                 rc = rc2;
3479
3480         RETURN(rc);
3481 }
3482
3483 static int osc_reconnect(struct obd_export *exp, struct obd_device *obd,
3484                          struct obd_uuid *cluuid,
3485                          struct obd_connect_data *data)
3486 {
3487         struct client_obd *cli = &obd->u.cli;
3488
3489         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3490                 long lost_grant;
3491
3492                 client_obd_list_lock(&cli->cl_loi_list_lock);
3493                 data->ocd_grant = cli->cl_avail_grant ?:
3494                                 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
3495                 lost_grant = cli->cl_lost_grant;
3496                 cli->cl_lost_grant = 0;
3497                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3498
3499                 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
3500                        "cl_lost_grant: %ld\n", data->ocd_grant,
3501                        cli->cl_avail_grant, lost_grant);
3502                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
3503                        " ocd_grant: %d\n", data->ocd_connect_flags,
3504                        data->ocd_version, data->ocd_grant);
3505         }
3506
3507         RETURN(0);
3508 }
3509
3510 static int osc_disconnect(struct obd_export *exp)
3511 {
3512         struct obd_device *obd = class_exp2obd(exp);
3513         struct llog_ctxt *ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3514         int rc;
3515
3516         if (obd->u.cli.cl_conn_count == 1)
3517                 /* flush any remaining cancel messages out to the target */
3518                 llog_sync(ctxt, exp);
3519         
3520         llog_ctxt_put(ctxt);
3521
3522         rc = client_disconnect_export(exp);
3523         return rc;
3524 }
3525
3526 static int osc_import_event(struct obd_device *obd,
3527                             struct obd_import *imp,
3528                             enum obd_import_event event)
3529 {
3530         struct client_obd *cli;
3531         int rc = 0;
3532
3533         ENTRY;
3534         LASSERT(imp->imp_obd == obd);
3535
3536         switch (event) {
3537         case IMP_EVENT_DISCON: {
3538                 /* Only do this on the MDS OSC's */
3539                 if (imp->imp_server_timeout) {
3540                         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3541
3542                         spin_lock(&oscc->oscc_lock);
3543                         oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
3544                         spin_unlock(&oscc->oscc_lock);
3545                 }
3546                 cli = &obd->u.cli;
3547                 client_obd_list_lock(&cli->cl_loi_list_lock);
3548                 cli->cl_avail_grant = 0;
3549                 cli->cl_lost_grant = 0;
3550                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3551                 ptlrpc_import_setasync(imp, -1);
3552
3553                 break;
3554         }
3555         case IMP_EVENT_INACTIVE: {
3556                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
3557                 break;
3558         }
3559         case IMP_EVENT_INVALIDATE: {
3560                 struct ldlm_namespace *ns = obd->obd_namespace;
3561
3562                 /* Reset grants */
3563                 cli = &obd->u.cli;
3564                 client_obd_list_lock(&cli->cl_loi_list_lock);
3565                 /* all pages go to failing rpcs due to the invalid import */
3566                 osc_check_rpcs(cli);
3567                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3568
3569                 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3570
3571                 break;
3572         }
3573         case IMP_EVENT_ACTIVE: {
3574                 /* Only do this on the MDS OSC's */
3575                 if (imp->imp_server_timeout) {
3576                         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3577
3578                         spin_lock(&oscc->oscc_lock);
3579                         oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
3580                         spin_unlock(&oscc->oscc_lock);
3581                 }
3582                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
3583                 break;
3584         }
3585         case IMP_EVENT_OCD: {
3586                 struct obd_connect_data *ocd = &imp->imp_connect_data;
3587
3588                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3589                         osc_init_grant(&obd->u.cli, ocd);
3590
3591                 /* See bug 7198 */
3592                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3593                         imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
3594
3595                 ptlrpc_import_setasync(imp, 1);
3596                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
3597                 break;
3598         }
3599         default:
3600                 CERROR("Unknown import event %d\n", event);
3601                 LBUG();
3602         }
3603         RETURN(rc);
3604 }
3605
3606 int osc_setup(struct obd_device *obd, obd_count len, void *buf)
3607 {
3608         int rc;
3609         ENTRY;
3610
3611         ENTRY;
3612         rc = ptlrpcd_addref();
3613         if (rc)
3614                 RETURN(rc);
3615
3616         rc = client_obd_setup(obd, len, buf);
3617         if (rc) {
3618                 ptlrpcd_decref();
3619         } else {
3620                 struct lprocfs_static_vars lvars;
3621                 struct client_obd *cli = &obd->u.cli;
3622
3623                 lprocfs_init_vars(osc, &lvars);
3624                 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
3625                         lproc_osc_attach_seqstat(obd);
3626                         ptlrpc_lprocfs_register_obd(obd);
3627                 }
3628
3629                 oscc_init(obd);
3630                 /* We need to allocate a few requests more, because
3631                    brw_interpret_oap tries to create new requests before freeing
3632                    previous ones. Ideally we want to have 2x max_rpcs_in_flight
3633                    reserved, but I afraid that might be too much wasted RAM
3634                    in fact, so 2 is just my guess and still should work. */
3635                 cli->cl_import->imp_rq_pool =
3636                         ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
3637                                             OST_MAXREQSIZE,
3638                                             ptlrpc_add_rqs_to_pool);
3639         }
3640
3641         RETURN(rc);
3642 }
3643
3644 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
3645 {
3646         int rc = 0;
3647         ENTRY;
3648
3649         switch (stage) {
3650         case OBD_CLEANUP_EARLY: {
3651                 struct obd_import *imp;
3652                 imp = obd->u.cli.cl_import;
3653                 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
3654                 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
3655                 ptlrpc_deactivate_import(imp);
3656                 break;
3657         }
3658         case OBD_CLEANUP_EXPORTS: {
3659                 /* If we set up but never connected, the
3660                    client import will not have been cleaned. */
3661                 if (obd->u.cli.cl_import) {
3662                         struct obd_import *imp;
3663                         imp = obd->u.cli.cl_import;
3664                         CDEBUG(D_CONFIG, "%s: client import never connected\n",
3665                                obd->obd_name);
3666                         ptlrpc_invalidate_import(imp);
3667                         ptlrpc_free_rq_pool(imp->imp_rq_pool);
3668                         class_destroy_import(imp);
3669                         obd->u.cli.cl_import = NULL;
3670                 }
3671                 break;
3672         }
3673         case OBD_CLEANUP_SELF_EXP:
3674                 rc = obd_llog_finish(obd, 0);
3675                 if (rc != 0)
3676                         CERROR("failed to cleanup llogging subsystems\n");
3677                 break;
3678         case OBD_CLEANUP_OBD:
3679                 break;
3680         }
3681         RETURN(rc);
3682 }
3683
3684 int osc_cleanup(struct obd_device *obd)
3685 {
3686         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3687         int rc;
3688
3689         ENTRY;
3690         ptlrpc_lprocfs_unregister_obd(obd);
3691         lprocfs_obd_cleanup(obd);
3692
3693         spin_lock(&oscc->oscc_lock);
3694         oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING;
3695         oscc->oscc_flags |= OSCC_FLAG_EXITING;
3696         spin_unlock(&oscc->oscc_lock);
3697
3698         /* free memory of osc quota cache */
3699         lquota_cleanup(quota_interface, obd);
3700
3701         rc = client_obd_cleanup(obd);
3702
3703         ptlrpcd_decref();
3704         RETURN(rc);
3705 }
3706
3707 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
3708 {
3709         struct lustre_cfg *lcfg = buf;
3710         struct lprocfs_static_vars lvars;
3711         int rc = 0;
3712
3713         lprocfs_init_vars(osc, &lvars);
3714
3715         rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars, lcfg, obd);
3716         return(rc);
3717 }
3718
3719 struct obd_ops osc_obd_ops = {
3720         .o_owner                = THIS_MODULE,
3721         .o_setup                = osc_setup,
3722         .o_precleanup           = osc_precleanup,
3723         .o_cleanup              = osc_cleanup,
3724         .o_add_conn             = client_import_add_conn,
3725         .o_del_conn             = client_import_del_conn,
3726         .o_connect              = client_connect_import,
3727         .o_reconnect            = osc_reconnect,
3728         .o_disconnect           = osc_disconnect,
3729         .o_statfs               = osc_statfs,
3730         .o_statfs_async         = osc_statfs_async,
3731         .o_packmd               = osc_packmd,
3732         .o_unpackmd             = osc_unpackmd,
3733         .o_precreate            = osc_precreate,
3734         .o_create               = osc_create,
3735         .o_destroy              = osc_destroy,
3736         .o_getattr              = osc_getattr,
3737         .o_getattr_async        = osc_getattr_async,
3738         .o_setattr              = osc_setattr,
3739         .o_setattr_async        = osc_setattr_async,
3740         .o_brw                  = osc_brw,
3741         .o_brw_async            = osc_brw_async,
3742         .o_prep_async_page      = osc_prep_async_page,
3743         .o_queue_async_io       = osc_queue_async_io,
3744         .o_set_async_flags      = osc_set_async_flags,
3745         .o_queue_group_io       = osc_queue_group_io,
3746         .o_trigger_group_io     = osc_trigger_group_io,
3747         .o_teardown_async_page  = osc_teardown_async_page,
3748         .o_punch                = osc_punch,
3749         .o_sync                 = osc_sync,
3750         .o_enqueue              = osc_enqueue,
3751         .o_match                = osc_match,
3752         .o_change_cbdata        = osc_change_cbdata,
3753         .o_cancel               = osc_cancel,
3754         .o_cancel_unused        = osc_cancel_unused,
3755         .o_join_lru             = osc_join_lru,
3756         .o_iocontrol            = osc_iocontrol,
3757         .o_get_info             = osc_get_info,
3758         .o_set_info_async       = osc_set_info_async,
3759         .o_import_event         = osc_import_event,
3760         .o_llog_init            = osc_llog_init,
3761         .o_llog_finish          = osc_llog_finish,
3762         .o_process_config       = osc_process_config,
3763 };
3764 int __init osc_init(void)
3765 {
3766         struct lprocfs_static_vars lvars;
3767         int rc;
3768         ENTRY;
3769
3770         lprocfs_init_vars(osc, &lvars);
3771
3772         request_module("lquota");
3773         quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
3774         lquota_init(quota_interface);
3775         init_obd_quota_ops(quota_interface, &osc_obd_ops);
3776
3777         rc = class_register_type(&osc_obd_ops, lvars.module_vars,
3778                                  LUSTRE_OSC_NAME);
3779         if (rc) {
3780                 if (quota_interface)
3781                         PORTAL_SYMBOL_PUT(osc_quota_interface);
3782                 RETURN(rc);
3783         }
3784
3785         RETURN(rc);
3786 }
3787
#ifdef __KERNEL__
/*
 * Module exit: shut down the quota interface, release its symbol if one was
 * obtained in osc_init(), then unregister the OSC obd type.
 */
static void /*__exit*/ osc_exit(void)
{
        lquota_exit(quota_interface);
        if (quota_interface != NULL)
                PORTAL_SYMBOL_PUT(osc_quota_interface);

        class_unregister_type(LUSTRE_OSC_NAME);
}

/* Kernel module metadata and entry points. */
MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
MODULE_LICENSE("GPL");

cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);
#endif /* __KERNEL__ */