Whamcloud - gitweb
b=6326
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
5  *   Author Peter Braam <braam@clusterfs.com>
6  *
7  *   This file is part of Lustre, http://www.lustre.org.
8  *
9  *   Lustre is free software; you can redistribute it and/or
10  *   modify it under the terms of version 2 of the GNU General Public
11  *   License as published by the Free Software Foundation.
12  *
13  *   Lustre is distributed in the hope that it will be useful,
14  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
15  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  *   GNU General Public License for more details.
17  *
18  *   You should have received a copy of the GNU General Public License
19  *   along with Lustre; if not, write to the Free Software
20  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21  *
22  *  For testing and management it is treated as an obd_device,
23  *  although * it does not export a full OBD method table (the
24  *  requests are coming * in over the wire, so object target modules
25  *  do not have a full * method table.)
26  *
27  */
28
29 #ifndef EXPORT_SYMTAB
30 # define EXPORT_SYMTAB
31 #endif
32 #define DEBUG_SUBSYSTEM S_OSC
33
34 #ifdef __KERNEL__
35 # include <linux/version.h>
36 # include <linux/module.h>
37 # include <linux/mm.h>
38 # include <linux/highmem.h>
39 # include <linux/ctype.h>
40 # include <linux/init.h>
41 # if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
42 #  include <linux/workqueue.h>
43 #  include <linux/smp_lock.h>
44 # else
45 #  include <linux/locks.h>
46 # endif
47 #else /* __KERNEL__ */
48 # include <liblustre.h>
49 #endif
50
51 #include <linux/lustre_dlm.h>
52 #include <libcfs/kp30.h>
53 #include <linux/lustre_net.h>
54 #include <linux/lustre_sec.h>
55 #include <lustre/lustre_user.h>
56 #include <linux/obd_ost.h>
57 #include <linux/obd_lov.h>
58
59 #ifdef  __CYGWIN__
60 # include <ctype.h>
61 #endif
62
63 #include <linux/lustre_ha.h>
64 #include <linux/lprocfs_status.h>
65 #include <linux/lustre_log.h>
66 #include "osc_internal.h"
67
68 /* Pack OSC object metadata for disk storage (LE byte order). */
69 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
70                       struct lov_stripe_md *lsm)
71 {
72         int lmm_size;
73         ENTRY;
74
75         lmm_size = sizeof(**lmmp);
76         if (!lmmp)
77                 RETURN(lmm_size);
78
79         if (*lmmp && !lsm) {
80                 OBD_FREE(*lmmp, lmm_size);
81                 *lmmp = NULL;
82                 RETURN(0);
83         }
84
85         if (!*lmmp) {
86                 OBD_ALLOC(*lmmp, lmm_size);
87                 if (!*lmmp)
88                         RETURN(-ENOMEM);
89         }
90
91         if (lsm) {
92                 LASSERT(lsm->lsm_object_id);
93                 LASSERT(lsm->lsm_object_gr);
94                 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
95                 (*lmmp)->lmm_object_gr = cpu_to_le64(lsm->lsm_object_gr);
96         }
97
98         RETURN(lmm_size);
99 }
100
/* Unpack OSC object metadata from disk storage (LE byte order).
 *
 * Calling conventions mirrored from osc_packmd:
 *   - lsmp == NULL:           just report the in-memory lsm size;
 *   - *lsmp != NULL, no lmm:  free the previously unpacked lsm;
 *   - otherwise allocate (if needed) and fill *lsmp from lmm.
 * Returns the lsm size on success, 0 on free, negative errno on error. */
static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
                        struct lov_mds_md *lmm, int lmm_bytes)
{
        int lsm_size;
        ENTRY;

        /* validate the wire buffer before anything else */
        if (lmm != NULL) {
                if (lmm_bytes < sizeof (*lmm)) {
                        CERROR("lov_mds_md too small: %d, need %d\n",
                               lmm_bytes, (int)sizeof(*lmm));
                        RETURN(-EINVAL);
                }
                /* XXX LOV_MAGIC etc check? */

                if (lmm->lmm_object_id == 0) {
                        CERROR("lov_mds_md: zero lmm_object_id\n");
                        RETURN(-EINVAL);
                }
        }

        /* an OSC object is always a single stripe */
        lsm_size = lov_stripe_md_size(1);
        if (lsmp == NULL)
                RETURN(lsm_size);

        if (*lsmp != NULL && lmm == NULL) {
                OBD_FREE(*lsmp, lsm_size);
                *lsmp = NULL;
                RETURN(0);
        }

        if (*lsmp == NULL) {
                OBD_ALLOC(*lsmp, lsm_size);
                if (*lsmp == NULL)
                        RETURN(-ENOMEM);
                loi_init((*lsmp)->lsm_oinfo);
        }

        if (lmm != NULL) {
                /* XXX zero *lsmp? */
                (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
                (*lsmp)->lsm_object_gr = le64_to_cpu (lmm->lmm_object_gr);
                LASSERT((*lsmp)->lsm_object_id);
                LASSERT((*lsmp)->lsm_object_gr);
        }

        (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;

        RETURN(lsm_size);
}
151
152 static int osc_getattr_interpret(struct ptlrpc_request *req,
153                                  struct osc_getattr_async_args *aa, int rc)
154 {
155         struct ost_body *body;
156         ENTRY;
157
158         if (rc != 0)
159                 RETURN(rc);
160
161         body = lustre_swab_repbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
162         if (body) {
163                 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
164                 memcpy(aa->aa_oa, &body->oa, sizeof(*aa->aa_oa));
165
166                 /* This should really be sent by the OST */
167                 aa->aa_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
168                 aa->aa_oa->o_valid |= OBD_MD_FLBLKSZ;
169         } else {
170                 CERROR("can't unpack ost_body\n");
171                 rc = -EPROTO;
172                 aa->aa_oa->o_valid = 0;
173         }
174
175         RETURN(rc);
176 }
177
178 static int osc_getattr_async(struct obd_export *exp, struct obdo *oa,
179                              struct lov_stripe_md *md,
180                              struct ptlrpc_request_set *set)
181 {
182         struct ptlrpc_request *request;
183         struct ost_body *body;
184         int size = sizeof(*body);
185         struct osc_getattr_async_args *aa;
186         ENTRY;
187
188         request = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OBD_VERSION,
189                                   OST_GETATTR, 1, &size, NULL);
190         if (!request)
191                 RETURN(-ENOMEM);
192
193         body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof (*body));
194         memcpy(&body->oa, oa, sizeof(*oa));
195
196         request->rq_replen = lustre_msg_size(1, &size);
197         request->rq_interpret_reply = osc_getattr_interpret;
198
199         LASSERT (sizeof (*aa) <= sizeof (request->rq_async_args));
200         aa = (struct osc_getattr_async_args *)&request->rq_async_args;
201         aa->aa_oa = oa;
202
203         ptlrpc_set_add_req (set, request);
204         RETURN (0);
205 }
206
/* Synchronous OST_GETATTR: fetch the object attributes for @oa from
 * the OST and copy the reply back into @oa.  Returns 0 or -errno. */
static int osc_getattr(struct obd_export *exp, struct obdo *oa,
                       struct lov_stripe_md *md)
{
        struct ptlrpc_request *request;
        struct ost_body *body;
        int rc, size = sizeof(*body);
        ENTRY;

        request = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OBD_VERSION,
                                  OST_GETATTR, 1, &size, NULL);
        if (!request)
                RETURN(-ENOMEM);

        /* the request body carries the object id/group in the obdo */
        body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof (*body));
        memcpy(&body->oa, oa, sizeof(*oa));

        request->rq_replen = lustre_msg_size(1, &size);

        rc = ptlrpc_queue_wait(request);
        if (rc) {
                CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
                GOTO(out, rc);
        }

        body = lustre_swab_repbuf(request, 0, sizeof (*body),
                                  lustre_swab_ost_body);
        if (body == NULL) {
                CERROR ("can't unpack ost_body\n");
                GOTO (out, rc = -EPROTO);
        }

        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        memcpy(oa, &body->oa, sizeof(*oa));

        /* This should really be sent by the OST */
        oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oa->o_valid |= OBD_MD_FLBLKSZ;

        EXIT;
 out:
        ptlrpc_req_finished(request);
        return rc;
}
250
251 static int osc_setattr(struct obd_export *exp, struct obdo *oa,
252                        struct lov_stripe_md *md, struct obd_trans_info *oti)
253 {
254         struct ptlrpc_request *request;
255         struct ost_body *body;
256         int rc, size = sizeof(*body);
257         ENTRY;
258
259         LASSERT(!(oa->o_valid & OBD_MD_FLGROUP) || oa->o_gr > 0);
260
261         request = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OBD_VERSION,
262                                   OST_SETATTR, 1, &size, NULL);
263         if (!request)
264                 RETURN(-ENOMEM);
265
266         body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof(*body));
267         memcpy(&body->oa, oa, sizeof(*oa));
268
269         request->rq_replen = lustre_msg_size(1, &size);
270
271         rc = ptlrpc_queue_wait(request);
272         if (rc)
273                 GOTO(out, rc);
274
275         body = lustre_swab_repbuf(request, 0, sizeof(*body),
276                                   lustre_swab_ost_body);
277         if (body == NULL)
278                 GOTO(out, rc = -EPROTO);
279
280         memcpy(oa, &body->oa, sizeof(*oa));
281
282         EXIT;
283 out:
284         ptlrpc_req_finished(request);
285         RETURN(0);
286 }
287
/* Create an object on the OST for the (single-stripe) lsm in *ea.
 *
 * If *ea is NULL a new lsm is allocated here; on failure that local
 * allocation is freed again (see the out: label), otherwise ownership
 * passes to the caller via *ea.  On success the lsm's object id/group
 * are filled from the reply and, if requested via OBD_MD_FLCOOKIE,
 * the llog cookie is copied into @oti.  Returns 0 or -errno. */
int osc_real_create(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md **ea, struct obd_trans_info *oti)
{
        struct ptlrpc_request *request;
        struct ost_body *body;
        struct lov_stripe_md *lsm;
        int rc, size = sizeof(*body);
        ENTRY;

        LASSERT(oa);
        LASSERT(ea);

        lsm = *ea;
        if (!lsm) {
                /* caller gave no lsm: allocate one we may have to free */
                rc = obd_alloc_memmd(exp, &lsm);
                if (rc < 0)
                        RETURN(rc);
        }

        request = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OBD_VERSION,
                                  OST_CREATE, 1, &size, NULL);
        if (!request)
                GOTO(out, rc = -ENOMEM);

        body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof (*body));
        memcpy(&body->oa, oa, sizeof(body->oa));

        request->rq_replen = lustre_msg_size(1, &size);
        /* OBD_MD_FLINLINE marks a delete-orphan create from recovery */
        if (oa->o_valid & OBD_MD_FLINLINE) {
                LASSERT((oa->o_valid & OBD_MD_FLFLAGS) &&
                        oa->o_flags == OBD_FL_DELORPHAN);
                DEBUG_REQ(D_HA, request,
                          "delorphan from OST integration");
                /* Don't resend the delorphan request */
                request->rq_no_resend = request->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(request);
        if (rc)
                GOTO(out_req, rc);

        body = lustre_swab_repbuf(request, 0, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body == NULL) {
                CERROR ("can't unpack ost_body\n");
                GOTO (out_req, rc = -EPROTO);
        }

        memcpy(oa, &body->oa, sizeof(*oa));

        /* This should really be sent by the OST */
        oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oa->o_valid |= OBD_MD_FLBLKSZ;

        /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
         * have valid lsm_oinfo data structs, so don't go touching that.
         * This needs to be fixed in a big way.
         */
        lsm->lsm_object_id = oa->o_id;
        lsm->lsm_object_gr = oa->o_gr;
        *ea = lsm;

        if (oti != NULL) {
                oti->oti_transno = request->rq_repmsg->transno;

                if (oa->o_valid & OBD_MD_FLCOOKIE) {
                        if (!oti->oti_logcookies)
                                oti_alloc_cookies(oti, 1);
                        memcpy(oti->oti_logcookies, obdo_logcookie(oa),
                               sizeof(oti->oti_onecookie));
                }
        }

        CDEBUG(D_HA, "transno: "LPD64"\n", request->rq_repmsg->transno);
        EXIT;
out_req:
        ptlrpc_req_finished(request);
out:
        /* free the lsm only if we allocated it here (caller's *ea unset) */
        if (rc && !*ea)
                obd_free_memmd(exp, &lsm);
        return rc;
}
370
371 static int osc_punch(struct obd_export *exp, struct obdo *oa,
372                      struct lov_stripe_md *md, obd_size start,
373                      obd_size end, struct obd_trans_info *oti)
374 {
375         struct ptlrpc_request *request;
376         struct ost_body *body;
377         int rc, size = sizeof(*body);
378         ENTRY;
379
380         if (!oa) {
381                 CERROR("oa NULL\n");
382                 RETURN(-EINVAL);
383         }
384
385         request = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OBD_VERSION,
386                                   OST_PUNCH, 1, &size, NULL);
387         if (!request)
388                 RETURN(-ENOMEM);
389
390         body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof (*body));
391         memcpy(&body->oa, oa, sizeof(*oa));
392
393         /* overload the size and blocks fields in the oa with start/end */
394         body->oa.o_size = start;
395         body->oa.o_blocks = end;
396         body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
397
398         request->rq_replen = lustre_msg_size(1, &size);
399
400         rc = ptlrpc_queue_wait(request);
401         if (rc)
402                 GOTO(out, rc);
403
404         body = lustre_swab_repbuf (request, 0, sizeof (*body),
405                                    lustre_swab_ost_body);
406         if (body == NULL) {
407                 CERROR ("can't unpack ost_body\n");
408                 GOTO (out, rc = -EPROTO);
409         }
410
411         memcpy(oa, &body->oa, sizeof(*oa));
412
413         EXIT;
414  out:
415         ptlrpc_req_finished(request);
416         return rc;
417 }
418
/* Ask the OST to flush the byte range [start, end] of the object in
 * @oa to stable storage (OST_SYNC).  Like osc_punch, the range rides
 * in the o_size/o_blocks fields of the wire obdo. */
static int osc_sync(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md *md, obd_size start, obd_size end)
{
        struct ptlrpc_request *request;
        struct ost_body *body;
        int rc, size = sizeof(*body);
        ENTRY;

        if (!oa) {
                CERROR("oa NULL\n");
                RETURN(-EINVAL);
        }

        request = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OBD_VERSION,
                                  OST_SYNC, 1, &size, NULL);
        if (!request)
                RETURN(-ENOMEM);

        body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof (*body));
        memcpy(&body->oa, oa, sizeof(*oa));

        /* overload the size and blocks fields in the oa with start/end */
        body->oa.o_size = start;
        body->oa.o_blocks = end;
        body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);

        request->rq_replen = lustre_msg_size(1, &size);

        rc = ptlrpc_queue_wait(request);
        if (rc)
                GOTO(out, rc);

        body = lustre_swab_repbuf(request, 0, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body == NULL) {
                CERROR ("can't unpack ost_body\n");
                GOTO (out, rc = -EPROTO);
        }

        /* reply carries the post-sync attributes */
        memcpy(oa, &body->oa, sizeof(*oa));

        EXIT;
 out:
        ptlrpc_req_finished(request);
        return rc;
}
465
/* Destroy the OST object described by @oa (OST_DESTROY).
 *
 * If @oti carries llog cookies (OBD_MD_FLCOOKIE set in the obdo), one
 * cookie is packed into the request and oti_logcookies is advanced.
 * -ENOENT from the OST is treated as success (object already gone). */
static int osc_destroy(struct obd_export *exp, struct obdo *oa,
                       struct lov_stripe_md *ea, struct obd_trans_info *oti)
{
        struct ptlrpc_request *request;
        struct ost_body *body;
        int rc, size = sizeof(*body);
        ENTRY;

        if (!oa) {
                CERROR("oa NULL\n");
                RETURN(-EINVAL);
        }

        request = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OBD_VERSION,
                                  OST_DESTROY, 1, &size, NULL);
        if (!request)
                RETURN(-ENOMEM);

        body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof (*body));

        /* consume one unlink llog cookie from the caller, if provided */
        if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE) {
                memcpy(obdo_logcookie(oa), oti->oti_logcookies,
                       sizeof(*oti->oti_logcookies));
                oti->oti_logcookies++;
        }

        memcpy(&body->oa, oa, sizeof(*oa));
        request->rq_replen = lustre_msg_size(1, &size);

        rc = ptlrpc_queue_wait(request);

        /* a missing object counts as destroyed */
        if (rc == -ENOENT)
                rc = 0;
        if (rc)
                GOTO(out, rc);

        body = lustre_swab_repbuf(request, 0, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body == NULL) {
                CERROR ("Can't unpack body\n");
                GOTO (out, rc = -EPROTO);
        }

        memcpy(oa, &body->oa, sizeof(*oa));

        EXIT;
 out:
        ptlrpc_req_finished(request);
        return rc;
}
516
/* Fill the cache-accounting fields (dirty/undirty/grant/dropped) of
 * @oa so each BRW request reports this client's cache state to the
 * OST.  NOTE(review): @writing_bytes is currently unused here. */
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        obd_valid bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;

        /* these fields must not have been set by the caller already */
        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        /* sample all counters atomically under loi_list_lock */
        spin_lock(&cli->cl_loi_list_lock);
        oa->o_dirty = cli->cl_dirty;
        oa->o_undirty = cli->cl_dirty_max - oa->o_dirty;
        oa->o_grant = cli->cl_avail_grant;
        oa->o_dropped = cli->cl_lost_grant;
        /* lost grant is reported to the OST only once */
        cli->cl_lost_grant = 0;
        spin_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
}
535
/* caller must hold loi_list_lock */
static void osc_consume_write_grant(struct client_obd *cli,
                                    struct osc_async_page *oap)
{
        /* account one page of dirty cache, paid for out of the grant
         * the OST has extended to this client */
        cli->cl_dirty += PAGE_SIZE;
        cli->cl_avail_grant -= PAGE_SIZE;
        /* tag the page as grant-covered for the later BRW paths */
        oap->oap_brw_flags |= OBD_BRW_FROM_GRANT;
        CDEBUG(D_CACHE, "using %lu grant credits for oap %p\n", PAGE_SIZE, oap);
        LASSERT(cli->cl_avail_grant >= 0);
}
546
/* Total BRW RPCs (reads + writes) this client currently has in flight. */
static unsigned long rpcs_in_flight(struct client_obd *cli)
{
        return cli->cl_r_in_flight + cli->cl_w_in_flight;
}
551
552 /* caller must hold loi_list_lock */
553 void osc_wake_cache_waiters(struct client_obd *cli)
554 {
555         struct list_head *l, *tmp;
556         struct osc_cache_waiter *ocw;
557
558         list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
559                 /* if we can't dirty more, we must wait until some is written */
560                 if (cli->cl_dirty + PAGE_SIZE > cli->cl_dirty_max) {
561                         CDEBUG(D_CACHE, "no dirty room: dirty: %ld max %ld\n",
562                                cli->cl_dirty, cli->cl_dirty_max);
563                         return;
564                 }
565
566                 /* if still dirty cache but no grant wait for pending RPCs that
567                  * may yet return us some grant before doing sync writes */
568                 if (cli->cl_w_in_flight && cli->cl_avail_grant < PAGE_SIZE) {
569                         CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
570                                cli->cl_w_in_flight);
571                 }
572                 ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
573                 list_del_init(&ocw->ocw_entry);
574                 if (cli->cl_avail_grant < PAGE_SIZE) {
575                         /* no more RPCs in flight to return grant, do sync IO */
576                         ocw->ocw_rc = -EDQUOT;
577                         CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
578                 } else {
579                         osc_consume_write_grant(cli, ocw->ocw_oap);
580                 }
581
582                 wake_up(&ocw->ocw_waitq);
583         }
584
585         EXIT;
586 }
587
/* Fold the grant returned in a BRW reply's obdo into the client's
 * available grant, under loi_list_lock. */
static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
{
        spin_lock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
        cli->cl_avail_grant += body->oa.o_grant;
        /* waiters are woken in brw_interpret_oap */
        spin_unlock(&cli->cl_loi_list_lock);
}
596
597 /* We assume that the reason this OSC got a short read is because it read
598  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
599  * via the LOV, and it _knows_ it's reading inside the file, it's just that
600  * this stripe never got written at or beyond this stripe offset yet. */
601 static void handle_short_read(int nob_read, obd_count page_count,
602                               struct brw_page *pga)
603 {
604         char *ptr;
605
606         /* skip bytes read OK */
607         while (nob_read > 0) {
608                 LASSERT (page_count > 0);
609
610                 if (pga->count > nob_read) {
611                         /* EOF inside this page */
612                         ptr = kmap(pga->pg) + (pga->page_offset & ~PAGE_MASK);
613                         memset(ptr + nob_read, 0, pga->count - nob_read);
614                         kunmap(pga->pg);
615                         page_count--;
616                         pga++;
617                         break;
618                 }
619
620                 nob_read -= pga->count;
621                 page_count--;
622                 pga++;
623         }
624
625         /* zero remaining pages */
626         while (page_count-- > 0) {
627                 ptr = kmap(pga->pg) + (pga->page_offset & ~PAGE_MASK);
628                 memset(ptr, 0, pga->count);
629                 kunmap(pga->pg);
630                 pga++;
631         }
632 }
633
634 static int check_write_rcs(struct ptlrpc_request *request,
635                            int requested_nob, int niocount,
636                            obd_count page_count, struct brw_page *pga)
637 {
638         int    *remote_rcs, i;
639
640         /* return error if any niobuf was in error */
641         remote_rcs = lustre_swab_repbuf(request, 1,
642                                         sizeof(*remote_rcs) * niocount, NULL);
643         if (remote_rcs == NULL) {
644                 CERROR("Missing/short RC vector on BRW_WRITE reply\n");
645                 return(-EPROTO);
646         }
647         if (lustre_msg_swabbed(request->rq_repmsg))
648                 for (i = 0; i < niocount; i++)
649                         __swab32s(&remote_rcs[i]);
650
651         for (i = 0; i < niocount; i++) {
652                 if (remote_rcs[i] < 0)
653                         return(remote_rcs[i]);
654
655                 if (remote_rcs[i] != 0) {
656                         CERROR("rc[%d] invalid (%d) req %p\n",
657                                 i, remote_rcs[i], request);
658                         return(-EPROTO);
659                 }
660         }
661
662         if (request->rq_bulk->bd_nob_transferred != requested_nob) {
663                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
664                        requested_nob, request->rq_bulk->bd_nob_transferred);
665                 return(-EPROTO);
666         }
667
668         return (0);
669 }
670
671 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
672 {
673         if (p1->flag != p2->flag) {
674                 unsigned mask = ~OBD_BRW_FROM_GRANT;
675
676                 /* warn if we try to combine flags that we don't know to be
677                  * safe to combine */
678                 if ((p1->flag & mask) != (p2->flag & mask))
679                         CERROR("is it ok to have flags 0x%x and 0x%x in the "
680                                "same brw?\n", p1->flag, p2->flag);
681                 return 0;
682         }
683
684         return (p1->disk_offset + p1->count == p2->disk_offset);
685 }
686
#if CHECKSUM_BULK
/* Checksum the first @nob bytes spanned by @pga for BRW verification.
 * NOTE(review): this guarded-out code read pga->off, a field that does
 * not exist elsewhere in this file (brw_page uses page_offset and
 * disk_offset everywhere else), so it could not compile with
 * CHECKSUM_BULK enabled.  Fixed to the in-page offset, matching the
 * "page_offset & ~PAGE_MASK" usage in the BRW paths. */
static obd_count cksum_pages(int nob, obd_count page_count,
                             struct brw_page *pga)
{
        obd_count cksum = 0;
        char *ptr;

        while (nob > 0) {
                LASSERT (page_count > 0);

                ptr = kmap(pga->pg);
                ost_checksum(&cksum, ptr + (pga->page_offset & (PAGE_SIZE - 1)),
                             pga->count > nob ? nob : pga->count);
                kunmap(pga->pg);

                nob -= pga->count;
                page_count--;
                pga++;
        }

        return (cksum);
}
#endif
710
/* Build (but do not send) a BRW read/write request for @page_count
 * pages.  Adjacent pages with matching flags are merged into a single
 * remote niobuf (see can_merge_pages).  On success returns 0 and hands
 * back the request, the merged niobuf count and the total byte count;
 * the request owns its bulk descriptor. */
static int osc_brw_prep_request(int cmd, struct obd_import *imp,struct obdo *oa,
                                struct lov_stripe_md *lsm, obd_count page_count,
                                struct brw_page *pga, int *requested_nobp,
                                int *niocountp, struct ptlrpc_request **reqp)
{
        struct ptlrpc_request   *req;
        struct ptlrpc_bulk_desc *desc;
        struct client_obd       *cli = &imp->imp_obd->u.cli;
        struct ost_body         *body;
        struct obd_ioobj        *ioobj;
        struct niobuf_remote    *niobuf;
        int                      niocount;
        int                      size[3];
        int                      i;
        int                      requested_nob;
        int                      opc;
        int                      rc;

        opc = ((cmd & OBD_BRW_WRITE) != 0) ? OST_WRITE : OST_READ;

        /* count the niobufs needed after merging contiguous pages */
        for (niocount = i = 1; i < page_count; i++)
                if (!can_merge_pages(&pga[i - 1], &pga[i]))
                        niocount++;

        size[0] = sizeof(*body);
        size[1] = sizeof(*ioobj);
        size[2] = niocount * sizeof(*niobuf);

        req = ptlrpc_prep_req(imp, LUSTRE_OBD_VERSION, opc, 3, size, NULL);
        if (req == NULL)
                return (-ENOMEM);

        /* writes are a bulk GET by the server, reads a bulk PUT to us */
        if (opc == OST_WRITE)
                desc = ptlrpc_prep_bulk_imp (req, page_count,
                                             BULK_GET_SOURCE, OST_BULK_PORTAL);
        else
                desc = ptlrpc_prep_bulk_imp (req, page_count,
                                             BULK_PUT_SINK, OST_BULK_PORTAL);
        if (desc == NULL)
                GOTO(out, rc = -ENOMEM);
        /* NB request now owns desc and will free it when it gets freed */

        body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof(*body));
        ioobj = lustre_msg_buf(req->rq_reqmsg, 1, sizeof(*ioobj));
        niobuf = lustre_msg_buf(req->rq_reqmsg, 2, niocount * sizeof(*niobuf));

        memcpy(&body->oa, oa, sizeof(*oa));

        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;

        LASSERT (page_count > 0);

        /* queue each page for bulk and pack the (merged) niobufs;
         * pages must arrive in strictly increasing disk offset order */
        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                struct brw_page *pg = &pga[i];
                struct brw_page *pg_prev = pg - 1;

                LASSERT(pg->count > 0);
                LASSERTF((pg->page_offset & ~PAGE_MASK)+ pg->count <= PAGE_SIZE,
                         "i: %d pg: %p pg_off: "LPU64", count: %u\n", i, pg,
                         pg->page_offset, pg->count);
                LASSERTF(i == 0 || pg->disk_offset > pg_prev->disk_offset,
                         "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
                         " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
                         i, page_count,
                         pg->pg, pg->pg->private, pg->pg->index, pg->disk_offset,
                         pg_prev->pg, pg_prev->pg->private, pg_prev->pg->index,
                         pg_prev->disk_offset);

                ptlrpc_prep_bulk_page(desc, pg->pg,
                                      pg->page_offset & ~PAGE_MASK, pg->count);
                requested_nob += pg->count;

                /* extend the previous niobuf instead of starting a new one */
                if (i > 0 && can_merge_pages(pg_prev, pg)) {
                        niobuf--;
                        niobuf->len += pg->count;
                } else {
                        niobuf->offset = pg->disk_offset;
                        niobuf->len    = pg->count;
                        niobuf->flags  = pg->flag;
                }
        }

        LASSERT((void *)(niobuf - niocount) ==
                lustre_msg_buf(req->rq_reqmsg, 2, niocount * sizeof(*niobuf)));
        osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);

        /* size[0] still sizeof (*body) */
        if (opc == OST_WRITE) {
#if CHECKSUM_BULK
                body->oa.o_valid |= OBD_MD_FLCKSUM;
                body->oa.o_cksum = cksum_pages(requested_nob, page_count, pga);
#endif
                /* 1 RC per niobuf */
                size[1] = sizeof(__u32) * niocount;
                req->rq_replen = lustre_msg_size(2, size);
        } else {
                /* 1 RC for the whole I/O */
                req->rq_replen = lustre_msg_size(1, size);
        }

        *niocountp = niocount;
        *requested_nobp = requested_nob;
        *reqp = req;
        return (0);

 out:
        ptlrpc_req_finished (req);
        return (rc);
}
821
822 static int osc_brw_fini_request(struct ptlrpc_request *req, struct obdo *oa,
823                                 int requested_nob, int niocount,
824                                 obd_count page_count, struct brw_page *pga,
825                                 int rc)
826 {
827         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
828         struct ost_body *body;
829         ENTRY;
830
831         if (rc < 0)
832                 RETURN(rc);
833
834         body = lustre_swab_repbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
835         if (body == NULL) {
836                 CERROR ("Can't unpack body\n");
837                 RETURN(-EPROTO);
838         }
839
840         osc_update_grant(cli, body);
841         memcpy(oa, &body->oa, sizeof(*oa));
842
843         if (req->rq_reqmsg->opc == OST_WRITE) {
844                 if (rc > 0) {
845                         CERROR ("Unexpected +ve rc %d\n", rc);
846                         RETURN(-EPROTO);
847                 }
848                 LASSERT (req->rq_bulk->bd_nob == requested_nob);
849
850                 RETURN(check_write_rcs(req, requested_nob, niocount,
851                                        page_count, pga));
852         }
853
854         if (rc > requested_nob) {
855                 CERROR("Unexpected rc %d (%d requested)\n", rc, requested_nob);
856                 RETURN(-EPROTO);
857         }
858
859         if (rc != req->rq_bulk->bd_nob_transferred) {
860                 CERROR ("Unexpected rc %d (%d transferred)\n",
861                         rc, req->rq_bulk->bd_nob_transferred);
862                 return (-EPROTO);
863         }
864
865         if (rc < requested_nob)
866                 handle_short_read(rc, page_count, pga);
867
868 #if CHECKSUM_BULK
869         if (oa->o_valid & OBD_MD_FLCKSUM) {
870                 const struct ptlrpc_peer *peer =
871                         &req->rq_import->imp_connection->c_peer;
872                 static int cksum_counter;
873                 obd_count server_cksum = oa->o_cksum;
874                 obd_count cksum = cksum_pages(rc, page_count, pga);
875                 char str[PTL_NALFMT_SIZE];
876
877                 ptlrpc_peernid2str(peer, str);
878
879                 cksum_counter++;
880                 if (server_cksum != cksum) {
881                         CERROR("Bad checksum: server %x, client %x, server NID "
882                                LPX64" (%s)\n", server_cksum, cksum,
883                                peer->peer_id.nid, str);
884                         cksum_counter = 0;
885                         oa->o_cksum = cksum;
886                 } else if ((cksum_counter & (-cksum_counter)) == cksum_counter){
887                         CWARN("Checksum %u from "LPX64" (%s) OK: %x\n",
888                               cksum_counter, peer->peer_id.nid, str, cksum);
889                 }
890         } else {
891                 static int cksum_missed;
892
893                 cksum_missed++;
894                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
895                         CERROR("Request checksum %u from "LPX64", no reply\n",
896                                cksum_missed,
897                                req->rq_import->imp_connection->c_peer.peer_id.nid);
898         }
899 #endif
900         RETURN(0);
901 }
902
903 static int osc_brw_internal(int cmd, struct obd_export *exp,struct obdo *oa,
904                             struct lov_stripe_md *lsm,
905                             obd_count page_count, struct brw_page *pga)
906 {
907         int                    requested_nob;
908         int                    niocount;
909         struct ptlrpc_request *request;
910         int                    rc;
911         ENTRY;
912
913 restart_bulk:
914         rc = osc_brw_prep_request(cmd, class_exp2cliimp(exp), oa, lsm,
915                                   page_count, pga, &requested_nob, &niocount,
916                                   &request);
917         if (rc != 0)
918                 return (rc);
919
920         rc = ptlrpc_queue_wait(request);
921
922         if (rc == -ETIMEDOUT && request->rq_resend) {
923                 DEBUG_REQ(D_HA, request,  "BULK TIMEOUT");
924                 ptlrpc_req_finished(request);
925                 goto restart_bulk;
926         }
927
928         rc = osc_brw_fini_request(request, oa, requested_nob, niocount,
929                                   page_count, pga, rc);
930
931         ptlrpc_req_finished(request);
932         RETURN (rc);
933 }
934
935 static int brw_interpret(struct ptlrpc_request *request,
936                          struct osc_brw_async_args *aa, int rc)
937 {
938         struct obdo *oa      = aa->aa_oa;
939         int requested_nob    = aa->aa_requested_nob;
940         int niocount         = aa->aa_nio_count;
941         obd_count page_count = aa->aa_page_count;
942         struct brw_page *pga = aa->aa_pga;
943         ENTRY;
944
945         rc = osc_brw_fini_request(request, oa, requested_nob, niocount,
946                                   page_count, pga, rc);
947         RETURN (rc);
948 }
949
950 static int async_internal(int cmd, struct obd_export *exp, struct obdo *oa,
951                           struct lov_stripe_md *lsm, obd_count page_count,
952                           struct brw_page *pga, struct ptlrpc_request_set *set)
953 {
954         struct ptlrpc_request     *request;
955         int                        requested_nob;
956         int                        nio_count;
957         struct osc_brw_async_args *aa;
958         int                        rc;
959         ENTRY;
960
961         rc = osc_brw_prep_request(cmd, class_exp2cliimp(exp), oa, lsm,
962                                   page_count, pga, &requested_nob, &nio_count,
963                                   &request);
964         if (rc == 0) {
965                 LASSERT(sizeof(*aa) <= sizeof(request->rq_async_args));
966                 aa = (struct osc_brw_async_args *)&request->rq_async_args;
967                 aa->aa_oa = oa;
968                 aa->aa_requested_nob = requested_nob;
969                 aa->aa_nio_count = nio_count;
970                 aa->aa_page_count = page_count;
971                 aa->aa_pga = pga;
972
973                 request->rq_interpret_reply = brw_interpret;
974                 ptlrpc_set_add_req(set, request);
975         }
976         RETURN (rc);
977 }
978
#ifndef min_t
/* typed minimum for builds whose headers don't provide it; the GNU
 * statement expression evaluates each argument exactly once */
#define min_t(type,x,y) \
        ({ type __x = (x); type __y = (y); __x < __y ? __x: __y; })
#endif
983
984 /*
985  * ugh, we want disk allocation on the target to happen in offset order.  we'll
986  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
987  * fine for our small page arrays and doesn't require allocation.  its an
988  * insertion sort that swaps elements that are strides apart, shrinking the
989  * stride down until its '1' and the array is sorted.
990  */
991 static void sort_brw_pages(struct brw_page *array, int num)
992 {
993         int stride, i, j;
994         struct brw_page tmp;
995
996         if (num == 1)
997                 return;
998         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
999                 ;
1000
1001         do {
1002                 stride /= 3;
1003                 for (i = stride ; i < num ; i++) {
1004                         tmp = array[i];
1005                         j = i;
1006                         while (j >= stride && array[j - stride].disk_offset >
1007                                 tmp.disk_offset) {
1008                                 array[j] = array[j - stride];
1009                                 j -= stride;
1010                         }
1011                         array[j] = tmp;
1012                 }
1013         } while (stride > 1);
1014 }
1015
1016 /* make sure we the regions we're passing to elan don't violate its '4
1017  * fragments' constraint.  portal headers are a fragment, all full
1018  * PAGE_SIZE long pages count as 1 fragment, and each partial page
1019  * counts as a fragment.  I think.  see bug 934. */
1020 static obd_count check_elan_limit(struct brw_page *pg, obd_count pages)
1021 {
1022         int frags_left = 3;
1023         int saw_whole_frag = 0;
1024         int i;
1025
1026         for (i = 0 ; frags_left && i < pages ; pg++, i++) {
1027                 if (pg->count == PAGE_SIZE) {
1028                         if (!saw_whole_frag) {
1029                                 saw_whole_frag = 1;
1030                                 frags_left--;
1031                         }
1032                 } else {
1033                         frags_left--;
1034                 }
1035         }
1036         return i;
1037 }
1038
1039 static int osc_brw(int cmd, struct obd_export *exp, struct obdo *oa,
1040                    struct lov_stripe_md *lsm, obd_count page_count,
1041                    struct brw_page *pga, struct obd_trans_info *oti)
1042 {
1043         ENTRY;
1044
1045         if (cmd == OBD_BRW_CHECK) {
1046                 /* The caller just wants to know if there's a chance that this
1047                  * I/O can succeed */
1048                 struct obd_import *imp = class_exp2cliimp(exp);
1049
1050                 if (imp == NULL || imp->imp_invalid)
1051                         RETURN(-EIO);
1052                 RETURN(0);
1053         }
1054
1055         while (page_count) {
1056                 obd_count pages_per_brw;
1057                 int rc;
1058
1059                 if (page_count > PTLRPC_MAX_BRW_PAGES)
1060                         pages_per_brw = PTLRPC_MAX_BRW_PAGES;
1061                 else
1062                         pages_per_brw = page_count;
1063
1064                 sort_brw_pages(pga, pages_per_brw);
1065                 pages_per_brw = check_elan_limit(pga, pages_per_brw);
1066
1067                 rc = osc_brw_internal(cmd, exp, oa, lsm, pages_per_brw, pga);
1068
1069                 if (rc != 0)
1070                         RETURN(rc);
1071
1072                 page_count -= pages_per_brw;
1073                 pga += pages_per_brw;
1074         }
1075         RETURN(0);
1076 }
1077
1078 static int osc_brw_async(int cmd, struct obd_export *exp, struct obdo *oa,
1079                          struct lov_stripe_md *lsm, obd_count page_count,
1080                          struct brw_page *pga, struct ptlrpc_request_set *set,
1081                          struct obd_trans_info *oti)
1082 {
1083         ENTRY;
1084
1085         if (cmd == OBD_BRW_CHECK) {
1086                 /* The caller just wants to know if there's a chance that this
1087                  * I/O can succeed */
1088                 struct obd_import *imp = class_exp2cliimp(exp);
1089
1090                 if (imp == NULL || imp->imp_invalid)
1091                         RETURN(-EIO);
1092                 RETURN(0);
1093         }
1094
1095         while (page_count) {
1096                 obd_count pages_per_brw;
1097                 int rc;
1098
1099                 if (page_count > PTLRPC_MAX_BRW_PAGES)
1100                         pages_per_brw = PTLRPC_MAX_BRW_PAGES;
1101                 else
1102                         pages_per_brw = page_count;
1103
1104                 sort_brw_pages(pga, pages_per_brw);
1105                 pages_per_brw = check_elan_limit(pga, pages_per_brw);
1106
1107                 rc = async_internal(cmd, exp, oa, lsm, pages_per_brw, pga, set);
1108
1109                 if (rc != 0)
1110                         RETURN(rc);
1111
1112                 page_count -= pages_per_brw;
1113                 pga += pages_per_brw;
1114         }
1115         RETURN(0);
1116 }
1117
1118 static void osc_check_rpcs(struct client_obd *cli);
1119 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
1120                            int sent);
1121 static void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi);
1122 static void lop_update_pending(struct client_obd *cli,
1123                                struct loi_oap_pages *lop, int cmd, int delta);
1124
/* this is called when a sync waiter receives an interruption.  Its job is to
 * get the caller woken as soon as possible.  If its page hasn't been put in an
 * rpc yet it can dequeue immediately.  Otherwise it has to mark the rpc as
 * desiring interruption which will forcefully complete the rpc once the rpc
 * has timed out */
static void osc_occ_interrupted(struct oig_callback_context *occ)
{
        struct osc_async_page *oap;
        struct loi_oap_pages *lop;
        struct lov_oinfo *loi;
        ENTRY;

        /* XXX member_of() */
        oap = list_entry(occ, struct osc_async_page, oap_occ);

        /* the loi list lock covers all the oap state changes below */
        spin_lock(&oap->oap_cli->cl_loi_list_lock);

        oap->oap_interrupted = 1;

        /* ok, it's been put in an rpc. */
        if (oap->oap_request != NULL) {
                /* flag the rpc and poke ptlrpcd so it notices promptly */
                ptlrpc_mark_interrupted(oap->oap_request);
                ptlrpcd_wake(oap->oap_request);
                GOTO(unlock, 0);
        }

        /* we don't get interruption callbacks until osc_trigger_sync_io()
         * has been called and put the sync oaps in the pending/urgent lists.*/
        if (!list_empty(&oap->oap_pending_item)) {
                /* still only queued: pull it off the lists and fix up the
                 * pending-page accounting before completing the waiter */
                list_del_init(&oap->oap_pending_item);
                if (oap->oap_async_flags & ASYNC_URGENT)
                        list_del_init(&oap->oap_urgent_item);

                loi = oap->oap_loi;
                lop = (oap->oap_cmd == OBD_BRW_WRITE) ?
                        &loi->loi_write_lop : &loi->loi_read_lop;
                lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
                loi_list_maint(oap->oap_cli, oap->oap_loi);

                /* complete this oap in the group with rc 0 so the sync
                 * waiter can wake up */
                oig_complete_one(oap->oap_oig, &oap->oap_occ, 0);
                oap->oap_oig = NULL;
        }

unlock:
        spin_unlock(&oap->oap_cli->cl_loi_list_lock);
}
1171
/* this must be called holding the loi list lock to give coverage to exit_cache,
 * async_flag maintenance, and oap_request */
static void osc_ap_completion(struct client_obd *cli, struct obdo *oa,
                              struct osc_async_page *oap, int sent, int rc)
{
        /* take the page out of osc's cache accounting and reset its state */
        osc_exit_cache(cli, oap, sent);
        oap->oap_async_flags = 0;
        oap->oap_interrupted = 0;

        /* drop the reference taken when this oap entered its rpc */
        if (oap->oap_request != NULL) {
                ptlrpc_req_finished(oap->oap_request);
                oap->oap_request = NULL;
        }

        /* on success, refresh the cached block count from the server obdo */
        if (rc == 0 && oa != NULL)
                oap->oap_loi->loi_blocks = oa->o_blocks;

        /* sync (group) io pages are completed through the oig rather than
         * the caller's completion callback */
        if (oap->oap_oig) {
                oig_complete_one(oap->oap_oig, &oap->oap_occ, rc);
                oap->oap_oig = NULL;
                EXIT;
                return;
        }

        oap->oap_caller_ops->ap_completion(oap->oap_caller_data, oap->oap_cmd,
                                           oa, rc);
}
1199
/* rpcd completion callback for oap-based rpcs built by osc_send_oap_rpc():
 * finish the bulk, complete every oap carried by the rpc, update the
 * in-flight counters and kick off any further rpcs that are now possible. */
static int brw_interpret_oap(struct ptlrpc_request *request,
                             struct osc_brw_async_args *aa, int rc)
{
        struct osc_async_page *oap;
        struct client_obd *cli;
        struct list_head *pos, *n;
        struct timeval now;
        ENTRY;

        do_gettimeofday(&now);
        rc = osc_brw_fini_request(request, aa->aa_oa, aa->aa_requested_nob,
                                  aa->aa_nio_count, aa->aa_page_count,
                                  aa->aa_pga, rc);

        CDEBUG(D_INODE, "request %p aa %p rc %d\n", request, aa, rc);

        cli = aa->aa_cli;
        /* in failout recovery we ignore writeback failure and want
         * to just tell llite to unlock the page and continue */
        if (request->rq_reqmsg->opc == OST_WRITE && 
            (cli->cl_import == NULL || cli->cl_import->imp_invalid)) {
                CDEBUG(D_INODE, "flipping to rc 0 imp %p inv %d\n", 
                       cli->cl_import, 
                       cli->cl_import ? cli->cl_import->imp_invalid : -1);
                rc = 0;
        }

        spin_lock(&cli->cl_loi_list_lock);

        /* record the rpc service time under the list lock */
        if (request->rq_reqmsg->opc == OST_WRITE)
                lprocfs_stime_record(&cli->cl_write_stime, &now,
                                     &request->rq_rpcd_start);
        else
                lprocfs_stime_record(&cli->cl_read_stime, &now,
                                     &request->rq_rpcd_start);



        /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
         * is called so we know whether to go to sync BRWs or wait for more
         * RPCs to complete */
        if (request->rq_reqmsg->opc == OST_WRITE)
                cli->cl_w_in_flight--;
        else
                cli->cl_r_in_flight--;

        /* the caller may re-use the oap after the completion call so
         * we need to clean it up a little */
        list_for_each_safe(pos, n, &aa->aa_oaps) {
                oap = list_entry(pos, struct osc_async_page, oap_rpc_item);

                //CDEBUG(D_INODE, "page %p index %lu oap %p\n",
                       //oap->oap_page, oap->oap_page->index, oap);

                list_del_init(&oap->oap_rpc_item);
                osc_ap_completion(cli, aa->aa_oa, oap, 1, rc);
        }

        /* completions may have released cache space and emptied queues;
         * let waiters retry and see if more rpcs can be fired */
        osc_wake_cache_waiters(cli);
        osc_check_rpcs(cli);

        spin_unlock(&cli->cl_loi_list_lock);

        /* the obdo and the page array were allocated by osc_build_req */
        obdo_free(aa->aa_oa);
        OBD_FREE(aa->aa_pga, aa->aa_page_count * sizeof(struct brw_page));

        RETURN(0);
}
1268
/* Build one brw rpc from the oaps on rpc_list.  On success the request's
 * async args carry the obdo and page array allocated here (they are freed
 * later by brw_interpret_oap); on failure an ERR_PTR is returned and both
 * allocations are released. */
static struct ptlrpc_request *osc_build_req(struct client_obd *cli,
                                            struct list_head *rpc_list,
                                            int page_count, int cmd)
{
        struct ptlrpc_request *req;
        struct brw_page *pga = NULL;
        int requested_nob, nio_count;
        struct osc_brw_async_args *aa;
        struct obdo *oa = NULL;
        struct obd_async_page_ops *ops = NULL;
        void *caller_data = NULL;
        struct list_head *pos;
        int i, rc;

        LASSERT(!list_empty(rpc_list));

        OBD_ALLOC(pga, sizeof(*pga) * page_count);
        if (pga == NULL)
                RETURN(ERR_PTR(-ENOMEM));

        oa = obdo_alloc();
        if (oa == NULL)
                GOTO(out, req = ERR_PTR(-ENOMEM));

        /* flatten the oaps into the brw_page array; ops/caller_data come
         * from the first oap (presumably shared by all oaps in one rpc --
         * TODO confirm) */
        i = 0;
        list_for_each(pos, rpc_list) {
                struct osc_async_page *oap;

                oap = list_entry(pos, struct osc_async_page, oap_rpc_item);
                if (ops == NULL) {
                        ops = oap->oap_caller_ops;
                        caller_data = oap->oap_caller_data;
                }
                pga[i].disk_offset = oap->oap_obj_off + oap->oap_page_off;
                pga[i].page_offset = pga[i].disk_offset;
                pga[i].pg = oap->oap_page;
                pga[i].count = oap->oap_count;
                pga[i].flag = oap->oap_brw_flags;
                /* NOTE(review): debug mask 0 means this CDEBUG never prints;
                 * a real mask (e.g. D_PAGE) was probably intended -- confirm */
                CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
                       pga[i].pg, oap->oap_page->index, oap, pga[i].flag);
                i++;
        }

        /* always get the data for the obdo for the rpc */
        LASSERT(ops != NULL);
        ops->ap_fill_obdo(caller_data, cmd, oa);

        /* the target wants the pages in offset order */
        sort_brw_pages(pga, page_count);
        rc = osc_brw_prep_request(cmd, cli->cl_import, oa, NULL, page_count,
                                  pga, &requested_nob, &nio_count, &req);
        if (rc != 0) {
                CERROR("prep_req failed: %d\n", rc);
                GOTO(out, req = ERR_PTR(rc));
        }

        /* stash everything brw_interpret_oap will need in the request */
        LASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = (struct osc_brw_async_args *)&req->rq_async_args;
        aa->aa_oa = oa;
        aa->aa_requested_nob = requested_nob;
        aa->aa_nio_count = nio_count;
        aa->aa_page_count = page_count;
        aa->aa_pga = pga;
        aa->aa_cli = cli;

out:
        if (IS_ERR(req)) {
                if (oa)
                        obdo_free(oa);
                if (pga)
                        OBD_FREE(pga, sizeof(*pga) * page_count);
        }
        RETURN(req);
}
1342
1343 static void lop_update_pending(struct client_obd *cli,
1344                                struct loi_oap_pages *lop, int cmd, int delta)
1345 {
1346         lop->lop_num_pending += delta;
1347         if (cmd == OBD_BRW_WRITE)
1348                 cli->cl_pending_w_pages += delta;
1349         else
1350                 cli->cl_pending_r_pages += delta;
1351 }
1352
/* the loi lock is held across this function but it's allowed to release
 * and reacquire it during its work.  Gathers up to cl_max_pages_per_rpc
 * ready oaps from lop->lop_pending into one rpc and hands it to ptlrpcd.
 * Returns 1 if an rpc was sent, 0 if nothing was ready, or a negative
 * errno if building the rpc failed. */
static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
                            int cmd, struct loi_oap_pages *lop)
{
        struct ptlrpc_request *request;
        obd_count page_count = 0;
        struct list_head *tmp, *pos;
        struct osc_async_page *oap = NULL;
        struct osc_brw_async_args *aa;
        struct obd_async_page_ops *ops;
        LIST_HEAD(rpc_list);
        ENTRY;

        /* first we find the pages we're allowed to work with */
        list_for_each_safe(pos, tmp, &lop->lop_pending) {
                oap = list_entry(pos, struct osc_async_page, oap_pending_item);
                ops = oap->oap_caller_ops;

                LASSERT(oap->oap_magic == OAP_MAGIC);

                /* in llite being 'ready' equates to the page being locked
                 * until completion unlocks it.  commit_write submits a page
                 * as not ready because its unlock will happen unconditionally
                 * as the call returns.  if we race with commit_write giving
                 * us that page we dont' want to create a hole in the page
                 * stream, so we stop and leave the rpc to be fired by
                 * another dirtier or kupdated interval (the not ready page
                 * will still be on the dirty list).  we could call in
                 * at the end of ll_file_write to process the queue again. */
                if (!(oap->oap_async_flags & ASYNC_READY)) {
                        int rc = ops->ap_make_ready(oap->oap_caller_data, cmd);
                        if (rc < 0)
                                CDEBUG(D_INODE, "oap %p page %p returned %d "
                                                "instead of ready\n", oap,
                                                oap->oap_page, rc);
                        switch (rc) {
                        case -EAGAIN:
                                /* llite is telling us that the page is still
                                 * in commit_write and that we should try
                                 * and put it in an rpc again later.  we
                                 * break out of the loop so we don't create
                                 * a hole in the sequence of pages in the rpc
                                 * stream.*/
                                pos = NULL;
                                break;
                        case -EINTR:
                                /* the io isn't needed.. tell the checks
                                 * below to complete the rpc with EINTR */
                                oap->oap_async_flags |= ASYNC_COUNT_STABLE;
                                oap->oap_count = -EINTR;
                                break;
                        case 0:
                                oap->oap_async_flags |= ASYNC_READY;
                                break;
                        default:
                                LASSERTF(0, "oap %p page %p returned %d "
                                            "from make_ready\n", oap,
                                            oap->oap_page, rc);
                                break;
                        }
                }
                /* pos == NULL is the -EAGAIN sentinel set just above */
                if (pos == NULL)
                        break;

                /* take the page out of our book-keeping */
                list_del_init(&oap->oap_pending_item);
                lop_update_pending(cli, lop, cmd, -1);
                list_del_init(&oap->oap_urgent_item);

                /* ask the caller for the size of the io as the rpc leaves. */
                if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE))
                        oap->oap_count =
                                ops->ap_refresh_count(oap->oap_caller_data,cmd);
                if (oap->oap_count <= 0) {
                        /* nothing to send (or -EINTR set above): complete
                         * the page right away rather than queueing it */
                        CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
                               oap->oap_count);
                        osc_ap_completion(cli, NULL, oap, 0, oap->oap_count);
                        continue;
                }

                /* now put the page back in our accounting */
                list_add_tail(&oap->oap_rpc_item, &rpc_list);
                if (++page_count >= cli->cl_max_pages_per_rpc)
                        break;
        }

        osc_wake_cache_waiters(cli);

        if (page_count == 0)
                RETURN(0);

        loi_list_maint(cli, loi);
        /* drop the list lock while building the rpc; the chosen oaps are
         * safely parked on our private rpc_list */
        spin_unlock(&cli->cl_loi_list_lock);

        request = osc_build_req(cli, &rpc_list, page_count, cmd);
        if (IS_ERR(request)) {
                /* this should happen rarely and is pretty bad, it makes the
                 * pending list not follow the dirty order */
                spin_lock(&cli->cl_loi_list_lock);
                list_for_each_safe(pos, tmp, &rpc_list) {
                        oap = list_entry(pos, struct osc_async_page,
                                         oap_rpc_item);
                        list_del_init(&oap->oap_rpc_item);

                        /* queued sync pages can be torn down while the pages
                         * were between the pending list and the rpc */
                        if (oap->oap_interrupted) {
                                CDEBUG(D_INODE, "oap %p interrupted\n", oap);
                                osc_ap_completion(cli, NULL, oap, 0,
                                                  oap->oap_count);
                                continue;
                        }

                        /* put the page back in the loi/lop lists */
                        list_add_tail(&oap->oap_pending_item,
                                      &lop->lop_pending);
                        lop_update_pending(cli, lop, cmd, 1);
                        if (oap->oap_async_flags & ASYNC_URGENT)
                                list_add(&oap->oap_urgent_item,
                                         &lop->lop_urgent);
                }
                loi_list_maint(cli, loi);
                RETURN(PTR_ERR(request));
        }

        /* move the oaps into the request's async args for the callback */
        LASSERT(sizeof(*aa) <= sizeof(request->rq_async_args));
        aa = (struct osc_brw_async_args *)&request->rq_async_args;
        INIT_LIST_HEAD(&aa->aa_oaps);
        list_splice(&rpc_list, &aa->aa_oaps);
        INIT_LIST_HEAD(&rpc_list);

#ifdef __KERNEL__
        if (cmd == OBD_BRW_READ) {
                lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
                lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
        } else {
                lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
                lprocfs_oh_tally(&cli->cl_write_rpc_hist,
                                 cli->cl_w_in_flight);
        }
#endif

        spin_lock(&cli->cl_loi_list_lock);

        if (cmd == OBD_BRW_READ)
                cli->cl_r_in_flight++;
        else
                cli->cl_w_in_flight++;
        /* queued sync pages can be torn down while the pages
         * were between the pending list and the rpc */
        list_for_each(pos, &aa->aa_oaps) {
                oap = list_entry(pos, struct osc_async_page, oap_rpc_item);
                if (oap->oap_interrupted) {
                        CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
                               oap, request);
                        ptlrpc_mark_interrupted(request);
                        break;
                }
        }

        CDEBUG(D_INODE, "req %p: %d pages, aa %p.  now %dr/%dw in flight\n",
                        request, page_count, aa, cli->cl_r_in_flight,
                        cli->cl_w_in_flight);

        /* NOTE(review): 'oap' here is whichever entry the loop above stopped
         * on (the last oap, or the first interrupted one); only that single
         * oap holds a reference on the request -- confirm this is intended */
        oap->oap_request = ptlrpc_request_addref(request);
        request->rq_interpret_reply = brw_interpret_oap;
        ptlrpcd_add_req(request);
        RETURN(1);
}
1523
/* Decide whether the pending pages on 'lop' are worth firing as an rpc now.
 * Returns 1 if an rpc should be built, 0 otherwise. */
static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
                         int cmd)
{
        int optimal;
        ENTRY;

        if (lop->lop_num_pending == 0)
                RETURN(0);

        /* if we have an invalid import we want to drain the queued pages
         * by forcing them through rpcs that immediately fail and complete
         * the pages.  recovery relies on this to empty the queued pages
         * before canceling the locks and evicting down the llite pages */
        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
                RETURN(1);

        /* stream rpcs in queue order as long as there is an urgent page
         * queued.  this is our cheap solution for good batching in the case
         * where writepage marks some random page in the middle of the file as
         * urgent because of, say, memory pressure */
        if (!list_empty(&lop->lop_urgent))
                RETURN(1);

        /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
        optimal = cli->cl_max_pages_per_rpc;
        if (cmd == OBD_BRW_WRITE) {
                /* trigger a write rpc stream as long as there are dirtiers
                 * waiting for space.  as they're waiting, they're not going to
                 * create more pages to coalesce with what's waiting.. */
                if (!list_empty(&cli->cl_cache_waiters))
                        RETURN(1);

                /* pad 'optimal' by 16 to avoid triggering rpcs that would
                 * want to include pages that are being queued but which
                 * can't be made ready until the queuer finishes with the
                 * page. this is a wart for llite::commit_write() */
                optimal += 16;
        }
        if (lop->lop_num_pending >= optimal)
                RETURN(1);

        RETURN(0);
}
1567
/* Idempotently add 'item' to 'list' or remove it, so that its membership
 * matches 'should_be_on'. */
static void on_list(struct list_head *item, struct list_head *list,
                    int should_be_on)
{
        if (should_be_on) {
                if (list_empty(item))
                        list_add_tail(item, list);
        } else {
                if (!list_empty(item))
                        list_del_init(item);
        }
}
1576
1577 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
1578  * can find pages to build into rpcs quickly */
1579 static void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
1580 {
1581         on_list(&loi->loi_cli_item, &cli->cl_loi_ready_list,
1582                 lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE) ||
1583                 lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
1584
1585         on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
1586                 loi->loi_write_lop.lop_num_pending);
1587
1588         on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
1589                 loi->loi_read_lop.lop_num_pending);
1590 }
1591
/* one-line debug snapshot of a loi's rpc-readiness and its pending/urgent
 * page counts for both directions */
#define LOI_DEBUG(LOI, STR, args...)                                     \
        CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR,           \
               !list_empty(&(LOI)->loi_cli_item),                        \
               (LOI)->loi_write_lop.lop_num_pending,                     \
               !list_empty(&(LOI)->loi_write_lop.lop_urgent),            \
               (LOI)->loi_read_lop.lop_num_pending,                      \
               !list_empty(&(LOI)->loi_read_lop.lop_urgent),             \
               args)
1600
1601 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
1602 {
1603         ENTRY;
1604         /* first return all objects which we already know to have
1605          * pages ready to be stuffed into rpcs */
1606         if (!list_empty(&cli->cl_loi_ready_list))
1607                 RETURN(list_entry(cli->cl_loi_ready_list.next,
1608                                   struct lov_oinfo, loi_cli_item));
1609
1610         /* then if we have cache waiters, return all objects with queued
1611          * writes.  This is especially important when many small files
1612          * have filled up the cache and not been fired into rpcs because
1613          * they don't pass the nr_pending/object threshhold */
1614         if (!list_empty(&cli->cl_cache_waiters) &&
1615             !list_empty(&cli->cl_loi_write_list))
1616                 RETURN(list_entry(cli->cl_loi_write_list.next,
1617                                   struct lov_oinfo, loi_write_item));
1618
1619         /* then return all queued objects when we have an invalid import
1620          * so that they get flushed */
1621         if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
1622                 if (!list_empty(&cli->cl_loi_write_list))
1623                         RETURN(list_entry(cli->cl_loi_write_list.next,
1624                                           struct lov_oinfo, loi_write_item));
1625                 if (!list_empty(&cli->cl_loi_read_list))
1626                         RETURN(list_entry(cli->cl_loi_read_list.next,
1627                                           struct lov_oinfo, loi_read_item));
1628         }
1629         RETURN(NULL);
1630 }
1631
/* called with the loi list lock held.
 *
 * Walk objects in osc_next_loi() priority order, firing read/write rpcs
 * for each until there is no more eligible work or the in-flight rpc cap
 * is reached. */
static void osc_check_rpcs(struct client_obd *cli)
{
        struct lov_oinfo *loi;
        int rc = 0, race_counter = 0;
        ENTRY;

        while ((loi = osc_next_loi(cli)) != NULL) {
                LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));

                /* respect the client-wide concurrency limit */
                if (rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight)
                        break;

                /* attempt some read/write balancing by alternating between
                 * reads and writes in an object.  The makes_rpc checks here
                 * would be redundant if we were getting read/write work items
                 * instead of objects.  we don't want send_oap_rpc to drain a
                 * partial read pending queue when we're given this object to
                 * do io on writes while there are cache waiters */
                if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
                        rc = osc_send_oap_rpc(cli, loi, OBD_BRW_WRITE,
                                              &loi->loi_write_lop);
                        if (rc < 0)
                                break;
                        if (rc > 0)
                                race_counter = 0;
                        else
                                race_counter++;
                }
                if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
                        rc = osc_send_oap_rpc(cli, loi, OBD_BRW_READ,
                                              &loi->loi_read_lop);
                        if (rc < 0)
                                break;
                        if (rc > 0)
                                race_counter = 0;
                        else
                                race_counter++;
                }

                /* attempt some inter-object balancing by issuing rpcs
                 * for each object in turn: drop this loi from all lists
                 * and let loi_list_maint() re-add it behind its peers if
                 * it still has work */
                if (!list_empty(&loi->loi_cli_item))
                        list_del_init(&loi->loi_cli_item);
                if (!list_empty(&loi->loi_write_item))
                        list_del_init(&loi->loi_write_item);
                if (!list_empty(&loi->loi_read_item))
                        list_del_init(&loi->loi_read_item);

                loi_list_maint(cli, loi);

                /* send_oap_rpc fails with 0 when make_ready tells it to
                 * back off.  llite's make_ready does this when it tries
                 * to lock a page queued for write that is already locked.
                 * we want to try sending rpcs from many objects, but we
                 * don't want to spin failing with 0.  */
                if (race_counter == 10)
                        break;
        }
        EXIT;
}
1693
/* we're trying to queue a page in the osc so we're subject to the
 * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
 * If the osc's queued pages are already at that limit, then we want to sleep
 * until there is space in the osc's queue for us.  We also may be waiting for
 * write credits from the OST if there are RPCs in flight that may return some
 * before we fall back to sync writes.
 *
 * We need this to know our allocation was granted in the presence of signals */
1702 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
1703 {
1704         int rc;
1705         ENTRY;
1706         spin_lock(&cli->cl_loi_list_lock);
1707         rc = list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
1708         spin_unlock(&cli->cl_loi_list_lock);
1709         RETURN(rc);
1710 };
1711
1712 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
1713  * grant or cache space. */
1714 static int osc_enter_cache(struct client_obd *cli, struct lov_oinfo *loi,
1715                            struct osc_async_page *oap)
1716 {
1717         struct osc_cache_waiter ocw;
1718         struct l_wait_info lwi = { 0 };
1719         struct timeval start, stop;
1720
1721         CDEBUG(D_CACHE, "dirty: %ld dirty_max: %ld dropped: %lu grant: %lu\n",
1722                cli->cl_dirty, cli->cl_dirty_max, cli->cl_lost_grant,
1723                cli->cl_avail_grant);
1724
1725         if (cli->cl_dirty_max < PAGE_SIZE)
1726                 return(-EDQUOT);
1727
1728         /* Hopefully normal case - cache space and write credits available */
1729         if (cli->cl_dirty + PAGE_SIZE <= cli->cl_dirty_max &&
1730             cli->cl_avail_grant >= PAGE_SIZE) {
1731                 /* account for ourselves */
1732                 osc_consume_write_grant(cli, oap);
1733                 return(0);
1734         }
1735
1736         /* Make sure that there are write rpcs in flight to wait for.  This
1737          * is a little silly as this object may not have any pending but
1738          * other objects sure might. */
1739         if (cli->cl_w_in_flight) {                
1740                 list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
1741                 init_waitqueue_head(&ocw.ocw_waitq);
1742                 ocw.ocw_oap = oap;
1743                 ocw.ocw_rc = 0;
1744
1745                 loi_list_maint(cli, loi);
1746                 osc_check_rpcs(cli);
1747                 spin_unlock(&cli->cl_loi_list_lock);
1748
1749                 CDEBUG(0, "sleeping for cache space\n");
1750                 do_gettimeofday(&start);
1751                 l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
1752                 do_gettimeofday(&stop);
1753                 spin_lock(&cli->cl_loi_list_lock);
1754                 lprocfs_stime_record(&cli->cl_enter_stime, &stop, &start);
1755                 if (!list_empty(&ocw.ocw_entry)) {
1756                         list_del(&ocw.ocw_entry);
1757                         RETURN(-EINTR);
1758                 }
1759                 RETURN(ocw.ocw_rc);
1760         }
1761
1762         RETURN(-EDQUOT);
1763 }
1764
1765 /* the companion to enter_cache, called when an oap is no longer part of the
1766  * dirty accounting.. so writeback completes or truncate happens before writing
1767  * starts.  must be called with the loi lock held. */
1768 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
1769                            int sent)
1770 {
1771         ENTRY;
1772
1773         if (!(oap->oap_brw_flags & OBD_BRW_FROM_GRANT)) {
1774                 EXIT;
1775                 return;
1776         }
1777
1778         oap->oap_brw_flags &= ~OBD_BRW_FROM_GRANT;
1779         cli->cl_dirty -= PAGE_SIZE;
1780         if (!sent) {
1781                 cli->cl_lost_grant += PAGE_SIZE;
1782                 CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
1783                        cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
1784         }
1785
1786         EXIT;
1787 }
1788
1789 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
1790                         struct lov_oinfo *loi, struct page *page,
1791                         obd_off offset, struct obd_async_page_ops *ops,
1792                         void *data, void **res)
1793 {
1794         struct osc_async_page *oap;
1795         ENTRY;
1796
1797         OBD_ALLOC(oap, sizeof(*oap));
1798         if (oap == NULL)
1799                 return -ENOMEM;
1800
1801         oap->oap_magic = OAP_MAGIC;
1802         oap->oap_cli = &exp->exp_obd->u.cli;
1803         oap->oap_loi = loi;
1804
1805         oap->oap_caller_ops = ops;
1806         oap->oap_caller_data = data;
1807
1808         oap->oap_page = page;
1809         oap->oap_obj_off = offset;
1810
1811         INIT_LIST_HEAD(&oap->oap_pending_item);
1812         INIT_LIST_HEAD(&oap->oap_urgent_item);
1813         INIT_LIST_HEAD(&oap->oap_rpc_item);
1814
1815         oap->oap_occ.occ_interrupted = osc_occ_interrupted;
1816
1817         CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
1818         *res = oap;
1819         RETURN(0);
1820 }
1821
/* Queue a prepared async page (cookie from osc_prep_async_page) for read
 * or write i/o on @loi.  Returns 0 on success, -EIO when the import is
 * missing/invalid, -EBUSY when the page is already queued somewhere, or
 * an osc_enter_cache() error for writes that cannot get cache space. */
static int osc_queue_async_io(struct obd_export *exp, struct lov_stripe_md *lsm,
                              struct lov_oinfo *loi, void *cookie,
                              int cmd, obd_off off, int count,
                              obd_flags brw_flags, enum async_flags async_flags)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct osc_async_page *oap;
        struct loi_oap_pages *lop;
        int rc;
        ENTRY;

        oap = OAP_FROM_COOKIE(cookie);

        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
                RETURN(-EIO);

        /* refuse a page that is already on a pending, urgent or rpc list */
        if (!list_empty(&oap->oap_pending_item) ||
            !list_empty(&oap->oap_urgent_item) ||
            !list_empty(&oap->oap_rpc_item))
                RETURN(-EBUSY);

        if (loi == NULL)
                loi = &lsm->lsm_oinfo[0];

        spin_lock(&cli->cl_loi_list_lock);

        oap->oap_cmd = cmd;
        oap->oap_async_flags = async_flags;
        oap->oap_page_off = off;
        oap->oap_count = count;
        oap->oap_brw_flags = brw_flags;

        if (cmd == OBD_BRW_WRITE) {
                /* writes are subject to the dirty/grant accounting; note
                 * osc_enter_cache() may drop and retake cl_loi_list_lock
                 * while sleeping for space */
                rc = osc_enter_cache(cli, loi, oap);
                if (rc) {
                        spin_unlock(&cli->cl_loi_list_lock);
                        RETURN(rc);
                }
                lop = &loi->loi_write_lop;
        } else {
                lop = &loi->loi_read_lop;
        }

        if (oap->oap_async_flags & ASYNC_URGENT)
                list_add(&oap->oap_urgent_item, &lop->lop_urgent);
        list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
        lop_update_pending(cli, lop, cmd, 1);

        loi_list_maint(cli, loi);

        LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
                  cmd);

        /* the new page may have made an rpc possible */
        osc_check_rpcs(cli);
        spin_unlock(&cli->cl_loi_list_lock);

        RETURN(0);
}
1880
/* aka (~was & now & flag), but this is more clear :)
 * true iff @flag is newly being set in @now relative to @was */
#define SETTING(was, now, flag) (!(was & flag) && (now & flag))
1883
1884 static int osc_set_async_flags(struct obd_export *exp,
1885                                struct lov_stripe_md *lsm,
1886                                struct lov_oinfo *loi, void *cookie,
1887                                obd_flags async_flags)
1888 {
1889         struct client_obd *cli = &exp->exp_obd->u.cli;
1890         struct loi_oap_pages *lop;
1891         struct osc_async_page *oap;
1892         int rc = 0;
1893         ENTRY;
1894
1895         oap = OAP_FROM_COOKIE(cookie);
1896
1897         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1898                 RETURN(-EIO);
1899
1900         if (loi == NULL)
1901                 loi = &lsm->lsm_oinfo[0];
1902
1903         if (oap->oap_cmd == OBD_BRW_WRITE) {
1904                 lop = &loi->loi_write_lop;
1905         } else {
1906                 lop = &loi->loi_read_lop;
1907         }
1908
1909         spin_lock(&cli->cl_loi_list_lock);
1910
1911         if (list_empty(&oap->oap_pending_item))
1912                 GOTO(out, rc = -EINVAL);
1913
1914         if ((oap->oap_async_flags & async_flags) == async_flags)
1915                 GOTO(out, rc = 0);
1916
1917         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
1918                 oap->oap_async_flags |= ASYNC_READY;
1919
1920         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT)) {
1921                 if (list_empty(&oap->oap_rpc_item)) {
1922                         list_add(&oap->oap_urgent_item, &lop->lop_urgent);
1923                         loi_list_maint(cli, loi);
1924                 }
1925         }
1926
1927         LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
1928                         oap->oap_async_flags);
1929 out:
1930         osc_check_rpcs(cli);
1931         spin_unlock(&cli->cl_loi_list_lock);
1932         RETURN(rc);
1933 }
1934
1935 static int osc_queue_group_io(struct obd_export *exp, struct lov_stripe_md *lsm,
1936                              struct lov_oinfo *loi,
1937                              struct obd_io_group *oig, void *cookie,
1938                              int cmd, obd_off off, int count,
1939                              obd_flags brw_flags,
1940                              obd_flags async_flags)
1941 {
1942         struct client_obd *cli = &exp->exp_obd->u.cli;
1943         struct osc_async_page *oap;
1944         struct loi_oap_pages *lop;
1945         ENTRY;
1946
1947         oap = OAP_FROM_COOKIE(cookie);
1948
1949         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1950                 RETURN(-EIO);
1951
1952         if (!list_empty(&oap->oap_pending_item) ||
1953             !list_empty(&oap->oap_urgent_item) ||
1954             !list_empty(&oap->oap_rpc_item))
1955                 RETURN(-EBUSY);
1956
1957         if (loi == NULL)
1958                 loi = &lsm->lsm_oinfo[0];
1959
1960         spin_lock(&cli->cl_loi_list_lock);
1961
1962         oap->oap_cmd = cmd;
1963         oap->oap_page_off = off;
1964         oap->oap_count = count;
1965         oap->oap_brw_flags = brw_flags;
1966         oap->oap_async_flags = async_flags;
1967
1968         if (cmd == OBD_BRW_WRITE)
1969                 lop = &loi->loi_write_lop;
1970         else
1971                 lop = &loi->loi_read_lop;
1972
1973         list_add_tail(&oap->oap_pending_item, &lop->lop_pending_group);
1974         if (oap->oap_async_flags & ASYNC_GROUP_SYNC) {
1975                 oap->oap_oig = oig;
1976                 oig_add_one(oig, &oap->oap_occ);
1977         }
1978
1979         LOI_DEBUG(loi, "oap %p page %p on group pending\n", oap, oap->oap_page);
1980
1981         spin_unlock(&cli->cl_loi_list_lock);
1982
1983         RETURN(0);
1984 }
1985
1986 static void osc_group_to_pending(struct client_obd *cli, struct lov_oinfo *loi,
1987                                  struct loi_oap_pages *lop, int cmd)
1988 {
1989         struct list_head *pos, *tmp;
1990         struct osc_async_page *oap;
1991
1992         list_for_each_safe(pos, tmp, &lop->lop_pending_group) {
1993                 oap = list_entry(pos, struct osc_async_page, oap_pending_item);
1994                 list_del(&oap->oap_pending_item);
1995                 list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
1996                 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
1997                 lop_update_pending(cli, lop, cmd, 1);
1998         }
1999         loi_list_maint(cli, loi);
2000 }
2001
2002 static int osc_trigger_group_io(struct obd_export *exp,
2003                                 struct lov_stripe_md *lsm,
2004                                 struct lov_oinfo *loi,
2005                                 struct obd_io_group *oig)
2006 {
2007         struct client_obd *cli = &exp->exp_obd->u.cli;
2008         ENTRY;
2009
2010         if (loi == NULL)
2011                 loi = &lsm->lsm_oinfo[0];
2012
2013         spin_lock(&cli->cl_loi_list_lock);
2014
2015         osc_group_to_pending(cli, loi, &loi->loi_write_lop, OBD_BRW_WRITE);
2016         osc_group_to_pending(cli, loi, &loi->loi_read_lop, OBD_BRW_READ);
2017
2018         osc_check_rpcs(cli);
2019         spin_unlock(&cli->cl_loi_list_lock);
2020
2021         RETURN(0);
2022 }
2023
/* Undo osc_prep_async_page(): unhook the page from the urgent/pending
 * queues, drop its dirty-cache accounting, and free the oap.  Fails with
 * -EBUSY (and frees nothing) while the page is part of an in-flight rpc. */
static int osc_teardown_async_page(struct obd_export *exp,
                                   struct lov_stripe_md *lsm,
                                   struct lov_oinfo *loi, void *cookie)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct loi_oap_pages *lop;
        struct osc_async_page *oap;
        int rc = 0;
        ENTRY;

        oap = OAP_FROM_COOKIE(cookie);

        if (loi == NULL)
                loi = &lsm->lsm_oinfo[0];

        if (oap->oap_cmd == OBD_BRW_WRITE) {
                lop = &loi->loi_write_lop;
        } else {
                lop = &loi->loi_read_lop;
        }

        spin_lock(&cli->cl_loi_list_lock);

        /* can't tear down a page that an rpc currently owns */
        if (!list_empty(&oap->oap_rpc_item))
                GOTO(out, rc = -EBUSY);

        /* release dirty/grant accounting (sent=0: the page never went out)
         * and wake anyone waiting for cache space */
        osc_exit_cache(cli, oap, 0);
        osc_wake_cache_waiters(cli);

        if (!list_empty(&oap->oap_urgent_item)) {
                list_del_init(&oap->oap_urgent_item);
                oap->oap_async_flags &= ~ASYNC_URGENT;
        }
        if (!list_empty(&oap->oap_pending_item)) {
                list_del_init(&oap->oap_pending_item);
                lop_update_pending(cli, lop, oap->oap_cmd, -1);
        }
        loi_list_maint(cli, loi);

        LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
out:
        spin_unlock(&cli->cl_loi_list_lock);
        /* only free once fully unhooked; outside the spinlock */
        if (rc == 0)
                OBD_FREE(oap, sizeof(*oap));
        RETURN(rc);
}
2070
2071 #ifdef __KERNEL__
2072 /* Note: caller will lock/unlock, and set uptodate on the pages */
2073 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
/* SAN read path (linux 2.4 only): ask the OST for the on-disk block
 * layout of the pages via OST_SAN_READ, then read the blocks directly
 * from the shared SAN block device using buffer heads.
 * Note: caller will lock/unlock, and set uptodate on the pages. */
static int sanosc_brw_read(struct obd_export *exp, struct obdo *oa,
                           struct lov_stripe_md *lsm, obd_count page_count,
                           struct brw_page *pga)
{
        struct ptlrpc_request *request = NULL;
        struct ost_body *body;
        struct niobuf_remote *nioptr;
        struct obd_ioobj *iooptr;
        int rc, size[3] = {sizeof(*body)}, mapped = 0;
        int swab;
        ENTRY;

        /* XXX does not handle 'new' brw protocol */

        /* request layout: body, ioobj, then one niobuf per page */
        size[1] = sizeof(struct obd_ioobj);
        size[2] = page_count * sizeof(*nioptr);

        request = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OBD_VERSION,
                                  OST_SAN_READ, 3, size, NULL);
        if (!request)
                RETURN(-ENOMEM);

        body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof(*body));
        iooptr = lustre_msg_buf(request->rq_reqmsg, 1, sizeof(*iooptr));
        nioptr = lustre_msg_buf(request->rq_reqmsg, 2,
                                sizeof(*nioptr) * page_count);

        memcpy(&body->oa, oa, sizeof(body->oa));

        obdo_to_ioobj(oa, iooptr);
        iooptr->ioo_bufcnt = page_count;

        /* pack one niobuf per page; offsets must be ascending */
        for (mapped = 0; mapped < page_count; mapped++, nioptr++) {
                LASSERT(PageLocked(pga[mapped].pg));
                LASSERT(mapped == 0 ||
                        pga[mapped].disk_offset > pga[mapped - 1].disk_offset);

                nioptr->offset = pga[mapped].disk_offset;
                nioptr->len    = pga[mapped].count;
                nioptr->flags  = pga[mapped].flag;
        }

        size[1] = page_count * sizeof(*nioptr);
        request->rq_replen = lustre_msg_size(2, size);

        rc = ptlrpc_queue_wait(request);
        if (rc)
                GOTO(out_req, rc);

        body = lustre_swab_repbuf(request, 0, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body == NULL) {
                CERROR("Can't unpack body\n");
                GOTO(out_req, rc = -EPROTO);
        }

        memcpy(oa, &body->oa, sizeof(*oa));

        /* reply niobufs carry the physical block addresses */
        swab = lustre_msg_swabbed(request->rq_repmsg);
        LASSERT_REPSWAB(request, 1);
        nioptr = lustre_msg_buf(request->rq_repmsg, 1, size[1]);
        if (!nioptr) {
                /* nioptr missing or short */
                GOTO(out_req, rc = -EPROTO);
        }

        /* actual read */
        for (mapped = 0; mapped < page_count; mapped++, nioptr++) {
                struct page *page = pga[mapped].pg;
                struct buffer_head *bh;
                kdev_t dev;

                if (swab)
                        lustre_swab_niobuf_remote (nioptr);

                /* got san device associated */
                LASSERT(exp->exp_obd != NULL);
                dev = exp->exp_obd->u.cli.cl_sandev;

                /* hole: no disk block backs this page, return zeroes */
                if (!nioptr->offset) {
                        CDEBUG(D_PAGE, "hole at ino %lu; index %ld\n",
                                        page->mapping->host->i_ino,
                                        page->index);
                        memset(page_address(page), 0, PAGE_SIZE);
                        continue;
                }

                if (!page->buffers) {
                        /* first touch: map a buffer head to the reported
                         * block and start the read */
                        create_empty_buffers(page, dev, PAGE_SIZE);
                        bh = page->buffers;

                        clear_bit(BH_New, &bh->b_state);
                        set_bit(BH_Mapped, &bh->b_state);
                        bh->b_blocknr = (unsigned long)nioptr->offset;

                        clear_bit(BH_Uptodate, &bh->b_state);

                        ll_rw_block(READ, 1, &bh);
                } else {
                        bh = page->buffers;

                        /* if buffer already existed, it must be the
                         * one we mapped before, check it */
                        LASSERT(!test_bit(BH_New, &bh->b_state));
                        LASSERT(test_bit(BH_Mapped, &bh->b_state));
                        LASSERT(bh->b_blocknr == (unsigned long)nioptr->offset);

                        /* wait it's io completion */
                        if (test_bit(BH_Lock, &bh->b_state))
                                wait_on_buffer(bh);

                        if (!test_bit(BH_Uptodate, &bh->b_state))
                                ll_rw_block(READ, 1, &bh);
                }


                /* must do syncronous write here */
                wait_on_buffer(bh);
                if (!buffer_uptodate(bh)) {
                        /* I/O error */
                        rc = -EIO;
                        goto out_req;
                }
        }

out_req:
        ptlrpc_req_finished(request);
        RETURN(rc);
}
2204
/* SAN write path (linux 2.4 only): ask the OST to allocate blocks for the
 * pages via OST_SAN_WRITE, then write the data synchronously to those
 * blocks on the shared SAN block device using buffer heads. */
static int sanosc_brw_write(struct obd_export *exp, struct obdo *oa,
                            struct lov_stripe_md *lsm, obd_count page_count,
                            struct brw_page *pga)
{
        struct ptlrpc_request *request = NULL;
        struct ost_body *body;
        struct niobuf_remote *nioptr;
        struct obd_ioobj *iooptr;
        int rc, size[3] = {sizeof(*body)}, mapped = 0;
        int swab;
        ENTRY;

        /* request layout: body, ioobj, then one niobuf per page */
        size[1] = sizeof(struct obd_ioobj);
        size[2] = page_count * sizeof(*nioptr);

        request = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OBD_VERSION,
                                  OST_SAN_WRITE, 3, size, NULL);
        if (!request)
                RETURN(-ENOMEM);

        body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof (*body));
        iooptr = lustre_msg_buf(request->rq_reqmsg, 1, sizeof (*iooptr));
        nioptr = lustre_msg_buf(request->rq_reqmsg, 2,
                                sizeof (*nioptr) * page_count);

        memcpy(&body->oa, oa, sizeof(body->oa));

        obdo_to_ioobj(oa, iooptr);
        iooptr->ioo_bufcnt = page_count;

        /* pack request; offsets must be ascending */
        for (mapped = 0; mapped < page_count; mapped++, nioptr++) {
                LASSERT(PageLocked(pga[mapped].pg));
                LASSERT(mapped == 0 ||
                        pga[mapped].disk_offset > pga[mapped - 1].disk_offset);

                nioptr->offset = pga[mapped].disk_offset;
                nioptr->len    = pga[mapped].count;
                nioptr->flags  = pga[mapped].flag;
        }

        size[1] = page_count * sizeof(*nioptr);
        request->rq_replen = lustre_msg_size(2, size);

        rc = ptlrpc_queue_wait(request);
        if (rc)
                GOTO(out_req, rc);

        /* reply niobufs carry the physical block addresses to write to */
        swab = lustre_msg_swabbed (request->rq_repmsg);
        LASSERT_REPSWAB (request, 1);
        nioptr = lustre_msg_buf(request->rq_repmsg, 1, size[1]);
        if (!nioptr) {
                CERROR("absent/short niobuf array\n");
                GOTO(out_req, rc = -EPROTO);
        }

        /* actual write */
        for (mapped = 0; mapped < page_count; mapped++, nioptr++) {
                struct page *page = pga[mapped].pg;
                struct buffer_head *bh;
                kdev_t dev;

                if (swab)
                        lustre_swab_niobuf_remote (nioptr);

                /* got san device associated */
                LASSERT(exp->exp_obd != NULL);
                dev = exp->exp_obd->u.cli.cl_sandev;

                if (!page->buffers) {
                        create_empty_buffers(page, dev, PAGE_SIZE);
                } else {
                        /* checking: an existing buffer must be the one we
                         * mapped before */
                        LASSERT(!test_bit(BH_New, &page->buffers->b_state));
                        LASSERT(test_bit(BH_Mapped, &page->buffers->b_state));
                        LASSERT(page->buffers->b_blocknr ==
                                (unsigned long)nioptr->offset);
                }
                bh = page->buffers;

                LASSERT(bh);

                /* if buffer locked, wait it's io completion */
                if (test_bit(BH_Lock, &bh->b_state))
                        wait_on_buffer(bh);

                clear_bit(BH_New, &bh->b_state);
                set_bit(BH_Mapped, &bh->b_state);

                /* override the block nr */
                bh->b_blocknr = (unsigned long)nioptr->offset;

                /* we are about to write it, so set it
                 * uptodate/dirty
                 * page lock should garentee no race condition here */
                set_bit(BH_Uptodate, &bh->b_state);
                set_bit(BH_Dirty, &bh->b_state);

                ll_rw_block(WRITE, 1, &bh);

                /* must do syncronous write here */
                wait_on_buffer(bh);
                if (!buffer_uptodate(bh) || test_bit(BH_Dirty, &bh->b_state)) {
                        /* I/O error */
                        rc = -EIO;
                        goto out_req;
                }
        }

out_req:
        ptlrpc_req_finished(request);
        RETURN(rc);
}
2318
2319 static int sanosc_brw(int cmd, struct obd_export *exp, struct obdo *oa,
2320                       struct lov_stripe_md *lsm, obd_count page_count,
2321                       struct brw_page *pga, struct obd_trans_info *oti)
2322 {
2323         ENTRY;
2324
2325         while (page_count) {
2326                 obd_count pages_per_brw;
2327                 int rc;
2328
2329                 if (page_count > PTLRPC_MAX_BRW_PAGES)
2330                         pages_per_brw = PTLRPC_MAX_BRW_PAGES;
2331                 else
2332                         pages_per_brw = page_count;
2333
2334                 if (cmd & OBD_BRW_WRITE)
2335                         rc = sanosc_brw_write(exp, oa, lsm, pages_per_brw,pga);
2336                 else
2337                         rc = sanosc_brw_read(exp, oa, lsm, pages_per_brw, pga);
2338
2339                 if (rc != 0)
2340                         RETURN(rc);
2341
2342                 page_count -= pages_per_brw;
2343                 pga += pages_per_brw;
2344         }
2345         RETURN(0);
2346 }
2347 #endif
2348 #endif
2349
/* Attach @data (an inode in the kernel build) to the lock behind @lockh as
 * l_ast_data, asserting that we are not silently replacing ast_data that
 * belongs to a still-live inode. */
static void osc_set_data_with_check(struct lustre_handle *lockh, void *data)
{
        struct ldlm_lock *lock = ldlm_handle2lock(lockh);

        if (lock == NULL) {
                /* handle no longer resolves to a lock - nothing to tag */
                CERROR("lockh %p, data %p - client evicted?\n", lockh, data);
                return;
        }

        l_lock(&lock->l_resource->lr_namespace->ns_lock);
#ifdef __KERNEL__
        /* replacing different ast_data is only legal when the old inode is
         * being freed; otherwise complain loudly before asserting */
        if (lock->l_ast_data && lock->l_ast_data != data) {
                struct inode *new_inode = data;
                struct inode *old_inode = lock->l_ast_data;
                if (!(old_inode->i_state & I_FREEING))
                        LDLM_ERROR(lock, "inconsistent l_ast_data found");
                LASSERTF(old_inode->i_state & I_FREEING,
                         "Found existing inode %p/%lu/%u state %lu in lock: "
                         "setting data to %p/%lu/%u\n", old_inode,
                         old_inode->i_ino, old_inode->i_generation,
                         old_inode->i_state,
                         new_inode, new_inode->i_ino, new_inode->i_generation);
        }
#endif
        lock->l_ast_data = data;
        l_unlock(&lock->l_resource->lr_namespace->ns_lock);
        /* drop the reference taken by ldlm_handle2lock() */
        LDLM_LOCK_PUT(lock);
}
2378
2379 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2380                              ldlm_iterator_t replace, void *data)
2381 {
2382         struct ldlm_res_id res_id = { .name = {0} };
2383         struct obd_device *obd = class_exp2obd(exp);
2384
2385         res_id.name[0] = lsm->lsm_object_id;
2386         res_id.name[2] = lsm->lsm_object_gr;
2387         ldlm_change_cbdata(obd->obd_namespace, &res_id, replace, data);
2388         return 0;
2389 }
2390
2391 static int osc_enqueue(struct obd_export *exp, struct lov_stripe_md *lsm,
2392                        __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2393                        int *flags, void *bl_cb, void *cp_cb, void *gl_cb,
2394                        void *data, __u32 lvb_len, void *lvb_swabber,
2395                        struct lustre_handle *lockh)
2396 {
2397         struct obd_device *obd = exp->exp_obd;
2398         struct ldlm_res_id res_id = { .name = {0} };
2399         struct ost_lvb lvb;
2400         struct ldlm_reply *rep;
2401         struct ptlrpc_request *req = NULL;
2402         int rc;
2403         ENTRY;
2404
2405         res_id.name[0] = lsm->lsm_object_id;
2406         res_id.name[2] = lsm->lsm_object_gr;
2407
2408         /* Filesystem lock extents are extended to page boundaries so that
2409          * dealing with the page cache is a little smoother.  */
2410         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2411         policy->l_extent.end |= ~PAGE_MASK;
2412
2413         if (lsm->lsm_oinfo->loi_kms_valid == 0)
2414                 goto no_match;
2415
2416         /* Next, search for already existing extent locks that will cover us */
2417         rc = ldlm_lock_match(obd->obd_namespace, 0, &res_id, type, policy, mode,
2418                              lockh);
2419         if (rc == 1) {
2420                 osc_set_data_with_check(lockh, data);
2421                 if (*flags & LDLM_FL_HAS_INTENT) {
2422                         /* I would like to be able to ASSERT here that rss <=
2423                          * kms, but I can't, for reasons which are explained in
2424                          * lov_enqueue() */
2425                 }
2426                 /* We already have a lock, and it's referenced */
2427                 RETURN(ELDLM_OK);
2428         }
2429
2430         /* If we're trying to read, we also search for an existing PW lock.  The
2431          * VFS and page cache already protect us locally, so lots of readers/
2432          * writers can share a single PW lock.
2433          *
2434          * There are problems with conversion deadlocks, so instead of
2435          * converting a read lock to a write lock, we'll just enqueue a new
2436          * one.
2437          *
2438          * At some point we should cancel the read lock instead of making them
2439          * send us a blocking callback, but there are problems with canceling
2440          * locks out from other users right now, too. */
2441
2442         if (mode == LCK_PR) {
2443                 rc = ldlm_lock_match(obd->obd_namespace, 0, &res_id, type,
2444                                      policy, LCK_PW, lockh);
2445                 if (rc == 1) {
2446                         /* FIXME: This is not incredibly elegant, but it might
2447                          * be more elegant than adding another parameter to
2448                          * lock_match.  I want a second opinion. */
2449                         ldlm_lock_addref(lockh, LCK_PR);
2450                         ldlm_lock_decref(lockh, LCK_PW);
2451                         osc_set_data_with_check(lockh, data);
2452                         RETURN(ELDLM_OK);
2453                 }
2454         }
2455         if (mode == LCK_PW) {
2456                 rc = ldlm_lock_match(obd->obd_namespace, 0, &res_id, type,
2457                                      policy, LCK_PR, lockh);
2458                 if (rc == 1) {
2459                         rc = ldlm_cli_convert(lockh, mode, flags);
2460                         if (!rc) {
2461                                 /* Update readers/writers accounting */
2462                                 ldlm_lock_addref(lockh, LCK_PW);
2463                                 ldlm_lock_decref(lockh, LCK_PR);
2464                                 osc_set_data_with_check(lockh, data);
2465                                 RETURN(ELDLM_OK);
2466                         }
2467                         /* If the conversion failed, we need to drop refcount
2468                            on matched lock before we get new one */
2469                         /* XXX Won't it save us some efforts if we cancel PR
2470                            lock here? We are going to take PW lock anyway and it
2471                            will invalidate PR lock */
2472                         ldlm_lock_decref(lockh, LCK_PR);
2473                         if (rc != EDEADLOCK) {
2474                                 RETURN(rc);
2475                         }
2476                 }
2477         }
2478
2479         if (mode == LCK_PW) {
2480                 rc = ldlm_lock_match(obd->obd_namespace, 0, &res_id, type,
2481                                      policy, LCK_PR, lockh);
2482                 if (rc == 1) {
2483                         rc = ldlm_cli_convert(lockh, mode, flags);
2484                         if (!rc) {
2485                                 /* Update readers/writers accounting */
2486                                 ldlm_lock_addref(lockh, LCK_PW);
2487                                 ldlm_lock_decref(lockh, LCK_PR);
2488                                 osc_set_data_with_check(lockh, data);
2489                                 RETURN(ELDLM_OK);
2490                         }
2491                         /* If the conversion failed, we need to drop refcount
2492                            on matched lock before we get new one */
2493                         /* XXX Won't it save us some efforts if we cancel PR
2494                            lock here? We are going to take PW lock anyway and it
2495                            will invalidate PR lock */
2496                         ldlm_lock_decref(lockh, LCK_PR);
2497                         if (rc != EDEADLOCK) {
2498                                 RETURN(rc);
2499                         }
2500                 }
2501         }
2502
2503  no_match:
2504         if (*flags & LDLM_FL_HAS_INTENT) {
2505                 int size[2] = {0, sizeof(struct ldlm_request)};
2506
2507                 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_DLM_VERSION,
2508                                       LDLM_ENQUEUE, 2, size, NULL);
2509                 if (req == NULL)
2510                         RETURN(-ENOMEM);
2511
2512                 size[0] = sizeof(*rep);
2513                 size[1] = sizeof(lvb);
2514                 req->rq_replen = lustre_msg_size(2, size);
2515         }
2516         rc = ldlm_cli_enqueue(exp, req, obd->obd_namespace, res_id, type,
2517                               policy, mode, flags, bl_cb, cp_cb, gl_cb, data,
2518                               &lvb, sizeof(lvb), lustre_swab_ost_lvb, lockh);
2519         if (req != NULL) {
2520                 if (rc == ELDLM_LOCK_ABORTED) {
2521                         /* swabbed by ldlm_cli_enqueue() */
2522                         LASSERT_REPSWABBED(req, 0);
2523                         rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*rep));
2524                         LASSERT(rep != NULL);
2525                         if (rep->lock_policy_res1)
2526                                 rc = rep->lock_policy_res1;
2527                 }
2528                 ptlrpc_req_finished(req);
2529         }
2530
2531         if ((*flags & LDLM_FL_HAS_INTENT && rc == ELDLM_LOCK_ABORTED) || !rc) {
2532                 CDEBUG(D_INODE, "received kms == "LPU64", blocks == "LPU64"\n",
2533                        lvb.lvb_size, lvb.lvb_blocks);
2534                 lsm->lsm_oinfo->loi_rss = lvb.lvb_size;
2535                 lsm->lsm_oinfo->loi_blocks = lvb.lvb_blocks;
2536         }
2537
2538         RETURN(rc);
2539 }
2540
/* Test for an already-granted lock covering @policy without enqueueing a
 * new one.  Returns 1 (with @lockh filled) on a match, 0 otherwise. */
static int osc_match(struct obd_export *exp, struct lov_stripe_md *lsm,
                     __u32 type, ldlm_policy_data_t *policy, __u32 mode,
                     int *flags, void *data, struct lustre_handle *lockh)
{
        struct ldlm_res_id res_id = { .name = {0} };
        struct obd_device *obd = exp->exp_obd;
        int rc;
        ENTRY;

        /* Resource name: object id in slot 0, group in slot 2. */
        res_id.name[0] = lsm->lsm_object_id;
        res_id.name[2] = lsm->lsm_object_gr;

        OBD_FAIL_RETURN(OBD_FAIL_OSC_MATCH, -EIO);

        /* Filesystem lock extents are extended to page boundaries so that
         * dealing with the page cache is a little smoother */
        policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
        policy->l_extent.end |= ~PAGE_MASK;

        /* Next, search for already existing extent locks that will cover us */
        rc = ldlm_lock_match(obd->obd_namespace, *flags, &res_id, type,
                             policy, mode, lockh);
        if (rc) {
                /* NOTE(review): the LDLM_FL_TEST_LOCK guard below is
                 * deliberately commented out, so l_ast_data is set even for
                 * test-only matches -- unlike the PR-matches-PW path below,
                 * which does honor the flag.  Confirm this asymmetry is
                 * intended. */
               // if (!(*flags & LDLM_FL_TEST_LOCK))
                        osc_set_data_with_check(lockh, data);
                RETURN(rc);
        }
        /* If we're trying to read, we also search for an existing PW lock.  The
         * VFS and page cache already protect us locally, so lots of readers/
         * writers can share a single PW lock. */
        if (mode == LCK_PR) {
                rc = ldlm_lock_match(obd->obd_namespace, *flags, &res_id, type,
                                     policy, LCK_PW, lockh);
                if (rc == 1 && !(*flags & LDLM_FL_TEST_LOCK)) {
                        /* FIXME: This is not incredibly elegant, but it might
                         * be more elegant than adding another parameter to
                         * lock_match.  I want a second opinion. */
                        osc_set_data_with_check(lockh, data);
                        ldlm_lock_addref(lockh, LCK_PR);
                        ldlm_lock_decref(lockh, LCK_PW);
                }
        }
        RETURN(rc);
}
2585
2586 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
2587                       __u32 mode, struct lustre_handle *lockh)
2588 {
2589         ENTRY;
2590
2591         if (mode == LCK_GROUP)
2592                 ldlm_lock_decref_and_cancel(lockh, mode);
2593         else
2594                 ldlm_lock_decref(lockh, mode);
2595
2596         RETURN(0);
2597 }
2598
2599 static int osc_cancel_unused(struct obd_export *exp,
2600                              struct lov_stripe_md *lsm, int flags, void *opaque)
2601 {
2602         struct obd_device *obd = class_exp2obd(exp);
2603         struct ldlm_res_id res_id = { .name = {0} }, *resp = NULL;
2604
2605         if (lsm != NULL) {
2606                 res_id.name[0] = lsm->lsm_object_id;
2607                 res_id.name[2] = lsm->lsm_object_gr;
2608                 resp = &res_id;
2609         }
2610
2611         return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
2612 }
2613
2614 static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
2615                       unsigned long max_age)
2616 {
2617         struct obd_statfs *msfs;
2618         struct ptlrpc_request *request;
2619         int rc, size = sizeof(*osfs);
2620         ENTRY;
2621
2622         /* We could possibly pass max_age in the request (as an absolute
2623          * timestamp or a "seconds.usec ago") so the target can avoid doing
2624          * extra calls into the filesystem if that isn't necessary (e.g.
2625          * during mount that would help a bit).  Having relative timestamps
2626          * is not so great if request processing is slow, while absolute
2627          * timestamps are not ideal because they need time synchronization. */
2628         request = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_OBD_VERSION,
2629                                   OST_STATFS, 0, NULL, NULL);
2630         if (!request)
2631                 RETURN(-ENOMEM);
2632
2633         request->rq_replen = lustre_msg_size(1, &size);
2634         request->rq_request_portal = OST_CREATE_PORTAL; //XXX FIXME bug 249
2635
2636         rc = ptlrpc_queue_wait(request);
2637         if (rc)
2638                 GOTO(out, rc);
2639
2640         msfs = lustre_swab_repbuf(request, 0, sizeof(*msfs),
2641                                   lustre_swab_obd_statfs);
2642         if (msfs == NULL) {
2643                 CERROR("Can't unpack obd_statfs\n");
2644                 GOTO(out, rc = -EPROTO);
2645         }
2646
2647         memcpy(osfs, msfs, sizeof(*osfs));
2648
2649         EXIT;
2650  out:
2651         ptlrpc_req_finished(request);
2652         return rc;
2653 }
2654
/* Retrieve object striping information.
 *
 * @lump points to a userspace lov_user_md; its lmm_stripe_count indicates
 * whether the caller's buffer has room for an OST object entry.
 * lmm_magic must be LOV_USER_MAGIC (the OSC reports exactly one stripe).
 */
static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
{
        struct lov_user_md lum, *lumk;
        int rc, lum_size;
        ENTRY;

        if (!lsm)
                RETURN(-ENODATA);

        /* Pull in the user's header to learn how much it can receive. */
        rc = copy_from_user(&lum, lump, sizeof(lum));
        if (rc)
                RETURN(-EFAULT);

        if (lum.lmm_magic != LOV_USER_MAGIC)
                RETURN(-EINVAL);

        if (lum.lmm_stripe_count > 0) {
                /* Caller has room for one object entry: allocate a copy with
                 * the single trailing lov_user_ost_data slot filled in. */
                lum_size = sizeof(lum) + sizeof(lum.lmm_objects[0]);
                OBD_ALLOC(lumk, lum_size);
                if (!lumk)
                        RETURN(-ENOMEM);

                lumk->lmm_objects[0].l_object_id = lsm->lsm_object_id;
                lumk->lmm_objects[0].l_object_gr = lsm->lsm_object_gr;
        } else {
                /* Header only: reuse the stack copy. */
                lum_size = sizeof(lum);
                lumk = &lum;
        }

        lumk->lmm_object_id = lsm->lsm_object_id;
        lumk->lmm_object_gr = lsm->lsm_object_gr;
        lumk->lmm_stripe_count = 1;

        if (copy_to_user(lump, lumk, lum_size))
                rc = -EFAULT;

        if (lumk != &lum)
                OBD_FREE(lumk, lum_size);

        /* rc is 0 from the successful copy_from_user unless the
         * copy_to_user above failed. */
        RETURN(rc);
}
2702
/* Handle device ioctls for the OSC.  The module reference taken at entry
 * is dropped at the common 'out' label; every case must exit via GOTO(out).
 * @karg is the kernel copy of the ioctl data, @uarg the user pointer. */
static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                         void *karg, void *uarg)
{
        struct obd_device *obd = exp->exp_obd;
        struct obd_ioctl_data *data = karg;
        int err = 0;
        ENTRY;

#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
        MOD_INC_USE_COUNT;
#else
        if (!try_module_get(THIS_MODULE)) {
                CERROR("Can't get module. Is it alive?");
                return -EINVAL;
        }
#endif
        switch (cmd) {
        case OBD_IOC_LOV_GET_CONFIG: {
                /* Report a synthetic single-target LOV configuration so LOV
                 * tools work against a bare OSC. */
                char *buf;
                struct lov_desc *desc;
                struct obd_uuid uuid;

                buf = NULL;
                len = 0;
                /* Re-fetch the ioctl payload; buf is allocated here and must
                 * be released via obd_ioctl_freedata/OBD_FREE on all paths. */
                if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
                        GOTO(out, err = -EINVAL);

                data = (struct obd_ioctl_data *)buf;

                /* Validate that the three inline buffers are big enough for
                 * the descriptor, the UUID and the target count. */
                if (sizeof(*desc) > data->ioc_inllen1) {
                        OBD_FREE(buf, len);
                        GOTO(out, err = -EINVAL);
                }

                if (data->ioc_inllen2 < sizeof(uuid)) {
                        OBD_FREE(buf, len);
                        GOTO(out, err = -EINVAL);
                }

                if (data->ioc_inllen3 < sizeof(__u32)) {
                        OBD_FREE(buf, len);
                        GOTO(out, err = -EINVAL);
                }

                /* One active target, no striping defaults. */
                desc = (struct lov_desc *)data->ioc_inlbuf1;
                desc->ld_tgt_count = 1;
                desc->ld_active_tgt_count = 1;
                desc->ld_default_stripe_count = 1;
                desc->ld_default_stripe_size = 0;
                desc->ld_default_stripe_offset = 0;
                desc->ld_pattern = 0;
                memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
                memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
                *((__u32 *)data->ioc_inlbuf3) = 1;

                err = copy_to_user((void *)uarg, buf, len);
                if (err)
                        err = -EFAULT;
                obd_ioctl_freedata(buf, len);
                GOTO(out, err);
        }
        case LL_IOC_LOV_SETSTRIPE:
                /* obd_alloc_memmd returns the md size on success; callers
                 * only expect 0. */
                err = obd_alloc_memmd(exp, karg);
                if (err > 0)
                        err = 0;
                GOTO(out, err);
        case LL_IOC_LOV_GETSTRIPE:
                err = osc_getstripe(karg, uarg);
                GOTO(out, err);
        case OBD_IOC_CLIENT_RECOVER:
                /* ioc_inlbuf1 carries the new connection UUID, if any. */
                err = ptlrpc_recover_import(obd->u.cli.cl_import,
                                            data->ioc_inlbuf1);
                if (err > 0)
                        err = 0;
                GOTO(out, err);
        case IOC_OSC_SET_ACTIVE:
                /* ioc_offset doubles as the active/inactive flag. */
                err = ptlrpc_set_import_active(obd->u.cli.cl_import,
                                               data->ioc_offset);
                GOTO(out, err);
        case IOC_OSC_CTL_RECOVERY:
                err = ptlrpc_import_control_recovery(obd->u.cli.cl_import,
                                                     data->ioc_offset);
                GOTO(out, err);
        default:
                CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n", cmd, current->comm);
                GOTO(out, err = -ENOTTY);
        }
out:
#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
        MOD_DEC_USE_COUNT;
#else
        module_put(THIS_MODULE);
#endif
        return err;
}
2798
/* Answer obd_get_info queries.  Two keys are understood:
 *   "lock_to_stripe" - an OSC has a single stripe, so always returns 0;
 *   "last_id"        - fetched from the OST via an OST_GET_INFO rpc.
 * Unknown keys return -EPROTO. */
static int osc_get_info(struct obd_export *exp, obd_count keylen,
                        void *key, __u32 *vallen, void *val)
{
        ENTRY;
        if (!vallen || !val)
                RETURN(-EFAULT);

        /* NOTE(review): the length checks below use '>' / '>=' against
         * strlen() rather than equality -- presumably keylen includes the
         * terminating NUL; confirm against callers. */
        if (keylen > strlen("lock_to_stripe") &&
            strcmp(key, "lock_to_stripe") == 0) {
                __u32 *stripe = val;
                *vallen = sizeof(*stripe);
                *stripe = 0;
                RETURN(0);
        } else if (keylen >= strlen("last_id") && strcmp(key, "last_id") == 0) {
                /* Ask the OST for the last allocated object id. */
                struct ptlrpc_request *req;
                obd_id *reply;
                char *bufs[1] = {key};
                int rc;
                req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OBD_VERSION,
                                      OST_GET_INFO, 1, &keylen, bufs);
                if (req == NULL)
                        RETURN(-ENOMEM);

                req->rq_replen = lustre_msg_size(1, vallen);
                rc = ptlrpc_queue_wait(req);
                if (rc)
                        GOTO(out, rc);

                reply = lustre_swab_repbuf(req, 0, sizeof(*reply),
                                           lustre_swab_ost_last_id);
                if (reply == NULL) {
                        CERROR("Can't unpack OST last ID\n");
                        GOTO(out, rc = -EPROTO);
                }
                *((obd_id *)val) = *reply;
        out:
                ptlrpc_req_finished(req);
                RETURN(rc);
        }
        RETURN(-EPROTO);
}
2840
/* Handle obd_set_info keys for the OSC: object-id accounting ("next_id",
 * "growth_count"), creator-state flag clearing ("unlinked", "unrecovery"),
 * import tunables ("initial_recov", "async"), security flavor ("sec"), and
 * finally "mds_conn" which wires up the unlink llog and enables pinging.
 * Unknown keys return -EINVAL. */
static int osc_set_info(struct obd_export *exp, obd_count keylen,
                        void *key, obd_count vallen, void *val)
{
        struct obd_device  *obd = exp->exp_obd;
        struct obd_import *imp = class_exp2cliimp(exp);
        struct llog_ctxt *ctxt;
        int rc = 0;
        ENTRY;

        if (keylen == strlen("next_id") &&
            memcmp(key, "next_id", strlen("next_id")) == 0) {
                if (vallen != sizeof(obd_id))
                        RETURN(-EINVAL);
                /* The MDS hands us the last used id; the next one to create
                 * is one past it. */
                obd->u.cli.cl_oscc.oscc_next_id = *((obd_id*)val) + 1;
                CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
                       exp->exp_obd->obd_name,
                       obd->u.cli.cl_oscc.oscc_next_id);

                RETURN(0);
        }

        if (keylen == strlen("growth_count") &&
            memcmp(key, "growth_count", strlen("growth_count")) == 0) {
                if (vallen != sizeof(int))
                        RETURN(-EINVAL);
                obd->u.cli.cl_oscc.oscc_max_grow_count = *((int*)val);
                RETURN(0);
        }

        if (keylen == strlen("unlinked") &&
            memcmp(key, "unlinked", keylen) == 0) {
                /* Space was freed by unlinks: clear the no-space flag so
                 * precreation may resume. */
                struct osc_creator *oscc = &obd->u.cli.cl_oscc;
                spin_lock(&oscc->oscc_lock);
                oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
                spin_unlock(&oscc->oscc_lock);
                RETURN(0);
        }
        if (keylen == strlen("unrecovery") &&
            memcmp(key, "unrecovery", keylen) == 0) {
                struct osc_creator *oscc = &obd->u.cli.cl_oscc;
                spin_lock(&oscc->oscc_lock);
                oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING;
                spin_unlock(&oscc->oscc_lock);
                RETURN(0);
        }
        if (keylen == strlen("initial_recov") &&
            memcmp(key, "initial_recov", strlen("initial_recov")) == 0) {
                /* NOTE(review): this local 'imp' shadows the outer one;
                 * presumably both refer to the same import -- confirm. */
                struct obd_import *imp = exp->exp_obd->u.cli.cl_import;
                if (vallen != sizeof(int))
                        RETURN(-EINVAL);
                imp->imp_initial_recov = *(int *)val;
                CDEBUG(D_HA, "%s: set imp_no_init_recov = %d\n",
                       exp->exp_obd->obd_name,
                       imp->imp_initial_recov);
                RETURN(0);
        }

        if (keylen == strlen("async") && memcmp(key, "async", keylen) == 0) {
                struct client_obd *cl = &obd->u.cli;
                if (vallen != sizeof(int))
                        RETURN(-EINVAL);
                cl->cl_async = *(int *)val;
                CDEBUG(D_HA, "%s: set async = %d\n",
                       obd->obd_name, cl->cl_async);
                RETURN(0);
        }

        if (keylen == strlen("sec") && memcmp(key, "sec", keylen) == 0) {
                /* Select the ptlrpc security flavor by name. */
                struct client_obd *cli = &exp->exp_obd->u.cli;

                if (vallen == strlen("null") &&
                    memcmp(val, "null", vallen) == 0) {
                        cli->cl_sec_flavor = PTLRPC_SEC_NULL;
                        cli->cl_sec_subflavor = 0;
                        RETURN(0);
                }
                if (vallen == strlen("krb5i") &&
                    memcmp(val, "krb5i", vallen) == 0) {
                        cli->cl_sec_flavor = PTLRPC_SEC_GSS;
                        cli->cl_sec_subflavor = PTLRPC_SEC_GSS_KRB5I;
                        RETURN(0);
                }
                if (vallen == strlen("krb5p") &&
                    memcmp(val, "krb5p", vallen) == 0) {
                        cli->cl_sec_flavor = PTLRPC_SEC_GSS;
                        cli->cl_sec_subflavor = PTLRPC_SEC_GSS_KRB5P;
                        RETURN(0);
                }
                CERROR("unrecognized security type %s\n", (char*) val);
                RETURN(-EINVAL);
        }

        /* Everything from here on handles "mds_conn" only. */
        if (keylen < strlen("mds_conn") || memcmp(key, "mds_conn", keylen) != 0)
                RETURN(-EINVAL);

        ctxt = llog_get_context(&exp->exp_obd->obd_llogs, LLOG_UNLINK_ORIG_CTXT);
        if (ctxt) {
                /* NOTE(review): rc is still 0 at this point, so the else
                 * branch below is unreachable as written. */
                if (rc == 0)
                        rc = llog_initiator_connect(ctxt);
                else
                        CERROR("cannot establish the connect for ctxt %p: %d\n",
                               ctxt, rc);
        }

        /* Mark this as an MDS-facing OSC: server-style timeouts and pings. */
        imp->imp_server_timeout = 1;
        CDEBUG(D_HA, "pinging OST %s\n", imp->imp_target_uuid.uuid);
        imp->imp_pingable = 1;

        RETURN(rc);
}
2951
2952
/* llog ops for the size-replication context: the client side only needs
 * to cancel records; everything else stays unimplemented. */
static struct llog_operations osc_size_repl_logops = {
        lop_cancel: llog_obd_repl_cancel
};
2956
/* llog ops for the unlink-originator context; populated from llog_lvfs_ops
 * in osc_llog_init() below. */
static struct llog_operations osc_unlink_orig_logops;
/* Set up the two llog contexts used by the OSC: the unlink originator
 * (catalog-backed, @catid names its log) and the size replicator.
 * Returns 0 or a negative errno from obd_llog_setup(). */
static int osc_llog_init(struct obd_device *obd, struct obd_llogs *llogs,
                         struct obd_device *tgt, int count,
                         struct llog_catid *catid)
{
        int rc;
        ENTRY;

        /* NOTE(review): the static ops table is re-populated on every call;
         * harmless since the values are constant, but worth knowing. */
        osc_unlink_orig_logops = llog_lvfs_ops;
        osc_unlink_orig_logops.lop_setup = llog_obd_origin_setup;
        osc_unlink_orig_logops.lop_cleanup = llog_catalog_cleanup;
        osc_unlink_orig_logops.lop_add = llog_catalog_add;
        osc_unlink_orig_logops.lop_connect = llog_origin_connect;

        rc = obd_llog_setup(obd, llogs, LLOG_UNLINK_ORIG_CTXT, tgt, count,
                            &catid->lci_logid, &osc_unlink_orig_logops);
        if (rc)
                RETURN(rc);

        rc = obd_llog_setup(obd, llogs, LLOG_SIZE_REPL_CTXT, tgt, count, NULL,
                            &osc_size_repl_logops);
        RETURN(rc);
}
2980
2981 static int osc_llog_finish(struct obd_device *obd,
2982                            struct obd_llogs *llogs, int count)
2983 {
2984         int rc;
2985         ENTRY;
2986
2987         rc = obd_llog_cleanup(llog_get_context(llogs, LLOG_UNLINK_ORIG_CTXT));
2988         if (rc)
2989                 RETURN(rc);
2990
2991         rc = obd_llog_cleanup(llog_get_context(llogs, LLOG_SIZE_REPL_CTXT));
2992         RETURN(rc);
2993 }
2994
2995
2996 static int osc_connect(struct lustre_handle *exph,
2997                        struct obd_device *obd, struct obd_uuid *cluuid,
2998                        struct obd_connect_data *data,
2999                        unsigned long connect_flags)
3000 {
3001         int rc;
3002         ENTRY;
3003         rc = client_connect_import(exph, obd, cluuid, data, connect_flags);
3004         RETURN(rc);
3005 }
3006
/* Disconnect from the OST.  On the last connection reference, first flush
 * any pending size-replication llog cancel records to the target. */
static int osc_disconnect(struct obd_export *exp, unsigned long flags)
{
        struct obd_device *obd = class_exp2obd(exp);
        struct llog_ctxt *ctxt;
        int rc;
        ENTRY;

        /* NOTE(review): ctxt may be NULL if the context was never set up;
         * presumably llog_sync() tolerates a NULL ctxt -- confirm. */
        ctxt = llog_get_context(&obd->obd_llogs, LLOG_SIZE_REPL_CTXT);
        if (obd->u.cli.cl_conn_count == 1)
                /* flush any remaining cancel messages out to the target */
                llog_sync(ctxt, exp);

        rc = client_disconnect_export(exp, flags);
        RETURN(rc);
}
3022
3023 static int osc_import_event(struct obd_device *obd,
3024                             struct obd_import *imp, 
3025                             enum obd_import_event event)
3026 {
3027         struct client_obd *cli;
3028         int rc = 0;
3029
3030         LASSERT(imp->imp_obd == obd);
3031
3032         switch (event) {
3033         case IMP_EVENT_DISCON: {
3034                 /* Only do this on the MDS OSC's */
3035                 if (imp->imp_server_timeout) {
3036                         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3037                         
3038                         spin_lock(&oscc->oscc_lock);
3039                         oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
3040                         spin_unlock(&oscc->oscc_lock);
3041                 }
3042                 break;
3043         }
3044         case IMP_EVENT_INACTIVE: {
3045                 if (obd->obd_observer)
3046                         rc = obd_notify(obd->obd_observer, obd, 0, 0);
3047                 break;
3048         }
3049         case IMP_EVENT_INVALIDATE: {
3050                 struct ldlm_namespace *ns = obd->obd_namespace;
3051
3052                 /* Reset grants */
3053                 cli = &obd->u.cli;
3054                 spin_lock(&cli->cl_loi_list_lock);
3055                 cli->cl_avail_grant = 0;
3056                 cli->cl_lost_grant = 0;
3057                 /* all pages go to failing rpcs due to the invalid import */
3058                 osc_check_rpcs(cli);
3059                 spin_unlock(&cli->cl_loi_list_lock);
3060                 
3061                 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3062
3063                 break;
3064         }
3065         case IMP_EVENT_ACTIVE: {
3066                 /* Only do this on the MDS OSC's */
3067                 if (imp->imp_server_timeout) {
3068                         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3069
3070                         spin_lock(&oscc->oscc_lock);
3071                         oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
3072                         spin_unlock(&oscc->oscc_lock);
3073                 }
3074
3075                 if (obd->obd_observer)
3076                         rc = obd_notify(obd->obd_observer, obd, 1, 0);
3077                 break;
3078         }
3079         default:
3080                 CERROR("Unknown import event %d\n", event);
3081                 LBUG();
3082         }
3083         RETURN(rc);
3084 }
3085
3086 static int osc_attach(struct obd_device *dev, obd_count len, void *data)
3087 {
3088         struct lprocfs_static_vars lvars;
3089         int rc;
3090         ENTRY;
3091
3092         lprocfs_init_vars(osc,&lvars);
3093         rc = lprocfs_obd_attach(dev, lvars.obd_vars);
3094         if (rc < 0)
3095                 RETURN(rc);
3096
3097         rc = lproc_osc_attach_seqstat(dev);
3098         if (rc < 0) {
3099                 lprocfs_obd_detach(dev);
3100                 RETURN(rc);
3101         }
3102
3103         ptlrpc_lprocfs_register_obd(dev);
3104         RETURN(0);
3105 }
3106
/* Undo osc_attach(): drop the ptlrpc stats, then the /proc entries. */
static int osc_detach(struct obd_device *dev)
{
        int rc;

        ptlrpc_lprocfs_unregister_obd(dev);
        rc = lprocfs_obd_detach(dev);

        return rc;
}
3112
3113 static int osc_setup(struct obd_device *obd, obd_count len, void *buf)
3114 {
3115         int rc;
3116         ENTRY;
3117         rc = ptlrpcd_addref();
3118         if (rc)
3119                 RETURN(rc);
3120
3121         rc = client_obd_setup(obd, len, buf);
3122         if (rc)
3123                 ptlrpcd_decref();
3124         else
3125                 oscc_init(obd);
3126
3127         RETURN(rc);
3128 }
3129
3130 static int osc_cleanup(struct obd_device *obd, int flags)
3131 {
3132         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3133         int rc;
3134
3135         rc = ldlm_cli_cancel_unused(obd->obd_namespace, NULL,
3136                                     LDLM_FL_CONFIG_CHANGE, NULL);
3137         if (rc)
3138                 RETURN(rc);
3139
3140         spin_lock(&oscc->oscc_lock);
3141         oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING;
3142         oscc->oscc_flags |= OSCC_FLAG_EXITING;
3143         spin_unlock(&oscc->oscc_lock);
3144
3145         rc = client_obd_cleanup(obd, flags);
3146         ptlrpcd_decref();
3147         RETURN(rc);
3148 }
3149
/*
 * Method table for the regular OSC obd device type, registered with the
 * class driver via class_register_type() in osc_init() below.  The
 * lifecycle entry points (attach/detach/setup/cleanup) are implemented
 * locally; connection-list management is delegated to the generic
 * client_import_add_conn()/client_import_del_conn() helpers.
 */
struct obd_ops osc_obd_ops = {
        .o_owner                = THIS_MODULE,
        .o_attach               = osc_attach,
        .o_detach               = osc_detach,
        .o_setup                = osc_setup,
        .o_cleanup              = osc_cleanup,
        .o_add_conn             = client_import_add_conn,
        .o_del_conn             = client_import_del_conn,
        .o_connect              = osc_connect,
        .o_disconnect           = osc_disconnect,
        .o_statfs               = osc_statfs,
        .o_packmd               = osc_packmd,
        .o_unpackmd             = osc_unpackmd,
        .o_create               = osc_create,
        .o_destroy              = osc_destroy,
        .o_getattr              = osc_getattr,
        .o_getattr_async        = osc_getattr_async,
        .o_setattr              = osc_setattr,
        .o_brw                  = osc_brw,
        .o_brw_async            = osc_brw_async,
        .o_prep_async_page      = osc_prep_async_page,
        .o_queue_async_io       = osc_queue_async_io,
        .o_set_async_flags      = osc_set_async_flags,
        .o_queue_group_io       = osc_queue_group_io,
        .o_trigger_group_io     = osc_trigger_group_io,
        .o_teardown_async_page  = osc_teardown_async_page,
        .o_punch                = osc_punch,
        .o_sync                 = osc_sync,
        .o_enqueue              = osc_enqueue,
        .o_match                = osc_match,
        .o_change_cbdata        = osc_change_cbdata,
        .o_cancel               = osc_cancel,
        .o_cancel_unused        = osc_cancel_unused,
        .o_iocontrol            = osc_iocontrol,
        .o_get_info             = osc_get_info,
        .o_set_info             = osc_set_info,
        .o_import_event         = osc_import_event,
        .o_llog_init            = osc_llog_init,
        .o_llog_finish          = osc_llog_finish,
};
3190
#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
/*
 * Method table for the SAN OSC variant, only built on 2.4 kernels and
 * registered under LUSTRE_SANOSC_NAME in osc_init().  Compared with
 * osc_obd_ops above: setup/cleanup/disconnect go straight to the
 * generic client helpers, object creation uses osc_real_create, bulk
 * I/O uses sanosc_brw, and the async-page / group-I/O / get_info /
 * set_info entry points are not provided.
 */
struct obd_ops sanosc_obd_ops = {
        .o_owner                = THIS_MODULE,
        .o_attach               = osc_attach,
        .o_detach               = osc_detach,
        .o_cleanup              = client_obd_cleanup,
        .o_add_conn             = client_import_add_conn,
        .o_del_conn             = client_import_del_conn,
        .o_connect              = osc_connect,
        .o_disconnect           = client_disconnect_export,
        .o_statfs               = osc_statfs,
        .o_packmd               = osc_packmd,
        .o_unpackmd             = osc_unpackmd,
        .o_create               = osc_real_create,
        .o_destroy              = osc_destroy,
        .o_getattr              = osc_getattr,
        .o_getattr_async        = osc_getattr_async,
        .o_setattr              = osc_setattr,
        .o_setup                = client_sanobd_setup,
        .o_brw                  = sanosc_brw,
        .o_punch                = osc_punch,
        .o_sync                 = osc_sync,
        .o_enqueue              = osc_enqueue,
        .o_match                = osc_match,
        .o_change_cbdata        = osc_change_cbdata,
        .o_cancel               = osc_cancel,
        .o_cancel_unused        = osc_cancel_unused,
        .o_iocontrol            = osc_iocontrol,
        .o_import_event         = osc_import_event,
        .o_llog_init            = osc_llog_init,
        .o_llog_finish          = osc_llog_finish,
};
#endif
3224
3225 int __init osc_init(void)
3226 {
3227         struct lprocfs_static_vars lvars;
3228 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
3229         struct lprocfs_static_vars sanlvars;
3230 #endif
3231         int rc;
3232         ENTRY;
3233
3234         lprocfs_init_vars(osc, &lvars);
3235 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
3236         lprocfs_init_vars(osc, &sanlvars);
3237 #endif
3238
3239         rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
3240                                  LUSTRE_OSC_NAME);
3241         if (rc)
3242                 RETURN(rc);
3243
3244 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
3245         rc = class_register_type(&sanosc_obd_ops, NULL, sanlvars.module_vars,
3246                                  LUSTRE_SANOSC_NAME);
3247         if (rc)
3248                 class_unregister_type(LUSTRE_OSC_NAME);
3249 #endif
3250
3251         RETURN(rc);
3252 }
3253
#ifdef __KERNEL__
/*
 * Module unload hook: unregister the obd type(s) registered in
 * osc_init(), in reverse order of registration.
 * NOTE(review): the __exit attribute is deliberately commented out in
 * the original — presumably so the symbol is not discarded and remains
 * callable outside the unload path; confirm before restoring it.
 */
static void /*__exit*/ osc_exit(void)
{
#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
        class_unregister_type(LUSTRE_SANOSC_NAME);
#endif
        class_unregister_type(LUSTRE_OSC_NAME);
}

MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
MODULE_LICENSE("GPL");

module_init(osc_init);
module_exit(osc_exit);
#endif