Whamcloud - gitweb
- b_hd_audit landing
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
5  *   Author Peter Braam <braam@clusterfs.com>
6  *
7  *   This file is part of Lustre, http://www.lustre.org.
8  *
9  *   Lustre is free software; you can redistribute it and/or
10  *   modify it under the terms of version 2 of the GNU General Public
11  *   License as published by the Free Software Foundation.
12  *
13  *   Lustre is distributed in the hope that it will be useful,
14  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
15  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  *   GNU General Public License for more details.
17  *
18  *   You should have received a copy of the GNU General Public License
19  *   along with Lustre; if not, write to the Free Software
20  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21  *
 *  For testing and management it is treated as an obd_device,
 *  although it does not export a full OBD method table (the
 *  requests are coming in over the wire, so object target modules
 *  do not have a full method table.)
26  *
27  */
28
29 #ifndef EXPORT_SYMTAB
30 # define EXPORT_SYMTAB
31 #endif
32 #define DEBUG_SUBSYSTEM S_OSC
33
34 #ifdef __KERNEL__
35 # include <linux/version.h>
36 # include <linux/module.h>
37 # include <linux/mm.h>
38 # include <linux/highmem.h>
39 # include <linux/ctype.h>
40 # include <linux/init.h>
41 # if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
42 #  include <linux/workqueue.h>
43 #  include <linux/smp_lock.h>
44 # else
45 #  include <linux/locks.h>
46 # endif
47 #else /* __KERNEL__ */
48 # include <liblustre.h>
49 #endif
50
51 #include <linux/lustre_dlm.h>
52 #include <libcfs/kp30.h>
53 #include <linux/lustre_net.h>
54 #include <linux/lustre_sec.h>
55 #include <lustre/lustre_user.h>
56 #include <linux/obd_ost.h>
57 #include <linux/obd_lov.h>
58
59 #ifdef  __CYGWIN__
60 # include <ctype.h>
61 #endif
62
63 #include <linux/lustre_ha.h>
64 #include <linux/lprocfs_status.h>
65 #include <linux/lustre_log.h>
66 #include <linux/lustre_audit.h>
67 #include <linux/lustre_gs.h>
68 #include "osc_internal.h"
69
70 /* Pack OSC object metadata for disk storage (LE byte order). */
71 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
72                       struct lov_stripe_md *lsm)
73 {
74         int lmm_size;
75         ENTRY;
76
77         lmm_size = sizeof(**lmmp);
78         if (!lmmp)
79                 RETURN(lmm_size);
80
81         if (*lmmp && !lsm) {
82                 OBD_FREE(*lmmp, lmm_size);
83                 *lmmp = NULL;
84                 RETURN(0);
85         }
86
87         if (!*lmmp) {
88                 OBD_ALLOC(*lmmp, lmm_size);
89                 if (!*lmmp)
90                         RETURN(-ENOMEM);
91         }
92
93         if (lsm) {
94                 LASSERT(lsm->lsm_object_id);
95                 LASSERT(lsm->lsm_object_gr);
96                 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
97                 (*lmmp)->lmm_object_gr = cpu_to_le64(lsm->lsm_object_gr);
98         }
99
100         RETURN(lmm_size);
101 }
102
103 /* Unpack OSC object metadata from disk storage (LE byte order). */
104 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
105                         struct lov_mds_md *lmm, int lmm_bytes)
106 {
107         int lsm_size;
108         ENTRY;
109
110         if (lmm != NULL) {
111                 if (lmm_bytes < sizeof (*lmm)) {
112                         CERROR("lov_mds_md too small: %d, need %d\n",
113                                lmm_bytes, (int)sizeof(*lmm));
114                         RETURN(-EINVAL);
115                 }
116                 /* XXX LOV_MAGIC etc check? */
117
118                 if (lmm->lmm_object_id == 0) {
119                         CERROR("lov_mds_md: zero lmm_object_id\n");
120                         RETURN(-EINVAL);
121                 }
122         }
123
124         lsm_size = lov_stripe_md_size(1);
125         if (lsmp == NULL)
126                 RETURN(lsm_size);
127
128         if (*lsmp != NULL && lmm == NULL) {
129                 OBD_FREE(*lsmp, lsm_size);
130                 *lsmp = NULL;
131                 RETURN(0);
132         }
133
134         if (*lsmp == NULL) {
135                 OBD_ALLOC(*lsmp, lsm_size);
136                 if (*lsmp == NULL)
137                         RETURN(-ENOMEM);
138                 loi_init((*lsmp)->lsm_oinfo);
139         }
140
141         if (lmm != NULL) {
142                 /* XXX zero *lsmp? */
143                 (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
144                 (*lsmp)->lsm_object_gr = le64_to_cpu (lmm->lmm_object_gr);
145                 LASSERT((*lsmp)->lsm_object_id);
146                 LASSERT((*lsmp)->lsm_object_gr);
147         }
148
149         (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
150
151         RETURN(lsm_size);
152 }
153
154 static int osc_getattr_interpret(struct ptlrpc_request *req,
155                                  struct osc_getattr_async_args *aa, int rc)
156 {
157         struct ost_body *body;
158         ENTRY;
159
160         if (rc != 0)
161                 RETURN(rc);
162
163         body = lustre_swab_repbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
164         if (body) {
165                 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
166                 memcpy(aa->aa_oa, &body->oa, sizeof(*aa->aa_oa));
167
168                 /* This should really be sent by the OST */
169                 aa->aa_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
170                 aa->aa_oa->o_valid |= OBD_MD_FLBLKSZ;
171         } else {
172                 CERROR("can't unpack ost_body\n");
173                 rc = -EPROTO;
174                 aa->aa_oa->o_valid = 0;
175         }
176
177         RETURN(rc);
178 }
179
180 static int osc_getattr_async(struct obd_export *exp, struct obdo *oa,
181                              struct lov_stripe_md *md,
182                              struct ptlrpc_request_set *set)
183 {
184         struct ptlrpc_request *request;
185         struct ost_body *body;
186         int size = sizeof(*body);
187         struct osc_getattr_async_args *aa;
188         ENTRY;
189
190         request = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OBD_VERSION,
191                                   OST_GETATTR, 1, &size, NULL);
192         if (!request)
193                 RETURN(-ENOMEM);
194
195         body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof (*body));
196         memcpy(&body->oa, oa, sizeof(*oa));
197
198         request->rq_replen = lustre_msg_size(1, &size);
199         request->rq_interpret_reply = osc_getattr_interpret;
200
201         LASSERT (sizeof (*aa) <= sizeof (request->rq_async_args));
202         aa = (struct osc_getattr_async_args *)&request->rq_async_args;
203         aa->aa_oa = oa;
204
205         ptlrpc_set_add_req (set, request);
206         RETURN (0);
207 }
208
209 static int osc_getattr(struct obd_export *exp, struct obdo *oa,
210                        struct lov_stripe_md *md)
211 {
212         struct ptlrpc_request *request;
213         struct ost_body *body;
214         int rc, size = sizeof(*body);
215         ENTRY;
216
217         request = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OBD_VERSION,
218                                   OST_GETATTR, 1, &size, NULL);
219         if (!request)
220                 RETURN(-ENOMEM);
221
222         body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof (*body));
223         memcpy(&body->oa, oa, sizeof(*oa));
224
225         request->rq_replen = lustre_msg_size(1, &size);
226
227         rc = ptlrpc_queue_wait(request);
228         if (rc) {
229                 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
230                 GOTO(out, rc);
231         }
232
233         body = lustre_swab_repbuf(request, 0, sizeof (*body),
234                                   lustre_swab_ost_body);
235         if (body == NULL) {
236                 CERROR ("can't unpack ost_body\n");
237                 GOTO (out, rc = -EPROTO);
238         }
239
240         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
241         memcpy(oa, &body->oa, sizeof(*oa));
242
243         /* This should really be sent by the OST */
244         oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
245         oa->o_valid |= OBD_MD_FLBLKSZ;
246
247         EXIT;
248  out:
249         ptlrpc_req_finished(request);
250         return rc;
251 }
252
253 static int osc_setattr(struct obd_export *exp, struct obdo *oa,
254                        struct lov_stripe_md *md, struct obd_trans_info *oti)
255 {
256         struct ptlrpc_request *request;
257         struct ost_body *body;
258         int rc, size = sizeof(*body);
259         ENTRY;
260
261         LASSERT(!(oa->o_valid & OBD_MD_FLGROUP) || oa->o_gr > 0);
262
263         request = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OBD_VERSION,
264                                   OST_SETATTR, 1, &size, NULL);
265         if (!request)
266                 RETURN(-ENOMEM);
267
268         body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof(*body));
269         memcpy(&body->oa, oa, sizeof(*oa));
270
271         request->rq_replen = lustre_msg_size(1, &size);
272
273         if (oti != NULL && (oti->oti_flags & OBD_MODE_ASYNC)) {
274                 ptlrpcd_add_req(request);
275                 rc = 0;
276         } else {
277                 rc = ptlrpc_queue_wait(request);
278                 if (rc)
279                         GOTO(out, rc);
280
281                 body = lustre_swab_repbuf(request, 0, sizeof(*body),
282                                           lustre_swab_ost_body);
283                 if (body == NULL)
284                         GOTO(out, rc = -EPROTO);
285
286                 memcpy(oa, &body->oa, sizeof(*oa));
287         }
288         EXIT;
289 out:
290         ptlrpc_req_finished(request);
291         RETURN(0);
292 }
293
/* Send an OST_CREATE request for oa and wait for the reply.
 *
 * \param exp  export whose client import the request is sent on
 * \param oa   object attributes; sent with the request and overwritten with
 *             the obdo from the reply on success (must not be NULL)
 * \param ea   in/out stripe md; if *ea is NULL one is allocated with
 *             obd_alloc_memmd() and stored in *ea on success (must not be
 *             NULL itself)
 * \param oti  optional; on success receives the reply transno and, when the
 *             reply obdo has OBD_MD_FLCOOKIE, the log cookie
 *
 * A request with OBD_MD_FLINLINE set must be an OBD_FL_DELORPHAN request and
 * is marked no-resend/no-delay.  For a DELORPHAN request the object id in
 * the reply is stored as oscc_next_id.
 *
 * Returns 0 on success, a negative error from obd_alloc_memmd() or
 * ptlrpc_queue_wait(), -ENOMEM if the request cannot be prepared, or -EPROTO
 * if the reply body cannot be unpacked. */
int osc_real_create(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md **ea, struct obd_trans_info *oti)
{
        struct osc_creator *oscc = &exp->exp_obd->u.cli.cl_oscc;
        struct ptlrpc_request *request;
        struct ost_body *body;
        struct lov_stripe_md *lsm;
        int rc, size = sizeof(*body);
        ENTRY;

        LASSERT(oa);
        LASSERT(ea);

        /* use the caller's stripe md if there is one, else allocate our own;
         * *ea itself is only updated once the create has succeeded */
        lsm = *ea;
        if (!lsm) {
                rc = obd_alloc_memmd(exp, &lsm);
                if (rc < 0)
                        RETURN(rc);
        }

        request = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OBD_VERSION,
                                  OST_CREATE, 1, &size, NULL);
        if (!request)
                GOTO(out, rc = -ENOMEM);

        body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof (*body));
        memcpy(&body->oa, oa, sizeof(body->oa));

        request->rq_replen = lustre_msg_size(1, &size);
        if (oa->o_valid & OBD_MD_FLINLINE) {
                LASSERT((oa->o_valid & OBD_MD_FLFLAGS) &&
                        oa->o_flags == OBD_FL_DELORPHAN);
                DEBUG_REQ(D_HA, request,
                          "delorphan from OST integration");
                /* Don't resend the delorphan request */
                request->rq_no_resend = request->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(request);
        if (rc)
                GOTO(out_req, rc);

        /* body now refers to the reply */
        body = lustre_swab_repbuf(request, 0, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body == NULL) {
                CERROR ("can't unpack ost_body\n");
                GOTO (out_req, rc = -EPROTO);
        }

        if ((oa->o_valid & OBD_MD_FLFLAGS) && oa->o_flags == OBD_FL_DELORPHAN) {
                struct obd_import *imp = class_exp2cliimp(exp);
                /* MDS declares last known object, OSS responses
                 * with next possible object -bzzz */
                spin_lock(&oscc->oscc_lock);
                oscc->oscc_next_id = body->oa.o_id;
                spin_unlock(&oscc->oscc_lock);
                /* NOTE(review): this prints oa->o_id (the id that was sent),
                 * not body->oa.o_id which was just stored as the next id --
                 * confirm which one the message is meant to show */
                CDEBUG(D_HA, "%s: set nextid "LPD64" after recovery\n",
                       imp->imp_target_uuid.uuid, oa->o_id);
        }
        memcpy(oa, &body->oa, sizeof(*oa));

        /* This should really be sent by the OST */
        oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oa->o_valid |= OBD_MD_FLBLKSZ;

        /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
         * have valid lsm_oinfo data structs, so don't go touching that.
         * This needs to be fixed in a big way.
         */
        lsm->lsm_object_id = oa->o_id;
        lsm->lsm_object_gr = oa->o_gr;
        *ea = lsm;

        if (oti != NULL) {
                oti->oti_transno = request->rq_repmsg->transno;

                if (oa->o_valid & OBD_MD_FLCOOKIE) {
                        /* NOTE(review): the result of oti_alloc_cookies() is
                         * not checked before oti_logcookies is written to --
                         * verify it cannot fail here */
                        if (!oti->oti_logcookies)
                                oti_alloc_cookies(oti, 1);
                        memcpy(oti->oti_logcookies, obdo_logcookie(oa),
                               sizeof(oti->oti_onecookie));
                }
        }

        CDEBUG(D_HA, "transno: "LPD64"\n", request->rq_repmsg->transno);
        EXIT;
out_req:
        ptlrpc_req_finished(request);
out:
        /* on failure *ea is still the caller's original value, so this only
         * frees an lsm that was allocated above */
        if (rc && !*ea)
                obd_free_memmd(exp, &lsm);
        return rc;
}
387
388 static int osc_punch(struct obd_export *exp, struct obdo *oa,
389                      struct lov_stripe_md *md, obd_size start,
390                      obd_size end, struct obd_trans_info *oti)
391 {
392         struct ptlrpc_request *request;
393         struct ost_body *body;
394         int rc, size = sizeof(*body);
395         ENTRY;
396
397         if (!oa) {
398                 CERROR("oa NULL\n");
399                 RETURN(-EINVAL);
400         }
401
402         request = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OBD_VERSION,
403                                   OST_PUNCH, 1, &size, NULL);
404         if (!request)
405                 RETURN(-ENOMEM);
406
407         body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof (*body));
408         memcpy(&body->oa, oa, sizeof(*oa));
409
410         /* overload the size and blocks fields in the oa with start/end */
411         body->oa.o_size = start;
412         body->oa.o_blocks = end;
413         body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
414
415         request->rq_replen = lustre_msg_size(1, &size);
416
417         rc = ptlrpc_queue_wait(request);
418         if (rc)
419                 GOTO(out, rc);
420
421         body = lustre_swab_repbuf (request, 0, sizeof (*body),
422                                    lustre_swab_ost_body);
423         if (body == NULL) {
424                 CERROR ("can't unpack ost_body\n");
425                 GOTO (out, rc = -EPROTO);
426         }
427
428         memcpy(oa, &body->oa, sizeof(*oa));
429
430         EXIT;
431  out:
432         ptlrpc_req_finished(request);
433         return rc;
434 }
435
436 static int osc_sync(struct obd_export *exp, struct obdo *oa,
437                     struct lov_stripe_md *md, obd_size start,
438                     obd_size end)
439 {
440         struct ptlrpc_request *request;
441         struct ost_body *body;
442         int rc, size = sizeof(*body);
443         ENTRY;
444
445         if (!oa) {
446                 CERROR("oa NULL\n");
447                 RETURN(-EINVAL);
448         }
449
450         request = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OBD_VERSION,
451                                   OST_SYNC, 1, &size, NULL);
452         if (!request)
453                 RETURN(-ENOMEM);
454
455         body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof (*body));
456         memcpy(&body->oa, oa, sizeof(*oa));
457
458         /* overload the size and blocks fields in the oa with start/end */
459         body->oa.o_size = start;
460         body->oa.o_blocks = end;
461         body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
462
463         request->rq_replen = lustre_msg_size(1, &size);
464
465         rc = ptlrpc_queue_wait(request);
466         if (rc)
467                 GOTO(out, rc);
468
469         body = lustre_swab_repbuf(request, 0, sizeof(*body),
470                                   lustre_swab_ost_body);
471         if (body == NULL) {
472                 CERROR ("can't unpack ost_body\n");
473                 GOTO (out, rc = -EPROTO);
474         }
475
476         memcpy(oa, &body->oa, sizeof(*oa));
477
478         EXIT;
479  out:
480         ptlrpc_req_finished(request);
481         return rc;
482 }
483
484 static int osc_destroy(struct obd_export *exp, struct obdo *oa,
485                        struct lov_stripe_md *ea, struct obd_trans_info *oti)
486 {
487         struct ptlrpc_request *request;
488         struct ost_body *body;
489         int rc, size = sizeof(*body);
490         ENTRY;
491
492         if (!oa) {
493                 CERROR("oa NULL\n");
494                 RETURN(-EINVAL);
495         }
496
497         request = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OBD_VERSION,
498                                   OST_DESTROY, 1, &size, NULL);
499         if (!request)
500                 RETURN(-ENOMEM);
501
502         body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof (*body));
503
504         if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE) {
505                 memcpy(obdo_logcookie(oa), oti->oti_logcookies,
506                        sizeof(*oti->oti_logcookies));
507                 oti->oti_logcookies++;
508         }
509
510         memcpy(&body->oa, oa, sizeof(*oa));
511         request->rq_replen = lustre_msg_size(1, &size);
512
513         if (oti != NULL && (oti->oti_flags & OBD_MODE_ASYNC)) {
514                 ptlrpcd_add_req(request);
515                 rc = 0;
516         } else {
517                 rc = ptlrpc_queue_wait(request);
518
519                 if (rc == -ENOENT)
520                         rc = 0;
521
522                 if (rc) {
523                         ptlrpc_req_finished(request);
524                         RETURN(rc);
525                 }
526
527                 body = lustre_swab_repbuf(request, 0, sizeof(*body),
528                                           lustre_swab_ost_body);
529                 if (body == NULL) {
530                         CERROR ("Can't unpack body\n");
531                         ptlrpc_req_finished(request);
532                         RETURN(-EPROTO);
533                 }
534
535                 memcpy(oa, &body->oa, sizeof(*oa));
536                 ptlrpc_req_finished(request);
537         }
538         RETURN(rc);
539 }
540
/* Fill the cache/grant fields of oa from the client's current accounting.
 *
 * oa must not already have OBD_MD_FLBLOCKS or OBD_MD_FLGRANT set; both are
 * set here.  Under cl_loi_list_lock the dirty, undirty, grant and dropped
 * fields are copied from cli, and cli->cl_lost_grant is reset to 0 once it
 * has been reported in o_dropped.
 *
 * writing_bytes is not used by this function. */
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        obd_valid bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;

        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        spin_lock(&cli->cl_loi_list_lock);
        oa->o_dirty = cli->cl_dirty;
        /* room left before the dirty limit is reached */
        oa->o_undirty = cli->cl_dirty_max - oa->o_dirty;
        oa->o_grant = cli->cl_avail_grant;
        /* lost grant is reported once and then cleared */
        oa->o_dropped = cli->cl_lost_grant;
        cli->cl_lost_grant = 0;
        spin_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
}
559
560 /* caller must hold loi_list_lock */
561 static void osc_consume_write_grant(struct client_obd *cli,
562                                     struct osc_async_page *oap)
563 {
564         cli->cl_dirty += PAGE_SIZE;
565         cli->cl_avail_grant -= PAGE_SIZE;
566         oap->oap_brw_flags |= OBD_BRW_FROM_GRANT;
567         CDEBUG(D_CACHE, "using %lu grant credits for oap %p\n", PAGE_SIZE, oap);
568         LASSERT(cli->cl_avail_grant >= 0);
569 }
570
/* Return the sum of the client's read and write in-flight counters
 * (cl_r_in_flight + cl_w_in_flight). */
static unsigned long rpcs_in_flight(struct client_obd *cli)
{
        return cli->cl_r_in_flight + cli->cl_w_in_flight;
}
575
576 /* caller must hold loi_list_lock */
577 void osc_wake_cache_waiters(struct client_obd *cli)
578 {
579         struct list_head *l, *tmp;
580         struct osc_cache_waiter *ocw;
581
582         list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
583                 /* if we can't dirty more, we must wait until some is written */
584                 if (cli->cl_dirty + PAGE_SIZE > cli->cl_dirty_max) {
585                         CDEBUG(D_CACHE, "no dirty room: dirty: %ld max %ld\n",
586                                cli->cl_dirty, cli->cl_dirty_max);
587                         return;
588                 }
589
590                 /* if still dirty cache but no grant wait for pending RPCs that
591                  * may yet return us some grant before doing sync writes */
592                 if (cli->cl_w_in_flight && cli->cl_avail_grant < PAGE_SIZE) {
593                         CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
594                                cli->cl_w_in_flight);
595                 }
596                 ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
597                 list_del_init(&ocw->ocw_entry);
598                 if (cli->cl_avail_grant < PAGE_SIZE) {
599                         /* no more RPCs in flight to return grant, do sync IO */
600                         ocw->ocw_rc = -EDQUOT;
601                         CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
602                 } else {
603                         osc_consume_write_grant(cli, ocw->ocw_oap);
604                 }
605
606                 wake_up(&ocw->ocw_waitq);
607         }
608
609         EXIT;
610 }
611
/* Add the grant carried in a reply body (body->oa.o_grant) to the client's
 * available grant, under cl_loi_list_lock.  Cache waiters are not woken
 * here. */
static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
{
        spin_lock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
        cli->cl_avail_grant += body->oa.o_grant;
        /* waiters are woken in brw_interpret_oap */
        spin_unlock(&cli->cl_loi_list_lock);
}
620
621 /* We assume that the reason this OSC got a short read is because it read
622  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
623  * via the LOV, and it _knows_ it's reading inside the file, it's just that
624  * this stripe never got written at or beyond this stripe offset yet. */
625 static void handle_short_read(int nob_read, obd_count page_count,
626                               struct brw_page *pga)
627 {
628         char *ptr;
629
630         /* skip bytes read OK */
631         while (nob_read > 0) {
632                 LASSERT (page_count > 0);
633
634                 if (pga->count > nob_read) {
635                         /* EOF inside this page */
636                         ptr = kmap(pga->pg) + (pga->page_offset & ~PAGE_MASK);
637                         memset(ptr + nob_read, 0, pga->count - nob_read);
638                         kunmap(pga->pg);
639                         page_count--;
640                         pga++;
641                         break;
642                 }
643
644                 nob_read -= pga->count;
645                 page_count--;
646                 pga++;
647         }
648
649         /* zero remaining pages */
650         while (page_count-- > 0) {
651                 ptr = kmap(pga->pg) + (pga->page_offset & ~PAGE_MASK);
652                 memset(ptr, 0, pga->count);
653                 kunmap(pga->pg);
654                 pga++;
655         }
656 }
657
658 static int check_write_rcs(struct ptlrpc_request *request,
659                            int requested_nob, int niocount,
660                            obd_count page_count, struct brw_page *pga)
661 {
662         int *remote_rcs, i;
663
664         /* return error if any niobuf was in error */
665         remote_rcs = lustre_swab_repbuf(request, 1,
666                                         sizeof(*remote_rcs) * niocount, NULL);
667         if (remote_rcs == NULL) {
668                 CERROR("Missing/short RC vector on BRW_WRITE reply\n");
669                 return(-EPROTO);
670         }
671         if (lustre_msg_swabbed(request->rq_repmsg))
672                 for (i = 0; i < niocount; i++)
673                         __swab32s((__u32 *)&remote_rcs[i]);
674
675         for (i = 0; i < niocount; i++) {
676                 if (remote_rcs[i] < 0)
677                         return(remote_rcs[i]);
678
679                 if (remote_rcs[i] != 0) {
680                         CERROR("rc[%d] invalid (%d) req %p\n",
681                                 i, remote_rcs[i], request);
682                         return(-EPROTO);
683                 }
684         }
685
686         if (request->rq_bulk->bd_nob_transferred != requested_nob) {
687                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
688                        requested_nob, request->rq_bulk->bd_nob_transferred);
689                 return(-EPROTO);
690         }
691
692         return (0);
693 }
694
695 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
696 {
697         if (p1->flag != p2->flag) {
698                 unsigned mask = ~OBD_BRW_FROM_GRANT;
699
700                 /* warn if we try to combine flags that we don't know to be
701                  * safe to combine */
702                 if ((p1->flag & mask) != (p2->flag & mask))
703                         CERROR("is it ok to have flags 0x%x and 0x%x in the "
704                                "same brw?\n", p1->flag, p2->flag);
705                 return 0;
706         }
707
708         return (p1->disk_offset + p1->count == p2->disk_offset);
709 }
710
#if CHECKSUM_BULK
/* Fold the first nob bytes described by pga[] into a checksum using
 * ost_checksum() and return it.  Each page contributes at most pga->count
 * bytes; the last page contributes only what is left of nob.
 *
 * Only built when CHECKSUM_BULK is set.
 *
 * NOTE(review): this reads pga->off, while every other user of
 * struct brw_page in this file uses page_offset / disk_offset -- confirm
 * that the field exists before enabling CHECKSUM_BULK. */
static obd_count cksum_pages(int nob, obd_count page_count,
                             struct brw_page *pga)
{
        obd_count cksum = 0;
        char *ptr;

        while (nob > 0) {
                LASSERT (page_count > 0);

                ptr = kmap(pga->pg);
                ost_checksum(&cksum, ptr + (pga->off & (PAGE_SIZE - 1)),
                             pga->count > nob ? nob : pga->count);
                kunmap(pga->pg);

                /* may go negative on the last page, which ends the loop */
                nob -= pga->count;
                page_count--;
                pga++;
        }

        return (cksum);
}
#endif
734
/* Convenience wrappers that select the direction flag passed to
 * osc_crypt_page(). */
#define osc_encrypt_page(page, off, count)  \
        osc_crypt_page(page, off, count, ENCRYPT_DATA)
#define osc_decrypt_page(page, off, count)  \
        osc_crypt_page(page, off, count, DECRYPT_DATA)

/* Keeping the callback in a global variable is ugly, but putting it into
 * client_obd does not seem like a good idea either. -- WangDi
 *
 * osc_crypt_page() forwards to this callback when it is non-NULL and does
 * nothing (returning 0) while it is NULL. */
crypt_cb_t  osc_crypt_cb = NULL;
743
744 static int osc_crypt_page(struct page *page, obd_off page_off, obd_off count,
745                           int flags)
746 {
747         int rc = 0;
748         ENTRY;
749
750         if (osc_crypt_cb != NULL)
751                 rc = osc_crypt_cb(page, page_off, count, flags);
752         if (rc != 0)
753                 CERROR("crypt page error %d \n", rc);
754         RETURN(rc);
755 }
756
/* Build (but do not send) a bulk read or write request for pga[].
 *
 * \param cmd             OBD_BRW_WRITE selects OST_WRITE, anything else
 *                        OST_READ
 * \param imp             import the request is prepared on
 * \param oa              object attributes copied into the request body
 * \param lsm             not used by this function
 * \param page_count      number of entries in pga (asserted > 0)
 * \param pga             pages to transfer; disk offsets are asserted to be
 *                        strictly increasing
 * \param requested_nobp  out: total number of bytes described by pga
 * \param niocountp       out: number of remote niobufs after merging
 * \param reqp            out: the prepared request
 *
 * The request has three buffers: the ost_body, one obd_ioobj, and niocount
 * niobuf_remote entries, where consecutive pages accepted by
 * can_merge_pages() share one niobuf.  Every page is also added to the bulk
 * descriptor.  For writes each page is passed through osc_encrypt_page()
 * first.
 *
 * Returns 0 on success or -ENOMEM if the request or its bulk descriptor
 * cannot be allocated; the out parameters are only written on success. */
static int osc_brw_prep_request(int cmd, struct obd_import *imp,struct obdo *oa,
                                struct lov_stripe_md *lsm, obd_count page_count,
                                struct brw_page *pga, int *requested_nobp,
                                int *niocountp, struct ptlrpc_request **reqp)
{
        struct ptlrpc_request   *req;
        struct ptlrpc_bulk_desc *desc;
        struct client_obd       *cli = &imp->imp_obd->u.cli;
        struct ost_body         *body;
        struct obd_ioobj        *ioobj;
        struct niobuf_remote    *niobuf;
        int                      niocount;
        int                      size[3];
        int                      i;
        int                      requested_nob;
        int                      opc;
        int                      rc;

        opc = ((cmd & OBD_BRW_WRITE) != 0) ? OST_WRITE : OST_READ;

        /* count the niobufs needed: a new one starts wherever two
         * neighbouring pages cannot be merged */
        for (niocount = i = 1; i < page_count; i++)
                if (!can_merge_pages(&pga[i - 1], &pga[i]))
                        niocount++;

        size[0] = sizeof(*body);
        size[1] = sizeof(*ioobj);
        size[2] = niocount * sizeof(*niobuf);

        req = ptlrpc_prep_req(imp, LUSTRE_OBD_VERSION, opc, 3, size, NULL);
        if (req == NULL)
                return (-ENOMEM);

        if (opc == OST_WRITE)
                desc = ptlrpc_prep_bulk_imp (req, page_count,
                                             BULK_GET_SOURCE, OST_BULK_PORTAL);
        else
                desc = ptlrpc_prep_bulk_imp (req, page_count,
                                             BULK_PUT_SINK, OST_BULK_PORTAL);
        if (desc == NULL)
                GOTO(out, rc = -ENOMEM);
        /* NB request now owns desc and will free it when it gets freed */

        body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof(*body));
        ioobj = lustre_msg_buf(req->rq_reqmsg, 1, sizeof(*ioobj));
        niobuf = lustre_msg_buf(req->rq_reqmsg, 2, niocount * sizeof(*niobuf));

        memcpy(&body->oa, oa, sizeof(*oa));

        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;

        /* NOTE(review): this check comes after the request has already been
         * sized and allocated from page_count */
        LASSERT (page_count > 0);

        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                struct brw_page *pg = &pga[i];
                /* not dereferenced for i == 0: every use is guarded by i */
                struct brw_page *pg_prev = pg - 1;

                LASSERT(pg->count > 0);
                /* the data must fit inside a single page */
                LASSERTF((pg->page_offset & ~PAGE_MASK)+ pg->count <= PAGE_SIZE,
                         "i: %d pg: %p pg_off: "LPU64", count: %u\n", i, pg,
                         pg->page_offset, pg->count);
                /* pages must arrive sorted by strictly increasing disk offset */
                LASSERTF(i == 0 || pg->disk_offset > pg_prev->disk_offset,
                         "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
                         " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
                         i, page_count,
                         pg->pg, pg->pg->private, pg->pg->index, pg->disk_offset,
                         pg_prev->pg, pg_prev->pg->private, pg_prev->pg->index,
                         pg_prev->disk_offset);

                /* NOTE(review): the return value of osc_encrypt_page() is
                 * ignored here -- confirm a failed encryption may be sent */
                if (opc == OST_WRITE) {
                        osc_encrypt_page(pg->pg, pg->page_offset, pg->count);
                }

                ptlrpc_prep_bulk_page(desc, pg->pg,
                                      pg->page_offset & ~PAGE_MASK, pg->count);
                requested_nob += pg->count;

                if (i > 0 && can_merge_pages(pg_prev, pg)) {
                        /* extend the previous niobuf; stepping back undoes
                         * the niobuf++ of the loop header for this page */
                        niobuf--;
                        niobuf->len += pg->count;
                } else {
                        niobuf->offset = pg->disk_offset;
                        niobuf->len    = pg->count;
                        niobuf->flags  = pg->flag;
                }
        }

        /* exactly niocount niobufs must have been filled in */
        LASSERT((void *)(niobuf - niocount) ==
                lustre_msg_buf(req->rq_reqmsg, 2, niocount * sizeof(*niobuf)));
        osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);

        /* size[0] still sizeof (*body) */
        if (opc == OST_WRITE) {
#if CHECKSUM_BULK
                body->oa.o_valid |= OBD_MD_FLCKSUM;
                body->oa.o_cksum = cksum_pages(requested_nob, page_count, pga);
#endif
                /* 1 RC per niobuf */
                size[1] = sizeof(__u32) * niocount;
                req->rq_replen = lustre_msg_size(2, size);
        } else {
                /* 1 RC for the whole I/O */
                req->rq_replen = lustre_msg_size(1, size);
        }

        *niocountp = niocount;
        *requested_nobp = requested_nob;
        *reqp = req;
        return (0);

 out:
        /* dropping the request also disposes of anything it owns */
        ptlrpc_req_finished (req);
        return (rc);
}
871
872 static int osc_brw_fini_request(struct ptlrpc_request *req, struct obdo *oa,
873                                 int requested_nob, int niocount,
874                                 obd_count page_count, struct brw_page *pga,
875                                 int rc)
876 {
877         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
878         struct ost_body *body;
879         ENTRY;
880
881         if (rc < 0)
882                 RETURN(rc);
883
884         body = lustre_swab_repbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
885         if (body == NULL) {
886                 CERROR ("Can't unpack body\n");
887                 RETURN(-EPROTO);
888         }
889
890         osc_update_grant(cli, body);
891         memcpy(oa, &body->oa, sizeof(*oa));
892
893         if (req->rq_reqmsg->opc == OST_WRITE) {
894                 if (rc > 0) {
895                         CERROR ("Unexpected +ve rc %d\n", rc);
896                         RETURN(-EPROTO);
897                 }
898                 LASSERT (req->rq_bulk->bd_nob == requested_nob);
899                 osc_decrypt_page(pga->pg, pga->page_offset,
900                                  pga->count);
901                 RETURN(check_write_rcs(req, requested_nob, niocount,
902                                        page_count, pga));
903         }
904
905         if (rc > requested_nob) {
906                 CERROR("Unexpected rc %d (%d requested)\n", rc, requested_nob);
907                 RETURN(-EPROTO);
908         }
909
910         if (rc != req->rq_bulk->bd_nob_transferred) {
911                 CERROR ("Unexpected rc %d (%d transferred)\n",
912                         rc, req->rq_bulk->bd_nob_transferred);
913                 return (-EPROTO);
914         }
915
916         if (rc < requested_nob)
917                 handle_short_read(rc, page_count, pga);
918
919 #if CHECKSUM_BULK
920         if (oa->o_valid & OBD_MD_FLCKSUM) {
921                 const struct ptlrpc_peer *peer =
922                         &req->rq_import->imp_connection->c_peer;
923                 static int cksum_counter;
924                 obd_count server_cksum = oa->o_cksum;
925                 obd_count cksum = cksum_pages(rc, page_count, pga);
926                 char str[PTL_NALFMT_SIZE];
927
928                 ptlrpc_peernid2str(peer, str);
929
930                 cksum_counter++;
931                 if (server_cksum != cksum) {
932                         CERROR("Bad checksum: server %x, client %x, server NID "
933                                LPX64" (%s)\n", server_cksum, cksum,
934                                peer->peer_id.nid, str);
935                         cksum_counter = 0;
936                         oa->o_cksum = cksum;
937                 } else if ((cksum_counter & (-cksum_counter)) == cksum_counter){
938                         CWARN("Checksum %u from "LPX64" (%s) OK: %x\n",
939                               cksum_counter, peer->peer_id.nid, str, cksum);
940                 }
941         } else {
942                 static int cksum_missed;
943
944                 cksum_missed++;
945                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
946                         CERROR("Request checksum %u from "LPX64", no reply\n",
947                                cksum_missed,
948                                req->rq_import->imp_connection->c_peer.peer_id.nid);
949         }
950 #endif
951         osc_decrypt_page(pga->pg, pga->page_offset, pga->count);
952         RETURN(0);
953 }
954
955 static int osc_brw_internal(int cmd, struct obd_export *exp,struct obdo *oa,
956                             struct lov_stripe_md *lsm,
957                             obd_count page_count, struct brw_page *pga)
958 {
959         int                    requested_nob;
960         int                    niocount;
961         struct ptlrpc_request *request;
962         int                    rc;
963         ENTRY;
964
965 restart_bulk:
966         rc = osc_brw_prep_request(cmd, class_exp2cliimp(exp), oa, lsm,
967                                   page_count, pga, &requested_nob, &niocount,
968                                   &request);
969         if (rc != 0)
970                 return (rc);
971
972         rc = ptlrpc_queue_wait(request);
973
974         if (rc == -ETIMEDOUT && request->rq_resend) {
975                 DEBUG_REQ(D_HA, request,  "BULK TIMEOUT");
976                 ptlrpc_req_finished(request);
977                 goto restart_bulk;
978         }
979
980         rc = osc_brw_fini_request(request, oa, requested_nob, niocount,
981                                   page_count, pga, rc);
982
983         ptlrpc_req_finished(request);
984         RETURN (rc);
985 }
986
987 static int brw_interpret(struct ptlrpc_request *request,
988                          struct osc_brw_async_args *aa, int rc)
989 {
990         struct obdo *oa      = aa->aa_oa;
991         int requested_nob    = aa->aa_requested_nob;
992         int niocount         = aa->aa_nio_count;
993         obd_count page_count = aa->aa_page_count;
994         struct brw_page *pga = aa->aa_pga;
995         ENTRY;
996
997         rc = osc_brw_fini_request(request, oa, requested_nob, niocount,
998                                   page_count, pga, rc);
999         RETURN (rc);
1000 }
1001
1002 static int async_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1003                           struct lov_stripe_md *lsm, obd_count page_count,
1004                           struct brw_page *pga, struct ptlrpc_request_set *set)
1005 {
1006         struct ptlrpc_request     *request;
1007         int                        requested_nob;
1008         int                        nio_count;
1009         struct osc_brw_async_args *aa;
1010         int                        rc;
1011         ENTRY;
1012
1013         rc = osc_brw_prep_request(cmd, class_exp2cliimp(exp), oa, lsm,
1014                                   page_count, pga, &requested_nob, &nio_count,
1015                                   &request);
1016         if (rc == 0) {
1017                 LASSERT(sizeof(*aa) <= sizeof(request->rq_async_args));
1018                 aa = (struct osc_brw_async_args *)&request->rq_async_args;
1019                 aa->aa_oa = oa;
1020                 aa->aa_requested_nob = requested_nob;
1021                 aa->aa_nio_count = nio_count;
1022                 aa->aa_page_count = page_count;
1023                 aa->aa_pga = pga;
1024
1025                 request->rq_interpret_reply = brw_interpret;
1026                 ptlrpc_set_add_req(set, request);
1027         }
1028         RETURN (rc);
1029 }
1030
#ifndef min_t
/* fallback for builds that have no min_t(): both operands are converted to
 * 'type' and evaluated exactly once before being compared */
#define min_t(type, x, y)                                               \
        ({                                                              \
                type min_lhs_ = (x);                                    \
                type min_rhs_ = (y);                                    \
                min_lhs_ < min_rhs_ ? min_lhs_ : min_rhs_;              \
        })
#endif
1035
1036 /*
1037  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1038  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1039  * fine for our small page arrays and doesn't require allocation.  its an
1040  * insertion sort that swaps elements that are strides apart, shrinking the
1041  * stride down until its '1' and the array is sorted.
1042  */
1043 static void sort_brw_pages(struct brw_page *array, int num)
1044 {
1045         int stride, i, j;
1046         struct brw_page tmp;
1047
1048         if (num == 1)
1049                 return;
1050         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1051                 ;
1052
1053         do {
1054                 stride /= 3;
1055                 for (i = stride ; i < num ; i++) {
1056                         tmp = array[i];
1057                         j = i;
1058                         while (j >= stride && array[j - stride].disk_offset >
1059                                 tmp.disk_offset) {
1060                                 array[j] = array[j - stride];
1061                                 j -= stride;
1062                         }
1063                         array[j] = tmp;
1064                 }
1065         } while (stride > 1);
1066 }
1067
1068 /* make sure we the regions we're passing to elan don't violate its '4
1069  * fragments' constraint.  portal headers are a fragment, all full
1070  * PAGE_SIZE long pages count as 1 fragment, and each partial page
1071  * counts as a fragment.  I think.  see bug 934. */
1072 static obd_count check_elan_limit(struct brw_page *pg, obd_count pages)
1073 {
1074         int frags_left = 3;
1075         int saw_whole_frag = 0;
1076         int i;
1077
1078         for (i = 0 ; frags_left && i < pages ; pg++, i++) {
1079                 if (pg->count == PAGE_SIZE) {
1080                         if (!saw_whole_frag) {
1081                                 saw_whole_frag = 1;
1082                                 frags_left--;
1083                         }
1084                 } else {
1085                         frags_left--;
1086                 }
1087         }
1088         return i;
1089 }
1090
1091 static int osc_brw(int cmd, struct obd_export *exp, struct obdo *oa,
1092                    struct lov_stripe_md *lsm, obd_count page_count,
1093                    struct brw_page *pga, struct obd_trans_info *oti)
1094 {
1095         ENTRY;
1096
1097         if (cmd == OBD_BRW_CHECK) {
1098                 /* The caller just wants to know if there's a chance that this
1099                  * I/O can succeed */
1100                 struct obd_import *imp = class_exp2cliimp(exp);
1101
1102                 if (imp == NULL || imp->imp_invalid)
1103                         RETURN(-EIO);
1104                 RETURN(0);
1105         }
1106
1107         while (page_count) {
1108                 obd_count pages_per_brw;
1109                 int rc;
1110
1111                 if (page_count > PTLRPC_MAX_BRW_PAGES)
1112                         pages_per_brw = PTLRPC_MAX_BRW_PAGES;
1113                 else
1114                         pages_per_brw = page_count;
1115
1116                 sort_brw_pages(pga, pages_per_brw);
1117                 pages_per_brw = check_elan_limit(pga, pages_per_brw);
1118
1119                 rc = osc_brw_internal(cmd, exp, oa, lsm, pages_per_brw, pga);
1120
1121                 if (rc != 0)
1122                         RETURN(rc);
1123
1124                 page_count -= pages_per_brw;
1125                 pga += pages_per_brw;
1126         }
1127         RETURN(0);
1128 }
1129
1130 static int osc_brw_async(int cmd, struct obd_export *exp, struct obdo *oa,
1131                          struct lov_stripe_md *lsm, obd_count page_count,
1132                          struct brw_page *pga, struct ptlrpc_request_set *set,
1133                          struct obd_trans_info *oti)
1134 {
1135         ENTRY;
1136
1137         if (cmd == OBD_BRW_CHECK) {
1138                 /* The caller just wants to know if there's a chance that this
1139                  * I/O can succeed */
1140                 struct obd_import *imp = class_exp2cliimp(exp);
1141
1142                 if (imp == NULL || imp->imp_invalid)
1143                         RETURN(-EIO);
1144                 RETURN(0);
1145         }
1146
1147         while (page_count) {
1148                 obd_count pages_per_brw;
1149                 int rc;
1150
1151                 if (page_count > PTLRPC_MAX_BRW_PAGES)
1152                         pages_per_brw = PTLRPC_MAX_BRW_PAGES;
1153                 else
1154                         pages_per_brw = page_count;
1155
1156                 sort_brw_pages(pga, pages_per_brw);
1157                 pages_per_brw = check_elan_limit(pga, pages_per_brw);
1158
1159                 rc = async_internal(cmd, exp, oa, lsm, pages_per_brw, pga, set);
1160
1161                 if (rc != 0)
1162                         RETURN(rc);
1163
1164                 page_count -= pages_per_brw;
1165                 pga += pages_per_brw;
1166         }
1167         RETURN(0);
1168 }
1169
/* Forward declarations for static helpers that are called before their
 * definitions appear.  lop_update_pending() and loi_list_maint() are defined
 * further down in this file; osc_check_rpcs() and osc_exit_cache() are
 * presumably defined later as well -- TODO confirm. */
1170 static void osc_check_rpcs(struct client_obd *cli);
1171 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
1172                            int sent);
1173 static void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi);
1174 static void lop_update_pending(struct client_obd *cli,
1175                                struct loi_oap_pages *lop, int cmd, int delta);
1176
1177 /* this is called when a sync waiter receives an interruption.  Its job is to
1178  * get the caller woken as soon as possible.  If its page hasn't been put in an
1179  * rpc yet it can dequeue immediately.  Otherwise it has to mark the rpc as
1180  * desiring interruption which will forcefully complete the rpc once the rpc
1181  * has timed out */
1182 static void osc_occ_interrupted(struct oig_callback_context *occ)
1183 {
1184         struct osc_async_page *oap;
1185         struct loi_oap_pages *lop;
1186         struct lov_oinfo *loi;
1187         ENTRY;
1188
1189         /* XXX member_of() */
1190         oap = list_entry(occ, struct osc_async_page, oap_occ);
1191
1192         spin_lock(&oap->oap_cli->cl_loi_list_lock);
1193
1194         oap->oap_interrupted = 1;
1195
1196         /* ok, it's been put in an rpc. */
1197         if (oap->oap_request != NULL) {
1198                 ptlrpc_mark_interrupted(oap->oap_request);
1199                 ptlrpcd_wake(oap->oap_request);
1200                 GOTO(unlock, 0);
1201         }
1202
1203         /* we don't get interruption callbacks until osc_trigger_sync_io()
1204          * has been called and put the sync oaps in the pending/urgent lists.*/
1205         if (!list_empty(&oap->oap_pending_item)) {
1206                 list_del_init(&oap->oap_pending_item);
1207                 if (oap->oap_async_flags & ASYNC_URGENT)
1208                         list_del_init(&oap->oap_urgent_item);
1209
1210                 loi = oap->oap_loi;
1211                 lop = (oap->oap_cmd == OBD_BRW_WRITE) ?
1212                         &loi->loi_write_lop : &loi->loi_read_lop;
1213                 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
1214                 loi_list_maint(oap->oap_cli, oap->oap_loi);
1215
1216                 oig_complete_one(oap->oap_oig, &oap->oap_occ, 0);
1217                 oap->oap_oig = NULL;
1218         }
1219
1220 unlock:
1221         spin_unlock(&oap->oap_cli->cl_loi_list_lock);
1222 }
1223
1224 /* this must be called holding the loi list lock to give coverage to exit_cache,
1225  * async_flag maintenance, and oap_request */
1226 static void osc_ap_completion(struct client_obd *cli, struct obdo *oa,
1227                               struct osc_async_page *oap, int sent, int rc)
1228 {
1229         osc_exit_cache(cli, oap, sent);
1230         oap->oap_async_flags = 0;
1231         oap->oap_interrupted = 0;
1232
1233         if (oap->oap_request != NULL) {
1234                 ptlrpc_req_finished(oap->oap_request);
1235                 oap->oap_request = NULL;
1236         }
1237
1238         if (rc == 0 && oa != NULL)
1239                 oap->oap_loi->loi_blocks = oa->o_blocks;
1240
1241         if (oap->oap_oig) {
1242                 oig_complete_one(oap->oap_oig, &oap->oap_occ, rc);
1243                 oap->oap_oig = NULL;
1244                 EXIT;
1245                 return;
1246         }
1247
1248         oap->oap_caller_ops->ap_completion(oap->oap_caller_data, oap->oap_cmd,
1249                                            oa, rc);
1250 }
1251
1252 static int brw_interpret_oap(struct ptlrpc_request *request,
1253                              struct osc_brw_async_args *aa, int rc)
1254 {
1255         struct osc_async_page *oap;
1256         struct client_obd *cli;
1257         struct list_head *pos, *n;
1258         struct timeval now;
1259         ENTRY;
1260
1261         do_gettimeofday(&now);
1262         rc = osc_brw_fini_request(request, aa->aa_oa, aa->aa_requested_nob,
1263                                   aa->aa_nio_count, aa->aa_page_count,
1264                                   aa->aa_pga, rc);
1265
1266         CDEBUG(D_INODE, "request %p aa %p rc %d\n", request, aa, rc);
1267
1268         cli = aa->aa_cli;
1269         /* in failout recovery we ignore writeback failure and want
1270          * to just tell llite to unlock the page and continue */
1271         if (request->rq_reqmsg->opc == OST_WRITE &&
1272             (cli->cl_import == NULL || cli->cl_import->imp_invalid)) {
1273                 CDEBUG(D_INODE, "flipping to rc 0 imp %p inv %d\n",
1274                        cli->cl_import,
1275                        cli->cl_import ? cli->cl_import->imp_invalid : -1);
1276                 rc = 0;
1277         }
1278
1279         spin_lock(&cli->cl_loi_list_lock);
1280
1281         if (request->rq_reqmsg->opc == OST_WRITE)
1282                 lprocfs_stime_record(&cli->cl_write_stime, &now,
1283                                      &request->rq_rpcd_start);
1284         else
1285                 lprocfs_stime_record(&cli->cl_read_stime, &now,
1286                                      &request->rq_rpcd_start);
1287
1288
1289
1290         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1291          * is called so we know whether to go to sync BRWs or wait for more
1292          * RPCs to complete */
1293         if (request->rq_reqmsg->opc == OST_WRITE)
1294                 cli->cl_w_in_flight--;
1295         else
1296                 cli->cl_r_in_flight--;
1297
1298         /* the caller may re-use the oap after the completion call so
1299          * we need to clean it up a little */
1300         list_for_each_safe(pos, n, &aa->aa_oaps) {
1301                 oap = list_entry(pos, struct osc_async_page, oap_rpc_item);
1302
1303                 //CDEBUG(D_INODE, "page %p index %lu oap %p\n",
1304                        //oap->oap_page, oap->oap_page->index, oap);
1305
1306                 list_del_init(&oap->oap_rpc_item);
1307                 osc_ap_completion(cli, aa->aa_oa, oap, 1, rc);
1308         }
1309
1310         osc_wake_cache_waiters(cli);
1311         osc_check_rpcs(cli);
1312         spin_unlock(&cli->cl_loi_list_lock);
1313
1314         obdo_free(aa->aa_oa);
1315         OBD_FREE(aa->aa_pga, aa->aa_page_count * sizeof(struct brw_page));
1316
1317         RETURN(0);
1318 }
1319
1320 static struct ptlrpc_request *osc_build_req(struct client_obd *cli,
1321                                             struct list_head *rpc_list,
1322                                             int page_count, int cmd)
1323 {
1324         struct ptlrpc_request *req;
1325         struct brw_page *pga = NULL;
1326         int requested_nob, nio_count;
1327         struct osc_brw_async_args *aa;
1328         struct obdo *oa = NULL;
1329         struct obd_async_page_ops *ops = NULL;
1330         void *caller_data = NULL;
1331         struct list_head *pos;
1332         int i, rc;
1333
1334         LASSERT(!list_empty(rpc_list));
1335
1336         OBD_ALLOC(pga, sizeof(*pga) * page_count);
1337         if (pga == NULL)
1338                 RETURN(ERR_PTR(-ENOMEM));
1339
1340         oa = obdo_alloc();
1341         if (oa == NULL)
1342                 GOTO(out, req = ERR_PTR(-ENOMEM));
1343
1344         i = 0;
1345         list_for_each(pos, rpc_list) {
1346                 struct osc_async_page *oap;
1347
1348                 oap = list_entry(pos, struct osc_async_page, oap_rpc_item);
1349                 if (ops == NULL) {
1350                         ops = oap->oap_caller_ops;
1351                         caller_data = oap->oap_caller_data;
1352                 }
1353                 pga[i].disk_offset = oap->oap_obj_off + oap->oap_page_off;
1354                 pga[i].page_offset = pga[i].disk_offset;
1355                 pga[i].pg = oap->oap_page;
1356                 pga[i].count = oap->oap_count;
1357                 pga[i].flag = oap->oap_brw_flags;
1358                 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
1359                        pga[i].pg, oap->oap_page->index, oap, pga[i].flag);
1360                 i++;
1361         }
1362
1363         /* always get the data for the obdo for the rpc */
1364         LASSERT(ops != NULL);
1365         ops->ap_fill_obdo(caller_data, cmd, oa);
1366
1367         sort_brw_pages(pga, page_count);
1368         rc = osc_brw_prep_request(cmd, cli->cl_import, oa, NULL, page_count,
1369                                   pga, &requested_nob, &nio_count, &req);
1370         if (rc != 0) {
1371                 CERROR("prep_req failed: %d\n", rc);
1372                 GOTO(out, req = ERR_PTR(rc));
1373         }
1374
1375         LASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1376         aa = (struct osc_brw_async_args *)&req->rq_async_args;
1377         aa->aa_oa = oa;
1378         aa->aa_requested_nob = requested_nob;
1379         aa->aa_nio_count = nio_count;
1380         aa->aa_page_count = page_count;
1381         aa->aa_pga = pga;
1382         aa->aa_cli = cli;
1383
1384 out:
1385         if (IS_ERR(req)) {
1386                 if (oa)
1387                         obdo_free(oa);
1388                 if (pga)
1389                         OBD_FREE(pga, sizeof(*pga) * page_count);
1390         }
1391         RETURN(req);
1392 }
1393
1394 static void lop_update_pending(struct client_obd *cli,
1395                                struct loi_oap_pages *lop, int cmd, int delta)
1396 {
1397         lop->lop_num_pending += delta;
1398         if (cmd == OBD_BRW_WRITE)
1399                 cli->cl_pending_w_pages += delta;
1400         else
1401                 cli->cl_pending_r_pages += delta;
1402 }
1403
1404 /* the loi lock is held across this function but it's allowed to release
1405  * and reacquire it during its work */
/*
 * Collect pending pages from lop into one bulk RPC and hand it to ptlrpcd.
 *
 * Walks lop->lop_pending, asking the caller's ap_make_ready() about every
 * page that is not yet ASYNC_READY, and moves up to cli->cl_max_pages_per_rpc
 * pages onto a private list.  Pages whose count comes back <= 0 are completed
 * on the spot instead of being sent.
 *
 * Returns 0 when no page was collected, the error carried by osc_build_req()
 * when the request could not be built (the pages are put back on the pending
 * list unless they were interrupted), and 1 once the request has been passed
 * to ptlrpcd_add_req().  cl_loi_list_lock is held again on every return path.
 */
1406 static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
1407                             int cmd, struct loi_oap_pages *lop)
1408 {
1409         struct ptlrpc_request *request;
1410         obd_count page_count = 0;
1411         struct list_head *tmp, *pos;
1412         struct osc_async_page *oap = NULL;
1413         struct osc_brw_async_args *aa;
1414         struct obd_async_page_ops *ops;
1415         LIST_HEAD(rpc_list);
1416         ENTRY;
1417
1418         /* first we find the pages we're allowed to work with */
1419         list_for_each_safe(pos, tmp, &lop->lop_pending) {
1420                 oap = list_entry(pos, struct osc_async_page, oap_pending_item);
1421                 ops = oap->oap_caller_ops;
1422
1423                 LASSERT(oap->oap_magic == OAP_MAGIC);
1424
1425                 /* in llite being 'ready' equates to the page being locked
1426                  * until completion unlocks it.  commit_write submits a page
1427                  * as not ready because its unlock will happen unconditionally
1428                  * as the call returns.  if we race with commit_write giving
1429                  * us that page we dont' want to create a hole in the page
1430                  * stream, so we stop and leave the rpc to be fired by
1431                  * another dirtier or kupdated interval (the not ready page
1432                  * will still be on the dirty list).  we could call in
1433                  * at the end of ll_file_write to process the queue again. */
1434                 if (!(oap->oap_async_flags & ASYNC_READY)) {
1435                         int rc = ops->ap_make_ready(oap->oap_caller_data, cmd);
1436                         if (rc < 0)
1437                                 CDEBUG(D_INODE, "oap %p page %p returned %d "
1438                                                 "instead of ready\n", oap,
1439                                                 oap->oap_page, rc);
1440                         switch (rc) {
1441                         case -EAGAIN:
1442                                 /* llite is telling us that the page is still
1443                                  * in commit_write and that we should try
1444                                  * and put it in an rpc again later.  we
1445                                  * break out of the loop so we don't create
1446                                  * a hole in the sequence of pages in the rpc
1447                                  * stream.*/
1448                                 pos = NULL;
1449                                 break;
1450                         case -EINTR:
1451                                 /* the io isn't needed.. tell the checks
1452                                  * below to complete the rpc with EINTR */
1453                                 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
1454                                 oap->oap_count = -EINTR;
1455                                 break;
1456                         case 0:
1457                                 oap->oap_async_flags |= ASYNC_READY;
1458                                 break;
1459                         default:
1460                                 LASSERTF(0, "oap %p page %p returned %d "
1461                                             "from make_ready\n", oap,
1462                                             oap->oap_page, rc);
1463                                 break;
1464                         }
1465                 }
                /* pos was cleared by the -EAGAIN case above: stop collecting */
1466                 if (pos == NULL)
1467                         break;
1468                 /*
1469                  * Page submitted for IO has to be locked. Either by
1470                  * ->ap_make_ready() or by higher layers.
1471                  *
1472                  * XXX nikita: this assertion should be adjusted when lustre
1473                  * starts using PG_writeback for pages being written out.
1474                  */
1475                 LASSERT(PageLocked(oap->oap_page));
1476
1477                 /* take the page out of our book-keeping */
1478                 list_del_init(&oap->oap_pending_item);
1479                 lop_update_pending(cli, lop, cmd, -1);
1480                 list_del_init(&oap->oap_urgent_item);
1481
1482                 /* ask the caller for the size of the io as the rpc leaves. */
1483                 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE))
1484                         oap->oap_count =
1485                                 ops->ap_refresh_count(oap->oap_caller_data,cmd);
1486                 if (oap->oap_count <= 0) {
1487                         CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
1488                                oap->oap_count);
1489                         osc_ap_completion(cli, NULL, oap, 0, oap->oap_count);
1490                         continue;
1491                 }
1492
1493                 /* now put the page back in our accounting */
1494                 list_add_tail(&oap->oap_rpc_item, &rpc_list);
1495                 if (++page_count >= cli->cl_max_pages_per_rpc)
1496                         break;
1497         }
1498
1499         osc_wake_cache_waiters(cli);
1500
1501         if (page_count == 0)
1502                 RETURN(0);
1503
        /* the list lock is dropped while the request is being built */
1504         loi_list_maint(cli, loi);
1505         spin_unlock(&cli->cl_loi_list_lock);
1506
1507         request = osc_build_req(cli, &rpc_list, page_count, cmd);
1508         if (IS_ERR(request)) {
1509                 /* this should happen rarely and is pretty bad, it makes the
1510                  * pending list not follow the dirty order */
1511                 spin_lock(&cli->cl_loi_list_lock);
1512                 list_for_each_safe(pos, tmp, &rpc_list) {
1513                         oap = list_entry(pos, struct osc_async_page,
1514                                          oap_rpc_item);
1515                         list_del_init(&oap->oap_rpc_item);
1516
1517                         /* queued sync pages can be torn down while the pages
1518                          * were between the pending list and the rpc */
1519                         if (oap->oap_interrupted) {
1520                                 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
1521                                 osc_ap_completion(cli, NULL, oap, 0,
1522                                                   oap->oap_count);
1523                                 continue;
1524                         }
1525
1526                         /* put the page back in the loi/lop lists */
1527                         list_add_tail(&oap->oap_pending_item,
1528                                       &lop->lop_pending);
1529                         lop_update_pending(cli, lop, cmd, 1);
1530                         if (oap->oap_async_flags & ASYNC_URGENT)
1531                                 list_add(&oap->oap_urgent_item,
1532                                          &lop->lop_urgent);
1533                 }
1534                 loi_list_maint(cli, loi);
1535                 RETURN(PTR_ERR(request));
1536         }
1537
        /* move the collected pages onto the request's own list; the local
         * head is re-initialised afterwards so it no longer refers to them */
1538         LASSERT(sizeof(*aa) <= sizeof(request->rq_async_args));
1539         aa = (struct osc_brw_async_args *)&request->rq_async_args;
1540         INIT_LIST_HEAD(&aa->aa_oaps);
1541         list_splice(&rpc_list, &aa->aa_oaps);
1542         INIT_LIST_HEAD(&rpc_list);
1543
1544 #ifdef __KERNEL__
1545         if (cmd == OBD_BRW_READ) {
1546                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
1547                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
1548         } else {
1549                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
1550                 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
1551                                  cli->cl_w_in_flight);
1552         }
1553 #endif
1554
1555         spin_lock(&cli->cl_loi_list_lock);
1556
1557         if (cmd == OBD_BRW_READ)
1558                 cli->cl_r_in_flight++;
1559         else
1560                 cli->cl_w_in_flight++;
1561         /* queued sync pages can be torn down while the pages
1562          * were between the pending list and the rpc */
1563         list_for_each(pos, &aa->aa_oaps) {
1564                 oap = list_entry(pos, struct osc_async_page, oap_rpc_item);
1565                 if (oap->oap_interrupted) {
1566                         CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
1567                                oap, request);
1568                         ptlrpc_mark_interrupted(request);
1569                         break;
1570                 }
1571         }
1572
1573         CDEBUG(D_INODE, "req %p: %d pages, aa %p.  now %dr/%dw in flight\n",
1574                         request, page_count, aa, cli->cl_r_in_flight,
1575                         cli->cl_w_in_flight);
1576
        /* NOTE(review): oap is whatever the loop above left behind -- the
         * interrupted page it stopped at, or else the last page on the list --
         * so only that one page records a reference to the request.  Confirm
         * that this is intended. */
1577         oap->oap_request = ptlrpc_request_addref(request);
1578         request->rq_interpret_reply = brw_interpret_oap;
1579         ptlrpcd_add_req(request);
1580         RETURN(1);
1581 }
1582
1583 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
1584                          int cmd)
1585 {
1586         int optimal;
1587         ENTRY;
1588
1589         if (lop->lop_num_pending == 0)
1590                 RETURN(0);
1591
1592         /* if we have an invalid import we want to drain the queued pages
1593          * by forcing them through rpcs that immediately fail and complete
1594          * the pages.  recovery relies on this to empty the queued pages
1595          * before canceling the locks and evicting down the llite pages */
1596         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1597                 RETURN(1);
1598
1599         /* stream rpcs in queue order as long as as there is an urgent page
1600          * queued.  this is our cheap solution for good batching in the case
1601          * where writepage marks some random page in the middle of the file as
1602          * urgent because of, say, memory pressure */
1603         if (!list_empty(&lop->lop_urgent))
1604                 RETURN(1);
1605
1606         /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
1607         optimal = cli->cl_max_pages_per_rpc;
1608         if (cmd == OBD_BRW_WRITE) {
1609                 /* trigger a write rpc stream as long as there are dirtiers
1610                  * waiting for space.  as they're waiting, they're not going to
1611                  * create more pages to coallesce with what's waiting.. */
1612                 if (!list_empty(&cli->cl_cache_waiters))
1613                         RETURN(1);
1614
1615                 /* *2 to avoid triggering rpcs that would want to include pages
1616                  * that are being queued but which can't be made ready until
1617                  * the queuer finishes with the page. this is a wart for
1618                  * llite::commit_write() */
1619                 optimal += 16;
1620         }
1621         if (lop->lop_num_pending >= optimal)
1622                 RETURN(1);
1623
1624         RETURN(0);
1625 }
1626
/* Make @item's membership of @list match @should_be_on: link it at the tail
 * when it should be on a list but is unlinked, unlink (and reinitialise) it
 * when it is linked but should not be.  Does nothing when the state already
 * matches. */
static void on_list(struct list_head *item, struct list_head *list,
                    int should_be_on)
{
        int linked = !list_empty(item);

        if (should_be_on && !linked)
                list_add_tail(item, list);
        else if (!should_be_on && linked)
                list_del_init(item);
}
1635
1636 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
1637  * can find pages to build into rpcs quickly */
1638 static void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
1639 {
1640         on_list(&loi->loi_cli_item, &cli->cl_loi_ready_list,
1641                 lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE) ||
1642                 lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
1643
1644         on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
1645                 loi->loi_write_lop.lop_num_pending);
1646
1647         on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
1648                 loi->loi_read_lop.lop_num_pending);
1649 }
1650
/* Emit a D_INODE debug line describing an loi's queue state: whether it is
 * on the ready list, then "pending:urgent" for the write and the read queue,
 * followed by the caller's own format string STR and its arguments.
 * At least one argument must be supplied after STR.
 * Note: the definition deliberately ends without a line continuation so the
 * line following the macro is not spliced into it. */
#define LOI_DEBUG(LOI, STR, args...)                                     \
        CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR,           \
               !list_empty(&(LOI)->loi_cli_item),                        \
               (LOI)->loi_write_lop.lop_num_pending,                     \
               !list_empty(&(LOI)->loi_write_lop.lop_urgent),            \
               (LOI)->loi_read_lop.lop_num_pending,                      \
               !list_empty(&(LOI)->loi_read_lop.lop_urgent),             \
               args)
1659
1660 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
1661 {
1662         ENTRY;
1663         /* first return all objects which we already know to have
1664          * pages ready to be stuffed into rpcs */
1665         if (!list_empty(&cli->cl_loi_ready_list))
1666                 RETURN(list_entry(cli->cl_loi_ready_list.next,
1667                                   struct lov_oinfo, loi_cli_item));
1668
1669         /* then if we have cache waiters, return all objects with queued
1670          * writes.  This is especially important when many small files
1671          * have filled up the cache and not been fired into rpcs because
1672          * they don't pass the nr_pending/object threshhold */
1673         if (!list_empty(&cli->cl_cache_waiters) &&
1674             !list_empty(&cli->cl_loi_write_list))
1675                 RETURN(list_entry(cli->cl_loi_write_list.next,
1676                                   struct lov_oinfo, loi_write_item));
1677
1678         /* then return all queued objects when we have an invalid import
1679          * so that they get flushed */
1680         if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
1681                 if (!list_empty(&cli->cl_loi_write_list))
1682                         RETURN(list_entry(cli->cl_loi_write_list.next,
1683                                           struct lov_oinfo, loi_write_item));
1684                 if (!list_empty(&cli->cl_loi_read_list))
1685                         RETURN(list_entry(cli->cl_loi_read_list.next,
1686                                           struct lov_oinfo, loi_read_item));
1687         }
1688         RETURN(NULL);
1689 }
1690
1691 /* called with the loi list lock held */
1692 static void osc_check_rpcs(struct client_obd *cli)
1693 {
1694         struct lov_oinfo *loi;
1695         int rc = 0, race_counter = 0;
1696         ENTRY;
1697
1698         while ((loi = osc_next_loi(cli)) != NULL) {
1699                 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
1700
1701                 if (rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight)
1702                         break;
1703
1704                 /* attempt some read/write balancing by alternating between
1705                  * reads and writes in an object.  The makes_rpc checks here
1706                  * would be redundant if we were getting read/write work items
1707                  * instead of objects.  we don't want send_oap_rpc to drain a
1708                  * partial read pending queue when we're given this object to
1709                  * do io on writes while there are cache waiters */
1710                 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
1711                         rc = osc_send_oap_rpc(cli, loi, OBD_BRW_WRITE,
1712                                               &loi->loi_write_lop);
1713                         if (rc < 0)
1714                                 break;
1715                         if (rc > 0)
1716                                 race_counter = 0;
1717                         else
1718                                 race_counter++;
1719                 }
1720                 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
1721                         rc = osc_send_oap_rpc(cli, loi, OBD_BRW_READ,
1722                                               &loi->loi_read_lop);
1723                         if (rc < 0)
1724                                 break;
1725                         if (rc > 0)
1726                                 race_counter = 0;
1727                         else
1728                                 race_counter++;
1729                 }
1730
1731                 /* attempt some inter-object balancing by issueing rpcs
1732                  * for each object in turn */
1733                 if (!list_empty(&loi->loi_cli_item))
1734                         list_del_init(&loi->loi_cli_item);
1735                 if (!list_empty(&loi->loi_write_item))
1736                         list_del_init(&loi->loi_write_item);
1737                 if (!list_empty(&loi->loi_read_item))
1738                         list_del_init(&loi->loi_read_item);
1739
1740                 loi_list_maint(cli, loi);
1741
1742                 /* send_oap_rpc fails with 0 when make_ready tells it to
1743                  * back off.  llite's make_ready does this when it tries
1744                  * to lock a page queued for write that is already locked.
1745                  * we want to try sending rpcs from many objects, but we
1746                  * don't want to spin failing with 0.  */
1747                 if (race_counter == 10)
1748                         break;
1749         }
1750         EXIT;
1751 }
1752
1753 /* we're trying to queue a page in the osc so we're subject to the
1754  * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
1755  * If the osc's queued pages are already at that limit, then we want to sleep
1756  * until there is space in the osc's queue for us.  We also may be waiting for
1757  * write credits from the OST if there are RPCs in flight that may return some
1758  * before we fall back to sync writes.
1759  *
1760  * We need this know our allocation was granted in the presence of signals */
1761 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
1762 {
1763         int rc;
1764         ENTRY;
1765         spin_lock(&cli->cl_loi_list_lock);
1766         rc = list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
1767         spin_unlock(&cli->cl_loi_list_lock);
1768         RETURN(rc);
1769 };
1770
1771 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
1772  * grant or cache space. */
1773 static int osc_enter_cache(struct client_obd *cli, struct lov_oinfo *loi,
1774                            struct osc_async_page *oap)
1775 {
1776         struct osc_cache_waiter ocw;
1777         struct l_wait_info lwi = { 0 };
1778         struct timeval start, stop;
1779
1780         CDEBUG(D_CACHE, "dirty: %ld dirty_max: %ld dropped: %lu grant: %lu\n",
1781                cli->cl_dirty, cli->cl_dirty_max, cli->cl_lost_grant,
1782                cli->cl_avail_grant);
1783
1784         if (cli->cl_dirty_max < PAGE_SIZE)
1785                 return(-EDQUOT);
1786
1787         /* Hopefully normal case - cache space and write credits available */
1788         if (cli->cl_dirty + PAGE_SIZE <= cli->cl_dirty_max &&
1789             cli->cl_avail_grant >= PAGE_SIZE) {
1790                 /* account for ourselves */
1791                 osc_consume_write_grant(cli, oap);
1792                 return(0);
1793         }
1794
1795         /* Make sure that there are write rpcs in flight to wait for.  This
1796          * is a little silly as this object may not have any pending but
1797          * other objects sure might. */
1798         if (cli->cl_w_in_flight) {
1799                 list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
1800                 init_waitqueue_head(&ocw.ocw_waitq);
1801                 ocw.ocw_oap = oap;
1802                 ocw.ocw_rc = 0;
1803
1804                 loi_list_maint(cli, loi);
1805                 osc_check_rpcs(cli);
1806                 spin_unlock(&cli->cl_loi_list_lock);
1807
1808                 CDEBUG(0, "sleeping for cache space\n");
1809                 do_gettimeofday(&start);
1810                 l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
1811                 do_gettimeofday(&stop);
1812                 spin_lock(&cli->cl_loi_list_lock);
1813                 lprocfs_stime_record(&cli->cl_enter_stime, &stop, &start);
1814                 if (!list_empty(&ocw.ocw_entry)) {
1815                         list_del(&ocw.ocw_entry);
1816                         RETURN(-EINTR);
1817                 }
1818                 RETURN(ocw.ocw_rc);
1819         }
1820
1821         RETURN(-EDQUOT);
1822 }
1823
1824 /* the companion to enter_cache, called when an oap is no longer part of the
1825  * dirty accounting.. so writeback completes or truncate happens before writing
1826  * starts.  must be called with the loi lock held. */
1827 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
1828                            int sent)
1829 {
1830         ENTRY;
1831
1832         if (!(oap->oap_brw_flags & OBD_BRW_FROM_GRANT)) {
1833                 EXIT;
1834                 return;
1835         }
1836
1837         oap->oap_brw_flags &= ~OBD_BRW_FROM_GRANT;
1838         cli->cl_dirty -= PAGE_SIZE;
1839         if (!sent) {
1840                 cli->cl_lost_grant += PAGE_SIZE;
1841                 CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
1842                        cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
1843         }
1844
1845         EXIT;
1846 }
1847
1848 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
1849                         struct lov_oinfo *loi, struct page *page,
1850                         obd_off offset, struct obd_async_page_ops *ops,
1851                         void *data, void **res)
1852 {
1853         struct osc_async_page *oap;
1854         ENTRY;
1855
1856         OBD_ALLOC(oap, sizeof(*oap));
1857         if (oap == NULL)
1858                 return -ENOMEM;
1859
1860         oap->oap_magic = OAP_MAGIC;
1861         oap->oap_cli = &exp->exp_obd->u.cli;
1862         oap->oap_loi = loi;
1863
1864         oap->oap_caller_ops = ops;
1865         oap->oap_caller_data = data;
1866
1867         oap->oap_page = page;
1868         oap->oap_obj_off = offset;
1869
1870         INIT_LIST_HEAD(&oap->oap_pending_item);
1871         INIT_LIST_HEAD(&oap->oap_urgent_item);
1872         INIT_LIST_HEAD(&oap->oap_rpc_item);
1873
1874         oap->oap_occ.occ_interrupted = osc_occ_interrupted;
1875
1876         CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
1877         *res = oap;
1878         RETURN(0);
1879 }
1880
1881 static int osc_queue_async_io(struct obd_export *exp, struct lov_stripe_md *lsm,
1882                               struct lov_oinfo *loi, void *cookie,
1883                               int cmd, obd_off off, int count,
1884                               obd_flags brw_flags, enum async_flags async_flags)
1885 {
1886         struct client_obd *cli = &exp->exp_obd->u.cli;
1887         struct osc_async_page *oap;
1888         struct loi_oap_pages *lop;
1889         int rc;
1890         ENTRY;
1891
1892         oap = OAP_FROM_COOKIE(cookie);
1893
1894         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1895                 RETURN(-EIO);
1896
1897         if (!list_empty(&oap->oap_pending_item) ||
1898             !list_empty(&oap->oap_urgent_item) ||
1899             !list_empty(&oap->oap_rpc_item))
1900                 RETURN(-EBUSY);
1901
1902         if (loi == NULL)
1903                 loi = &lsm->lsm_oinfo[0];
1904
1905         spin_lock(&cli->cl_loi_list_lock);
1906
1907         oap->oap_cmd = cmd;
1908         oap->oap_async_flags = async_flags;
1909         oap->oap_page_off = off;
1910         oap->oap_count = count;
1911         oap->oap_brw_flags = brw_flags;
1912
1913         if (cmd == OBD_BRW_WRITE) {
1914                 rc = osc_enter_cache(cli, loi, oap);
1915                 if (rc) {
1916                         spin_unlock(&cli->cl_loi_list_lock);
1917                         RETURN(rc);
1918                 }
1919                 lop = &loi->loi_write_lop;
1920         } else {
1921                 lop = &loi->loi_read_lop;
1922         }
1923
1924         if (oap->oap_async_flags & ASYNC_URGENT)
1925                 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
1926         list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
1927         lop_update_pending(cli, lop, cmd, 1);
1928
1929         loi_list_maint(cli, loi);
1930
1931         LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
1932                   cmd);
1933
1934         osc_check_rpcs(cli);
1935         spin_unlock(&cli->cl_loi_list_lock);
1936
1937         RETURN(0);
1938 }
1939
/* True (1) when 'flag' is clear in 'was' and set in 'now', i.e. the flag is
 * being newly set; 0 otherwise.  aka (~was & now & flag), but this is more
 * clear :)  Every argument is parenthesised so that compound expressions
 * such as (a | b) are evaluated as a unit. */
#define SETTING(was, now, flag) (!((was) & (flag)) && ((now) & (flag)))
1942
1943 static int osc_set_async_flags(struct obd_export *exp,
1944                                struct lov_stripe_md *lsm,
1945                                struct lov_oinfo *loi, void *cookie,
1946                                obd_flags async_flags)
1947 {
1948         struct client_obd *cli = &exp->exp_obd->u.cli;
1949         struct loi_oap_pages *lop;
1950         struct osc_async_page *oap;
1951         int rc = 0;
1952         ENTRY;
1953
1954         oap = OAP_FROM_COOKIE(cookie);
1955
1956         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1957                 RETURN(-EIO);
1958
1959         if (loi == NULL)
1960                 loi = &lsm->lsm_oinfo[0];
1961
1962         if (oap->oap_cmd == OBD_BRW_WRITE) {
1963                 lop = &loi->loi_write_lop;
1964         } else {
1965                 lop = &loi->loi_read_lop;
1966         }
1967
1968         spin_lock(&cli->cl_loi_list_lock);
1969
1970         if (list_empty(&oap->oap_pending_item))
1971                 GOTO(out, rc = -EINVAL);
1972
1973         if ((oap->oap_async_flags & async_flags) == async_flags)
1974                 GOTO(out, rc = 0);
1975
1976         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
1977                 oap->oap_async_flags |= ASYNC_READY;
1978
1979         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT)) {
1980                 if (list_empty(&oap->oap_rpc_item)) {
1981                         list_add(&oap->oap_urgent_item, &lop->lop_urgent);
1982                         loi_list_maint(cli, loi);
1983                 }
1984         }
1985
1986         LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
1987                         oap->oap_async_flags);
1988 out:
1989         osc_check_rpcs(cli);
1990         spin_unlock(&cli->cl_loi_list_lock);
1991         RETURN(rc);
1992 }
1993
1994 static int osc_queue_group_io(struct obd_export *exp, struct lov_stripe_md *lsm,
1995                              struct lov_oinfo *loi,
1996                              struct obd_io_group *oig, void *cookie,
1997                              int cmd, obd_off off, int count,
1998                              obd_flags brw_flags,
1999                              obd_flags async_flags)
2000 {
2001         struct client_obd *cli = &exp->exp_obd->u.cli;
2002         struct osc_async_page *oap;
2003         struct loi_oap_pages *lop;
2004         ENTRY;
2005
2006         oap = OAP_FROM_COOKIE(cookie);
2007
2008         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2009                 RETURN(-EIO);
2010
2011         if (!list_empty(&oap->oap_pending_item) ||
2012             !list_empty(&oap->oap_urgent_item) ||
2013             !list_empty(&oap->oap_rpc_item))
2014                 RETURN(-EBUSY);
2015
2016         if (loi == NULL)
2017                 loi = &lsm->lsm_oinfo[0];
2018
2019         spin_lock(&cli->cl_loi_list_lock);
2020
2021         oap->oap_cmd = cmd;
2022         oap->oap_page_off = off;
2023         oap->oap_count = count;
2024         oap->oap_brw_flags = brw_flags;
2025         oap->oap_async_flags = async_flags;
2026
2027         if (cmd == OBD_BRW_WRITE)
2028                 lop = &loi->loi_write_lop;
2029         else
2030                 lop = &loi->loi_read_lop;
2031
2032         list_add_tail(&oap->oap_pending_item, &lop->lop_pending_group);
2033         if (oap->oap_async_flags & ASYNC_GROUP_SYNC) {
2034                 oap->oap_oig = oig;
2035                 oig_add_one(oig, &oap->oap_occ);
2036         }
2037
2038         LOI_DEBUG(loi, "oap %p page %p on group pending\n", oap, oap->oap_page);
2039
2040         spin_unlock(&cli->cl_loi_list_lock);
2041
2042         RETURN(0);
2043 }
2044
2045 static void osc_group_to_pending(struct client_obd *cli, struct lov_oinfo *loi,
2046                                  struct loi_oap_pages *lop, int cmd)
2047 {
2048         struct list_head *pos, *tmp;
2049         struct osc_async_page *oap;
2050
2051         list_for_each_safe(pos, tmp, &lop->lop_pending_group) {
2052                 oap = list_entry(pos, struct osc_async_page, oap_pending_item);
2053                 list_del(&oap->oap_pending_item);
2054                 list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
2055                 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2056                 lop_update_pending(cli, lop, cmd, 1);
2057         }
2058         loi_list_maint(cli, loi);
2059 }
2060
2061 static int osc_trigger_group_io(struct obd_export *exp,
2062                                 struct lov_stripe_md *lsm,
2063                                 struct lov_oinfo *loi,
2064                                 struct obd_io_group *oig)
2065 {
2066         struct client_obd *cli = &exp->exp_obd->u.cli;
2067         ENTRY;
2068
2069         if (loi == NULL)
2070                 loi = &lsm->lsm_oinfo[0];
2071
2072         spin_lock(&cli->cl_loi_list_lock);
2073
2074         osc_group_to_pending(cli, loi, &loi->loi_write_lop, OBD_BRW_WRITE);
2075         osc_group_to_pending(cli, loi, &loi->loi_read_lop, OBD_BRW_READ);
2076
2077         osc_check_rpcs(cli);
2078         spin_unlock(&cli->cl_loi_list_lock);
2079
2080         RETURN(0);
2081 }
2082
2083 static int osc_teardown_async_page(struct obd_export *exp,
2084                                    struct lov_stripe_md *lsm,
2085                                    struct lov_oinfo *loi, void *cookie)
2086 {
2087         struct client_obd *cli = &exp->exp_obd->u.cli;
2088         struct loi_oap_pages *lop;
2089         struct osc_async_page *oap;
2090         int rc = 0;
2091         ENTRY;
2092
2093         oap = OAP_FROM_COOKIE(cookie);
2094
2095         if (loi == NULL)
2096                 loi = &lsm->lsm_oinfo[0];
2097
2098         if (oap->oap_cmd == OBD_BRW_WRITE) {
2099                 lop = &loi->loi_write_lop;
2100         } else {
2101                 lop = &loi->loi_read_lop;
2102         }
2103
2104         spin_lock(&cli->cl_loi_list_lock);
2105
2106         if (!list_empty(&oap->oap_rpc_item))
2107                 GOTO(out, rc = -EBUSY);
2108
2109         osc_exit_cache(cli, oap, 0);
2110         osc_wake_cache_waiters(cli);
2111
2112         if (!list_empty(&oap->oap_urgent_item)) {
2113                 list_del_init(&oap->oap_urgent_item);
2114                 oap->oap_async_flags &= ~ASYNC_URGENT;
2115         }
2116         if (!list_empty(&oap->oap_pending_item)) {
2117                 list_del_init(&oap->oap_pending_item);
2118                 lop_update_pending(cli, lop, oap->oap_cmd, -1);
2119         }
2120         loi_list_maint(cli, loi);
2121
2122         LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
2123 out:
2124         spin_unlock(&cli->cl_loi_list_lock);
2125         if (rc == 0)
2126                 OBD_FREE(oap, sizeof(*oap));
2127         RETURN(rc);
2128 }
2129
2130 #ifdef __KERNEL__
2131 /* Note: caller will lock/unlock, and set uptodate on the pages */
2132 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
2133 static int sanosc_brw_read(struct obd_export *exp, struct obdo *oa,
2134                            struct lov_stripe_md *lsm, obd_count page_count,
2135                            struct brw_page *pga)
2136 {
2137         struct ptlrpc_request *request = NULL;
2138         struct ost_body *body;
2139         struct niobuf_remote *nioptr;
2140         struct obd_ioobj *iooptr;
2141         int rc, size[3] = {sizeof(*body)}, mapped = 0;
2142         int swab;
2143         ENTRY;
2144
2145         /* XXX does not handle 'new' brw protocol */
2146
2147         size[1] = sizeof(struct obd_ioobj);
2148         size[2] = page_count * sizeof(*nioptr);
2149
2150         request = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OBD_VERSION,
2151                                   OST_SAN_READ, 3, size, NULL);
2152         if (!request)
2153                 RETURN(-ENOMEM);
2154
2155         body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof(*body));
2156         iooptr = lustre_msg_buf(request->rq_reqmsg, 1, sizeof(*iooptr));
2157         nioptr = lustre_msg_buf(request->rq_reqmsg, 2,
2158                                 sizeof(*nioptr) * page_count);
2159
2160         memcpy(&body->oa, oa, sizeof(body->oa));
2161
2162         obdo_to_ioobj(oa, iooptr);
2163         iooptr->ioo_bufcnt = page_count;
2164
2165         for (mapped = 0; mapped < page_count; mapped++, nioptr++) {
2166                 LASSERT(PageLocked(pga[mapped].pg));
2167                 LASSERT(mapped == 0 ||
2168                         pga[mapped].disk_offset > pga[mapped - 1].disk_offset);
2169
2170                 nioptr->offset = pga[mapped].disk_offset;
2171                 nioptr->len    = pga[mapped].count;
2172                 nioptr->flags  = pga[mapped].flag;
2173         }
2174
2175         size[1] = page_count * sizeof(*nioptr);
2176         request->rq_replen = lustre_msg_size(2, size);
2177
2178         rc = ptlrpc_queue_wait(request);
2179         if (rc)
2180                 GOTO(out_req, rc);
2181
2182         body = lustre_swab_repbuf(request, 0, sizeof(*body),
2183                                   lustre_swab_ost_body);
2184         if (body == NULL) {
2185                 CERROR("Can't unpack body\n");
2186                 GOTO(out_req, rc = -EPROTO);
2187         }
2188
2189         memcpy(oa, &body->oa, sizeof(*oa));
2190
2191         swab = lustre_msg_swabbed(request->rq_repmsg);
2192         LASSERT_REPSWAB(request, 1);
2193         nioptr = lustre_msg_buf(request->rq_repmsg, 1, size[1]);
2194         if (!nioptr) {
2195                 /* nioptr missing or short */
2196                 GOTO(out_req, rc = -EPROTO);
2197         }
2198
2199         /* actual read */
2200         for (mapped = 0; mapped < page_count; mapped++, nioptr++) {
2201                 struct page *page = pga[mapped].pg;
2202                 struct buffer_head *bh;
2203                 kdev_t dev;
2204
2205                 if (swab)
2206                         lustre_swab_niobuf_remote (nioptr);
2207
2208                 /* got san device associated */
2209                 LASSERT(exp->exp_obd != NULL);
2210                 dev = exp->exp_obd->u.cli.cl_sandev;
2211
2212                 /* hole */
2213                 if (!nioptr->offset) {
2214                         CDEBUG(D_PAGE, "hole at ino %lu; index %ld\n",
2215                                         page->mapping->host->i_ino,
2216                                         page->index);
2217                         memset(page_address(page), 0, PAGE_SIZE);
2218                         continue;
2219                 }
2220
2221                 if (!page->buffers) {
2222                         create_empty_buffers(page, dev, PAGE_SIZE);
2223                         bh = page->buffers;
2224
2225                         clear_bit(BH_New, &bh->b_state);
2226                         set_bit(BH_Mapped, &bh->b_state);
2227                         bh->b_blocknr = (unsigned long)nioptr->offset;
2228
2229                         clear_bit(BH_Uptodate, &bh->b_state);
2230
2231                         ll_rw_block(READ, 1, &bh);
2232                 } else {
2233                         bh = page->buffers;
2234
2235                         /* if buffer already existed, it must be the
2236                          * one we mapped before, check it */
2237                         LASSERT(!test_bit(BH_New, &bh->b_state));
2238                         LASSERT(test_bit(BH_Mapped, &bh->b_state));
2239                         LASSERT(bh->b_blocknr == (unsigned long)nioptr->offset);
2240
2241                         /* wait it's io completion */
2242                         if (test_bit(BH_Lock, &bh->b_state))
2243                                 wait_on_buffer(bh);
2244
2245                         if (!test_bit(BH_Uptodate, &bh->b_state))
2246                                 ll_rw_block(READ, 1, &bh);
2247                 }
2248
2249
2250                 /* must do syncronous write here */
2251                 wait_on_buffer(bh);
2252                 if (!buffer_uptodate(bh)) {
2253                         /* I/O error */
2254                         rc = -EIO;
2255                         goto out_req;
2256                 }
2257         }
2258
2259 out_req:
2260         ptlrpc_req_finished(request);
2261         RETURN(rc);
2262 }
2263
2264 static int sanosc_brw_write(struct obd_export *exp, struct obdo *oa,
2265                             struct lov_stripe_md *lsm, obd_count page_count,
2266                             struct brw_page *pga)
2267 {
2268         struct ptlrpc_request *request = NULL;
2269         struct ost_body *body;
2270         struct niobuf_remote *nioptr;
2271         struct obd_ioobj *iooptr;
2272         int rc, size[3] = {sizeof(*body)}, mapped = 0;
2273         int swab;
2274         ENTRY;
2275
2276         size[1] = sizeof(struct obd_ioobj);
2277         size[2] = page_count * sizeof(*nioptr);
2278
2279         request = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OBD_VERSION,
2280                                   OST_SAN_WRITE, 3, size, NULL);
2281         if (!request)
2282                 RETURN(-ENOMEM);
2283
2284         body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof (*body));
2285         iooptr = lustre_msg_buf(request->rq_reqmsg, 1, sizeof (*iooptr));
2286         nioptr = lustre_msg_buf(request->rq_reqmsg, 2,
2287                                 sizeof (*nioptr) * page_count);
2288
2289         memcpy(&body->oa, oa, sizeof(body->oa));
2290
2291         obdo_to_ioobj(oa, iooptr);
2292         iooptr->ioo_bufcnt = page_count;
2293
2294         /* pack request */
2295         for (mapped = 0; mapped < page_count; mapped++, nioptr++) {
2296                 LASSERT(PageLocked(pga[mapped].pg));
2297                 LASSERT(mapped == 0 ||
2298                         pga[mapped].disk_offset > pga[mapped - 1].disk_offset);
2299
2300                 nioptr->offset = pga[mapped].disk_offset;
2301                 nioptr->len    = pga[mapped].count;
2302                 nioptr->flags  = pga[mapped].flag;
2303         }
2304
2305         size[1] = page_count * sizeof(*nioptr);
2306         request->rq_replen = lustre_msg_size(2, size);
2307
2308         rc = ptlrpc_queue_wait(request);
2309         if (rc)
2310                 GOTO(out_req, rc);
2311
2312         swab = lustre_msg_swabbed (request->rq_repmsg);
2313         LASSERT_REPSWAB (request, 1);
2314         nioptr = lustre_msg_buf(request->rq_repmsg, 1, size[1]);
2315         if (!nioptr) {
2316                 CERROR("absent/short niobuf array\n");
2317                 GOTO(out_req, rc = -EPROTO);
2318         }
2319
2320         /* actual write */
2321         for (mapped = 0; mapped < page_count; mapped++, nioptr++) {
2322                 struct page *page = pga[mapped].pg;
2323                 struct buffer_head *bh;
2324                 kdev_t dev;
2325
2326                 if (swab)
2327                         lustre_swab_niobuf_remote (nioptr);
2328
2329                 /* got san device associated */
2330                 LASSERT(exp->exp_obd != NULL);
2331                 dev = exp->exp_obd->u.cli.cl_sandev;
2332
2333                 if (!page->buffers) {
2334                         create_empty_buffers(page, dev, PAGE_SIZE);
2335                 } else {
2336                         /* checking */
2337                         LASSERT(!test_bit(BH_New, &page->buffers->b_state));
2338                         LASSERT(test_bit(BH_Mapped, &page->buffers->b_state));
2339                         LASSERT(page->buffers->b_blocknr ==
2340                                 (unsigned long)nioptr->offset);
2341                 }
2342                 bh = page->buffers;
2343
2344                 LASSERT(bh);
2345
2346                 /* if buffer locked, wait it's io completion */
2347                 if (test_bit(BH_Lock, &bh->b_state))
2348                         wait_on_buffer(bh);
2349
2350                 clear_bit(BH_New, &bh->b_state);
2351                 set_bit(BH_Mapped, &bh->b_state);
2352
2353                 /* override the block nr */
2354                 bh->b_blocknr = (unsigned long)nioptr->offset;
2355
2356                 /* we are about to write it, so set it
2357                  * uptodate/dirty
2358                  * page lock should garentee no race condition here */
2359                 set_bit(BH_Uptodate, &bh->b_state);
2360                 set_bit(BH_Dirty, &bh->b_state);
2361
2362                 ll_rw_block(WRITE, 1, &bh);
2363
2364                 /* must do syncronous write here */
2365                 wait_on_buffer(bh);
2366                 if (!buffer_uptodate(bh) || test_bit(BH_Dirty, &bh->b_state)) {
2367                         /* I/O error */
2368                         rc = -EIO;
2369                         goto out_req;
2370                 }
2371         }
2372
2373 out_req:
2374         ptlrpc_req_finished(request);
2375         RETURN(rc);
2376 }
2377
2378 static int sanosc_brw(int cmd, struct obd_export *exp, struct obdo *oa,
2379                       struct lov_stripe_md *lsm, obd_count page_count,
2380                       struct brw_page *pga, struct obd_trans_info *oti)
2381 {
2382         ENTRY;
2383
2384         while (page_count) {
2385                 obd_count pages_per_brw;
2386                 int rc;
2387
2388                 if (page_count > PTLRPC_MAX_BRW_PAGES)
2389                         pages_per_brw = PTLRPC_MAX_BRW_PAGES;
2390                 else
2391                         pages_per_brw = page_count;
2392
2393                 if (cmd & OBD_BRW_WRITE)
2394                         rc = sanosc_brw_write(exp, oa, lsm, pages_per_brw,pga);
2395                 else
2396                         rc = sanosc_brw_read(exp, oa, lsm, pages_per_brw, pga);
2397
2398                 if (rc != 0)
2399                         RETURN(rc);
2400
2401                 page_count -= pages_per_brw;
2402                 pga += pages_per_brw;
2403         }
2404         RETURN(0);
2405 }
2406 #endif
2407 #endif
2408
2409 static void osc_set_data_with_check(struct lustre_handle *lockh, void *data)
2410 {
2411         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2412
2413         if (lock == NULL) {
2414                 CERROR("lockh %p, data %p - client evicted?\n", lockh, data);
2415                 return;
2416         }
2417
2418         lock_res_and_lock(lock);
2419 #ifdef __KERNEL__
2420         if (lock->l_ast_data && lock->l_ast_data != data) {
2421                 struct inode *new_inode = data;
2422                 struct inode *old_inode = lock->l_ast_data;
2423                 if (!(old_inode->i_state & I_FREEING))
2424                         LDLM_ERROR(lock, "inconsistent l_ast_data found");
2425                 LASSERTF(old_inode->i_state & I_FREEING,
2426                          "Found existing inode %p/%lu/%u state %lu in lock: "
2427                          "setting data to %p/%lu/%u\n", old_inode,
2428                          old_inode->i_ino, old_inode->i_generation,
2429                          old_inode->i_state,
2430                          new_inode, new_inode->i_ino, new_inode->i_generation);
2431         }
2432 #endif
2433         lock->l_ast_data = data;
2434         unlock_res_and_lock(lock);
2435         LDLM_LOCK_PUT(lock);
2436 }
2437
2438 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2439                              ldlm_iterator_t replace, void *data)
2440 {
2441         struct ldlm_res_id res_id = { .name = {0} };
2442         struct obd_device *obd = class_exp2obd(exp);
2443
2444         res_id.name[0] = lsm->lsm_object_id;
2445         res_id.name[2] = lsm->lsm_object_gr;
2446         ldlm_change_cbdata(obd->obd_namespace, &res_id, replace, data);
2447         return 0;
2448 }
2449
2450 static int osc_enqueue(struct obd_export *exp, struct lov_stripe_md *lsm,
2451                        __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2452                        int *flags, void *bl_cb, void *cp_cb, void *gl_cb,
2453                        void *data, __u32 lvb_len, void *lvb_swabber,
2454                        struct lustre_handle *lockh)
2455 {
2456         struct obd_device *obd = exp->exp_obd;
2457         struct ldlm_res_id res_id = { .name = {0} };
2458         struct ost_lvb lvb;
2459         struct ldlm_reply *rep;
2460         struct ptlrpc_request *req = NULL;
2461         int rc;
2462         ENTRY;
2463
2464         res_id.name[0] = lsm->lsm_object_id;
2465         res_id.name[2] = lsm->lsm_object_gr;
2466
2467         /* Filesystem lock extents are extended to page boundaries so that
2468          * dealing with the page cache is a little smoother.  */
2469         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2470         policy->l_extent.end |= ~PAGE_MASK;
2471
2472         if (lsm->lsm_oinfo->loi_kms_valid == 0)
2473                 goto no_match;
2474
2475         /* Next, search for already existing extent locks that will cover us */
2476         rc = ldlm_lock_match(obd->obd_namespace, 0, &res_id, type, policy, mode,
2477                              lockh);
2478         if (rc == 1) {
2479                 if (ptlrpcs_check_cred(obd->u.cli.cl_import)) {
2480                         /* return immediately if no credential held */
2481                         ldlm_lock_decref(lockh, mode);
2482                         RETURN(-EACCES);
2483                 }
2484
2485                 osc_set_data_with_check(lockh, data);
2486                 if (*flags & LDLM_FL_HAS_INTENT) {
2487                         /* I would like to be able to ASSERT here that rss <=
2488                          * kms, but I can't, for reasons which are explained in
2489                          * lov_enqueue() */
2490                 }
2491                 /* We already have a lock, and it's referenced */
2492                 RETURN(ELDLM_OK);
2493         }
2494
2495         /* If we're trying to read, we also search for an existing PW lock.  The
2496          * VFS and page cache already protect us locally, so lots of readers/
2497          * writers can share a single PW lock.
2498          *
2499          * There are problems with conversion deadlocks, so instead of
2500          * converting a read lock to a write lock, we'll just enqueue a new
2501          * one.
2502          *
2503          * At some point we should cancel the read lock instead of making them
2504          * send us a blocking callback, but there are problems with canceling
2505          * locks out from other users right now, too. */
2506
2507         if (mode == LCK_PR) {
2508                 rc = ldlm_lock_match(obd->obd_namespace, 0, &res_id, type,
2509                                      policy, LCK_PW, lockh);
2510                 if (rc == 1) {
2511                         if (ptlrpcs_check_cred(obd->u.cli.cl_import)) {
2512                                 /* return immediately if no credential held */
2513                                 ldlm_lock_decref(lockh, LCK_PW);
2514                                 RETURN(-EACCES);
2515                         }
2516
2517                         /* FIXME: This is not incredibly elegant, but it might
2518                          * be more elegant than adding another parameter to
2519                          * lock_match.  I want a second opinion. */
2520                         ldlm_lock_addref(lockh, LCK_PR);
2521                         ldlm_lock_decref(lockh, LCK_PW);
2522                         osc_set_data_with_check(lockh, data);
2523                         RETURN(ELDLM_OK);
2524                 }
2525         }
2526         if (mode == LCK_PW) {
2527                 rc = ldlm_lock_match(obd->obd_namespace, 0, &res_id, type,
2528                                      policy, LCK_PR, lockh);
2529                 if (rc == 1) {
2530                         rc = ldlm_cli_convert(lockh, mode, flags);
2531                         if (!rc) {
2532                                 /* Update readers/writers accounting */
2533                                 ldlm_lock_addref(lockh, LCK_PW);
2534                                 ldlm_lock_decref(lockh, LCK_PR);
2535                                 osc_set_data_with_check(lockh, data);
2536                                 RETURN(ELDLM_OK);
2537                         }
2538                         /* If the conversion failed, we need to drop refcount
2539                            on matched lock before we get new one */
2540                         /* XXX Won't it save us some efforts if we cancel PR
2541                            lock here? We are going to take PW lock anyway and it
2542                            will invalidate PR lock */
2543                         ldlm_lock_decref(lockh, LCK_PR);
2544                         if (rc != EDEADLOCK) {
2545                                 RETURN(rc);
2546                         }
2547                 }
2548         }
2549
2550 no_match:
2551         if (*flags & LDLM_FL_HAS_INTENT) {
2552                 int size[2] = {0, sizeof(struct ldlm_request)};
2553
2554                 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_DLM_VERSION,
2555                                       LDLM_ENQUEUE, 2, size, NULL);
2556                 if (req == NULL)
2557                         RETURN(-ENOMEM);
2558
2559                 size[0] = sizeof(*rep);
2560                 size[1] = sizeof(lvb);
2561                 req->rq_replen = lustre_msg_size(2, size);
2562         }
2563         rc = ldlm_cli_enqueue(exp, req, obd->obd_namespace, res_id, type,
2564                               policy, mode, flags, bl_cb, cp_cb, gl_cb, data,
2565                               &lvb, sizeof(lvb), lustre_swab_ost_lvb, lockh);
2566         if (req != NULL) {
2567                 if (rc == ELDLM_LOCK_ABORTED) {
2568                         /* swabbed by ldlm_cli_enqueue() */
2569                         LASSERT_REPSWABBED(req, 0);
2570                         rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*rep));
2571                         LASSERT(rep != NULL);
2572                         if (rep->lock_policy_res1)
2573                                 rc = rep->lock_policy_res1;
2574                 }
2575                 ptlrpc_req_finished(req);
2576         }
2577
2578         if ((*flags & LDLM_FL_HAS_INTENT && rc == ELDLM_LOCK_ABORTED) || !rc) {
2579                 CDEBUG(D_INODE, "received kms == "LPU64", blocks == "LPU64"\n",
2580                        lvb.lvb_size, lvb.lvb_blocks);
2581                 lsm->lsm_oinfo->loi_rss = lvb.lvb_size;
2582                 lsm->lsm_oinfo->loi_blocks = lvb.lvb_blocks;
2583         }
2584
2585         RETURN(rc);
2586 }
2587
2588 static int osc_match(struct obd_export *exp, struct lov_stripe_md *lsm,
2589                      __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2590                      int *flags, void *data, struct lustre_handle *lockh)
2591 {
2592         struct ldlm_res_id res_id = { .name = {0} };
2593         struct obd_device *obd = exp->exp_obd;
2594         int rc;
2595         ENTRY;
2596
2597         res_id.name[0] = lsm->lsm_object_id;
2598         res_id.name[2] = lsm->lsm_object_gr;
2599
2600         OBD_FAIL_RETURN(OBD_FAIL_OSC_MATCH, -EIO);
2601
2602         /* Filesystem lock extents are extended to page boundaries so that
2603          * dealing with the page cache is a little smoother */
2604         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2605         policy->l_extent.end |= ~PAGE_MASK;
2606
2607         /* Next, search for already existing extent locks that will cover us */
2608         rc = ldlm_lock_match(obd->obd_namespace, *flags, &res_id, type,
2609                              policy, mode, lockh);
2610         if (rc) {
2611                // if (!(*flags & LDLM_FL_TEST_LOCK))
2612                         osc_set_data_with_check(lockh, data);
2613                 RETURN(rc);
2614         }
2615         /* If we're trying to read, we also search for an existing PW lock.  The
2616          * VFS and page cache already protect us locally, so lots of readers/
2617          * writers can share a single PW lock. */
2618         if (mode == LCK_PR) {
2619                 rc = ldlm_lock_match(obd->obd_namespace, *flags, &res_id, type,
2620                                      policy, LCK_PW, lockh);
2621                 if (rc == 1 && !(*flags & LDLM_FL_TEST_LOCK)) {
2622                         /* FIXME: This is not incredibly elegant, but it might
2623                          * be more elegant than adding another parameter to
2624                          * lock_match.  I want a second opinion. */
2625                         osc_set_data_with_check(lockh, data);
2626                         ldlm_lock_addref(lockh, LCK_PR);
2627                         ldlm_lock_decref(lockh, LCK_PW);
2628                 }
2629         }
2630         RETURN(rc);
2631 }
2632
2633 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
2634                       __u32 mode, struct lustre_handle *lockh)
2635 {
2636         ENTRY;
2637
2638         if (mode == LCK_GROUP)
2639                 ldlm_lock_decref_and_cancel(lockh, mode);
2640         else
2641                 ldlm_lock_decref(lockh, mode);
2642
2643         RETURN(0);
2644 }
2645
2646 static int osc_cancel_unused(struct obd_export *exp,
2647                              struct lov_stripe_md *lsm,
2648                              int flags, void *opaque)
2649 {
2650         struct obd_device *obd = class_exp2obd(exp);
2651         struct ldlm_res_id res_id = { .name = {0} }, *resp = NULL;
2652
2653         if (lsm != NULL) {
2654                 res_id.name[0] = lsm->lsm_object_id;
2655                 res_id.name[2] = lsm->lsm_object_gr;
2656                 resp = &res_id;
2657         }
2658
2659         return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
2660 }
2661
2662 static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
2663                       unsigned long max_age)
2664 {
2665         struct obd_statfs *msfs;
2666         struct ptlrpc_request *request;
2667         int rc, size = sizeof(*osfs);
2668         ENTRY;
2669
2670         /* We could possibly pass max_age in the request (as an absolute
2671          * timestamp or a "seconds.usec ago") so the target can avoid doing
2672          * extra calls into the filesystem if that isn't necessary (e.g.
2673          * during mount that would help a bit).  Having relative timestamps
2674          * is not so great if request processing is slow, while absolute
2675          * timestamps are not ideal because they need time synchronization. */
2676         request = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_OBD_VERSION,
2677                                   OST_STATFS, 0, NULL, NULL);
2678         if (!request)
2679                 RETURN(-ENOMEM);
2680
2681         request->rq_replen = lustre_msg_size(1, &size);
2682         request->rq_request_portal = OST_CREATE_PORTAL; //XXX FIXME bug 249
2683
2684         rc = ptlrpc_queue_wait(request);
2685         if (rc)
2686                 GOTO(out, rc);
2687
2688         msfs = lustre_swab_repbuf(request, 0, sizeof(*msfs),
2689                                   lustre_swab_obd_statfs);
2690         if (msfs == NULL) {
2691                 CERROR("Can't unpack obd_statfs\n");
2692                 GOTO(out, rc = -EPROTO);
2693         }
2694
2695         memcpy(osfs, msfs, sizeof(*osfs));
2696
2697         EXIT;
2698  out:
2699         ptlrpc_req_finished(request);
2700         return rc;
2701 }
2702
2703 /* Retrieve object striping information.
2704  *
2705  * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
2706  * the maximum number of OST indices which will fit in the user buffer.
2707  * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
2708  */
2709 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
2710 {
2711         struct lov_user_md lum, *lumk;
2712         int rc, lum_size;
2713         ENTRY;
2714
2715         if (!lsm)
2716                 RETURN(-ENODATA);
2717
2718         rc = copy_from_user(&lum, lump, sizeof(lum));
2719         if (rc)
2720                 RETURN(-EFAULT);
2721
2722         if (lum.lmm_magic != LOV_USER_MAGIC)
2723                 RETURN(-EINVAL);
2724
2725         if (lum.lmm_stripe_count > 0) {
2726                 lum_size = sizeof(lum) + sizeof(lum.lmm_objects[0]);
2727                 OBD_ALLOC(lumk, lum_size);
2728                 if (!lumk)
2729                         RETURN(-ENOMEM);
2730
2731                 lumk->lmm_objects[0].l_object_id = lsm->lsm_object_id;
2732                 lumk->lmm_objects[0].l_object_gr = lsm->lsm_object_gr;
2733         } else {
2734                 lum_size = sizeof(lum);
2735                 lumk = &lum;
2736         }
2737
2738         lumk->lmm_object_id = lsm->lsm_object_id;
2739         lumk->lmm_object_gr = lsm->lsm_object_gr;
2740         lumk->lmm_stripe_count = 1;
2741
2742         if (copy_to_user(lump, lumk, lum_size))
2743                 rc = -EFAULT;
2744
2745         if (lumk != &lum)
2746                 OBD_FREE(lumk, lum_size);
2747
2748         RETURN(rc);
2749 }
2750
2751 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2752                          void *karg, void *uarg)
2753 {
2754         struct obd_device *obd = exp->exp_obd;
2755         struct obd_ioctl_data *data = karg;
2756         int err = 0;
2757         ENTRY;
2758
2759 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
2760         MOD_INC_USE_COUNT;
2761 #else
2762         if (!try_module_get(THIS_MODULE)) {
2763                 CERROR("Can't get module. Is it alive?");
2764                 return -EINVAL;
2765         }
2766 #endif
2767         switch (cmd) {
2768         case OBD_IOC_LOV_GET_CONFIG: {
2769                 char *buf;
2770                 struct lov_desc *desc;
2771                 struct obd_uuid uuid;
2772
2773                 buf = NULL;
2774                 len = 0;
2775                 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
2776                         GOTO(out, err = -EINVAL);
2777
2778                 data = (struct obd_ioctl_data *)buf;
2779
2780                 if (sizeof(*desc) > data->ioc_inllen1) {
2781                         OBD_FREE(buf, len);
2782                         GOTO(out, err = -EINVAL);
2783                 }
2784
2785                 if (data->ioc_inllen2 < sizeof(uuid)) {
2786                         OBD_FREE(buf, len);
2787                         GOTO(out, err = -EINVAL);
2788                 }
2789
2790                 if (data->ioc_inllen3 < sizeof(__u32)) {
2791                         OBD_FREE(buf, len);
2792                         GOTO(out, err = -EINVAL);
2793                 }
2794
2795                 desc = (struct lov_desc *)data->ioc_inlbuf1;
2796                 desc->ld_tgt_count = 1;
2797                 desc->ld_active_tgt_count = 1;
2798                 desc->ld_default_stripe_count = 1;
2799                 desc->ld_default_stripe_size = 0;
2800                 desc->ld_default_stripe_offset = 0;
2801                 desc->ld_pattern = 0;
2802                 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
2803                 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
2804                 *((__u32 *)data->ioc_inlbuf3) = 1;
2805
2806                 err = copy_to_user((void *)uarg, buf, len);
2807                 if (err)
2808                         err = -EFAULT;
2809                 obd_ioctl_freedata(buf, len);
2810                 GOTO(out, err);
2811         }
2812         case LL_IOC_LOV_SETSTRIPE:
2813                 err = obd_alloc_memmd(exp, karg);
2814                 if (err > 0)
2815                         err = 0;
2816                 GOTO(out, err);
2817         case LL_IOC_LOV_GETSTRIPE:
2818                 err = osc_getstripe(karg, uarg);
2819                 GOTO(out, err);
2820         case OBD_IOC_CLIENT_RECOVER:
2821                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2822                                             data->ioc_inlbuf1);
2823                 if (err > 0)
2824                         err = 0;
2825                 GOTO(out, err);
2826         case IOC_OSC_SET_ACTIVE:
2827                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2828                                                data->ioc_offset);
2829                 GOTO(out, err);
2830         case IOC_OSC_CTL_RECOVERY:
2831                 err = ptlrpc_import_control_recovery(obd->u.cli.cl_import,
2832                                                      data->ioc_offset);
2833                 GOTO(out, err);
2834         default:
2835                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n", cmd, current->comm);
2836                 GOTO(out, err = -ENOTTY);
2837         }
2838 out:
2839 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
2840         MOD_DEC_USE_COUNT;
2841 #else
2842         module_put(THIS_MODULE);
2843 #endif
2844         return err;
2845 }
2846
2847 static int osc_get_info(struct obd_export *exp, __u32 keylen,
2848                         void *key, __u32 *vallen, void *val)
2849 {
2850         ENTRY;
2851         if (!vallen || !val)
2852                 RETURN(-EFAULT);
2853
2854         if (keylen > strlen("lock_to_stripe") &&
2855             strcmp(key, "lock_to_stripe") == 0) {
2856                 __u32 *stripe = val;
2857                 *vallen = sizeof(*stripe);
2858                 *stripe = 0;
2859                 RETURN(0);
2860         } else if (keylen >= strlen("last_id") && strcmp(key, "last_id") == 0) {
2861                 struct ptlrpc_request *req;
2862                 obd_id *reply;
2863                 char *bufs[1] = {key};
2864                 int rc;
2865                 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OBD_VERSION,
2866                                       OST_GET_INFO, 1, (int *)&keylen, bufs);
2867                 if (req == NULL)
2868                         RETURN(-ENOMEM);
2869
2870                 req->rq_replen = lustre_msg_size(1, (int *)vallen);
2871                 rc = ptlrpc_queue_wait(req);
2872                 if (rc)
2873                         GOTO(out, rc);
2874
2875                 reply = lustre_swab_repbuf(req, 0, sizeof(*reply),
2876                                            lustre_swab_ost_last_id);
2877                 if (reply == NULL) {
2878                         CERROR("Can't unpack OST last ID\n");
2879                         GOTO(out, rc = -EPROTO);
2880                 }
2881                 *((obd_id *)val) = *reply;
2882         out:
2883                 ptlrpc_req_finished(req);
2884                 RETURN(rc);
2885         } else if (keylen >= strlen("client_nid") &&
2886                    strcmp(key, "client_nid") == 0) {
2887                 struct ptlrpc_connection * conn;
2888                 ptl_nid_t * nid = val;
2889                 *vallen = sizeof(*nid);
2890                 
2891                 conn = class_exp2cliimp(exp)->imp_connection;
2892                 if (!conn) 
2893                         RETURN(-ENOTCONN);
2894                 
2895                 nid = &conn->c_peer.peer_id.nid;
2896                 
2897                 RETURN(0);
2898         }
2899         RETURN(-EPROTO);
2900 }
2901
2902 static int osc_set_info(struct obd_export *exp, obd_count keylen,
2903                         void *key, obd_count vallen, void *val)
2904 {
2905         struct obd_device  *obd = exp->exp_obd;
2906         struct obd_import *imp = class_exp2cliimp(exp);
2907         struct llog_ctxt *ctxt;
2908         int rc = 0;
2909         ENTRY;
2910
2911         if (keylen == strlen("unlinked") &&
2912             memcmp(key, "unlinked", keylen) == 0) {
2913                 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
2914                 spin_lock(&oscc->oscc_lock);
2915                 oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
2916                 spin_unlock(&oscc->oscc_lock);
2917                 RETURN(0);
2918         }
2919         if (keylen == strlen("unrecovery") &&
2920             memcmp(key, "unrecovery", keylen) == 0) {
2921                 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
2922                 spin_lock(&oscc->oscc_lock);
2923                 oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING;
2924                 spin_unlock(&oscc->oscc_lock);
2925                 RETURN(0);
2926         }
2927         if (keylen == strlen("initial_recov") &&
2928             memcmp(key, "initial_recov", strlen("initial_recov")) == 0) {
2929                 struct obd_import *imp = exp->exp_obd->u.cli.cl_import;
2930                 if (vallen != sizeof(int))
2931                         RETURN(-EINVAL);
2932                 imp->imp_initial_recov = *(int *)val;
2933                 CDEBUG(D_HA, "%s: set imp_no_init_recov = %d\n",
2934                        exp->exp_obd->obd_name,
2935                        imp->imp_initial_recov);
2936                 RETURN(0);
2937         }
2938
2939         if (keylen == strlen("async") &&
2940             memcmp(key, "async", keylen) == 0) {
2941                 struct client_obd *cl = &obd->u.cli;
2942                 if (vallen != sizeof(int))
2943                         RETURN(-EINVAL);
2944                 cl->cl_async = *(int *)val;
2945                 CDEBUG(D_HA, "%s: set async = %d\n",
2946                        obd->obd_name, cl->cl_async);
2947                 RETURN(0);
2948         }
2949         
2950         if (keylen == 5 && strcmp(key, "audit") == 0) {
2951                 struct ptlrpc_request *req;
2952                 char *bufs[2] = {key, val};
2953                 int size[2] = {keylen, vallen};
2954
2955                 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OBD_VERSION,
2956                                       OST_SET_INFO, 2, size, bufs);
2957                 if (req == NULL)
2958                         RETURN(-ENOMEM);
2959
2960                 req->rq_replen = lustre_msg_size(0, size);
2961                 lustre_swab_reqbuf(req, 1, sizeof(struct audit_attr_msg),
2962                                    lustre_swab_audit_attr);
2963                 rc = ptlrpc_queue_wait(req);
2964                            
2965                 ptlrpc_req_finished(req);
2966                 RETURN(rc);
2967         }
2968         
2969         if (keylen == 9 && strcmp(key, "audit_obj") == 0) {
2970                 struct ptlrpc_request *req;
2971                 char *bufs[2] = {key, val};
2972                 int size[2] = {keylen, vallen};
2973
2974                 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OBD_VERSION,
2975                                       OST_SET_INFO, 2, size, bufs);
2976                 if (req == NULL)
2977                         RETURN(-ENOMEM);
2978
2979                 req->rq_replen = lustre_msg_size(0, size);
2980                 lustre_swab_reqbuf(req, 1, sizeof(struct obdo),
2981                                    lustre_swab_obdo);
2982                 rc = ptlrpc_queue_wait(req);
2983                            
2984                 ptlrpc_req_finished(req);
2985                 RETURN(rc);
2986         }
2987
2988         if (keylen == 8 && memcmp(key, "auditlog", 8) == 0) {
2989                 struct ptlrpc_request *req;
2990                 char *bufs[2] = {key, val};
2991                 int size[2] = {keylen, vallen};
2992
2993                 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OBD_VERSION,
2994                                       OST_SET_INFO, 2, size, bufs);
2995                 if (req == NULL)
2996                         RETURN(-ENOMEM);
2997
2998                 req->rq_replen = lustre_msg_size(0, size);
2999                 lustre_swab_reqbuf(req, 1, sizeof(struct audit_msg),
3000                                    lustre_swab_audit_msg);
3001                 rc = ptlrpc_queue_wait(req);
3002                            
3003                 ptlrpc_req_finished(req);
3004                 RETURN(rc);
3005         }
3006
3007         if (keylen == strlen("sec") &&
3008             memcmp(key, "sec", keylen) == 0) {
3009                 struct client_obd *cli = &exp->exp_obd->u.cli;
3010
3011                 cli->cl_sec_flavor = ptlrpcs_name2flavor(val);
3012                 if (cli->cl_sec_flavor == PTLRPCS_FLVR_INVALID) {
3013                         CERROR("unrecognized security flavor %s\n", (char*) val);
3014                         RETURN(-EINVAL);
3015                 }
3016
3017                 RETURN(0);
3018         }
3019
3020         if (keylen == strlen("sec_flags") &&
3021             memcmp(key, "sec_flags", keylen) == 0) {
3022                 struct client_obd *cli = &exp->exp_obd->u.cli;
3023
3024                 cli->cl_sec_flags = *((unsigned long *) val);
3025                 RETURN(0);
3026         }
3027
3028         if (keylen == strlen("flush_cred") &&
3029             memcmp(key, "flush_cred", keylen) == 0) {
3030                 struct client_obd *cli = &exp->exp_obd->u.cli;
3031
3032                 if (cli->cl_import)
3033                         ptlrpcs_import_flush_current_creds(cli->cl_import);
3034                 RETURN(0);
3035         }
3036         if (keylen == strlen("crypto_cb") &&
3037             memcmp(key, "crypto_cb", keylen) == 0) {
3038                 LASSERT(vallen == sizeof(crypt_cb_t));
3039                 osc_crypt_cb = (crypt_cb_t)val;
3040                 RETURN(0);
3041         }
3042
3043         if (keylen < strlen("mds_conn") ||
3044             memcmp(key, "mds_conn", keylen) != 0)
3045                 RETURN(-EINVAL);
3046
3047         ctxt = llog_get_context(&exp->exp_obd->obd_llogs,
3048                                 LLOG_UNLINK_ORIG_CTXT);
3049         if (ctxt) {
3050                 if (rc == 0)
3051                         rc = llog_initiator_connect(ctxt);
3052                 else
3053                         CERROR("cannot establish the connect for "
3054                                "ctxt %p: %d\n", ctxt, rc);
3055         }
3056
3057         imp->imp_server_timeout = 1;
3058         CDEBUG(D_HA, "pinging OST %s\n", imp->imp_target_uuid.uuid);
3059         imp->imp_pingable = 1;
3060
3061         RETURN(rc);
3062 }
3063
3064
3065 static struct llog_operations osc_size_repl_logops = {
3066         lop_cancel: llog_obd_repl_cancel
3067 };
3068
3069 static struct llog_operations osc_unlink_orig_logops;
3070
3071 static int osc_llog_init(struct obd_device *obd, struct obd_llogs *llogs,
3072                          struct obd_device *tgt, int count,
3073                          struct llog_catid *catid)
3074 {
3075         int rc;
3076         ENTRY;
3077
3078         osc_unlink_orig_logops = llog_lvfs_ops;
3079         osc_unlink_orig_logops.lop_setup = llog_obd_origin_setup;
3080         osc_unlink_orig_logops.lop_cleanup = llog_catalog_cleanup;
3081         osc_unlink_orig_logops.lop_add = llog_catalog_add;
3082         osc_unlink_orig_logops.lop_connect = llog_origin_connect;
3083
3084         rc = obd_llog_setup(obd, llogs, LLOG_UNLINK_ORIG_CTXT, tgt, count,
3085                             &catid->lci_logid, &osc_unlink_orig_logops);
3086         if (rc)
3087                 RETURN(rc);
3088
3089         rc = obd_llog_setup(obd, llogs, LLOG_SIZE_REPL_CTXT, tgt, count, NULL,
3090                             &osc_size_repl_logops);
3091         RETURN(rc);
3092 }
3093
3094 static int osc_llog_finish(struct obd_device *obd,
3095                            struct obd_llogs *llogs, int count)
3096 {
3097         int rc;
3098         ENTRY;
3099
3100         rc = obd_llog_cleanup(llog_get_context(llogs, LLOG_UNLINK_ORIG_CTXT));
3101         if (rc)
3102                 RETURN(rc);
3103
3104         rc = obd_llog_cleanup(llog_get_context(llogs, LLOG_SIZE_REPL_CTXT));
3105         RETURN(rc);
3106 }
3107
3108 static int osc_connect(struct lustre_handle *exph,
3109                        struct obd_device *obd, struct obd_uuid *cluuid,
3110                        struct obd_connect_data *data,
3111                        unsigned long connect_flags)
3112 {
3113         int rc;
3114         ENTRY;
3115         rc = client_connect_import(exph, obd, cluuid, data, connect_flags);
3116         RETURN(rc);
3117 }
3118
3119 static int osc_disconnect(struct obd_export *exp, unsigned long flags)
3120 {
3121         struct obd_device *obd = class_exp2obd(exp);
3122         struct llog_ctxt *ctxt;
3123         int rc;
3124         ENTRY;
3125
3126         ctxt = llog_get_context(&obd->obd_llogs, LLOG_SIZE_REPL_CTXT);
3127         if (obd->u.cli.cl_conn_count == 1)
3128                 /* flush any remaining cancel messages out to the target */
3129                 llog_sync(ctxt, exp);
3130
3131         rc = client_disconnect_export(exp, flags);
3132         RETURN(rc);
3133 }
3134
3135 static int osc_import_event(struct obd_device *obd,
3136                             struct obd_import *imp,
3137                             enum obd_import_event event)
3138 {
3139         struct client_obd *cli;
3140         int rc = 0;
3141
3142         LASSERT(imp->imp_obd == obd);
3143
3144         switch (event) {
3145         case IMP_EVENT_DISCON: {
3146                 /* Only do this on the MDS OSC's */
3147                 if (imp->imp_server_timeout) {
3148                         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3149
3150                         spin_lock(&oscc->oscc_lock);
3151                         oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
3152                         spin_unlock(&oscc->oscc_lock);
3153                 }
3154                 break;
3155         }
3156         case IMP_EVENT_INACTIVE: {
3157                 if (obd->obd_observer)
3158                         rc = obd_notify(obd->obd_observer, obd, 0, 0);
3159                 break;
3160         }
3161         case IMP_EVENT_INVALIDATE: {
3162                 struct ldlm_namespace *ns = obd->obd_namespace;
3163
3164                 /* Reset grants */
3165                 cli = &obd->u.cli;
3166                 spin_lock(&cli->cl_loi_list_lock);
3167                 cli->cl_avail_grant = 0;
3168                 cli->cl_lost_grant = 0;
3169                 /* all pages go to failing rpcs due to the invalid import */
3170                 osc_check_rpcs(cli);
3171                 spin_unlock(&cli->cl_loi_list_lock);
3172
3173                 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3174
3175                 break;
3176         }
3177         case IMP_EVENT_ACTIVE: {
3178                 /* Only do this on the MDS OSC's */
3179                 if (imp->imp_server_timeout) {
3180                         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3181
3182                         spin_lock(&oscc->oscc_lock);
3183                         oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
3184                         spin_unlock(&oscc->oscc_lock);
3185                 }
3186
3187                 if (obd->obd_observer)
3188                         rc = obd_notify(obd->obd_observer, obd, 1, 0);
3189                 break;
3190         }
3191         default:
3192                 CERROR("Unknown import event %d\n", event);
3193                 LBUG();
3194         }
3195         RETURN(rc);
3196 }
3197
3198 static int osc_attach(struct obd_device *dev, obd_count len, void *data)
3199 {
3200         struct lprocfs_static_vars lvars;
3201         int rc;
3202         ENTRY;
3203
3204         lprocfs_init_vars(osc,&lvars);
3205         rc = lprocfs_obd_attach(dev, lvars.obd_vars);
3206         if (rc < 0)
3207                 RETURN(rc);
3208
3209         rc = lproc_osc_attach_seqstat(dev);
3210         if (rc < 0) {
3211                 lprocfs_obd_detach(dev);
3212                 RETURN(rc);
3213         }
3214
3215         ptlrpc_lprocfs_register_obd(dev);
3216         RETURN(0);
3217 }
3218
/*
 * Detach method of the OSC: unregisters the device from the ptlrpc
 * lprocfs code, then detaches its lprocfs entries and returns the result
 * of that detach.
 */
static int osc_detach(struct obd_device *dev)
{
        int rc;

        ptlrpc_lprocfs_unregister_obd(dev);
        rc = lprocfs_obd_detach(dev);

        return rc;
}
3224
3225 static int osc_setup(struct obd_device *obd, obd_count len, void *buf)
3226 {
3227         int rc;
3228         ENTRY;
3229         rc = ptlrpcd_addref();
3230         if (rc)
3231                 RETURN(rc);
3232
3233         rc = client_obd_setup(obd, len, buf);
3234         if (rc)
3235                 ptlrpcd_decref();
3236         else
3237                 oscc_init(obd);
3238
3239         RETURN(rc);
3240 }
3241
3242 static int osc_cleanup(struct obd_device *obd, int flags)
3243 {
3244         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3245         int rc;
3246
3247         rc = ldlm_cli_cancel_unused(obd->obd_namespace, NULL,
3248                                     LDLM_FL_CONFIG_CHANGE, NULL);
3249         if (rc)
3250                 RETURN(rc);
3251
3252         spin_lock(&oscc->oscc_lock);
3253         oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING;
3254         oscc->oscc_flags |= OSCC_FLAG_EXITING;
3255         spin_unlock(&oscc->oscc_lock);
3256
3257         rc = client_obd_cleanup(obd, flags);
3258         ptlrpcd_decref();
3259         RETURN(rc);
3260 }
3261
3262 struct obd_ops osc_obd_ops = {
3263         .o_owner                = THIS_MODULE,
3264         .o_attach               = osc_attach,
3265         .o_detach               = osc_detach,
3266         .o_setup                = osc_setup,
3267         .o_cleanup              = osc_cleanup,
3268         .o_add_conn             = client_import_add_conn,
3269         .o_del_conn             = client_import_del_conn,
3270         .o_connect              = osc_connect,
3271         .o_disconnect           = osc_disconnect,
3272         .o_statfs               = osc_statfs,
3273         .o_packmd               = osc_packmd,
3274         .o_unpackmd             = osc_unpackmd,
3275         .o_create               = osc_create,
3276         .o_destroy              = osc_destroy,
3277         .o_getattr              = osc_getattr,
3278         .o_getattr_async        = osc_getattr_async,
3279         .o_setattr              = osc_setattr,
3280         .o_brw                  = osc_brw,
3281         .o_brw_async            = osc_brw_async,
3282         .o_prep_async_page      = osc_prep_async_page,
3283         .o_queue_async_io       = osc_queue_async_io,
3284         .o_set_async_flags      = osc_set_async_flags,
3285         .o_queue_group_io       = osc_queue_group_io,
3286         .o_trigger_group_io     = osc_trigger_group_io,
3287         .o_teardown_async_page  = osc_teardown_async_page,
3288         .o_punch                = osc_punch,
3289         .o_sync                 = osc_sync,
3290         .o_enqueue              = osc_enqueue,
3291         .o_match                = osc_match,
3292         .o_change_cbdata        = osc_change_cbdata,
3293         .o_cancel               = osc_cancel,
3294         .o_cancel_unused        = osc_cancel_unused,
3295         .o_iocontrol            = osc_iocontrol,
3296         .o_get_info             = osc_get_info,
3297         .o_set_info             = osc_set_info,
3298         .o_import_event         = osc_import_event,
3299         .o_llog_init            = osc_llog_init,
3300         .o_llog_finish          = osc_llog_finish,
3301 };
3302
3303 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
3304 struct obd_ops sanosc_obd_ops = {
3305         .o_owner                = THIS_MODULE,
3306         .o_attach               = osc_attach,
3307         .o_detach               = osc_detach,
3308         .o_cleanup              = client_obd_cleanup,
3309         .o_add_conn             = client_import_add_conn,
3310         .o_del_conn             = client_import_del_conn,
3311         .o_connect              = osc_connect,
3312         .o_disconnect           = client_disconnect_export,
3313         .o_statfs               = osc_statfs,
3314         .o_packmd               = osc_packmd,
3315         .o_unpackmd             = osc_unpackmd,
3316         .o_create               = osc_real_create,
3317         .o_destroy              = osc_destroy,
3318         .o_getattr              = osc_getattr,
3319         .o_getattr_async        = osc_getattr_async,
3320         .o_setattr              = osc_setattr,
3321         .o_setup                = client_sanobd_setup,
3322         .o_brw                  = sanosc_brw,
3323         .o_punch                = osc_punch,
3324         .o_sync                 = osc_sync,
3325         .o_enqueue              = osc_enqueue,
3326         .o_match                = osc_match,
3327         .o_change_cbdata        = osc_change_cbdata,
3328         .o_cancel               = osc_cancel,
3329         .o_cancel_unused        = osc_cancel_unused,
3330         .o_iocontrol            = osc_iocontrol,
3331         .o_import_event         = osc_import_event,
3332         .o_llog_init            = osc_llog_init,
3333         .o_llog_finish          = osc_llog_finish,
3334 };
3335 #endif
3336
3337 int __init osc_init(void)
3338 {
3339         struct lprocfs_static_vars lvars;
3340 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
3341         struct lprocfs_static_vars sanlvars;
3342 #endif
3343         int rc;
3344         ENTRY;
3345
3346         lprocfs_init_vars(osc, &lvars);
3347 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
3348         lprocfs_init_vars(osc, &sanlvars);
3349 #endif
3350
3351         rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
3352                                  OBD_OSC_DEVICENAME);
3353         if (rc)
3354                 RETURN(rc);
3355
3356 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
3357         rc = class_register_type(&sanosc_obd_ops, NULL, sanlvars.module_vars,
3358                                  OBD_SANOSC_DEVICENAME);
3359         if (rc)
3360                 class_unregister_type(OBD_OSC_DEVICENAME);
3361 #endif
3362
3363         RETURN(rc);
3364 }
3365
#ifdef __KERNEL__
/*
 * Module exit: unregisters the obd types in the reverse order of their
 * registration in osc_init() -- the SAN OSC type first (only on kernels
 * older than 2.5.0), then the OSC type.
 */
static void /*__exit*/ osc_exit(void)
{
#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
        class_unregister_type(OBD_SANOSC_DEVICENAME);
#endif

        class_unregister_type(OBD_OSC_DEVICENAME);
}

/* kernel module metadata */
MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
MODULE_LICENSE("GPL");

/* module entry and exit points */
module_init(osc_init);
module_exit(osc_exit);
#endif