Whamcloud - gitweb
- fixed typos in comments and some alignment in cobd.
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
5  *   Author Peter Braam <braam@clusterfs.com>
6  *
7  *   This file is part of Lustre, http://www.lustre.org.
8  *
9  *   Lustre is free software; you can redistribute it and/or
10  *   modify it under the terms of version 2 of the GNU General Public
11  *   License as published by the Free Software Foundation.
12  *
13  *   Lustre is distributed in the hope that it will be useful,
14  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
15  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  *   GNU General Public License for more details.
17  *
18  *   You should have received a copy of the GNU General Public License
19  *   along with Lustre; if not, write to the Free Software
20  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21  *
22  *  For testing and management it is treated as an obd_device,
23  *  although it does not export a full OBD method table (the
24  *  requests are coming in over the wire, so object target modules
25  *  do not have a full method table.)
26  *
27  */
28
29 #ifndef EXPORT_SYMTAB
30 # define EXPORT_SYMTAB
31 #endif
32 #define DEBUG_SUBSYSTEM S_OSC
33
34 #ifdef __KERNEL__
35 # include <linux/version.h>
36 # include <linux/module.h>
37 # include <linux/mm.h>
38 # include <linux/highmem.h>
39 # include <linux/ctype.h>
40 # include <linux/init.h>
41 # if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
42 #  include <linux/workqueue.h>
43 #  include <linux/smp_lock.h>
44 # else
45 #  include <linux/locks.h>
46 # endif
47 #else /* __KERNEL__ */
48 # include <liblustre.h>
49 #endif
50
51 #include <linux/lustre_dlm.h>
52 #include <libcfs/kp30.h>
53 #include <linux/lustre_net.h>
54 #include <linux/lustre_sec.h>
55 #include <lustre/lustre_user.h>
56 #include <linux/obd_ost.h>
57 #include <linux/obd_lov.h>
58
59 #ifdef  __CYGWIN__
60 # include <ctype.h>
61 #endif
62
63 #include <linux/lustre_ha.h>
64 #include <linux/lprocfs_status.h>
65 #include <linux/lustre_log.h>
66 #include "osc_internal.h"
67
68 /* Pack OSC object metadata for disk storage (LE byte order). */
69 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
70                       struct lov_stripe_md *lsm)
71 {
72         int lmm_size;
73         ENTRY;
74
75         lmm_size = sizeof(**lmmp);
76         if (!lmmp)
77                 RETURN(lmm_size);
78
79         if (*lmmp && !lsm) {
80                 OBD_FREE(*lmmp, lmm_size);
81                 *lmmp = NULL;
82                 RETURN(0);
83         }
84
85         if (!*lmmp) {
86                 OBD_ALLOC(*lmmp, lmm_size);
87                 if (!*lmmp)
88                         RETURN(-ENOMEM);
89         }
90
91         if (lsm) {
92                 LASSERT(lsm->lsm_object_id);
93                 LASSERT(lsm->lsm_object_gr);
94                 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
95                 (*lmmp)->lmm_object_gr = cpu_to_le64(lsm->lsm_object_gr);
96         }
97
98         RETURN(lmm_size);
99 }
100
101 /* Unpack OSC object metadata from disk storage (LE byte order). */
102 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
103                         struct lov_mds_md *lmm, int lmm_bytes)
104 {
105         int lsm_size;
106         ENTRY;
107
108         if (lmm != NULL) {
109                 if (lmm_bytes < sizeof (*lmm)) {
110                         CERROR("lov_mds_md too small: %d, need %d\n",
111                                lmm_bytes, (int)sizeof(*lmm));
112                         RETURN(-EINVAL);
113                 }
114                 /* XXX LOV_MAGIC etc check? */
115
116                 if (lmm->lmm_object_id == 0) {
117                         CERROR("lov_mds_md: zero lmm_object_id\n");
118                         RETURN(-EINVAL);
119                 }
120         }
121
122         lsm_size = lov_stripe_md_size(1);
123         if (lsmp == NULL)
124                 RETURN(lsm_size);
125
126         if (*lsmp != NULL && lmm == NULL) {
127                 OBD_FREE(*lsmp, lsm_size);
128                 *lsmp = NULL;
129                 RETURN(0);
130         }
131
132         if (*lsmp == NULL) {
133                 OBD_ALLOC(*lsmp, lsm_size);
134                 if (*lsmp == NULL)
135                         RETURN(-ENOMEM);
136                 loi_init((*lsmp)->lsm_oinfo);
137         }
138
139         if (lmm != NULL) {
140                 /* XXX zero *lsmp? */
141                 (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
142                 (*lsmp)->lsm_object_gr = le64_to_cpu (lmm->lmm_object_gr);
143                 LASSERT((*lsmp)->lsm_object_id);
144                 LASSERT((*lsmp)->lsm_object_gr);
145         }
146
147         (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
148
149         RETURN(lsm_size);
150 }
151
152 static int osc_getattr_interpret(struct ptlrpc_request *req,
153                                  struct osc_getattr_async_args *aa, int rc)
154 {
155         struct ost_body *body;
156         ENTRY;
157
158         if (rc != 0)
159                 RETURN(rc);
160
161         body = lustre_swab_repbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
162         if (body) {
163                 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
164                 memcpy(aa->aa_oa, &body->oa, sizeof(*aa->aa_oa));
165
166                 /* This should really be sent by the OST */
167                 aa->aa_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
168                 aa->aa_oa->o_valid |= OBD_MD_FLBLKSZ;
169         } else {
170                 CERROR("can't unpack ost_body\n");
171                 rc = -EPROTO;
172                 aa->aa_oa->o_valid = 0;
173         }
174
175         RETURN(rc);
176 }
177
178 static int osc_getattr_async(struct obd_export *exp, struct obdo *oa,
179                              struct lov_stripe_md *md,
180                              struct ptlrpc_request_set *set)
181 {
182         struct ptlrpc_request *request;
183         struct ost_body *body;
184         int size = sizeof(*body);
185         struct osc_getattr_async_args *aa;
186         ENTRY;
187
188         request = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OBD_VERSION,
189                                   OST_GETATTR, 1, &size, NULL);
190         if (!request)
191                 RETURN(-ENOMEM);
192
193         body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof (*body));
194         memcpy(&body->oa, oa, sizeof(*oa));
195
196         request->rq_replen = lustre_msg_size(1, &size);
197         request->rq_interpret_reply = osc_getattr_interpret;
198
199         LASSERT (sizeof (*aa) <= sizeof (request->rq_async_args));
200         aa = (struct osc_getattr_async_args *)&request->rq_async_args;
201         aa->aa_oa = oa;
202
203         ptlrpc_set_add_req (set, request);
204         RETURN (0);
205 }
206
207 static int osc_getattr(struct obd_export *exp, struct obdo *oa,
208                        struct lov_stripe_md *md)
209 {
210         struct ptlrpc_request *request;
211         struct ost_body *body;
212         int rc, size = sizeof(*body);
213         ENTRY;
214
215         request = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OBD_VERSION,
216                                   OST_GETATTR, 1, &size, NULL);
217         if (!request)
218                 RETURN(-ENOMEM);
219
220         body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof (*body));
221         memcpy(&body->oa, oa, sizeof(*oa));
222
223         request->rq_replen = lustre_msg_size(1, &size);
224
225         rc = ptlrpc_queue_wait(request);
226         if (rc) {
227                 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
228                 GOTO(out, rc);
229         }
230
231         body = lustre_swab_repbuf(request, 0, sizeof (*body),
232                                   lustre_swab_ost_body);
233         if (body == NULL) {
234                 CERROR ("can't unpack ost_body\n");
235                 GOTO (out, rc = -EPROTO);
236         }
237
238         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
239         memcpy(oa, &body->oa, sizeof(*oa));
240
241         /* This should really be sent by the OST */
242         oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
243         oa->o_valid |= OBD_MD_FLBLKSZ;
244
245         EXIT;
246  out:
247         ptlrpc_req_finished(request);
248         return rc;
249 }
250
251 static int osc_setattr(struct obd_export *exp, struct obdo *oa,
252                        struct lov_stripe_md *md, struct obd_trans_info *oti)
253 {
254         struct ptlrpc_request *request;
255         struct ost_body *body;
256         int rc, size = sizeof(*body);
257         ENTRY;
258
259         LASSERT(!(oa->o_valid & OBD_MD_FLGROUP) || oa->o_gr > 0);
260
261         request = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OBD_VERSION,
262                                   OST_SETATTR, 1, &size, NULL);
263         if (!request)
264                 RETURN(-ENOMEM);
265
266         body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof(*body));
267         memcpy(&body->oa, oa, sizeof(*oa));
268
269         request->rq_replen = lustre_msg_size(1, &size);
270
271         rc = ptlrpc_queue_wait(request);
272         if (rc)
273                 GOTO(out, rc);
274
275         body = lustre_swab_repbuf(request, 0, sizeof(*body),
276                                   lustre_swab_ost_body);
277         if (body == NULL)
278                 GOTO(out, rc = -EPROTO);
279
280         memcpy(oa, &body->oa, sizeof(*oa));
281
282         EXIT;
283 out:
284         ptlrpc_req_finished(request);
285         RETURN(0);
286 }
287
/* Synchronously create an object on the OST.
 *
 * If *ea is NULL a single-stripe lsm is allocated here and freed again
 * on failure; on success the new object id/group are stored in the lsm
 * and *ea is set to it.  If @oti is given, the reply transno and (when
 * OBD_MD_FLCOOKIE is set) the llog create cookie are copied out for
 * the caller's transaction bookkeeping.
 * Returns 0 on success or a negative errno. */
int osc_real_create(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md **ea, struct obd_trans_info *oti)
{
        struct ptlrpc_request *request;
        struct ost_body *body;
        struct lov_stripe_md *lsm;
        int rc, size = sizeof(*body);
        ENTRY;

        LASSERT(oa);
        LASSERT(ea);

        /* allocate a temporary lsm if the caller didn't supply one */
        lsm = *ea;
        if (!lsm) {
                rc = obd_alloc_memmd(exp, &lsm);
                if (rc < 0)
                        RETURN(rc);
        }

        request = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OBD_VERSION,
                                  OST_CREATE, 1, &size, NULL);
        if (!request)
                GOTO(out, rc = -ENOMEM);

        body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof (*body));
        memcpy(&body->oa, oa, sizeof(body->oa));

        request->rq_replen = lustre_msg_size(1, &size);
        /* OBD_MD_FLINLINE marks an orphan-deletion create from MDS/OST
         * recovery integration, not a normal create */
        if (oa->o_valid & OBD_MD_FLINLINE) {
                LASSERT((oa->o_valid & OBD_MD_FLFLAGS) &&
                        oa->o_flags == OBD_FL_DELORPHAN);
                DEBUG_REQ(D_HA, request,
                          "delorphan from OST integration");
                /* Don't resend the delorphan request */
                request->rq_no_resend = request->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(request);
        if (rc)
                GOTO(out_req, rc);

        body = lustre_swab_repbuf(request, 0, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body == NULL) {
                CERROR ("can't unpack ost_body\n");
                GOTO (out_req, rc = -EPROTO);
        }

        memcpy(oa, &body->oa, sizeof(*oa));

        /* This should really be sent by the OST */
        oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oa->o_valid |= OBD_MD_FLBLKSZ;

        /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
         * have valid lsm_oinfo data structs, so don't go touching that.
         * This needs to be fixed in a big way.
         */
        lsm->lsm_object_id = oa->o_id;
        lsm->lsm_object_gr = oa->o_gr;
        *ea = lsm;

        if (oti != NULL) {
                oti->oti_transno = request->rq_repmsg->transno;

                if (oa->o_valid & OBD_MD_FLCOOKIE) {
                        /* hand the llog create cookie back to the caller */
                        if (!oti->oti_logcookies)
                                oti_alloc_cookies(oti, 1);
                        memcpy(oti->oti_logcookies, obdo_logcookie(oa),
                               sizeof(oti->oti_onecookie));
                }
        }

        CDEBUG(D_HA, "transno: "LPD64"\n", request->rq_repmsg->transno);
        EXIT;
out_req:
        ptlrpc_req_finished(request);
out:
        /* only free the lsm we allocated ourselves (*ea is still NULL
         * on failure because it is assigned only on the success path) */
        if (rc && !*ea)
                obd_free_memmd(exp, &lsm);
        return rc;
}
370
371 static int osc_punch(struct obd_export *exp, struct obdo *oa,
372                      struct lov_stripe_md *md, obd_size start,
373                      obd_size end, struct obd_trans_info *oti)
374 {
375         struct ptlrpc_request *request;
376         struct ost_body *body;
377         int rc, size = sizeof(*body);
378         ENTRY;
379
380         if (!oa) {
381                 CERROR("oa NULL\n");
382                 RETURN(-EINVAL);
383         }
384
385         request = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OBD_VERSION,
386                                   OST_PUNCH, 1, &size, NULL);
387         if (!request)
388                 RETURN(-ENOMEM);
389
390         body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof (*body));
391         memcpy(&body->oa, oa, sizeof(*oa));
392
393         /* overload the size and blocks fields in the oa with start/end */
394         body->oa.o_size = start;
395         body->oa.o_blocks = end;
396         body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
397
398         request->rq_replen = lustre_msg_size(1, &size);
399
400         rc = ptlrpc_queue_wait(request);
401         if (rc)
402                 GOTO(out, rc);
403
404         body = lustre_swab_repbuf (request, 0, sizeof (*body),
405                                    lustre_swab_ost_body);
406         if (body == NULL) {
407                 CERROR ("can't unpack ost_body\n");
408                 GOTO (out, rc = -EPROTO);
409         }
410
411         memcpy(oa, &body->oa, sizeof(*oa));
412
413         EXIT;
414  out:
415         ptlrpc_req_finished(request);
416         return rc;
417 }
418
419 static int osc_sync(struct obd_export *exp, struct obdo *oa,
420                     struct lov_stripe_md *md, obd_size start, obd_size end)
421 {
422         struct ptlrpc_request *request;
423         struct ost_body *body;
424         int rc, size = sizeof(*body);
425         ENTRY;
426
427         if (!oa) {
428                 CERROR("oa NULL\n");
429                 RETURN(-EINVAL);
430         }
431
432         request = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OBD_VERSION,
433                                   OST_SYNC, 1, &size, NULL);
434         if (!request)
435                 RETURN(-ENOMEM);
436
437         body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof (*body));
438         memcpy(&body->oa, oa, sizeof(*oa));
439
440         /* overload the size and blocks fields in the oa with start/end */
441         body->oa.o_size = start;
442         body->oa.o_blocks = end;
443         body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
444
445         request->rq_replen = lustre_msg_size(1, &size);
446
447         rc = ptlrpc_queue_wait(request);
448         if (rc)
449                 GOTO(out, rc);
450
451         body = lustre_swab_repbuf(request, 0, sizeof(*body),
452                                   lustre_swab_ost_body);
453         if (body == NULL) {
454                 CERROR ("can't unpack ost_body\n");
455                 GOTO (out, rc = -EPROTO);
456         }
457
458         memcpy(oa, &body->oa, sizeof(*oa));
459
460         EXIT;
461  out:
462         ptlrpc_req_finished(request);
463         return rc;
464 }
465
466 static int osc_destroy(struct obd_export *exp, struct obdo *oa,
467                        struct lov_stripe_md *ea, struct obd_trans_info *oti)
468 {
469         struct ptlrpc_request *request;
470         struct ost_body *body;
471         int rc, size = sizeof(*body);
472         ENTRY;
473
474         if (!oa) {
475                 CERROR("oa NULL\n");
476                 RETURN(-EINVAL);
477         }
478
479         request = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OBD_VERSION,
480                                   OST_DESTROY, 1, &size, NULL);
481         if (!request)
482                 RETURN(-ENOMEM);
483
484         body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof (*body));
485
486         if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE) {
487                 memcpy(obdo_logcookie(oa), oti->oti_logcookies,
488                        sizeof(*oti->oti_logcookies));
489                 oti->oti_logcookies++;
490         }
491
492         memcpy(&body->oa, oa, sizeof(*oa));
493         request->rq_replen = lustre_msg_size(1, &size);
494
495         if (oti != NULL && oti->oti_async) {
496                 /* asynchrounous destroy */
497                 ptlrpcd_add_req(request);
498                 rc = 0;
499         } else {
500                 rc = ptlrpc_queue_wait(request);
501         
502                 if (rc == -ENOENT)
503                         rc = 0;
504
505                 if (rc) {
506                         ptlrpc_req_finished(request);
507                         RETURN(rc);
508                 }
509
510                 body = lustre_swab_repbuf(request, 0, sizeof(*body),
511                                           lustre_swab_ost_body);
512                 if (body == NULL) {
513                         CERROR ("Can't unpack body\n");
514                         ptlrpc_req_finished(request);
515                         RETURN(-EPROTO);
516                 }
517
518                 memcpy(oa, &body->oa, sizeof(*oa));
519                 ptlrpc_req_finished(request);
520         }
521         RETURN(rc);
522 }
523
524 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
525                                 long writing_bytes)
526 {
527         obd_valid bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
528
529         LASSERT(!(oa->o_valid & bits));
530
531         oa->o_valid |= bits;
532         spin_lock(&cli->cl_loi_list_lock);
533         oa->o_dirty = cli->cl_dirty;
534         oa->o_undirty = cli->cl_dirty_max - oa->o_dirty;
535         oa->o_grant = cli->cl_avail_grant;
536         oa->o_dropped = cli->cl_lost_grant;
537         cli->cl_lost_grant = 0;
538         spin_unlock(&cli->cl_loi_list_lock);
539         CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
540                oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
541 }
542
543 /* caller must hold loi_list_lock */
544 static void osc_consume_write_grant(struct client_obd *cli,
545                                     struct osc_async_page *oap)
546 {
547         cli->cl_dirty += PAGE_SIZE;
548         cli->cl_avail_grant -= PAGE_SIZE;
549         oap->oap_brw_flags |= OBD_BRW_FROM_GRANT;
550         CDEBUG(D_CACHE, "using %lu grant credits for oap %p\n", PAGE_SIZE, oap);
551         LASSERT(cli->cl_avail_grant >= 0);
552 }
553
554 static unsigned long rpcs_in_flight(struct client_obd *cli)
555 {
556         return cli->cl_r_in_flight + cli->cl_w_in_flight;
557 }
558
559 /* caller must hold loi_list_lock */
560 void osc_wake_cache_waiters(struct client_obd *cli)
561 {
562         struct list_head *l, *tmp;
563         struct osc_cache_waiter *ocw;
564
565         list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
566                 /* if we can't dirty more, we must wait until some is written */
567                 if (cli->cl_dirty + PAGE_SIZE > cli->cl_dirty_max) {
568                         CDEBUG(D_CACHE, "no dirty room: dirty: %ld max %ld\n",
569                                cli->cl_dirty, cli->cl_dirty_max);
570                         return;
571                 }
572
573                 /* if still dirty cache but no grant wait for pending RPCs that
574                  * may yet return us some grant before doing sync writes */
575                 if (cli->cl_w_in_flight && cli->cl_avail_grant < PAGE_SIZE) {
576                         CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
577                                cli->cl_w_in_flight);
578                 }
579                 ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
580                 list_del_init(&ocw->ocw_entry);
581                 if (cli->cl_avail_grant < PAGE_SIZE) {
582                         /* no more RPCs in flight to return grant, do sync IO */
583                         ocw->ocw_rc = -EDQUOT;
584                         CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
585                 } else {
586                         osc_consume_write_grant(cli, ocw->ocw_oap);
587                 }
588
589                 wake_up(&ocw->ocw_waitq);
590         }
591
592         EXIT;
593 }
594
/* Fold the grant returned in a BRW reply into the client's
 * available-grant count.  Takes loi_list_lock itself, so the caller
 * must NOT hold it. */
static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
{
        spin_lock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
        cli->cl_avail_grant += body->oa.o_grant;
        /* waiters are woken in brw_interpret_oap */
        spin_unlock(&cli->cl_loi_list_lock);
}
603
604 /* We assume that the reason this OSC got a short read is because it read
605  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
606  * via the LOV, and it _knows_ it's reading inside the file, it's just that
607  * this stripe never got written at or beyond this stripe offset yet. */
608 static void handle_short_read(int nob_read, obd_count page_count,
609                               struct brw_page *pga)
610 {
611         char *ptr;
612
613         /* skip bytes read OK */
614         while (nob_read > 0) {
615                 LASSERT (page_count > 0);
616
617                 if (pga->count > nob_read) {
618                         /* EOF inside this page */
619                         ptr = kmap(pga->pg) + (pga->page_offset & ~PAGE_MASK);
620                         memset(ptr + nob_read, 0, pga->count - nob_read);
621                         kunmap(pga->pg);
622                         page_count--;
623                         pga++;
624                         break;
625                 }
626
627                 nob_read -= pga->count;
628                 page_count--;
629                 pga++;
630         }
631
632         /* zero remaining pages */
633         while (page_count-- > 0) {
634                 ptr = kmap(pga->pg) + (pga->page_offset & ~PAGE_MASK);
635                 memset(ptr, 0, pga->count);
636                 kunmap(pga->pg);
637                 pga++;
638         }
639 }
640
641 static int check_write_rcs(struct ptlrpc_request *request,
642                            int requested_nob, int niocount,
643                            obd_count page_count, struct brw_page *pga)
644 {
645         int *remote_rcs, i;
646
647         /* return error if any niobuf was in error */
648         remote_rcs = lustre_swab_repbuf(request, 1,
649                                         sizeof(*remote_rcs) * niocount, NULL);
650         if (remote_rcs == NULL) {
651                 CERROR("Missing/short RC vector on BRW_WRITE reply\n");
652                 return(-EPROTO);
653         }
654         if (lustre_msg_swabbed(request->rq_repmsg))
655                 for (i = 0; i < niocount; i++)
656                         __swab32s((__u32 *)&remote_rcs[i]);
657
658         for (i = 0; i < niocount; i++) {
659                 if (remote_rcs[i] < 0)
660                         return(remote_rcs[i]);
661
662                 if (remote_rcs[i] != 0) {
663                         CERROR("rc[%d] invalid (%d) req %p\n",
664                                 i, remote_rcs[i], request);
665                         return(-EPROTO);
666                 }
667         }
668
669         if (request->rq_bulk->bd_nob_transferred != requested_nob) {
670                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
671                        requested_nob, request->rq_bulk->bd_nob_transferred);
672                 return(-EPROTO);
673         }
674
675         return (0);
676 }
677
678 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
679 {
680         if (p1->flag != p2->flag) {
681                 unsigned mask = ~OBD_BRW_FROM_GRANT;
682
683                 /* warn if we try to combine flags that we don't know to be
684                  * safe to combine */
685                 if ((p1->flag & mask) != (p2->flag & mask))
686                         CERROR("is it ok to have flags 0x%x and 0x%x in the "
687                                "same brw?\n", p1->flag, p2->flag);
688                 return 0;
689         }
690
691         return (p1->disk_offset + p1->count == p2->disk_offset);
692 }
693
694 #if CHECKSUM_BULK
695 static obd_count cksum_pages(int nob, obd_count page_count,
696                              struct brw_page *pga)
697 {
698         obd_count cksum = 0;
699         char *ptr;
700
701         while (nob > 0) {
702                 LASSERT (page_count > 0);
703
704                 ptr = kmap(pga->pg);
705                 ost_checksum(&cksum, ptr + (pga->off & (PAGE_SIZE - 1)),
706                              pga->count > nob ? nob : pga->count);
707                 kunmap(pga->pg);
708
709                 nob -= pga->count;
710                 page_count--;
711                 pga++;
712         }
713
714         return (cksum);
715 }
716 #endif
717
/* Build an OST_READ/OST_WRITE request with its bulk descriptor.
 *
 * Adjacent mergeable pages (see can_merge_pages()) are coalesced into
 * single remote niobufs.  On success *reqp holds the prepared request
 * (caller owns it), *niocountp the number of niobufs and
 * *requested_nobp the total byte count.  Returns 0 or -ENOMEM. */
static int osc_brw_prep_request(int cmd, struct obd_import *imp,struct obdo *oa,
                                struct lov_stripe_md *lsm, obd_count page_count,
                                struct brw_page *pga, int *requested_nobp,
                                int *niocountp, struct ptlrpc_request **reqp)
{
        struct ptlrpc_request   *req;
        struct ptlrpc_bulk_desc *desc;
        struct client_obd       *cli = &imp->imp_obd->u.cli;
        struct ost_body         *body;
        struct obd_ioobj        *ioobj;
        struct niobuf_remote    *niobuf;
        int                      niocount;
        int                      size[3];
        int                      i;
        int                      requested_nob;
        int                      opc;
        int                      rc;

        opc = ((cmd & OBD_BRW_WRITE) != 0) ? OST_WRITE : OST_READ;

        /* count niobufs: each unmergeable page boundary starts a new one */
        for (niocount = i = 1; i < page_count; i++)
                if (!can_merge_pages(&pga[i - 1], &pga[i]))
                        niocount++;

        size[0] = sizeof(*body);
        size[1] = sizeof(*ioobj);
        size[2] = niocount * sizeof(*niobuf);

        req = ptlrpc_prep_req(imp, LUSTRE_OBD_VERSION, opc, 3, size, NULL);
        if (req == NULL)
                return (-ENOMEM);

        /* bulk direction is from the server's point of view: a write
         * GETs the data from us, a read PUTs it into our sink pages */
        if (opc == OST_WRITE)
                desc = ptlrpc_prep_bulk_imp (req, page_count,
                                             BULK_GET_SOURCE, OST_BULK_PORTAL);
        else
                desc = ptlrpc_prep_bulk_imp (req, page_count,
                                             BULK_PUT_SINK, OST_BULK_PORTAL);
        if (desc == NULL)
                GOTO(out, rc = -ENOMEM);
        /* NB request now owns desc and will free it when it gets freed */

        body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof(*body));
        ioobj = lustre_msg_buf(req->rq_reqmsg, 1, sizeof(*ioobj));
        niobuf = lustre_msg_buf(req->rq_reqmsg, 2, niocount * sizeof(*niobuf));

        memcpy(&body->oa, oa, sizeof(*oa));

        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;

        LASSERT (page_count > 0);

        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                struct brw_page *pg = &pga[i];
                struct brw_page *pg_prev = pg - 1;

                /* sanity: page fragment stays inside one page and the
                 * pages arrive in strictly increasing disk order */
                LASSERT(pg->count > 0);
                LASSERTF((pg->page_offset & ~PAGE_MASK)+ pg->count <= PAGE_SIZE,
                         "i: %d pg: %p pg_off: "LPU64", count: %u\n", i, pg,
                         pg->page_offset, pg->count);
                LASSERTF(i == 0 || pg->disk_offset > pg_prev->disk_offset,
                         "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
                         " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
                         i, page_count,
                         pg->pg, pg->pg->private, pg->pg->index, pg->disk_offset,
                         pg_prev->pg, pg_prev->pg->private, pg_prev->pg->index,
                         pg_prev->disk_offset);

                ptlrpc_prep_bulk_page(desc, pg->pg,
                                      pg->page_offset & ~PAGE_MASK, pg->count);
                requested_nob += pg->count;

                /* extend the previous niobuf instead of starting a new one
                 * when this page is contiguous with it */
                if (i > 0 && can_merge_pages(pg_prev, pg)) {
                        niobuf--;
                        niobuf->len += pg->count;
                } else {
                        niobuf->offset = pg->disk_offset;
                        niobuf->len    = pg->count;
                        niobuf->flags  = pg->flag;
                }
        }

        /* the merge loop must have produced exactly niocount niobufs */
        LASSERT((void *)(niobuf - niocount) ==
                lustre_msg_buf(req->rq_reqmsg, 2, niocount * sizeof(*niobuf)));
        osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);

        /* size[0] still sizeof (*body) */
        if (opc == OST_WRITE) {
#if CHECKSUM_BULK
                body->oa.o_valid |= OBD_MD_FLCKSUM;
                body->oa.o_cksum = cksum_pages(requested_nob, page_count, pga);
#endif
                /* 1 RC per niobuf */
                size[1] = sizeof(__u32) * niocount;
                req->rq_replen = lustre_msg_size(2, size);
        } else {
                /* 1 RC for the whole I/O */
                req->rq_replen = lustre_msg_size(1, size);
        }

        *niocountp = niocount;
        *requested_nobp = requested_nob;
        *reqp = req;
        return (0);

 out:
        ptlrpc_req_finished (req);
        return (rc);
}
828
/* Unpack and validate the reply to a bulk brw RPC.  'rc' is the result of
 * waiting for the request; a negative value is passed straight through.
 * On success the server's obdo is copied back into *oa and, for reads,
 * the byte count is cross-checked against what was requested and what the
 * bulk layer says was transferred.  Returns 0 (or check_write_rcs()'s
 * result for writes), or -EPROTO on a malformed/inconsistent reply. */
static int osc_brw_fini_request(struct ptlrpc_request *req, struct obdo *oa,
                                int requested_nob, int niocount,
                                obd_count page_count, struct brw_page *pga,
                                int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
        struct ost_body *body;
        ENTRY;

        /* the rpc itself failed; nothing in the reply is trustworthy */
        if (rc < 0)
                RETURN(rc);

        body = lustre_swab_repbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
        if (body == NULL) {
                CERROR ("Can't unpack body\n");
                RETURN(-EPROTO);
        }

        /* pick up any grant the server included in the reply */
        osc_update_grant(cli, body);
        memcpy(oa, &body->oa, sizeof(*oa));

        if (req->rq_reqmsg->opc == OST_WRITE) {
                /* writes return 0 on success; the per-niobuf codes placed
                 * in the reply buffer (see osc_brw_prep_request: "1 RC per
                 * niobuf") are verified by check_write_rcs() */
                if (rc > 0) {
                        CERROR ("Unexpected +ve rc %d\n", rc);
                        RETURN(-EPROTO);
                }
                LASSERT (req->rq_bulk->bd_nob == requested_nob);

                RETURN(check_write_rcs(req, requested_nob, niocount,
                                       page_count, pga));
        }

        /* read path: rc is the byte count the server claims it sent */
        if (rc > requested_nob) {
                CERROR("Unexpected rc %d (%d requested)\n", rc, requested_nob);
                RETURN(-EPROTO);
        }

        /* server's claim must match what the bulk layer actually moved */
        if (rc != req->rq_bulk->bd_nob_transferred) {
                CERROR ("Unexpected rc %d (%d transferred)\n",
                        rc, req->rq_bulk->bd_nob_transferred);
                return (-EPROTO);
        }

        /* short read: let handle_short_read() fix up the tail pages
         * (presumably zero-filling them -- see its definition) */
        if (rc < requested_nob)
                handle_short_read(rc, page_count, pga);

#if CHECKSUM_BULK
        if (oa->o_valid & OBD_MD_FLCKSUM) {
                const struct ptlrpc_peer *peer =
                        &req->rq_import->imp_connection->c_peer;
                static int cksum_counter;
                obd_count server_cksum = oa->o_cksum;
                obd_count cksum = cksum_pages(rc, page_count, pga);
                char str[PTL_NALFMT_SIZE];

                ptlrpc_peernid2str(peer, str);

                cksum_counter++;
                if (server_cksum != cksum) {
                        CERROR("Bad checksum: server %x, client %x, server NID "
                               LPX64" (%s)\n", server_cksum, cksum,
                               peer->peer_id.nid, str);
                        cksum_counter = 0;
                        oa->o_cksum = cksum;
                } else if ((cksum_counter & (-cksum_counter)) == cksum_counter){
                        /* log only at power-of-two counts to limit noise */
                        CWARN("Checksum %u from "LPX64" (%s) OK: %x\n",
                              cksum_counter, peer->peer_id.nid, str, cksum);
                }
        } else {
                static int cksum_missed;

                cksum_missed++;
                /* same power-of-two throttling for the missing-checksum case */
                if ((cksum_missed & (-cksum_missed)) == cksum_missed)
                        CERROR("Request checksum %u from "LPX64", no reply\n",
                               cksum_missed,
                               req->rq_import->imp_connection->c_peer.peer_id.nid);
        }
#endif
        RETURN(0);
}
909
910 static int osc_brw_internal(int cmd, struct obd_export *exp,struct obdo *oa,
911                             struct lov_stripe_md *lsm,
912                             obd_count page_count, struct brw_page *pga)
913 {
914         int                    requested_nob;
915         int                    niocount;
916         struct ptlrpc_request *request;
917         int                    rc;
918         ENTRY;
919
920 restart_bulk:
921         rc = osc_brw_prep_request(cmd, class_exp2cliimp(exp), oa, lsm,
922                                   page_count, pga, &requested_nob, &niocount,
923                                   &request);
924         if (rc != 0)
925                 return (rc);
926
927         rc = ptlrpc_queue_wait(request);
928
929         if (rc == -ETIMEDOUT && request->rq_resend) {
930                 DEBUG_REQ(D_HA, request,  "BULK TIMEOUT");
931                 ptlrpc_req_finished(request);
932                 goto restart_bulk;
933         }
934
935         rc = osc_brw_fini_request(request, oa, requested_nob, niocount,
936                                   page_count, pga, rc);
937
938         ptlrpc_req_finished(request);
939         RETURN (rc);
940 }
941
942 static int brw_interpret(struct ptlrpc_request *request,
943                          struct osc_brw_async_args *aa, int rc)
944 {
945         struct obdo *oa      = aa->aa_oa;
946         int requested_nob    = aa->aa_requested_nob;
947         int niocount         = aa->aa_nio_count;
948         obd_count page_count = aa->aa_page_count;
949         struct brw_page *pga = aa->aa_pga;
950         ENTRY;
951
952         rc = osc_brw_fini_request(request, oa, requested_nob, niocount,
953                                   page_count, pga, rc);
954         RETURN (rc);
955 }
956
957 static int async_internal(int cmd, struct obd_export *exp, struct obdo *oa,
958                           struct lov_stripe_md *lsm, obd_count page_count,
959                           struct brw_page *pga, struct ptlrpc_request_set *set)
960 {
961         struct ptlrpc_request     *request;
962         int                        requested_nob;
963         int                        nio_count;
964         struct osc_brw_async_args *aa;
965         int                        rc;
966         ENTRY;
967
968         rc = osc_brw_prep_request(cmd, class_exp2cliimp(exp), oa, lsm,
969                                   page_count, pga, &requested_nob, &nio_count,
970                                   &request);
971         if (rc == 0) {
972                 LASSERT(sizeof(*aa) <= sizeof(request->rq_async_args));
973                 aa = (struct osc_brw_async_args *)&request->rq_async_args;
974                 aa->aa_oa = oa;
975                 aa->aa_requested_nob = requested_nob;
976                 aa->aa_nio_count = nio_count;
977                 aa->aa_page_count = page_count;
978                 aa->aa_pga = pga;
979
980                 request->rq_interpret_reply = brw_interpret;
981                 ptlrpc_set_add_req(set, request);
982         }
983         RETURN (rc);
984 }
985
#ifndef min_t
/* fallback min_t for builds whose headers don't provide it (e.g. the
 * userspace liblustre path -- TODO confirm); the statement expression
 * evaluates each argument exactly once */
#define min_t(type,x,y) \
        ({ type __x = (x); type __y = (y); __x < __y ? __x: __y; })
#endif
990
/*
 * ugh, we want disk allocation on the target to happen in offset order.  we'll
 * follow Sedgewick's advice and stick to the dead simple shellsort -- it'll do
 * fine for our small page arrays and doesn't require allocation.  it's an
 * insertion sort that swaps elements that are strides apart, shrinking the
 * stride down until it's '1' and the array is sorted.
 */
998 static void sort_brw_pages(struct brw_page *array, int num)
999 {
1000         int stride, i, j;
1001         struct brw_page tmp;
1002
1003         if (num == 1)
1004                 return;
1005         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1006                 ;
1007
1008         do {
1009                 stride /= 3;
1010                 for (i = stride ; i < num ; i++) {
1011                         tmp = array[i];
1012                         j = i;
1013                         while (j >= stride && array[j - stride].disk_offset >
1014                                 tmp.disk_offset) {
1015                                 array[j] = array[j - stride];
1016                                 j -= stride;
1017                         }
1018                         array[j] = tmp;
1019                 }
1020         } while (stride > 1);
1021 }
1022
/* make sure the regions we're passing to elan don't violate its '4
 * fragments' constraint.  portal headers are a fragment, all full
 * PAGE_SIZE long pages count as 1 fragment, and each partial page
 * counts as a fragment.  I think.  see bug 934. */
1027 static obd_count check_elan_limit(struct brw_page *pg, obd_count pages)
1028 {
1029         int frags_left = 3;
1030         int saw_whole_frag = 0;
1031         int i;
1032
1033         for (i = 0 ; frags_left && i < pages ; pg++, i++) {
1034                 if (pg->count == PAGE_SIZE) {
1035                         if (!saw_whole_frag) {
1036                                 saw_whole_frag = 1;
1037                                 frags_left--;
1038                         }
1039                 } else {
1040                         frags_left--;
1041                 }
1042         }
1043         return i;
1044 }
1045
1046 static int osc_brw(int cmd, struct obd_export *exp, struct obdo *oa,
1047                    struct lov_stripe_md *lsm, obd_count page_count,
1048                    struct brw_page *pga, struct obd_trans_info *oti)
1049 {
1050         ENTRY;
1051
1052         if (cmd == OBD_BRW_CHECK) {
1053                 /* The caller just wants to know if there's a chance that this
1054                  * I/O can succeed */
1055                 struct obd_import *imp = class_exp2cliimp(exp);
1056
1057                 if (imp == NULL || imp->imp_invalid)
1058                         RETURN(-EIO);
1059                 RETURN(0);
1060         }
1061
1062         while (page_count) {
1063                 obd_count pages_per_brw;
1064                 int rc;
1065
1066                 if (page_count > PTLRPC_MAX_BRW_PAGES)
1067                         pages_per_brw = PTLRPC_MAX_BRW_PAGES;
1068                 else
1069                         pages_per_brw = page_count;
1070
1071                 sort_brw_pages(pga, pages_per_brw);
1072                 pages_per_brw = check_elan_limit(pga, pages_per_brw);
1073
1074                 rc = osc_brw_internal(cmd, exp, oa, lsm, pages_per_brw, pga);
1075
1076                 if (rc != 0)
1077                         RETURN(rc);
1078
1079                 page_count -= pages_per_brw;
1080                 pga += pages_per_brw;
1081         }
1082         RETURN(0);
1083 }
1084
1085 static int osc_brw_async(int cmd, struct obd_export *exp, struct obdo *oa,
1086                          struct lov_stripe_md *lsm, obd_count page_count,
1087                          struct brw_page *pga, struct ptlrpc_request_set *set,
1088                          struct obd_trans_info *oti)
1089 {
1090         ENTRY;
1091
1092         if (cmd == OBD_BRW_CHECK) {
1093                 /* The caller just wants to know if there's a chance that this
1094                  * I/O can succeed */
1095                 struct obd_import *imp = class_exp2cliimp(exp);
1096
1097                 if (imp == NULL || imp->imp_invalid)
1098                         RETURN(-EIO);
1099                 RETURN(0);
1100         }
1101
1102         while (page_count) {
1103                 obd_count pages_per_brw;
1104                 int rc;
1105
1106                 if (page_count > PTLRPC_MAX_BRW_PAGES)
1107                         pages_per_brw = PTLRPC_MAX_BRW_PAGES;
1108                 else
1109                         pages_per_brw = page_count;
1110
1111                 sort_brw_pages(pga, pages_per_brw);
1112                 pages_per_brw = check_elan_limit(pga, pages_per_brw);
1113
1114                 rc = async_internal(cmd, exp, oa, lsm, pages_per_brw, pga, set);
1115
1116                 if (rc != 0)
1117                         RETURN(rc);
1118
1119                 page_count -= pages_per_brw;
1120                 pga += pages_per_brw;
1121         }
1122         RETURN(0);
1123 }
1124
1125 static void osc_check_rpcs(struct client_obd *cli);
1126 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
1127                            int sent);
1128 static void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi);
1129 static void lop_update_pending(struct client_obd *cli,
1130                                struct loi_oap_pages *lop, int cmd, int delta);
1131
1132 /* this is called when a sync waiter receives an interruption.  Its job is to
1133  * get the caller woken as soon as possible.  If its page hasn't been put in an
1134  * rpc yet it can dequeue immediately.  Otherwise it has to mark the rpc as
1135  * desiring interruption which will forcefully complete the rpc once the rpc
1136  * has timed out */
static void osc_occ_interrupted(struct oig_callback_context *occ)
{
        struct osc_async_page *oap;
        struct loi_oap_pages *lop;
        struct lov_oinfo *loi;
        ENTRY;

        /* XXX member_of() */
        oap = list_entry(occ, struct osc_async_page, oap_occ);

        spin_lock(&oap->oap_cli->cl_loi_list_lock);

        oap->oap_interrupted = 1;

        /* ok, it's been put in an rpc. */
        if (oap->oap_request != NULL) {
                /* mark the rpc and kick the ptlrpcd so the rpc (and hence
                 * the waiter) is dealt with as soon as possible */
                ptlrpc_mark_interrupted(oap->oap_request);
                ptlrpcd_wake(oap->oap_request);
                GOTO(unlock, 0);
        }

        /* we don't get interruption callbacks until osc_trigger_sync_io()
         * has been called and put the sync oaps in the pending/urgent lists.*/
        if (!list_empty(&oap->oap_pending_item)) {
                /* still only queued: dequeue it and complete this group-io
                 * member immediately */
                list_del_init(&oap->oap_pending_item);
                if (oap->oap_async_flags & ASYNC_URGENT)
                        list_del_init(&oap->oap_urgent_item);

                loi = oap->oap_loi;
                lop = (oap->oap_cmd == OBD_BRW_WRITE) ?
                        &loi->loi_write_lop : &loi->loi_read_lop;
                /* keep the pending counters and the cli's loi lists in step
                 * with the removal */
                lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
                loi_list_maint(oap->oap_cli, oap->oap_loi);

                oig_complete_one(oap->oap_oig, &oap->oap_occ, 0);
                oap->oap_oig = NULL;
        }

unlock:
        spin_unlock(&oap->oap_cli->cl_loi_list_lock);
}
1178
1179 /* this must be called holding the loi list lock to give coverage to exit_cache,
1180  * async_flag maintenance, and oap_request */
static void osc_ap_completion(struct client_obd *cli, struct obdo *oa,
                              struct osc_async_page *oap, int sent, int rc)
{
        /* release this page's cache/grant accounting first */
        osc_exit_cache(cli, oap, sent);
        oap->oap_async_flags = 0;
        oap->oap_interrupted = 0;

        /* drop the reference taken when the page was put in an rpc */
        if (oap->oap_request != NULL) {
                ptlrpc_req_finished(oap->oap_request);
                oap->oap_request = NULL;
        }

        /* on success, copy the returned o_blocks into the loi */
        if (rc == 0 && oa != NULL)
                oap->oap_loi->loi_blocks = oa->o_blocks;

        /* sync (group) io completes through the oig instead of the
         * caller's ap_completion callback */
        if (oap->oap_oig) {
                oig_complete_one(oap->oap_oig, &oap->oap_occ, rc);
                oap->oap_oig = NULL;
                EXIT;
                return;
        }

        oap->oap_caller_ops->ap_completion(oap->oap_caller_data, oap->oap_cmd,
                                           oa, rc);
}
1206
/* ptlrpcd completion callback for an async brw rpc built from cached
 * pages: finishes reply processing, updates in-flight accounting under
 * the loi list lock, and completes every oap that rode in the rpc. */
static int brw_interpret_oap(struct ptlrpc_request *request,
                             struct osc_brw_async_args *aa, int rc)
{
        struct osc_async_page *oap;
        struct client_obd *cli;
        struct list_head *pos, *n;
        struct timeval now;
        ENTRY;

        do_gettimeofday(&now);
        rc = osc_brw_fini_request(request, aa->aa_oa, aa->aa_requested_nob,
                                  aa->aa_nio_count, aa->aa_page_count,
                                  aa->aa_pga, rc);

        CDEBUG(D_INODE, "request %p aa %p rc %d\n", request, aa, rc);

        cli = aa->aa_cli;
        /* in failout recovery we ignore writeback failure and want
         * to just tell llite to unlock the page and continue */
        if (request->rq_reqmsg->opc == OST_WRITE && 
            (cli->cl_import == NULL || cli->cl_import->imp_invalid)) {
                CDEBUG(D_INODE, "flipping to rc 0 imp %p inv %d\n", 
                       cli->cl_import, 
                       cli->cl_import ? cli->cl_import->imp_invalid : -1);
                rc = 0;
        }

        spin_lock(&cli->cl_loi_list_lock);

        /* account the elapsed time since the rpc was handed to the rpcd,
         * in the per-direction service-time stats */
        if (request->rq_reqmsg->opc == OST_WRITE)
                lprocfs_stime_record(&cli->cl_write_stime, &now,
                                     &request->rq_rpcd_start);
        else
                lprocfs_stime_record(&cli->cl_read_stime, &now,
                                     &request->rq_rpcd_start);



        /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
         * is called so we know whether to go to sync BRWs or wait for more
         * RPCs to complete */
        if (request->rq_reqmsg->opc == OST_WRITE)
                cli->cl_w_in_flight--;
        else
                cli->cl_r_in_flight--;

        /* the caller may re-use the oap after the completion call so
         * we need to clean it up a little */
        list_for_each_safe(pos, n, &aa->aa_oaps) {
                oap = list_entry(pos, struct osc_async_page, oap_rpc_item);

                //CDEBUG(D_INODE, "page %p index %lu oap %p\n",
                       //oap->oap_page, oap->oap_page->index, oap);

                list_del_init(&oap->oap_rpc_item);
                osc_ap_completion(cli, aa->aa_oa, oap, 1, rc);
        }

        /* an rpc slot just opened up: let cache waiters and queued pages
         * make progress */
        osc_wake_cache_waiters(cli);
        osc_check_rpcs(cli);

        spin_unlock(&cli->cl_loi_list_lock);

        /* free what osc_build_req() allocated for this rpc */
        obdo_free(aa->aa_oa);
        OBD_FREE(aa->aa_pga, aa->aa_page_count * sizeof(struct brw_page));

        RETURN(0);
}
1275
/* Build a brw rpc from the oaps on rpc_list: allocate the brw_page array
 * and an obdo (filled in through the caller's ap_fill_obdo), sort the
 * pages and prep the request, then stash the async args that
 * brw_interpret_oap() will need.  On failure returns an ERR_PTR and frees
 * everything allocated here; the oaps stay on rpc_list for the caller. */
static struct ptlrpc_request *osc_build_req(struct client_obd *cli,
                                            struct list_head *rpc_list,
                                            int page_count, int cmd)
{
        struct ptlrpc_request *req;
        struct brw_page *pga = NULL;
        int requested_nob, nio_count;
        struct osc_brw_async_args *aa;
        struct obdo *oa = NULL;
        struct obd_async_page_ops *ops = NULL;
        void *caller_data = NULL;
        struct list_head *pos;
        int i, rc;

        LASSERT(!list_empty(rpc_list));

        OBD_ALLOC(pga, sizeof(*pga) * page_count);
        if (pga == NULL)
                RETURN(ERR_PTR(-ENOMEM));

        oa = obdo_alloc();
        if (oa == NULL)
                GOTO(out, req = ERR_PTR(-ENOMEM));

        /* flatten the oap list into the brw_page array */
        i = 0;
        list_for_each(pos, rpc_list) {
                struct osc_async_page *oap;

                oap = list_entry(pos, struct osc_async_page, oap_rpc_item);
                if (ops == NULL) {
                        /* ops/caller_data are taken from the first oap only;
                         * presumably shared by all oaps in one rpc -- confirm
                         * against the callers */
                        ops = oap->oap_caller_ops;
                        caller_data = oap->oap_caller_data;
                }
                pga[i].disk_offset = oap->oap_obj_off + oap->oap_page_off;
                pga[i].page_offset = pga[i].disk_offset;
                pga[i].pg = oap->oap_page;
                pga[i].count = oap->oap_count;
                pga[i].flag = oap->oap_brw_flags;
                CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
                       pga[i].pg, oap->oap_page->index, oap, pga[i].flag);
                i++;
        }

        /* always get the data for the obdo for the rpc */
        LASSERT(ops != NULL);
        ops->ap_fill_obdo(caller_data, cmd, oa);

        /* the target wants the pages in disk offset order */
        sort_brw_pages(pga, page_count);
        rc = osc_brw_prep_request(cmd, cli->cl_import, oa, NULL, page_count,
                                  pga, &requested_nob, &nio_count, &req);
        if (rc != 0) {
                CERROR("prep_req failed: %d\n", rc);
                GOTO(out, req = ERR_PTR(rc));
        }

        /* stash everything brw_interpret_oap() will need in the request */
        LASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = (struct osc_brw_async_args *)&req->rq_async_args;
        aa->aa_oa = oa;
        aa->aa_requested_nob = requested_nob;
        aa->aa_nio_count = nio_count;
        aa->aa_page_count = page_count;
        aa->aa_pga = pga;
        aa->aa_cli = cli;

out:
        /* every goto path set req to an ERR_PTR; free our allocations then */
        if (IS_ERR(req)) {
                if (oa)
                        obdo_free(oa);
                if (pga)
                        OBD_FREE(pga, sizeof(*pga) * page_count);
        }
        RETURN(req);
}
1349
1350 static void lop_update_pending(struct client_obd *cli,
1351                                struct loi_oap_pages *lop, int cmd, int delta)
1352 {
1353         lop->lop_num_pending += delta;
1354         if (cmd == OBD_BRW_WRITE)
1355                 cli->cl_pending_w_pages += delta;
1356         else
1357                 cli->cl_pending_r_pages += delta;
1358 }
1359
1360 /* the loi lock is held across this function but it's allowed to release
1361  * and reacquire it during its work */
/* Build and fire one rpc's worth of pages from lop's pending list.
 * Returns 1 if an rpc was sent, 0 if nothing was ready, or a negative
 * errno if the rpc could not be built (pages are requeued). */
static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
                            int cmd, struct loi_oap_pages *lop)
{
        struct ptlrpc_request *request;
        obd_count page_count = 0;
        struct list_head *tmp, *pos;
        struct osc_async_page *oap = NULL;
        struct osc_brw_async_args *aa;
        struct obd_async_page_ops *ops;
        LIST_HEAD(rpc_list);
        ENTRY;

        /* first we find the pages we're allowed to work with */
        list_for_each_safe(pos, tmp, &lop->lop_pending) {
                oap = list_entry(pos, struct osc_async_page, oap_pending_item);
                ops = oap->oap_caller_ops;

                LASSERT(oap->oap_magic == OAP_MAGIC);

                /* in llite being 'ready' equates to the page being locked
                 * until completion unlocks it.  commit_write submits a page
                 * as not ready because its unlock will happen unconditionally
                 * as the call returns.  if we race with commit_write giving
                 * us that page we don't want to create a hole in the page
                 * stream, so we stop and leave the rpc to be fired by
                 * another dirtier or kupdated interval (the not ready page
                 * will still be on the dirty list).  we could call in
                 * at the end of ll_file_write to process the queue again. */
                if (!(oap->oap_async_flags & ASYNC_READY)) {
                        int rc = ops->ap_make_ready(oap->oap_caller_data, cmd);
                        if (rc < 0)
                                CDEBUG(D_INODE, "oap %p page %p returned %d "
                                                "instead of ready\n", oap,
                                                oap->oap_page, rc);
                        switch (rc) {
                        case -EAGAIN:
                                /* llite is telling us that the page is still
                                 * in commit_write and that we should try
                                 * and put it in an rpc again later.  we
                                 * break out of the loop so we don't create
                                 * a hole in the sequence of pages in the rpc
                                 * stream.*/
                                pos = NULL;
                                break;
                        case -EINTR:
                                /* the io isn't needed.. tell the checks
                                 * below to complete the rpc with EINTR */
                                oap->oap_async_flags |= ASYNC_COUNT_STABLE;
                                oap->oap_count = -EINTR;
                                break;
                        case 0:
                                oap->oap_async_flags |= ASYNC_READY;
                                break;
                        default:
                                LASSERTF(0, "oap %p page %p returned %d "
                                            "from make_ready\n", oap,
                                            oap->oap_page, rc);
                                break;
                        }
                }
                /* pos was cleared above to signal 'stop building this rpc' */
                if (pos == NULL)
                        break;

                /* take the page out of our book-keeping */
                list_del_init(&oap->oap_pending_item);
                lop_update_pending(cli, lop, cmd, -1);
                list_del_init(&oap->oap_urgent_item);

                /* ask the caller for the size of the io as the rpc leaves. */
                if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE))
                        oap->oap_count =
                                ops->ap_refresh_count(oap->oap_caller_data,cmd);
                /* a count <= 0 (e.g. the -EINTR set above) means there is no
                 * io to do; complete the page right here */
                if (oap->oap_count <= 0) {
                        CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
                               oap->oap_count);
                        osc_ap_completion(cli, NULL, oap, 0, oap->oap_count);
                        continue;
                }

                /* now put the page back in our accounting */
                list_add_tail(&oap->oap_rpc_item, &rpc_list);
                if (++page_count >= cli->cl_max_pages_per_rpc)
                        break;
        }

        osc_wake_cache_waiters(cli);

        if (page_count == 0)
                RETURN(0);

        loi_list_maint(cli, loi);
        /* drop the loi list lock while building the rpc; the pages we took
         * are safely on our private rpc_list */
        spin_unlock(&cli->cl_loi_list_lock);

        request = osc_build_req(cli, &rpc_list, page_count, cmd);
        if (IS_ERR(request)) {
                /* this should happen rarely and is pretty bad, it makes the
                 * pending list not follow the dirty order */
                spin_lock(&cli->cl_loi_list_lock);
                list_for_each_safe(pos, tmp, &rpc_list) {
                        oap = list_entry(pos, struct osc_async_page,
                                         oap_rpc_item);
                        list_del_init(&oap->oap_rpc_item);

                        /* queued sync pages can be torn down while the pages
                         * were between the pending list and the rpc */
                        if (oap->oap_interrupted) {
                                CDEBUG(D_INODE, "oap %p interrupted\n", oap);
                                osc_ap_completion(cli, NULL, oap, 0,
                                                  oap->oap_count);
                                continue;
                        }

                        /* put the page back in the loi/lop lists */
                        list_add_tail(&oap->oap_pending_item,
                                      &lop->lop_pending);
                        lop_update_pending(cli, lop, cmd, 1);
                        if (oap->oap_async_flags & ASYNC_URGENT)
                                list_add(&oap->oap_urgent_item,
                                         &lop->lop_urgent);
                }
                loi_list_maint(cli, loi);
                RETURN(PTR_ERR(request));
        }

        /* hand the page list over to the request's async args */
        LASSERT(sizeof(*aa) <= sizeof(request->rq_async_args));
        aa = (struct osc_brw_async_args *)&request->rq_async_args;
        INIT_LIST_HEAD(&aa->aa_oaps);
        list_splice(&rpc_list, &aa->aa_oaps);
        INIT_LIST_HEAD(&rpc_list);

#ifdef __KERNEL__
        if (cmd == OBD_BRW_READ) {
                lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
                lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
        } else {
                lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
                lprocfs_oh_tally(&cli->cl_write_rpc_hist,
                                 cli->cl_w_in_flight);
        }
#endif

        spin_lock(&cli->cl_loi_list_lock);

        if (cmd == OBD_BRW_READ)
                cli->cl_r_in_flight++;
        else
                cli->cl_w_in_flight++;
        /* queued sync pages can be torn down while the pages
         * were between the pending list and the rpc */
        list_for_each(pos, &aa->aa_oaps) {
                oap = list_entry(pos, struct osc_async_page, oap_rpc_item);
                if (oap->oap_interrupted) {
                        CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
                               oap, request);
                        ptlrpc_mark_interrupted(request);
                        break;
                }
        }

        CDEBUG(D_INODE, "req %p: %d pages, aa %p.  now %dr/%dw in flight\n",
                        request, page_count, aa, cli->cl_r_in_flight,
                        cli->cl_w_in_flight);

        /* NOTE(review): 'oap' here is whatever oap the loop above last
         * visited, so only that single oap takes a reference on the request
         * -- confirm this is the intended interruption bookkeeping */
        oap->oap_request = ptlrpc_request_addref(request);
        request->rq_interpret_reply = brw_interpret_oap;
        ptlrpcd_add_req(request);
        RETURN(1);
}
1530
/* Decide whether 'lop' currently justifies building an rpc: returns 1 to
 * fire one now, 0 to keep accumulating pages. */
static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
                         int cmd)
{
        int optimal;
        ENTRY;

        if (lop->lop_num_pending == 0)
                RETURN(0);

        /* if we have an invalid import we want to drain the queued pages
         * by forcing them through rpcs that immediately fail and complete
         * the pages.  recovery relies on this to empty the queued pages
         * before canceling the locks and evicting down the llite pages */
        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
                RETURN(1);

        /* stream rpcs in queue order as long as there is an urgent page
         * queued.  this is our cheap solution for good batching in the case
         * where writepage marks some random page in the middle of the file as
         * urgent because of, say, memory pressure */
        if (!list_empty(&lop->lop_urgent))
                RETURN(1);

        /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
        optimal = cli->cl_max_pages_per_rpc;
        if (cmd == OBD_BRW_WRITE) {
                /* trigger a write rpc stream as long as there are dirtiers
                 * waiting for space.  as they're waiting, they're not going to
                 * create more pages to coalesce with what's waiting.. */
                if (!list_empty(&cli->cl_cache_waiters))
                        RETURN(1);

                /* extra headroom to avoid triggering rpcs that would want to
                 * include pages that are being queued but which can't be made
                 * ready until the queuer finishes with the page. this is a
                 * wart for llite::commit_write() */
                optimal += 16;
        }
        if (lop->lop_num_pending >= optimal)
                RETURN(1);

        RETURN(0);
}
1574
/* Make the item's list membership agree with 'should_be_on': add it when
 * it should be listed but isn't, remove it when the reverse holds. */
static void on_list(struct list_head *item, struct list_head *list,
                    int should_be_on)
{
        int currently_on = !list_empty(item);

        if (currently_on == !!should_be_on)
                return;

        if (should_be_on)
                list_add_tail(item, list);
        else
                list_del_init(item);
}
1583
1584 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
1585  * can find pages to build into rpcs quickly */
1586 static void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
1587 {
1588         on_list(&loi->loi_cli_item, &cli->cl_loi_ready_list,
1589                 lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE) ||
1590                 lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
1591
1592         on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
1593                 loi->loi_write_lop.lop_num_pending);
1594
1595         on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
1596                 loi->loi_read_lop.lop_num_pending);
1597 }
1598
/* One-line debug summary of an object's rpc state: ready-list membership
 * plus pending/urgent counts for the write and read directions.
 * NOTE: the previous version had a stray trailing '\' after "args)" which
 * made the macro continue onto the following source line. */
#define LOI_DEBUG(LOI, STR, args...)                                     \
        CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR,           \
               !list_empty(&(LOI)->loi_cli_item),                        \
               (LOI)->loi_write_lop.lop_num_pending,                     \
               !list_empty(&(LOI)->loi_write_lop.lop_urgent),            \
               (LOI)->loi_read_lop.lop_num_pending,                      \
               !list_empty(&(LOI)->loi_read_lop.lop_urgent),             \
               args)

/* Pick the next object that should have an rpc built for it, in priority
 * order: ready objects, then queued writers while there are cache waiters,
 * then anything queued on an invalid/missing import so it gets flushed.
 * Returns NULL when no object needs io.  NOTE(review): callers appear to
 * hold cl_loi_list_lock (see osc_check_rpcs) -- confirm before reuse. */
struct lov_oinfo *osc_next_loi(struct client_obd *cli)
{
        ENTRY;
        /* first return all objects which we already know to have
         * pages ready to be stuffed into rpcs */
        if (!list_empty(&cli->cl_loi_ready_list))
                RETURN(list_entry(cli->cl_loi_ready_list.next,
                                  struct lov_oinfo, loi_cli_item));

        /* then if we have cache waiters, return all objects with queued
         * writes.  This is especially important when many small files
         * have filled up the cache and not been fired into rpcs because
         * they don't pass the nr_pending/object threshold */
        if (!list_empty(&cli->cl_cache_waiters) &&
            !list_empty(&cli->cl_loi_write_list))
                RETURN(list_entry(cli->cl_loi_write_list.next,
                                  struct lov_oinfo, loi_write_item));

        /* then return all queued objects when we have an invalid import
         * so that they get flushed */
        if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
                if (!list_empty(&cli->cl_loi_write_list))
                        RETURN(list_entry(cli->cl_loi_write_list.next,
                                          struct lov_oinfo, loi_write_item));
                if (!list_empty(&cli->cl_loi_read_list))
                        RETURN(list_entry(cli->cl_loi_read_list.next,
                                          struct lov_oinfo, loi_read_item));
        }
        RETURN(NULL);
}
1638
/* Build and send rpcs for every object that osc_next_loi() says needs io,
 * until the in-flight limit is reached, a send fails, or make_ready keeps
 * backing off.  Called with the loi list lock held. */
static void osc_check_rpcs(struct client_obd *cli)
{
        struct lov_oinfo *loi;
        int rc = 0, race_counter = 0;
        ENTRY;

        while ((loi = osc_next_loi(cli)) != NULL) {
                LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));

                /* don't exceed the rpc concurrency limit for this client */
                if (rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight)
                        break;

                /* attempt some read/write balancing by alternating between
                 * reads and writes in an object.  The makes_rpc checks here
                 * would be redundant if we were getting read/write work items
                 * instead of objects.  we don't want send_oap_rpc to drain a
                 * partial read pending queue when we're given this object to
                 * do io on writes while there are cache waiters */
                if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
                        rc = osc_send_oap_rpc(cli, loi, OBD_BRW_WRITE,
                                              &loi->loi_write_lop);
                        if (rc < 0)
                                break;
                        if (rc > 0)
                                race_counter = 0;
                        else
                                race_counter++;
                }
                if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
                        rc = osc_send_oap_rpc(cli, loi, OBD_BRW_READ,
                                              &loi->loi_read_lop);
                        if (rc < 0)
                                break;
                        if (rc > 0)
                                race_counter = 0;
                        else
                                race_counter++;
                }

                /* attempt some inter-object balancing by issuing rpcs
                 * for each object in turn */
                if (!list_empty(&loi->loi_cli_item))
                        list_del_init(&loi->loi_cli_item);
                if (!list_empty(&loi->loi_write_item))
                        list_del_init(&loi->loi_write_item);
                if (!list_empty(&loi->loi_read_item))
                        list_del_init(&loi->loi_read_item);

                /* re-derive list membership now that pages were consumed */
                loi_list_maint(cli, loi);

                /* send_oap_rpc fails with 0 when make_ready tells it to
                 * back off.  llite's make_ready does this when it tries
                 * to lock a page queued for write that is already locked.
                 * we want to try sending rpcs from many objects, but we
                 * don't want to spin failing with 0.  */
                if (race_counter == 10)
                        break;
        }
        EXIT;
}
1700
1701 /* we're trying to queue a page in the osc so we're subject to the
1702  * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
1703  * If the osc's queued pages are already at that limit, then we want to sleep
1704  * until there is space in the osc's queue for us.  We also may be waiting for
1705  * write credits from the OST if there are RPCs in flight that may return some
1706  * before we fall back to sync writes.
1707  *
1708  * We need this know our allocation was granted in the presence of signals */
1709 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
1710 {
1711         int rc;
1712         ENTRY;
1713         spin_lock(&cli->cl_loi_list_lock);
1714         rc = list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
1715         spin_unlock(&cli->cl_loi_list_lock);
1716         RETURN(rc);
1717 };
1718
/* Charge one page of dirty/grant accounting before queueing a write, or
 * sleep until writeback frees space.  Returns 0 on success, -EDQUOT when
 * over quota with nothing in flight, -EINTR when interrupted, or the
 * waiter's ocw_rc verdict.
 * Caller must hold loi_list_lock - we drop/regain it if we need to wait for
 * grant or cache space. */
static int osc_enter_cache(struct client_obd *cli, struct lov_oinfo *loi,
                           struct osc_async_page *oap)
{
        struct osc_cache_waiter ocw;
        struct l_wait_info lwi = { 0 };  /* zeroed lwi: no timeout */
        struct timeval start, stop;

        CDEBUG(D_CACHE, "dirty: %ld dirty_max: %ld dropped: %lu grant: %lu\n",
               cli->cl_dirty, cli->cl_dirty_max, cli->cl_lost_grant,
               cli->cl_avail_grant);

        /* a dirty limit below one page means caching is disabled entirely */
        if (cli->cl_dirty_max < PAGE_SIZE)
                return(-EDQUOT);

        /* Hopefully normal case - cache space and write credits available */
        if (cli->cl_dirty + PAGE_SIZE <= cli->cl_dirty_max &&
            cli->cl_avail_grant >= PAGE_SIZE) {
                /* account for ourselves */
                osc_consume_write_grant(cli, oap);
                return(0);
        }

        /* Make sure that there are write rpcs in flight to wait for.  This
         * is a little silly as this object may not have any pending but
         * other objects sure might. */
        if (cli->cl_w_in_flight) {
                list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
                init_waitqueue_head(&ocw.ocw_waitq);
                ocw.ocw_oap = oap;
                ocw.ocw_rc = 0;

                /* kick rpc generation so something will complete and
                 * wake us, then drop the lock while we sleep */
                loi_list_maint(cli, loi);
                osc_check_rpcs(cli);
                spin_unlock(&cli->cl_loi_list_lock);

                CDEBUG(0, "sleeping for cache space\n");
                do_gettimeofday(&start);
                l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
                do_gettimeofday(&stop);
                spin_lock(&cli->cl_loi_list_lock);
                lprocfs_stime_record(&cli->cl_enter_stime, &stop, &start);
                /* still on the waiter list means nobody granted us:
                 * we were woken by a signal or because nothing is in
                 * flight any more */
                if (!list_empty(&ocw.ocw_entry)) {
                        list_del(&ocw.ocw_entry);
                        RETURN(-EINTR);
                }
                RETURN(ocw.ocw_rc);
        }

        RETURN(-EDQUOT);
}
1771
1772 /* the companion to enter_cache, called when an oap is no longer part of the
1773  * dirty accounting.. so writeback completes or truncate happens before writing
1774  * starts.  must be called with the loi lock held. */
1775 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
1776                            int sent)
1777 {
1778         ENTRY;
1779
1780         if (!(oap->oap_brw_flags & OBD_BRW_FROM_GRANT)) {
1781                 EXIT;
1782                 return;
1783         }
1784
1785         oap->oap_brw_flags &= ~OBD_BRW_FROM_GRANT;
1786         cli->cl_dirty -= PAGE_SIZE;
1787         if (!sent) {
1788                 cli->cl_lost_grant += PAGE_SIZE;
1789                 CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
1790                        cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
1791         }
1792
1793         EXIT;
1794 }
1795
1796 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
1797                         struct lov_oinfo *loi, struct page *page,
1798                         obd_off offset, struct obd_async_page_ops *ops,
1799                         void *data, void **res)
1800 {
1801         struct osc_async_page *oap;
1802         ENTRY;
1803
1804         OBD_ALLOC(oap, sizeof(*oap));
1805         if (oap == NULL)
1806                 return -ENOMEM;
1807
1808         oap->oap_magic = OAP_MAGIC;
1809         oap->oap_cli = &exp->exp_obd->u.cli;
1810         oap->oap_loi = loi;
1811
1812         oap->oap_caller_ops = ops;
1813         oap->oap_caller_data = data;
1814
1815         oap->oap_page = page;
1816         oap->oap_obj_off = offset;
1817
1818         INIT_LIST_HEAD(&oap->oap_pending_item);
1819         INIT_LIST_HEAD(&oap->oap_urgent_item);
1820         INIT_LIST_HEAD(&oap->oap_rpc_item);
1821
1822         oap->oap_occ.occ_interrupted = osc_occ_interrupted;
1823
1824         CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
1825         *res = oap;
1826         RETURN(0);
1827 }
1828
/* Queue a prepared oap (cookie from osc_prep_async_page) for async io on
 * its object, entering dirty/grant accounting for writes, and kick rpc
 * generation.  Returns 0, -EIO on an invalid import, -EBUSY if the oap is
 * already queued, or the error from osc_enter_cache(). */
static int osc_queue_async_io(struct obd_export *exp, struct lov_stripe_md *lsm,
                              struct lov_oinfo *loi, void *cookie,
                              int cmd, obd_off off, int count,
                              obd_flags brw_flags, enum async_flags async_flags)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct osc_async_page *oap;
        struct loi_oap_pages *lop;
        int rc;
        ENTRY;

        oap = OAP_FROM_COOKIE(cookie);

        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
                RETURN(-EIO);

        /* a non-empty item means the oap is already on some queue */
        if (!list_empty(&oap->oap_pending_item) ||
            !list_empty(&oap->oap_urgent_item) ||
            !list_empty(&oap->oap_rpc_item))
                RETURN(-EBUSY);

        if (loi == NULL)
                loi = &lsm->lsm_oinfo[0];

        spin_lock(&cli->cl_loi_list_lock);

        oap->oap_cmd = cmd;
        oap->oap_async_flags = async_flags;
        oap->oap_page_off = off;
        oap->oap_count = count;
        oap->oap_brw_flags = brw_flags;

        if (cmd == OBD_BRW_WRITE) {
                /* may drop and retake the list lock while sleeping for
                 * cache space */
                rc = osc_enter_cache(cli, loi, oap);
                if (rc) {
                        spin_unlock(&cli->cl_loi_list_lock);
                        RETURN(rc);
                }
                lop = &loi->loi_write_lop;
        } else {
                lop = &loi->loi_read_lop;
        }

        if (oap->oap_async_flags & ASYNC_URGENT)
                list_add(&oap->oap_urgent_item, &lop->lop_urgent);
        list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
        lop_update_pending(cli, lop, cmd, 1);

        loi_list_maint(cli, loi);

        LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
                  cmd);

        osc_check_rpcs(cli);
        spin_unlock(&cli->cl_loi_list_lock);

        RETURN(0);
}
1887
/* true iff `flag' is newly set: clear in `was' and set in `now'.
 * aka (~was & now & flag), but this is more clear :)
 * Fix: parenthesize macro arguments so callers may pass expressions with
 * lower-precedence operators (e.g. `a | b') without miscompiling. */
#define SETTING(was, now, flag) (!((was) & (flag)) && ((now) & (flag)))
1890
/* Raise async flags (ASYNC_READY, ASYNC_URGENT) on an already-queued oap,
 * moving it onto the urgent list if needed, then kick rpc generation.
 * Flags are only ever set here, never cleared.  Returns 0, -EIO on an
 * invalid import, or -EINVAL if the oap isn't queued. */
static int osc_set_async_flags(struct obd_export *exp,
                               struct lov_stripe_md *lsm,
                               struct lov_oinfo *loi, void *cookie,
                               obd_flags async_flags)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct loi_oap_pages *lop;
        struct osc_async_page *oap;
        int rc = 0;
        ENTRY;

        oap = OAP_FROM_COOKIE(cookie);

        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
                RETURN(-EIO);

        if (loi == NULL)
                loi = &lsm->lsm_oinfo[0];

        /* NOTE(review): oap_cmd is read before taking the list lock;
         * presumably it is fixed once the oap is queued -- confirm */
        if (oap->oap_cmd == OBD_BRW_WRITE) {
                lop = &loi->loi_write_lop;
        } else {
                lop = &loi->loi_read_lop;
        }

        spin_lock(&cli->cl_loi_list_lock);

        if (list_empty(&oap->oap_pending_item))
                GOTO(out, rc = -EINVAL);

        /* nothing new being set: done */
        if ((oap->oap_async_flags & async_flags) == async_flags)
                GOTO(out, rc = 0);

        if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
                oap->oap_async_flags |= ASYNC_READY;

        /* only move to the urgent list if the oap isn't already being
         * carried by an rpc */
        if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT)) {
                if (list_empty(&oap->oap_rpc_item)) {
                        list_add(&oap->oap_urgent_item, &lop->lop_urgent);
                        loi_list_maint(cli, loi);
                }
        }

        LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
                        oap->oap_async_flags);
out:
        osc_check_rpcs(cli);
        spin_unlock(&cli->cl_loi_list_lock);
        RETURN(rc);
}
1941
/* Queue an oap on its lop's group-pending list for later batch submission
 * by osc_trigger_group_io(); group pages bypass osc_enter_cache accounting.
 * Returns 0, -EIO on an invalid import, or -EBUSY if already queued. */
static int osc_queue_group_io(struct obd_export *exp, struct lov_stripe_md *lsm,
                             struct lov_oinfo *loi,
                             struct obd_io_group *oig, void *cookie,
                             int cmd, obd_off off, int count,
                             obd_flags brw_flags,
                             obd_flags async_flags)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct osc_async_page *oap;
        struct loi_oap_pages *lop;
        ENTRY;

        oap = OAP_FROM_COOKIE(cookie);

        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
                RETURN(-EIO);

        /* a non-empty item means the oap is already on some queue */
        if (!list_empty(&oap->oap_pending_item) ||
            !list_empty(&oap->oap_urgent_item) ||
            !list_empty(&oap->oap_rpc_item))
                RETURN(-EBUSY);

        if (loi == NULL)
                loi = &lsm->lsm_oinfo[0];

        spin_lock(&cli->cl_loi_list_lock);

        oap->oap_cmd = cmd;
        oap->oap_page_off = off;
        oap->oap_count = count;
        oap->oap_brw_flags = brw_flags;
        oap->oap_async_flags = async_flags;

        if (cmd == OBD_BRW_WRITE)
                lop = &loi->loi_write_lop;
        else
                lop = &loi->loi_read_lop;

        /* parked on the group list; moved to lop_pending by
         * osc_group_to_pending() when the group is triggered */
        list_add_tail(&oap->oap_pending_item, &lop->lop_pending_group);
        if (oap->oap_async_flags & ASYNC_GROUP_SYNC) {
                oap->oap_oig = oig;
                oig_add_one(oig, &oap->oap_occ);
        }

        LOI_DEBUG(loi, "oap %p page %p on group pending\n", oap, oap->oap_page);

        spin_unlock(&cli->cl_loi_list_lock);

        RETURN(0);
}
1992
/* Move every oap parked on @lop's group-pending list onto the normal
 * pending list, mark them all urgent, and update pending accounting.
 * Caller holds cl_loi_list_lock (see osc_trigger_group_io). */
static void osc_group_to_pending(struct client_obd *cli, struct lov_oinfo *loi,
                                 struct loi_oap_pages *lop, int cmd)
{
        struct list_head *pos, *tmp;
        struct osc_async_page *oap;

        list_for_each_safe(pos, tmp, &lop->lop_pending_group) {
                oap = list_entry(pos, struct osc_async_page, oap_pending_item);
                list_del(&oap->oap_pending_item);
                list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
                /* group io should go out as soon as possible */
                list_add(&oap->oap_urgent_item, &lop->lop_urgent);
                lop_update_pending(cli, lop, cmd, 1);
        }
        loi_list_maint(cli, loi);
}
2008
/* Release all group-queued pages for @loi (both directions) into the
 * normal pending machinery and kick rpc generation.  Always returns 0. */
static int osc_trigger_group_io(struct obd_export *exp,
                                struct lov_stripe_md *lsm,
                                struct lov_oinfo *loi,
                                struct obd_io_group *oig)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        ENTRY;

        if (loi == NULL)
                loi = &lsm->lsm_oinfo[0];

        spin_lock(&cli->cl_loi_list_lock);

        osc_group_to_pending(cli, loi, &loi->loi_write_lop, OBD_BRW_WRITE);
        osc_group_to_pending(cli, loi, &loi->loi_read_lop, OBD_BRW_READ);

        osc_check_rpcs(cli);
        spin_unlock(&cli->cl_loi_list_lock);

        RETURN(0);
}
2030
/* Undo osc_prep/queue_async_page: remove the oap from its lists, reverse
 * dirty/grant accounting, wake cache waiters, and free the oap.  Fails
 * with -EBUSY if the oap is currently part of an rpc in flight. */
static int osc_teardown_async_page(struct obd_export *exp,
                                   struct lov_stripe_md *lsm,
                                   struct lov_oinfo *loi, void *cookie)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct loi_oap_pages *lop;
        struct osc_async_page *oap;
        int rc = 0;
        ENTRY;

        oap = OAP_FROM_COOKIE(cookie);

        if (loi == NULL)
                loi = &lsm->lsm_oinfo[0];

        /* NOTE(review): oap_cmd read before taking the list lock, same
         * pattern as osc_set_async_flags -- confirm it's immutable here */
        if (oap->oap_cmd == OBD_BRW_WRITE) {
                lop = &loi->loi_write_lop;
        } else {
                lop = &loi->loi_read_lop;
        }

        spin_lock(&cli->cl_loi_list_lock);

        /* can't tear down a page an rpc is carrying */
        if (!list_empty(&oap->oap_rpc_item))
                GOTO(out, rc = -EBUSY);

        /* page was never sent: give back its dirty/grant accounting */
        osc_exit_cache(cli, oap, 0);
        osc_wake_cache_waiters(cli);

        if (!list_empty(&oap->oap_urgent_item)) {
                list_del_init(&oap->oap_urgent_item);
                oap->oap_async_flags &= ~ASYNC_URGENT;
        }
        if (!list_empty(&oap->oap_pending_item)) {
                list_del_init(&oap->oap_pending_item);
                lop_update_pending(cli, lop, oap->oap_cmd, -1);
        }
        loi_list_maint(cli, loi);

        LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
out:
        spin_unlock(&cli->cl_loi_list_lock);
        if (rc == 0)
                OBD_FREE(oap, sizeof(*oap));
        RETURN(rc);
}
2077
2078 #ifdef __KERNEL__
2079 /* Note: caller will lock/unlock, and set uptodate on the pages */
2080 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
/* SAN read: ask the OST for the disk block addresses of the pages via an
 * OST_SAN_READ rpc, then read the blocks directly from the shared SAN
 * device with 2.4 buffer_head io.  Pages with a zero offset in the reply
 * are holes and get zero-filled.  Returns 0 or a negative errno. */
static int sanosc_brw_read(struct obd_export *exp, struct obdo *oa,
                           struct lov_stripe_md *lsm, obd_count page_count,
                           struct brw_page *pga)
{
        struct ptlrpc_request *request = NULL;
        struct ost_body *body;
        struct niobuf_remote *nioptr;
        struct obd_ioobj *iooptr;
        int rc, size[3] = {sizeof(*body)}, mapped = 0;
        int swab;
        ENTRY;

        /* XXX does not handle 'new' brw protocol */

        size[1] = sizeof(struct obd_ioobj);
        size[2] = page_count * sizeof(*nioptr);

        request = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OBD_VERSION,
                                  OST_SAN_READ, 3, size, NULL);
        if (!request)
                RETURN(-ENOMEM);

        body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof(*body));
        iooptr = lustre_msg_buf(request->rq_reqmsg, 1, sizeof(*iooptr));
        nioptr = lustre_msg_buf(request->rq_reqmsg, 2,
                                sizeof(*nioptr) * page_count);

        memcpy(&body->oa, oa, sizeof(body->oa));

        obdo_to_ioobj(oa, iooptr);
        iooptr->ioo_bufcnt = page_count;

        /* pack one remote niobuf per page; pages must be locked and in
         * strictly ascending disk offset order */
        for (mapped = 0; mapped < page_count; mapped++, nioptr++) {
                LASSERT(PageLocked(pga[mapped].pg));
                LASSERT(mapped == 0 ||
                        pga[mapped].disk_offset > pga[mapped - 1].disk_offset);

                nioptr->offset = pga[mapped].disk_offset;
                nioptr->len    = pga[mapped].count;
                nioptr->flags  = pga[mapped].flag;
        }

        size[1] = page_count * sizeof(*nioptr);
        request->rq_replen = lustre_msg_size(2, size);

        rc = ptlrpc_queue_wait(request);
        if (rc)
                GOTO(out_req, rc);

        body = lustre_swab_repbuf(request, 0, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body == NULL) {
                CERROR("Can't unpack body\n");
                GOTO(out_req, rc = -EPROTO);
        }

        memcpy(oa, &body->oa, sizeof(*oa));

        swab = lustre_msg_swabbed(request->rq_repmsg);
        LASSERT_REPSWAB(request, 1);
        nioptr = lustre_msg_buf(request->rq_repmsg, 1, size[1]);
        if (!nioptr) {
                /* nioptr missing or short */
                GOTO(out_req, rc = -EPROTO);
        }

        /* actual read */
        for (mapped = 0; mapped < page_count; mapped++, nioptr++) {
                struct page *page = pga[mapped].pg;
                struct buffer_head *bh;
                kdev_t dev;

                if (swab)
                        lustre_swab_niobuf_remote (nioptr);

                /* got san device associated */
                LASSERT(exp->exp_obd != NULL);
                dev = exp->exp_obd->u.cli.cl_sandev;

                /* hole: no disk block, just zero-fill the page */
                if (!nioptr->offset) {
                        CDEBUG(D_PAGE, "hole at ino %lu; index %ld\n",
                                        page->mapping->host->i_ino,
                                        page->index);
                        memset(page_address(page), 0, PAGE_SIZE);
                        continue;
                }

                if (!page->buffers) {
                        create_empty_buffers(page, dev, PAGE_SIZE);
                        bh = page->buffers;

                        clear_bit(BH_New, &bh->b_state);
                        set_bit(BH_Mapped, &bh->b_state);
                        bh->b_blocknr = (unsigned long)nioptr->offset;

                        clear_bit(BH_Uptodate, &bh->b_state);

                        ll_rw_block(READ, 1, &bh);
                } else {
                        bh = page->buffers;

                        /* if buffer already existed, it must be the
                         * one we mapped before, check it */
                        LASSERT(!test_bit(BH_New, &bh->b_state));
                        LASSERT(test_bit(BH_Mapped, &bh->b_state));
                        LASSERT(bh->b_blocknr == (unsigned long)nioptr->offset);

                        /* wait for its io completion */
                        if (test_bit(BH_Lock, &bh->b_state))
                                wait_on_buffer(bh);

                        if (!test_bit(BH_Uptodate, &bh->b_state))
                                ll_rw_block(READ, 1, &bh);
                }


                /* wait synchronously for the block read to complete */
                wait_on_buffer(bh);
                if (!buffer_uptodate(bh)) {
                        /* I/O error */
                        rc = -EIO;
                        goto out_req;
                }
        }

out_req:
        ptlrpc_req_finished(request);
        RETURN(rc);
}
2211
/* SAN write: ask the OST (OST_SAN_WRITE rpc) to allocate/return disk block
 * addresses for the pages, then write the page data directly to the shared
 * SAN device with synchronous 2.4 buffer_head io.  Returns 0 or -errno. */
static int sanosc_brw_write(struct obd_export *exp, struct obdo *oa,
                            struct lov_stripe_md *lsm, obd_count page_count,
                            struct brw_page *pga)
{
        struct ptlrpc_request *request = NULL;
        struct ost_body *body;
        struct niobuf_remote *nioptr;
        struct obd_ioobj *iooptr;
        int rc, size[3] = {sizeof(*body)}, mapped = 0;
        int swab;
        ENTRY;

        size[1] = sizeof(struct obd_ioobj);
        size[2] = page_count * sizeof(*nioptr);

        request = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OBD_VERSION,
                                  OST_SAN_WRITE, 3, size, NULL);
        if (!request)
                RETURN(-ENOMEM);

        body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof (*body));
        iooptr = lustre_msg_buf(request->rq_reqmsg, 1, sizeof (*iooptr));
        nioptr = lustre_msg_buf(request->rq_reqmsg, 2,
                                sizeof (*nioptr) * page_count);

        memcpy(&body->oa, oa, sizeof(body->oa));

        obdo_to_ioobj(oa, iooptr);
        iooptr->ioo_bufcnt = page_count;

        /* pack request: pages must be locked and in strictly ascending
         * disk offset order */
        for (mapped = 0; mapped < page_count; mapped++, nioptr++) {
                LASSERT(PageLocked(pga[mapped].pg));
                LASSERT(mapped == 0 ||
                        pga[mapped].disk_offset > pga[mapped - 1].disk_offset);

                nioptr->offset = pga[mapped].disk_offset;
                nioptr->len    = pga[mapped].count;
                nioptr->flags  = pga[mapped].flag;
        }

        size[1] = page_count * sizeof(*nioptr);
        request->rq_replen = lustre_msg_size(2, size);

        rc = ptlrpc_queue_wait(request);
        if (rc)
                GOTO(out_req, rc);

        swab = lustre_msg_swabbed (request->rq_repmsg);
        LASSERT_REPSWAB (request, 1);
        nioptr = lustre_msg_buf(request->rq_repmsg, 1, size[1]);
        if (!nioptr) {
                CERROR("absent/short niobuf array\n");
                GOTO(out_req, rc = -EPROTO);
        }

        /* actual write */
        for (mapped = 0; mapped < page_count; mapped++, nioptr++) {
                struct page *page = pga[mapped].pg;
                struct buffer_head *bh;
                kdev_t dev;

                if (swab)
                        lustre_swab_niobuf_remote (nioptr);

                /* got san device associated */
                LASSERT(exp->exp_obd != NULL);
                dev = exp->exp_obd->u.cli.cl_sandev;

                if (!page->buffers) {
                        create_empty_buffers(page, dev, PAGE_SIZE);
                } else {
                        /* checking: an existing buffer must be the one we
                         * mapped before */
                        LASSERT(!test_bit(BH_New, &page->buffers->b_state));
                        LASSERT(test_bit(BH_Mapped, &page->buffers->b_state));
                        LASSERT(page->buffers->b_blocknr ==
                                (unsigned long)nioptr->offset);
                }
                bh = page->buffers;

                LASSERT(bh);

                /* if buffer locked, wait for its io completion */
                if (test_bit(BH_Lock, &bh->b_state))
                        wait_on_buffer(bh);

                clear_bit(BH_New, &bh->b_state);
                set_bit(BH_Mapped, &bh->b_state);

                /* override the block nr with the one the OST returned */
                bh->b_blocknr = (unsigned long)nioptr->offset;

                /* we are about to write it, so set it
                 * uptodate/dirty
                 * page lock should guarantee no race condition here */
                set_bit(BH_Uptodate, &bh->b_state);
                set_bit(BH_Dirty, &bh->b_state);

                ll_rw_block(WRITE, 1, &bh);

                /* must do synchronous write here */
                wait_on_buffer(bh);
                if (!buffer_uptodate(bh) || test_bit(BH_Dirty, &bh->b_state)) {
                        /* I/O error */
                        rc = -EIO;
                        goto out_req;
                }
        }

out_req:
        ptlrpc_req_finished(request);
        RETURN(rc);
}
2325
2326 static int sanosc_brw(int cmd, struct obd_export *exp, struct obdo *oa,
2327                       struct lov_stripe_md *lsm, obd_count page_count,
2328                       struct brw_page *pga, struct obd_trans_info *oti)
2329 {
2330         ENTRY;
2331
2332         while (page_count) {
2333                 obd_count pages_per_brw;
2334                 int rc;
2335
2336                 if (page_count > PTLRPC_MAX_BRW_PAGES)
2337                         pages_per_brw = PTLRPC_MAX_BRW_PAGES;
2338                 else
2339                         pages_per_brw = page_count;
2340
2341                 if (cmd & OBD_BRW_WRITE)
2342                         rc = sanosc_brw_write(exp, oa, lsm, pages_per_brw,pga);
2343                 else
2344                         rc = sanosc_brw_read(exp, oa, lsm, pages_per_brw, pga);
2345
2346                 if (rc != 0)
2347                         RETURN(rc);
2348
2349                 page_count -= pages_per_brw;
2350                 pga += pages_per_brw;
2351         }
2352         RETURN(0);
2353 }
2354 #endif
2355 #endif
2356
/* Attach @data (an inode in the kernel) to the lock's l_ast_data, under
 * the namespace lock.  In the kernel build, complain loudly if the lock
 * already carries a different, still-live inode. */
static void osc_set_data_with_check(struct lustre_handle *lockh, void *data)
{
        struct ldlm_lock *lock = ldlm_handle2lock(lockh);

        if (lock == NULL) {
                CERROR("lockh %p, data %p - client evicted?\n", lockh, data);
                return;
        }

        l_lock(&lock->l_resource->lr_namespace->ns_lock);
#ifdef __KERNEL__
        if (lock->l_ast_data && lock->l_ast_data != data) {
                struct inode *new_inode = data;
                struct inode *old_inode = lock->l_ast_data;
                /* a stale pointer to a freeing inode is the only legitimate
                 * way the ast_data can differ from the one being set */
                if (!(old_inode->i_state & I_FREEING))
                        LDLM_ERROR(lock, "inconsistent l_ast_data found");
                LASSERTF(old_inode->i_state & I_FREEING,
                         "Found existing inode %p/%lu/%u state %lu in lock: "
                         "setting data to %p/%lu/%u\n", old_inode,
                         old_inode->i_ino, old_inode->i_generation,
                         old_inode->i_state,
                         new_inode, new_inode->i_ino, new_inode->i_generation);
        }
#endif
        lock->l_ast_data = data;
        l_unlock(&lock->l_resource->lr_namespace->ns_lock);
        LDLM_LOCK_PUT(lock);  /* drop the ref taken by ldlm_handle2lock */
}
2385
2386 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2387                              ldlm_iterator_t replace, void *data)
2388 {
2389         struct ldlm_res_id res_id = { .name = {0} };
2390         struct obd_device *obd = class_exp2obd(exp);
2391
2392         res_id.name[0] = lsm->lsm_object_id;
2393         res_id.name[2] = lsm->lsm_object_gr;
2394         ldlm_change_cbdata(obd->obd_namespace, &res_id, replace, data);
2395         return 0;
2396 }
2397
2398 static int osc_enqueue(struct obd_export *exp, struct lov_stripe_md *lsm,
2399                        __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2400                        int *flags, void *bl_cb, void *cp_cb, void *gl_cb,
2401                        void *data, __u32 lvb_len, void *lvb_swabber,
2402                        struct lustre_handle *lockh)
2403 {
2404         struct obd_device *obd = exp->exp_obd;
2405         struct ldlm_res_id res_id = { .name = {0} };
2406         struct ost_lvb lvb;
2407         struct ldlm_reply *rep;
2408         struct ptlrpc_request *req = NULL;
2409         int rc;
2410         ENTRY;
2411
2412         res_id.name[0] = lsm->lsm_object_id;
2413         res_id.name[2] = lsm->lsm_object_gr;
2414
2415         /* Filesystem lock extents are extended to page boundaries so that
2416          * dealing with the page cache is a little smoother.  */
2417         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2418         policy->l_extent.end |= ~PAGE_MASK;
2419
2420         if (lsm->lsm_oinfo->loi_kms_valid == 0)
2421                 goto no_match;
2422
2423         /* Next, search for already existing extent locks that will cover us */
2424         rc = ldlm_lock_match(obd->obd_namespace, 0, &res_id, type, policy, mode,
2425                              lockh);
2426         if (rc == 1) {
2427                 osc_set_data_with_check(lockh, data);
2428                 if (*flags & LDLM_FL_HAS_INTENT) {
2429                         /* I would like to be able to ASSERT here that rss <=
2430                          * kms, but I can't, for reasons which are explained in
2431                          * lov_enqueue() */
2432                 }
2433                 /* We already have a lock, and it's referenced */
2434                 RETURN(ELDLM_OK);
2435         }
2436
2437         /* If we're trying to read, we also search for an existing PW lock.  The
2438          * VFS and page cache already protect us locally, so lots of readers/
2439          * writers can share a single PW lock.
2440          *
2441          * There are problems with conversion deadlocks, so instead of
2442          * converting a read lock to a write lock, we'll just enqueue a new
2443          * one.
2444          *
2445          * At some point we should cancel the read lock instead of making them
2446          * send us a blocking callback, but there are problems with canceling
2447          * locks out from other users right now, too. */
2448
2449         if (mode == LCK_PR) {
2450                 rc = ldlm_lock_match(obd->obd_namespace, 0, &res_id, type,
2451                                      policy, LCK_PW, lockh);
2452                 if (rc == 1) {
2453                         /* FIXME: This is not incredibly elegant, but it might
2454                          * be more elegant than adding another parameter to
2455                          * lock_match.  I want a second opinion. */
2456                         ldlm_lock_addref(lockh, LCK_PR);
2457                         ldlm_lock_decref(lockh, LCK_PW);
2458                         osc_set_data_with_check(lockh, data);
2459                         RETURN(ELDLM_OK);
2460                 }
2461         }
2462         if (mode == LCK_PW) {
2463                 rc = ldlm_lock_match(obd->obd_namespace, 0, &res_id, type,
2464                                      policy, LCK_PR, lockh);
2465                 if (rc == 1) {
2466                         rc = ldlm_cli_convert(lockh, mode, flags);
2467                         if (!rc) {
2468                                 /* Update readers/writers accounting */
2469                                 ldlm_lock_addref(lockh, LCK_PW);
2470                                 ldlm_lock_decref(lockh, LCK_PR);
2471                                 osc_set_data_with_check(lockh, data);
2472                                 RETURN(ELDLM_OK);
2473                         }
2474                         /* If the conversion failed, we need to drop refcount
2475                            on matched lock before we get new one */
2476                         /* XXX Won't it save us some efforts if we cancel PR
2477                            lock here? We are going to take PW lock anyway and it
2478                            will invalidate PR lock */
2479                         ldlm_lock_decref(lockh, LCK_PR);
2480                         if (rc != EDEADLOCK) {
2481                                 RETURN(rc);
2482                         }
2483                 }
2484         }
2485
2486         if (mode == LCK_PW) {
2487                 rc = ldlm_lock_match(obd->obd_namespace, 0, &res_id, type,
2488                                      policy, LCK_PR, lockh);
2489                 if (rc == 1) {
2490                         rc = ldlm_cli_convert(lockh, mode, flags);
2491                         if (!rc) {
2492                                 /* Update readers/writers accounting */
2493                                 ldlm_lock_addref(lockh, LCK_PW);
2494                                 ldlm_lock_decref(lockh, LCK_PR);
2495                                 osc_set_data_with_check(lockh, data);
2496                                 RETURN(ELDLM_OK);
2497                         }
2498                         /* If the conversion failed, we need to drop refcount
2499                            on matched lock before we get new one */
2500                         /* XXX Won't it save us some efforts if we cancel PR
2501                            lock here? We are going to take PW lock anyway and it
2502                            will invalidate PR lock */
2503                         ldlm_lock_decref(lockh, LCK_PR);
2504                         if (rc != EDEADLOCK) {
2505                                 RETURN(rc);
2506                         }
2507                 }
2508         }
2509
2510  no_match:
2511         if (*flags & LDLM_FL_HAS_INTENT) {
2512                 int size[2] = {0, sizeof(struct ldlm_request)};
2513
2514                 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_DLM_VERSION,
2515                                       LDLM_ENQUEUE, 2, size, NULL);
2516                 if (req == NULL)
2517                         RETURN(-ENOMEM);
2518
2519                 size[0] = sizeof(*rep);
2520                 size[1] = sizeof(lvb);
2521                 req->rq_replen = lustre_msg_size(2, size);
2522         }
2523         rc = ldlm_cli_enqueue(exp, req, obd->obd_namespace, res_id, type,
2524                               policy, mode, flags, bl_cb, cp_cb, gl_cb, data,
2525                               &lvb, sizeof(lvb), lustre_swab_ost_lvb, lockh);
2526         if (req != NULL) {
2527                 if (rc == ELDLM_LOCK_ABORTED) {
2528                         /* swabbed by ldlm_cli_enqueue() */
2529                         LASSERT_REPSWABBED(req, 0);
2530                         rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*rep));
2531                         LASSERT(rep != NULL);
2532                         if (rep->lock_policy_res1)
2533                                 rc = rep->lock_policy_res1;
2534                 }
2535                 ptlrpc_req_finished(req);
2536         }
2537
2538         if ((*flags & LDLM_FL_HAS_INTENT && rc == ELDLM_LOCK_ABORTED) || !rc) {
2539                 CDEBUG(D_INODE, "received kms == "LPU64", blocks == "LPU64"\n",
2540                        lvb.lvb_size, lvb.lvb_blocks);
2541                 lsm->lsm_oinfo->loi_rss = lvb.lvb_size;
2542                 lsm->lsm_oinfo->loi_blocks = lvb.lvb_blocks;
2543         }
2544
2545         RETURN(rc);
2546 }
2547
2548 static int osc_match(struct obd_export *exp, struct lov_stripe_md *lsm,
2549                      __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2550                      int *flags, void *data, struct lustre_handle *lockh)
2551 {
2552         struct ldlm_res_id res_id = { .name = {0} };
2553         struct obd_device *obd = exp->exp_obd;
2554         int rc;
2555         ENTRY;
2556
2557         res_id.name[0] = lsm->lsm_object_id;
2558         res_id.name[2] = lsm->lsm_object_gr;
2559
2560         OBD_FAIL_RETURN(OBD_FAIL_OSC_MATCH, -EIO);
2561
2562         /* Filesystem lock extents are extended to page boundaries so that
2563          * dealing with the page cache is a little smoother */
2564         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2565         policy->l_extent.end |= ~PAGE_MASK;
2566
2567         /* Next, search for already existing extent locks that will cover us */
2568         rc = ldlm_lock_match(obd->obd_namespace, *flags, &res_id, type,
2569                              policy, mode, lockh);
2570         if (rc) {
2571                // if (!(*flags & LDLM_FL_TEST_LOCK))
2572                         osc_set_data_with_check(lockh, data);
2573                 RETURN(rc);
2574         }
2575         /* If we're trying to read, we also search for an existing PW lock.  The
2576          * VFS and page cache already protect us locally, so lots of readers/
2577          * writers can share a single PW lock. */
2578         if (mode == LCK_PR) {
2579                 rc = ldlm_lock_match(obd->obd_namespace, *flags, &res_id, type,
2580                                      policy, LCK_PW, lockh);
2581                 if (rc == 1 && !(*flags & LDLM_FL_TEST_LOCK)) {
2582                         /* FIXME: This is not incredibly elegant, but it might
2583                          * be more elegant than adding another parameter to
2584                          * lock_match.  I want a second opinion. */
2585                         osc_set_data_with_check(lockh, data);
2586                         ldlm_lock_addref(lockh, LCK_PR);
2587                         ldlm_lock_decref(lockh, LCK_PW);
2588                 }
2589         }
2590         RETURN(rc);
2591 }
2592
2593 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
2594                       __u32 mode, struct lustre_handle *lockh)
2595 {
2596         ENTRY;
2597
2598         if (mode == LCK_GROUP)
2599                 ldlm_lock_decref_and_cancel(lockh, mode);
2600         else
2601                 ldlm_lock_decref(lockh, mode);
2602
2603         RETURN(0);
2604 }
2605
2606 static int osc_cancel_unused(struct obd_export *exp,
2607                              struct lov_stripe_md *lsm,
2608                              int flags, void *opaque)
2609 {
2610         struct obd_device *obd = class_exp2obd(exp);
2611         struct ldlm_res_id res_id = { .name = {0} }, *resp = NULL;
2612
2613         if (lsm != NULL) {
2614                 res_id.name[0] = lsm->lsm_object_id;
2615                 res_id.name[2] = lsm->lsm_object_gr;
2616                 resp = &res_id;
2617         }
2618
2619         return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
2620 }
2621
2622 static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
2623                       unsigned long max_age)
2624 {
2625         struct obd_statfs *msfs;
2626         struct ptlrpc_request *request;
2627         int rc, size = sizeof(*osfs);
2628         ENTRY;
2629
2630         /* We could possibly pass max_age in the request (as an absolute
2631          * timestamp or a "seconds.usec ago") so the target can avoid doing
2632          * extra calls into the filesystem if that isn't necessary (e.g.
2633          * during mount that would help a bit).  Having relative timestamps
2634          * is not so great if request processing is slow, while absolute
2635          * timestamps are not ideal because they need time synchronization. */
2636         request = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_OBD_VERSION,
2637                                   OST_STATFS, 0, NULL, NULL);
2638         if (!request)
2639                 RETURN(-ENOMEM);
2640
2641         request->rq_replen = lustre_msg_size(1, &size);
2642         request->rq_request_portal = OST_CREATE_PORTAL; //XXX FIXME bug 249
2643
2644         rc = ptlrpc_queue_wait(request);
2645         if (rc)
2646                 GOTO(out, rc);
2647
2648         msfs = lustre_swab_repbuf(request, 0, sizeof(*msfs),
2649                                   lustre_swab_obd_statfs);
2650         if (msfs == NULL) {
2651                 CERROR("Can't unpack obd_statfs\n");
2652                 GOTO(out, rc = -EPROTO);
2653         }
2654
2655         memcpy(osfs, msfs, sizeof(*osfs));
2656
2657         EXIT;
2658  out:
2659         ptlrpc_req_finished(request);
2660         return rc;
2661 }
2662
2663 /* Retrieve object striping information.
2664  *
2665  * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
2666  * the maximum number of OST indices which will fit in the user buffer.
2667  * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
2668  */
static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
{
        struct lov_user_md lum, *lumk;
        int rc, lum_size;
        ENTRY;

        /* No striping metadata means there is nothing to report. */
        if (!lsm)
                RETURN(-ENODATA);

        /* copy_from_user() returns the number of bytes NOT copied; any
         * nonzero result is treated as a fault. */
        rc = copy_from_user(&lum, lump, sizeof(lum));
        if (rc)
                RETURN(-EFAULT);

        /* Only the user-space LOV magic is accepted here. */
        if (lum.lmm_magic != LOV_USER_MAGIC)
                RETURN(-EINVAL);

        if (lum.lmm_stripe_count > 0) {
                /* The caller left room for object entries: allocate a reply
                 * with exactly one lov_user_ost_data slot, since an OSC is
                 * always a single stripe (see block comment above). */
                lum_size = sizeof(lum) + sizeof(lum.lmm_objects[0]);
                OBD_ALLOC(lumk, lum_size);
                if (!lumk)
                        RETURN(-ENOMEM);

                lumk->lmm_objects[0].l_object_id = lsm->lsm_object_id;
                lumk->lmm_objects[0].l_object_gr = lsm->lsm_object_gr;
        } else {
                /* Header only: reuse the stack copy of the user's struct. */
                lum_size = sizeof(lum);
                lumk = &lum;
        }

        lumk->lmm_object_id = lsm->lsm_object_id;
        lumk->lmm_object_gr = lsm->lsm_object_gr;
        lumk->lmm_stripe_count = 1;

        /* rc is 0 here (copy_from_user succeeded above), so a clean
         * copy_to_user() returns 0 to the caller. */
        if (copy_to_user(lump, lumk, lum_size))
                rc = -EFAULT;

        if (lumk != &lum)
                OBD_FREE(lumk, lum_size);

        RETURN(rc);
}
2710
/* Dispatch OSC-specific ioctls.  The module is pinned for the duration of
 * the call; every case exits through the 'out' label so the reference is
 * always dropped. */
static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                         void *karg, void *uarg)
{
        struct obd_device *obd = exp->exp_obd;
        struct obd_ioctl_data *data = karg;
        int err = 0;
        ENTRY;

#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
        MOD_INC_USE_COUNT;
#else
        /* Pin the module while the ioctl runs. */
        if (!try_module_get(THIS_MODULE)) {
                CERROR("Can't get module. Is it alive?");
                return -EINVAL;
        }
#endif
        switch (cmd) {
        case OBD_IOC_LOV_GET_CONFIG: {
                char *buf;
                struct lov_desc *desc;
                struct obd_uuid uuid;

                buf = NULL;
                len = 0;
                /* Pull the caller's ioctl buffer into a kernel allocation;
                 * obd_ioctl_getdata() sets buf and len on success. */
                if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
                        GOTO(out, err = -EINVAL);

                data = (struct obd_ioctl_data *)buf;

                /* Validate that the caller left room for the reply:
                 * inlbuf1 = lov_desc, inlbuf2 = uuid, inlbuf3 = __u32 index. */
                if (sizeof(*desc) > data->ioc_inllen1) {
                        OBD_FREE(buf, len);
                        GOTO(out, err = -EINVAL);
                }

                if (data->ioc_inllen2 < sizeof(uuid)) {
                        OBD_FREE(buf, len);
                        GOTO(out, err = -EINVAL);
                }

                if (data->ioc_inllen3 < sizeof(__u32)) {
                        OBD_FREE(buf, len);
                        GOTO(out, err = -EINVAL);
                }

                /* An OSC presents itself as a single-target LOV. */
                desc = (struct lov_desc *)data->ioc_inlbuf1;
                desc->ld_tgt_count = 1;
                desc->ld_active_tgt_count = 1;
                desc->ld_default_stripe_count = 1;
                desc->ld_default_stripe_size = 0;
                desc->ld_default_stripe_offset = 0;
                desc->ld_pattern = 0;
                memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
                memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
                *((__u32 *)data->ioc_inlbuf3) = 1;

                err = copy_to_user((void *)uarg, buf, len);
                if (err)
                        err = -EFAULT;
                obd_ioctl_freedata(buf, len);
                GOTO(out, err);
        }
        case LL_IOC_LOV_SETSTRIPE:
                /* obd_alloc_memmd() returns the md size on success; map
                 * any positive result to 0 for the ioctl caller. */
                err = obd_alloc_memmd(exp, karg);
                if (err > 0)
                        err = 0;
                GOTO(out, err);
        case LL_IOC_LOV_GETSTRIPE:
                err = osc_getstripe(karg, uarg);
                GOTO(out, err);
        case OBD_IOC_CLIENT_RECOVER:
                err = ptlrpc_recover_import(obd->u.cli.cl_import,
                                            data->ioc_inlbuf1);
                if (err > 0)
                        err = 0;
                GOTO(out, err);
        case IOC_OSC_SET_ACTIVE:
                err = ptlrpc_set_import_active(obd->u.cli.cl_import,
                                               data->ioc_offset);
                GOTO(out, err);
        case IOC_OSC_CTL_RECOVERY:
                err = ptlrpc_import_control_recovery(obd->u.cli.cl_import,
                                                     data->ioc_offset);
                GOTO(out, err);
        default:
                CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n", cmd, current->comm);
                GOTO(out, err = -ENOTTY);
        }
out:
#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
        MOD_DEC_USE_COUNT;
#else
        module_put(THIS_MODULE);
#endif
        return err;
}
2806
/* Query a named piece of information from the OSC/OST.  Supported keys:
 * "lock_to_stripe" (always 0, since an OSC is a single stripe) and
 * "last_id" (fetched from the OST via OST_GET_INFO). */
static int osc_get_info(struct obd_export *exp, __u32 keylen,
                        void *key, __u32 *vallen, void *val)
{
        ENTRY;
        if (!vallen || !val)
                RETURN(-EFAULT);

        /* NOTE(review): the two key checks compare keylen against strlen()
         * inconsistently ('>' vs '>='), and both rely on @key being
         * NUL-terminated for strcmp() — confirm against the callers. */
        if (keylen > strlen("lock_to_stripe") &&
            strcmp(key, "lock_to_stripe") == 0) {
                __u32 *stripe = val;
                *vallen = sizeof(*stripe);
                *stripe = 0;
                RETURN(0);
        } else if (keylen >= strlen("last_id") && strcmp(key, "last_id") == 0) {
                struct ptlrpc_request *req;
                obd_id *reply;
                char *bufs[1] = {key};
                int rc;
                req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OBD_VERSION,
                                      OST_GET_INFO, 1, (int *)&keylen, bufs);
                if (req == NULL)
                        RETURN(-ENOMEM);

                req->rq_replen = lustre_msg_size(1, (int *)vallen);
                rc = ptlrpc_queue_wait(req);
                if (rc)
                        GOTO(out, rc);

                /* The reply payload is a single (possibly swabbed) obd_id. */
                reply = lustre_swab_repbuf(req, 0, sizeof(*reply),
                                           lustre_swab_ost_last_id);
                if (reply == NULL) {
                        CERROR("Can't unpack OST last ID\n");
                        GOTO(out, rc = -EPROTO);
                }
                *((obd_id *)val) = *reply;
        out:
                ptlrpc_req_finished(req);
                RETURN(rc);
        }
        RETURN(-EPROTO);
}
2848
/* Set a named piece of client/import state.  Most keys configure the
 * object pre-creation machinery (oscc) or recovery behaviour; anything
 * that is not a recognized key must be "mds_conn" (handled at the
 * bottom), otherwise -EINVAL is returned. */
static int osc_set_info(struct obd_export *exp, obd_count keylen,
                        void *key, obd_count vallen, void *val)
{
        struct obd_device  *obd = exp->exp_obd;
        struct obd_import *imp = class_exp2cliimp(exp);
        struct llog_ctxt *ctxt;
        int rc = 0;
        ENTRY;

        /* "next_id": the next object id this OST should pre-create from. */
        if (keylen == strlen("next_id") &&
            memcmp(key, "next_id", strlen("next_id")) == 0) {
                if (vallen != sizeof(obd_id))
                        RETURN(-EINVAL);
                obd->u.cli.cl_oscc.oscc_next_id = *((obd_id*)val) + 1;
                CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
                       exp->exp_obd->obd_name,
                       obd->u.cli.cl_oscc.oscc_next_id);

                RETURN(0);
        }

        /* "growth_count": cap on the pre-creation batch size. */
        if (keylen == strlen("growth_count") &&
            memcmp(key, "growth_count", strlen("growth_count")) == 0) {
                if (vallen != sizeof(int))
                        RETURN(-EINVAL);
                obd->u.cli.cl_oscc.oscc_max_grow_count = *((int*)val);
                RETURN(0);
        }

        /* "unlinked": clear the creator's out-of-space flag. */
        if (keylen == strlen("unlinked") &&
            memcmp(key, "unlinked", keylen) == 0) {
                struct osc_creator *oscc = &obd->u.cli.cl_oscc;
                spin_lock(&oscc->oscc_lock);
                oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
                spin_unlock(&oscc->oscc_lock);
                RETURN(0);
        }
        /* "unrecovery": clear the creator's recovering flag. */
        if (keylen == strlen("unrecovery") &&
            memcmp(key, "unrecovery", keylen) == 0) {
                struct osc_creator *oscc = &obd->u.cli.cl_oscc;
                spin_lock(&oscc->oscc_lock);
                oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING;
                spin_unlock(&oscc->oscc_lock);
                RETURN(0);
        }
        /* "initial_recov": toggle recovery on initial connection.
         * NOTE(review): this local 'imp' shadows the outer one — presumably
         * they refer to the same import; confirm class_exp2cliimp(). */
        if (keylen == strlen("initial_recov") &&
            memcmp(key, "initial_recov", strlen("initial_recov")) == 0) {
                struct obd_import *imp = exp->exp_obd->u.cli.cl_import;
                if (vallen != sizeof(int))
                        RETURN(-EINVAL);
                imp->imp_initial_recov = *(int *)val;
                CDEBUG(D_HA, "%s: set imp_no_init_recov = %d\n",
                       exp->exp_obd->obd_name,
                       imp->imp_initial_recov);
                RETURN(0);
        }

        /* "async": enable/disable asynchronous client operation. */
        if (keylen == strlen("async") && memcmp(key, "async", keylen) == 0) {
                struct client_obd *cl = &obd->u.cli;
                if (vallen != sizeof(int))
                        RETURN(-EINVAL);
                cl->cl_async = *(int *)val;
                CDEBUG(D_HA, "%s: set async = %d\n",
                       obd->obd_name, cl->cl_async);
                RETURN(0);
        }

        /* "sec": select the ptlrpc security flavor by name. */
        if (keylen == strlen("sec") && memcmp(key, "sec", keylen) == 0) {
                struct client_obd *cli = &exp->exp_obd->u.cli;

                if (vallen == strlen("null") &&
                    memcmp(val, "null", vallen) == 0) {
                        cli->cl_sec_flavor = PTLRPC_SEC_NULL;
                        cli->cl_sec_subflavor = 0;
                        RETURN(0);
                }
                if (vallen == strlen("krb5i") &&
                    memcmp(val, "krb5i", vallen) == 0) {
                        cli->cl_sec_flavor = PTLRPC_SEC_GSS;
                        cli->cl_sec_subflavor = PTLRPC_SEC_GSS_KRB5I;
                        RETURN(0);
                }
                if (vallen == strlen("krb5p") &&
                    memcmp(val, "krb5p", vallen) == 0) {
                        cli->cl_sec_flavor = PTLRPC_SEC_GSS;
                        cli->cl_sec_subflavor = PTLRPC_SEC_GSS_KRB5P;
                        RETURN(0);
                }
                CERROR("unrecognized security type %s\n", (char*) val);
                RETURN(-EINVAL);
        }

        /* Anything else must be "mds_conn": the export belongs to an MDS. */
        if (keylen < strlen("mds_conn") || memcmp(key, "mds_conn", keylen) != 0)
                RETURN(-EINVAL);

        /* Connect the unlink llog context to the OST.
         * NOTE(review): rc is still 0 at this point, so the CERROR branch
         * below is unreachable as written — confirm the intended flow. */
        ctxt = llog_get_context(&exp->exp_obd->obd_llogs, LLOG_UNLINK_ORIG_CTXT);
        if (ctxt) {
                if (rc == 0)
                        rc = llog_initiator_connect(ctxt);
                else
                        CERROR("cannot establish the connect for ctxt %p: %d\n",
                               ctxt, rc);
        }

        /* MDS OSCs get server-timeout semantics and are pinged. */
        imp->imp_server_timeout = 1;
        CDEBUG(D_HA, "pinging OST %s\n", imp->imp_target_uuid.uuid);
        imp->imp_pingable = 1;

        RETURN(rc);
}
2959
2960
2961 static struct llog_operations osc_size_repl_logops = {
2962         lop_cancel: llog_obd_repl_cancel
2963 };
2964
static struct llog_operations osc_unlink_orig_logops;

/* Set up the unlink-originator and size-replication llog contexts for
 * this obd, using @tgt as the disk obd and @catid as the catalog id. */
static int osc_llog_init(struct obd_device *obd, struct obd_llogs *llogs,
                         struct obd_device *tgt, int count,
                         struct llog_catid *catid)
{
        int rc;
        ENTRY;

        /* NOTE(review): this (re)initializes the file-scope ops struct on
         * every call; presumably setup is serialized so this is harmless —
         * confirm. */
        osc_unlink_orig_logops = llog_lvfs_ops;
        osc_unlink_orig_logops.lop_setup = llog_obd_origin_setup;
        osc_unlink_orig_logops.lop_cleanup = llog_catalog_cleanup;
        osc_unlink_orig_logops.lop_add = llog_catalog_add;
        osc_unlink_orig_logops.lop_connect = llog_origin_connect;

        rc = obd_llog_setup(obd, llogs, LLOG_UNLINK_ORIG_CTXT, tgt, count,
                            &catid->lci_logid, &osc_unlink_orig_logops);
        if (rc)
                RETURN(rc);

        rc = obd_llog_setup(obd, llogs, LLOG_SIZE_REPL_CTXT, tgt, count, NULL,
                            &osc_size_repl_logops);
        RETURN(rc);
}
2988
2989 static int osc_llog_finish(struct obd_device *obd,
2990                            struct obd_llogs *llogs, int count)
2991 {
2992         int rc;
2993         ENTRY;
2994
2995         rc = obd_llog_cleanup(llog_get_context(llogs, LLOG_UNLINK_ORIG_CTXT));
2996         if (rc)
2997                 RETURN(rc);
2998
2999         rc = obd_llog_cleanup(llog_get_context(llogs, LLOG_SIZE_REPL_CTXT));
3000         RETURN(rc);
3001 }
3002
3003
3004 static int osc_connect(struct lustre_handle *exph,
3005                        struct obd_device *obd, struct obd_uuid *cluuid,
3006                        struct obd_connect_data *data,
3007                        unsigned long connect_flags)
3008 {
3009         int rc;
3010         ENTRY;
3011         rc = client_connect_import(exph, obd, cluuid, data, connect_flags);
3012         RETURN(rc);
3013 }
3014
/* Disconnect from the OST.  On the last connection, flush the pending
 * size-change llog cancel records to the target first. */
static int osc_disconnect(struct obd_export *exp, unsigned long flags)
{
        struct obd_device *obd = class_exp2obd(exp);
        struct llog_ctxt *ctxt;
        int rc;
        ENTRY;

        /* NOTE(review): ctxt is passed to llog_sync() unchecked —
         * presumably llog_sync() tolerates NULL; confirm. */
        ctxt = llog_get_context(&obd->obd_llogs, LLOG_SIZE_REPL_CTXT);
        if (obd->u.cli.cl_conn_count == 1)
                /* flush any remaining cancel messages out to the target */
                llog_sync(ctxt, exp);

        rc = client_disconnect_export(exp, flags);
        RETURN(rc);
}
3030
3031 static int osc_import_event(struct obd_device *obd,
3032                             struct obd_import *imp, 
3033                             enum obd_import_event event)
3034 {
3035         struct client_obd *cli;
3036         int rc = 0;
3037
3038         LASSERT(imp->imp_obd == obd);
3039
3040         switch (event) {
3041         case IMP_EVENT_DISCON: {
3042                 /* Only do this on the MDS OSC's */
3043                 if (imp->imp_server_timeout) {
3044                         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3045                         
3046                         spin_lock(&oscc->oscc_lock);
3047                         oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
3048                         spin_unlock(&oscc->oscc_lock);
3049                 }
3050                 break;
3051         }
3052         case IMP_EVENT_INACTIVE: {
3053                 if (obd->obd_observer)
3054                         rc = obd_notify(obd->obd_observer, obd, 0, 0);
3055                 break;
3056         }
3057         case IMP_EVENT_INVALIDATE: {
3058                 struct ldlm_namespace *ns = obd->obd_namespace;
3059
3060                 /* Reset grants */
3061                 cli = &obd->u.cli;
3062                 spin_lock(&cli->cl_loi_list_lock);
3063                 cli->cl_avail_grant = 0;
3064                 cli->cl_lost_grant = 0;
3065                 /* all pages go to failing rpcs due to the invalid import */
3066                 osc_check_rpcs(cli);
3067                 spin_unlock(&cli->cl_loi_list_lock);
3068                 
3069                 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3070
3071                 break;
3072         }
3073         case IMP_EVENT_ACTIVE: {
3074                 /* Only do this on the MDS OSC's */
3075                 if (imp->imp_server_timeout) {
3076                         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3077
3078                         spin_lock(&oscc->oscc_lock);
3079                         oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
3080                         spin_unlock(&oscc->oscc_lock);
3081                 }
3082
3083                 if (obd->obd_observer)
3084                         rc = obd_notify(obd->obd_observer, obd, 1, 0);
3085                 break;
3086         }
3087         default:
3088                 CERROR("Unknown import event %d\n", event);
3089                 LBUG();
3090         }
3091         RETURN(rc);
3092 }
3093
3094 static int osc_attach(struct obd_device *dev, obd_count len, void *data)
3095 {
3096         struct lprocfs_static_vars lvars;
3097         int rc;
3098         ENTRY;
3099
3100         lprocfs_init_vars(osc,&lvars);
3101         rc = lprocfs_obd_attach(dev, lvars.obd_vars);
3102         if (rc < 0)
3103                 RETURN(rc);
3104
3105         rc = lproc_osc_attach_seqstat(dev);
3106         if (rc < 0) {
3107                 lprocfs_obd_detach(dev);
3108                 RETURN(rc);
3109         }
3110
3111         ptlrpc_lprocfs_register_obd(dev);
3112         RETURN(0);
3113 }
3114
/* Unregister the /proc entries created in osc_attach(). */
static int osc_detach(struct obd_device *dev)
{
        int rc;

        ptlrpc_lprocfs_unregister_obd(dev);
        rc = lprocfs_obd_detach(dev);

        return rc;
}
3120
3121 static int osc_setup(struct obd_device *obd, obd_count len, void *buf)
3122 {
3123         int rc;
3124         ENTRY;
3125         rc = ptlrpcd_addref();
3126         if (rc)
3127                 RETURN(rc);
3128
3129         rc = client_obd_setup(obd, len, buf);
3130         if (rc)
3131                 ptlrpcd_decref();
3132         else
3133                 oscc_init(obd);
3134
3135         RETURN(rc);
3136 }
3137
3138 static int osc_cleanup(struct obd_device *obd, int flags)
3139 {
3140         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3141         int rc;
3142
3143         rc = ldlm_cli_cancel_unused(obd->obd_namespace, NULL,
3144                                     LDLM_FL_CONFIG_CHANGE, NULL);
3145         if (rc)
3146                 RETURN(rc);
3147
3148         spin_lock(&oscc->oscc_lock);
3149         oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING;
3150         oscc->oscc_flags |= OSCC_FLAG_EXITING;
3151         spin_unlock(&oscc->oscc_lock);
3152
3153         rc = client_obd_cleanup(obd, flags);
3154         ptlrpcd_decref();
3155         RETURN(rc);
3156 }
3157
/* Method table for the regular OSC obd type.  Entries prefixed
 * client_* are the generic client-side helpers; the osc_* entries are
 * the implementations in this file (some defined above this chunk). */
struct obd_ops osc_obd_ops = {
        .o_owner                = THIS_MODULE,
        /* device lifecycle */
        .o_attach               = osc_attach,
        .o_detach               = osc_detach,
        .o_setup                = osc_setup,
        .o_cleanup              = osc_cleanup,
        /* import/connection management */
        .o_add_conn             = client_import_add_conn,
        .o_del_conn             = client_import_del_conn,
        .o_connect              = osc_connect,
        .o_disconnect           = osc_disconnect,
        .o_statfs               = osc_statfs,
        /* object metadata pack/unpack and attribute ops */
        .o_packmd               = osc_packmd,
        .o_unpackmd             = osc_unpackmd,
        .o_create               = osc_create,
        .o_destroy              = osc_destroy,
        .o_getattr              = osc_getattr,
        .o_getattr_async        = osc_getattr_async,
        .o_setattr              = osc_setattr,
        /* bulk I/O entry points, sync and async */
        .o_brw                  = osc_brw,
        .o_brw_async            = osc_brw_async,
        .o_prep_async_page      = osc_prep_async_page,
        .o_queue_async_io       = osc_queue_async_io,
        .o_set_async_flags      = osc_set_async_flags,
        .o_queue_group_io       = osc_queue_group_io,
        .o_trigger_group_io     = osc_trigger_group_io,
        .o_teardown_async_page  = osc_teardown_async_page,
        .o_punch                = osc_punch,
        .o_sync                 = osc_sync,
        /* DLM lock handling */
        .o_enqueue              = osc_enqueue,
        .o_match                = osc_match,
        .o_change_cbdata        = osc_change_cbdata,
        .o_cancel               = osc_cancel,
        .o_cancel_unused        = osc_cancel_unused,
        /* control plane: ioctls, key/value info, import events, llogs */
        .o_iocontrol            = osc_iocontrol,
        .o_get_info             = osc_get_info,
        .o_set_info             = osc_set_info,
        .o_import_event         = osc_import_event,
        .o_llog_init            = osc_llog_init,
        .o_llog_finish          = osc_llog_finish,
};
3198
#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
/* Method table for the SAN variant of the OSC, built only on 2.4
 * kernels.  It differs from osc_obd_ops in that it uses the generic
 * client cleanup/disconnect, its own setup (client_sanobd_setup) and
 * brw path (sanosc_brw), creates via osc_real_create, and omits the
 * async-page and group-I/O entry points entirely. */
struct obd_ops sanosc_obd_ops = {
        .o_owner                = THIS_MODULE,
        .o_attach               = osc_attach,
        .o_detach               = osc_detach,
        /* generic cleanup: no object-creator state to tear down here */
        .o_cleanup              = client_obd_cleanup,
        .o_add_conn             = client_import_add_conn,
        .o_del_conn             = client_import_del_conn,
        .o_connect              = osc_connect,
        .o_disconnect           = client_disconnect_export,
        .o_statfs               = osc_statfs,
        .o_packmd               = osc_packmd,
        .o_unpackmd             = osc_unpackmd,
        .o_create               = osc_real_create,
        .o_destroy              = osc_destroy,
        .o_getattr              = osc_getattr,
        .o_getattr_async        = osc_getattr_async,
        .o_setattr              = osc_setattr,
        .o_setup                = client_sanobd_setup,
        /* SAN-specific bulk I/O path */
        .o_brw                  = sanosc_brw,
        .o_punch                = osc_punch,
        .o_sync                 = osc_sync,
        .o_enqueue              = osc_enqueue,
        .o_match                = osc_match,
        .o_change_cbdata        = osc_change_cbdata,
        .o_cancel               = osc_cancel,
        .o_cancel_unused        = osc_cancel_unused,
        .o_iocontrol            = osc_iocontrol,
        .o_import_event         = osc_import_event,
        .o_llog_init            = osc_llog_init,
        .o_llog_finish          = osc_llog_finish,
};
#endif
3232
3233 int __init osc_init(void)
3234 {
3235         struct lprocfs_static_vars lvars;
3236 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
3237         struct lprocfs_static_vars sanlvars;
3238 #endif
3239         int rc;
3240         ENTRY;
3241
3242         lprocfs_init_vars(osc, &lvars);
3243 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
3244         lprocfs_init_vars(osc, &sanlvars);
3245 #endif
3246
3247         rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
3248                                  LUSTRE_OSC_NAME);
3249         if (rc)
3250                 RETURN(rc);
3251
3252 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
3253         rc = class_register_type(&sanosc_obd_ops, NULL, sanlvars.module_vars,
3254                                  LUSTRE_SANOSC_NAME);
3255         if (rc)
3256                 class_unregister_type(LUSTRE_OSC_NAME);
3257 #endif
3258
3259         RETURN(rc);
3260 }
3261
#ifdef __KERNEL__
/* Module exit: unregister the obd types in reverse order of osc_init().
 * NOTE(review): __exit is deliberately commented out — presumably so
 * the symbol stays available outside the exit section; confirm before
 * restoring the attribute. */
static void /*__exit*/ osc_exit(void)
{
#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
        class_unregister_type(LUSTRE_SANOSC_NAME);
#endif
        class_unregister_type(LUSTRE_OSC_NAME);
}

MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
MODULE_LICENSE("GPL");

module_init(osc_init);
module_exit(osc_exit);
#endif