Whamcloud - gitweb
Branch HEAD
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
5  *   Author Peter Braam <braam@clusterfs.com>
6  *
7  *   This file is part of the Lustre file system, http://www.lustre.org
8  *   Lustre is a trademark of Cluster File Systems, Inc.
9  *
10  *   You may have signed or agreed to another license before downloading
11  *   this software.  If so, you are bound by the terms and conditions
12  *   of that agreement, and the following does not apply to you.  See the
13  *   LICENSE file included with this distribution for more information.
14  *
15  *   If you did not agree to a different license, then this copy of Lustre
16  *   is open source software; you can redistribute it and/or modify it
17  *   under the terms of version 2 of the GNU General Public License as
18  *   published by the Free Software Foundation.
19  *
20  *   In either case, Lustre is distributed in the hope that it will be
21  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
22  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
23  *   license text for more details.
24  *
25  *  For testing and management it is treated as an obd_device,
26  *  although it does not export a full OBD method table (the
27  *  requests are coming in over the wire, so object target modules
28  *  do not have a full method table.)
29  *
30  */
31
32 #ifndef EXPORT_SYMTAB
33 # define EXPORT_SYMTAB
34 #endif
35 #define DEBUG_SUBSYSTEM S_OSC
36
37 #ifdef __KERNEL__
38 # include <libcfs/libcfs.h>
39 #else /* __KERNEL__ */
40 # include <liblustre.h>
41 #endif
42
43 #include <lustre_dlm.h>
44 #include <libcfs/kp30.h>
45 #include <lustre_net.h>
46 #include <lustre/lustre_user.h>
47 #include <obd_ost.h>
48 #include <obd_lov.h>
49
50 #ifdef  __CYGWIN__
51 # include <ctype.h>
52 #endif
53
54 #include <lustre_ha.h>
55 #include <lprocfs_status.h>
56 #include <lustre_log.h>
57 #include <lustre_debug.h>
58 #include <lustre_param.h>
59 #include "osc_internal.h"
60
61 static quota_interface_t *quota_interface = NULL;
62 extern quota_interface_t osc_quota_interface;
63
64 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
65
66 /* Resend time, by default 10s.  NOTE(review): where this is initialised and the unit it is stored in are not visible in this part of the file -- confirm. */
67 atomic_t osc_resend_time; 
68
69 /* Pack OSC object metadata for disk storage (LE byte order). */
70 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
71                       struct lov_stripe_md *lsm)
72 {
73         int lmm_size;
74         ENTRY;
75
76         lmm_size = sizeof(**lmmp);
77         if (!lmmp)
78                 RETURN(lmm_size);
79
80         if (*lmmp && !lsm) {
81                 OBD_FREE(*lmmp, lmm_size);
82                 *lmmp = NULL;
83                 RETURN(0);
84         }
85
86         if (!*lmmp) {
87                 OBD_ALLOC(*lmmp, lmm_size);
88                 if (!*lmmp)
89                         RETURN(-ENOMEM);
90         }
91
92         if (lsm) {
93                 LASSERT(lsm->lsm_object_id);
94                 LASSERT(lsm->lsm_object_gr);
95                 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
96                 (*lmmp)->lmm_object_gr = cpu_to_le64(lsm->lsm_object_gr);
97         }
98
99         RETURN(lmm_size);
100 }
101
102 /* Unpack OSC object metadata from disk storage (LE byte order). */
103 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
104                         struct lov_mds_md *lmm, int lmm_bytes)
105 {
106         int lsm_size;
107         ENTRY;
108
109         if (lmm != NULL) {
110                 if (lmm_bytes < sizeof (*lmm)) {
111                         CERROR("lov_mds_md too small: %d, need %d\n",
112                                lmm_bytes, (int)sizeof(*lmm));
113                         RETURN(-EINVAL);
114                 }
115                 /* XXX LOV_MAGIC etc check? */
116
117                 if (lmm->lmm_object_id == 0) {
118                         CERROR("lov_mds_md: zero lmm_object_id\n");
119                         RETURN(-EINVAL);
120                 }
121         }
122
123         lsm_size = lov_stripe_md_size(1);
124         if (lsmp == NULL)
125                 RETURN(lsm_size);
126
127         if (*lsmp != NULL && lmm == NULL) {
128                 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
129                 OBD_FREE(*lsmp, lsm_size);
130                 *lsmp = NULL;
131                 RETURN(0);
132         }
133
134         if (*lsmp == NULL) {
135                 OBD_ALLOC(*lsmp, lsm_size);
136                 if (*lsmp == NULL)
137                         RETURN(-ENOMEM);
138                 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
139                 if ((*lsmp)->lsm_oinfo[0] == NULL) {
140                         OBD_FREE(*lsmp, lsm_size);
141                         RETURN(-ENOMEM);
142                 }
143                 loi_init((*lsmp)->lsm_oinfo[0]);
144         }
145
146         if (lmm != NULL) {
147                 /* XXX zero *lsmp? */
148                 (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
149                 (*lsmp)->lsm_object_gr = le64_to_cpu (lmm->lmm_object_gr);
150                 LASSERT((*lsmp)->lsm_object_id);
151                 LASSERT((*lsmp)->lsm_object_gr);
152         }
153
154         (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
155
156         RETURN(lsm_size);
157 }
158
159 static inline void osc_pack_capa(struct ptlrpc_request *req, int offset,
160                                  struct ost_body *body, void *capa)
161 {
162         struct obd_capa *oc = (struct obd_capa *)capa;
163         struct lustre_capa *c;
164
165         if (!capa)
166                 return;
167
168         c = lustre_msg_buf(req->rq_reqmsg, offset, sizeof(*c));
169         LASSERT(c);
170         capa_cpy(c, oc);
171         body->oa.o_valid |= OBD_MD_FLOSSCAPA;
172         DEBUG_CAPA(D_SEC, c, "pack");
173 }
174
175 static inline void osc_pack_req_body(struct ptlrpc_request *req, int offset,
176                                      struct obd_info *oinfo)
177 {
178         struct ost_body *body;
179
180         body = lustre_msg_buf(req->rq_reqmsg, offset, sizeof(*body));
181         body->oa = *oinfo->oi_oa;
182         osc_pack_capa(req, offset + 1, body, oinfo->oi_capa);
183 }
184
185 static int osc_getattr_interpret(struct ptlrpc_request *req,
186                                  struct osc_async_args *aa, int rc)
187 {
188         struct ost_body *body;
189         ENTRY;
190
191         if (rc != 0)
192                 GOTO(out, rc);
193
194         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
195                                   lustre_swab_ost_body);
196         if (body) {
197                 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
198                 memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));
199
200                 /* This should really be sent by the OST */
201                 aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
202                 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
203         } else {
204                 CERROR("can't unpack ost_body\n");
205                 rc = -EPROTO;
206                 aa->aa_oi->oi_oa->o_valid = 0;
207         }
208 out:
209         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
210         RETURN(rc);
211 }
212
213 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
214                              struct ptlrpc_request_set *set)
215 {
216         struct ptlrpc_request *req;
217         struct ost_body *body;
218         int size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
219         struct osc_async_args *aa;
220         ENTRY;
221
222         size[REQ_REC_OFF + 1] = oinfo->oi_capa ? sizeof(struct lustre_capa) : 0;
223         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
224                               OST_GETATTR, 3, size,NULL);
225         if (!req)
226                 RETURN(-ENOMEM);
227
228         osc_pack_req_body(req, REQ_REC_OFF, oinfo);
229
230         ptlrpc_req_set_repsize(req, 2, size);
231         req->rq_interpret_reply = osc_getattr_interpret;
232
233         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
234         aa = (struct osc_async_args *)&req->rq_async_args;
235         aa->aa_oi = oinfo;
236
237         ptlrpc_set_add_req(set, req);
238         RETURN (0);
239 }
240
241 static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
242 {
243         struct ptlrpc_request *req;
244         struct ost_body *body;
245         int rc, size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
246         ENTRY;
247
248         size[REQ_REC_OFF + 1] = oinfo->oi_capa ? sizeof(struct lustre_capa) : 0;
249         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
250                               OST_GETATTR, 3, size, NULL);
251         if (!req)
252                 RETURN(-ENOMEM);
253
254         osc_pack_req_body(req, REQ_REC_OFF, oinfo);
255
256         ptlrpc_req_set_repsize(req, 2, size);
257
258         rc = ptlrpc_queue_wait(req);
259         if (rc) {
260                 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
261                 GOTO(out, rc);
262         }
263
264         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
265                                   lustre_swab_ost_body);
266         if (body == NULL) {
267                 CERROR ("can't unpack ost_body\n");
268                 GOTO (out, rc = -EPROTO);
269         }
270
271         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
272         *oinfo->oi_oa = body->oa;
273
274         /* This should really be sent by the OST */
275         oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
276         oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
277
278         EXIT;
279  out:
280         ptlrpc_req_finished(req);
281         return rc;
282 }
283
284 static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
285                        struct obd_trans_info *oti)
286 {
287         struct ptlrpc_request *req;
288         struct ost_body *body;
289         int rc, size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
290         ENTRY;
291
292         LASSERT(!(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP) ||
293                                         oinfo->oi_oa->o_gr > 0);
294         size[REQ_REC_OFF + 1] = oinfo->oi_capa ? sizeof(struct lustre_capa) : 0;
295         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
296                               OST_SETATTR, 3, size, NULL);
297         if (!req)
298                 RETURN(-ENOMEM);
299
300         osc_pack_req_body(req, REQ_REC_OFF, oinfo);
301
302         ptlrpc_req_set_repsize(req, 2, size);
303
304         rc = ptlrpc_queue_wait(req);
305         if (rc)
306                 GOTO(out, rc);
307
308         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
309                                   lustre_swab_ost_body);
310         if (body == NULL)
311                 GOTO(out, rc = -EPROTO);
312
313         *oinfo->oi_oa = body->oa;
314
315         EXIT;
316 out:
317         ptlrpc_req_finished(req);
318         RETURN(rc);
319 }
320
321 static int osc_setattr_interpret(struct ptlrpc_request *req,
322                                  struct osc_async_args *aa, int rc)
323 {
324         struct ost_body *body;
325         ENTRY;
326
327         if (rc != 0)
328                 GOTO(out, rc);
329
330         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
331                                   lustre_swab_ost_body);
332         if (body == NULL) {
333                 CERROR("can't unpack ost_body\n");
334                 GOTO(out, rc = -EPROTO);
335         }
336
337         *aa->aa_oi->oi_oa = body->oa;
338 out:
339         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
340         RETURN(rc);
341 }
342
343 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
344                              struct obd_trans_info *oti,
345                              struct ptlrpc_request_set *rqset)
346 {
347         struct ptlrpc_request *req;
348         int size[3] = { sizeof(struct ptlrpc_body), sizeof(struct ost_body) };
349         struct osc_async_args *aa;
350         ENTRY;
351
352         size[REQ_REC_OFF + 1] = oinfo->oi_capa ? sizeof(struct lustre_capa) : 0;
353         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
354                               OST_SETATTR, 3, size, NULL);
355         if (!req)
356                 RETURN(-ENOMEM);
357
358         osc_pack_req_body(req, REQ_REC_OFF, oinfo);
359         if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) {
360                 LASSERT(oti);
361                 *obdo_logcookie(oinfo->oi_oa) = *oti->oti_logcookies;
362         }
363
364         ptlrpc_req_set_repsize(req, 2, size);
365         /* do mds to ost setattr asynchronouly */
366         if (!rqset) {
367                 /* Do not wait for response. */
368                 ptlrpcd_add_req(req);
369         } else {
370                 req->rq_interpret_reply = osc_setattr_interpret;
371
372                 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
373                 aa = (struct osc_async_args *)&req->rq_async_args;
374                 aa->aa_oi = oinfo;
375
376                 ptlrpc_set_add_req(rqset, req);
377         }
378
379         RETURN(0);
380 }
381
382 int osc_real_create(struct obd_export *exp, struct obdo *oa,
383                     struct lov_stripe_md **ea, struct obd_trans_info *oti)
384 {
385         struct ptlrpc_request *req;
386         struct ost_body *body;
387         struct lov_stripe_md *lsm;
388         int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
389         ENTRY;
390
391         LASSERT(oa);
392         LASSERT(ea);
393
394         lsm = *ea;
395         if (!lsm) {
396                 rc = obd_alloc_memmd(exp, &lsm);
397                 if (rc < 0)
398                         RETURN(rc);
399         }
400
401         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
402                               OST_CREATE, 2, size, NULL);
403         if (!req)
404                 GOTO(out, rc = -ENOMEM);
405
406         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
407         body->oa = *oa;
408
409         ptlrpc_req_set_repsize(req, 2, size);
410         if (oa->o_valid & OBD_MD_FLINLINE) {
411                 LASSERT((oa->o_valid & OBD_MD_FLFLAGS) &&
412                         oa->o_flags == OBD_FL_DELORPHAN);
413                 DEBUG_REQ(D_HA, req,
414                           "delorphan from OST integration");
415                 /* Don't resend the delorphan req */
416                 req->rq_no_resend = req->rq_no_delay = 1;
417         }
418
419         rc = ptlrpc_queue_wait(req);
420         if (rc)
421                 GOTO(out_req, rc);
422
423         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
424                                   lustre_swab_ost_body);
425         if (body == NULL) {
426                 CERROR ("can't unpack ost_body\n");
427                 GOTO (out_req, rc = -EPROTO);
428         }
429
430         *oa = body->oa;
431
432         /* This should really be sent by the OST */
433         oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
434         oa->o_valid |= OBD_MD_FLBLKSZ;
435
436         /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
437          * have valid lsm_oinfo data structs, so don't go touching that.
438          * This needs to be fixed in a big way.
439          */
440         lsm->lsm_object_id = oa->o_id;
441         lsm->lsm_object_gr = oa->o_gr;
442         *ea = lsm;
443
444         if (oti != NULL) {
445                 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
446
447                 if (oa->o_valid & OBD_MD_FLCOOKIE) {
448                         if (!oti->oti_logcookies)
449                                 oti_alloc_cookies(oti, 1);
450                         *oti->oti_logcookies = *obdo_logcookie(oa);
451                 }
452         }
453
454         CDEBUG(D_HA, "transno: "LPD64"\n",
455                lustre_msg_get_transno(req->rq_repmsg));
456         EXIT;
457 out_req:
458         ptlrpc_req_finished(req);
459 out:
460         if (rc && !*ea)
461                 obd_free_memmd(exp, &lsm);
462         return rc;
463 }
464
465 static int osc_punch_interpret(struct ptlrpc_request *req,
466                                struct osc_async_args *aa, int rc)
467 {
468         struct ost_body *body;
469         ENTRY;
470
471         if (rc != 0)
472                 GOTO(out, rc);
473
474         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof (*body),
475                                   lustre_swab_ost_body);
476         if (body == NULL) {
477                 CERROR ("can't unpack ost_body\n");
478                 GOTO(out, rc = -EPROTO);
479         }
480
481         *aa->aa_oi->oi_oa = body->oa;
482 out:
483         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
484         RETURN(rc);
485 }
486
487 static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
488                      struct obd_trans_info *oti,
489                      struct ptlrpc_request_set *rqset)
490 {
491         struct ptlrpc_request *req;
492         struct osc_async_args *aa;
493         struct ost_body *body;
494         int size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
495         ENTRY;
496
497         if (!oinfo->oi_oa) {
498                 CERROR("oa NULL\n");
499                 RETURN(-EINVAL);
500         }
501
502         size[REQ_REC_OFF + 1] = oinfo->oi_capa? sizeof(struct lustre_capa) : 0;
503         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
504                               OST_PUNCH, 3, size, NULL);
505         if (!req)
506                 RETURN(-ENOMEM);
507
508         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
509
510         osc_pack_req_body(req, REQ_REC_OFF, oinfo);
511         /* overload the size and blocks fields in the oa with start/end */
512         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
513         body->oa.o_size = oinfo->oi_policy.l_extent.start;
514         body->oa.o_blocks = oinfo->oi_policy.l_extent.end;
515         body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
516
517         ptlrpc_req_set_repsize(req, 2, size);
518
519         req->rq_interpret_reply = osc_punch_interpret;
520         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
521         aa = (struct osc_async_args *)&req->rq_async_args;
522         aa->aa_oi = oinfo;
523         ptlrpc_set_add_req(rqset, req);
524
525         RETURN(0);
526 }
527
528 static int osc_sync(struct obd_export *exp, struct obdo *oa,
529                     struct lov_stripe_md *md, obd_size start, obd_size end,
530                     void *capa)
531 {
532         struct ptlrpc_request *req;
533         struct ost_body *body;
534         int rc, size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
535         ENTRY;
536
537         if (!oa) {
538                 CERROR("oa NULL\n");
539                 RETURN(-EINVAL);
540         }
541
542         size[REQ_REC_OFF + 1] = capa ? sizeof(struct lustre_capa) : 0;
543
544         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
545                               OST_SYNC, 3, size, NULL);
546         if (!req)
547                 RETURN(-ENOMEM);
548
549         /* overload the size and blocks fields in the oa with start/end */
550         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
551         body->oa = *oa;
552         body->oa.o_size = start;
553         body->oa.o_blocks = end;
554         body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
555
556         osc_pack_capa(req, REQ_REC_OFF + 1, body, capa);
557
558         ptlrpc_req_set_repsize(req, 2, size);
559
560         rc = ptlrpc_queue_wait(req);
561         if (rc)
562                 GOTO(out, rc);
563
564         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
565                                   lustre_swab_ost_body);
566         if (body == NULL) {
567                 CERROR ("can't unpack ost_body\n");
568                 GOTO (out, rc = -EPROTO);
569         }
570
571         *oa = body->oa;
572
573         EXIT;
574  out:
575         ptlrpc_req_finished(req);
576         return rc;
577 }
578
579 /* Find and cancel locally locks matched by @mode in the resource found by
580  * @objid. Found locks are added into @cancel list. Returns the amount of
581  * locks added to @cancels list. */
582 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
583                                    struct list_head *cancels, ldlm_mode_t mode,
584                                    int lock_flags)
585 {
586         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
587         struct ldlm_res_id res_id = { .name = { oa->o_id, 0, oa->o_gr, 0 } };
588         struct ldlm_resource *res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
589         int count;
590         ENTRY;
591
592         if (res == NULL)
593                 RETURN(0);
594
595         count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
596                                            lock_flags, 0, NULL);
597         ldlm_resource_putref(res);
598         RETURN(count);
599 }
600
601 /* Destroy requests can be async always on the client, and we don't even really
602  * care about the return code since the client cannot do anything at all about
603  * a destroy failure.
604  * When the MDS is unlinking a filename, it saves the file objects into a
605  * recovery llog, and these object records are cancelled when the OST reports
606  * they were destroyed and sync'd to disk (i.e. transaction committed).
607  * If the client dies, or the OST is down when the object should be destroyed,
608  * the records are not cancelled, and when the OST reconnects to the MDS next,
609  * it will retrieve the llog unlink logs and then sends the log cancellation
610  * cookies to the MDS after committing destroy transactions. */
611 static int osc_destroy(struct obd_export *exp, struct obdo *oa,
612                        struct lov_stripe_md *ea, struct obd_trans_info *oti,
613                        struct obd_export *md_export)
614 {
615         CFS_LIST_HEAD(cancels);
616         struct ptlrpc_request *req;
617         struct ost_body *body;
618         int size[3] = { sizeof(struct ptlrpc_body), sizeof(*body), 0 };
619         int count, bufcount = 2;
620         ENTRY;
621
622         if (!oa) {
623                 CERROR("oa NULL\n");
624                 RETURN(-EINVAL);
625         }
626
627         count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
628                                         LDLM_FL_DISCARD_DATA);
629         if (exp_connect_cancelset(exp) && count) {
630                 bufcount = 3;
631                 size[REQ_REC_OFF + 1] = ldlm_request_bufsize(count,OST_DESTROY);
632         }
633         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
634                               OST_DESTROY, bufcount, size, NULL);
635         if (exp_connect_cancelset(exp) && req)
636                 ldlm_cli_cancel_list(&cancels, count, req, REQ_REC_OFF + 1, 0);
637         else
638                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
639
640         if (!req)
641                 RETURN(-ENOMEM);
642
643         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
644
645         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
646         if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
647                 memcpy(obdo_logcookie(oa), oti->oti_logcookies,
648                        sizeof(*oti->oti_logcookies));
649         body->oa = *oa;
650
651         ptlrpc_req_set_repsize(req, 2, size);
652
653         ptlrpcd_add_req(req);
654         RETURN(0);
655 }
656
657 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
658                                 long writing_bytes)
659 {
660         obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
661
662         LASSERT(!(oa->o_valid & bits));
663
664         oa->o_valid |= bits;
665         client_obd_list_lock(&cli->cl_loi_list_lock);
666         oa->o_dirty = cli->cl_dirty;
667         if (cli->cl_dirty > cli->cl_dirty_max) {
668                 CERROR("dirty %lu > dirty_max %lu\n",
669                        cli->cl_dirty, cli->cl_dirty_max);
670                 oa->o_undirty = 0;
671         } else if (atomic_read(&obd_dirty_pages) > obd_max_dirty_pages) {
672                 CERROR("dirty %d > system dirty_max %d\n",
673                        atomic_read(&obd_dirty_pages), obd_max_dirty_pages);
674                 oa->o_undirty = 0;
675         } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
676                 CERROR("dirty %lu - dirty_max %lu too big???\n",
677                        cli->cl_dirty, cli->cl_dirty_max);
678                 oa->o_undirty = 0;
679         } else {
680                 long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
681                                 (cli->cl_max_rpcs_in_flight + 1);
682                 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
683         }
684         oa->o_grant = cli->cl_avail_grant;
685         oa->o_dropped = cli->cl_lost_grant;
686         cli->cl_lost_grant = 0;
687         client_obd_list_unlock(&cli->cl_loi_list_lock);
688         CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
689                oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
690 }
691
692 /* caller must hold loi_list_lock */
693 static void osc_consume_write_grant(struct client_obd *cli,
694                                     struct brw_page *pga)
695 {
696         atomic_inc(&obd_dirty_pages);
697         cli->cl_dirty += CFS_PAGE_SIZE;
698         cli->cl_avail_grant -= CFS_PAGE_SIZE;
699         pga->flag |= OBD_BRW_FROM_GRANT;
700         CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
701                CFS_PAGE_SIZE, pga, pga->pg);
702         LASSERT(cli->cl_avail_grant >= 0);
703 }
704
705 /* the companion to osc_consume_write_grant, called when a brw has completed.
706  * must be called with the loi lock held. */
707 static void osc_release_write_grant(struct client_obd *cli,
708                                     struct brw_page *pga, int sent)
709 {
710         int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
711         ENTRY;
712
713         if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
714                 EXIT;
715                 return;
716         }
717
718         pga->flag &= ~OBD_BRW_FROM_GRANT;
719         atomic_dec(&obd_dirty_pages);
720         cli->cl_dirty -= CFS_PAGE_SIZE;
721         if (!sent) {
722                 cli->cl_lost_grant += CFS_PAGE_SIZE;
723                 CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
724                        cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
725         } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
726                 /* For short writes we shouldn't count parts of pages that
727                  * span a whole block on the OST side, or our accounting goes
728                  * wrong.  Should match the code in filter_grant_check. */
729                 int offset = pga->off & ~CFS_PAGE_MASK;
730                 int count = pga->count + (offset & (blocksize - 1));
731                 int end = (offset + pga->count) & (blocksize - 1);
732                 if (end)
733                         count += blocksize - end;
734
735                 cli->cl_lost_grant += CFS_PAGE_SIZE - count;
736                 CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
737                        CFS_PAGE_SIZE - count, cli->cl_lost_grant,
738                        cli->cl_avail_grant, cli->cl_dirty);
739         }
740
741         EXIT;
742 }
743
744 static unsigned long rpcs_in_flight(struct client_obd *cli)
745 {
746         return cli->cl_r_in_flight + cli->cl_w_in_flight;
747 }
748
749 /* caller must hold loi_list_lock */
750 void osc_wake_cache_waiters(struct client_obd *cli)
751 {
752         struct list_head *l, *tmp;
753         struct osc_cache_waiter *ocw;
754
755         ENTRY;
756         list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
757                 /* if we can't dirty more, we must wait until some is written */
758                 if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
759                    (atomic_read(&obd_dirty_pages) + 1 > obd_max_dirty_pages)) {
760                         CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
761                                "osc max %ld, sys max %d\n", cli->cl_dirty,
762                                cli->cl_dirty_max, obd_max_dirty_pages);
763                         return;
764                 }
765
766                 /* if still dirty cache but no grant wait for pending RPCs that
767                  * may yet return us some grant before doing sync writes */
768                 if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
769                         CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
770                                cli->cl_w_in_flight);
771                         return;
772                 }
773
774                 ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
775                 list_del_init(&ocw->ocw_entry);
776                 if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
777                         /* no more RPCs in flight to return grant, do sync IO */
778                         ocw->ocw_rc = -EDQUOT;
779                         CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
780                 } else {
781                         osc_consume_write_grant(cli,
782                                                 &ocw->ocw_oap->oap_brw_page);
783                 }
784
785                 cfs_waitq_signal(&ocw->ocw_waitq);
786         }
787
788         EXIT;
789 }
790
791 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
792 {
793         client_obd_list_lock(&cli->cl_loi_list_lock);
794         cli->cl_avail_grant = ocd->ocd_grant;
795         client_obd_list_unlock(&cli->cl_loi_list_lock);
796
797         CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld\n",
798                cli->cl_avail_grant, cli->cl_lost_grant);
799         LASSERT(cli->cl_avail_grant >= 0);
800 }
801
802 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
803 {
804         client_obd_list_lock(&cli->cl_loi_list_lock);
805         CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
806         if (body->oa.o_valid & OBD_MD_FLGRANT)
807                 cli->cl_avail_grant += body->oa.o_grant;
808         /* waiters are woken in brw_interpret_oap */
809         client_obd_list_unlock(&cli->cl_loi_list_lock);
810 }
811
812 /* We assume that the reason this OSC got a short read is because it read
813  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
814  * via the LOV, and it _knows_ it's reading inside the file, it's just that
815  * this stripe never got written at or beyond this stripe offset yet. */
816 static void handle_short_read(int nob_read, obd_count page_count,
817                               struct brw_page **pga)
818 {
819         char *ptr;
820         int i = 0;
821
822         /* skip bytes read OK */
823         while (nob_read > 0) {
824                 LASSERT (page_count > 0);
825
826                 if (pga[i]->count > nob_read) {
827                         /* EOF inside this page */
828                         ptr = cfs_kmap(pga[i]->pg) +
829                                 (pga[i]->off & ~CFS_PAGE_MASK);
830                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
831                         cfs_kunmap(pga[i]->pg);
832                         page_count--;
833                         i++;
834                         break;
835                 }
836
837                 nob_read -= pga[i]->count;
838                 page_count--;
839                 i++;
840         }
841
842         /* zero remaining pages */
843         while (page_count-- > 0) {
844                 ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
845                 memset(ptr, 0, pga[i]->count);
846                 cfs_kunmap(pga[i]->pg);
847                 i++;
848         }
849 }
850
851 static int check_write_rcs(struct ptlrpc_request *req,
852                            int requested_nob, int niocount,
853                            obd_count page_count, struct brw_page **pga)
854 {
855         int    *remote_rcs, i;
856
857         /* return error if any niobuf was in error */
858         remote_rcs = lustre_swab_repbuf(req, REQ_REC_OFF + 1,
859                                         sizeof(*remote_rcs) * niocount, NULL);
860         if (remote_rcs == NULL) {
861                 CERROR("Missing/short RC vector on BRW_WRITE reply\n");
862                 return(-EPROTO);
863         }
864         if (lustre_msg_swabbed(req->rq_repmsg))
865                 for (i = 0; i < niocount; i++)
866                         __swab32s(&remote_rcs[i]);
867
868         for (i = 0; i < niocount; i++) {
869                 if (remote_rcs[i] < 0)
870                         return(remote_rcs[i]);
871
872                 if (remote_rcs[i] != 0) {
873                         CERROR("rc[%d] invalid (%d) req %p\n",
874                                 i, remote_rcs[i], req);
875                         return(-EPROTO);
876                 }
877         }
878
879         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
880                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
881                        requested_nob, req->rq_bulk->bd_nob_transferred);
882                 return(-EPROTO);
883         }
884
885         return (0);
886 }
887
888 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
889 {
890         if (p1->flag != p2->flag) {
891                 unsigned mask = ~OBD_BRW_FROM_GRANT;
892
893                 /* warn if we try to combine flags that we don't know to be
894                  * safe to combine */
895                 if ((p1->flag & mask) != (p2->flag & mask))
896                         CERROR("is it ok to have flags 0x%x and 0x%x in the "
897                                "same brw?\n", p1->flag, p2->flag);
898                 return 0;
899         }
900
901         return (p1->off + p1->count == p2->off);
902 }
903
904 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
905                                    struct brw_page **pga, int opc)
906 {
907         __u32 cksum = ~0;
908         int i = 0;
909
910         LASSERT (pg_count > 0);
911         while (nob > 0 && pg_count > 0) {
912                 char *ptr = cfs_kmap(pga[i]->pg);
913                 int off = pga[i]->off & ~CFS_PAGE_MASK;
914                 int count = pga[i]->count > nob ? nob : pga[i]->count;
915
916                 /* corrupt the data before we compute the checksum, to
917                  * simulate an OST->client data error */
918                 if (i == 0 && opc == OST_READ &&
919                     OBD_FAIL_CHECK_ONCE(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
920                         memcpy(ptr + off, "bad1", min(4, nob));
921                 cksum = crc32_le(cksum, ptr + off, count);
922                 cfs_kunmap(pga[i]->pg);
923                 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
924                                off, cksum);
925
926                 nob -= pga[i]->count;
927                 pg_count--;
928                 i++;
929         }
930         /* For sending we only compute the wrong checksum instead
931          * of corrupting the data so it is still correct on a redo */
932         if (opc == OST_WRITE && OBD_FAIL_CHECK_ONCE(OBD_FAIL_OSC_CHECKSUM_SEND))
933                 cksum++;
934
935         return cksum;
936 }
937
938 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
939                                 struct lov_stripe_md *lsm, obd_count page_count,
940                                 struct brw_page **pga, 
941                                 struct ptlrpc_request **reqp,
942                                 struct obd_capa *ocapa)
943 {
944         struct ptlrpc_request   *req;
945         struct ptlrpc_bulk_desc *desc;
946         struct ost_body         *body;
947         struct obd_ioobj        *ioobj;
948         struct niobuf_remote    *niobuf;
949         int size[5] = { sizeof(struct ptlrpc_body), sizeof(*body) };
950         int niocount, i, requested_nob, opc, rc;
951         struct ptlrpc_request_pool *pool;
952         struct lustre_capa      *capa;
953         struct osc_brw_async_args *aa;
954
955         ENTRY;
956         OBD_FAIL_RETURN(OBD_FAIL_OSC_BRW_PREP_REQ, -ENOMEM); /* Recoverable */
957         OBD_FAIL_RETURN(OBD_FAIL_OSC_BRW_PREP_REQ2, -EINVAL); /* Fatal */
958
959         if ((cmd & OBD_BRW_WRITE) != 0) {
960                 opc = OST_WRITE;
961                 pool = cli->cl_import->imp_rq_pool;
962         } else {
963                 opc = OST_READ;
964                 pool = NULL;
965         }
966
967         for (niocount = i = 1; i < page_count; i++) {
968                 if (!can_merge_pages(pga[i - 1], pga[i]))
969                         niocount++;
970         }
971
972         size[REQ_REC_OFF + 1] = sizeof(*ioobj);
973         size[REQ_REC_OFF + 2] = niocount * sizeof(*niobuf);
974         if (ocapa)
975                 size[REQ_REC_OFF + 3] = sizeof(*capa);
976
977         req = ptlrpc_prep_req_pool(cli->cl_import, LUSTRE_OST_VERSION, opc, 5,
978                                    size, NULL, pool, NULL);
979         if (req == NULL)
980                 RETURN (-ENOMEM);
981
982         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
983
984         if (opc == OST_WRITE)
985                 desc = ptlrpc_prep_bulk_imp (req, page_count,
986                                              BULK_GET_SOURCE, OST_BULK_PORTAL);
987         else
988                 desc = ptlrpc_prep_bulk_imp (req, page_count,
989                                              BULK_PUT_SINK, OST_BULK_PORTAL);
990         if (desc == NULL)
991                 GOTO(out, rc = -ENOMEM);
992         /* NB request now owns desc and will free it when it gets freed */
993
994         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
995         ioobj = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1, sizeof(*ioobj));
996         niobuf = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
997                                 niocount * sizeof(*niobuf));
998
999         body->oa = *oa;
1000
1001         obdo_to_ioobj(oa, ioobj);
1002         ioobj->ioo_bufcnt = niocount;
1003         if (ocapa) {
1004                 capa = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 3,
1005                                       sizeof(*capa));
1006                 capa_cpy(capa, ocapa);
1007                 body->oa.o_valid |= OBD_MD_FLOSSCAPA;
1008         }
1009
1010         LASSERT (page_count > 0);
1011         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1012                 struct brw_page *pg = pga[i];
1013                 struct brw_page *pg_prev = pga[i - 1];
1014
1015                 LASSERT(pg->count > 0);
1016                 LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
1017                          "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
1018                          pg->off, pg->count);
1019 #ifdef __LINUX__
1020                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1021                          "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1022                          " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1023                          i, page_count,
1024                          pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1025                          pg_prev->pg, page_private(pg_prev->pg),
1026                          pg_prev->pg->index, pg_prev->off);
1027 #else
1028                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1029                          "i %d p_c %u\n", i, page_count);
1030 #endif
1031                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1032                         (pg->flag & OBD_BRW_SRVLOCK));
1033
1034                 ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,
1035                                       pg->count);
1036                 requested_nob += pg->count;
1037
1038                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1039                         niobuf--;
1040                         niobuf->len += pg->count;
1041                 } else {
1042                         niobuf->offset = pg->off;
1043                         niobuf->len    = pg->count;
1044                         niobuf->flags  = pg->flag;
1045                 }
1046         }
1047
1048         LASSERT((void *)(niobuf - niocount) ==
1049                 lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
1050                                niocount * sizeof(*niobuf)));
1051         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1052
1053         /* size[REQ_REC_OFF] still sizeof (*body) */
1054         if (opc == OST_WRITE) {
1055                 if (unlikely(cli->cl_checksum)) {
1056                         body->oa.o_valid |= OBD_MD_FLCKSUM;
1057                         body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1058                                                              page_count, pga,
1059                                                              OST_WRITE);
1060                         CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1061                                body->oa.o_cksum);
1062                         /* save this in 'oa', too, for later checking */
1063                         oa->o_valid |= OBD_MD_FLCKSUM;
1064                 } else {
1065                         /* clear out the checksum flag, in case this is a
1066                          * resend but cl_checksum is no longer set. b=11238 */
1067                         oa->o_valid &= ~OBD_MD_FLCKSUM;
1068                 }
1069                 oa->o_cksum = body->oa.o_cksum;
1070                 /* 1 RC per niobuf */
1071                 size[REPLY_REC_OFF + 1] = sizeof(__u32) * niocount;
1072                 ptlrpc_req_set_repsize(req, 3, size);
1073         } else {
1074                 if (unlikely(cli->cl_checksum))
1075                         body->oa.o_valid |= OBD_MD_FLCKSUM;
1076                 /* 1 RC for the whole I/O */
1077                 ptlrpc_req_set_repsize(req, 2, size);
1078         }
1079
1080         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1081         aa = (struct osc_brw_async_args *)&req->rq_async_args;
1082         aa->aa_oa = oa;
1083         aa->aa_requested_nob = requested_nob;
1084         aa->aa_nio_count = niocount;
1085         aa->aa_page_count = page_count;
1086         aa->aa_resends = 0;
1087         aa->aa_ppga = pga;
1088         aa->aa_cli = cli;
1089         INIT_LIST_HEAD(&aa->aa_oaps);
1090
1091         *reqp = req;
1092         RETURN (0);
1093
1094  out:
1095         ptlrpc_req_finished (req);
1096         RETURN (rc);
1097 }
1098
1099 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1100                                 __u32 client_cksum, __u32 server_cksum,
1101                                 int nob, obd_count page_count,
1102                                 struct brw_page **pga)
1103 {
1104         __u32 new_cksum;
1105         char *msg;
1106
1107         if (server_cksum == client_cksum) {
1108                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1109                 return 0;
1110         }
1111
1112         new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE);
1113
1114         if (new_cksum == server_cksum)
1115                 msg = "changed on the client after we checksummed it - "
1116                       "likely false positive due to mmap IO (bug 11742)";
1117         else if (new_cksum == client_cksum)
1118                 msg = "changed in transit before arrival at OST";
1119         else
1120                 msg = "changed in transit AND doesn't match the original - "
1121                       "likely false positive due to mmap IO (bug 11742)";
1122
1123         LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inum "
1124                            LPU64"/"LPU64" object "LPU64"/"LPU64" extent "
1125                            "["LPU64"-"LPU64"]\n",
1126                            msg, libcfs_nid2str(peer->nid),
1127                            oa->o_valid & OBD_MD_FLFID ? oa->o_fid : (__u64)0,
1128                            oa->o_valid & OBD_MD_FLFID ? oa->o_generation : 
1129                                                         (__u64)0,
1130                            oa->o_id,
1131                            oa->o_valid & OBD_MD_FLGROUP ? oa->o_gr : (__u64)0,
1132                            pga[0]->off,
1133                            pga[page_count-1]->off + pga[page_count-1]->count - 1);
1134         CERROR("original client csum %x, server csum %x, client csum now %x\n",
1135                client_cksum, server_cksum, new_cksum);
1136         return 1;        
1137 }
1138
1139 /* Note rc enters this function as number of bytes transferred */
1140 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1141 {
1142         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1143         const lnet_process_id_t *peer =
1144                         &req->rq_import->imp_connection->c_peer;
1145         struct client_obd *cli = aa->aa_cli;
1146         struct ost_body *body;
1147         __u32 client_cksum = 0;
1148         ENTRY;
1149
1150         if (rc < 0 && rc != -EDQUOT)
1151                 RETURN(rc);
1152
1153         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1154         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
1155                                   lustre_swab_ost_body);
1156         if (body == NULL) {
1157                 CERROR ("Can't unpack body\n");
1158                 RETURN(-EPROTO);
1159         }
1160
1161         /* set/clear over quota flag for a uid/gid */
1162         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1163             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA))
1164                 lquota_setdq(quota_interface, cli, body->oa.o_uid,
1165                              body->oa.o_gid, body->oa.o_valid,
1166                              body->oa.o_flags);
1167
1168         if (rc < 0)
1169                 RETURN(rc);
1170
1171         if (unlikely(aa->aa_oa->o_valid & OBD_MD_FLCKSUM))
1172                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1173
1174         osc_update_grant(cli, body);
1175
1176         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1177                 if (rc > 0) {
1178                         CERROR ("Unexpected +ve rc %d\n", rc);
1179                         RETURN(-EPROTO);
1180                 }
1181                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1182
1183                 if (unlikely((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) &&
1184                              client_cksum &&
1185                              check_write_checksum(&body->oa, peer, client_cksum,
1186                                                   body->oa.o_cksum,
1187                                                   aa->aa_requested_nob,
1188                                                   aa->aa_page_count,
1189                                                   aa->aa_ppga)))
1190                         RETURN(-EAGAIN);
1191
1192                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1193                         RETURN(-EAGAIN);
1194
1195                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1196                                      aa->aa_page_count, aa->aa_ppga);
1197                 GOTO(out, rc);
1198         }
1199
1200         /* The rest of this function executes only for OST_READs */
1201         if (rc > aa->aa_requested_nob) {
1202                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1203                        aa->aa_requested_nob);
1204                 RETURN(-EPROTO);
1205         }
1206
1207         if (rc != req->rq_bulk->bd_nob_transferred) {
1208                 CERROR ("Unexpected rc %d (%d transferred)\n",
1209                         rc, req->rq_bulk->bd_nob_transferred);
1210                 return (-EPROTO);
1211         }
1212
1213         if (rc < aa->aa_requested_nob)
1214                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1215
1216         if (sptlrpc_cli_unwrap_bulk_read(req, rc, aa->aa_page_count,
1217                                          aa->aa_ppga))
1218                 GOTO(out, rc = -EAGAIN);
1219
1220         if (unlikely(body->oa.o_valid & OBD_MD_FLCKSUM)) {
1221                 static int cksum_counter;
1222                 __u32      server_cksum = body->oa.o_cksum;
1223                 char      *via;
1224                 char      *router;
1225
1226                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1227                                                  aa->aa_ppga, OST_READ);
1228
1229                 if (peer->nid == req->rq_bulk->bd_sender) {
1230                         via = router = "";
1231                 } else {
1232                         via = " via ";
1233                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1234                 }
1235
1236                 if (server_cksum == ~0 && rc > 0) {
1237                         CERROR("Protocol error: server %s set the 'checksum' "
1238                                "bit, but didn't send a checksum.  Not fatal, "
1239                                "but please tell CFS.\n",
1240                                libcfs_nid2str(peer->nid));
1241                 } else if (server_cksum != client_cksum) {
1242                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1243                                            "%s%s%s inum "LPU64"/"LPU64" object "
1244                                            LPU64"/"LPU64" extent "
1245                                            "["LPU64"-"LPU64"]\n",
1246                                            req->rq_import->imp_obd->obd_name,
1247                                            libcfs_nid2str(peer->nid),
1248                                            via, router,
1249                                            body->oa.o_valid & OBD_MD_FLFID ?
1250                                                 body->oa.o_fid : (__u64)0,
1251                                            body->oa.o_valid & OBD_MD_FLFID ?
1252                                                 body->oa.o_generation :(__u64)0,
1253                                            body->oa.o_id,
1254                                            body->oa.o_valid & OBD_MD_FLGROUP ?
1255                                                 body->oa.o_gr : (__u64)0,
1256                                            aa->aa_ppga[0]->off,
1257                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1258                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1259                                                                         1);
1260                         CERROR("client %x, server %x\n",
1261                                client_cksum, server_cksum);
1262                         cksum_counter = 0;
1263                         aa->aa_oa->o_cksum = client_cksum;
1264                         rc = -EAGAIN;
1265                 } else {
1266                         cksum_counter++;
1267                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1268                         rc = 0;
1269                 }
1270         } else if (unlikely(client_cksum)) {
1271                 static int cksum_missed;
1272
1273                 cksum_missed++;
1274                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1275                         CERROR("Checksum %u requested from %s but not sent\n",
1276                                cksum_missed, libcfs_nid2str(peer->nid));
1277         } else {
1278                 rc = 0;
1279         }
1280 out:
1281         if (rc >= 0)
1282                 *aa->aa_oa = body->oa;
1283
1284         RETURN(rc);
1285 }
1286
1287 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1288                             struct lov_stripe_md *lsm,
1289                             obd_count page_count, struct brw_page **pga,
1290                             struct obd_capa *ocapa)
1291 {
1292         struct ptlrpc_request *req;
1293         int                    rc;
1294         cfs_waitq_t            waitq;
1295         int                    resends = 0;
1296         struct l_wait_info     lwi;
1297
1298         ENTRY;
1299
1300         cfs_waitq_init(&waitq);
1301
1302 restart_bulk:
1303         rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1304                                   page_count, pga, &req, ocapa);
1305         if (rc != 0)
1306                 return (rc);
1307
1308         rc = ptlrpc_queue_wait(req);
1309
1310         if (rc == -ETIMEDOUT && req->rq_resend) {
1311                 DEBUG_REQ(D_HA, req,  "BULK TIMEOUT");
1312                 ptlrpc_req_finished(req);
1313                 goto restart_bulk;
1314         }
1315
1316         rc = osc_brw_fini_request(req, rc);
1317
1318         ptlrpc_req_finished(req);
1319         if (osc_recoverable_error(rc)) {
1320                 resends++;
1321                 if (!osc_should_resend(resends, &exp->exp_obd->u.cli)) {
1322                         CERROR("too many resend retries, returning error\n");
1323                         RETURN(-EIO);
1324                 }
1325
1326                 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
1327                 l_wait_event(waitq, 0, &lwi);
1328
1329                 goto restart_bulk;
1330         }
1331         
1332         RETURN (rc);
1333 }
1334
1335 int osc_brw_redo_request(struct ptlrpc_request *request,
1336                          struct osc_brw_async_args *aa)
1337 {
1338         struct ptlrpc_request *new_req;
1339         struct ptlrpc_request_set *set = request->rq_set;
1340         struct osc_brw_async_args *new_aa;
1341         struct osc_async_page *oap;
1342         int rc = 0;
1343         ENTRY;
1344
1345         if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) {
1346                 CERROR("too many resend retries, returning error\n");
1347                 RETURN(-EIO);
1348         }
1349         
1350         DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
1351 /*
1352         body = lustre_msg_buf(request->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
1353         if (body->oa.o_valid & OBD_MD_FLOSSCAPA)
1354                 ocapa = lustre_unpack_capa(request->rq_reqmsg,
1355                                            REQ_REC_OFF + 3);
1356 */
1357         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1358                                         OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1359                                   aa->aa_cli, aa->aa_oa,
1360                                   NULL /* lsm unused by osc currently */,
1361                                   aa->aa_page_count, aa->aa_ppga, 
1362                                   &new_req, NULL /* ocapa */);
1363         if (rc)
1364                 RETURN(rc);
1365
1366         client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
1367    
1368         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1369                 if (oap->oap_request != NULL) {
1370                         LASSERTF(request == oap->oap_request,
1371                                  "request %p != oap_request %p\n",
1372                                  request, oap->oap_request);
1373                         if (oap->oap_interrupted) {
1374                                 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1375                                 ptlrpc_req_finished(new_req);                        
1376                                 RETURN(-EINTR);
1377                         }
1378                 }
1379         }
1380         /* New request takes over pga and oaps from old request.
1381          * Note that copying a list_head doesn't work, need to move it... */
1382         aa->aa_resends++;
1383         new_req->rq_interpret_reply = request->rq_interpret_reply;
1384         new_req->rq_async_args = request->rq_async_args;
1385         new_req->rq_sent = CURRENT_SECONDS + aa->aa_resends;
1386
1387         new_aa = (struct osc_brw_async_args *)&new_req->rq_async_args;
1388
1389         INIT_LIST_HEAD(&new_aa->aa_oaps);
1390         list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
1391         INIT_LIST_HEAD(&aa->aa_oaps);
1392
1393         list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1394                 if (oap->oap_request) {
1395                         ptlrpc_req_finished(oap->oap_request);
1396                         oap->oap_request = ptlrpc_request_addref(new_req);
1397                 }
1398         }
1399         client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1400
1401         DEBUG_REQ(D_INFO, new_req, "new request");
1402
1403         ptlrpc_set_add_req(set, new_req);
1404
1405         RETURN(0);
1406 }
1407
1408 static int brw_interpret(struct ptlrpc_request *req, void *data, int rc)
1409 {
1410         struct osc_brw_async_args *aa = data;
1411         int                        i;
1412         int                        nob = rc;
1413         ENTRY;
1414
1415         rc = osc_brw_fini_request(req, rc);
1416         if (osc_recoverable_error(rc)) {
1417                 rc = osc_brw_redo_request(req, aa);
1418                 if (rc == 0)
1419                         RETURN(0);
1420         }
1421         if ((rc >= 0) && req->rq_set && req->rq_set->set_countp)
1422                 atomic_add(nob, (atomic_t *)req->rq_set->set_countp);
1423
1424         client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
1425         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
1426                 aa->aa_cli->cl_w_in_flight--;
1427         else
1428                 aa->aa_cli->cl_r_in_flight--;
1429         for (i = 0; i < aa->aa_page_count; i++)
1430                 osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
1431         client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1432
1433         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1434
1435         RETURN(rc);
1436 }
1437
1438 static int async_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1439                           struct lov_stripe_md *lsm, obd_count page_count,
1440                           struct brw_page **pga, struct ptlrpc_request_set *set,
1441                           struct obd_capa *ocapa)
1442 {
1443         struct ptlrpc_request     *req;
1444         struct client_obd         *cli = &exp->exp_obd->u.cli;
1445         int                        rc, i;
1446         struct osc_brw_async_args *aa;
1447         ENTRY;
1448
1449         /* Consume write credits even if doing a sync write -
1450          * otherwise we may run out of space on OST due to grant. */
1451         if (cmd == OBD_BRW_WRITE) {
1452                 spin_lock(&cli->cl_loi_list_lock);
1453                 for (i = 0; i < page_count; i++) {
1454                         if (cli->cl_avail_grant >= CFS_PAGE_SIZE)
1455                                 osc_consume_write_grant(cli, pga[i]);
1456                 }
1457                 spin_unlock(&cli->cl_loi_list_lock);
1458         }
1459
1460         rc = osc_brw_prep_request(cmd, cli, oa, lsm, page_count, pga,
1461                                   &req, ocapa);
1462
1463         aa = (struct osc_brw_async_args *)&req->rq_async_args;
1464         if (cmd == OBD_BRW_READ) {
1465                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
1466                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
1467                 ptlrpc_lprocfs_brw(req, OST_READ, aa->aa_requested_nob);
1468         } else {
1469                  lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
1470                 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
1471                                  cli->cl_w_in_flight);
1472                 ptlrpc_lprocfs_brw(req, OST_WRITE, aa->aa_requested_nob);
1473         }
1474
1475         if (rc == 0) {
1476                 req->rq_interpret_reply = brw_interpret;
1477                 ptlrpc_set_add_req(set, req);
1478                 client_obd_list_lock(&cli->cl_loi_list_lock);
1479                 if (cmd == OBD_BRW_READ)
1480                         cli->cl_r_in_flight++;
1481                 else
1482                         cli->cl_w_in_flight++;
1483                 client_obd_list_unlock(&cli->cl_loi_list_lock);
1484         } else if (cmd == OBD_BRW_WRITE) {
1485                 client_obd_list_lock(&cli->cl_loi_list_lock);
1486                 for (i = 0; i < page_count; i++)
1487                         osc_release_write_grant(cli, pga[i], 0);
1488                 client_obd_list_unlock(&cli->cl_loi_list_lock);
1489         }
1490         RETURN (rc);
1491 }
1492
1493 /*
1494  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1495  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1496  * fine for our small page arrays and doesn't require allocation.  its an
1497  * insertion sort that swaps elements that are strides apart, shrinking the
1498  * stride down until its '1' and the array is sorted.
1499  */
1500 static void sort_brw_pages(struct brw_page **array, int num)
1501 {
1502         int stride, i, j;
1503         struct brw_page *tmp;
1504
1505         if (num == 1)
1506                 return;
1507         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1508                 ;
1509
1510         do {
1511                 stride /= 3;
1512                 for (i = stride ; i < num ; i++) {
1513                         tmp = array[i];
1514                         j = i;
1515                         while (j >= stride && array[j - stride]->off > tmp->off) {
1516                                 array[j] = array[j - stride];
1517                                 j -= stride;
1518                         }
1519                         array[j] = tmp;
1520                 }
1521         } while (stride > 1);
1522 }
1523
1524 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1525 {
1526         int count = 1;
1527         int offset;
1528         int i = 0;
1529
1530         LASSERT (pages > 0);
1531         offset = pg[i]->off & ~CFS_PAGE_MASK;
1532
1533         for (;;) {
1534                 pages--;
1535                 if (pages == 0)         /* that's all */
1536                         return count;
1537
1538                 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1539                         return count;   /* doesn't end on page boundary */
1540
1541                 i++;
1542                 offset = pg[i]->off & ~CFS_PAGE_MASK;
1543                 if (offset != 0)        /* doesn't start on page boundary */
1544                         return count;
1545
1546                 count++;
1547         }
1548 }
1549
1550 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1551 {
1552         struct brw_page **ppga;
1553         int i;
1554
1555         OBD_ALLOC(ppga, sizeof(*ppga) * count);
1556         if (ppga == NULL)
1557                 return NULL;
1558
1559         for (i = 0; i < count; i++)
1560                 ppga[i] = pga + i;
1561         return ppga;
1562 }
1563
1564 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1565 {
1566         LASSERT(ppga != NULL);
1567         OBD_FREE(ppga, sizeof(*ppga) * count);
1568 }
1569
1570 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1571                    obd_count page_count, struct brw_page *pga,
1572                    struct obd_trans_info *oti)
1573 {
1574         struct obdo *saved_oa = NULL;
1575         struct brw_page **ppga, **orig;
1576         struct obd_import *imp = class_exp2cliimp(exp);
1577         struct client_obd *cli = &imp->imp_obd->u.cli;
1578         int rc, page_count_orig;
1579         ENTRY;
1580
1581         if (cmd & OBD_BRW_CHECK) {
1582                 /* The caller just wants to know if there's a chance that this
1583                  * I/O can succeed */
1584
1585                 if (imp == NULL || imp->imp_invalid)
1586                         RETURN(-EIO);
1587                 RETURN(0);
1588         }
1589
1590         /* test_brw with a failed create can trip this, maybe others. */
1591         LASSERT(cli->cl_max_pages_per_rpc);
1592
1593         rc = 0;
1594
1595         orig = ppga = osc_build_ppga(pga, page_count);
1596         if (ppga == NULL)
1597                 RETURN(-ENOMEM);
1598         page_count_orig = page_count;
1599
1600         sort_brw_pages(ppga, page_count);
1601         while (page_count) {
1602                 obd_count pages_per_brw;
1603
1604                 if (page_count > cli->cl_max_pages_per_rpc)
1605                         pages_per_brw = cli->cl_max_pages_per_rpc;
1606                 else
1607                         pages_per_brw = page_count;
1608
1609                 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1610
1611                 if (saved_oa != NULL) {
1612                         /* restore previously saved oa */
1613                         *oinfo->oi_oa = *saved_oa;
1614                 } else if (page_count > pages_per_brw) {
1615                         /* save a copy of oa (brw will clobber it) */
1616                         OBDO_ALLOC(saved_oa);
1617                         if (saved_oa == NULL)
1618                                 GOTO(out, rc = -ENOMEM);
1619                         *saved_oa = *oinfo->oi_oa;
1620                 }
1621
1622                 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1623                                       pages_per_brw, ppga, oinfo->oi_capa);
1624
1625                 if (rc != 0)
1626                         break;
1627
1628                 page_count -= pages_per_brw;
1629                 ppga += pages_per_brw;
1630         }
1631
1632 out:
1633         osc_release_ppga(orig, page_count_orig);
1634
1635         if (saved_oa != NULL)
1636                 OBDO_FREE(saved_oa);
1637
1638         RETURN(rc);
1639 }
1640
1641 static int osc_brw_async(int cmd, struct obd_export *exp,
1642                          struct obd_info *oinfo, obd_count page_count,
1643                          struct brw_page *pga, struct obd_trans_info *oti,
1644                          struct ptlrpc_request_set *set)
1645 {
1646         struct brw_page **ppga, **orig;
1647         struct client_obd *cli = &exp->exp_obd->u.cli;
1648         int page_count_orig;
1649         int rc = 0;
1650         ENTRY;
1651
1652         if (cmd & OBD_BRW_CHECK) {
1653                 struct obd_import *imp = class_exp2cliimp(exp);
1654                 /* The caller just wants to know if there's a chance that this
1655                  * I/O can succeed */
1656
1657                 if (imp == NULL || imp->imp_invalid)
1658                         RETURN(-EIO);
1659                 RETURN(0);
1660         }
1661
1662         orig = ppga = osc_build_ppga(pga, page_count);
1663         if (ppga == NULL)
1664                 RETURN(-ENOMEM);
1665         page_count_orig = page_count;
1666
1667         sort_brw_pages(ppga, page_count);
1668         while (page_count) {
1669                 struct brw_page **copy;
1670                 obd_count pages_per_brw;
1671
1672                 pages_per_brw = min_t(obd_count, page_count,
1673                                       cli->cl_max_pages_per_rpc);
1674
1675                 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1676
1677                 /* use ppga only if single RPC is going to fly */
1678                 if (pages_per_brw != page_count_orig || ppga != orig) {
1679                         OBD_ALLOC(copy, sizeof(*copy) * pages_per_brw);
1680                         if (copy == NULL)
1681                                 GOTO(out, rc = -ENOMEM);
1682                         memcpy(copy, ppga, sizeof(*copy) * pages_per_brw);
1683                 } else
1684                         copy = ppga;
1685
1686                 rc = async_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1687                                     pages_per_brw, copy, set, oinfo->oi_capa);
1688
1689                 if (rc != 0) {
1690                         if (copy != ppga)
1691                                 OBD_FREE(copy, sizeof(*copy) * pages_per_brw);
1692                         break;
1693                 }
1694                 if (copy == orig) {
1695                         /* we passed it to async_internal() which is
1696                          * now responsible for releasing memory */
1697                         orig = NULL;
1698                 }
1699
1700                 page_count -= pages_per_brw;
1701                 ppga += pages_per_brw;
1702         }
1703 out:
1704         if (orig)
1705                 osc_release_ppga(orig, page_count_orig);
1706         RETURN(rc);
1707 }
1708
1709 static void osc_check_rpcs(struct client_obd *cli);
1710
1711 /* The companion to osc_enter_cache(), called when @oap is no longer part of
1712  * the dirty accounting.  Writeback completes or truncate happens before
1713  * writing starts.  Must be called with the loi lock held. */
1714 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
1715                            int sent)
1716 {
1717         osc_release_write_grant(cli, &oap->oap_brw_page, sent);
1718 }
1719
1720
1721 /* This maintains the lists of pending pages to read/write for a given object
1722  * (lop).  This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
1723  * to quickly find objects that are ready to send an RPC. */
1724 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
1725                          int cmd)
1726 {
1727         int optimal;
1728         ENTRY;
1729
1730         if (lop->lop_num_pending == 0)
1731                 RETURN(0);
1732
1733         /* if we have an invalid import we want to drain the queued pages
1734          * by forcing them through rpcs that immediately fail and complete
1735          * the pages.  recovery relies on this to empty the queued pages
1736          * before canceling the locks and evicting down the llite pages */
1737         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1738                 RETURN(1);
1739
1740         /* stream rpcs in queue order as long as as there is an urgent page
1741          * queued.  this is our cheap solution for good batching in the case
1742          * where writepage marks some random page in the middle of the file
1743          * as urgent because of, say, memory pressure */
1744         if (!list_empty(&lop->lop_urgent)) {
1745                 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
1746                 RETURN(1);
1747         }
1748         /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
1749         optimal = cli->cl_max_pages_per_rpc;
1750         if (cmd & OBD_BRW_WRITE) {
1751                 /* trigger a write rpc stream as long as there are dirtiers
1752                  * waiting for space.  as they're waiting, they're not going to
1753                  * create more pages to coallesce with what's waiting.. */
1754                 if (!list_empty(&cli->cl_cache_waiters)) {
1755                         CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
1756                         RETURN(1);
1757                 }
1758                 /* +16 to avoid triggering rpcs that would want to include pages
1759                  * that are being queued but which can't be made ready until
1760                  * the queuer finishes with the page. this is a wart for
1761                  * llite::commit_write() */
1762                 optimal += 16;
1763         }
1764         if (lop->lop_num_pending >= optimal)
1765                 RETURN(1);
1766
1767         RETURN(0);
1768 }
1769
/* Bring the list membership of @item in line with @should_be_on: append it
 * to the tail of @list if it is unlinked but should be linked, unlink it if
 * it is linked but should not be, and otherwise leave it alone. */
static void on_list(struct list_head *item, struct list_head *list,
                    int should_be_on)
{
        int linked = !list_empty(item);

        if (should_be_on) {
                if (!linked)
                        list_add_tail(item, list);
        } else if (linked) {
                list_del_init(item);
        }
}
1778
1779 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
1780  * can find pages to build into rpcs quickly */
1781 static void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
1782 {
1783         on_list(&loi->loi_cli_item, &cli->cl_loi_ready_list,
1784                 lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE) ||
1785                 lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
1786
1787         on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
1788                 loi->loi_write_lop.lop_num_pending);
1789
1790         on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
1791                 loi->loi_read_lop.lop_num_pending);
1792 }
1793
1794 static void lop_update_pending(struct client_obd *cli,
1795                                struct loi_oap_pages *lop, int cmd, int delta)
1796 {
1797         lop->lop_num_pending += delta;
1798         if (cmd & OBD_BRW_WRITE)
1799                 cli->cl_pending_w_pages += delta;
1800         else
1801                 cli->cl_pending_r_pages += delta;
1802 }
1803
1804 /* this is called when a sync waiter receives an interruption.  Its job is to
1805  * get the caller woken as soon as possible.  If its page hasn't been put in an
1806  * rpc yet it can dequeue immediately.  Otherwise it has to mark the rpc as
1807  * desiring interruption which will forcefully complete the rpc once the rpc
1808  * has timed out */
1809 static void osc_occ_interrupted(struct oig_callback_context *occ)
1810 {
1811         struct osc_async_page *oap;
1812         struct loi_oap_pages *lop;
1813         struct lov_oinfo *loi;
1814         ENTRY;
1815
1816         /* XXX member_of() */
1817         oap = list_entry(occ, struct osc_async_page, oap_occ);
1818
1819         client_obd_list_lock(&oap->oap_cli->cl_loi_list_lock);
1820
1821         oap->oap_interrupted = 1;
1822
1823         /* ok, it's been put in an rpc. only one oap gets a request reference */
1824         if (oap->oap_request != NULL) {
1825                 ptlrpc_mark_interrupted(oap->oap_request);
1826                 ptlrpcd_wake(oap->oap_request);
1827                 GOTO(unlock, 0);
1828         }
1829
1830         /* we don't get interruption callbacks until osc_trigger_group_io()
1831          * has been called and put the sync oaps in the pending/urgent lists.*/
1832         if (!list_empty(&oap->oap_pending_item)) {
1833                 list_del_init(&oap->oap_pending_item);
1834                 list_del_init(&oap->oap_urgent_item);
1835
1836                 loi = oap->oap_loi;
1837                 lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
1838                         &loi->loi_write_lop : &loi->loi_read_lop;
1839                 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
1840                 loi_list_maint(oap->oap_cli, oap->oap_loi);
1841
1842                 oig_complete_one(oap->oap_oig, &oap->oap_occ, -EINTR);
1843                 oap->oap_oig = NULL;
1844         }
1845
1846 unlock:
1847         client_obd_list_unlock(&oap->oap_cli->cl_loi_list_lock);
1848 }
1849
1850 /* this is trying to propogate async writeback errors back up to the
1851  * application.  As an async write fails we record the error code for later if
1852  * the app does an fsync.  As long as errors persist we force future rpcs to be
1853  * sync so that the app can get a sync error and break the cycle of queueing
1854  * pages for which writeback will fail. */
1855 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
1856                            int rc)
1857 {
1858         if (rc) {
1859                 if (!ar->ar_rc)
1860                         ar->ar_rc = rc;
1861
1862                 ar->ar_force_sync = 1;
1863                 ar->ar_min_xid = ptlrpc_sample_next_xid();
1864                 return;
1865
1866         }
1867
1868         if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
1869                 ar->ar_force_sync = 0;
1870 }
1871
1872 static void osc_oap_to_pending(struct osc_async_page *oap)
1873 {
1874         struct loi_oap_pages *lop;
1875
1876         if (oap->oap_cmd & OBD_BRW_WRITE)
1877                 lop = &oap->oap_loi->loi_write_lop;
1878         else
1879                 lop = &oap->oap_loi->loi_read_lop;
1880
1881         if (oap->oap_async_flags & ASYNC_URGENT)
1882                 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
1883         list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
1884         lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
1885 }
1886
1887 /* this must be called holding the loi list lock to give coverage to exit_cache,
1888  * async_flag maintenance, and oap_request */
1889 static void osc_ap_completion(struct client_obd *cli, struct obdo *oa,
1890                               struct osc_async_page *oap, int sent, int rc)
1891 {
1892         __u64 xid = 0;
1893
1894         ENTRY;
1895         if (oap->oap_request != NULL) {
1896                 xid = ptlrpc_req_xid(oap->oap_request);
1897                 ptlrpc_req_finished(oap->oap_request);
1898                 oap->oap_request = NULL;
1899         }
1900
1901         oap->oap_async_flags = 0;
1902         oap->oap_interrupted = 0;
1903
1904         if (oap->oap_cmd & OBD_BRW_WRITE) {
1905                 osc_process_ar(&cli->cl_ar, xid, rc);
1906                 osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
1907         }
1908
1909         if (rc == 0 && oa != NULL) {
1910                 if (oa->o_valid & OBD_MD_FLBLOCKS)
1911                         oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
1912                 if (oa->o_valid & OBD_MD_FLMTIME)
1913                         oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
1914                 if (oa->o_valid & OBD_MD_FLATIME)
1915                         oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
1916                 if (oa->o_valid & OBD_MD_FLCTIME)
1917                         oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
1918         }
1919
1920         if (oap->oap_oig) {
1921                 osc_exit_cache(cli, oap, sent);
1922                 oig_complete_one(oap->oap_oig, &oap->oap_occ, rc);
1923                 oap->oap_oig = NULL;
1924                 EXIT;
1925                 return;
1926         }
1927
1928         rc = oap->oap_caller_ops->ap_completion(oap->oap_caller_data,
1929                                                 oap->oap_cmd, oa, rc);
1930
1931         /* ll_ap_completion (from llite) drops PG_locked. so, a new
1932          * I/O on the page could start, but OSC calls it under lock
1933          * and thus we can add oap back to pending safely */
1934         if (rc)
1935                 /* upper layer wants to leave the page on pending queue */
1936                 osc_oap_to_pending(oap);
1937         else
1938                 osc_exit_cache(cli, oap, sent);
1939         EXIT;
1940 }
1941
1942 static int brw_interpret_oap(struct ptlrpc_request *req, void *data, int rc)
1943 {
1944         struct osc_async_page *oap, *tmp;
1945         struct osc_brw_async_args *aa = data;
1946         struct client_obd *cli;
1947         ENTRY;
1948
1949         rc = osc_brw_fini_request(req, rc);
1950         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1951         if (osc_recoverable_error(rc)) {
1952                 rc = osc_brw_redo_request(req, aa);
1953                 if (rc == 0)
1954                         RETURN(0);
1955         }
1956
1957         cli = aa->aa_cli;
1958
1959         client_obd_list_lock(&cli->cl_loi_list_lock);
1960
1961         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1962          * is called so we know whether to go to sync BRWs or wait for more
1963          * RPCs to complete */
1964         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
1965                 cli->cl_w_in_flight--;
1966         else
1967                 cli->cl_r_in_flight--;
1968
1969         /* the caller may re-use the oap after the completion call so
1970          * we need to clean it up a little */
1971         list_for_each_entry_safe(oap, tmp, &aa->aa_oaps, oap_rpc_item) {
1972                 list_del_init(&oap->oap_rpc_item);
1973                 osc_ap_completion(cli, aa->aa_oa, oap, 1, rc);
1974         }
1975
1976         osc_wake_cache_waiters(cli);
1977         osc_check_rpcs(cli);
1978
1979         client_obd_list_unlock(&cli->cl_loi_list_lock);
1980
1981         OBDO_FREE(aa->aa_oa);
1982         
1983         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1984         RETURN(rc);
1985 }
1986
1987 static struct ptlrpc_request *osc_build_req(struct client_obd *cli,
1988                                             struct list_head *rpc_list,
1989                                             int page_count, int cmd)
1990 {
1991         struct ptlrpc_request *req;
1992         struct brw_page **pga = NULL;
1993         struct osc_brw_async_args *aa;
1994         struct obdo *oa = NULL;
1995         struct obd_async_page_ops *ops = NULL;
1996         void *caller_data = NULL;
1997         struct obd_capa *ocapa;
1998         struct osc_async_page *oap;
1999         int i, rc;
2000
2001         ENTRY;
2002         LASSERT(!list_empty(rpc_list));
2003
2004         OBD_ALLOC(pga, sizeof(*pga) * page_count);
2005         if (pga == NULL)
2006                 RETURN(ERR_PTR(-ENOMEM));
2007
2008         OBDO_ALLOC(oa);
2009         if (oa == NULL)
2010                 GOTO(out, req = ERR_PTR(-ENOMEM));
2011
2012         i = 0;
2013         list_for_each_entry(oap, rpc_list, oap_rpc_item) {
2014                 if (ops == NULL) {
2015                         ops = oap->oap_caller_ops;
2016                         caller_data = oap->oap_caller_data;
2017                 }
2018                 pga[i] = &oap->oap_brw_page;
2019                 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2020                 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2021                        pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
2022                 i++;
2023         }
2024
2025         /* always get the data for the obdo for the rpc */
2026         LASSERT(ops != NULL);
2027         ops->ap_fill_obdo(caller_data, cmd, oa);
2028         ocapa = ops->ap_lookup_capa(caller_data, cmd);
2029
2030         sort_brw_pages(pga, page_count);
2031         rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2032                                   pga, &req, ocapa);
2033         capa_put(ocapa);
2034         if (rc != 0) {
2035                 CERROR("prep_req failed: %d\n", rc);
2036                 GOTO(out, req = ERR_PTR(rc));
2037         }
2038
2039         /* Need to update the timestamps after the request is built in case
2040          * we race with setattr (locally or in queue at OST).  If OST gets
2041          * later setattr before earlier BRW (as determined by the request xid),
2042          * the OST will not use BRW timestamps.  Sadly, there is no obvious
2043          * way to do this in a single call.  bug 10150 */
2044         ops->ap_update_obdo(caller_data, cmd, oa,
2045                             OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME);
2046
2047         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2048         aa = (struct osc_brw_async_args *)&req->rq_async_args;
2049         INIT_LIST_HEAD(&aa->aa_oaps);
2050         list_splice(rpc_list, &aa->aa_oaps);
2051         INIT_LIST_HEAD(rpc_list);
2052
2053 out:
2054         if (IS_ERR(req)) {
2055                 if (oa)
2056                         OBDO_FREE(oa);
2057                 if (pga)
2058                         OBD_FREE(pga, sizeof(*pga) * page_count);
2059         }
2060         RETURN(req);
2061 }
2062
2063 /* the loi lock is held across this function but it's allowed to release
2064  * and reacquire it during its work */
2065 static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
2066                             int cmd, struct loi_oap_pages *lop)
2067 {
2068         struct ptlrpc_request *req;
2069         obd_count page_count = 0;
2070         struct osc_async_page *oap = NULL, *tmp;
2071         struct osc_brw_async_args *aa;
2072         struct obd_async_page_ops *ops;
2073         CFS_LIST_HEAD(rpc_list);
2074         unsigned int ending_offset;
2075         unsigned  starting_offset = 0;
2076         ENTRY;
2077
2078         /* first we find the pages we're allowed to work with */
2079         list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
2080                                  oap_pending_item) {
2081                 ops = oap->oap_caller_ops;
2082
2083                 LASSERT(oap->oap_magic == OAP_MAGIC);
2084
2085                 /* in llite being 'ready' equates to the page being locked
2086                  * until completion unlocks it.  commit_write submits a page
2087                  * as not ready because its unlock will happen unconditionally
2088                  * as the call returns.  if we race with commit_write giving
2089                  * us that page we dont' want to create a hole in the page
2090                  * stream, so we stop and leave the rpc to be fired by
2091                  * another dirtier or kupdated interval (the not ready page
2092                  * will still be on the dirty list).  we could call in
2093                  * at the end of ll_file_write to process the queue again. */
2094                 if (!(oap->oap_async_flags & ASYNC_READY)) {
2095                         int rc = ops->ap_make_ready(oap->oap_caller_data, cmd);
2096                         if (rc < 0)
2097                                 CDEBUG(D_INODE, "oap %p page %p returned %d "
2098                                                 "instead of ready\n", oap,
2099                                                 oap->oap_page, rc);
2100                         switch (rc) {
2101                         case -EAGAIN:
2102                                 /* llite is telling us that the page is still
2103                                  * in commit_write and that we should try
2104                                  * and put it in an rpc again later.  we
2105                                  * break out of the loop so we don't create
2106                                  * a hole in the sequence of pages in the rpc
2107                                  * stream.*/
2108                                 oap = NULL;
2109                                 break;
2110                         case -EINTR:
2111                                 /* the io isn't needed.. tell the checks
2112                                  * below to complete the rpc with EINTR */
2113                                 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
2114                                 oap->oap_count = -EINTR;
2115                                 break;
2116                         case 0:
2117                                 oap->oap_async_flags |= ASYNC_READY;
2118                                 break;
2119                         default:
2120                                 LASSERTF(0, "oap %p page %p returned %d "
2121                                             "from make_ready\n", oap,
2122                                             oap->oap_page, rc);
2123                                 break;
2124                         }
2125                 }
2126                 if (oap == NULL)
2127                         break;
2128                 /*
2129                  * Page submitted for IO has to be locked. Either by
2130                  * ->ap_make_ready() or by higher layers.
2131                  *
2132                  * XXX nikita: this assertion should be adjusted when lustre
2133                  * starts using PG_writeback for pages being written out.
2134                  */
2135 #if defined(__KERNEL__) && defined(__LINUX__)
2136                 LASSERT(PageLocked(oap->oap_page));
2137 #endif
2138                 /* If there is a gap at the start of this page, it can't merge
2139                  * with any previous page, so we'll hand the network a
2140                  * "fragmented" page array that it can't transfer in 1 RDMA */
2141                 if (page_count != 0 && oap->oap_page_off != 0)
2142                         break;
2143
2144                 /* take the page out of our book-keeping */
2145                 list_del_init(&oap->oap_pending_item);
2146                 lop_update_pending(cli, lop, cmd, -1);
2147                 list_del_init(&oap->oap_urgent_item);
2148
2149                 if (page_count == 0)
2150                         starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
2151                                           (PTLRPC_MAX_BRW_SIZE - 1);
2152
2153                 /* ask the caller for the size of the io as the rpc leaves. */
2154                 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE))
2155                         oap->oap_count =
2156                                 ops->ap_refresh_count(oap->oap_caller_data,cmd);
2157                 if (oap->oap_count <= 0) {
2158                         CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
2159                                oap->oap_count);
2160                         osc_ap_completion(cli, NULL, oap, 0, oap->oap_count);
2161                         continue;
2162                 }
2163
2164                 /* now put the page back in our accounting */
2165                 list_add_tail(&oap->oap_rpc_item, &rpc_list);
2166                 if (++page_count >= cli->cl_max_pages_per_rpc)
2167                         break;
2168
2169                 /* End on a PTLRPC_MAX_BRW_SIZE boundary.  We want full-sized
2170                  * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
2171                  * have the same alignment as the initial writes that allocated
2172                  * extents on the server. */
2173                 ending_offset = (oap->oap_obj_off + oap->oap_page_off +
2174                                  oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
2175                 if (ending_offset == 0)
2176                         break;
2177
2178                 /* If there is a gap at the end of this page, it can't merge
2179                  * with any subsequent pages, so we'll hand the network a
2180                  * "fragmented" page array that it can't transfer in 1 RDMA */
2181                 if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
2182                         break;
2183         }
2184
2185         osc_wake_cache_waiters(cli);
2186
2187         if (page_count == 0)
2188                 RETURN(0);
2189
2190         loi_list_maint(cli, loi);
2191
2192         client_obd_list_unlock(&cli->cl_loi_list_lock);
2193
2194         req = osc_build_req(cli, &rpc_list, page_count, cmd);
2195         if (IS_ERR(req)) {
2196                 /* this should happen rarely and is pretty bad, it makes the
2197                  * pending list not follow the dirty order */
2198                 client_obd_list_lock(&cli->cl_loi_list_lock);
2199                 list_for_each_entry_safe(oap, tmp, &rpc_list, oap_rpc_item) {
2200                         list_del_init(&oap->oap_rpc_item);
2201
2202                         /* queued sync pages can be torn down while the pages
2203                          * were between the pending list and the rpc */
2204                         if (oap->oap_interrupted) {
2205                                 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
2206                                 osc_ap_completion(cli, NULL, oap, 0,
2207                                                   oap->oap_count);
2208                                 continue;
2209                         }
2210                         osc_ap_completion(cli, NULL, oap, 0, PTR_ERR(req));
2211                 }
2212                 loi_list_maint(cli, loi);
2213                 RETURN(PTR_ERR(req));
2214         }
2215
2216         aa = (struct osc_brw_async_args *)&req->rq_async_args;
2217
2218         if (cmd == OBD_BRW_READ) {
2219                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2220                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2221                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2222                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2223                 ptlrpc_lprocfs_brw(req, OST_READ, aa->aa_requested_nob);
2224         } else {
2225                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2226                 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
2227                                  cli->cl_w_in_flight);
2228                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2229                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2230                 ptlrpc_lprocfs_brw(req, OST_WRITE, aa->aa_requested_nob);
2231         }
2232
2233         client_obd_list_lock(&cli->cl_loi_list_lock);
2234
2235         if (cmd == OBD_BRW_READ)
2236                 cli->cl_r_in_flight++;
2237         else
2238                 cli->cl_w_in_flight++;
2239
2240         /* queued sync pages can be torn down while the pages
2241          * were between the pending list and the rpc */
2242         tmp = NULL;
2243         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2244                 /* only one oap gets a request reference */
2245                 if (tmp == NULL)
2246                         tmp = oap;
2247                 if (oap->oap_interrupted && !req->rq_intr) {
2248                         CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2249                                oap, req);
2250                         ptlrpc_mark_interrupted(req);
2251                 }
2252         }
2253         if (tmp != NULL)
2254                 tmp->oap_request = ptlrpc_request_addref(req);
2255
2256         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2257                   page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
2258
2259         req->rq_interpret_reply = brw_interpret_oap;
2260         ptlrpcd_add_req(req);
2261         RETURN(1);
2262 }
2263
/* D_INODE debug helper describing the queue state of @LOI: whether it is on
 * a list through loi_cli_item, then for writes and for reads the number of
 * pending pages and whether any urgent page is queued.  STR and the
 * remaining arguments are appended to that fixed prefix. */
#define LOI_DEBUG(LOI, STR, args...)                                     \
        CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR,           \
               !list_empty(&(LOI)->loi_cli_item),                        \
               (LOI)->loi_write_lop.lop_num_pending,                     \
               !list_empty(&(LOI)->loi_write_lop.lop_urgent),            \
               (LOI)->loi_read_lop.lop_num_pending,                      \
               !list_empty(&(LOI)->loi_read_lop.lop_urgent),             \
               args)
2272
2273 /* This is called by osc_check_rpcs() to find which objects have pages that
2274  * we could be sending.  These lists are maintained by lop_makes_rpc(). */
2275 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2276 {
2277         ENTRY;
2278         /* first return all objects which we already know to have
2279          * pages ready to be stuffed into rpcs */
2280         if (!list_empty(&cli->cl_loi_ready_list))
2281                 RETURN(list_entry(cli->cl_loi_ready_list.next,
2282                                   struct lov_oinfo, loi_cli_item));
2283
2284         /* then if we have cache waiters, return all objects with queued
2285          * writes.  This is especially important when many small files
2286          * have filled up the cache and not been fired into rpcs because
2287          * they don't pass the nr_pending/object threshhold */
2288         if (!list_empty(&cli->cl_cache_waiters) &&
2289             !list_empty(&cli->cl_loi_write_list))
2290                 RETURN(list_entry(cli->cl_loi_write_list.next,
2291                                   struct lov_oinfo, loi_write_item));
2292
2293         /* then return all queued objects when we have an invalid import
2294          * so that they get flushed */
2295         if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2296                 if (!list_empty(&cli->cl_loi_write_list))
2297                         RETURN(list_entry(cli->cl_loi_write_list.next,
2298                                           struct lov_oinfo, loi_write_item));
2299                 if (!list_empty(&cli->cl_loi_read_list))
2300                         RETURN(list_entry(cli->cl_loi_read_list.next,
2301                                           struct lov_oinfo, loi_read_item));
2302         }
2303         RETURN(NULL);
2304 }
2305
2306 /* called with the loi list lock held */
2307 static void osc_check_rpcs(struct client_obd *cli)
2308 {
2309         struct lov_oinfo *loi;
2310         int rc = 0, race_counter = 0;
2311         ENTRY;
2312
2313         while ((loi = osc_next_loi(cli)) != NULL) {
2314                 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
2315
2316                 if (rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight)
2317                         break;
2318
2319                 /* attempt some read/write balancing by alternating between
2320                  * reads and writes in an object.  The makes_rpc checks here
2321                  * would be redundant if we were getting read/write work items
2322                  * instead of objects.  we don't want send_oap_rpc to drain a
2323                  * partial read pending queue when we're given this object to
2324                  * do io on writes while there are cache waiters */
2325                 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
2326                         rc = osc_send_oap_rpc(cli, loi, OBD_BRW_WRITE,
2327                                               &loi->loi_write_lop);
2328                         if (rc < 0)
2329                                 break;
2330                         if (rc > 0)
2331                                 race_counter = 0;
2332                         else
2333                                 race_counter++;
2334                 }
2335                 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
2336                         rc = osc_send_oap_rpc(cli, loi, OBD_BRW_READ,
2337                                               &loi->loi_read_lop);
2338                         if (rc < 0)
2339                                 break;
2340                         if (rc > 0)
2341                                 race_counter = 0;
2342                         else
2343                                 race_counter++;
2344                 }
2345
2346                 /* attempt some inter-object balancing by issueing rpcs
2347                  * for each object in turn */
2348                 if (!list_empty(&loi->loi_cli_item))
2349                         list_del_init(&loi->loi_cli_item);
2350                 if (!list_empty(&loi->loi_write_item))
2351                         list_del_init(&loi->loi_write_item);
2352                 if (!list_empty(&loi->loi_read_item))
2353                         list_del_init(&loi->loi_read_item);
2354
2355                 loi_list_maint(cli, loi);
2356
2357                 /* send_oap_rpc fails with 0 when make_ready tells it to
2358                  * back off.  llite's make_ready does this when it tries
2359                  * to lock a page queued for write that is already locked.
2360                  * we want to try sending rpcs from many objects, but we
2361                  * don't want to spin failing with 0.  */
2362                 if (race_counter == 10)
2363                         break;
2364         }
2365         EXIT;
2366 }
2367
2368 /* we're trying to queue a page in the osc so we're subject to the
2369  * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2370  * If the osc's queued pages are already at that limit, then we want to sleep
2371  * until there is space in the osc's queue for us.  We also may be waiting for
2372  * write credits from the OST if there are RPCs in flight that may return some
2373  * before we fall back to sync writes.
2374  *
2375  * We need this know our allocation was granted in the presence of signals */
2376 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2377 {
2378         int rc;
2379         ENTRY;
2380         client_obd_list_lock(&cli->cl_loi_list_lock);
2381         rc = list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2382         client_obd_list_unlock(&cli->cl_loi_list_lock);
2383         RETURN(rc);
2384 };
2385
2386 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
2387  * grant or cache space. */
2388 static int osc_enter_cache(struct client_obd *cli, struct lov_oinfo *loi,
2389                            struct osc_async_page *oap)
2390 {
2391         struct osc_cache_waiter ocw;
2392         struct l_wait_info lwi = { 0 };
2393
2394         ENTRY;
2395
2396         CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
2397                "grant: %lu\n", cli->cl_dirty, atomic_read(&obd_dirty_pages),
2398                cli->cl_dirty_max, obd_max_dirty_pages,
2399                cli->cl_lost_grant, cli->cl_avail_grant);
2400
2401         /* force the caller to try sync io.  this can jump the list
2402          * of queued writes and create a discontiguous rpc stream */
2403         if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
2404             loi->loi_ar.ar_force_sync)
2405                 RETURN(-EDQUOT);
2406
2407         /* Hopefully normal case - cache space and write credits available */
2408         if ((cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max) &&
2409             (atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages) &&
2410             (cli->cl_avail_grant >= CFS_PAGE_SIZE)) {
2411                 /* account for ourselves */
2412                 osc_consume_write_grant(cli, &oap->oap_brw_page);
2413                 RETURN(0);
2414         }
2415
2416         /* Make sure that there are write rpcs in flight to wait for.  This
2417          * is a little silly as this object may not have any pending but
2418          * other objects sure might. */
2419         if (cli->cl_w_in_flight) {
2420                 list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
2421                 cfs_waitq_init(&ocw.ocw_waitq);
2422                 ocw.ocw_oap = oap;
2423                 ocw.ocw_rc = 0;
2424
2425                 loi_list_maint(cli, loi);
2426                 osc_check_rpcs(cli);
2427                 client_obd_list_unlock(&cli->cl_loi_list_lock);
2428
2429                 CDEBUG(D_CACHE, "sleeping for cache space\n");
2430                 l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
2431
2432                 client_obd_list_lock(&cli->cl_loi_list_lock);
2433                 if (!list_empty(&ocw.ocw_entry)) {
2434                         list_del(&ocw.ocw_entry);
2435                         RETURN(-EINTR);
2436                 }
2437                 RETURN(ocw.ocw_rc);
2438         }
2439
2440         RETURN(-EDQUOT);
2441 }
2442
2443 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2444                         struct lov_oinfo *loi, cfs_page_t *page,
2445                         obd_off offset, struct obd_async_page_ops *ops,
2446                         void *data, void **res)
2447 {
2448         struct osc_async_page *oap;
2449         ENTRY;
2450
2451         if (!page)
2452                 return size_round(sizeof(*oap));
2453
2454         oap = *res;
2455         oap->oap_magic = OAP_MAGIC;
2456         oap->oap_cli = &exp->exp_obd->u.cli;
2457         oap->oap_loi = loi;
2458
2459         oap->oap_caller_ops = ops;
2460         oap->oap_caller_data = data;
2461
2462         oap->oap_page = page;
2463         oap->oap_obj_off = offset;
2464
2465         CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2466         CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2467         CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2468
2469         oap->oap_occ.occ_interrupted = osc_occ_interrupted;
2470
2471         CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
2472         RETURN(0);
2473 }
2474
2475 struct osc_async_page *oap_from_cookie(void *cookie)
2476 {
2477         struct osc_async_page *oap = cookie;
2478         if (oap->oap_magic != OAP_MAGIC)
2479                 return ERR_PTR(-EINVAL);
2480         return oap;
2481 };
2482
2483 static int osc_queue_async_io(struct obd_export *exp, struct lov_stripe_md *lsm,
2484                               struct lov_oinfo *loi, void *cookie,
2485                               int cmd, obd_off off, int count,
2486                               obd_flag brw_flags, enum async_flags async_flags)
2487 {
2488         struct client_obd *cli = &exp->exp_obd->u.cli;
2489         struct osc_async_page *oap;
2490         int rc = 0;
2491         ENTRY;
2492
2493         oap = oap_from_cookie(cookie);
2494         if (IS_ERR(oap))
2495                 RETURN(PTR_ERR(oap));
2496
2497         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2498                 RETURN(-EIO);
2499
2500         if (!list_empty(&oap->oap_pending_item) ||
2501             !list_empty(&oap->oap_urgent_item) ||
2502             !list_empty(&oap->oap_rpc_item))
2503                 RETURN(-EBUSY);
2504
2505         /* check if the file's owner/group is over quota */
2506 #ifdef HAVE_QUOTA_SUPPORT
2507         if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)){
2508                 struct obd_async_page_ops *ops;
2509                 struct obdo *oa;
2510
2511                 OBDO_ALLOC(oa);
2512                 if (oa == NULL)
2513                         RETURN(-ENOMEM);
2514
2515                 ops = oap->oap_caller_ops;
2516                 ops->ap_fill_obdo(oap->oap_caller_data, cmd, oa);
2517                 if (lquota_chkdq(quota_interface, cli, oa->o_uid, oa->o_gid) ==
2518                     NO_QUOTA)
2519                         rc = -EDQUOT;
2520
2521                 OBDO_FREE(oa);
2522                 if (rc)
2523                         RETURN(rc);
2524         }
2525 #endif
2526
2527         if (loi == NULL)
2528                 loi = lsm->lsm_oinfo[0];
2529
2530         client_obd_list_lock(&cli->cl_loi_list_lock);
2531
2532         oap->oap_cmd = cmd;
2533         oap->oap_page_off = off;
2534         oap->oap_count = count;
2535         oap->oap_brw_flags = brw_flags;
2536         oap->oap_async_flags = async_flags;
2537
2538         if (cmd & OBD_BRW_WRITE) {
2539                 rc = osc_enter_cache(cli, loi, oap);
2540                 if (rc) {
2541                         client_obd_list_unlock(&cli->cl_loi_list_lock);
2542                         RETURN(rc);
2543                 }
2544         }
2545
2546         osc_oap_to_pending(oap);
2547         loi_list_maint(cli, loi);
2548
2549         LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
2550                   cmd);
2551
2552         osc_check_rpcs(cli);
2553         client_obd_list_unlock(&cli->cl_loi_list_lock);
2554
2555         RETURN(0);
2556 }
2557
/* True when `flag` is clear in `was` but set in `now`, i.e. the transition
 * from `was` to `now` newly sets it -- aka (~was & now & flag), but this is
 * more clear :)
 *
 * Every argument is parenthesized so that compound expressions such as
 * `A | B` keep their meaning once substituted next to `&`.  `flag` is
 * evaluated twice, so do not pass an expression with side effects. */
#define SETTING(was, now, flag) (!((was) & (flag)) && ((now) & (flag)))
2560
2561 static int osc_set_async_flags(struct obd_export *exp,
2562                                struct lov_stripe_md *lsm,
2563                                struct lov_oinfo *loi, void *cookie,
2564                                obd_flag async_flags)
2565 {
2566         struct client_obd *cli = &exp->exp_obd->u.cli;
2567         struct loi_oap_pages *lop;
2568         struct osc_async_page *oap;
2569         int rc = 0;
2570         ENTRY;
2571
2572         oap = oap_from_cookie(cookie);
2573         if (IS_ERR(oap))
2574                 RETURN(PTR_ERR(oap));
2575
2576         /*
2577          * bug 7311: OST-side locking is only supported for liblustre for now
2578          * (and liblustre never calls obd_set_async_flags(). I hope.), generic
2579          * implementation has to handle case where OST-locked page was picked
2580          * up by, e.g., ->writepage().
2581          */
2582         LASSERT(!(oap->oap_brw_flags & OBD_BRW_SRVLOCK));
2583         LASSERT(!LIBLUSTRE_CLIENT); /* check that liblustre angels do fear to
2584                                      * tread here. */
2585
2586         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2587                 RETURN(-EIO);
2588
2589         if (loi == NULL)
2590                 loi = lsm->lsm_oinfo[0];
2591
2592         if (oap->oap_cmd & OBD_BRW_WRITE) {
2593                 lop = &loi->loi_write_lop;
2594         } else {
2595                 lop = &loi->loi_read_lop;
2596         }
2597
2598         client_obd_list_lock(&cli->cl_loi_list_lock);
2599
2600         if (list_empty(&oap->oap_pending_item))
2601                 GOTO(out, rc = -EINVAL);
2602
2603         if ((oap->oap_async_flags & async_flags) == async_flags)
2604                 GOTO(out, rc = 0);
2605
2606         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
2607                 oap->oap_async_flags |= ASYNC_READY;
2608
2609         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT)) {
2610                 if (list_empty(&oap->oap_rpc_item)) {
2611                         list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2612                         loi_list_maint(cli, loi);
2613                 }
2614         }
2615
2616         LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
2617                         oap->oap_async_flags);
2618 out:
2619         osc_check_rpcs(cli);
2620         client_obd_list_unlock(&cli->cl_loi_list_lock);
2621         RETURN(rc);
2622 }
2623
2624 static int osc_queue_group_io(struct obd_export *exp, struct lov_stripe_md *lsm,
2625                              struct lov_oinfo *loi,
2626                              struct obd_io_group *oig, void *cookie,
2627                              int cmd, obd_off off, int count,
2628                              obd_flag brw_flags,
2629                              obd_flag async_flags)
2630 {
2631         struct client_obd *cli = &exp->exp_obd->u.cli;
2632         struct osc_async_page *oap;
2633         struct loi_oap_pages *lop;
2634         int rc = 0;
2635         ENTRY;
2636
2637         oap = oap_from_cookie(cookie);
2638         if (IS_ERR(oap))
2639                 RETURN(PTR_ERR(oap));
2640
2641         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2642                 RETURN(-EIO);
2643
2644         if (!list_empty(&oap->oap_pending_item) ||
2645             !list_empty(&oap->oap_urgent_item) ||
2646             !list_empty(&oap->oap_rpc_item))
2647                 RETURN(-EBUSY);
2648
2649         if (loi == NULL)
2650                 loi = lsm->lsm_oinfo[0];
2651
2652         client_obd_list_lock(&cli->cl_loi_list_lock);
2653
2654         oap->oap_cmd = cmd;
2655         oap->oap_page_off = off;
2656         oap->oap_count = count;
2657         oap->oap_brw_flags = brw_flags;
2658         oap->oap_async_flags = async_flags;
2659
2660         if (cmd & OBD_BRW_WRITE)
2661                 lop = &loi->loi_write_lop;
2662         else
2663                 lop = &loi->loi_read_lop;
2664
2665         list_add_tail(&oap->oap_pending_item, &lop->lop_pending_group);
2666         if (oap->oap_async_flags & ASYNC_GROUP_SYNC) {
2667                 oap->oap_oig = oig;
2668                 rc = oig_add_one(oig, &oap->oap_occ);
2669         }
2670
2671         LOI_DEBUG(loi, "oap %p page %p on group pending: rc %d\n",
2672                   oap, oap->oap_page, rc);
2673
2674         client_obd_list_unlock(&cli->cl_loi_list_lock);
2675
2676         RETURN(rc);
2677 }
2678
2679 static void osc_group_to_pending(struct client_obd *cli, struct lov_oinfo *loi,
2680                                  struct loi_oap_pages *lop, int cmd)
2681 {
2682         struct list_head *pos, *tmp;
2683         struct osc_async_page *oap;
2684
2685         list_for_each_safe(pos, tmp, &lop->lop_pending_group) {
2686                 oap = list_entry(pos, struct osc_async_page, oap_pending_item);
2687                 list_del(&oap->oap_pending_item);
2688                 osc_oap_to_pending(oap);
2689         }
2690         loi_list_maint(cli, loi);
2691 }
2692
2693 static int osc_trigger_group_io(struct obd_export *exp,
2694                                 struct lov_stripe_md *lsm,
2695                                 struct lov_oinfo *loi,
2696                                 struct obd_io_group *oig)
2697 {
2698         struct client_obd *cli = &exp->exp_obd->u.cli;
2699         ENTRY;
2700
2701         if (loi == NULL)
2702                 loi = lsm->lsm_oinfo[0];
2703
2704         client_obd_list_lock(&cli->cl_loi_list_lock);
2705
2706         osc_group_to_pending(cli, loi, &loi->loi_write_lop, OBD_BRW_WRITE);
2707         osc_group_to_pending(cli, loi, &loi->loi_read_lop, OBD_BRW_READ);
2708
2709         osc_check_rpcs(cli);
2710         client_obd_list_unlock(&cli->cl_loi_list_lock);
2711
2712         RETURN(0);
2713 }
2714
2715 static int osc_teardown_async_page(struct obd_export *exp,
2716                                    struct lov_stripe_md *lsm,
2717                                    struct lov_oinfo *loi, void *cookie)
2718 {
2719         struct client_obd *cli = &exp->exp_obd->u.cli;
2720         struct loi_oap_pages *lop;
2721         struct osc_async_page *oap;
2722         int rc = 0;
2723         ENTRY;
2724
2725         oap = oap_from_cookie(cookie);
2726         if (IS_ERR(oap))
2727                 RETURN(PTR_ERR(oap));
2728
2729         if (loi == NULL)
2730                 loi = lsm->lsm_oinfo[0];
2731
2732         if (oap->oap_cmd & OBD_BRW_WRITE) {
2733                 lop = &loi->loi_write_lop;
2734         } else {
2735                 lop = &loi->loi_read_lop;
2736         }
2737
2738         client_obd_list_lock(&cli->cl_loi_list_lock);
2739
2740         if (!list_empty(&oap->oap_rpc_item))
2741                 GOTO(out, rc = -EBUSY);
2742
2743         osc_exit_cache(cli, oap, 0);
2744         osc_wake_cache_waiters(cli);
2745
2746         if (!list_empty(&oap->oap_urgent_item)) {
2747                 list_del_init(&oap->oap_urgent_item);
2748                 oap->oap_async_flags &= ~ASYNC_URGENT;
2749         }
2750         if (!list_empty(&oap->oap_pending_item)) {
2751                 list_del_init(&oap->oap_pending_item);
2752                 lop_update_pending(cli, lop, oap->oap_cmd, -1);
2753         }
2754         loi_list_maint(cli, loi);
2755
2756         LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
2757 out:
2758         client_obd_list_unlock(&cli->cl_loi_list_lock);
2759         RETURN(rc);
2760 }
2761
2762 static void osc_set_data_with_check(struct lustre_handle *lockh, void *data,
2763                                     int flags)
2764 {
2765         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2766
2767         if (lock == NULL) {
2768                 CERROR("lockh %p, data %p - client evicted?\n", lockh, data);
2769                 return;
2770         }
2771         lock_res_and_lock(lock);
2772 #ifdef __KERNEL__
2773 #ifdef __LINUX__
2774         /* Liang XXX: Darwin and Winnt checking should be added */
2775         if (lock->l_ast_data && lock->l_ast_data != data) {
2776                 struct inode *new_inode = data;
2777                 struct inode *old_inode = lock->l_ast_data;
2778                 if (!(old_inode->i_state & I_FREEING))
2779                         LDLM_ERROR(lock, "inconsistent l_ast_data found");
2780                 LASSERTF(old_inode->i_state & I_FREEING,
2781                          "Found existing inode %p/%lu/%u state %lu in lock: "
2782                          "setting data to %p/%lu/%u\n", old_inode,
2783                          old_inode->i_ino, old_inode->i_generation,
2784                          old_inode->i_state,
2785                          new_inode, new_inode->i_ino, new_inode->i_generation);
2786         }
2787 #endif
2788 #endif
2789         lock->l_ast_data = data;
2790         lock->l_flags |= (flags & LDLM_FL_NO_LRU);
2791         unlock_res_and_lock(lock);
2792         LDLM_LOCK_PUT(lock);
2793 }
2794
2795 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2796                              ldlm_iterator_t replace, void *data)
2797 {
2798         struct ldlm_res_id res_id = { .name = {0} };
2799         struct obd_device *obd = class_exp2obd(exp);
2800
2801         res_id.name[0] = lsm->lsm_object_id;
2802         res_id.name[2] = lsm->lsm_object_gr;
2803
2804         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2805         return 0;
2806 }
2807
2808 static int osc_enqueue_fini(struct ptlrpc_request *req, struct obd_info *oinfo,
2809                             int intent, int rc)
2810 {
2811         ENTRY;
2812
2813         if (intent) {
2814                 /* The request was created before ldlm_cli_enqueue call. */
2815                 if (rc == ELDLM_LOCK_ABORTED) {
2816                         struct ldlm_reply *rep;
2817
2818                         /* swabbed by ldlm_cli_enqueue() */
2819                         LASSERT_REPSWABBED(req, DLM_LOCKREPLY_OFF);
2820                         rep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF,
2821                                              sizeof(*rep));
2822                         LASSERT(rep != NULL);
2823                         if (rep->lock_policy_res1)
2824                                 rc = rep->lock_policy_res1;
2825                 }
2826         }
2827
2828         if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
2829                 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
2830                        oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_size,
2831                        oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_blocks,
2832                        oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_mtime);
2833         }
2834
2835         /* Call the update callback. */
2836         rc = oinfo->oi_cb_up(oinfo, rc);
2837         RETURN(rc);
2838 }
2839
2840 static int osc_enqueue_interpret(struct ptlrpc_request *req,
2841                                  struct osc_enqueue_args *aa, int rc)
2842 {
2843         int intent = aa->oa_oi->oi_flags & LDLM_FL_HAS_INTENT;
2844         struct lov_stripe_md *lsm = aa->oa_oi->oi_md;
2845         struct ldlm_lock *lock;
2846
2847         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2848          * be valid. */
2849         lock = ldlm_handle2lock(aa->oa_oi->oi_lockh);
2850
2851         /* Complete obtaining the lock procedure. */
2852         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
2853                                    aa->oa_ei->ei_mode,
2854                                    &aa->oa_oi->oi_flags,
2855                                    &lsm->lsm_oinfo[0]->loi_lvb,
2856                                    sizeof(lsm->lsm_oinfo[0]->loi_lvb),
2857                                    lustre_swab_ost_lvb,
2858                                    aa->oa_oi->oi_lockh, rc);
2859
2860         /* Complete osc stuff. */
2861         rc = osc_enqueue_fini(req, aa->oa_oi, intent, rc);
2862
2863         /* Release the lock for async request. */
2864         if (lustre_handle_is_used(aa->oa_oi->oi_lockh) && rc == ELDLM_OK)
2865                 ldlm_lock_decref(aa->oa_oi->oi_lockh, aa->oa_ei->ei_mode);
2866
2867         LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
2868                  aa->oa_oi->oi_lockh, req, aa);
2869         LDLM_LOCK_PUT(lock);
2870         return rc;
2871 }
2872
2873 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
2874  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2875  * other synchronous requests, however keeping some locks and trying to obtain
2876  * others may take a considerable amount of time in a case of ost failure; and
2877  * when other sync requests do not get released lock from a client, the client
2878  * is excluded from the cluster -- such scenarious make the life difficult, so
2879  * release locks just after they are obtained. */
2880 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
2881                        struct ldlm_enqueue_info *einfo,
2882                        struct ptlrpc_request_set *rqset)
2883 {
2884         struct ldlm_res_id res_id = { .name = {0} };
2885         struct obd_device *obd = exp->exp_obd;
2886         struct ldlm_reply *rep;
2887         struct ptlrpc_request *req = NULL;
2888         int intent = oinfo->oi_flags & LDLM_FL_HAS_INTENT;
2889         int rc;
2890         ENTRY;
2891
2892         res_id.name[0] = oinfo->oi_md->lsm_object_id;
2893         res_id.name[2] = oinfo->oi_md->lsm_object_gr;
2894
2895         /* Filesystem lock extents are extended to page boundaries so that
2896          * dealing with the page cache is a little smoother.  */
2897         oinfo->oi_policy.l_extent.start -=
2898                 oinfo->oi_policy.l_extent.start & ~CFS_PAGE_MASK;
2899         oinfo->oi_policy.l_extent.end |= ~CFS_PAGE_MASK;
2900
2901         if (oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid == 0)
2902                 goto no_match;
2903
2904         /* Next, search for already existing extent locks that will cover us */
2905         rc = ldlm_lock_match(obd->obd_namespace,
2906                              oinfo->oi_flags | LDLM_FL_LVB_READY, &res_id,
2907                              einfo->ei_type, &oinfo->oi_policy, einfo->ei_mode,
2908                              oinfo->oi_lockh);
2909         if (rc == 1) {
2910                 osc_set_data_with_check(oinfo->oi_lockh, einfo->ei_cbdata,
2911                                         oinfo->oi_flags);
2912                 if (intent) {
2913                         /* I would like to be able to ASSERT here that rss <=
2914                          * kms, but I can't, for reasons which are explained in
2915                          * lov_enqueue() */
2916                 }
2917
2918                 /* We already have a lock, and it's referenced */
2919                 oinfo->oi_cb_up(oinfo, ELDLM_OK);
2920
2921                 /* For async requests, decref the lock. */
2922                 if (rqset)
2923                         ldlm_lock_decref(oinfo->oi_lockh, einfo->ei_mode);
2924
2925                 RETURN(ELDLM_OK);
2926         }
2927
2928         /* If we're trying to read, we also search for an existing PW lock.  The
2929          * VFS and page cache already protect us locally, so lots of readers/
2930          * writers can share a single PW lock.
2931          *
2932          * There are problems with conversion deadlocks, so instead of
2933          * converting a read lock to a write lock, we'll just enqueue a new
2934          * one.
2935          *
2936          * At some point we should cancel the read lock instead of making them
2937          * send us a blocking callback, but there are problems with canceling
2938          * locks out from other users right now, too. */
2939
2940         if (einfo->ei_mode == LCK_PR) {
2941                 rc = ldlm_lock_match(obd->obd_namespace,
2942                                      oinfo->oi_flags | LDLM_FL_LVB_READY,
2943                                      &res_id, einfo->ei_type, &oinfo->oi_policy,
2944                                      LCK_PW, oinfo->oi_lockh);
2945                 if (rc == 1) {
2946                         /* FIXME: This is not incredibly elegant, but it might
2947                          * be more elegant than adding another parameter to
2948                          * lock_match.  I want a second opinion. */
2949                         /* addref the lock only if not async requests. */
2950                         if (!rqset)
2951                                 ldlm_lock_addref(oinfo->oi_lockh, LCK_PR);
2952                         osc_set_data_with_check(oinfo->oi_lockh,
2953                                                 einfo->ei_cbdata,
2954                                                 oinfo->oi_flags);
2955                         oinfo->oi_cb_up(oinfo, ELDLM_OK);
2956                         ldlm_lock_decref(oinfo->oi_lockh, LCK_PW);
2957                         RETURN(ELDLM_OK);
2958                 }
2959         }
2960
2961  no_match:
2962         if (intent) {
2963                 int size[3] = {
2964                         [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
2965                         [DLM_LOCKREQ_OFF]     = sizeof(struct ldlm_request),
2966                         [DLM_LOCKREQ_OFF + 1] = 0 };
2967
2968                 req = ldlm_prep_enqueue_req(exp, 2, size, NULL, 0);
2969                 if (req == NULL)
2970                         RETURN(-ENOMEM);
2971
2972                 size[DLM_LOCKREPLY_OFF] = sizeof(*rep);
2973                 size[DLM_REPLY_REC_OFF] =
2974                         sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb);
2975                 ptlrpc_req_set_repsize(req, 3, size);
2976         }
2977
2978         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2979         oinfo->oi_flags &= ~LDLM_FL_BLOCK_GRANTED;
2980
2981         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id,
2982                               &oinfo->oi_policy, &oinfo->oi_flags,
2983                               &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
2984                               sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb),
2985                               lustre_swab_ost_lvb, oinfo->oi_lockh,
2986                               rqset ? 1 : 0);
2987         if (rqset) {
2988                 if (!rc) {
2989                         struct osc_enqueue_args *aa;
2990                         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2991                         aa = (struct osc_enqueue_args *)&req->rq_async_args;
2992                         aa->oa_oi = oinfo;
2993                         aa->oa_ei = einfo;
2994                         aa->oa_exp = exp;
2995
2996                         req->rq_interpret_reply = osc_enqueue_interpret;
2997                         ptlrpc_set_add_req(rqset, req);
2998                 } else if (intent) {
2999                         ptlrpc_req_finished(req);
3000                 }
3001                 RETURN(rc);
3002         }
3003
3004         rc = osc_enqueue_fini(req, oinfo, intent, rc);
3005         if (intent)
3006                 ptlrpc_req_finished(req);
3007
3008         RETURN(rc);
3009 }
3010
3011 static int osc_match(struct obd_export *exp, struct lov_stripe_md *lsm,
3012                      __u32 type, ldlm_policy_data_t *policy, __u32 mode,
3013                      int *flags, void *data, struct lustre_handle *lockh)
3014 {
3015         struct ldlm_res_id res_id = { .name = {0} };
3016         struct obd_device *obd = exp->exp_obd;
3017         int rc;
3018         int lflags = *flags;
3019         ENTRY;
3020
3021         res_id.name[0] = lsm->lsm_object_id;
3022         res_id.name[2] = lsm->lsm_object_gr;
3023
3024         OBD_FAIL_RETURN(OBD_FAIL_OSC_MATCH, -EIO);
3025
3026         /* Filesystem lock extents are extended to page boundaries so that
3027          * dealing with the page cache is a little smoother */
3028         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3029         policy->l_extent.end |= ~CFS_PAGE_MASK;
3030
3031         /* Next, search for already existing extent locks that will cover us */
3032         rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY,
3033                              &res_id, type, policy, mode, lockh);
3034         if (rc) {
3035                 //if (!(*flags & LDLM_FL_TEST_LOCK))
3036                         osc_set_data_with_check(lockh, data, lflags);
3037                 RETURN(rc);
3038         }
3039         /* If we're trying to read, we also search for an existing PW lock.  The
3040          * VFS and page cache already protect us locally, so lots of readers/
3041          * writers can share a single PW lock. */
3042         if (mode == LCK_PR) {
3043                 rc = ldlm_lock_match(obd->obd_namespace,
3044                                      lflags | LDLM_FL_LVB_READY, &res_id,
3045                                      type, policy, LCK_PW, lockh);
3046                 if (rc == 1 && !(lflags & LDLM_FL_TEST_LOCK)) {
3047                         /* FIXME: This is not incredibly elegant, but it might
3048                          * be more elegant than adding another parameter to
3049                          * lock_match.  I want a second opinion. */
3050                         osc_set_data_with_check(lockh, data, lflags);
3051                         ldlm_lock_addref(lockh, LCK_PR);
3052                         ldlm_lock_decref(lockh, LCK_PW);
3053                 }
3054         }
3055         RETURN(rc);
3056 }
3057
3058 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
3059                       __u32 mode, struct lustre_handle *lockh)
3060 {
3061         ENTRY;
3062
3063         if (unlikely(mode == LCK_GROUP))
3064                 ldlm_lock_decref_and_cancel(lockh, mode);
3065         else
3066                 ldlm_lock_decref(lockh, mode);
3067
3068         RETURN(0);
3069 }
3070
3071 static int osc_cancel_unused(struct obd_export *exp,
3072                              struct lov_stripe_md *lsm, int flags,
3073                              void *opaque)
3074 {
3075         struct obd_device *obd = class_exp2obd(exp);
3076         struct ldlm_res_id res_id = { .name = {0} }, *resp = NULL;
3077
3078         if (lsm != NULL) {
3079                 res_id.name[0] = lsm->lsm_object_id;
3080                 res_id.name[2] = lsm->lsm_object_gr;
3081                 resp = &res_id;
3082         }
3083
3084         return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
3085 }
3086
3087 static int osc_join_lru(struct obd_export *exp,
3088                         struct lov_stripe_md *lsm, int join)
3089 {
3090         struct obd_device *obd = class_exp2obd(exp);
3091         struct ldlm_res_id res_id = { .name = {0} }, *resp = NULL;
3092
3093         if (lsm != NULL) {
3094                 res_id.name[0] = lsm->lsm_object_id;
3095                 res_id.name[2] = lsm->lsm_object_gr;
3096                 resp = &res_id;
3097         }
3098
3099         return ldlm_cli_join_lru(obd->obd_namespace, resp, join);
3100 }
3101
3102 static int osc_statfs_interpret(struct ptlrpc_request *req,
3103                                 struct osc_async_args *aa, int rc)
3104 {
3105         struct obd_statfs *msfs;
3106         ENTRY;
3107
3108         if (rc != 0)
3109                 GOTO(out, rc);
3110
3111         msfs = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*msfs),
3112                                   lustre_swab_obd_statfs);
3113         if (msfs == NULL) {
3114                 CERROR("Can't unpack obd_statfs\n");
3115                 GOTO(out, rc = -EPROTO);
3116         }
3117
3118         memcpy(aa->aa_oi->oi_osfs, msfs, sizeof(*msfs));
3119 out:
3120         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
3121         RETURN(rc);
3122 }
3123
3124 static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
3125                             __u64 max_age, struct ptlrpc_request_set *rqset)
3126 {
3127         struct ptlrpc_request *req;
3128         struct osc_async_args *aa;
3129         int size[2] = { sizeof(struct ptlrpc_body), sizeof(*oinfo->oi_osfs) };
3130         ENTRY;
3131
3132         /* We could possibly pass max_age in the request (as an absolute
3133          * timestamp or a "seconds.usec ago") so the target can avoid doing
3134          * extra calls into the filesystem if that isn't necessary (e.g.
3135          * during mount that would help a bit).  Having relative timestamps
3136          * is not so great if request processing is slow, while absolute
3137          * timestamps are not ideal because they need time synchronization. */
3138         req = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_OST_VERSION,
3139                               OST_STATFS, 1, NULL, NULL);
3140         if (!req)
3141                 RETURN(-ENOMEM);
3142
3143         ptlrpc_req_set_repsize(req, 2, size);
3144         req->rq_request_portal = OST_CREATE_PORTAL; //XXX FIXME bug 249
3145
3146         req->rq_interpret_reply = osc_statfs_interpret;
3147         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3148         aa = (struct osc_async_args *)&req->rq_async_args;
3149         aa->aa_oi = oinfo;
3150
3151         ptlrpc_set_add_req(rqset, req);
3152         RETURN(0);
3153 }
3154
3155 static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
3156                       __u64 max_age)
3157 {
3158         struct obd_statfs *msfs;
3159         struct ptlrpc_request *req;
3160         int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*osfs) };
3161         ENTRY;
3162
3163         /* We could possibly pass max_age in the request (as an absolute
3164          * timestamp or a "seconds.usec ago") so the target can avoid doing
3165          * extra calls into the filesystem if that isn't necessary (e.g.
3166          * during mount that would help a bit).  Having relative timestamps
3167          * is not so great if request processing is slow, while absolute
3168          * timestamps are not ideal because they need time synchronization. */
3169         req = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_OST_VERSION,
3170                               OST_STATFS, 1, NULL, NULL);
3171         if (!req)
3172                 RETURN(-ENOMEM);
3173
3174         ptlrpc_req_set_repsize(req, 2, size);
3175         req->rq_request_portal = OST_CREATE_PORTAL; //XXX FIXME bug 249
3176
3177         rc = ptlrpc_queue_wait(req);
3178         if (rc)
3179                 GOTO(out, rc);
3180
3181         msfs = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*msfs),
3182                                   lustre_swab_obd_statfs);
3183         if (msfs == NULL) {
3184                 CERROR("Can't unpack obd_statfs\n");
3185                 GOTO(out, rc = -EPROTO);
3186         }
3187
3188         memcpy(osfs, msfs, sizeof(*osfs));
3189
3190         EXIT;
3191  out:
3192         ptlrpc_req_finished(req);
3193         return rc;
3194 }
3195
3196 /* Retrieve object striping information.
3197  *
3198  * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
3199  * the maximum number of OST indices which will fit in the user buffer.
3200  * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
3201  */
3202 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
3203 {
3204         struct lov_user_md lum, *lumk;
3205         int rc = 0, lum_size;
3206         ENTRY;
3207
3208         if (!lsm)
3209                 RETURN(-ENODATA);
3210
3211         if (copy_from_user(&lum, lump, sizeof(lum)))
3212                 RETURN(-EFAULT);
3213
3214         if (lum.lmm_magic != LOV_USER_MAGIC)
3215                 RETURN(-EINVAL);
3216
3217         if (lum.lmm_stripe_count > 0) {
3218                 lum_size = sizeof(lum) + sizeof(lum.lmm_objects[0]);
3219                 OBD_ALLOC(lumk, lum_size);
3220                 if (!lumk)
3221                         RETURN(-ENOMEM);
3222
3223                 lumk->lmm_objects[0].l_object_id = lsm->lsm_object_id;
3224                 lumk->lmm_objects[0].l_object_gr = lsm->lsm_object_gr;
3225         } else {
3226                 lum_size = sizeof(lum);
3227                 lumk = &lum;
3228         }
3229
3230         lumk->lmm_object_id = lsm->lsm_object_id;
3231         lumk->lmm_object_gr = lsm->lsm_object_gr;
3232         lumk->lmm_stripe_count = 1;
3233
3234         if (copy_to_user(lump, lumk, lum_size))
3235                 rc = -EFAULT;
3236
3237         if (lumk != &lum)
3238                 OBD_FREE(lumk, lum_size);
3239
3240         RETURN(rc);
3241 }
3242
3243
3244 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3245                          void *karg, void *uarg)
3246 {
3247         struct obd_device *obd = exp->exp_obd;
3248         struct obd_ioctl_data *data = karg;
3249         int err = 0;
3250         ENTRY;
3251
3252 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
3253         MOD_INC_USE_COUNT;
3254 #else
3255         if (!try_module_get(THIS_MODULE)) {
3256                 CERROR("Can't get module. Is it alive?");
3257                 return -EINVAL;
3258         }
3259 #endif
3260         switch (cmd) {
3261         case OBD_IOC_LOV_GET_CONFIG: {
3262                 char *buf;
3263                 struct lov_desc *desc;
3264                 struct obd_uuid uuid;
3265
3266                 buf = NULL;
3267                 len = 0;
3268                 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
3269                         GOTO(out, err = -EINVAL);
3270
3271                 data = (struct obd_ioctl_data *)buf;
3272
3273                 if (sizeof(*desc) > data->ioc_inllen1) {
3274                         obd_ioctl_freedata(buf, len);
3275                         GOTO(out, err = -EINVAL);
3276                 }
3277
3278                 if (data->ioc_inllen2 < sizeof(uuid)) {
3279                         obd_ioctl_freedata(buf, len);
3280                         GOTO(out, err = -EINVAL);
3281                 }
3282
3283                 desc = (struct lov_desc *)data->ioc_inlbuf1;
3284                 desc->ld_tgt_count = 1;
3285                 desc->ld_active_tgt_count = 1;
3286                 desc->ld_default_stripe_count = 1;
3287                 desc->ld_default_stripe_size = 0;
3288                 desc->ld_default_stripe_offset = 0;
3289                 desc->ld_pattern = 0;
3290                 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
3291
3292                 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
3293
3294                 err = copy_to_user((void *)uarg, buf, len);
3295                 if (err)
3296                         err = -EFAULT;
3297                 obd_ioctl_freedata(buf, len);
3298                 GOTO(out, err);
3299         }
3300         case LL_IOC_LOV_SETSTRIPE:
3301                 err = obd_alloc_memmd(exp, karg);
3302                 if (err > 0)
3303                         err = 0;
3304                 GOTO(out, err);
3305         case LL_IOC_LOV_GETSTRIPE:
3306                 err = osc_getstripe(karg, uarg);
3307                 GOTO(out, err);
3308         case OBD_IOC_CLIENT_RECOVER:
3309                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
3310                                             data->ioc_inlbuf1);
3311                 if (err > 0)
3312                         err = 0;
3313                 GOTO(out, err);
3314         case IOC_OSC_SET_ACTIVE:
3315                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
3316                                                data->ioc_offset);
3317                 GOTO(out, err);
3318         case OBD_IOC_POLL_QUOTACHECK:
3319                 err = lquota_poll_check(quota_interface, exp,
3320                                         (struct if_quotacheck *)karg);
3321                 GOTO(out, err);
3322         default:
3323                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3324                        cmd, cfs_curproc_comm());
3325                 GOTO(out, err = -ENOTTY);
3326         }
3327 out:
3328 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
3329         MOD_DEC_USE_COUNT;
3330 #else
3331         module_put(THIS_MODULE);
3332 #endif
3333         return err;
3334 }
3335
3336 static int osc_get_info(struct obd_export *exp, obd_count keylen,
3337                         void *key, __u32 *vallen, void *val)
3338 {
3339         ENTRY;
3340         if (!vallen || !val)
3341                 RETURN(-EFAULT);
3342
3343         if (keylen > strlen("lock_to_stripe") &&
3344             strcmp(key, "lock_to_stripe") == 0) {
3345                 __u32 *stripe = val;
3346                 *vallen = sizeof(*stripe);
3347                 *stripe = 0;
3348                 RETURN(0);
3349         } else if (keylen >= strlen("last_id") && strcmp(key, "last_id") == 0) {
3350                 struct ptlrpc_request *req;
3351                 obd_id *reply;
3352                 char *bufs[2] = { NULL, key };
3353                 int rc, size[2] = { sizeof(struct ptlrpc_body), keylen };
3354
3355                 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
3356                                       OST_GET_INFO, 2, size, bufs);
3357                 if (req == NULL)
3358                         RETURN(-ENOMEM);
3359
3360                 size[REPLY_REC_OFF] = *vallen;
3361                 ptlrpc_req_set_repsize(req, 2, size);
3362                 rc = ptlrpc_queue_wait(req);
3363                 if (rc)
3364                         GOTO(out, rc);
3365
3366                 reply = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*reply),
3367                                            lustre_swab_ost_last_id);
3368                 if (reply == NULL) {
3369                         CERROR("Can't unpack OST last ID\n");
3370                         GOTO(out, rc = -EPROTO);
3371                 }
3372                 *((obd_id *)val) = *reply;
3373         out:
3374                 ptlrpc_req_finished(req);
3375                 RETURN(rc);
3376         }
3377         RETURN(-EINVAL);
3378 }
3379
3380 static int osc_setinfo_mds_conn_interpret(struct ptlrpc_request *req,
3381                                           void *aa, int rc)
3382 {
3383         struct llog_ctxt *ctxt;
3384         struct obd_import *imp = req->rq_import;
3385         ENTRY;
3386
3387         if (rc != 0)
3388                 RETURN(rc);
3389
3390         ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
3391         if (ctxt) {
3392                 if (rc == 0)
3393                         rc = llog_initiator_connect(ctxt);
3394                 else
3395                         CERROR("cannot establish connection for "
3396                                "ctxt %p: %d\n", ctxt, rc);
3397         }
3398
3399         spin_lock(&imp->imp_lock);
3400         imp->imp_server_timeout = 1;
3401         imp->imp_pingable = 1;
3402         spin_unlock(&imp->imp_lock);
3403         CDEBUG(D_HA, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
3404
3405         RETURN(rc);
3406 }
3407
3408 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
3409                               void *key, obd_count vallen, void *val,
3410                               struct ptlrpc_request_set *set)
3411 {
3412         struct ptlrpc_request *req;
3413         struct obd_device  *obd = exp->exp_obd;
3414         struct obd_import *imp = class_exp2cliimp(exp);
3415         int size[3] = { sizeof(struct ptlrpc_body), keylen, vallen };
3416         char *bufs[3] = { NULL, key, val };
3417         ENTRY;
3418
3419         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3420
3421         if (KEY_IS(KEY_NEXT_ID)) {
3422                 if (vallen != sizeof(obd_id))
3423                         RETURN(-EINVAL);
3424                 obd->u.cli.cl_oscc.oscc_next_id = *((obd_id*)val) + 1;
3425                 CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
3426                        exp->exp_obd->obd_name,
3427                        obd->u.cli.cl_oscc.oscc_next_id);
3428
3429                 RETURN(0);
3430         }
3431
3432         if (KEY_IS("unlinked")) {
3433                 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3434                 spin_lock(&oscc->oscc_lock);
3435                 oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
3436                 spin_unlock(&oscc->oscc_lock);
3437                 RETURN(0);
3438         }
3439
3440         if (KEY_IS(KEY_INIT_RECOV)) {
3441                 if (vallen != sizeof(int))
3442                         RETURN(-EINVAL);
3443                 spin_lock(&imp->imp_lock);
3444                 imp->imp_initial_recov = *(int *)val;
3445                 spin_unlock(&imp->imp_lock);
3446                 CDEBUG(D_HA, "%s: set imp_initial_recov = %d\n",
3447                        exp->exp_obd->obd_name,
3448                        imp->imp_initial_recov);
3449                 RETURN(0);
3450         }
3451
3452         if (KEY_IS("checksum")) {
3453                 if (vallen != sizeof(int))
3454                         RETURN(-EINVAL);
3455                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3456                 RETURN(0);
3457         }
3458
3459         if (KEY_IS(KEY_FLUSH_CTX)) {
3460                 sptlrpc_import_flush_my_ctx(imp);
3461                 RETURN(0);
3462         }
3463
3464         if (!set)
3465                 RETURN(-EINVAL);
3466
3467         /* We pass all other commands directly to OST. Since nobody calls osc
3468            methods directly and everybody is supposed to go through LOV, we
3469            assume lov checked invalid values for us.
3470            The only recognised values so far are evict_by_nid and mds_conn.
3471            Even if something bad goes through, we'd get a -EINVAL from OST
3472            anyway. */
3473
3474         req = ptlrpc_prep_req(imp, LUSTRE_OST_VERSION, OST_SET_INFO, 3, size,
3475                               bufs);
3476         if (req == NULL)
3477                 RETURN(-ENOMEM);
3478
3479         if (KEY_IS("mds_conn")) {
3480                 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3481
3482                 oscc->oscc_oa.o_gr = (*(__u32 *)val);
3483                 oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP;
3484                 LASSERT(oscc->oscc_oa.o_gr > 0);
3485                 req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
3486         }
3487
3488         ptlrpc_req_set_repsize(req, 1, NULL);
3489         ptlrpc_set_add_req(set, req);
3490         ptlrpc_check_set(set);
3491
3492         RETURN(0);
3493 }
3494
3495
3496 static struct llog_operations osc_size_repl_logops = {
3497         lop_cancel: llog_obd_repl_cancel
3498 };
3499
3500 static struct llog_operations osc_mds_ost_orig_logops;
3501 static int osc_llog_init(struct obd_device *obd, struct obd_llogs *llogs,
3502                          struct obd_device *tgt, int count,
3503                          struct llog_catid *catid, struct obd_uuid *uuid)
3504 {
3505         int rc;
3506         ENTRY;
3507
3508         spin_lock(&obd->obd_dev_lock);
3509         if (osc_mds_ost_orig_logops.lop_setup != llog_obd_origin_setup) {
3510                 osc_mds_ost_orig_logops = llog_lvfs_ops;
3511                 osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
3512                 osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
3513                 osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
3514                 osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
3515         }
3516         spin_unlock(&obd->obd_dev_lock);
3517
3518         rc = llog_setup(obd, llogs, LLOG_MDS_OST_ORIG_CTXT, tgt, count,
3519                         &catid->lci_logid, &osc_mds_ost_orig_logops);
3520         if (rc) {
3521                 CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
3522                 GOTO (out, rc);
3523         }
3524
3525         rc = llog_setup(obd, llogs, LLOG_SIZE_REPL_CTXT, tgt, count, NULL,
3526                         &osc_size_repl_logops);
3527         if (rc)
3528                 CERROR("failed LLOG_SIZE_REPL_CTXT\n");
3529 out:
3530         if (rc) {
3531                 CERROR("osc '%s' tgt '%s' cnt %d catid %p rc=%d\n",
3532                        obd->obd_name, tgt->obd_name, count, catid, rc);
3533                 CERROR("logid "LPX64":0x%x\n",
3534                        catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
3535         }
3536         RETURN(rc);
3537 }
3538
3539 static int osc_llog_finish(struct obd_device *obd, int count)
3540 {
3541         struct llog_ctxt *ctxt;
3542         int rc = 0, rc2 = 0;
3543         ENTRY;
3544
3545         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3546         if (ctxt)
3547                 rc = llog_cleanup(ctxt);
3548
3549         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3550         if (ctxt)
3551                 rc2 = llog_cleanup(ctxt);
3552         if (!rc)
3553                 rc = rc2;
3554
3555         RETURN(rc);
3556 }
3557
3558 static int osc_reconnect(struct obd_export *exp, struct obd_device *obd,
3559                          struct obd_uuid *cluuid,
3560                          struct obd_connect_data *data)
3561 {
3562         struct client_obd *cli = &obd->u.cli;
3563
3564         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3565                 long lost_grant;
3566
3567                 client_obd_list_lock(&cli->cl_loi_list_lock);
3568                 data->ocd_grant = cli->cl_avail_grant ?:
3569                                 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
3570                 lost_grant = cli->cl_lost_grant;
3571                 cli->cl_lost_grant = 0;
3572                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3573
3574                 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
3575                        "cl_lost_grant: %ld\n", data->ocd_grant,
3576                        cli->cl_avail_grant, lost_grant);
3577                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
3578                        " ocd_grant: %d\n", data->ocd_connect_flags,
3579                        data->ocd_version, data->ocd_grant);
3580         }
3581
3582         RETURN(0);
3583 }
3584
3585 static int osc_disconnect(struct obd_export *exp)
3586 {
3587         struct obd_device *obd = class_exp2obd(exp);
3588         struct llog_ctxt *ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3589         int rc;
3590
3591         if (obd->u.cli.cl_conn_count == 1)
3592                 /* flush any remaining cancel messages out to the target */
3593                 llog_sync(ctxt, exp);
3594
3595         rc = client_disconnect_export(exp);
3596         return rc;
3597 }
3598
3599 static int osc_import_event(struct obd_device *obd,
3600                             struct obd_import *imp,
3601                             enum obd_import_event event)
3602 {
3603         struct client_obd *cli;
3604         int rc = 0;
3605
3606         ENTRY;
3607         LASSERT(imp->imp_obd == obd);
3608
3609         switch (event) {
3610         case IMP_EVENT_DISCON: {
3611                 /* Only do this on the MDS OSC's */
3612                 if (imp->imp_server_timeout) {
3613                         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3614
3615                         spin_lock(&oscc->oscc_lock);
3616                         oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
3617                         spin_unlock(&oscc->oscc_lock);
3618                 }
3619                 cli = &obd->u.cli;
3620                 client_obd_list_lock(&cli->cl_loi_list_lock);
3621                 cli->cl_avail_grant = 0;
3622                 cli->cl_lost_grant = 0;
3623                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3624                 break;
3625         }
3626         case IMP_EVENT_INACTIVE: {
3627                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
3628                 break;
3629         }
3630         case IMP_EVENT_INVALIDATE: {
3631                 struct ldlm_namespace *ns = obd->obd_namespace;
3632
3633                 /* Reset grants */
3634                 cli = &obd->u.cli;
3635                 client_obd_list_lock(&cli->cl_loi_list_lock);
3636                 /* all pages go to failing rpcs due to the invalid import */
3637                 osc_check_rpcs(cli);
3638                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3639
3640                 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3641
3642                 break;
3643         }
3644         case IMP_EVENT_ACTIVE: {
3645                 /* Only do this on the MDS OSC's */
3646                 if (imp->imp_server_timeout) {
3647                         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3648
3649                         spin_lock(&oscc->oscc_lock);
3650                         oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
3651                         spin_unlock(&oscc->oscc_lock);
3652                 }
3653                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
3654                 break;
3655         }
3656         case IMP_EVENT_OCD: {
3657                 struct obd_connect_data *ocd = &imp->imp_connect_data;
3658
3659                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3660                         osc_init_grant(&obd->u.cli, ocd);
3661
3662                 /* See bug 7198 */
3663                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3664                         imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
3665
3666                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
3667                 break;
3668         }
3669         default:
3670                 CERROR("Unknown import event %d\n", event);
3671                 LBUG();
3672         }
3673         RETURN(rc);
3674 }
3675
3676 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3677 {
3678         int rc;
3679         ENTRY;
3680
3681         ENTRY;
3682         rc = ptlrpcd_addref();
3683         if (rc)
3684                 RETURN(rc);
3685
3686         rc = client_obd_setup(obd, lcfg);
3687         if (rc) {
3688                 ptlrpcd_decref();
3689         } else {
3690                 struct lprocfs_static_vars lvars;
3691                 struct client_obd *cli = &obd->u.cli;
3692
3693                 lprocfs_init_vars(osc, &lvars);
3694                 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
3695                         lproc_osc_attach_seqstat(obd);
3696                         ptlrpc_lprocfs_register_obd(obd);
3697                 }
3698
3699                 oscc_init(obd);
3700                 /* We need to allocate a few requests more, because
3701                    brw_interpret_oap tries to create new requests before freeing
3702                    previous ones. Ideally we want to have 2x max_rpcs_in_flight
3703                    reserved, but I afraid that might be too much wasted RAM
3704                    in fact, so 2 is just my guess and still should work. */
3705                 cli->cl_import->imp_rq_pool =
3706                         ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
3707                                             OST_MAXREQSIZE,
3708                                             ptlrpc_add_rqs_to_pool);
3709         }
3710
3711         RETURN(rc);
3712 }
3713
3714 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
3715 {
3716         int rc = 0;
3717         ENTRY;
3718
3719         switch (stage) {
3720         case OBD_CLEANUP_EARLY: {
3721                 struct obd_import *imp;
3722                 imp = obd->u.cli.cl_import;
3723                 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
3724                 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
3725                 ptlrpc_deactivate_import(imp);
3726                 spin_lock(&imp->imp_lock);
3727                 imp->imp_pingable = 0;
3728                 spin_unlock(&imp->imp_lock);
3729                 break;
3730         }
3731         case OBD_CLEANUP_EXPORTS: {
3732                 /* If we set up but never connected, the
3733                    client import will not have been cleaned. */
3734                 if (obd->u.cli.cl_import) {
3735                         struct obd_import *imp;
3736                         imp = obd->u.cli.cl_import;
3737                         CDEBUG(D_CONFIG, "%s: client import never connected\n",
3738                                obd->obd_name);
3739                         ptlrpc_invalidate_import(imp);
3740                         ptlrpc_free_rq_pool(imp->imp_rq_pool);
3741                         class_destroy_import(imp);
3742                         obd->u.cli.cl_import = NULL;
3743                 }
3744                 break;
3745         }
3746         case OBD_CLEANUP_SELF_EXP:
3747                 rc = obd_llog_finish(obd, 0);
3748                 if (rc != 0)
3749                         CERROR("failed to cleanup llogging subsystems\n");
3750                 break;
3751         case OBD_CLEANUP_OBD:
3752                 break;
3753         }
3754         RETURN(rc);
3755 }
3756
3757 int osc_cleanup(struct obd_device *obd)
3758 {
3759         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3760         int rc;
3761
3762         ENTRY;
3763         ptlrpc_lprocfs_unregister_obd(obd);
3764         lprocfs_obd_cleanup(obd);
3765
3766         spin_lock(&oscc->oscc_lock);
3767         oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING;
3768         oscc->oscc_flags |= OSCC_FLAG_EXITING;
3769         spin_unlock(&oscc->oscc_lock);
3770
3771         /* free memory of osc quota cache */
3772         lquota_cleanup(quota_interface, obd);
3773
3774         rc = client_obd_cleanup(obd);
3775
3776         ptlrpcd_decref();
3777         RETURN(rc);
3778 }
3779
3780 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
3781 {
3782         struct lustre_cfg *lcfg = buf;
3783         struct lprocfs_static_vars lvars;
3784         int rc = 0;
3785
3786         lprocfs_init_vars(osc, &lvars);
3787
3788         rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars, lcfg, obd);
3789         return(rc);
3790 }
3791
3792 struct obd_ops osc_obd_ops = {
3793         .o_owner                = THIS_MODULE,
3794         .o_setup                = osc_setup,
3795         .o_precleanup           = osc_precleanup,
3796         .o_cleanup              = osc_cleanup,
3797         .o_add_conn             = client_import_add_conn,
3798         .o_del_conn             = client_import_del_conn,
3799         .o_connect              = client_connect_import,
3800         .o_reconnect            = osc_reconnect,
3801         .o_disconnect           = osc_disconnect,
3802         .o_statfs               = osc_statfs,
3803         .o_statfs_async         = osc_statfs_async,
3804         .o_packmd               = osc_packmd,
3805         .o_unpackmd             = osc_unpackmd,
3806         .o_precreate            = osc_precreate,
3807         .o_create               = osc_create,
3808         .o_destroy              = osc_destroy,
3809         .o_getattr              = osc_getattr,
3810         .o_getattr_async        = osc_getattr_async,
3811         .o_setattr              = osc_setattr,
3812         .o_setattr_async        = osc_setattr_async,
3813         .o_brw                  = osc_brw,
3814         .o_brw_async            = osc_brw_async,
3815         .o_prep_async_page      = osc_prep_async_page,
3816         .o_queue_async_io       = osc_queue_async_io,
3817         .o_set_async_flags      = osc_set_async_flags,
3818         .o_queue_group_io       = osc_queue_group_io,
3819         .o_trigger_group_io     = osc_trigger_group_io,
3820         .o_teardown_async_page  = osc_teardown_async_page,
3821         .o_punch                = osc_punch,
3822         .o_sync                 = osc_sync,
3823         .o_enqueue              = osc_enqueue,
3824         .o_match                = osc_match,
3825         .o_change_cbdata        = osc_change_cbdata,
3826         .o_cancel               = osc_cancel,
3827         .o_cancel_unused        = osc_cancel_unused,
3828         .o_join_lru             = osc_join_lru,
3829         .o_iocontrol            = osc_iocontrol,
3830         .o_get_info             = osc_get_info,
3831         .o_set_info_async       = osc_set_info_async,
3832         .o_import_event         = osc_import_event,
3833         .o_llog_init            = osc_llog_init,
3834         .o_llog_finish          = osc_llog_finish,
3835         .o_process_config       = osc_process_config,
3836 };
3837 int __init osc_init(void)
3838 {
3839         struct lprocfs_static_vars lvars;
3840         int rc;
3841         ENTRY;
3842
3843         lprocfs_init_vars(osc, &lvars);
3844
3845         request_module("lquota");
3846         quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
3847         lquota_init(quota_interface);
3848         init_obd_quota_ops(quota_interface, &osc_obd_ops);
3849
3850         rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
3851                                  LUSTRE_OSC_NAME, NULL);
3852         if (rc) {
3853                 if (quota_interface)
3854                         PORTAL_SYMBOL_PUT(osc_quota_interface);
3855                 RETURN(rc);
3856         }
3857
3858         RETURN(rc);
3859 }
3860
3861 #ifdef __KERNEL__
3862 static void /*__exit*/ osc_exit(void)
3863 {
3864         lquota_exit(quota_interface);
3865         if (quota_interface)
3866                 PORTAL_SYMBOL_PUT(osc_quota_interface);
3867
3868         class_unregister_type(LUSTRE_OSC_NAME);
3869 }
3870
/* Kernel module metadata for the OSC. */
MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
MODULE_LICENSE("GPL");

/* Declares the "osc" module with osc_init/osc_exit as its handlers
 * (presumably the module init and exit hooks -- confirm against the
 * cfs_module() definition in libcfs). */
cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);
3876 #endif