Whamcloud - gitweb
Branch HEAD
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
5  *   Author Peter Braam <braam@clusterfs.com>
6  *
7  *   This file is part of the Lustre file system, http://www.lustre.org
8  *   Lustre is a trademark of Cluster File Systems, Inc.
9  *
10  *   You may have signed or agreed to another license before downloading
11  *   this software.  If so, you are bound by the terms and conditions
12  *   of that agreement, and the following does not apply to you.  See the
13  *   LICENSE file included with this distribution for more information.
14  *
15  *   If you did not agree to a different license, then this copy of Lustre
16  *   is open source software; you can redistribute it and/or modify it
17  *   under the terms of version 2 of the GNU General Public License as
18  *   published by the Free Software Foundation.
19  *
20  *   In either case, Lustre is distributed in the hope that it will be
21  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
22  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
23  *   license text for more details.
24  *
25  *  For testing and management it is treated as an obd_device,
26  *  although it does not export a full OBD method table (the
27  *  requests are coming in over the wire, so object target modules
28  *  do not have a full method table.)
29  *
30  */
31
32 #ifndef EXPORT_SYMTAB
33 # define EXPORT_SYMTAB
34 #endif
35 #define DEBUG_SUBSYSTEM S_OSC
36
37 #ifdef __KERNEL__
38 # include <libcfs/libcfs.h>
39 #else /* __KERNEL__ */
40 # include <liblustre.h>
41 #endif
42
43 #include <lustre_dlm.h>
44 #include <libcfs/kp30.h>
45 #include <lustre_net.h>
46 #include <lustre/lustre_user.h>
47 #include <obd_ost.h>
48 #include <obd_lov.h>
49
50 #ifdef  __CYGWIN__
51 # include <ctype.h>
52 #endif
53
54 #include <lustre_ha.h>
55 #include <lprocfs_status.h>
56 #include <lustre_log.h>
57 #include <lustre_debug.h>
58 #include <lustre_param.h>
59 #include "osc_internal.h"
60
/* Quota hooks; NULL until the interface is resolved at module setup
 * (osc_quota_interface is defined in the osc quota code elsewhere). */
static quota_interface_t *quota_interface = NULL;
extern quota_interface_t osc_quota_interface;

/* Forward declaration: releases a brw_page array (defined later in file). */
static void osc_release_ppga(struct brw_page **ppga, obd_count count);
65
66 /* Pack OSC object metadata for disk storage (LE byte order). */
67 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
68                       struct lov_stripe_md *lsm)
69 {
70         int lmm_size;
71         ENTRY;
72
73         lmm_size = sizeof(**lmmp);
74         if (!lmmp)
75                 RETURN(lmm_size);
76
77         if (*lmmp && !lsm) {
78                 OBD_FREE(*lmmp, lmm_size);
79                 *lmmp = NULL;
80                 RETURN(0);
81         }
82
83         if (!*lmmp) {
84                 OBD_ALLOC(*lmmp, lmm_size);
85                 if (!*lmmp)
86                         RETURN(-ENOMEM);
87         }
88
89         if (lsm) {
90                 LASSERT(lsm->lsm_object_id);
91                 LASSERT(lsm->lsm_object_gr);
92                 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
93                 (*lmmp)->lmm_object_gr = cpu_to_le64(lsm->lsm_object_gr);
94         }
95
96         RETURN(lmm_size);
97 }
98
/* Unpack OSC object metadata from disk storage (LE byte order).
 *
 * Mirrors osc_packmd():
 *   - lsmp == NULL:              size query; return the in-memory lsm size;
 *   - *lsmp != NULL, lmm NULL:   free the lsm (and its single oinfo), return 0;
 *   - otherwise allocate *lsmp if needed and fill it from @lmm.
 * Returns the lsm size on success or a negative errno. */
static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
                        struct lov_mds_md *lmm, int lmm_bytes)
{
        int lsm_size;
        ENTRY;

        /* Validate the on-disk buffer before trusting any of its fields. */
        if (lmm != NULL) {
                if (lmm_bytes < sizeof (*lmm)) {
                        CERROR("lov_mds_md too small: %d, need %d\n",
                               lmm_bytes, (int)sizeof(*lmm));
                        RETURN(-EINVAL);
                }
                /* XXX LOV_MAGIC etc check? */

                if (lmm->lmm_object_id == 0) {
                        CERROR("lov_mds_md: zero lmm_object_id\n");
                        RETURN(-EINVAL);
                }
        }

        /* An OSC object always has exactly one stripe. */
        lsm_size = lov_stripe_md_size(1);
        if (lsmp == NULL)
                RETURN(lsm_size);

        if (*lsmp != NULL && lmm == NULL) {
                OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                OBD_FREE(*lsmp, lsm_size);
                *lsmp = NULL;
                RETURN(0);
        }

        if (*lsmp == NULL) {
                OBD_ALLOC(*lsmp, lsm_size);
                if (*lsmp == NULL)
                        RETURN(-ENOMEM);
                OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                if ((*lsmp)->lsm_oinfo[0] == NULL) {
                        /* Unwind the lsm allocation on partial failure. */
                        OBD_FREE(*lsmp, lsm_size);
                        RETURN(-ENOMEM);
                }
                loi_init((*lsmp)->lsm_oinfo[0]);
        }

        if (lmm != NULL) {
                /* XXX zero *lsmp? */
                (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
                (*lsmp)->lsm_object_gr = le64_to_cpu (lmm->lmm_object_gr);
                LASSERT((*lsmp)->lsm_object_id);
                LASSERT((*lsmp)->lsm_object_gr);
        }

        (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;

        RETURN(lsm_size);
}
155
156 static inline void osc_pack_capa(struct ptlrpc_request *req, int offset,
157                                  struct ost_body *body, void *capa)
158 {
159         struct obd_capa *oc = (struct obd_capa *)capa;
160         struct lustre_capa *c;
161
162         if (!capa)
163                 return;
164
165         c = lustre_msg_buf(req->rq_reqmsg, offset, sizeof(*c));
166         LASSERT(c);
167         capa_cpy(c, oc);
168         body->oa.o_valid |= OBD_MD_FLOSSCAPA;
169         DEBUG_CAPA(D_SEC, c, "pack");
170 }
171
172 static inline void osc_pack_req_body(struct ptlrpc_request *req, int offset,
173                                      struct obd_info *oinfo)
174 {
175         struct ost_body *body;
176
177         body = lustre_msg_buf(req->rq_reqmsg, offset, sizeof(*body));
178         body->oa = *oinfo->oi_oa;
179         osc_pack_capa(req, offset + 1, body, oinfo->oi_capa);
180 }
181
182 static int osc_getattr_interpret(struct ptlrpc_request *req,
183                                  struct osc_async_args *aa, int rc)
184 {
185         struct ost_body *body;
186         ENTRY;
187
188         if (rc != 0)
189                 GOTO(out, rc);
190
191         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
192                                   lustre_swab_ost_body);
193         if (body) {
194                 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
195                 memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));
196
197                 /* This should really be sent by the OST */
198                 aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
199                 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
200         } else {
201                 CERROR("can't unpack ost_body\n");
202                 rc = -EPROTO;
203                 aa->aa_oi->oi_oa->o_valid = 0;
204         }
205 out:
206         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
207         RETURN(rc);
208 }
209
/* Issue an OST_GETATTR without waiting: the request is added to @set and
 * the reply is handled by osc_getattr_interpret(), which reports the
 * result through the caller's oi_cb_up() callback. */
static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *req;
        struct ost_body *body;
        /* Request layout: { ptlrpc_body, ost_body, (optional) capa }. */
        int size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
        struct osc_async_args *aa;
        ENTRY;

        /* Capability slot is zero-length when the caller has no capa. */
        size[REQ_REC_OFF + 1] = oinfo->oi_capa ? sizeof(struct lustre_capa) : 0;
        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_GETATTR, 3, size,NULL);
        if (!req)
                RETURN(-ENOMEM);

        osc_pack_req_body(req, REQ_REC_OFF, oinfo);

        /* Reply carries only { ptlrpc_body, ost_body }. */
        ptlrpc_req_set_repsize(req, 2, size);
        req->rq_interpret_reply = osc_getattr_interpret;

        /* Stash the caller's obd_info in the request's scratch area so the
         * interpret callback can find it. */
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = (struct osc_async_args *)&req->rq_async_args;
        aa->aa_oi = oinfo;

        ptlrpc_set_add_req(set, req);
        RETURN (0);
}
237
/* Synchronous OST_GETATTR: fetch the object's attributes from the OST
 * into @oinfo->oi_oa.  Returns 0 on success or a negative errno. */
static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
{
        struct ptlrpc_request *req;
        struct ost_body *body;
        /* Request layout: { ptlrpc_body, ost_body, (optional) capa }. */
        int rc, size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
        ENTRY;

        size[REQ_REC_OFF + 1] = oinfo->oi_capa ? sizeof(struct lustre_capa) : 0;
        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_GETATTR, 3, size, NULL);
        if (!req)
                RETURN(-ENOMEM);

        osc_pack_req_body(req, REQ_REC_OFF, oinfo);

        /* Reply carries only { ptlrpc_body, ost_body }. */
        ptlrpc_req_set_repsize(req, 2, size);

        rc = ptlrpc_queue_wait(req);
        if (rc) {
                CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
                GOTO(out, rc);
        }

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body == NULL) {
                CERROR ("can't unpack ost_body\n");
                GOTO (out, rc = -EPROTO);
        }

        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        *oinfo->oi_oa = body->oa;

        /* This should really be sent by the OST */
        oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;

        EXIT;
 out:
        ptlrpc_req_finished(req);
        return rc;
}
280
281 static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
282                        struct obd_trans_info *oti)
283 {
284         struct ptlrpc_request *req;
285         struct ost_body *body;
286         int rc, size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
287         ENTRY;
288
289         LASSERT(!(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP) ||
290                                         oinfo->oi_oa->o_gr > 0);
291         size[REQ_REC_OFF + 1] = oinfo->oi_capa ? sizeof(struct lustre_capa) : 0;
292         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
293                               OST_SETATTR, 3, size, NULL);
294         if (!req)
295                 RETURN(-ENOMEM);
296
297         osc_pack_req_body(req, REQ_REC_OFF, oinfo);
298
299         ptlrpc_req_set_repsize(req, 2, size);
300
301         rc = ptlrpc_queue_wait(req);
302         if (rc)
303                 GOTO(out, rc);
304
305         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
306                                   lustre_swab_ost_body);
307         if (body == NULL)
308                 GOTO(out, rc = -EPROTO);
309
310         *oinfo->oi_oa = body->oa;
311
312         EXIT;
313 out:
314         ptlrpc_req_finished(req);
315         RETURN(rc);
316 }
317
318 static int osc_setattr_interpret(struct ptlrpc_request *req,
319                                  struct osc_async_args *aa, int rc)
320 {
321         struct ost_body *body;
322         ENTRY;
323
324         if (rc != 0)
325                 GOTO(out, rc);
326
327         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
328                                   lustre_swab_ost_body);
329         if (body == NULL) {
330                 CERROR("can't unpack ost_body\n");
331                 GOTO(out, rc = -EPROTO);
332         }
333
334         *aa->aa_oi->oi_oa = body->oa;
335 out:
336         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
337         RETURN(rc);
338 }
339
/* Asynchronous OST_SETATTR.  If @rqset is NULL the request is handed to
 * ptlrpcd and the reply is not waited for; otherwise it is added to
 * @rqset and osc_setattr_interpret() delivers the result via oi_cb_up(). */
static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct obd_trans_info *oti,
                             struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        /* Request layout: { ptlrpc_body, ost_body, (optional) capa }. */
        int size[3] = { sizeof(struct ptlrpc_body), sizeof(struct ost_body) };
        struct osc_async_args *aa;
        ENTRY;

        size[REQ_REC_OFF + 1] = oinfo->oi_capa ? sizeof(struct lustre_capa) : 0;
        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_SETATTR, 3, size, NULL);
        if (!req)
                RETURN(-ENOMEM);

        osc_pack_req_body(req, REQ_REC_OFF, oinfo);
        /* Carry the caller's llog cookie inside the obdo when one is
         * flagged valid; @oti must supply it in that case. */
        if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) {
                LASSERT(oti);
                *obdo_logcookie(oinfo->oi_oa) = *oti->oti_logcookies;
        }

        ptlrpc_req_set_repsize(req, 2, size);
        /* Do the MDS-to-OST setattr asynchronously. */
        if (!rqset) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req);
        } else {
                req->rq_interpret_reply = osc_setattr_interpret;

                /* Stash the obd_info for the interpret callback. */
                CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
                aa = (struct osc_async_args *)&req->rq_async_args;
                aa->aa_oi = oinfo;

                ptlrpc_set_add_req(rqset, req);
        }

        RETURN(0);
}
378
/* Create an object on the OST (synchronous OST_CREATE).  If the caller
 * did not supply a stripe md in *@ea, a single-stripe one is allocated
 * here and returned via *@ea on success (or freed again on failure).
 * On success *@oa holds the attributes returned by the OST, and @oti (if
 * given) receives the transno and any llog cookie.
 * Returns 0 on success or a negative errno. */
int osc_real_create(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md **ea, struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body *body;
        struct lov_stripe_md *lsm;
        int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
        ENTRY;

        LASSERT(oa);
        LASSERT(ea);

        lsm = *ea;
        if (!lsm) {
                rc = obd_alloc_memmd(exp, &lsm);
                if (rc < 0)
                        RETURN(rc);
        }

        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_CREATE, 2, size, NULL);
        if (!req)
                GOTO(out, rc = -ENOMEM);

        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        body->oa = *oa;

        ptlrpc_req_set_repsize(req, 2, size);
        /* FLINLINE here marks an orphan-deletion create from MDS-OST
         * recovery, which must carry the DELORPHAN flag. */
        if (oa->o_valid & OBD_MD_FLINLINE) {
                LASSERT((oa->o_valid & OBD_MD_FLFLAGS) &&
                        oa->o_flags == OBD_FL_DELORPHAN);
                DEBUG_REQ(D_HA, req,
                          "delorphan from OST integration");
                /* Don't resend the delorphan req */
                req->rq_no_resend = req->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body == NULL) {
                CERROR ("can't unpack ost_body\n");
                GOTO (out_req, rc = -EPROTO);
        }

        *oa = body->oa;

        /* This should really be sent by the OST */
        oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oa->o_valid |= OBD_MD_FLBLKSZ;

        /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
         * have valid lsm_oinfo data structs, so don't go touching that.
         * This needs to be fixed in a big way.
         */
        lsm->lsm_object_id = oa->o_id;
        lsm->lsm_object_gr = oa->o_gr;
        *ea = lsm;

        if (oti != NULL) {
                oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);

                /* Hand the llog cookie from the reply obdo back to the
                 * caller, allocating cookie space on first use. */
                if (oa->o_valid & OBD_MD_FLCOOKIE) {
                        if (!oti->oti_logcookies)
                                oti_alloc_cookies(oti, 1);
                        *oti->oti_logcookies = *obdo_logcookie(oa);
                }
        }

        CDEBUG(D_HA, "transno: "LPD64"\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        /* Free the lsm only if it was allocated here: *ea is still NULL
         * in that case, since it is only set on the success path above. */
        if (rc && !*ea)
                obd_free_memmd(exp, &lsm);
        RETURN(rc);
}
460
461 static int osc_punch_interpret(struct ptlrpc_request *req,
462                                struct osc_async_args *aa, int rc)
463 {
464         struct ost_body *body;
465         ENTRY;
466
467         if (rc != 0)
468                 GOTO(out, rc);
469
470         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof (*body),
471                                   lustre_swab_ost_body);
472         if (body == NULL) {
473                 CERROR ("can't unpack ost_body\n");
474                 GOTO(out, rc = -EPROTO);
475         }
476
477         *aa->aa_oi->oi_oa = body->oa;
478 out:
479         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
480         RETURN(rc);
481 }
482
/* Asynchronous OST_PUNCH: truncate the byte range given in
 * @oinfo->oi_policy on the OST object.  The request joins @rqset and the
 * result is delivered through osc_punch_interpret(). */
static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
                     struct obd_trans_info *oti,
                     struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        struct ost_body *body;
        /* Request layout: { ptlrpc_body, ost_body, (optional) capa }. */
        int size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
        ENTRY;

        if (!oinfo->oi_oa) {
                CERROR("oa NULL\n");
                RETURN(-EINVAL);
        }

        size[REQ_REC_OFF + 1] = oinfo->oi_capa? sizeof(struct lustre_capa) : 0;
        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_PUNCH, 3, size, NULL);
        if (!req)
                RETURN(-ENOMEM);

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */

        osc_pack_req_body(req, REQ_REC_OFF, oinfo);
        /* overload the size and blocks fields in the oa with start/end */
        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        body->oa.o_size = oinfo->oi_policy.l_extent.start;
        body->oa.o_blocks = oinfo->oi_policy.l_extent.end;
        body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);

        ptlrpc_req_set_repsize(req, 2, size);

        /* Stash the obd_info so the interpret callback can complete it. */
        req->rq_interpret_reply = osc_punch_interpret;
        CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = (struct osc_async_args *)&req->rq_async_args;
        aa->aa_oi = oinfo;
        ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}
523
/* Synchronous OST_SYNC: ask the OST to flush the byte range
 * [@start, @end] of the object described by @oa to stable storage.
 * On success *@oa is updated from the reply.  Returns 0 or -errno. */
static int osc_sync(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md *md, obd_size start, obd_size end,
                    void *capa)
{
        struct ptlrpc_request *req;
        struct ost_body *body;
        /* Request layout: { ptlrpc_body, ost_body, (optional) capa }. */
        int rc, size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
        ENTRY;

        if (!oa) {
                CERROR("oa NULL\n");
                RETURN(-EINVAL);
        }

        size[REQ_REC_OFF + 1] = capa ? sizeof(struct lustre_capa) : 0;

        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_SYNC, 3, size, NULL);
        if (!req)
                RETURN(-ENOMEM);

        /* overload the size and blocks fields in the oa with start/end */
        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        body->oa = *oa;
        body->oa.o_size = start;
        body->oa.o_blocks = end;
        body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);

        osc_pack_capa(req, REQ_REC_OFF + 1, body, capa);

        ptlrpc_req_set_repsize(req, 2, size);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body == NULL) {
                CERROR ("can't unpack ost_body\n");
                GOTO (out, rc = -EPROTO);
        }

        *oa = body->oa;

        EXIT;
 out:
        ptlrpc_req_finished(req);
        return rc;
}
574
575 /* Find and cancel locally locks matched by @mode in the resource found by
576  * @objid. Found locks are added into @cancel list. Returns the amount of
577  * locks added to @cancels list. */
578 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
579                                    struct list_head *cancels, ldlm_mode_t mode,
580                                    int lock_flags)
581 {
582         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
583         struct ldlm_res_id res_id = { .name = { oa->o_id, 0, oa->o_gr, 0 } };
584         struct ldlm_resource *res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
585         int count;
586         ENTRY;
587
588         if (res == NULL)
589                 RETURN(0);
590
591         count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
592                                            lock_flags, 0, NULL);
593         ldlm_resource_putref(res);
594         RETURN(count);
595 }
596
597 static int osc_destroy_interpret(struct ptlrpc_request *req, void *data,
598                                  int rc)
599 {
600         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
601
602         atomic_dec(&cli->cl_destroy_in_flight);
603         cfs_waitq_signal(&cli->cl_destroy_waitq);
604         return 0;
605 }
606
/* Decide whether another OST_DESTROY RPC may be sent right now.
 * Returns 1 and keeps the slot claimed (cl_destroy_in_flight was bumped)
 * when under the RPC limit; returns 0 after releasing the claim, in which
 * case the caller waits on cl_destroy_waitq and retries. */
static int osc_can_send_destroy(struct client_obd *cli)
{
        /* Optimistically claim a slot; keep it if we are within the limit. */
        if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
            cli->cl_max_rpcs_in_flight) {
                /* The destroy request can be sent */
                return 1;
        }
        /* Over the limit: give the slot back.  If the counter dropped
         * below the limit in the meantime, another waiter may now be able
         * to proceed, so wake the queue. */
        if (atomic_dec_return(&cli->cl_destroy_in_flight) <
            cli->cl_max_rpcs_in_flight) {
                /*
                 * The counter has been modified between the two atomic
                 * operations.
                 */
                cfs_waitq_signal(&cli->cl_destroy_waitq);
        }
        return 0;
}
624
/* Destroy requests can be async always on the client, and we don't even really
 * care about the return code since the client cannot do anything at all about
 * a destroy failure.
 * When the MDS is unlinking a filename, it saves the file objects into a
 * recovery llog, and these object records are cancelled when the OST reports
 * they were destroyed and sync'd to disk (i.e. transaction committed).
 * If the client dies, or the OST is down when the object should be destroyed,
 * the records are not cancelled, and when the OST reconnects to the MDS next,
 * it will retrieve the llog unlink logs and then sends the log cancellation
 * cookies to the MDS after committing destroy transactions. */
static int osc_destroy(struct obd_export *exp, struct obdo *oa,
                       struct lov_stripe_md *ea, struct obd_trans_info *oti,
                       struct obd_export *md_export)
{
        CFS_LIST_HEAD(cancels);
        struct ptlrpc_request *req;
        struct ost_body *body;
        /* Request layout: { ptlrpc_body, ost_body, (optional) ELC locks }. */
        int size[3] = { sizeof(struct ptlrpc_body), sizeof(*body), 0 };
        int count, bufcount = 2;
        struct client_obd *cli = &exp->exp_obd->u.cli;
        ENTRY;

        if (!oa) {
                CERROR("oa NULL\n");
                RETURN(-EINVAL);
        }

        /* Cancel local PW locks on the object first; the data is being
         * destroyed, so it can be discarded rather than written back. */
        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);
        /* Piggy-back the cancels on the destroy only if the server
         * supports early lock cancel (ELC). */
        if (exp_connect_cancelset(exp) && count)
                bufcount = 3;
        req = ldlm_prep_elc_req(exp, LUSTRE_OST_VERSION, OST_DESTROY, bufcount,
                                size, REQ_REC_OFF + 1, 0, &cancels, count);
        if (!req)
                RETURN(-ENOMEM);

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        req->rq_interpret_reply = osc_destroy_interpret;

        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        /* Carry the unlink llog cookie so the OST can cancel the matching
         * recovery log record once the destroy commits. */
        if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
                memcpy(obdo_logcookie(oa), oti->oti_logcookies,
                       sizeof(*oti->oti_logcookies));
        body->oa = *oa;

        ptlrpc_req_set_repsize(req, 2, size);

        if (!osc_can_send_destroy(cli)) {
                struct l_wait_info lwi = { 0 };

                /*
                 * Wait until the number of on-going destroy RPCs drops
                 * under max_rpc_in_flight
                 */
                l_wait_event_exclusive(cli->cl_destroy_waitq,
                                       osc_can_send_destroy(cli), &lwi);
        }

        /* Do not wait for response */
        ptlrpcd_add_req(req);
        RETURN(0);
}
687
/* Fill in the dirty/grant accounting fields of @oa so the OST can track
 * this client's cache state.  @writing_bytes is currently unused here.
 * Takes cl_loi_list_lock to snapshot the counters consistently. */
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;

        /* Caller must not have already filled in these fields. */
        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        client_obd_list_lock(&cli->cl_loi_list_lock);
        oa->o_dirty = cli->cl_dirty;
        if (cli->cl_dirty > cli->cl_dirty_max) {
                /* Accounting is out of whack; tell the OST we can take
                 * no more dirty data. */
                CERROR("dirty %lu > dirty_max %lu\n",
                       cli->cl_dirty, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else if (atomic_read(&obd_dirty_pages) > obd_max_dirty_pages) {
                CERROR("dirty %d > system dirty_max %d\n",
                       atomic_read(&obd_dirty_pages), obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else {
                /* Advertise how much more we could dirty: at least the
                 * per-client max, or enough to fill the RPC pipeline. */
                long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
                                (cli->cl_max_rpcs_in_flight + 1);
                oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
        }
        oa->o_grant = cli->cl_avail_grant;
        oa->o_dropped = cli->cl_lost_grant;
        /* Lost grant is reported once and then reset. */
        cli->cl_lost_grant = 0;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
}
722
/* Charge one page of dirty accounting and grant to @pga.
 * caller must hold loi_list_lock */
static void osc_consume_write_grant(struct client_obd *cli,
                                    struct brw_page *pga)
{
        atomic_inc(&obd_dirty_pages);
        cli->cl_dirty += CFS_PAGE_SIZE;
        cli->cl_avail_grant -= CFS_PAGE_SIZE;
        /* Mark the page so osc_release_write_grant() knows grant was
         * consumed for it. */
        pga->flag |= OBD_BRW_FROM_GRANT;
        CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
               CFS_PAGE_SIZE, pga, pga->pg);
        /* Callers are expected to check available grant before consuming. */
        LASSERT(cli->cl_avail_grant >= 0);
}
735
/* the companion to osc_consume_write_grant, called when a brw has completed.
 * must be called with the loi lock held.
 * @sent is non-zero when the page actually went out in an RPC; otherwise
 * the grant for the page is accounted as lost. */
static void osc_release_write_grant(struct client_obd *cli,
                                    struct brw_page *pga, int sent)
{
        /* Fall back to 4096 if the OST has not reported a block size. */
        int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
        ENTRY;

        /* Nothing to release if no grant was consumed for this page. */
        if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
                EXIT;
                return;
        }

        pga->flag &= ~OBD_BRW_FROM_GRANT;
        atomic_dec(&obd_dirty_pages);
        cli->cl_dirty -= CFS_PAGE_SIZE;
        if (!sent) {
                /* The page never reached the OST, so its whole grant is
                 * gone until the server replenishes it. */
                cli->cl_lost_grant += CFS_PAGE_SIZE;
                CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
                       cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
        } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
                /* For short writes we shouldn't count parts of pages that
                 * span a whole block on the OST side, or our accounting goes
                 * wrong.  Should match the code in filter_grant_check. */
                int offset = pga->off & ~CFS_PAGE_MASK;
                int count = pga->count + (offset & (blocksize - 1));
                int end = (offset + pga->count) & (blocksize - 1);
                if (end)
                        count += blocksize - end;

                cli->cl_lost_grant += CFS_PAGE_SIZE - count;
                CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
                       CFS_PAGE_SIZE - count, cli->cl_lost_grant,
                       cli->cl_avail_grant, cli->cl_dirty);
        }

        EXIT;
}
774
775 static unsigned long rpcs_in_flight(struct client_obd *cli)
776 {
777         return cli->cl_r_in_flight + cli->cl_w_in_flight;
778 }
779
/* Wake cache waiters that can now make progress: either grant has become
 * available (consume it for them), or no more grant is coming and they
 * must fall back to sync IO (ocw_rc = -EDQUOT).
 * caller must hold loi_list_lock */
void osc_wake_cache_waiters(struct client_obd *cli)
{
        struct list_head *l, *tmp;
        struct osc_cache_waiter *ocw;

        ENTRY;
        list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
                /* if we can't dirty more, we must wait until some is written */
                if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
                   (atomic_read(&obd_dirty_pages) + 1 > obd_max_dirty_pages)) {
                        CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
                               "osc max %ld, sys max %d\n", cli->cl_dirty,
                               cli->cl_dirty_max, obd_max_dirty_pages);
                        return;
                }

                /* if still dirty cache but no grant wait for pending RPCs that
                 * may yet return us some grant before doing sync writes */
                if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
                               cli->cl_w_in_flight);
                        return;
                }

                ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
                list_del_init(&ocw->ocw_entry);
                if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        /* no more RPCs in flight to return grant, do sync IO */
                        ocw->ocw_rc = -EDQUOT;
                        CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
                } else {
                        /* Grant is available; charge it to the waiter's page
                         * before waking it. */
                        osc_consume_write_grant(cli,
                                                &ocw->ocw_oap->oap_brw_page);
                }

                cfs_waitq_signal(&ocw->ocw_waitq);
        }

        EXIT;
}
821
/* Record the initial grant handed out by the server at connect time
 * (from the obd_connect_data reply). */
static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        client_obd_list_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant = ocd->ocd_grant;
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld\n",
               cli->cl_avail_grant, cli->cl_lost_grant);
        /* The server should never grant a negative amount. */
        LASSERT(cli->cl_avail_grant >= 0);
}
832
833 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
834 {
835         client_obd_list_lock(&cli->cl_loi_list_lock);
836         CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
837         if (body->oa.o_valid & OBD_MD_FLGRANT)
838                 cli->cl_avail_grant += body->oa.o_grant;
839         /* waiters are woken in brw_interpret_oap */
840         client_obd_list_unlock(&cli->cl_loi_list_lock);
841 }
842
843 /* We assume that the reason this OSC got a short read is because it read
844  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
845  * via the LOV, and it _knows_ it's reading inside the file, it's just that
846  * this stripe never got written at or beyond this stripe offset yet. */
847 static void handle_short_read(int nob_read, obd_count page_count,
848                               struct brw_page **pga)
849 {
850         char *ptr;
851         int i = 0;
852
853         /* skip bytes read OK */
854         while (nob_read > 0) {
855                 LASSERT (page_count > 0);
856
857                 if (pga[i]->count > nob_read) {
858                         /* EOF inside this page */
859                         ptr = cfs_kmap(pga[i]->pg) +
860                                 (pga[i]->off & ~CFS_PAGE_MASK);
861                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
862                         cfs_kunmap(pga[i]->pg);
863                         page_count--;
864                         i++;
865                         break;
866                 }
867
868                 nob_read -= pga[i]->count;
869                 page_count--;
870                 i++;
871         }
872
873         /* zero remaining pages */
874         while (page_count-- > 0) {
875                 ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
876                 memset(ptr, 0, pga[i]->count);
877                 cfs_kunmap(pga[i]->pg);
878                 i++;
879         }
880 }
881
882 static int check_write_rcs(struct ptlrpc_request *req,
883                            int requested_nob, int niocount,
884                            obd_count page_count, struct brw_page **pga)
885 {
886         int    *remote_rcs, i;
887
888         /* return error if any niobuf was in error */
889         remote_rcs = lustre_swab_repbuf(req, REQ_REC_OFF + 1,
890                                         sizeof(*remote_rcs) * niocount, NULL);
891         if (remote_rcs == NULL) {
892                 CERROR("Missing/short RC vector on BRW_WRITE reply\n");
893                 return(-EPROTO);
894         }
895         if (lustre_msg_swabbed(req->rq_repmsg))
896                 for (i = 0; i < niocount; i++)
897                         __swab32s(&remote_rcs[i]);
898
899         for (i = 0; i < niocount; i++) {
900                 if (remote_rcs[i] < 0)
901                         return(remote_rcs[i]);
902
903                 if (remote_rcs[i] != 0) {
904                         CERROR("rc[%d] invalid (%d) req %p\n",
905                                 i, remote_rcs[i], req);
906                         return(-EPROTO);
907                 }
908         }
909
910         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
911                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
912                        requested_nob, req->rq_bulk->bd_nob_transferred);
913                 return(-EPROTO);
914         }
915
916         return (0);
917 }
918
919 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
920 {
921         if (p1->flag != p2->flag) {
922                 unsigned mask = ~OBD_BRW_FROM_GRANT;
923
924                 /* warn if we try to combine flags that we don't know to be
925                  * safe to combine */
926                 if ((p1->flag & mask) != (p2->flag & mask))
927                         CERROR("is it ok to have flags 0x%x and 0x%x in the "
928                                "same brw?\n", p1->flag, p2->flag);
929                 return 0;
930         }
931
932         return (p1->off + p1->count == p2->off);
933 }
934
/* Compute a CRC32 over up to 'nob' bytes spread across 'pg_count' brw
 * pages.  'opc' (OST_READ/OST_WRITE) selects which fault-injection hook
 * may fire: reads corrupt the received data itself, writes only perturb
 * the returned checksum so the data stays correct for a resend. */
static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
                                   struct brw_page **pga, int opc)
{
        __u32 cksum = ~0;       /* conventional all-ones CRC32 seed */
        int i = 0;

        LASSERT (pg_count > 0);
        while (nob > 0 && pg_count > 0) {
                unsigned char *ptr = cfs_kmap(pga[i]->pg);
                int off = pga[i]->off & ~CFS_PAGE_MASK;
                /* clamp the last fragment to the remaining byte budget */
                int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 && opc == OST_READ &&
                    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
                        memcpy(ptr + off, "bad1", min(4, nob));
                cksum = crc32_le(cksum, ptr + off, count);
                cfs_kunmap(pga[i]->pg);
                LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
                               off, cksum);

                /* NOTE(review): nob is reduced by the full page count even
                 * when 'count' was clamped above; this can only happen on
                 * the final iteration, where the loop exits anyway. */
                nob -= pga[i]->count;
                pg_count--;
                i++;
        }
        /* For sending we only compute the wrong checksum instead
         * of corrupting the data so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                cksum++;

        return cksum;
}
968
/* Build (but do not send) a BRW read/write request covering 'page_count'
 * sorted pages.  On success the caller owns *reqp; the async context
 * (osc_brw_async_args) is stashed in rq_async_args, so 'oa' and 'pga'
 * must remain valid until the request completes.  Returns 0 or a
 * negative errno. */
static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
                                struct lov_stripe_md *lsm, obd_count page_count,
                                struct brw_page **pga,
                                struct ptlrpc_request **reqp,
                                struct obd_capa *ocapa)
{
        struct ptlrpc_request   *req;
        struct ptlrpc_bulk_desc *desc;
        struct ost_body         *body;
        struct obd_ioobj        *ioobj;
        struct niobuf_remote    *niobuf;
        int size[5] = { sizeof(struct ptlrpc_body), sizeof(*body) };
        int niocount, i, requested_nob, opc, rc;
        struct ptlrpc_request_pool *pool;
        struct lustre_capa      *capa;
        struct osc_brw_async_args *aa;

        ENTRY;
        /* fault-injection hooks: the first is retried by callers, the
         * second is treated as fatal */
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
                RETURN(-ENOMEM); /* Recoverable */
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
                RETURN(-EINVAL); /* Fatal */

        /* writes draw from the import's preallocated request pool so
         * dirty data can still be flushed under memory pressure */
        if ((cmd & OBD_BRW_WRITE) != 0) {
                opc = OST_WRITE;
                pool = cli->cl_import->imp_rq_pool;
        } else {
                opc = OST_READ;
                pool = NULL;
        }

        /* count remote niobufs: runs of mergeable pages share one */
        for (niocount = i = 1; i < page_count; i++) {
                if (!can_merge_pages(pga[i - 1], pga[i]))
                        niocount++;
        }

        size[REQ_REC_OFF + 1] = sizeof(*ioobj);
        size[REQ_REC_OFF + 2] = niocount * sizeof(*niobuf);
        if (ocapa)
                size[REQ_REC_OFF + 3] = sizeof(*capa);

        req = ptlrpc_prep_req_pool(cli->cl_import, LUSTRE_OST_VERSION, opc, 5,
                                   size, NULL, pool, NULL);
        if (req == NULL)
                RETURN (-ENOMEM);

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */

        /* writes GET pages from us; reads PUT pages into us */
        if (opc == OST_WRITE)
                desc = ptlrpc_prep_bulk_imp (req, page_count,
                                             BULK_GET_SOURCE, OST_BULK_PORTAL);
        else
                desc = ptlrpc_prep_bulk_imp (req, page_count,
                                             BULK_PUT_SINK, OST_BULK_PORTAL);
        if (desc == NULL)
                GOTO(out, rc = -ENOMEM);
        /* NB request now owns desc and will free it when it gets freed */

        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        ioobj = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1, sizeof(*ioobj));
        niobuf = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
                                niocount * sizeof(*niobuf));

        body->oa = *oa;

        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;
        if (ocapa) {
                /* attach the capability proving we may touch this object */
                capa = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 3,
                                      sizeof(*capa));
                capa_cpy(capa, ocapa);
                body->oa.o_valid |= OBD_MD_FLOSSCAPA;
        }

        LASSERT (page_count > 0);
        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                struct brw_page *pg = pga[i];
                /* NOTE(review): at i == 0 this loads pga[-1]; the value is
                 * never dereferenced (the LASSERTFs below short-circuit on
                 * i == 0) but the out-of-bounds read itself looks dubious
                 * -- worth confirming/guarding upstream. */
                struct brw_page *pg_prev = pga[i - 1];

                LASSERT(pg->count > 0);
                /* every fragment must lie within a single page */
                LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
                         "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
                         pg->off, pg->count);
#ifdef __linux__
                /* pages must arrive in strictly ascending file offset order */
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
                         " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
                         i, page_count,
                         pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
                         pg_prev->pg, page_private(pg_prev->pg),
                         pg_prev->pg->index, pg_prev->off);
#else
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u\n", i, page_count);
#endif
                /* SRVLOCK must be all-or-nothing across the batch */
                LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
                        (pg->flag & OBD_BRW_SRVLOCK));

                ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,
                                      pg->count);
                requested_nob += pg->count;

                /* extend the previous niobuf when pages are contiguous
                 * and flag-compatible; otherwise start a new one */
                if (i > 0 && can_merge_pages(pg_prev, pg)) {
                        niobuf--;
                        niobuf->len += pg->count;
                } else {
                        niobuf->offset = pg->off;
                        niobuf->len    = pg->count;
                        niobuf->flags  = pg->flag;
                }
        }

        /* we must have filled exactly 'niocount' niobufs */
        LASSERT((void *)(niobuf - niocount) ==
                lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
                               niocount * sizeof(*niobuf)));
        osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);

        /* size[REQ_REC_OFF] still sizeof (*body) */
        if (opc == OST_WRITE) {
                if (unlikely(cli->cl_checksum)) {
                        body->oa.o_valid |= OBD_MD_FLCKSUM;
                        body->oa.o_cksum = osc_checksum_bulk(requested_nob,
                                                             page_count, pga,
                                                             OST_WRITE);
                        CDEBUG(D_PAGE, "checksum at write origin: %x\n",
                               body->oa.o_cksum);
                        /* save this in 'oa', too, for later checking */
                        oa->o_valid |= OBD_MD_FLCKSUM;
                } else {
                        /* clear out the checksum flag, in case this is a
                         * resend but cl_checksum is no longer set. b=11238 */
                        oa->o_valid &= ~OBD_MD_FLCKSUM;
                }
                oa->o_cksum = body->oa.o_cksum;
                /* 1 RC per niobuf */
                size[REPLY_REC_OFF + 1] = sizeof(__u32) * niocount;
                ptlrpc_req_set_repsize(req, 3, size);
        } else {
                if (unlikely(cli->cl_checksum))
                        body->oa.o_valid |= OBD_MD_FLCKSUM;
                /* 1 RC for the whole I/O */
                ptlrpc_req_set_repsize(req, 2, size);
        }

        /* stash the async context consumed by osc_brw_fini_request and
         * brw_interpret */
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = (struct osc_brw_async_args *)&req->rq_async_args;
        aa->aa_oa = oa;
        aa->aa_requested_nob = requested_nob;
        aa->aa_nio_count = niocount;
        aa->aa_page_count = page_count;
        aa->aa_resends = 0;
        aa->aa_ppga = pga;
        aa->aa_cli = cli;
        INIT_LIST_HEAD(&aa->aa_oaps);

        *reqp = req;
        RETURN (0);

 out:
        ptlrpc_req_finished (req);
        RETURN (rc);
}
1131
1132 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1133                                 __u32 client_cksum, __u32 server_cksum,
1134                                 int nob, obd_count page_count,
1135                                 struct brw_page **pga)
1136 {
1137         __u32 new_cksum;
1138         char *msg;
1139
1140         if (server_cksum == client_cksum) {
1141                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1142                 return 0;
1143         }
1144
1145         new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE);
1146
1147         if (new_cksum == server_cksum)
1148                 msg = "changed on the client after we checksummed it - "
1149                       "likely false positive due to mmap IO (bug 11742)";
1150         else if (new_cksum == client_cksum)
1151                 msg = "changed in transit before arrival at OST";
1152         else
1153                 msg = "changed in transit AND doesn't match the original - "
1154                       "likely false positive due to mmap IO (bug 11742)";
1155
1156         LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inum "
1157                            LPU64"/"LPU64" object "LPU64"/"LPU64" extent "
1158                            "["LPU64"-"LPU64"]\n",
1159                            msg, libcfs_nid2str(peer->nid),
1160                            oa->o_valid & OBD_MD_FLFID ? oa->o_fid : (__u64)0,
1161                            oa->o_valid & OBD_MD_FLFID ? oa->o_generation : 
1162                                                         (__u64)0,
1163                            oa->o_id,
1164                            oa->o_valid & OBD_MD_FLGROUP ? oa->o_gr : (__u64)0,
1165                            pga[0]->off,
1166                            pga[page_count-1]->off + pga[page_count-1]->count - 1);
1167         CERROR("original client csum %x, server csum %x, client csum now %x\n",
1168                client_cksum, server_cksum, new_cksum);
1169         return 1;        
1170 }
1171
/* Note rc enters this function as number of bytes transferred */
/* Finish a completed BRW request: unpack the reply, update quota flags
 * and grant, verify checksums, and (for reads) handle short reads.
 * Returns bytes-style rc folded into 0/-errno; -EAGAIN asks the caller
 * to resend the whole request. */
static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
{
        struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
        const lnet_process_id_t *peer =
                        &req->rq_import->imp_connection->c_peer;
        struct client_obd *cli = aa->aa_cli;
        struct ost_body *body;
        __u32 client_cksum = 0;
        ENTRY;

        /* let -EDQUOT through so the per-uid/gid quota flags below can
         * still be updated from the reply */
        if (rc < 0 && rc != -EDQUOT)
                RETURN(rc);

        LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body == NULL) {
                CERROR ("Can't unpack body\n");
                RETURN(-EPROTO);
        }

        /* set/clear over quota flag for a uid/gid */
        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
            body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA))
                lquota_setdq(quota_interface, cli, body->oa.o_uid,
                             body->oa.o_gid, body->oa.o_valid,
                             body->oa.o_flags);

        /* quota flags updated; any error (incl. -EDQUOT) is now final */
        if (rc < 0)
                RETURN(rc);

        if (unlikely(aa->aa_oa->o_valid & OBD_MD_FLCKSUM))
                client_cksum = aa->aa_oa->o_cksum; /* save for later */

        osc_update_grant(cli, body);

        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
                /* writes report success as rc == 0, never a byte count */
                if (rc > 0) {
                        CERROR ("Unexpected +ve rc %d\n", rc);
                        RETURN(-EPROTO);
                }
                LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);

                /* a confirmed write-checksum mismatch triggers a resend */
                if (unlikely((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) &&
                             client_cksum &&
                             check_write_checksum(&body->oa, peer, client_cksum,
                                                  body->oa.o_cksum,
                                                  aa->aa_requested_nob,
                                                  aa->aa_page_count,
                                                  aa->aa_ppga)))
                        RETURN(-EAGAIN);

                if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
                        RETURN(-EAGAIN);

                rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
                                     aa->aa_page_count, aa->aa_ppga);
                GOTO(out, rc);
        }

        /* The rest of this function executes only for OST_READs */
        if (rc > aa->aa_requested_nob) {
                CERROR("Unexpected rc %d (%d requested)\n", rc,
                       aa->aa_requested_nob);
                RETURN(-EPROTO);
        }

        if (rc != req->rq_bulk->bd_nob_transferred) {
                CERROR ("Unexpected rc %d (%d transferred)\n",
                        rc, req->rq_bulk->bd_nob_transferred);
                return (-EPROTO);
        }

        /* short read == EOF inside the stripe: zero the unread tail */
        if (rc < aa->aa_requested_nob)
                handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);

        if (sptlrpc_cli_unwrap_bulk_read(req, rc, aa->aa_page_count,
                                         aa->aa_ppga))
                GOTO(out, rc = -EAGAIN);

        if (unlikely(body->oa.o_valid & OBD_MD_FLCKSUM)) {
                static int cksum_counter;
                __u32      server_cksum = body->oa.o_cksum;
                char      *via;
                char      *router;

                client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
                                                 aa->aa_ppga, OST_READ);

                /* mention the LNET router in the log if the bulk did not
                 * come directly from the server */
                if (peer->nid == req->rq_bulk->bd_sender) {
                        via = router = "";
                } else {
                        via = " via ";
                        router = libcfs_nid2str(req->rq_bulk->bd_sender);
                }

                if (server_cksum == ~0 && rc > 0) {
                        CERROR("Protocol error: server %s set the 'checksum' "
                               "bit, but didn't send a checksum.  Not fatal, "
                               "but please tell CFS.\n",
                               libcfs_nid2str(peer->nid));
                } else if (server_cksum != client_cksum) {
                        LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
                                           "%s%s%s inum "LPU64"/"LPU64" object "
                                           LPU64"/"LPU64" extent "
                                           "["LPU64"-"LPU64"]\n",
                                           req->rq_import->imp_obd->obd_name,
                                           libcfs_nid2str(peer->nid),
                                           via, router,
                                           body->oa.o_valid & OBD_MD_FLFID ?
                                                body->oa.o_fid : (__u64)0,
                                           body->oa.o_valid & OBD_MD_FLFID ?
                                                body->oa.o_generation :(__u64)0,
                                           body->oa.o_id,
                                           body->oa.o_valid & OBD_MD_FLGROUP ?
                                                body->oa.o_gr : (__u64)0,
                                           aa->aa_ppga[0]->off,
                                           aa->aa_ppga[aa->aa_page_count-1]->off +
                                           aa->aa_ppga[aa->aa_page_count-1]->count -
                                                                        1);
                        CERROR("client %x, server %x\n",
                               client_cksum, server_cksum);
                        cksum_counter = 0;
                        aa->aa_oa->o_cksum = client_cksum;
                        rc = -EAGAIN;
                } else {
                        cksum_counter++;
                        CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
                        rc = 0;
                }
        } else if (unlikely(client_cksum)) {
                static int cksum_missed;

                cksum_missed++;
                /* rate-limit: only log when cksum_missed is a power of 2 */
                if ((cksum_missed & (-cksum_missed)) == cksum_missed)
                        CERROR("Checksum %u requested from %s but not sent\n",
                               cksum_missed, libcfs_nid2str(peer->nid));
        } else {
                rc = 0;
        }
out:
        /* on success, propagate the reply's attributes back to the caller */
        if (rc >= 0)
                *aa->aa_oa = body->oa;

        RETURN(rc);
}
1319
/* Synchronous BRW: build the request, queue it, and wait for completion.
 * Bulk timeouts are retried indefinitely; other recoverable errors are
 * retried up to osc_should_resend()'s limit with a linear backoff of
 * 'resends' seconds between attempts. */
static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
                            struct lov_stripe_md *lsm,
                            obd_count page_count, struct brw_page **pga,
                            struct obd_capa *ocapa)
{
        struct ptlrpc_request *req;
        int                    rc;
        cfs_waitq_t            waitq;       /* private waitq used only for backoff sleeps */
        int                    resends = 0;
        struct l_wait_info     lwi;

        ENTRY;

        cfs_waitq_init(&waitq);

restart_bulk:
        rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
                                  page_count, pga, &req, ocapa);
        if (rc != 0)
                return (rc);

        rc = ptlrpc_queue_wait(req);

        /* NOTE(review): bulk-timeout retries do not count toward the
         * resend limit below, so this loop is unbounded on timeouts */
        if (rc == -ETIMEDOUT && req->rq_resend) {
                DEBUG_REQ(D_HA, req,  "BULK TIMEOUT");
                ptlrpc_req_finished(req);
                goto restart_bulk;
        }

        rc = osc_brw_fini_request(req, rc);

        ptlrpc_req_finished(req);
        if (osc_recoverable_error(rc)) {
                resends++;
                if (!osc_should_resend(resends, &exp->exp_obd->u.cli)) {
                        CERROR("too many resend retries, returning error\n");
                        RETURN(-EIO);
                }

                /* sleep 'resends' seconds before retrying; the condition is
                 * always false so only the timeout wakes us */
                lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
                l_wait_event(waitq, 0, &lwi);

                goto restart_bulk;
        }

        RETURN (rc);
}
1367
/* Rebuild a failed async BRW as a fresh request and add it to the same
 * request set.  The new request takes over the page array and the oap
 * list from the old one; the old request's reference on each oap is
 * transferred to the new request.  Returns 0 on success, -EIO when the
 * resend limit is exceeded, -EINTR if any oap was interrupted. */
int osc_brw_redo_request(struct ptlrpc_request *request,
                         struct osc_brw_async_args *aa)
{
        struct ptlrpc_request *new_req;
        struct ptlrpc_request_set *set = request->rq_set;
        struct osc_brw_async_args *new_aa;
        struct osc_async_page *oap;
        int rc = 0;
        ENTRY;

        if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) {
                CERROR("too many resend retries, returning error\n");
                RETURN(-EIO);
        }

        DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
/*
        body = lustre_msg_buf(request->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        if (body->oa.o_valid & OBD_MD_FLOSSCAPA)
                ocapa = lustre_unpack_capa(request->rq_reqmsg,
                                           REQ_REC_OFF + 3);
*/
        /* derive read/write from the old request's opcode */
        rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
                                        OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
                                  aa->aa_cli, aa->aa_oa,
                                  NULL /* lsm unused by osc currently */,
                                  aa->aa_page_count, aa->aa_ppga,
                                  &new_req, NULL /* ocapa */);
        if (rc)
                RETURN(rc);

        client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);

        /* abort the redo if any page's I/O was interrupted; verify each
         * oap still points back at the request being replaced */
        list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request != NULL) {
                        LASSERTF(request == oap->oap_request,
                                 "request %p != oap_request %p\n",
                                 request, oap->oap_request);
                        if (oap->oap_interrupted) {
                                client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
                                ptlrpc_req_finished(new_req);
                                RETURN(-EINTR);
                        }
                }
        }
        /* New request takes over pga and oaps from old request.
         * Note that copying a list_head doesn't work, need to move it... */
        aa->aa_resends++;
        new_req->rq_interpret_reply = request->rq_interpret_reply;
        new_req->rq_async_args = request->rq_async_args;
        /* delay the send by aa_resends seconds as a crude backoff */
        new_req->rq_sent = CURRENT_SECONDS + aa->aa_resends;

        new_aa = (struct osc_brw_async_args *)&new_req->rq_async_args;

        INIT_LIST_HEAD(&new_aa->aa_oaps);
        list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
        INIT_LIST_HEAD(&aa->aa_oaps);

        /* repoint each oap's request reference at the new request */
        list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request) {
                        ptlrpc_req_finished(oap->oap_request);
                        oap->oap_request = ptlrpc_request_addref(new_req);
                }
        }

        /* use ptlrpc_set_add_req is safe because interpret functions work
         * in check_set context. only one way exist with access to request
         * from different thread got -EINTR - this way protected with
         * cl_loi_list_lock */
        ptlrpc_set_add_req(set, new_req);

        client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);

        DEBUG_REQ(D_INFO, new_req, "new request");
        RETURN(0);
}
1444
/* Request-set interpret callback for async BRW RPCs: finalize the reply,
 * possibly hand the pages to a redo request on recoverable errors, then
 * drop the in-flight count and the grant held by each page. */
static int brw_interpret(struct ptlrpc_request *req, void *data, int rc)
{
        struct osc_brw_async_args *aa = data;
        int                        i;
        ENTRY;

        rc = osc_brw_fini_request(req, rc);
        /* on success the redo request owns the pages now and this callback
         * will run again for it, so skip the accounting below */
        if (osc_recoverable_error(rc)) {
                rc = osc_brw_redo_request(req, aa);
                if (rc == 0)
                        RETURN(0);
        }

        client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
                aa->aa_cli->cl_w_in_flight--;
        else
                aa->aa_cli->cl_r_in_flight--;
        /* release the write grant consumed per page in async_internal */
        for (i = 0; i < aa->aa_page_count; i++)
                osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
        client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);

        osc_release_ppga(aa->aa_ppga, aa->aa_page_count);

        RETURN(rc);
}
1471
1472 static int async_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1473                           struct lov_stripe_md *lsm, obd_count page_count,
1474                           struct brw_page **pga, struct ptlrpc_request_set *set,
1475                           struct obd_capa *ocapa)
1476 {
1477         struct ptlrpc_request     *req;
1478         struct client_obd         *cli = &exp->exp_obd->u.cli;
1479         int                        rc, i;
1480         struct osc_brw_async_args *aa;
1481         ENTRY;
1482
1483         /* Consume write credits even if doing a sync write -
1484          * otherwise we may run out of space on OST due to grant. */
1485         if (cmd == OBD_BRW_WRITE) {
1486                 spin_lock(&cli->cl_loi_list_lock);
1487                 for (i = 0; i < page_count; i++) {
1488                         if (cli->cl_avail_grant >= CFS_PAGE_SIZE)
1489                                 osc_consume_write_grant(cli, pga[i]);
1490                 }
1491                 spin_unlock(&cli->cl_loi_list_lock);
1492         }
1493
1494         rc = osc_brw_prep_request(cmd, cli, oa, lsm, page_count, pga,
1495                                   &req, ocapa);
1496
1497         aa = (struct osc_brw_async_args *)&req->rq_async_args;
1498         if (cmd == OBD_BRW_READ) {
1499                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
1500                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
1501                 ptlrpc_lprocfs_brw(req, OST_READ, aa->aa_requested_nob);
1502         } else {
1503                  lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
1504                 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
1505                                  cli->cl_w_in_flight);
1506                 ptlrpc_lprocfs_brw(req, OST_WRITE, aa->aa_requested_nob);
1507         }
1508
1509         if (rc == 0) {
1510                 req->rq_interpret_reply = brw_interpret;
1511                 ptlrpc_set_add_req(set, req);
1512                 client_obd_list_lock(&cli->cl_loi_list_lock);
1513                 if (cmd == OBD_BRW_READ)
1514                         cli->cl_r_in_flight++;
1515                 else
1516                         cli->cl_w_in_flight++;
1517                 client_obd_list_unlock(&cli->cl_loi_list_lock);
1518         } else if (cmd == OBD_BRW_WRITE) {
1519                 client_obd_list_lock(&cli->cl_loi_list_lock);
1520                 for (i = 0; i < page_count; i++)
1521                         osc_release_write_grant(cli, pga[i], 0);
1522                 client_obd_list_unlock(&cli->cl_loi_list_lock);
1523         }
1524         RETURN (rc);
1525 }
1526
/*
 * Ugh, we want disk allocation on the target to happen in offset order.  We'll
 * follow Sedgewick's advice and stick to the dead-simple shellsort -- it'll do
 * fine for our small page arrays and doesn't require allocation.  It's an
 * insertion sort that swaps elements that are strides apart, shrinking the
 * stride down until it's '1' and the array is sorted.
 */
1534 static void sort_brw_pages(struct brw_page **array, int num)
1535 {
1536         int stride, i, j;
1537         struct brw_page *tmp;
1538
1539         if (num == 1)
1540                 return;
1541         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1542                 ;
1543
1544         do {
1545                 stride /= 3;
1546                 for (i = stride ; i < num ; i++) {
1547                         tmp = array[i];
1548                         j = i;
1549                         while (j >= stride && array[j - stride]->off > tmp->off) {
1550                                 array[j] = array[j - stride];
1551                                 j -= stride;
1552                         }
1553                         array[j] = tmp;
1554                 }
1555         } while (stride > 1);
1556 }
1557
1558 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1559 {
1560         int count = 1;
1561         int offset;
1562         int i = 0;
1563
1564         LASSERT (pages > 0);
1565         offset = pg[i]->off & ~CFS_PAGE_MASK;
1566
1567         for (;;) {
1568                 pages--;
1569                 if (pages == 0)         /* that's all */
1570                         return count;
1571
1572                 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1573                         return count;   /* doesn't end on page boundary */
1574
1575                 i++;
1576                 offset = pg[i]->off & ~CFS_PAGE_MASK;
1577                 if (offset != 0)        /* doesn't start on page boundary */
1578                         return count;
1579
1580                 count++;
1581         }
1582 }
1583
1584 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1585 {
1586         struct brw_page **ppga;
1587         int i;
1588
1589         OBD_ALLOC(ppga, sizeof(*ppga) * count);
1590         if (ppga == NULL)
1591                 return NULL;
1592
1593         for (i = 0; i < count; i++)
1594                 ppga[i] = pga + i;
1595         return ppga;
1596 }
1597
/* Free a pointer array built by osc_build_ppga().  @count must be the
 * element count that was originally passed to osc_build_ppga(). */
static void osc_release_ppga(struct brw_page **ppga, obd_count count)
{
        LASSERT(ppga != NULL);
        OBD_FREE(ppga, sizeof(*ppga) * count);
}
1603
1604 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1605                    obd_count page_count, struct brw_page *pga,
1606                    struct obd_trans_info *oti)
1607 {
1608         struct obdo *saved_oa = NULL;
1609         struct brw_page **ppga, **orig;
1610         struct obd_import *imp = class_exp2cliimp(exp);
1611         struct client_obd *cli = &imp->imp_obd->u.cli;
1612         int rc, page_count_orig;
1613         ENTRY;
1614
1615         if (cmd & OBD_BRW_CHECK) {
1616                 /* The caller just wants to know if there's a chance that this
1617                  * I/O can succeed */
1618
1619                 if (imp == NULL || imp->imp_invalid)
1620                         RETURN(-EIO);
1621                 RETURN(0);
1622         }
1623
1624         /* test_brw with a failed create can trip this, maybe others. */
1625         LASSERT(cli->cl_max_pages_per_rpc);
1626
1627         rc = 0;
1628
1629         orig = ppga = osc_build_ppga(pga, page_count);
1630         if (ppga == NULL)
1631                 RETURN(-ENOMEM);
1632         page_count_orig = page_count;
1633
1634         sort_brw_pages(ppga, page_count);
1635         while (page_count) {
1636                 obd_count pages_per_brw;
1637
1638                 if (page_count > cli->cl_max_pages_per_rpc)
1639                         pages_per_brw = cli->cl_max_pages_per_rpc;
1640                 else
1641                         pages_per_brw = page_count;
1642
1643                 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1644
1645                 if (saved_oa != NULL) {
1646                         /* restore previously saved oa */
1647                         *oinfo->oi_oa = *saved_oa;
1648                 } else if (page_count > pages_per_brw) {
1649                         /* save a copy of oa (brw will clobber it) */
1650                         OBDO_ALLOC(saved_oa);
1651                         if (saved_oa == NULL)
1652                                 GOTO(out, rc = -ENOMEM);
1653                         *saved_oa = *oinfo->oi_oa;
1654                 }
1655
1656                 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1657                                       pages_per_brw, ppga, oinfo->oi_capa);
1658
1659                 if (rc != 0)
1660                         break;
1661
1662                 page_count -= pages_per_brw;
1663                 ppga += pages_per_brw;
1664         }
1665
1666 out:
1667         osc_release_ppga(orig, page_count_orig);
1668
1669         if (saved_oa != NULL)
1670                 OBDO_FREE(saved_oa);
1671
1672         RETURN(rc);
1673 }
1674
/* Asynchronous counterpart of osc_brw(): sort and split the pages the same
 * way, but queue each chunk as an RPC on @set instead of waiting for it.
 * On success the page-pointer array's ownership passes to async_internal()
 * (presumably released on RPC completion -- see the copy/orig handling
 * below); on failure the caller's view is fully cleaned up here. */
static int osc_brw_async(int cmd, struct obd_export *exp,
                         struct obd_info *oinfo, obd_count page_count,
                         struct brw_page *pga, struct obd_trans_info *oti,
                         struct ptlrpc_request_set *set)
{
        struct brw_page **ppga, **orig;
        struct client_obd *cli = &exp->exp_obd->u.cli;
        int page_count_orig;
        int rc = 0;
        ENTRY;

        if (cmd & OBD_BRW_CHECK) {
                struct obd_import *imp = class_exp2cliimp(exp);
                /* The caller just wants to know if there's a chance that this
                 * I/O can succeed */

                if (imp == NULL || imp->imp_invalid)
                        RETURN(-EIO);
                RETURN(0);
        }

        orig = ppga = osc_build_ppga(pga, page_count);
        if (ppga == NULL)
                RETURN(-ENOMEM);
        page_count_orig = page_count;

        sort_brw_pages(ppga, page_count);
        while (page_count) {
                struct brw_page **copy;
                obd_count pages_per_brw;

                pages_per_brw = min_t(obd_count, page_count,
                                      cli->cl_max_pages_per_rpc);

                /* shrink to the longest unfragmented run */
                pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);

                /* use ppga only if single RPC is going to fly */
                if (pages_per_brw != page_count_orig || ppga != orig) {
                        /* multiple RPCs: each one needs its own array slice,
                         * since the RPC owns its array once submitted */
                        OBD_ALLOC(copy, sizeof(*copy) * pages_per_brw);
                        if (copy == NULL)
                                GOTO(out, rc = -ENOMEM);
                        memcpy(copy, ppga, sizeof(*copy) * pages_per_brw);
                } else
                        copy = ppga;

                rc = async_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
                                    pages_per_brw, copy, set, oinfo->oi_capa);

                if (rc != 0) {
                        /* the RPC was never submitted, so the slice we
                         * allocated above is still ours to free */
                        if (copy != ppga)
                                OBD_FREE(copy, sizeof(*copy) * pages_per_brw);
                        break;
                }
                if (copy == orig) {
                        /* we passed it to async_internal() which is
                         * now responsible for releasing memory */
                        orig = NULL;
                }

                page_count -= pages_per_brw;
                ppga += pages_per_brw;
        }
out:
        /* only free the original array if ownership was not handed off */
        if (orig)
                osc_release_ppga(orig, page_count_orig);
        RETURN(rc);
}
1742
1743 static void osc_check_rpcs(struct client_obd *cli);
1744
/* The companion to osc_enter_cache(), called when @oap is no longer part of
 * the dirty accounting.  Writeback completes or truncate happens before
 * writing starts.  Must be called with the loi lock held. */
static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
                           int sent)
{
        /* return the page's write grant to the client-wide accounting */
        osc_release_write_grant(cli, &oap->oap_brw_page, sent);
}
1753
1754
1755 /* This maintains the lists of pending pages to read/write for a given object
1756  * (lop).  This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
1757  * to quickly find objects that are ready to send an RPC. */
1758 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
1759                          int cmd)
1760 {
1761         int optimal;
1762         ENTRY;
1763
1764         if (lop->lop_num_pending == 0)
1765                 RETURN(0);
1766
1767         /* if we have an invalid import we want to drain the queued pages
1768          * by forcing them through rpcs that immediately fail and complete
1769          * the pages.  recovery relies on this to empty the queued pages
1770          * before canceling the locks and evicting down the llite pages */
1771         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1772                 RETURN(1);
1773
1774         /* stream rpcs in queue order as long as as there is an urgent page
1775          * queued.  this is our cheap solution for good batching in the case
1776          * where writepage marks some random page in the middle of the file
1777          * as urgent because of, say, memory pressure */
1778         if (!list_empty(&lop->lop_urgent)) {
1779                 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
1780                 RETURN(1);
1781         }
1782         /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
1783         optimal = cli->cl_max_pages_per_rpc;
1784         if (cmd & OBD_BRW_WRITE) {
1785                 /* trigger a write rpc stream as long as there are dirtiers
1786                  * waiting for space.  as they're waiting, they're not going to
1787                  * create more pages to coallesce with what's waiting.. */
1788                 if (!list_empty(&cli->cl_cache_waiters)) {
1789                         CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
1790                         RETURN(1);
1791                 }
1792                 /* +16 to avoid triggering rpcs that would want to include pages
1793                  * that are being queued but which can't be made ready until
1794                  * the queuer finishes with the page. this is a wart for
1795                  * llite::commit_write() */
1796                 optimal += 16;
1797         }
1798         if (lop->lop_num_pending >= optimal)
1799                 RETURN(1);
1800
1801         RETURN(0);
1802 }
1803
/* Add or remove @item so that its membership in @list matches
 * @should_be_on; a no-op when it already does. */
static void on_list(struct list_head *item, struct list_head *list,
                    int should_be_on)
{
        int currently_on = !list_empty(item);

        if (should_be_on && !currently_on)
                list_add_tail(item, list);
        else if (!should_be_on && currently_on)
                list_del_init(item);
}
1812
/* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
 * can find pages to build into rpcs quickly */
static void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
{
        /* ready list: object has enough (or urgent) pages queued in either
         * direction to justify an RPC right now */
        on_list(&loi->loi_cli_item, &cli->cl_loi_ready_list,
                lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE) ||
                lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));

        /* write list: object has any pending write pages at all */
        on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
                loi->loi_write_lop.lop_num_pending);

        /* read list: object has any pending read pages at all */
        on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
                loi->loi_read_lop.lop_num_pending);
}
1827
1828 static void lop_update_pending(struct client_obd *cli,
1829                                struct loi_oap_pages *lop, int cmd, int delta)
1830 {
1831         lop->lop_num_pending += delta;
1832         if (cmd & OBD_BRW_WRITE)
1833                 cli->cl_pending_w_pages += delta;
1834         else
1835                 cli->cl_pending_r_pages += delta;
1836 }
1837
/* this is called when a sync waiter receives an interruption.  Its job is to
 * get the caller woken as soon as possible.  If its page hasn't been put in an
 * rpc yet it can dequeue immediately.  Otherwise it has to mark the rpc as
 * desiring interruption which will forcefully complete the rpc once the rpc
 * has timed out */
static void osc_occ_interrupted(struct oig_callback_context *occ)
{
        struct osc_async_page *oap;
        struct loi_oap_pages *lop;
        struct lov_oinfo *loi;
        ENTRY;

        /* XXX member_of() */
        oap = list_entry(occ, struct osc_async_page, oap_occ);

        client_obd_list_lock(&oap->oap_cli->cl_loi_list_lock);

        oap->oap_interrupted = 1;

        /* ok, it's been put in an rpc. only one oap gets a request reference */
        if (oap->oap_request != NULL) {
                ptlrpc_mark_interrupted(oap->oap_request);
                ptlrpcd_wake(oap->oap_request);
                GOTO(unlock, 0);
        }

        /* we don't get interruption callbacks until osc_trigger_group_io()
         * has been called and put the sync oaps in the pending/urgent lists.*/
        if (!list_empty(&oap->oap_pending_item)) {
                /* still queued, not yet in an rpc: pull it out of the pending
                 * accounting and complete its group-io entry with -EINTR */
                list_del_init(&oap->oap_pending_item);
                list_del_init(&oap->oap_urgent_item);

                loi = oap->oap_loi;
                lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
                        &loi->loi_write_lop : &loi->loi_read_lop;
                lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
                loi_list_maint(oap->oap_cli, oap->oap_loi);

                oig_complete_one(oap->oap_oig, &oap->oap_occ, -EINTR);
                oap->oap_oig = NULL;
        }

unlock:
        client_obd_list_unlock(&oap->oap_cli->cl_loi_list_lock);
}
1883
1884 /* this is trying to propogate async writeback errors back up to the
1885  * application.  As an async write fails we record the error code for later if
1886  * the app does an fsync.  As long as errors persist we force future rpcs to be
1887  * sync so that the app can get a sync error and break the cycle of queueing
1888  * pages for which writeback will fail. */
1889 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
1890                            int rc)
1891 {
1892         if (rc) {
1893                 if (!ar->ar_rc)
1894                         ar->ar_rc = rc;
1895
1896                 ar->ar_force_sync = 1;
1897                 ar->ar_min_xid = ptlrpc_sample_next_xid();
1898                 return;
1899
1900         }
1901
1902         if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
1903                 ar->ar_force_sync = 0;
1904 }
1905
1906 static void osc_oap_to_pending(struct osc_async_page *oap)
1907 {
1908         struct loi_oap_pages *lop;
1909
1910         if (oap->oap_cmd & OBD_BRW_WRITE)
1911                 lop = &oap->oap_loi->loi_write_lop;
1912         else
1913                 lop = &oap->oap_loi->loi_read_lop;
1914
1915         if (oap->oap_async_flags & ASYNC_URGENT)
1916                 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
1917         list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
1918         lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
1919 }
1920
/* this must be called holding the loi list lock to give coverage to exit_cache,
 * async_flag maintenance, and oap_request */
static void osc_ap_completion(struct client_obd *cli, struct obdo *oa,
                              struct osc_async_page *oap, int sent, int rc)
{
        __u64 xid = 0;

        ENTRY;
        if (oap->oap_request != NULL) {
                /* grab the xid before dropping our request reference; it is
                 * needed for the async-error bookkeeping below */
                xid = ptlrpc_req_xid(oap->oap_request);
                ptlrpc_req_finished(oap->oap_request);
                oap->oap_request = NULL;
        }

        oap->oap_async_flags = 0;
        oap->oap_interrupted = 0;

        if (oap->oap_cmd & OBD_BRW_WRITE) {
                /* record/clear async write errors per-client and per-object
                 * so a later fsync can report them */
                osc_process_ar(&cli->cl_ar, xid, rc);
                osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
        }

        /* on success, mirror the attributes the OST returned into the
         * object's cached lvb */
        if (rc == 0 && oa != NULL) {
                if (oa->o_valid & OBD_MD_FLBLOCKS)
                        oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
                if (oa->o_valid & OBD_MD_FLMTIME)
                        oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
                if (oa->o_valid & OBD_MD_FLATIME)
                        oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
                if (oa->o_valid & OBD_MD_FLCTIME)
                        oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
        }

        if (oap->oap_oig) {
                /* group io page: report completion to the group and stop */
                osc_exit_cache(cli, oap, sent);
                oig_complete_one(oap->oap_oig, &oap->oap_occ, rc);
                oap->oap_oig = NULL;
                EXIT;
                return;
        }

        rc = oap->oap_caller_ops->ap_completion(oap->oap_caller_data,
                                                oap->oap_cmd, oa, rc);

        /* ll_ap_completion (from llite) drops PG_locked. so, a new
         * I/O on the page could start, but OSC calls it under lock
         * and thus we can add oap back to pending safely */
        if (rc)
                /* upper layer wants to leave the page on pending queue */
                osc_oap_to_pending(oap);
        else
                osc_exit_cache(cli, oap, sent);
        EXIT;
}
1975
/* Reply interpreter for brw RPCs built from async pages (osc_send_oap_rpc):
 * finish the bulk transfer, retry recoverable errors, then complete every
 * oap carried by the RPC and kick off the next round of RPCs. */
static int brw_interpret_oap(struct ptlrpc_request *req, void *data, int rc)
{
        struct osc_async_page *oap, *tmp;
        struct osc_brw_async_args *aa = data;
        struct client_obd *cli;
        ENTRY;

        rc = osc_brw_fini_request(req, rc);
        CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
        if (osc_recoverable_error(rc)) {
                /* rc == 0: the resend was queued; presumably the new request
                 * takes over completion of the oaps -- nothing more to do
                 * here (confirm in osc_brw_redo_request) */
                rc = osc_brw_redo_request(req, aa);
                if (rc == 0)
                        RETURN(0);
        }

        cli = aa->aa_cli;

        client_obd_list_lock(&cli->cl_loi_list_lock);

        /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
         * is called so we know whether to go to sync BRWs or wait for more
         * RPCs to complete */
        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
                cli->cl_w_in_flight--;
        else
                cli->cl_r_in_flight--;

        /* the caller may re-use the oap after the completion call so
         * we need to clean it up a little */
        list_for_each_entry_safe(oap, tmp, &aa->aa_oaps, oap_rpc_item) {
                list_del_init(&oap->oap_rpc_item);
                osc_ap_completion(cli, aa->aa_oa, oap, 1, rc);
        }

        osc_wake_cache_waiters(cli);
        osc_check_rpcs(cli);

        client_obd_list_unlock(&cli->cl_loi_list_lock);

        OBDO_FREE(aa->aa_oa);

        osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
        RETURN(rc);
}
2020
/* Build a brw RPC from the oaps on @rpc_list.  On success the oaps are
 * spliced onto the request's async args and @rpc_list is reinitialized;
 * on failure an ERR_PTR is returned and the oaps remain on @rpc_list for
 * the caller to clean up. */
static struct ptlrpc_request *osc_build_req(struct client_obd *cli,
                                            struct list_head *rpc_list,
                                            int page_count, int cmd)
{
        struct ptlrpc_request *req;
        struct brw_page **pga = NULL;
        struct osc_brw_async_args *aa;
        struct obdo *oa = NULL;
        struct obd_async_page_ops *ops = NULL;
        void *caller_data = NULL;
        struct obd_capa *ocapa;
        struct osc_async_page *oap;
        int i, rc;

        ENTRY;
        LASSERT(!list_empty(rpc_list));

        OBD_ALLOC(pga, sizeof(*pga) * page_count);
        if (pga == NULL)
                RETURN(ERR_PTR(-ENOMEM));

        OBDO_ALLOC(oa);
        if (oa == NULL)
                GOTO(out, req = ERR_PTR(-ENOMEM));

        /* gather the brw_page pointers out of the oaps; the caller ops and
         * data are taken from the first oap on the list */
        i = 0;
        list_for_each_entry(oap, rpc_list, oap_rpc_item) {
                if (ops == NULL) {
                        ops = oap->oap_caller_ops;
                        caller_data = oap->oap_caller_data;
                }
                pga[i] = &oap->oap_brw_page;
                pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
                CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
                       pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
                i++;
        }

        /* always get the data for the obdo for the rpc */
        LASSERT(ops != NULL);
        ops->ap_fill_obdo(caller_data, cmd, oa);
        ocapa = ops->ap_lookup_capa(caller_data, cmd);

        sort_brw_pages(pga, page_count);
        rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
                                  pga, &req, ocapa);
        capa_put(ocapa);
        if (rc != 0) {
                CERROR("prep_req failed: %d\n", rc);
                GOTO(out, req = ERR_PTR(rc));
        }

        /* Need to update the timestamps after the request is built in case
         * we race with setattr (locally or in queue at OST).  If OST gets
         * later setattr before earlier BRW (as determined by the request xid),
         * the OST will not use BRW timestamps.  Sadly, there is no obvious
         * way to do this in a single call.  bug 10150 */
        ops->ap_update_obdo(caller_data, cmd, oa,
                            OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME);

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = (struct osc_brw_async_args *)&req->rq_async_args;
        INIT_LIST_HEAD(&aa->aa_oaps);
        list_splice(rpc_list, &aa->aa_oaps);
        INIT_LIST_HEAD(rpc_list);

out:
        if (IS_ERR(req)) {
                /* free what we allocated above on the error path */
                if (oa)
                        OBDO_FREE(oa);
                if (pga)
                        OBD_FREE(pga, sizeof(*pga) * page_count);
        }
        RETURN(req);
}
2096
/* the loi lock is held across this function but it's allowed to release
 * and reacquire it during its work.
 *
 * Collect up to cl_max_pages_per_rpc ready pages from @lop, build one brw
 * RPC from them and hand it to ptlrpcd.  Returns 1 if an RPC was sent,
 * 0 if no pages were ready, or a negative errno if building the request
 * failed. */
static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
                            int cmd, struct loi_oap_pages *lop)
{
        struct ptlrpc_request *req;
        obd_count page_count = 0;
        struct osc_async_page *oap = NULL, *tmp;
        struct osc_brw_async_args *aa;
        struct obd_async_page_ops *ops;
        CFS_LIST_HEAD(rpc_list);
        unsigned int ending_offset;
        unsigned  starting_offset = 0;
        ENTRY;

        /* first we find the pages we're allowed to work with */
        list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
                                 oap_pending_item) {
                ops = oap->oap_caller_ops;

                LASSERT(oap->oap_magic == OAP_MAGIC);

                /* in llite being 'ready' equates to the page being locked
                 * until completion unlocks it.  commit_write submits a page
                 * as not ready because its unlock will happen unconditionally
                 * as the call returns.  if we race with commit_write giving
                 * us that page we dont' want to create a hole in the page
                 * stream, so we stop and leave the rpc to be fired by
                 * another dirtier or kupdated interval (the not ready page
                 * will still be on the dirty list).  we could call in
                 * at the end of ll_file_write to process the queue again. */
                if (!(oap->oap_async_flags & ASYNC_READY)) {
                        int rc = ops->ap_make_ready(oap->oap_caller_data, cmd);
                        if (rc < 0)
                                CDEBUG(D_INODE, "oap %p page %p returned %d "
                                                "instead of ready\n", oap,
                                                oap->oap_page, rc);
                        switch (rc) {
                        case -EAGAIN:
                                /* llite is telling us that the page is still
                                 * in commit_write and that we should try
                                 * and put it in an rpc again later.  we
                                 * break out of the loop so we don't create
                                 * a hole in the sequence of pages in the rpc
                                 * stream.*/
                                oap = NULL;
                                break;
                        case -EINTR:
                                /* the io isn't needed.. tell the checks
                                 * below to complete the rpc with EINTR */
                                oap->oap_async_flags |= ASYNC_COUNT_STABLE;
                                oap->oap_count = -EINTR;
                                break;
                        case 0:
                                oap->oap_async_flags |= ASYNC_READY;
                                break;
                        default:
                                LASSERTF(0, "oap %p page %p returned %d "
                                            "from make_ready\n", oap,
                                            oap->oap_page, rc);
                                break;
                        }
                }
                if (oap == NULL)
                        break;
                /*
                 * Page submitted for IO has to be locked. Either by
                 * ->ap_make_ready() or by higher layers.
                 *
                 * XXX nikita: this assertion should be adjusted when lustre
                 * starts using PG_writeback for pages being written out.
                 */
#if defined(__KERNEL__) && defined(__linux__)
                LASSERT(PageLocked(oap->oap_page));
#endif
                /* If there is a gap at the start of this page, it can't merge
                 * with any previous page, so we'll hand the network a
                 * "fragmented" page array that it can't transfer in 1 RDMA */
                if (page_count != 0 && oap->oap_page_off != 0)
                        break;

                /* take the page out of our book-keeping */
                list_del_init(&oap->oap_pending_item);
                lop_update_pending(cli, lop, cmd, -1);
                list_del_init(&oap->oap_urgent_item);

                /* remember where in the max-brw window this rpc starts, for
                 * the offset histograms below */
                if (page_count == 0)
                        starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
                                          (PTLRPC_MAX_BRW_SIZE - 1);

                /* ask the caller for the size of the io as the rpc leaves. */
                if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE))
                        oap->oap_count =
                                ops->ap_refresh_count(oap->oap_caller_data,cmd);
                if (oap->oap_count <= 0) {
                        CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
                               oap->oap_count);
                        osc_ap_completion(cli, NULL, oap, 0, oap->oap_count);
                        continue;
                }

                /* now put the page back in our accounting */
                list_add_tail(&oap->oap_rpc_item, &rpc_list);
                if (++page_count >= cli->cl_max_pages_per_rpc)
                        break;

                /* End on a PTLRPC_MAX_BRW_SIZE boundary.  We want full-sized
                 * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
                 * have the same alignment as the initial writes that allocated
                 * extents on the server. */
                ending_offset = (oap->oap_obj_off + oap->oap_page_off +
                                 oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
                if (ending_offset == 0)
                        break;

                /* If there is a gap at the end of this page, it can't merge
                 * with any subsequent pages, so we'll hand the network a
                 * "fragmented" page array that it can't transfer in 1 RDMA */
                if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
                        break;
        }

        osc_wake_cache_waiters(cli);

        if (page_count == 0)
                RETURN(0);

        loi_list_maint(cli, loi);

        /* drop the list lock while building the request; pages in rpc_list
         * are off the pending lists and so are safe from other threads */
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        req = osc_build_req(cli, &rpc_list, page_count, cmd);
        if (IS_ERR(req)) {
                /* this should happen rarely and is pretty bad, it makes the
                 * pending list not follow the dirty order */
                client_obd_list_lock(&cli->cl_loi_list_lock);
                list_for_each_entry_safe(oap, tmp, &rpc_list, oap_rpc_item) {
                        list_del_init(&oap->oap_rpc_item);

                        /* queued sync pages can be torn down while the pages
                         * were between the pending list and the rpc */
                        if (oap->oap_interrupted) {
                                CDEBUG(D_INODE, "oap %p interrupted\n", oap);
                                osc_ap_completion(cli, NULL, oap, 0,
                                                  oap->oap_count);
                                continue;
                        }
                        osc_ap_completion(cli, NULL, oap, 0, PTR_ERR(req));
                }
                loi_list_maint(cli, loi);
                RETURN(PTR_ERR(req));
        }

        aa = (struct osc_brw_async_args *)&req->rq_async_args;

        /* update the per-client rpc/page/offset histograms */
        if (cmd == OBD_BRW_READ) {
                lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
                lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
                lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
                                      (starting_offset >> CFS_PAGE_SHIFT) + 1);
                ptlrpc_lprocfs_brw(req, OST_READ, aa->aa_requested_nob);
        } else {
                lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
                lprocfs_oh_tally(&cli->cl_write_rpc_hist,
                                 cli->cl_w_in_flight);
                lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
                                      (starting_offset >> CFS_PAGE_SHIFT) + 1);
                ptlrpc_lprocfs_brw(req, OST_WRITE, aa->aa_requested_nob);
        }

        client_obd_list_lock(&cli->cl_loi_list_lock);

        if (cmd == OBD_BRW_READ)
                cli->cl_r_in_flight++;
        else
                cli->cl_w_in_flight++;

        /* queued sync pages can be torn down while the pages
         * were between the pending list and the rpc */
        tmp = NULL;
        list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
                /* only one oap gets a request reference */
                if (tmp == NULL)
                        tmp = oap;
                if (oap->oap_interrupted && !req->rq_intr) {
                        CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
                               oap, req);
                        ptlrpc_mark_interrupted(req);
                }
        }
        if (tmp != NULL)
                tmp->oap_request = ptlrpc_request_addref(req);

        DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
                  page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);

        req->rq_interpret_reply = brw_interpret_oap;
        ptlrpcd_add_req(req);
        RETURN(1);
}
2297
/* Dump an loi's ready/write/read pending+urgent state plus a caller-supplied
 * message.  Fix: the last line previously ended in a continuation backslash,
 * silently extending the macro onto the following (blank) line -- any code
 * later added there would have been absorbed into the macro. */
#define LOI_DEBUG(LOI, STR, args...)                                     \
        CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR,           \
               !list_empty(&(LOI)->loi_cli_item),                        \
               (LOI)->loi_write_lop.lop_num_pending,                     \
               !list_empty(&(LOI)->loi_write_lop.lop_urgent),            \
               (LOI)->loi_read_lop.lop_num_pending,                      \
               !list_empty(&(LOI)->loi_read_lop.lop_urgent),             \
               args)

2307 /* This is called by osc_check_rpcs() to find which objects have pages that
2308  * we could be sending.  These lists are maintained by lop_makes_rpc(). */
2309 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2310 {
2311         ENTRY;
2312         /* first return all objects which we already know to have
2313          * pages ready to be stuffed into rpcs */
2314         if (!list_empty(&cli->cl_loi_ready_list))
2315                 RETURN(list_entry(cli->cl_loi_ready_list.next,
2316                                   struct lov_oinfo, loi_cli_item));
2317
2318         /* then if we have cache waiters, return all objects with queued
2319          * writes.  This is especially important when many small files
2320          * have filled up the cache and not been fired into rpcs because
2321          * they don't pass the nr_pending/object threshhold */
2322         if (!list_empty(&cli->cl_cache_waiters) &&
2323             !list_empty(&cli->cl_loi_write_list))
2324                 RETURN(list_entry(cli->cl_loi_write_list.next,
2325                                   struct lov_oinfo, loi_write_item));
2326
2327         /* then return all queued objects when we have an invalid import
2328          * so that they get flushed */
2329         if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2330                 if (!list_empty(&cli->cl_loi_write_list))
2331                         RETURN(list_entry(cli->cl_loi_write_list.next,
2332                                           struct lov_oinfo, loi_write_item));
2333                 if (!list_empty(&cli->cl_loi_read_list))
2334                         RETURN(list_entry(cli->cl_loi_read_list.next,
2335                                           struct lov_oinfo, loi_read_item));
2336         }
2337         RETURN(NULL);
2338 }
2339
/* called with the loi list lock held */
/* Walk eligible objects (per osc_next_loi ordering) and fire read/write
 * rpcs until the client's rpcs-in-flight budget is exhausted, an rpc send
 * fails, or we give up after repeated "back off" results. */
static void osc_check_rpcs(struct client_obd *cli)
{
        struct lov_oinfo *loi;
        /* race_counter counts consecutive rc==0 "back off" results */
        int rc = 0, race_counter = 0;
        ENTRY;

        while ((loi = osc_next_loi(cli)) != NULL) {
                LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));

                /* stop once the rpcs-in-flight budget is used up */
                if (rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight)
                        break;

                /* attempt some read/write balancing by alternating between
                 * reads and writes in an object.  The makes_rpc checks here
                 * would be redundant if we were getting read/write work items
                 * instead of objects.  we don't want send_oap_rpc to drain a
                 * partial read pending queue when we're given this object to
                 * do io on writes while there are cache waiters */
                if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
                        rc = osc_send_oap_rpc(cli, loi, OBD_BRW_WRITE,
                                              &loi->loi_write_lop);
                        /* rc < 0: hard error, stop; rc > 0: rpc sent */
                        if (rc < 0)
                                break;
                        if (rc > 0)
                                race_counter = 0;
                        else
                                race_counter++;
                }
                if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
                        rc = osc_send_oap_rpc(cli, loi, OBD_BRW_READ,
                                              &loi->loi_read_lop);
                        if (rc < 0)
                                break;
                        if (rc > 0)
                                race_counter = 0;
                        else
                                race_counter++;
                }

                /* attempt some inter-object balancing by issueing rpcs
                 * for each object in turn */
                if (!list_empty(&loi->loi_cli_item))
                        list_del_init(&loi->loi_cli_item);
                if (!list_empty(&loi->loi_write_item))
                        list_del_init(&loi->loi_write_item);
                if (!list_empty(&loi->loi_read_item))
                        list_del_init(&loi->loi_read_item);

                /* requeue the object on whichever client lists still apply */
                loi_list_maint(cli, loi);

                /* send_oap_rpc fails with 0 when make_ready tells it to
                 * back off.  llite's make_ready does this when it tries
                 * to lock a page queued for write that is already locked.
                 * we want to try sending rpcs from many objects, but we
                 * don't want to spin failing with 0.  */
                if (race_counter == 10)
                        break;
        }
        EXIT;
}
2401
2402 /* we're trying to queue a page in the osc so we're subject to the
2403  * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2404  * If the osc's queued pages are already at that limit, then we want to sleep
2405  * until there is space in the osc's queue for us.  We also may be waiting for
2406  * write credits from the OST if there are RPCs in flight that may return some
2407  * before we fall back to sync writes.
2408  *
2409  * We need this know our allocation was granted in the presence of signals */
2410 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2411 {
2412         int rc;
2413         ENTRY;
2414         client_obd_list_lock(&cli->cl_loi_list_lock);
2415         rc = list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2416         client_obd_list_unlock(&cli->cl_loi_list_lock);
2417         RETURN(rc);
2418 };
2419
/* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
 * grant or cache space. */
/* Returns 0 once this oap's page has been accounted against the osc's
 * dirty/grant limits, -EDQUOT when the caller must fall back to sync io,
 * -EINTR when the wait for space ended without a grant, or the rc posted
 * by the waker in ocw_rc. */
static int osc_enter_cache(struct client_obd *cli, struct lov_oinfo *loi,
                           struct osc_async_page *oap)
{
        /* ocw lives on this stack frame and is linked onto
         * cli->cl_cache_waiters below */
        struct osc_cache_waiter ocw;
        struct l_wait_info lwi = { 0 };

        ENTRY;

        CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
               "grant: %lu\n", cli->cl_dirty, atomic_read(&obd_dirty_pages),
               cli->cl_dirty_max, obd_max_dirty_pages,
               cli->cl_lost_grant, cli->cl_avail_grant);

        /* force the caller to try sync io.  this can jump the list
         * of queued writes and create a discontiguous rpc stream */
        if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
            loi->loi_ar.ar_force_sync)
                RETURN(-EDQUOT);

        /* Hopefully normal case - cache space and write credits available */
        if ((cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max) &&
            (atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages) &&
            (cli->cl_avail_grant >= CFS_PAGE_SIZE)) {
                /* account for ourselves */
                osc_consume_write_grant(cli, &oap->oap_brw_page);
                RETURN(0);
        }

        /* Make sure that there are write rpcs in flight to wait for.  This
         * is a little silly as this object may not have any pending but
         * other objects sure might. */
        if (cli->cl_w_in_flight) {
                list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
                cfs_waitq_init(&ocw.ocw_waitq);
                ocw.ocw_oap = oap;
                ocw.ocw_rc = 0;

                /* kick rpcs so completions can free up grant, then drop
                 * the list lock for the duration of the sleep */
                loi_list_maint(cli, loi);
                osc_check_rpcs(cli);
                client_obd_list_unlock(&cli->cl_loi_list_lock);

                CDEBUG(D_CACHE, "sleeping for cache space\n");
                l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);

                client_obd_list_lock(&cli->cl_loi_list_lock);
                /* still linked on cl_cache_waiters means we woke without
                 * being granted space; unlink ourselves and bail */
                if (!list_empty(&ocw.ocw_entry)) {
                        list_del(&ocw.ocw_entry);
                        RETURN(-EINTR);
                }
                /* granted: return whatever result the waker recorded */
                RETURN(ocw.ocw_rc);
        }

        RETURN(-EDQUOT);
}
2476
2477 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2478                         struct lov_oinfo *loi, cfs_page_t *page,
2479                         obd_off offset, struct obd_async_page_ops *ops,
2480                         void *data, void **res)
2481 {
2482         struct osc_async_page *oap;
2483         ENTRY;
2484
2485         if (!page)
2486                 return size_round(sizeof(*oap));
2487
2488         oap = *res;
2489         oap->oap_magic = OAP_MAGIC;
2490         oap->oap_cli = &exp->exp_obd->u.cli;
2491         oap->oap_loi = loi;
2492
2493         oap->oap_caller_ops = ops;
2494         oap->oap_caller_data = data;
2495
2496         oap->oap_page = page;
2497         oap->oap_obj_off = offset;
2498
2499         CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2500         CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2501         CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2502
2503         oap->oap_occ.occ_interrupted = osc_occ_interrupted;
2504
2505         CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
2506         RETURN(0);
2507 }
2508
2509 struct osc_async_page *oap_from_cookie(void *cookie)
2510 {
2511         struct osc_async_page *oap = cookie;
2512         if (oap->oap_magic != OAP_MAGIC)
2513                 return ERR_PTR(-EINVAL);
2514         return oap;
2515 };
2516
/* Queue one prepared async page for io on its object.  Optionally checks
 * quota for writes, reserves cache/grant space via osc_enter_cache(), puts
 * the oap on the pending lists and kicks osc_check_rpcs().
 * Returns 0 on success; -EIO on an invalid import, -EBUSY if the oap is
 * already queued, or the cache/quota error. */
static int osc_queue_async_io(struct obd_export *exp, struct lov_stripe_md *lsm,
                              struct lov_oinfo *loi, void *cookie,
                              int cmd, obd_off off, int count,
                              obd_flag brw_flags, enum async_flags async_flags)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct osc_async_page *oap;
        int rc = 0;
        ENTRY;

        oap = oap_from_cookie(cookie);
        if (IS_ERR(oap))
                RETURN(PTR_ERR(oap));

        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
                RETURN(-EIO);

        /* refuse to queue an oap that is already on any list */
        if (!list_empty(&oap->oap_pending_item) ||
            !list_empty(&oap->oap_urgent_item) ||
            !list_empty(&oap->oap_rpc_item))
                RETURN(-EBUSY);

        /* check if the file's owner/group is over quota */
#ifdef HAVE_QUOTA_SUPPORT
        if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)){
                struct obd_async_page_ops *ops;
                struct obdo *oa;

                OBDO_ALLOC(oa);
                if (oa == NULL)
                        RETURN(-ENOMEM);

                /* a temporary obdo is filled just to learn uid/gid */
                ops = oap->oap_caller_ops;
                ops->ap_fill_obdo(oap->oap_caller_data, cmd, oa);
                if (lquota_chkdq(quota_interface, cli, oa->o_uid, oa->o_gid) ==
                    NO_QUOTA)
                        rc = -EDQUOT;

                OBDO_FREE(oa);
                if (rc)
                        RETURN(rc);
        }
#endif

        if (loi == NULL)
                loi = lsm->lsm_oinfo[0];

        client_obd_list_lock(&cli->cl_loi_list_lock);

        oap->oap_cmd = cmd;
        oap->oap_page_off = off;
        oap->oap_count = count;
        oap->oap_brw_flags = brw_flags;
        oap->oap_async_flags = async_flags;

        if (cmd & OBD_BRW_WRITE) {
                /* may drop and retake the list lock while waiting for
                 * cache space / write grant */
                rc = osc_enter_cache(cli, loi, oap);
                if (rc) {
                        client_obd_list_unlock(&cli->cl_loi_list_lock);
                        RETURN(rc);
                }
        }

        osc_oap_to_pending(oap);
        loi_list_maint(cli, loi);

        LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
                  cmd);

        /* try to turn newly pending pages into rpcs right away */
        osc_check_rpcs(cli);
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        RETURN(0);
}
2591
/* aka (~was & now & flag), but this is more clear :)
 * True iff flag is newly set in 'now' relative to 'was'.  Arguments are
 * parenthesized so a composite mask such as (A | B) parses correctly
 * ('&' binds tighter than '|', so the unparenthesized form mis-grouped). */
#define SETTING(was, now, flag) (!((was) & (flag)) && ((now) & (flag)))
2594
/* Add async flags to an already-pending oap: ASYNC_READY marks it sendable,
 * ASYNC_URGENT additionally queues it on the urgent list (unless it is
 * already part of an rpc).  Returns 0 on success, -EINVAL if the oap is not
 * pending, -EIO on an invalid import. */
static int osc_set_async_flags(struct obd_export *exp,
                               struct lov_stripe_md *lsm,
                               struct lov_oinfo *loi, void *cookie,
                               obd_flag async_flags)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct loi_oap_pages *lop;
        struct osc_async_page *oap;
        int rc = 0;
        ENTRY;

        oap = oap_from_cookie(cookie);
        if (IS_ERR(oap))
                RETURN(PTR_ERR(oap));

        /*
         * bug 7311: OST-side locking is only supported for liblustre for now
         * (and liblustre never calls obd_set_async_flags(). I hope.), generic
         * implementation has to handle case where OST-locked page was picked
         * up by, e.g., ->writepage().
         */
        LASSERT(!(oap->oap_brw_flags & OBD_BRW_SRVLOCK));
        LASSERT(!LIBLUSTRE_CLIENT); /* check that liblustre angels do fear to
                                     * tread here. */

        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
                RETURN(-EIO);

        if (loi == NULL)
                loi = lsm->lsm_oinfo[0];

        /* pick the read or write side of the object's page bookkeeping */
        if (oap->oap_cmd & OBD_BRW_WRITE) {
                lop = &loi->loi_write_lop;
        } else {
                lop = &loi->loi_read_lop;
        }

        client_obd_list_lock(&cli->cl_loi_list_lock);

        if (list_empty(&oap->oap_pending_item))
                GOTO(out, rc = -EINVAL);

        /* nothing to do if every requested flag is already set */
        if ((oap->oap_async_flags & async_flags) == async_flags)
                GOTO(out, rc = 0);

        if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
                oap->oap_async_flags |= ASYNC_READY;

        if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT)) {
                /* skip pages already part of an rpc in flight */
                if (list_empty(&oap->oap_rpc_item)) {
                        list_add(&oap->oap_urgent_item, &lop->lop_urgent);
                        loi_list_maint(cli, loi);
                }
        }

        LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
                        oap->oap_async_flags);
out:
        /* flag changes may have made new rpcs possible */
        osc_check_rpcs(cli);
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        RETURN(rc);
}
2657
/* Queue an oap on its object's group-pending list for later triggering by
 * osc_trigger_group_io(); unlike osc_queue_async_io() this does not kick
 * off rpcs itself.  With ASYNC_GROUP_SYNC the oap is also registered with
 * the io group for completion tracking. */
static int osc_queue_group_io(struct obd_export *exp, struct lov_stripe_md *lsm,
                             struct lov_oinfo *loi,
                             struct obd_io_group *oig, void *cookie,
                             int cmd, obd_off off, int count,
                             obd_flag brw_flags,
                             obd_flag async_flags)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct osc_async_page *oap;
        struct loi_oap_pages *lop;
        int rc = 0;
        ENTRY;

        oap = oap_from_cookie(cookie);
        if (IS_ERR(oap))
                RETURN(PTR_ERR(oap));

        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
                RETURN(-EIO);

        /* refuse an oap that is already queued anywhere */
        if (!list_empty(&oap->oap_pending_item) ||
            !list_empty(&oap->oap_urgent_item) ||
            !list_empty(&oap->oap_rpc_item))
                RETURN(-EBUSY);

        if (loi == NULL)
                loi = lsm->lsm_oinfo[0];

        client_obd_list_lock(&cli->cl_loi_list_lock);

        oap->oap_cmd = cmd;
        oap->oap_page_off = off;
        oap->oap_count = count;
        oap->oap_brw_flags = brw_flags;
        oap->oap_async_flags = async_flags;

        if (cmd & OBD_BRW_WRITE)
                lop = &loi->loi_write_lop;
        else
                lop = &loi->loi_read_lop;

        /* parked on the group list until osc_trigger_group_io() moves it
         * to the regular pending list */
        list_add_tail(&oap->oap_pending_item, &lop->lop_pending_group);
        if (oap->oap_async_flags & ASYNC_GROUP_SYNC) {
                oap->oap_oig = oig;
                rc = oig_add_one(oig, &oap->oap_occ);
        }

        LOI_DEBUG(loi, "oap %p page %p on group pending: rc %d\n",
                  oap, oap->oap_page, rc);

        client_obd_list_unlock(&cli->cl_loi_list_lock);

        RETURN(rc);
}
2712
2713 static void osc_group_to_pending(struct client_obd *cli, struct lov_oinfo *loi,
2714                                  struct loi_oap_pages *lop, int cmd)
2715 {
2716         struct list_head *pos, *tmp;
2717         struct osc_async_page *oap;
2718
2719         list_for_each_safe(pos, tmp, &lop->lop_pending_group) {
2720                 oap = list_entry(pos, struct osc_async_page, oap_pending_item);
2721                 list_del(&oap->oap_pending_item);
2722                 osc_oap_to_pending(oap);
2723         }
2724         loi_list_maint(cli, loi);
2725 }
2726
2727 static int osc_trigger_group_io(struct obd_export *exp,
2728                                 struct lov_stripe_md *lsm,
2729                                 struct lov_oinfo *loi,
2730                                 struct obd_io_group *oig)
2731 {
2732         struct client_obd *cli = &exp->exp_obd->u.cli;
2733         ENTRY;
2734
2735         if (loi == NULL)
2736                 loi = lsm->lsm_oinfo[0];
2737
2738         client_obd_list_lock(&cli->cl_loi_list_lock);
2739
2740         osc_group_to_pending(cli, loi, &loi->loi_write_lop, OBD_BRW_WRITE);
2741         osc_group_to_pending(cli, loi, &loi->loi_read_lop, OBD_BRW_READ);
2742
2743         osc_check_rpcs(cli);
2744         client_obd_list_unlock(&cli->cl_loi_list_lock);
2745
2746         RETURN(0);
2747 }
2748
/* Undo the queueing of an oap: release its cache/grant accounting and
 * remove it from the urgent and pending lists.  Fails with -EBUSY if the
 * page is already part of an rpc in flight. */
static int osc_teardown_async_page(struct obd_export *exp,
                                   struct lov_stripe_md *lsm,
                                   struct lov_oinfo *loi, void *cookie)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct loi_oap_pages *lop;
        struct osc_async_page *oap;
        int rc = 0;
        ENTRY;

        oap = oap_from_cookie(cookie);
        if (IS_ERR(oap))
                RETURN(PTR_ERR(oap));

        if (loi == NULL)
                loi = lsm->lsm_oinfo[0];

        /* pick the read or write side of the object's page bookkeeping */
        if (oap->oap_cmd & OBD_BRW_WRITE) {
                lop = &loi->loi_write_lop;
        } else {
                lop = &loi->loi_read_lop;
        }

        client_obd_list_lock(&cli->cl_loi_list_lock);

        /* cannot tear down a page that an rpc currently owns */
        if (!list_empty(&oap->oap_rpc_item))
                GOTO(out, rc = -EBUSY);

        /* give back grant/dirty accounting and let any cache waiters
         * have a chance at the freed space */
        osc_exit_cache(cli, oap, 0);
        osc_wake_cache_waiters(cli);

        if (!list_empty(&oap->oap_urgent_item)) {
                list_del_init(&oap->oap_urgent_item);
                oap->oap_async_flags &= ~ASYNC_URGENT;
        }
        if (!list_empty(&oap->oap_pending_item)) {
                list_del_init(&oap->oap_pending_item);
                lop_update_pending(cli, lop, oap->oap_cmd, -1);
        }
        loi_list_maint(cli, loi);

        LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
out:
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        RETURN(rc);
}
2795
/* Attach caller data (on linux, an inode pointer) to a lock's l_ast_data,
 * asserting that any different pre-existing l_ast_data belongs to an inode
 * that is being freed.  Also transfers LDLM_FL_NO_LRU from @flags. */
static void osc_set_data_with_check(struct lustre_handle *lockh, void *data,
                                    int flags)
{
        struct ldlm_lock *lock = ldlm_handle2lock(lockh);

        /* the handle may no longer resolve, e.g. after an eviction */
        if (lock == NULL) {
                CERROR("lockh %p, data %p - client evicted?\n", lockh, data);
                return;
        }
        lock_res_and_lock(lock);
#if defined (__KERNEL__) && defined (__linux__)
        /* Liang XXX: Darwin and Winnt checking should be added */
        if (lock->l_ast_data && lock->l_ast_data != data) {
                struct inode *new_inode = data;
                struct inode *old_inode = lock->l_ast_data;
                /* a stale pointer is only tolerated while the old inode
                 * is on its way out (I_FREEING) */
                if (!(old_inode->i_state & I_FREEING))
                        LDLM_ERROR(lock, "inconsistent l_ast_data found");
                LASSERTF(old_inode->i_state & I_FREEING,
                         "Found existing inode %p/%lu/%u state %lu in lock: "
                         "setting data to %p/%lu/%u\n", old_inode,
                         old_inode->i_ino, old_inode->i_generation,
                         old_inode->i_state,
                         new_inode, new_inode->i_ino, new_inode->i_generation);
        }
#endif
        lock->l_ast_data = data;
        lock->l_flags |= (flags & LDLM_FL_NO_LRU);
        unlock_res_and_lock(lock);
        /* drop the reference taken by ldlm_handle2lock() */
        LDLM_LOCK_PUT(lock);
}
2826
2827 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2828                              ldlm_iterator_t replace, void *data)
2829 {
2830         struct ldlm_res_id res_id = { .name = {0} };
2831         struct obd_device *obd = class_exp2obd(exp);
2832
2833         res_id.name[0] = lsm->lsm_object_id;
2834         res_id.name[2] = lsm->lsm_object_gr;
2835
2836         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2837         return 0;
2838 }
2839
/* Common tail of lock enqueue: extract the server's verdict from an intent
 * reply (when the enqueue was aborted by the server), log the lvb values,
 * and hand the final rc to the caller's update callback. */
static int osc_enqueue_fini(struct ptlrpc_request *req, struct obd_info *oinfo,
                            int intent, int rc)
{
        ENTRY;

        if (intent) {
                /* The request was created before ldlm_cli_enqueue call. */
                if (rc == ELDLM_LOCK_ABORTED) {
                        struct ldlm_reply *rep;

                        /* swabbed by ldlm_cli_enqueue() */
                        LASSERT(lustre_rep_swabbed(req, DLM_LOCKREPLY_OFF));
                        rep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF,
                                             sizeof(*rep));
                        LASSERT(rep != NULL);
                        /* the server's real result rides in the reply */
                        if (rep->lock_policy_res1)
                                rc = rep->lock_policy_res1;
                }
        }

        if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
                CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
                       oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_size,
                       oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_blocks,
                       oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_mtime);
        }

        /* Call the update callback. */
        rc = oinfo->oi_cb_up(oinfo, rc);
        RETURN(rc);
}
2871
/* ptlrpc interpret callback for async lock enqueues: finish the ldlm side,
 * run the osc completion, and immediately release the lock reference the
 * async request held (see the policy comment above osc_enqueue()). */
static int osc_enqueue_interpret(struct ptlrpc_request *req,
                                 struct osc_enqueue_args *aa, int rc)
{
        int intent = aa->oa_oi->oi_flags & LDLM_FL_HAS_INTENT;
        struct lov_stripe_md *lsm = aa->oa_oi->oi_md;
        struct ldlm_lock *lock;

        /* ldlm_cli_enqueue is holding a reference on the lock, so it must
         * be valid. */
        lock = ldlm_handle2lock(aa->oa_oi->oi_lockh);

        /* Complete obtaining the lock procedure. */
        rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
                                   aa->oa_ei->ei_mode,
                                   &aa->oa_oi->oi_flags,
                                   &lsm->lsm_oinfo[0]->loi_lvb,
                                   sizeof(lsm->lsm_oinfo[0]->loi_lvb),
                                   lustre_swab_ost_lvb,
                                   aa->oa_oi->oi_lockh, rc);

        /* Complete osc stuff. */
        rc = osc_enqueue_fini(req, aa->oa_oi, intent, rc);

        /* Release the lock for async request. */
        if (lustre_handle_is_used(aa->oa_oi->oi_lockh) && rc == ELDLM_OK)
                ldlm_lock_decref(aa->oa_oi->oi_lockh, aa->oa_ei->ei_mode);

        LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
                 aa->oa_oi->oi_lockh, req, aa);
        /* drop the reference taken by ldlm_handle2lock() above */
        LDLM_LOCK_PUT(lock);
        return rc;
}
2904
/* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
 * other synchronous requests, however keeping some locks and trying to obtain
 * others may take a considerable amount of time in a case of ost failure; and
 * when other sync requests do not get released lock from a client, the client
 * is excluded from the cluster -- such scenarious make the life difficult, so
 * release locks just after they are obtained. */
static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
                       struct ldlm_enqueue_info *einfo,
                       struct ptlrpc_request_set *rqset)
{
        struct ldlm_res_id res_id = { .name = {0} };
        struct obd_device *obd = exp->exp_obd;
        struct ldlm_reply *rep;
        struct ptlrpc_request *req = NULL;
        int intent = oinfo->oi_flags & LDLM_FL_HAS_INTENT;
        ldlm_mode_t mode;
        int rc;
        ENTRY;

        /* the lock resource is named after the object id/group */
        res_id.name[0] = oinfo->oi_md->lsm_object_id;
        res_id.name[2] = oinfo->oi_md->lsm_object_gr;

        /* Filesystem lock extents are extended to page boundaries so that
         * dealing with the page cache is a little smoother.  */
        oinfo->oi_policy.l_extent.start -=
                oinfo->oi_policy.l_extent.start & ~CFS_PAGE_MASK;
        oinfo->oi_policy.l_extent.end |= ~CFS_PAGE_MASK;

        /* without a valid kms there is no local lock worth matching */
        if (oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid == 0)
                goto no_match;

        /* Next, search for already existing extent locks that will cover us */
        /* If we're trying to read, we also search for an existing PW lock.  The
         * VFS and page cache already protect us locally, so lots of readers/
         * writers can share a single PW lock.
         *
         * There are problems with conversion deadlocks, so instead of
         * converting a read lock to a write lock, we'll just enqueue a new
         * one.
         *
         * At some point we should cancel the read lock instead of making them
         * send us a blocking callback, but there are problems with canceling
         * locks out from other users right now, too. */
        mode = einfo->ei_mode;
        if (einfo->ei_mode == LCK_PR)
                mode |= LCK_PW;
        mode = ldlm_lock_match(obd->obd_namespace,
                               oinfo->oi_flags | LDLM_FL_LVB_READY, &res_id,
                               einfo->ei_type, &oinfo->oi_policy, mode,
                               oinfo->oi_lockh);
        /* mode != 0: a compatible local lock was found and referenced */
        if (mode) {
                /* addref the lock only if not async requests and PW lock is
                 * matched whereas we asked for PR. */
                if (!rqset && einfo->ei_mode != mode)
                        ldlm_lock_addref(oinfo->oi_lockh, LCK_PR);
                osc_set_data_with_check(oinfo->oi_lockh, einfo->ei_cbdata,
                                        oinfo->oi_flags);
                if (intent) {
                        /* I would like to be able to ASSERT here that rss <=
                         * kms, but I can't, for reasons which are explained in
                         * lov_enqueue() */
                }

                /* We already have a lock, and it's referenced */
                oinfo->oi_cb_up(oinfo, ELDLM_OK);

                /* For async requests, decref the lock. */
                if (einfo->ei_mode != mode)
                        ldlm_lock_decref(oinfo->oi_lockh, LCK_PW);
                else if (rqset)
                        ldlm_lock_decref(oinfo->oi_lockh, einfo->ei_mode);

                RETURN(ELDLM_OK);
        }

 no_match:
        if (intent) {
                /* an intent enqueue needs its request built up front so
                 * reply buffers can be sized for the lock reply + lvb */
                int size[3] = {
                        [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
                        [DLM_LOCKREQ_OFF]     = sizeof(struct ldlm_request),
                        [DLM_LOCKREQ_OFF + 1] = 0 };

                req = ldlm_prep_enqueue_req(exp, 2, size, NULL, 0);
                if (req == NULL)
                        RETURN(-ENOMEM);

                size[DLM_LOCKREPLY_OFF] = sizeof(*rep);
                size[DLM_REPLY_REC_OFF] =
                        sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb);
                ptlrpc_req_set_repsize(req, 3, size);
        }

        /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
        oinfo->oi_flags &= ~LDLM_FL_BLOCK_GRANTED;

        rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id,
                              &oinfo->oi_policy, &oinfo->oi_flags,
                              &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
                              sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb),
                              lustre_swab_ost_lvb, oinfo->oi_lockh,
                              rqset ? 1 : 0);
        if (rqset) {
                if (!rc) {
                        struct osc_enqueue_args *aa;
                        CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
                        /* stash args for osc_enqueue_interpret() in the
                         * request's async-args area */
                        aa = (struct osc_enqueue_args *)&req->rq_async_args;
                        aa->oa_oi = oinfo;
                        aa->oa_ei = einfo;
                        aa->oa_exp = exp;

                        req->rq_interpret_reply = osc_enqueue_interpret;
                        ptlrpc_set_add_req(rqset, req);
                } else if (intent) {
                        ptlrpc_req_finished(req);
                }
                RETURN(rc);
        }

        /* synchronous path: finish up here and drop the intent request */
        rc = osc_enqueue_fini(req, oinfo, intent, rc);
        if (intent)
                ptlrpc_req_finished(req);

        RETURN(rc);
}
3030
3031 static int osc_match(struct obd_export *exp, struct lov_stripe_md *lsm,
3032                      __u32 type, ldlm_policy_data_t *policy, __u32 mode,
3033                      int *flags, void *data, struct lustre_handle *lockh)
3034 {
3035         struct ldlm_res_id res_id = { .name = {0} };
3036         struct obd_device *obd = exp->exp_obd;
3037         int lflags = *flags;
3038         ldlm_mode_t rc;
3039         ENTRY;
3040
3041         res_id.name[0] = lsm->lsm_object_id;
3042         res_id.name[2] = lsm->lsm_object_gr;
3043
3044         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
3045                 RETURN(-EIO);
3046
3047         /* Filesystem lock extents are extended to page boundaries so that
3048          * dealing with the page cache is a little smoother */
3049         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3050         policy->l_extent.end |= ~CFS_PAGE_MASK;
3051
3052         /* Next, search for already existing extent locks that will cover us */
3053         /* If we're trying to read, we also search for an existing PW lock.  The
3054          * VFS and page cache already protect us locally, so lots of readers/
3055          * writers can share a single PW lock. */
3056         rc = mode;
3057         if (mode == LCK_PR)
3058                 rc |= LCK_PW;
3059         rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY,
3060                              &res_id, type, policy, rc, lockh);
3061         if (rc) {
3062                 osc_set_data_with_check(lockh, data, lflags);
3063                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
3064                         ldlm_lock_addref(lockh, LCK_PR);
3065                         ldlm_lock_decref(lockh, LCK_PW);
3066                 }
3067                 RETURN(rc);
3068         }
3069         RETURN(rc);
3070 }
3071
3072 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
3073                       __u32 mode, struct lustre_handle *lockh)
3074 {
3075         ENTRY;
3076
3077         if (unlikely(mode == LCK_GROUP))
3078                 ldlm_lock_decref_and_cancel(lockh, mode);
3079         else
3080                 ldlm_lock_decref(lockh, mode);
3081
3082         RETURN(0);
3083 }
3084
3085 static int osc_cancel_unused(struct obd_export *exp,
3086                              struct lov_stripe_md *lsm, int flags,
3087                              void *opaque)
3088 {
3089         struct obd_device *obd = class_exp2obd(exp);
3090         struct ldlm_res_id res_id = { .name = {0} }, *resp = NULL;
3091
3092         if (lsm != NULL) {
3093                 res_id.name[0] = lsm->lsm_object_id;
3094                 res_id.name[2] = lsm->lsm_object_gr;
3095                 resp = &res_id;
3096         }
3097
3098         return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
3099 }
3100
3101 static int osc_join_lru(struct obd_export *exp,
3102                         struct lov_stripe_md *lsm, int join)
3103 {
3104         struct obd_device *obd = class_exp2obd(exp);
3105         struct ldlm_res_id res_id = { .name = {0} }, *resp = NULL;
3106
3107         if (lsm != NULL) {
3108                 res_id.name[0] = lsm->lsm_object_id;
3109                 res_id.name[2] = lsm->lsm_object_gr;
3110                 resp = &res_id;
3111         }
3112
3113         return ldlm_cli_join_lru(obd->obd_namespace, resp, join);
3114 }
3115
3116 static int osc_statfs_interpret(struct ptlrpc_request *req,
3117                                 struct osc_async_args *aa, int rc)
3118 {
3119         struct obd_statfs *msfs;
3120         ENTRY;
3121
3122         if (rc != 0)
3123                 GOTO(out, rc);
3124
3125         msfs = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*msfs),
3126                                   lustre_swab_obd_statfs);
3127         if (msfs == NULL) {
3128                 CERROR("Can't unpack obd_statfs\n");
3129                 GOTO(out, rc = -EPROTO);
3130         }
3131
3132         memcpy(aa->aa_oi->oi_osfs, msfs, sizeof(*msfs));
3133 out:
3134         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
3135         RETURN(rc);
3136 }
3137
/* Queue a non-blocking OST_STATFS RPC on @rqset.  The reply is delivered to
 * the caller via oinfo->oi_cb_up() from osc_statfs_interpret().  @max_age is
 * accepted for interface symmetry but not transmitted (see comment below).
 * Returns 0 when the request was queued, -ENOMEM on allocation failure. */
static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
                            __u64 max_age, struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        /* reply layout: ptlrpc_body followed by one obd_statfs record */
        int size[2] = { sizeof(struct ptlrpc_body), sizeof(*oinfo->oi_osfs) };
        ENTRY;

        /* We could possibly pass max_age in the request (as an absolute
         * timestamp or a "seconds.usec ago") so the target can avoid doing
         * extra calls into the filesystem if that isn't necessary (e.g.
         * during mount that would help a bit).  Having relative timestamps
         * is not so great if request processing is slow, while absolute
         * timestamps are not ideal because they need time synchronization. */
        req = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_OST_VERSION,
                              OST_STATFS, 1, NULL, NULL);
        if (!req)
                RETURN(-ENOMEM);

        ptlrpc_req_set_repsize(req, 2, size);
        req->rq_request_portal = OST_CREATE_PORTAL; //XXX FIXME bug 249

        /* stash the obd_info in the request so the interpret callback can
         * find the reply buffer and the completion callback */
        req->rq_interpret_reply = osc_statfs_interpret;
        CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = (struct osc_async_args *)&req->rq_async_args;
        aa->aa_oi = oinfo;

        ptlrpc_set_add_req(rqset, req);
        RETURN(0);
}
3168
/* Synchronous statfs: send OST_STATFS, wait for the reply, and copy the
 * obd_statfs record into @osfs.  @max_age is accepted for interface symmetry
 * but not transmitted (see comment below).  Returns 0 or negative errno. */
static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
                      __u64 max_age)
{
        struct obd_statfs *msfs;
        struct ptlrpc_request *req;
        /* reply layout: ptlrpc_body followed by one obd_statfs record */
        int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*osfs) };
        ENTRY;

        /* We could possibly pass max_age in the request (as an absolute
         * timestamp or a "seconds.usec ago") so the target can avoid doing
         * extra calls into the filesystem if that isn't necessary (e.g.
         * during mount that would help a bit).  Having relative timestamps
         * is not so great if request processing is slow, while absolute
         * timestamps are not ideal because they need time synchronization. */
        req = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_OST_VERSION,
                              OST_STATFS, 1, NULL, NULL);
        if (!req)
                RETURN(-ENOMEM);

        ptlrpc_req_set_repsize(req, 2, size);
        req->rq_request_portal = OST_CREATE_PORTAL; //XXX FIXME bug 249

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        /* unpack (and byte-swap if the peer differs in endianness) */
        msfs = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*msfs),
                                  lustre_swab_obd_statfs);
        if (msfs == NULL) {
                CERROR("Can't unpack obd_statfs\n");
                GOTO(out, rc = -EPROTO);
        }

        memcpy(osfs, msfs, sizeof(*osfs));

        EXIT;
 out:
        /* the request (and the reply buffer msfs points into) is released
         * on both the success and the error path */
        ptlrpc_req_finished(req);
        return rc;
}
3209
/* Retrieve object striping information for an ioctl caller.
 *
 * @lump is a userspace pointer to a struct lov_user_md whose
 * lmm_stripe_count says whether the caller left room for OST object
 * entries.  lmm_magic must be LOV_USER_MAGIC.  An OSC object is always
 * single-stripe, so at most one lmm_objects slot is ever filled in.
 *
 * Returns 0, or -ENODATA / -EFAULT / -EINVAL / -ENOMEM on failure.
 */
static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
{
        struct lov_user_md lum, *lumk;
        int rc = 0, lum_size;
        ENTRY;

        if (!lsm)
                RETURN(-ENODATA);

        /* fetch the caller's header to learn how much room was provided */
        if (copy_from_user(&lum, lump, sizeof(lum)))
                RETURN(-EFAULT);

        if (lum.lmm_magic != LOV_USER_MAGIC)
                RETURN(-EINVAL);

        if (lum.lmm_stripe_count > 0) {
                /* The caller wants the object array too: build a kernel copy
                 * sized for header + one object entry.
                 * NOTE(review): this writes lum_size bytes back to userspace,
                 * so it assumes the user buffer really has room for that one
                 * entry whenever lmm_stripe_count > 0 -- confirm callers. */
                lum_size = sizeof(lum) + sizeof(lum.lmm_objects[0]);
                OBD_ALLOC(lumk, lum_size);
                if (!lumk)
                        RETURN(-ENOMEM);

                lumk->lmm_objects[0].l_object_id = lsm->lsm_object_id;
                lumk->lmm_objects[0].l_object_gr = lsm->lsm_object_gr;
        } else {
                /* header only -- reuse the on-stack copy */
                lum_size = sizeof(lum);
                lumk = &lum;
        }

        lumk->lmm_object_id = lsm->lsm_object_id;
        lumk->lmm_object_gr = lsm->lsm_object_gr;
        lumk->lmm_stripe_count = 1;

        if (copy_to_user(lump, lumk, lum_size))
                rc = -EFAULT;

        if (lumk != &lum)
                OBD_FREE(lumk, lum_size);

        RETURN(rc);
}
3256
3257
/* OSC ioctl dispatcher.  @karg is the kernel copy of the ioctl data,
 * @uarg the original userspace pointer (needed by commands that size and
 * copy their own buffers).  A module reference is held across the call so
 * the OSC cannot be unloaded mid-ioctl.  Returns 0 or negative errno;
 * unknown commands yield -ENOTTY. */
static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                         void *karg, void *uarg)
{
        struct obd_device *obd = exp->exp_obd;
        struct obd_ioctl_data *data = karg;
        int err = 0;
        ENTRY;

        if (!try_module_get(THIS_MODULE)) {
                CERROR("Can't get module. Is it alive?");
                return -EINVAL;
        }
        switch (cmd) {
        case OBD_IOC_LOV_GET_CONFIG: {
                /* Present this single OSC as a one-target "LOV" so generic
                 * tools can query it: fabricate a lov_desc and return the
                 * OBD uuid as the sole target uuid. */
                char *buf;
                struct lov_desc *desc;
                struct obd_uuid uuid;

                buf = NULL;
                len = 0;
                /* obd_ioctl_getdata allocates buf and re-reads the ioctl
                 * payload directly from userspace */
                if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
                        GOTO(out, err = -EINVAL);

                data = (struct obd_ioctl_data *)buf;

                /* caller must have left room for the descriptor ... */
                if (sizeof(*desc) > data->ioc_inllen1) {
                        obd_ioctl_freedata(buf, len);
                        GOTO(out, err = -EINVAL);
                }

                /* ... and for one target uuid */
                if (data->ioc_inllen2 < sizeof(uuid)) {
                        obd_ioctl_freedata(buf, len);
                        GOTO(out, err = -EINVAL);
                }

                desc = (struct lov_desc *)data->ioc_inlbuf1;
                desc->ld_tgt_count = 1;
                desc->ld_active_tgt_count = 1;
                desc->ld_default_stripe_count = 1;
                desc->ld_default_stripe_size = 0;
                desc->ld_default_stripe_offset = 0;
                desc->ld_pattern = 0;
                memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));

                memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));

                /* copy_to_user returns the number of uncopied bytes */
                err = copy_to_user((void *)uarg, buf, len);
                if (err)
                        err = -EFAULT;
                obd_ioctl_freedata(buf, len);
                GOTO(out, err);
        }
        case LL_IOC_LOV_SETSTRIPE:
                /* obd_alloc_memmd returns the md size on success */
                err = obd_alloc_memmd(exp, karg);
                if (err > 0)
                        err = 0;
                GOTO(out, err);
        case LL_IOC_LOV_GETSTRIPE:
                err = osc_getstripe(karg, uarg);
                GOTO(out, err);
        case OBD_IOC_CLIENT_RECOVER:
                /* force recovery of the import against the uuid in inlbuf1 */
                err = ptlrpc_recover_import(obd->u.cli.cl_import,
                                            data->ioc_inlbuf1);
                if (err > 0)
                        err = 0;
                GOTO(out, err);
        case IOC_OSC_SET_ACTIVE:
                /* ioc_offset carries the desired active state */
                err = ptlrpc_set_import_active(obd->u.cli.cl_import,
                                               data->ioc_offset);
                GOTO(out, err);
        case OBD_IOC_POLL_QUOTACHECK:
                err = lquota_poll_check(quota_interface, exp,
                                        (struct if_quotacheck *)karg);
                GOTO(out, err);
        default:
                CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
                       cmd, cfs_curproc_comm());
                GOTO(out, err = -ENOTTY);
        }
out:
        module_put(THIS_MODULE);
        return err;
}
3341
/* Answer obd_get_info() queries against the OSC.
 *
 * "lock_to_stripe": an OSC is single-stripe, so the answer is always 0.
 * "last_id": forwarded to the OST as an OST_GET_INFO RPC; the reply is the
 * last object id allocated on the target.
 * Any other key returns -EINVAL; a NULL @vallen or @val returns -EFAULT. */
static int osc_get_info(struct obd_export *exp, obd_count keylen,
                        void *key, __u32 *vallen, void *val)
{
        ENTRY;
        if (!vallen || !val)
                RETURN(-EFAULT);

        if (KEY_IS("lock_to_stripe")) {
                __u32 *stripe = val;
                *vallen = sizeof(*stripe);
                *stripe = 0;
                RETURN(0);
        } else if (KEY_IS("last_id")) {
                struct ptlrpc_request *req;
                obd_id *reply;
                /* request layout: ptlrpc_body + the key string */
                char *bufs[2] = { NULL, key };
                int rc, size[2] = { sizeof(struct ptlrpc_body), keylen };

                req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                                      OST_GET_INFO, 2, size, bufs);
                if (req == NULL)
                        RETURN(-ENOMEM);

                /* reserve caller-requested room for the reply record */
                size[REPLY_REC_OFF] = *vallen;
                ptlrpc_req_set_repsize(req, 2, size);
                rc = ptlrpc_queue_wait(req);
                if (rc)
                        GOTO(out, rc);

                reply = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*reply),
                                           lustre_swab_ost_last_id);
                if (reply == NULL) {
                        CERROR("Can't unpack OST last ID\n");
                        GOTO(out, rc = -EPROTO);
                }
                *((obd_id *)val) = *reply;
        out:
                ptlrpc_req_finished(req);
                RETURN(rc);
        }
        RETURN(-EINVAL);
}
3384
3385 static int osc_setinfo_mds_conn_interpret(struct ptlrpc_request *req,
3386                                           void *aa, int rc)
3387 {
3388         struct llog_ctxt *ctxt;
3389         struct obd_import *imp = req->rq_import;
3390         ENTRY;
3391
3392         if (rc != 0)
3393                 RETURN(rc);
3394
3395         ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
3396         if (ctxt) {
3397                 if (rc == 0)
3398                         rc = llog_initiator_connect(ctxt);
3399                 else
3400                         CERROR("cannot establish connection for "
3401                                "ctxt %p: %d\n", ctxt, rc);
3402         }
3403
3404         spin_lock(&imp->imp_lock);
3405         imp->imp_server_timeout = 1;
3406         imp->imp_pingable = 1;
3407         spin_unlock(&imp->imp_lock);
3408         CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
3409
3410         RETURN(rc);
3411 }
3412
/* Handle obd_set_info_async() keys on the OSC.
 *
 * Locally-handled keys (no RPC, @set may be NULL): KEY_NEXT_ID, "unlinked",
 * KEY_INIT_RECOV, "checksum", KEY_FLUSH_CTX.  All other keys are forwarded
 * to the OST as an OST_SET_INFO RPC queued on @set (so @set is then
 * mandatory); KEY_MDS_CONN additionally installs an interpret callback.
 * Returns 0 or negative errno. */
static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
                              void *key, obd_count vallen, void *val,
                              struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *req;
        struct obd_device  *obd = exp->exp_obd;
        struct obd_import *imp = class_exp2cliimp(exp);
        /* request layout: ptlrpc_body + key + value */
        int size[3] = { sizeof(struct ptlrpc_body), keylen, vallen };
        char *bufs[3] = { NULL, key, val };
        ENTRY;

        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);

        if (KEY_IS(KEY_NEXT_ID)) {
                /* MDS tells us the last used object id; precreation
                 * continues from the next one */
                if (vallen != sizeof(obd_id))
                        RETURN(-EINVAL);
                obd->u.cli.cl_oscc.oscc_next_id = *((obd_id*)val) + 1;
                CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
                       exp->exp_obd->obd_name,
                       obd->u.cli.cl_oscc.oscc_next_id);

                RETURN(0);
        }

        if (KEY_IS("unlinked")) {
                /* space was freed on the OST; clear the no-space flag so
                 * object creation may resume */
                struct osc_creator *oscc = &obd->u.cli.cl_oscc;
                spin_lock(&oscc->oscc_lock);
                oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
                spin_unlock(&oscc->oscc_lock);
                RETURN(0);
        }

        if (KEY_IS(KEY_INIT_RECOV)) {
                if (vallen != sizeof(int))
                        RETURN(-EINVAL);
                spin_lock(&imp->imp_lock);
                imp->imp_initial_recov = *(int *)val;
                spin_unlock(&imp->imp_lock);
                CDEBUG(D_HA, "%s: set imp_initial_recov = %d\n",
                       exp->exp_obd->obd_name,
                       imp->imp_initial_recov);
                RETURN(0);
        }

        if (KEY_IS("checksum")) {
                /* toggle bulk checksumming for this client */
                if (vallen != sizeof(int))
                        RETURN(-EINVAL);
                exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
                RETURN(0);
        }

        if (KEY_IS(KEY_FLUSH_CTX)) {
                /* drop this thread's security contexts on the import */
                sptlrpc_import_flush_my_ctx(imp);
                RETURN(0);
        }

        /* everything below requires queueing an RPC */
        if (!set)
                RETURN(-EINVAL);

        /* We pass all other commands directly to OST. Since nobody calls osc
           methods directly and everybody is supposed to go through LOV, we
           assume lov checked invalid values for us.
           The only recognised values so far are evict_by_nid and mds_conn.
           Even if something bad goes through, we'd get a -EINVAL from OST
           anyway. */

        req = ptlrpc_prep_req(imp, LUSTRE_OST_VERSION, OST_SET_INFO, 3, size,
                              bufs);
        if (req == NULL)
                RETURN(-ENOMEM);

        if (KEY_IS(KEY_MDS_CONN)) {
                /* val carries the MDS group; remember it for precreation
                 * and finish the setup in the interpret callback */
                struct osc_creator *oscc = &obd->u.cli.cl_oscc;

                oscc->oscc_oa.o_gr = (*(__u32 *)val);
                oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP;
                LASSERT(oscc->oscc_oa.o_gr > 0);
                req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
        }

        ptlrpc_req_set_repsize(req, 1, NULL);
        ptlrpc_set_add_req(set, req);
        ptlrpc_check_set(set);

        RETURN(0);
}
3499
3500
3501 static struct llog_operations osc_size_repl_logops = {
3502         lop_cancel: llog_obd_repl_cancel
3503 };
3504
/* Originator log operations are built lazily from llog_lvfs_ops the first
 * time osc_llog_init() runs (guarded by obd_dev_lock below). */
static struct llog_operations osc_mds_ost_orig_logops;

/* Set up the two llog contexts this OSC uses on the MDS side: the
 * MDS->OST originator catalog (@catid names its log) and the size-change
 * replication context.  Returns 0 or the first llog_setup() error. */
static int osc_llog_init(struct obd_device *obd, struct obd_llogs *llogs,
                         struct obd_device *tgt, int count,
                         struct llog_catid *catid, struct obd_uuid *uuid)
{
        int rc;
        ENTRY;

        /* one-time, race-free initialization of the originator ops table;
         * lop_setup doubles as the "already initialized" marker */
        spin_lock(&obd->obd_dev_lock);
        if (osc_mds_ost_orig_logops.lop_setup != llog_obd_origin_setup) {
                osc_mds_ost_orig_logops = llog_lvfs_ops;
                osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
                osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
                osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
                osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
        }
        spin_unlock(&obd->obd_dev_lock);

        rc = llog_setup(obd, llogs, LLOG_MDS_OST_ORIG_CTXT, tgt, count,
                        &catid->lci_logid, &osc_mds_ost_orig_logops);
        if (rc) {
                CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
                GOTO (out, rc);
        }

        rc = llog_setup(obd, llogs, LLOG_SIZE_REPL_CTXT, tgt, count, NULL,
                        &osc_size_repl_logops);
        if (rc)
                CERROR("failed LLOG_SIZE_REPL_CTXT\n");
out:
        if (rc) {
                CERROR("osc '%s' tgt '%s' cnt %d catid %p rc=%d\n",
                       obd->obd_name, tgt->obd_name, count, catid, rc);
                CERROR("logid "LPX64":0x%x\n",
                       catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
        }
        RETURN(rc);
}
3543
3544 static int osc_llog_finish(struct obd_device *obd, int count)
3545 {
3546         struct llog_ctxt *ctxt;
3547         int rc = 0, rc2 = 0;
3548         ENTRY;
3549
3550         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3551         if (ctxt)
3552                 rc = llog_cleanup(ctxt);
3553
3554         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3555         if (ctxt)
3556                 rc2 = llog_cleanup(ctxt);
3557         if (!rc)
3558                 rc = rc2;
3559
3560         RETURN(rc);
3561 }
3562
3563 static int osc_reconnect(struct obd_export *exp, struct obd_device *obd,
3564                          struct obd_uuid *cluuid,
3565                          struct obd_connect_data *data)
3566 {
3567         struct client_obd *cli = &obd->u.cli;
3568
3569         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3570                 long lost_grant;
3571
3572                 client_obd_list_lock(&cli->cl_loi_list_lock);
3573                 data->ocd_grant = cli->cl_avail_grant ?:
3574                                 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
3575                 lost_grant = cli->cl_lost_grant;
3576                 cli->cl_lost_grant = 0;
3577                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3578
3579                 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
3580                        "cl_lost_grant: %ld\n", data->ocd_grant,
3581                        cli->cl_avail_grant, lost_grant);
3582                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
3583                        " ocd_grant: %d\n", data->ocd_connect_flags,
3584                        data->ocd_version, data->ocd_grant);
3585         }
3586
3587         RETURN(0);
3588 }
3589
3590 static int osc_disconnect(struct obd_export *exp)
3591 {
3592         struct obd_device *obd = class_exp2obd(exp);
3593         struct llog_ctxt *ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3594         int rc;
3595
3596         if (obd->u.cli.cl_conn_count == 1)
3597                 /* flush any remaining cancel messages out to the target */
3598                 llog_sync(ctxt, exp);
3599
3600         rc = client_disconnect_export(exp);
3601         return rc;
3602 }
3603
/* React to import state changes.  Forwards ACTIVE/INACTIVE/OCD events to
 * the observer, resets grant accounting on disconnect/invalidate, and keeps
 * the object-creator flags in sync on MDS-side OSCs (identified by
 * imp_server_timeout).  Unknown events are a bug (LBUG). */
static int osc_import_event(struct obd_device *obd,
                            struct obd_import *imp,
                            enum obd_import_event event)
{
        struct client_obd *cli;
        int rc = 0;

        ENTRY;
        LASSERT(imp->imp_obd == obd);

        switch (event) {
        case IMP_EVENT_DISCON: {
                /* Only do this on the MDS OSC's */
                if (imp->imp_server_timeout) {
                        struct osc_creator *oscc = &obd->u.cli.cl_oscc;

                        /* pause object precreation until reconnect */
                        spin_lock(&oscc->oscc_lock);
                        oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
                        spin_unlock(&oscc->oscc_lock);
                }
                /* grant state is meaningless while disconnected */
                cli = &obd->u.cli;
                client_obd_list_lock(&cli->cl_loi_list_lock);
                cli->cl_avail_grant = 0;
                cli->cl_lost_grant = 0;
                client_obd_list_unlock(&cli->cl_loi_list_lock);
                break;
        }
        case IMP_EVENT_INACTIVE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
                break;
        }
        case IMP_EVENT_INVALIDATE: {
                struct ldlm_namespace *ns = obd->obd_namespace;

                /* Reset grants */
                cli = &obd->u.cli;
                client_obd_list_lock(&cli->cl_loi_list_lock);
                /* all pages go to failing rpcs due to the invalid import */
                osc_check_rpcs(cli);
                client_obd_list_unlock(&cli->cl_loi_list_lock);

                /* locally discard every lock cached against this namespace */
                ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);

                break;
        }
        case IMP_EVENT_ACTIVE: {
                /* Only do this on the MDS OSC's */
                if (imp->imp_server_timeout) {
                        struct osc_creator *oscc = &obd->u.cli.cl_oscc;

                        /* re-enable precreation after recovery */
                        spin_lock(&oscc->oscc_lock);
                        oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
                        spin_unlock(&oscc->oscc_lock);
                }
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
                break;
        }
        case IMP_EVENT_OCD: {
                /* connect data negotiated -- apply grant and portal options */
                struct obd_connect_data *ocd = &imp->imp_connect_data;

                if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
                        osc_init_grant(&obd->u.cli, ocd);

                /* See bug 7198 */
                if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
                        imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;

                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
                break;
        }
        default:
                CERROR("Unknown import event %d\n", event);
                LBUG();
        }
        RETURN(rc);
}
3680
3681 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3682 {
3683         int rc;
3684         ENTRY;
3685
3686         ENTRY;
3687         rc = ptlrpcd_addref();
3688         if (rc)
3689                 RETURN(rc);
3690
3691         rc = client_obd_setup(obd, lcfg);
3692         if (rc) {
3693                 ptlrpcd_decref();
3694         } else {
3695                 struct lprocfs_static_vars lvars = { 0 };
3696                 struct client_obd *cli = &obd->u.cli;
3697
3698                 lprocfs_osc_init_vars(&lvars);
3699                 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
3700                         lproc_osc_attach_seqstat(obd);
3701                         ptlrpc_lprocfs_register_obd(obd);
3702                 }
3703
3704                 oscc_init(obd);
3705                 /* We need to allocate a few requests more, because
3706                    brw_interpret_oap tries to create new requests before freeing
3707                    previous ones. Ideally we want to have 2x max_rpcs_in_flight
3708                    reserved, but I afraid that might be too much wasted RAM
3709                    in fact, so 2 is just my guess and still should work. */
3710                 cli->cl_import->imp_rq_pool =
3711                         ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
3712                                             OST_MAXREQSIZE,
3713                                             ptlrpc_add_rqs_to_pool);
3714         }
3715
3716         RETURN(rc);
3717 }
3718
/* Staged pre-cleanup hook, called once per @stage during obd teardown:
 * EARLY deactivates the import, EXPORTS invalidates and destroys a
 * never-connected import, SELF_EXP shuts down the llog subsystems, and
 * OBD is a no-op.  Returns 0 or the llog cleanup error. */
static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
{
        int rc = 0;
        ENTRY;

        switch (stage) {
        case OBD_CLEANUP_EARLY: {
                struct obd_import *imp;
                imp = obd->u.cli.cl_import;
                CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
                /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
                ptlrpc_deactivate_import(imp);
                spin_lock(&imp->imp_lock);
                imp->imp_pingable = 0;
                spin_unlock(&imp->imp_lock);
                break;
        }
        case OBD_CLEANUP_EXPORTS: {
                /* If we set up but never connected, the
                   client import will not have been cleaned. */
                if (obd->u.cli.cl_import) {
                        struct obd_import *imp;
                        imp = obd->u.cli.cl_import;
                        /* NOTE(review): this message prints whenever the
                         * import still exists at this stage, connected or
                         * not -- the wording may be misleading. */
                        CDEBUG(D_CONFIG, "%s: client import never connected\n",
                               obd->obd_name);
                        ptlrpc_invalidate_import(imp);
                        ptlrpc_free_rq_pool(imp->imp_rq_pool);
                        class_destroy_import(imp);
                        obd->u.cli.cl_import = NULL;
                }
                break;
        }
        case OBD_CLEANUP_SELF_EXP:
                rc = obd_llog_finish(obd, 0);
                if (rc != 0)
                        CERROR("failed to cleanup llogging subsystems\n");
                break;
        case OBD_CLEANUP_OBD:
                break;
        }
        RETURN(rc);
}
3761
/* Final OSC teardown: unregister /proc entries, mark the object creator as
 * exiting (so no new precreates start), release the quota cache, run the
 * generic client cleanup, and drop the ptlrpcd reference taken in
 * osc_setup().  Returns the client_obd_cleanup() result. */
int osc_cleanup(struct obd_device *obd)
{
        struct osc_creator *oscc = &obd->u.cli.cl_oscc;
        int rc;

        ENTRY;
        ptlrpc_lprocfs_unregister_obd(obd);
        lprocfs_obd_cleanup(obd);

        /* flag the creator as exiting so in-flight users bail out */
        spin_lock(&oscc->oscc_lock);
        oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING;
        oscc->oscc_flags |= OSCC_FLAG_EXITING;
        spin_unlock(&oscc->oscc_lock);

        /* free memory of osc quota cache */
        lquota_cleanup(quota_interface, obd);

        rc = client_obd_cleanup(obd);

        /* balances the ptlrpcd_addref() in osc_setup() */
        ptlrpcd_decref();
        RETURN(rc);
}
3784
3785 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
3786 {
3787         struct lustre_cfg *lcfg = buf;
3788         struct lprocfs_static_vars lvars = { 0 };
3789         int rc = 0;
3790
3791         lprocfs_osc_init_vars(&lvars);
3792
3793         rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars, lcfg, obd);
3794         return(rc);
3795 }
3796
/* Method table exported by the OSC; registered with the class layer in
 * osc_init() and dispatched through the generic OBD_* wrappers.  Connection
 * management is delegated to the shared client_* helpers; everything else
 * is implemented in this file. */
struct obd_ops osc_obd_ops = {
        .o_owner                = THIS_MODULE,
        .o_setup                = osc_setup,
        .o_precleanup           = osc_precleanup,
        .o_cleanup              = osc_cleanup,
        .o_add_conn             = client_import_add_conn,
        .o_del_conn             = client_import_del_conn,
        .o_connect              = client_connect_import,
        .o_reconnect            = osc_reconnect,
        .o_disconnect           = osc_disconnect,
        .o_statfs               = osc_statfs,
        .o_statfs_async         = osc_statfs_async,
        .o_packmd               = osc_packmd,
        .o_unpackmd             = osc_unpackmd,
        .o_precreate            = osc_precreate,
        .o_create               = osc_create,
        .o_destroy              = osc_destroy,
        .o_getattr              = osc_getattr,
        .o_getattr_async        = osc_getattr_async,
        .o_setattr              = osc_setattr,
        .o_setattr_async        = osc_setattr_async,
        .o_brw                  = osc_brw,
        .o_brw_async            = osc_brw_async,
        .o_prep_async_page      = osc_prep_async_page,
        .o_queue_async_io       = osc_queue_async_io,
        .o_set_async_flags      = osc_set_async_flags,
        .o_queue_group_io       = osc_queue_group_io,
        .o_trigger_group_io     = osc_trigger_group_io,
        .o_teardown_async_page  = osc_teardown_async_page,
        .o_punch                = osc_punch,
        .o_sync                 = osc_sync,
        .o_enqueue              = osc_enqueue,
        .o_match                = osc_match,
        .o_change_cbdata        = osc_change_cbdata,
        .o_cancel               = osc_cancel,
        .o_cancel_unused        = osc_cancel_unused,
        .o_join_lru             = osc_join_lru,
        .o_iocontrol            = osc_iocontrol,
        .o_get_info             = osc_get_info,
        .o_set_info_async       = osc_set_info_async,
        .o_import_event         = osc_import_event,
        .o_llog_init            = osc_llog_init,
        .o_llog_finish          = osc_llog_finish,
        .o_process_config       = osc_process_config,
};
3842 int __init osc_init(void)
3843 {
3844         struct lprocfs_static_vars lvars = { 0 };
3845         int rc;
3846         ENTRY;
3847
3848         lprocfs_osc_init_vars(&lvars);
3849
3850         request_module("lquota");
3851         quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
3852         lquota_init(quota_interface);
3853         init_obd_quota_ops(quota_interface, &osc_obd_ops);
3854
3855         rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
3856                                  LUSTRE_OSC_NAME, NULL);
3857         if (rc) {
3858                 if (quota_interface)
3859                         PORTAL_SYMBOL_PUT(osc_quota_interface);
3860                 RETURN(rc);
3861         }
3862
3863         RETURN(rc);
3864 }
3865
3866 #ifdef __KERNEL__
/* Module unload: tear down quota support, then unregister the OSC obd type.
 * Mirrors the setup order in osc_init().  The __exit annotation is
 * commented out deliberately (see original), so keep it as-is. */
static void /*__exit*/ osc_exit(void)
{
        /* lquota_exit() must run before dropping the module reference;
         * it is expected to tolerate a NULL quota_interface */
        lquota_exit(quota_interface);
        if (quota_interface)
                PORTAL_SYMBOL_PUT(osc_quota_interface);

        class_unregister_type(LUSTRE_OSC_NAME);
}
3875
/* Standard kernel module metadata and entry/exit registration. */
MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
MODULE_LICENSE("GPL");

cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);
3881 #endif