Whamcloud - gitweb
ded86b3d9b261f26df1b4d5ddfb98bb3fdcb46a5
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
5  *   Author Peter Braam <braam@clusterfs.com>
6  *
7  *   This file is part of Lustre, http://www.lustre.org.
8  *
9  *   Lustre is free software; you can redistribute it and/or
10  *   modify it under the terms of version 2 of the GNU General Public
11  *   License as published by the Free Software Foundation.
12  *
13  *   Lustre is distributed in the hope that it will be useful,
14  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
15  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  *   GNU General Public License for more details.
17  *
18  *   You should have received a copy of the GNU General Public License
19  *   along with Lustre; if not, write to the Free Software
20  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21  *
22  *  For testing and management it is treated as an obd_device,
23  *  although * it does not export a full OBD method table (the
24  *  requests are coming * in over the wire, so object target modules
25  *  do not have a full * method table.)
26  *
27  */
28
29 #ifndef EXPORT_SYMTAB
30 # define EXPORT_SYMTAB
31 #endif
32 #define DEBUG_SUBSYSTEM S_OSC
33
34 #ifdef __KERNEL__
35 # include <linux/version.h>
36 # include <linux/module.h>
37 # include <linux/mm.h>
38 # include <linux/highmem.h>
39 # include <linux/ctype.h>
40 # include <linux/init.h>
41 # if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
42 #  include <linux/workqueue.h>
43 #  include <linux/smp_lock.h>
44 # else
45 #  include <linux/locks.h>
46 # endif
47 #else /* __KERNEL__ */
48 # include <liblustre.h>
49 #endif
50
51 # include <linux/lustre_dlm.h>
52 #include <linux/kp30.h>
53 #include <linux/lustre_net.h>
54 #include <lustre/lustre_user.h>
55 #include <linux/obd_ost.h>
56 #include <linux/obd_lov.h>
57
58 #ifdef  __CYGWIN__
59 # include <ctype.h>
60 #endif
61
62 #include <linux/lustre_ha.h>
63 #include <linux/lprocfs_status.h>
64 #include <linux/lustre_log.h>
65 #include "osc_internal.h"
66
67 /* Pack OSC object metadata for disk storage (LE byte order). */
68 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
69                       struct lov_stripe_md *lsm)
70 {
71         int lmm_size;
72         ENTRY;
73
74         lmm_size = sizeof(**lmmp);
75         if (!lmmp)
76                 RETURN(lmm_size);
77
78         if (*lmmp && !lsm) {
79                 OBD_FREE(*lmmp, lmm_size);
80                 *lmmp = NULL;
81                 RETURN(0);
82         }
83
84         if (!*lmmp) {
85                 OBD_ALLOC(*lmmp, lmm_size);
86                 if (!*lmmp)
87                         RETURN(-ENOMEM);
88         }
89
90         if (lsm) {
91                 LASSERT(lsm->lsm_object_id);
92                 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
93         }
94
95         RETURN(lmm_size);
96 }
97
98 /* Unpack OSC object metadata from disk storage (LE byte order). */
99 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
100                         struct lov_mds_md *lmm, int lmm_bytes)
101 {
102         int lsm_size;
103         ENTRY;
104
105         if (lmm != NULL) {
106                 if (lmm_bytes < sizeof (*lmm)) {
107                         CERROR("lov_mds_md too small: %d, need %d\n",
108                                lmm_bytes, (int)sizeof(*lmm));
109                         RETURN(-EINVAL);
110                 }
111                 /* XXX LOV_MAGIC etc check? */
112
113                 if (lmm->lmm_object_id == 0) {
114                         CERROR("lov_mds_md: zero lmm_object_id\n");
115                         RETURN(-EINVAL);
116                 }
117         }
118
119         lsm_size = lov_stripe_md_size(1);
120         if (lsmp == NULL)
121                 RETURN(lsm_size);
122
123         if (*lsmp != NULL && lmm == NULL) {
124                 OBD_FREE(*lsmp, lsm_size);
125                 *lsmp = NULL;
126                 RETURN(0);
127         }
128
129         if (*lsmp == NULL) {
130                 OBD_ALLOC(*lsmp, lsm_size);
131                 if (*lsmp == NULL)
132                         RETURN(-ENOMEM);
133                 loi_init((*lsmp)->lsm_oinfo);
134         }
135
136         if (lmm != NULL) {
137                 /* XXX zero *lsmp? */
138                 (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
139                 LASSERT((*lsmp)->lsm_object_id);
140         }
141
142         (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
143
144         RETURN(lsm_size);
145 }
146
147 static int osc_getattr_interpret(struct ptlrpc_request *req,
148                                  struct osc_getattr_async_args *aa, int rc)
149 {
150         struct ost_body *body;
151         ENTRY;
152
153         if (rc != 0)
154                 RETURN(rc);
155
156         body = lustre_swab_repbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
157         if (body) {
158                 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
159                 memcpy(aa->aa_oa, &body->oa, sizeof(*aa->aa_oa));
160
161                 /* This should really be sent by the OST */
162                 aa->aa_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
163                 aa->aa_oa->o_valid |= OBD_MD_FLBLKSZ;
164         } else {
165                 CERROR("can't unpack ost_body\n");
166                 rc = -EPROTO;
167                 aa->aa_oa->o_valid = 0;
168         }
169
170         RETURN(rc);
171 }
172
173 static int osc_getattr_async(struct obd_export *exp, struct obdo *oa,
174                              struct lov_stripe_md *md,
175                              struct ptlrpc_request_set *set)
176 {
177         struct ptlrpc_request *request;
178         struct ost_body *body;
179         int size = sizeof(*body);
180         struct osc_getattr_async_args *aa;
181         ENTRY;
182
183         request = ptlrpc_prep_req(class_exp2cliimp(exp), OST_GETATTR, 1,
184                                   &size, NULL);
185         if (!request)
186                 RETURN(-ENOMEM);
187
188         body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof (*body));
189         memcpy(&body->oa, oa, sizeof(*oa));
190
191         request->rq_replen = lustre_msg_size(1, &size);
192         request->rq_interpret_reply = osc_getattr_interpret;
193
194         LASSERT (sizeof (*aa) <= sizeof (request->rq_async_args));
195         aa = (struct osc_getattr_async_args *)&request->rq_async_args;
196         aa->aa_oa = oa;
197
198         ptlrpc_set_add_req (set, request);
199         RETURN (0);
200 }
201
202 static int osc_getattr(struct obd_export *exp, struct obdo *oa,
203                        struct lov_stripe_md *md)
204 {
205         struct ptlrpc_request *request;
206         struct ost_body *body;
207         int rc, size = sizeof(*body);
208         ENTRY;
209
210         request = ptlrpc_prep_req(class_exp2cliimp(exp), OST_GETATTR, 1,
211                                   &size, NULL);
212         if (!request)
213                 RETURN(-ENOMEM);
214
215         body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof (*body));
216         memcpy(&body->oa, oa, sizeof(*oa));
217
218         request->rq_replen = lustre_msg_size(1, &size);
219
220         rc = ptlrpc_queue_wait(request);
221         if (rc) {
222                 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
223                 GOTO(out, rc);
224         }
225
226         body = lustre_swab_repbuf(request, 0, sizeof (*body),
227                                   lustre_swab_ost_body);
228         if (body == NULL) {
229                 CERROR ("can't unpack ost_body\n");
230                 GOTO (out, rc = -EPROTO);
231         }
232
233         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
234         memcpy(oa, &body->oa, sizeof(*oa));
235
236         /* This should really be sent by the OST */
237         oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
238         oa->o_valid |= OBD_MD_FLBLKSZ;
239
240         EXIT;
241  out:
242         ptlrpc_req_finished(request);
243         return rc;
244 }
245
246 static int osc_setattr(struct obd_export *exp, struct obdo *oa,
247                        struct lov_stripe_md *md, struct obd_trans_info *oti)
248 {
249         struct ptlrpc_request *request;
250         struct ost_body *body;
251         int rc, size = sizeof(*body);
252         ENTRY;
253
254         request = ptlrpc_prep_req(class_exp2cliimp(exp), OST_SETATTR, 1, &size,
255                                   NULL);
256         if (!request)
257                 RETURN(-ENOMEM);
258
259         body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof(*body));
260         memcpy(&body->oa, oa, sizeof(*oa));
261
262         request->rq_replen = lustre_msg_size(1, &size);
263
264         rc = ptlrpc_queue_wait(request);
265         if (rc)
266                 GOTO(out, rc);
267
268         body = lustre_swab_repbuf(request, 0, sizeof(*body),
269                                   lustre_swab_ost_body);
270         if (body == NULL)
271                 GOTO(out, rc = -EPROTO);
272
273         memcpy(oa, &body->oa, sizeof(*oa));
274
275         EXIT;
276 out:
277         ptlrpc_req_finished(request);
278         RETURN(0);
279 }
280
/* Create an object on the OST via OST_CREATE.
 *
 * @oa:  in/out obdo; on success it is overwritten with the reply's obdo
 *       (notably o_id, the new object id).
 * @ea:  in/out stripe md pointer; if *ea is NULL a temporary md is
 *       allocated here and either returned through *ea on success or
 *       freed again on failure.
 * @oti: optional; receives the reply transno and, when the reply carries
 *       OBD_MD_FLCOOKIE, a copy of the llog cookie.
 *
 * Returns 0 on success or a negative errno. */
int osc_real_create(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md **ea, struct obd_trans_info *oti)
{
        struct ptlrpc_request *request;
        struct ost_body *body;
        struct lov_stripe_md *lsm;
        int rc, size = sizeof(*body);
        ENTRY;

        LASSERT(oa);
        LASSERT(ea);

        /* if the caller didn't supply an md, allocate one now; it is
         * only published through *ea once the create succeeds */
        lsm = *ea;
        if (!lsm) {
                rc = obd_alloc_memmd(exp, &lsm);
                if (rc < 0)
                        RETURN(rc);
        }

        request = ptlrpc_prep_req(class_exp2cliimp(exp), OST_CREATE, 1, &size,
                                  NULL);
        if (!request)
                GOTO(out, rc = -ENOMEM);

        body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof (*body));
        memcpy(&body->oa, oa, sizeof(body->oa));

        request->rq_replen = lustre_msg_size(1, &size);
        /* OBD_MD_FLINLINE marks a delete-orphan create; the flags must
         * then say OBD_FL_DELORPHAN */
        if (oa->o_valid & OBD_MD_FLINLINE) {
                LASSERT((oa->o_valid & OBD_MD_FLFLAGS) &&
                        oa->o_flags == OBD_FL_DELORPHAN);
                DEBUG_REQ(D_HA, request,
                          "delorphan from OST integration");
                /* Don't resend the delorphan request */
                request->rq_no_resend = request->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(request);
        if (rc)
                GOTO(out_req, rc);

        body = lustre_swab_repbuf(request, 0, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body == NULL) {
                CERROR ("can't unpack ost_body\n");
                GOTO (out_req, rc = -EPROTO);
        }

        memcpy(oa, &body->oa, sizeof(*oa));

        /* This should really be sent by the OST */
        oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oa->o_valid |= OBD_MD_FLBLKSZ;

        /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
         * have valid lsm_oinfo data structs, so don't go touching that.
         * This needs to be fixed in a big way.
         */
        lsm->lsm_object_id = oa->o_id;
        *ea = lsm;

        if (oti != NULL) {
                oti->oti_transno = request->rq_repmsg->transno;

                /* hand the llog cookie from the reply obdo to the caller,
                 * allocating cookie space on first use */
                if (oa->o_valid & OBD_MD_FLCOOKIE) {
                        if (!oti->oti_logcookies)
                                oti_alloc_cookies(oti, 1);
                        memcpy(oti->oti_logcookies, obdo_logcookie(oa),
                               sizeof(oti->oti_onecookie));
                }
        }

        CDEBUG(D_HA, "transno: "LPD64"\n", request->rq_repmsg->transno);
        EXIT;
out_req:
        ptlrpc_req_finished(request);
out:
        /* on failure, free the md only if we allocated it here
         * (*ea is still NULL in that case) */
        if (rc && !*ea)
                obd_free_memmd(exp, &lsm);
        return rc;
}
362
363 static int osc_punch(struct obd_export *exp, struct obdo *oa,
364                      struct lov_stripe_md *md, obd_size start,
365                      obd_size end, struct obd_trans_info *oti)
366 {
367         struct ptlrpc_request *request;
368         struct ost_body *body;
369         int rc, size = sizeof(*body);
370         ENTRY;
371
372         if (!oa) {
373                 CERROR("oa NULL\n");
374                 RETURN(-EINVAL);
375         }
376
377         request = ptlrpc_prep_req(class_exp2cliimp(exp), OST_PUNCH, 1, &size,
378                                   NULL);
379         if (!request)
380                 RETURN(-ENOMEM);
381
382         body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof (*body));
383         memcpy(&body->oa, oa, sizeof(*oa));
384
385         /* overload the size and blocks fields in the oa with start/end */
386         body->oa.o_size = start;
387         body->oa.o_blocks = end;
388         body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
389
390         request->rq_replen = lustre_msg_size(1, &size);
391
392         rc = ptlrpc_queue_wait(request);
393         if (rc)
394                 GOTO(out, rc);
395
396         body = lustre_swab_repbuf (request, 0, sizeof (*body),
397                                    lustre_swab_ost_body);
398         if (body == NULL) {
399                 CERROR ("can't unpack ost_body\n");
400                 GOTO (out, rc = -EPROTO);
401         }
402
403         memcpy(oa, &body->oa, sizeof(*oa));
404
405         EXIT;
406  out:
407         ptlrpc_req_finished(request);
408         return rc;
409 }
410
411 static int osc_sync(struct obd_export *exp, struct obdo *oa,
412                     struct lov_stripe_md *md, obd_size start, obd_size end)
413 {
414         struct ptlrpc_request *request;
415         struct ost_body *body;
416         int rc, size = sizeof(*body);
417         ENTRY;
418
419         if (!oa) {
420                 CERROR("oa NULL\n");
421                 RETURN(-EINVAL);
422         }
423
424         request = ptlrpc_prep_req(class_exp2cliimp(exp), OST_SYNC, 1, &size,
425                                   NULL);
426         if (!request)
427                 RETURN(-ENOMEM);
428
429         body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof (*body));
430         memcpy(&body->oa, oa, sizeof(*oa));
431
432         /* overload the size and blocks fields in the oa with start/end */
433         body->oa.o_size = start;
434         body->oa.o_blocks = end;
435         body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
436
437         request->rq_replen = lustre_msg_size(1, &size);
438
439         rc = ptlrpc_queue_wait(request);
440         if (rc)
441                 GOTO(out, rc);
442
443         body = lustre_swab_repbuf(request, 0, sizeof(*body),
444                                   lustre_swab_ost_body);
445         if (body == NULL) {
446                 CERROR ("can't unpack ost_body\n");
447                 GOTO (out, rc = -EPROTO);
448         }
449
450         memcpy(oa, &body->oa, sizeof(*oa));
451
452         EXIT;
453  out:
454         ptlrpc_req_finished(request);
455         return rc;
456 }
457
458 static int osc_destroy(struct obd_export *exp, struct obdo *oa,
459                        struct lov_stripe_md *ea, struct obd_trans_info *oti)
460 {
461         struct ptlrpc_request *request;
462         struct ost_body *body;
463         int rc, size = sizeof(*body);
464         ENTRY;
465
466         if (!oa) {
467                 CERROR("oa NULL\n");
468                 RETURN(-EINVAL);
469         }
470
471         request = ptlrpc_prep_req(class_exp2cliimp(exp), OST_DESTROY, 1,
472                                   &size, NULL);
473         if (!request)
474                 RETURN(-ENOMEM);
475
476         body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof (*body));
477
478         if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE) {
479                 memcpy(obdo_logcookie(oa), oti->oti_logcookies,
480                        sizeof(*oti->oti_logcookies));
481                 oti->oti_logcookies++;
482         }
483
484         memcpy(&body->oa, oa, sizeof(*oa));
485         request->rq_replen = lustre_msg_size(1, &size);
486
487         rc = ptlrpc_queue_wait(request);
488         if (rc)
489                 GOTO(out, rc);
490
491         body = lustre_swab_repbuf(request, 0, sizeof(*body),
492                                   lustre_swab_ost_body);
493         if (body == NULL) {
494                 CERROR ("Can't unpack body\n");
495                 GOTO (out, rc = -EPROTO);
496         }
497
498         memcpy(oa, &body->oa, sizeof(*oa));
499
500         EXIT;
501  out:
502         ptlrpc_req_finished(request);
503         return rc;
504 }
505
506 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
507                                 long writing_bytes)
508 {
509         obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
510
511         LASSERT(!(oa->o_valid & bits));
512
513         oa->o_valid |= bits;
514         spin_lock(&cli->cl_loi_list_lock);
515         oa->o_dirty = cli->cl_dirty;
516         oa->o_undirty = cli->cl_dirty_max - oa->o_dirty;
517         oa->o_grant = cli->cl_avail_grant;
518         oa->o_dropped = cli->cl_lost_grant;
519         cli->cl_lost_grant = 0;
520         spin_unlock(&cli->cl_loi_list_lock);
521         CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
522                oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
523 }
524
525 /* caller must hold loi_list_lock */
526 static void osc_consume_write_grant(struct client_obd *cli,
527                                     struct osc_async_page *oap)
528 {
529         cli->cl_dirty += PAGE_SIZE;
530         cli->cl_avail_grant -= PAGE_SIZE;
531         oap->oap_brw_flags |= OBD_BRW_FROM_GRANT;
532         CDEBUG(D_CACHE, "using %lu grant credits for oap %p\n", PAGE_SIZE, oap);
533         LASSERT(cli->cl_avail_grant >= 0);
534 }
535
536 /* caller must hold loi_list_lock */
537 void osc_wake_cache_waiters(struct client_obd *cli)
538 {
539         struct list_head *l, *tmp;
540         struct osc_cache_waiter *ocw;
541
542         list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
543                 /* if we can't dirty more, we must wait until some is written */
544                 if (cli->cl_dirty + PAGE_SIZE > cli->cl_dirty_max) {
545                         CDEBUG(D_CACHE, "no dirty room: dirty: %ld max %ld\n",
546                                cli->cl_dirty, cli->cl_dirty_max);
547                         return;
548                 }
549
550                 /* if still dirty cache but no grant wait for pending RPCs that
551                  * may yet return us some grant before doing sync writes */
552                 if (cli->cl_brw_in_flight && cli->cl_avail_grant < PAGE_SIZE) {
553                         CDEBUG(D_CACHE, "%d BRWs in flight, no grant\n",
554                                cli->cl_brw_in_flight);
555                         return;
556                 }
557
558                 ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
559                 list_del_init(&ocw->ocw_entry);
560                 if (cli->cl_avail_grant < PAGE_SIZE) {
561                         /* no more RPCs in flight to return grant, do sync IO */
562                         ocw->ocw_rc = -EDQUOT;
563                         CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
564                 } else {
565                         osc_consume_write_grant(cli, ocw->ocw_oap);
566                 }
567
568                 wake_up(&ocw->ocw_waitq);
569         }
570
571         EXIT;
572 }
573
574 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
575 {
576         spin_lock(&cli->cl_loi_list_lock);
577         CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
578         cli->cl_avail_grant += body->oa.o_grant;
579         /* waiters are woken in brw_interpret_oap */
580         spin_unlock(&cli->cl_loi_list_lock);
581 }
582
/* We assume that the reason this OSC got a short read is because it read
 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
 * via the LOV, and it _knows_ it's reading inside the file, it's just that
 * this stripe never got written at or beyond this stripe offset yet.
 *
 * Zero-fills everything past the first nob_read bytes of the pages in
 * pga: first the tail of the page the read ended in, then every page
 * after it.  Pages are kmap'ed one at a time for the memset. */
static void handle_short_read(int nob_read, obd_count page_count,
                              struct brw_page *pga)
{
        char *ptr;

        /* skip bytes read OK */
        while (nob_read > 0) {
                LASSERT (page_count > 0);

                if (pga->count > nob_read) {
                        /* EOF inside this page */
                        /* (pga->off & ~PAGE_MASK) is the offset within
                         * the page; zero from the end of the data read
                         * to the end of this page's region */
                        ptr = kmap(pga->pg) + (pga->off & ~PAGE_MASK);
                        memset(ptr + nob_read, 0, pga->count - nob_read);
                        kunmap(pga->pg);
                        page_count--;
                        pga++;
                        break;
                }

                nob_read -= pga->count;
                page_count--;
                pga++;
        }

        /* zero remaining pages */
        while (page_count-- > 0) {
                ptr = kmap(pga->pg) + (pga->off & ~PAGE_MASK);
                memset(ptr, 0, pga->count);
                kunmap(pga->pg);
                pga++;
        }
}
619
620 static int check_write_rcs(struct ptlrpc_request *request,
621                            int requested_nob, int niocount,
622                            obd_count page_count, struct brw_page *pga)
623 {
624         int    *remote_rcs, i;
625
626         /* return error if any niobuf was in error */
627         remote_rcs = lustre_swab_repbuf(request, 1,
628                                         sizeof(*remote_rcs) * niocount, NULL);
629         if (remote_rcs == NULL) {
630                 CERROR("Missing/short RC vector on BRW_WRITE reply\n");
631                 return(-EPROTO);
632         }
633         if (lustre_msg_swabbed(request->rq_repmsg))
634                 for (i = 0; i < niocount; i++)
635                         __swab32s(&remote_rcs[i]);
636
637         for (i = 0; i < niocount; i++) {
638                 if (remote_rcs[i] < 0)
639                         return(remote_rcs[i]);
640
641                 if (remote_rcs[i] != 0) {
642                         CERROR("rc[%d] invalid (%d) req %p\n",
643                                 i, remote_rcs[i], request);
644                         return(-EPROTO);
645                 }
646         }
647
648         if (request->rq_bulk->bd_nob_transferred != requested_nob) {
649                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
650                        requested_nob, request->rq_bulk->bd_nob_transferred);
651                 return(-EPROTO);
652         }
653
654         return (0);
655 }
656
657 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
658 {
659         if (p1->flag != p2->flag) {
660                 unsigned mask = ~OBD_BRW_FROM_GRANT;
661
662                 /* warn if we try to combine flags that we don't know to be
663                  * safe to combine */
664                 if ((p1->flag & mask) != (p2->flag & mask))
665                         CERROR("is it ok to have flags 0x%x and 0x%x in the "
666                                "same brw?\n", p1->flag, p2->flag);
667                 return 0;
668         }
669
670         return (p1->off + p1->count == p2->off);
671 }
672
#if CHECKSUM_BULK
/* Fold the first @nob bytes of the pages in @pga into a bulk checksum
 * via ost_checksum().  Each page is kmap'ed, checksummed from its
 * in-page offset, and unmapped; the final page may contribute fewer
 * than pga->count bytes when @nob runs out first.  Compiled only when
 * CHECKSUM_BULK is enabled. */
static obd_count cksum_pages(int nob, obd_count page_count,
                             struct brw_page *pga)
{
        obd_count cksum = 0;
        char *ptr;

        while (nob > 0) {
                LASSERT (page_count > 0);

                /* (pga->off & (PAGE_SIZE - 1)) is the offset within the page */
                ptr = kmap(pga->pg);
                ost_checksum(&cksum, ptr + (pga->off & (PAGE_SIZE - 1)),
                             pga->count > nob ? nob : pga->count);
                kunmap(pga->pg);

                nob -= pga->count;
                page_count--;
                pga++;
        }

        return (cksum);
}
#endif
696
/* Build (but do not send) a bulk read or write request.
 *
 * @cmd selects OST_WRITE (OBD_BRW_WRITE set) or OST_READ.  Adjacent
 * pages whose flags match and which are contiguous on disk (see
 * can_merge_pages()) are coalesced into a single niobuf_remote.
 *
 * On success returns 0 with *requested_nobp = total bytes in the bulk,
 * *niocountp = number of niobufs, and *reqp = the prepared request
 * (which owns the bulk descriptor).  On failure returns a negative
 * errno and frees the request. */
static int osc_brw_prep_request(int cmd, struct obd_import *imp,struct obdo *oa,
                                struct lov_stripe_md *lsm, obd_count page_count,
                                struct brw_page *pga, int *requested_nobp,
                                int *niocountp, struct ptlrpc_request **reqp)
{
        struct ptlrpc_request   *req;
        struct ptlrpc_bulk_desc *desc;
        struct client_obd       *cli = &imp->imp_obd->u.cli;
        struct ost_body         *body;
        struct obd_ioobj        *ioobj;
        struct niobuf_remote    *niobuf;
        unsigned long            flags;
        int                      niocount;
        int                      size[3];
        int                      i;
        int                      requested_nob;
        int                      opc;
        int                      rc;

        opc = ((cmd & OBD_BRW_WRITE) != 0) ? OST_WRITE : OST_READ;

        /* count the niobufs needed: one per run of mergeable pages */
        for (niocount = i = 1; i < page_count; i++)
                if (!can_merge_pages(&pga[i - 1], &pga[i]))
                        niocount++;

        /* request message: ost_body, ioobj, then the niobuf array */
        size[0] = sizeof(*body);
        size[1] = sizeof(*ioobj);
        size[2] = niocount * sizeof(*niobuf);

        req = ptlrpc_prep_req(imp, opc, 3, size, NULL);
        if (req == NULL)
                return (-ENOMEM);

        /* for a write the server GETs our pages; for a read it PUTs them */
        if (opc == OST_WRITE)
                desc = ptlrpc_prep_bulk_imp (req, page_count,
                                             BULK_GET_SOURCE, OST_BULK_PORTAL);
        else
                desc = ptlrpc_prep_bulk_imp (req, page_count,
                                             BULK_PUT_SINK, OST_BULK_PORTAL);
        if (desc == NULL)
                GOTO(out, rc = -ENOMEM);
        /* NB request now owns desc and will free it when it gets freed */

        body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof(*body));
        ioobj = lustre_msg_buf(req->rq_reqmsg, 1, sizeof(*ioobj));
        niobuf = lustre_msg_buf(req->rq_reqmsg, 2, niocount * sizeof(*niobuf));

        memcpy(&body->oa, oa, sizeof(*oa));

        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;

        /* walk the pages in offset order, adding each to the bulk and
         * either extending the current niobuf or starting a new one */
        LASSERT (page_count > 0);
        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                struct brw_page *pg = &pga[i];
                struct brw_page *pg_prev = pg - 1;

                LASSERT(pg->count > 0);
                /* pages must not straddle a page boundary and must be
                 * sorted by strictly increasing offset */
                LASSERT((pg->off & ~PAGE_MASK) + pg->count <= PAGE_SIZE);
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
                         " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
                         i, page_count,
                         pg->pg, pg->pg->private, pg->pg->index, pg->off,
                         pg_prev->pg, pg_prev->pg->private, pg_prev->pg->index,
                                 pg_prev->off);

                ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~PAGE_MASK,
                                      pg->count);
                requested_nob += pg->count;

                if (i > 0 && can_merge_pages(pg_prev, pg)) {
                        /* extend the previous niobuf instead of using a new one */
                        niobuf--;
                        niobuf->len += pg->count;
                } else {
                        niobuf->offset = pg->off;
                        niobuf->len    = pg->count;
                        niobuf->flags  = pg->flag;
                }
        }

        /* we must have filled exactly the niobuf array we sized above */
        LASSERT((void *)(niobuf - niocount) ==
                lustre_msg_buf(req->rq_reqmsg, 2, niocount * sizeof(*niobuf)));
        osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
        spin_lock_irqsave(&req->rq_lock, flags);
        req->rq_no_resend = 1;
        spin_unlock_irqrestore(&req->rq_lock, flags);

        /* size[0] still sizeof (*body) */
        if (opc == OST_WRITE) {
#if CHECKSUM_BULK
                body->oa.o_valid |= OBD_MD_FLCKSUM;
                body->oa.o_cksum = cksum_pages(requested_nob, page_count, pga);
#endif
                /* 1 RC per niobuf */
                size[1] = sizeof(__u32) * niocount;
                req->rq_replen = lustre_msg_size(2, size);
        } else {
                /* 1 RC for the whole I/O */
                req->rq_replen = lustre_msg_size(1, size);
        }

        *niocountp = niocount;
        *requested_nobp = requested_nob;
        *reqp = req;
        return (0);

 out:
        ptlrpc_req_finished (req);
        return (rc);
}
808
809 static int osc_brw_fini_request(struct ptlrpc_request *req, struct obdo *oa,
810                                 int requested_nob, int niocount,
811                                 obd_count page_count, struct brw_page *pga,
812                                 int rc)
813 {
814         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
815         struct ost_body *body;
816         ENTRY;
817
818         if (rc < 0)
819                 RETURN(rc);
820
821         body = lustre_swab_repbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
822         if (body == NULL) {
823                 CERROR ("Can't unpack body\n");
824                 RETURN(-EPROTO);
825         }
826
827         osc_update_grant(cli, body);
828         memcpy(oa, &body->oa, sizeof(*oa));
829
830         if (req->rq_reqmsg->opc == OST_WRITE) {
831                 if (rc > 0) {
832                         CERROR ("Unexpected +ve rc %d\n", rc);
833                         RETURN(-EPROTO);
834                 }
835                 LASSERT (req->rq_bulk->bd_nob == requested_nob);
836
837                 RETURN(check_write_rcs(req, requested_nob, niocount,
838                                        page_count, pga));
839         }
840
841         if (rc > requested_nob) {
842                 CERROR("Unexpected rc %d (%d requested)\n", rc, requested_nob);
843                 RETURN(-EPROTO);
844         }
845
846         if (rc != req->rq_bulk->bd_nob_transferred) {
847                 CERROR ("Unexpected rc %d (%d transferred)\n",
848                         rc, req->rq_bulk->bd_nob_transferred);
849                 return (-EPROTO);
850         }
851
852         if (rc < requested_nob)
853                 handle_short_read(rc, page_count, pga);
854
855 #if CHECKSUM_BULK
856         if (oa->o_valid & OBD_MD_FLCKSUM) {
857                 const struct ptlrpc_peer *peer =
858                         &req->rq_import->imp_connection->c_peer;
859                 static int cksum_counter;
860                 obd_count server_cksum = oa->o_cksum;
861                 obd_count cksum = cksum_pages(rc, page_count, pga);
862                 char str[PTL_NALFMT_SIZE];
863
864                 portals_nid2str(peer->peer_ni->pni_number, peer->peer_nid, str);
865
866                 cksum_counter++;
867                 if (server_cksum != cksum) {
868                         CERROR("Bad checksum: server %x, client %x, server NID "
869                                LPX64" (%s)\n", server_cksum, cksum,
870                                peer->peer_nid, str);
871                         cksum_counter = 0;
872                         oa->o_cksum = cksum;
873                 } else if ((cksum_counter & (-cksum_counter)) == cksum_counter){
874                         CWARN("Checksum %u from "LPX64" (%s) OK: %x\n",
875                               cksum_counter, peer->peer_nid, str, cksum);
876                 }
877         } else {
878                 static int cksum_missed;
879
880                 cksum_missed++;
881                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
882                         CERROR("Request checksum %u from "LPX64", no reply\n",
883                                cksum_missed,
884                                req->rq_import->imp_connection->c_peer.peer_nid);
885         }
886 #endif
887         RETURN(0);
888 }
889
890 static int osc_brw_internal(int cmd, struct obd_export *exp,struct obdo *oa,
891                             struct lov_stripe_md *lsm,
892                             obd_count page_count, struct brw_page *pga)
893 {
894         int                    requested_nob;
895         int                    niocount;
896         struct ptlrpc_request *request;
897         int                    rc;
898         ENTRY;
899
900 restart_bulk:
901         rc = osc_brw_prep_request(cmd, class_exp2cliimp(exp), oa, lsm,
902                                   page_count, pga, &requested_nob, &niocount,
903                                   &request);
904         /* NB ^ sets rq_no_resend */
905
906         if (rc != 0)
907                 return (rc);
908
909         rc = ptlrpc_queue_wait(request);
910
911         if (rc == -ETIMEDOUT && request->rq_resend) {
912                 DEBUG_REQ(D_HA, request,  "BULK TIMEOUT");
913                 ptlrpc_req_finished(request);
914                 goto restart_bulk;
915         }
916
917         rc = osc_brw_fini_request(request, oa, requested_nob, niocount,
918                                   page_count, pga, rc);
919
920         ptlrpc_req_finished(request);
921         RETURN (rc);
922 }
923
924 static int brw_interpret(struct ptlrpc_request *request,
925                          struct osc_brw_async_args *aa, int rc)
926 {
927         struct obdo *oa      = aa->aa_oa;
928         int requested_nob    = aa->aa_requested_nob;
929         int niocount         = aa->aa_nio_count;
930         obd_count page_count = aa->aa_page_count;
931         struct brw_page *pga = aa->aa_pga;
932         ENTRY;
933
934         /* XXX bug 937 here */
935         if (rc == -ETIMEDOUT && request->rq_resend) {
936                 DEBUG_REQ(D_HA, request,  "BULK TIMEOUT");
937                 LBUG(); /* re-send.  later. */
938                 //goto restart_bulk;
939         }
940
941         rc = osc_brw_fini_request(request, oa, requested_nob, niocount,
942                                   page_count, pga, rc);
943         RETURN (rc);
944 }
945
946 static int async_internal(int cmd, struct obd_export *exp, struct obdo *oa,
947                           struct lov_stripe_md *lsm, obd_count page_count,
948                           struct brw_page *pga, struct ptlrpc_request_set *set)
949 {
950         struct ptlrpc_request     *request;
951         int                        requested_nob;
952         int                        nio_count;
953         struct osc_brw_async_args *aa;
954         int                        rc;
955         ENTRY;
956
957         rc = osc_brw_prep_request(cmd, class_exp2cliimp(exp), oa, lsm,
958                                   page_count, pga, &requested_nob, &nio_count,
959                                   &request);
960         /* NB ^ sets rq_no_resend */
961
962         if (rc == 0) {
963                 LASSERT(sizeof(*aa) <= sizeof(request->rq_async_args));
964                 aa = (struct osc_brw_async_args *)&request->rq_async_args;
965                 aa->aa_oa = oa;
966                 aa->aa_requested_nob = requested_nob;
967                 aa->aa_nio_count = nio_count;
968                 aa->aa_page_count = page_count;
969                 aa->aa_pga = pga;
970
971                 request->rq_interpret_reply = brw_interpret;
972                 ptlrpc_set_add_req(set, request);
973         }
974         RETURN (rc);
975 }
976
/* Fallback definition for builds (e.g. liblustre userspace) that don't
 * provide min_t: a type-safe minimum that evaluates each argument exactly
 * once, via a GCC statement expression. */
#ifndef min_t
#define min_t(type,x,y) \
        ({ type __x = (x); type __y = (y); __x < __y ? __x: __y; })
#endif
981
982 /*
983  * ugh, we want disk allocation on the target to happen in offset order.  we'll
984  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
985  * fine for our small page arrays and doesn't require allocation.  its an
986  * insertion sort that swaps elements that are strides apart, shrinking the
987  * stride down until its '1' and the array is sorted.
988  */
989 static void sort_brw_pages(struct brw_page *array, int num)
990 {
991         int stride, i, j;
992         struct brw_page tmp;
993
994         if (num == 1)
995                 return;
996         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
997                 ;
998
999         do {
1000                 stride /= 3;
1001                 for (i = stride ; i < num ; i++) {
1002                         tmp = array[i];
1003                         j = i;
1004                         while (j >= stride && array[j - stride].off > tmp.off) {
1005                                 array[j] = array[j - stride];
1006                                 j -= stride;
1007                         }
1008                         array[j] = tmp;
1009                 }
1010         } while (stride > 1);
1011 }
1012
1013 /* make sure we the regions we're passing to elan don't violate its '4
1014  * fragments' constraint.  portal headers are a fragment, all full
1015  * PAGE_SIZE long pages count as 1 fragment, and each partial page
1016  * counts as a fragment.  I think.  see bug 934. */
1017 static obd_count check_elan_limit(struct brw_page *pg, obd_count pages)
1018 {
1019         int frags_left = 3;
1020         int saw_whole_frag = 0;
1021         int i;
1022
1023         for (i = 0 ; frags_left && i < pages ; pg++, i++) {
1024                 if (pg->count == PAGE_SIZE) {
1025                         if (!saw_whole_frag) {
1026                                 saw_whole_frag = 1;
1027                                 frags_left--;
1028                         }
1029                 } else {
1030                         frags_left--;
1031                 }
1032         }
1033         return i;
1034 }
1035
1036 static int osc_brw(int cmd, struct obd_export *exp, struct obdo *oa,
1037                    struct lov_stripe_md *md, obd_count page_count,
1038                    struct brw_page *pga, struct obd_trans_info *oti)
1039 {
1040         ENTRY;
1041
1042         if (cmd == OBD_BRW_CHECK) {
1043                 /* The caller just wants to know if there's a chance that this
1044                  * I/O can succeed */
1045                 struct obd_import *imp = class_exp2cliimp(exp);
1046
1047                 if (imp == NULL || imp->imp_invalid)
1048                         RETURN(-EIO);
1049                 RETURN(0);
1050         }
1051
1052         while (page_count) {
1053                 obd_count pages_per_brw;
1054                 int rc;
1055
1056                 if (page_count > PTLRPC_MAX_BRW_PAGES)
1057                         pages_per_brw = PTLRPC_MAX_BRW_PAGES;
1058                 else
1059                         pages_per_brw = page_count;
1060
1061                 sort_brw_pages(pga, pages_per_brw);
1062                 pages_per_brw = check_elan_limit(pga, pages_per_brw);
1063
1064                 rc = osc_brw_internal(cmd, exp, oa, md, pages_per_brw, pga);
1065
1066                 if (rc != 0)
1067                         RETURN(rc);
1068
1069                 page_count -= pages_per_brw;
1070                 pga += pages_per_brw;
1071         }
1072         RETURN(0);
1073 }
1074
1075 static int osc_brw_async(int cmd, struct obd_export *exp, struct obdo *oa,
1076                          struct lov_stripe_md *md, obd_count page_count,
1077                          struct brw_page *pga, struct ptlrpc_request_set *set,
1078                          struct obd_trans_info *oti)
1079 {
1080         ENTRY;
1081
1082         if (cmd == OBD_BRW_CHECK) {
1083                 /* The caller just wants to know if there's a chance that this
1084                  * I/O can succeed */
1085                 struct obd_import *imp = class_exp2cliimp(exp);
1086
1087                 if (imp == NULL || imp->imp_invalid)
1088                         RETURN(-EIO);
1089                 RETURN(0);
1090         }
1091
1092         while (page_count) {
1093                 obd_count pages_per_brw;
1094                 int rc;
1095
1096                 if (page_count > PTLRPC_MAX_BRW_PAGES)
1097                         pages_per_brw = PTLRPC_MAX_BRW_PAGES;
1098                 else
1099                         pages_per_brw = page_count;
1100
1101                 sort_brw_pages(pga, pages_per_brw);
1102                 pages_per_brw = check_elan_limit(pga, pages_per_brw);
1103
1104                 rc = async_internal(cmd, exp, oa, md, pages_per_brw, pga, set);
1105
1106                 if (rc != 0)
1107                         RETURN(rc);
1108
1109                 page_count -= pages_per_brw;
1110                 pga += pages_per_brw;
1111         }
1112         RETURN(0);
1113 }
1114
1115 static void osc_check_rpcs(struct client_obd *cli);
1116 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
1117                            int sent);
1118 static void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi);
1119 static void lop_update_pending(struct client_obd *cli,
1120                                struct loi_oap_pages *lop, int cmd, int delta);
1121
/* this is called when a sync waiter receives an interruption.  Its job is to
 * get the caller woken as soon as possible.  If its page hasn't been put in an
 * rpc yet it can dequeue immediately.  Otherwise it has to mark the rpc as
 * desiring interruption which will forcefully complete the rpc once the rpc
 * has timed out */
static void osc_occ_interrupted(struct oig_callback_context *occ)
{
        struct osc_async_page *oap;
        struct loi_oap_pages *lop;
        struct lov_oinfo *loi;
        ENTRY;

        /* XXX member_of() */
        /* recover the oap embedding this callback context */
        oap = list_entry(occ, struct osc_async_page, oap_occ);

        /* cl_loi_list_lock covers the pending/urgent lists, the async
         * flags and oap_request below */
        spin_lock(&oap->oap_cli->cl_loi_list_lock);

        oap->oap_interrupted = 1;

        /* ok, it's been put in an rpc. */
        if (oap->oap_request != NULL) {
                /* flag the rpc and poke ptlrpcd so it notices promptly */
                ptlrpc_mark_interrupted(oap->oap_request);
                ptlrpcd_wake(oap->oap_request);
                GOTO(unlock, 0);
        }

        /* we don't get interruption callbacks until osc_trigger_sync_io()
         * has been called and put the sync oaps in the pending/urgent lists.*/
        if (!list_empty(&oap->oap_pending_item)) {
                /* not in an rpc yet: dequeue it and fix up the accounting */
                list_del_init(&oap->oap_pending_item);
                if (oap->oap_async_flags & ASYNC_URGENT)
                        list_del_init(&oap->oap_urgent_item);

                loi = oap->oap_loi;
                lop = (oap->oap_cmd == OBD_BRW_WRITE) ?
                        &loi->loi_write_lop : &loi->loi_read_lop;
                lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
                loi_list_maint(oap->oap_cli, oap->oap_loi);

                /* wake the sync group waiter for this page right away */
                oig_complete_one(oap->oap_oig, &oap->oap_occ, 0);
                oap->oap_oig = NULL;
        }

unlock:
        spin_unlock(&oap->oap_cli->cl_loi_list_lock);
}
1168
/* this must be called holding the loi list lock to give coverage to exit_cache,
 * async_flag maintenance, and oap_request.
 *
 * Completes one async page: returns its cache accounting, drops any rpc
 * reference it holds, then notifies either the sync group-io waiter
 * (oap_oig) or the caller's completion callback -- never both. */
static void osc_ap_completion(struct client_obd *cli, struct obdo *oa,
                              struct osc_async_page *oap, int sent, int rc)
{
        osc_exit_cache(cli, oap, sent);
        oap->oap_async_flags = 0;
        oap->oap_interrupted = 0;

        /* drop the reference taken when the oap was put into an rpc */
        if (oap->oap_request != NULL) {
                ptlrpc_req_finished(oap->oap_request);
                oap->oap_request = NULL;
        }

        /* on success, record the block count from the reply obdo on the
         * object info (presumably for stat reporting -- confirm) */
        if (rc == 0 && oa != NULL)
                oap->oap_loi->loi_blocks = oa->o_blocks;

        /* sync group io completes the waiter instead of the async callback */
        if (oap->oap_oig) {
                oig_complete_one(oap->oap_oig, &oap->oap_occ, rc);
                oap->oap_oig = NULL;
                EXIT;
                return;
        }

        oap->oap_caller_ops->ap_completion(oap->oap_caller_data, oap->oap_cmd,
                                           oa, rc);
}
1196
/* Reply interpreter for rpcs built from cached oaps (see osc_send_oap_rpc):
 * finishes the bulk, completes every oap attached to the request, and frees
 * the obdo/page array allocated by osc_build_req().  Always returns 0 so
 * ptlrpcd considers the request handled. */
static int brw_interpret_oap(struct ptlrpc_request *request,
                             struct osc_brw_async_args *aa, int rc)
{
        struct osc_async_page *oap;
        struct client_obd *cli;
        struct list_head *pos, *n;
        ENTRY;


        rc = osc_brw_fini_request(request, aa->aa_oa, aa->aa_requested_nob,
                                  aa->aa_nio_count, aa->aa_page_count,
                                  aa->aa_pga, rc);

        CDEBUG(D_INODE, "request %p aa %p rc %d\n", request, aa, rc);

        cli = aa->aa_cli;
        /* in failout recovery we ignore writeback failure and want
         * to just tell llite to unlock the page and continue */
        if (request->rq_reqmsg->opc == OST_WRITE &&
            (cli->cl_import == NULL || cli->cl_import->imp_invalid)) {
                CDEBUG(D_INODE, "flipping to rc 0 imp %p inv %d\n",
                       cli->cl_import,
                       cli->cl_import ? cli->cl_import->imp_invalid : -1);
                rc = 0;
        }

        spin_lock(&cli->cl_loi_list_lock);

        /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
         * is called so we know whether to go to sync BRWs or wait for more
         * RPCs to complete */
        cli->cl_brw_in_flight--;

        /* the caller may re-use the oap after the completion call so
         * we need to clean it up a little */
        list_for_each_safe(pos, n, &aa->aa_oaps) {
                oap = list_entry(pos, struct osc_async_page, oap_rpc_item);

                list_del_init(&oap->oap_rpc_item);
                osc_ap_completion(cli, aa->aa_oa, oap, 1, rc);
        }

        /* an rpc slot just freed up: give waiters a chance and refill */
        osc_wake_cache_waiters(cli);
        osc_check_rpcs(cli);

        spin_unlock(&cli->cl_loi_list_lock);

        /* osc_build_req() allocated these for the rpc's lifetime */
        obdo_free(aa->aa_oa);
        OBD_FREE(aa->aa_pga, aa->aa_page_count * sizeof(struct brw_page));

        RETURN(0);
}
1252
/* Build one brw rpc from the oaps on @rpc_list.
 *
 * Allocates a brw_page array and an obdo for the rpc's lifetime; on success
 * ownership of both passes to the request's async args (freed later by
 * brw_interpret_oap), on failure they are freed here and an ERR_PTR is
 * returned.  The caller must not hold cl_loi_list_lock across the
 * allocations done here (see osc_send_oap_rpc, which drops it first). */
static struct ptlrpc_request *osc_build_req(struct client_obd *cli,
                                            struct list_head *rpc_list,
                                            int page_count, int cmd)
{
        struct ptlrpc_request *req;
        struct brw_page *pga = NULL;
        int requested_nob, nio_count;
        struct osc_brw_async_args *aa;
        struct obdo *oa = NULL;
        struct obd_async_page_ops *ops = NULL;
        void *caller_data = NULL;
        struct list_head *pos;
        int i, rc;

        LASSERT(!list_empty(rpc_list));

        OBD_ALLOC(pga, sizeof(*pga) * page_count);
        if (pga == NULL)
                RETURN(ERR_PTR(-ENOMEM));

        oa = obdo_alloc();
        if (oa == NULL)
                GOTO(out, req = ERR_PTR(-ENOMEM));

        /* flatten the oap list into the brw_page array the wire code wants */
        i = 0;
        list_for_each(pos, rpc_list) {
                struct osc_async_page *oap;

                oap = list_entry(pos, struct osc_async_page, oap_rpc_item);
                /* all oaps in one rpc share the same caller ops/data;
                 * remember the first one's for the fill_obdo call below */
                if (ops == NULL) {
                        ops = oap->oap_caller_ops;
                        caller_data = oap->oap_caller_data;
                }
                pga[i].off = oap->oap_obj_off + oap->oap_page_off;
                pga[i].pg = oap->oap_page;
                pga[i].count = oap->oap_count;
                pga[i].flag = oap->oap_brw_flags;
                CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
                       pga[i].pg, oap->oap_page->index, oap, pga[i].flag);
                i++;
        }

        /* always get the data for the obdo for the rpc */
        LASSERT(ops != NULL);
        ops->ap_fill_obdo(caller_data, cmd, oa);

        sort_brw_pages(pga, page_count);
        rc = osc_brw_prep_request(cmd, cli->cl_import, oa, NULL, page_count,
                                  pga, &requested_nob, &nio_count, &req);
        if (rc != 0) {
                CERROR("prep_req failed: %d\n", rc);
                GOTO(out, req = ERR_PTR(rc));
        }

        /* hand oa and pga over to the request; brw_interpret_oap frees them */
        LASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = (struct osc_brw_async_args *)&req->rq_async_args;
        aa->aa_oa = oa;
        aa->aa_requested_nob = requested_nob;
        aa->aa_nio_count = nio_count;
        aa->aa_page_count = page_count;
        aa->aa_pga = pga;
        aa->aa_cli = cli;

out:
        /* only clean up on the error paths; success transferred ownership */
        if (IS_ERR(req)) {
                if (oa)
                        obdo_free(oa);
                if (pga)
                        OBD_FREE(pga, sizeof(*pga) * page_count);
        }
        RETURN(req);
}
1325
1326 static void lop_update_pending(struct client_obd *cli,
1327                                struct loi_oap_pages *lop, int cmd, int delta)
1328 {
1329         lop->lop_num_pending += delta;
1330         if (cmd == OBD_BRW_WRITE)
1331                 cli->cl_pending_w_pages += delta;
1332         else
1333                 cli->cl_pending_r_pages += delta;
1334 }
1335
/* the loi lock is held across this function but it's allowed to release
 * and reacquire it during its work.
 *
 * Gathers up to cl_max_pages_per_rpc ready pages from @lop, builds one brw
 * rpc from them and hands it to ptlrpcd.  Returns 1 if an rpc was sent,
 * 0 if no pages were ready, or a negative errno if building the rpc
 * failed (in which case the pages are re-queued). */
static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
                            int cmd, struct loi_oap_pages *lop)
{
        struct ptlrpc_request *request;
        obd_count page_count = 0;
        struct list_head *tmp, *pos;
        struct osc_async_page *oap = NULL;
        struct osc_brw_async_args *aa;
        struct obd_async_page_ops *ops;
        LIST_HEAD(rpc_list);
        ENTRY;

        /* first we find the pages we're allowed to work with */
        list_for_each_safe(pos, tmp, &lop->lop_pending) {
                oap = list_entry(pos, struct osc_async_page, oap_pending_item);
                ops = oap->oap_caller_ops;

                LASSERT(oap->oap_magic == OAP_MAGIC);

                /* in llite being 'ready' equates to the page being locked
                 * until completion unlocks it.  commit_write submits a page
                 * as not ready because its unlock will happen unconditionally
                 * as the call returns.  if we race with commit_write giving
                 * us that page we dont' want to create a hole in the page
                 * stream, so we stop and leave the rpc to be fired by
                 * another dirtier or kupdated interval (the not ready page
                 * will still be on the dirty list).  we could call in
                 * at the end of ll_file_write to process the queue again. */
                if (!(oap->oap_async_flags & ASYNC_READY)) {
                        int rc = ops->ap_make_ready(oap->oap_caller_data, cmd);
                        if (rc < 0)
                                CDEBUG(D_INODE, "oap %p page %p returned %d "
                                                "instead of ready\n", oap,
                                                oap->oap_page, rc);
                        switch (rc) {
                        case -EAGAIN:
                                /* llite is telling us that the page is still
                                 * in commit_write and that we should try
                                 * and put it in an rpc again later.  we
                                 * break out of the loop so we don't create
                                 * a hole in the sequence of pages in the rpc
                                 * stream.*/
                                /* NULL pos is the loop-exit signal checked
                                 * just below */
                                pos = NULL;
                                break;
                        case -EINTR:
                                /* the io isn't needed.. tell the checks
                                 * below to complete the rpc with EINTR */
                                oap->oap_async_flags |= ASYNC_COUNT_STABLE;
                                oap->oap_count = -EINTR;
                                break;
                        case 0:
                                oap->oap_async_flags |= ASYNC_READY;
                                break;
                        default:
                                LASSERTF(0, "oap %p page %p returned %d "
                                            "from make_ready\n", oap,
                                            oap->oap_page, rc);
                                break;
                        }
                }
                if (pos == NULL)
                        break;

                /* take the page out of our book-keeping */
                list_del_init(&oap->oap_pending_item);
                lop_update_pending(cli, lop, cmd, -1);
                if (!list_empty(&oap->oap_urgent_item))
                        list_del_init(&oap->oap_urgent_item);

                /* ask the caller for the size of the io as the rpc leaves. */
                if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE))
                        oap->oap_count =
                                ops->ap_refresh_count(oap->oap_caller_data,cmd);
                if (oap->oap_count <= 0) {
                        /* nothing to send for this page (or it was marked
                         * -EINTR above): complete it immediately */
                        CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
                               oap->oap_count);
                        osc_ap_completion(cli, NULL, oap, 0, oap->oap_count);
                        continue;
                }

                /* now put the page back in our accounting */
                list_add_tail(&oap->oap_rpc_item, &rpc_list);
                if (++page_count >= cli->cl_max_pages_per_rpc)
                        break;
        }

        osc_wake_cache_waiters(cli);

        if (page_count == 0)
                RETURN(0);

        loi_list_maint(cli, loi);
        /* drop the lock: osc_build_req allocates and must not sleep under it */
        spin_unlock(&cli->cl_loi_list_lock);

        request = osc_build_req(cli, &rpc_list, page_count, cmd);
        if (IS_ERR(request)) {
                /* this should happen rarely and is pretty bad, it makes the
                 * pending list not follow the dirty order */
                spin_lock(&cli->cl_loi_list_lock);
                list_for_each_safe(pos, tmp, &rpc_list) {
                        oap = list_entry(pos, struct osc_async_page,
                                         oap_rpc_item);
                        list_del_init(&oap->oap_rpc_item);

                        /* queued sync pages can be torn down while the pages
                         * were between the pending list and the rpc */
                        if (oap->oap_interrupted) {
                                CDEBUG(D_INODE, "oap %p interrupted\n", oap);
                                osc_ap_completion(cli, NULL, oap, 0,
                                                  oap->oap_count);
                                continue;
                        }

                        /* put the page back in the loi/lop lists */
                        list_add_tail(&oap->oap_pending_item,
                                      &lop->lop_pending);
                        lop_update_pending(cli, lop, cmd, 1);
                        if (oap->oap_async_flags & ASYNC_URGENT)
                                list_add(&oap->oap_urgent_item,
                                         &lop->lop_urgent);
                }
                loi_list_maint(cli, loi);
                RETURN(PTR_ERR(request));
        }

        /* move the gathered oaps onto the request's own list so the
         * interpreter can complete them */
        LASSERT(sizeof(*aa) <= sizeof(request->rq_async_args));
        aa = (struct osc_brw_async_args *)&request->rq_async_args;
        INIT_LIST_HEAD(&aa->aa_oaps);
        list_splice(&rpc_list, &aa->aa_oaps);
        INIT_LIST_HEAD(&rpc_list);

#ifdef __KERNEL__
        if (cmd == OBD_BRW_READ) {
                lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
                lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_brw_in_flight);
        } else {
                lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
                lprocfs_oh_tally(&cli->cl_write_rpc_hist,
                                 cli->cl_brw_in_flight);
        }
#endif

        spin_lock(&cli->cl_loi_list_lock);

        cli->cl_brw_in_flight++;
        /* queued sync pages can be torn down while the pages
         * were between the pending list and the rpc */
        list_for_each(pos, &aa->aa_oaps) {
                oap = list_entry(pos, struct osc_async_page, oap_rpc_item);
                if (oap->oap_interrupted) {
                        CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
                               oap, request);
                        ptlrpc_mark_interrupted(request);
                        break;
                }
        }

        CDEBUG(D_INODE, "req %p: %d pages, aa %p.  now %d in flight\n", request,
               page_count, aa, cli->cl_brw_in_flight);

        /* NOTE(review): only the last oap visited above keeps a reference to
         * the request here; confirm that this is intentional (e.g. only one
         * anchor is needed for interruption handling) rather than an
         * oversight. */
        oap->oap_request = ptlrpc_request_addref(request);
        request->rq_interpret_reply = brw_interpret_oap;
        ptlrpcd_add_req(request);
        RETURN(1);
}
1503
/* Decide whether @lop has enough (or urgent enough) pending pages that an
 * rpc should be fired now.  Returns 1 to build an rpc, 0 to keep waiting
 * for more pages to batch. */
static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
                         int cmd)
{
        int optimal;
        ENTRY;

        if (lop->lop_num_pending == 0)
                RETURN(0);

        /* if we have an invalid import we want to drain the queued pages
         * by forcing them through rpcs that immediately fail and complete
         * the pages.  recovery relies on this to empty the queued pages
         * before canceling the locks and evicting down the llite pages */
        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
                RETURN(1);

        /* stream rpcs in queue order as long as there is an urgent page
         * queued.  this is our cheap solution for good batching in the case
         * where writepage marks some random page in the middle of the file as
         * urgent because of, say, memory pressure */
        if (!list_empty(&lop->lop_urgent))
                RETURN(1);

        /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
        optimal = cli->cl_max_pages_per_rpc;
        if (cmd == OBD_BRW_WRITE) {
                /* trigger a write rpc stream as long as there are dirtiers
                 * waiting for space.  as they're waiting, they're not going to
                 * create more pages to coallesce with what's waiting.. */
                if (!list_empty(&cli->cl_cache_waiters))
                        RETURN(1);

                /* pad the threshold by 16 pages to avoid triggering rpcs
                 * that would want to include pages that are being queued
                 * but which can't be made ready until the queuer finishes
                 * with the page. this is a wart for llite::commit_write() */
                optimal += 16;
        }
        if (lop->lop_num_pending >= optimal)
                RETURN(1);

        RETURN(0);
}
1547
/* Reconcile @item's membership of @list with the boolean @should_be_on:
 * link it when it should be on but isn't, unlink when the reverse holds. */
static void on_list(struct list_head *item, struct list_head *list,
                    int should_be_on)
{
        int currently_on = !list_empty(item);

        if (should_be_on && !currently_on)
                list_add_tail(item, list);
        else if (!should_be_on && currently_on)
                list_del_init(item);
}
1556
1557 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
1558  * can find pages to build into rpcs quickly */
1559 static void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
1560 {
1561         on_list(&loi->loi_cli_item, &cli->cl_loi_ready_list,
1562                 lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE) ||
1563                 lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
1564
1565         on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
1566                 loi->loi_write_lop.lop_num_pending);
1567
1568         on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
1569                 loi->loi_read_lop.lop_num_pending);
1570 }
1571
/* Debug helper: log an loi's rpc-readiness plus its pending/urgent
 * write and read page state, followed by a caller-supplied format.
 * Fix: dropped the stray trailing backslash after the final argument,
 * which spliced the following (blank) line into the macro body and
 * would silently absorb any code later written there. */
#define LOI_DEBUG(LOI, STR, args...)                                     \
        CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR,           \
               !list_empty(&(LOI)->loi_cli_item),                        \
               (LOI)->loi_write_lop.lop_num_pending,                     \
               !list_empty(&(LOI)->loi_write_lop.lop_urgent),            \
               (LOI)->loi_read_lop.lop_num_pending,                      \
               !list_empty(&(LOI)->loi_read_lop.lop_urgent),             \
               args)
1580
1581 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
1582 {
1583         ENTRY;
1584         /* first return all objects which we already know to have
1585          * pages ready to be stuffed into rpcs */
1586         if (!list_empty(&cli->cl_loi_ready_list))
1587                 RETURN(list_entry(cli->cl_loi_ready_list.next,
1588                                   struct lov_oinfo, loi_cli_item));
1589
1590         /* then if we have cache waiters, return all objects with queued
1591          * writes.  This is especially important when many small files
1592          * have filled up the cache and not been fired into rpcs because
1593          * they don't pass the nr_pending/object threshhold */
1594         if (!list_empty(&cli->cl_cache_waiters) &&
1595             !list_empty(&cli->cl_loi_write_list))
1596                 RETURN(list_entry(cli->cl_loi_write_list.next,
1597                                   struct lov_oinfo, loi_write_item));
1598
1599         /* then return all queued objects when we have an invalid import
1600          * so that they get flushed */
1601         if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
1602                 if (!list_empty(&cli->cl_loi_write_list))
1603                         RETURN(list_entry(cli->cl_loi_write_list.next,
1604                                           struct lov_oinfo, loi_write_item));
1605                 if (!list_empty(&cli->cl_loi_read_list))
1606                         RETURN(list_entry(cli->cl_loi_read_list.next,
1607                                           struct lov_oinfo, loi_read_item));
1608         }
1609         RETURN(NULL);
1610 }
1611
/* called with the loi list lock held */
static void osc_check_rpcs(struct client_obd *cli)
{
        struct lov_oinfo *loi;
        int rc = 0, race_counter = 0;
        ENTRY;

        /* Keep building rpcs for eligible objects until we hit the
         * in-flight limit, run out of eligible objects, or detect that
         * we are spinning without making progress. */
        while ((loi = osc_next_loi(cli)) != NULL) {
                LOI_DEBUG(loi, "%d in flight\n", cli->cl_brw_in_flight);

                if (cli->cl_brw_in_flight >= cli->cl_max_rpcs_in_flight)
                        break;

                /* attempt some read/write balancing by alternating between
                 * reads and writes in an object.  The makes_rpc checks here
                 * would be redundant if we were getting read/write work items
                 * instead of objects.  we don't want send_oap_rpc to drain a
                 * partial read pending queue when we're given this object to
                 * do io on writes while there are cache waiters */
                if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
                        rc = osc_send_oap_rpc(cli, loi, OBD_BRW_WRITE,
                                              &loi->loi_write_lop);
                        if (rc < 0)
                                break;
                        if (rc > 0)
                                race_counter = 0;
                        else
                                race_counter++;
                }
                if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
                        rc = osc_send_oap_rpc(cli, loi, OBD_BRW_READ,
                                              &loi->loi_read_lop);
                        if (rc < 0)
                                break;
                        if (rc > 0)
                                race_counter = 0;
                        else
                                race_counter++;
                }

                /* attempt some inter-object balancing by issuing rpcs
                 * for each object in turn: drop it off every list so the
                 * next osc_next_loi() call picks a different object, then
                 * let loi_list_maint() re-queue it if it still has work */
                if (!list_empty(&loi->loi_cli_item))
                        list_del_init(&loi->loi_cli_item);
                if (!list_empty(&loi->loi_write_item))
                        list_del_init(&loi->loi_write_item);
                if (!list_empty(&loi->loi_read_item))
                        list_del_init(&loi->loi_read_item);

                loi_list_maint(cli, loi);

                /* send_oap_rpc fails with 0 when make_ready tells it to
                 * back off.  llite's make_ready does this when it tries
                 * to lock a page queued for write that is already locked.
                 * we want to try sending rpcs from many objects, but we
                 * don't want to spin failing with 0.  */
                if (race_counter == 10)
                        break;
        }
        EXIT;
}
1673
1674 /* we're trying to queue a page in the osc so we're subject to the
1675  * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
1676  * If the osc's queued pages are already at that limit, then we want to sleep
1677  * until there is space in the osc's queue for us.  We also may be waiting for
1678  * write credits from the OST if there are RPCs in flight that may return some
1679  * before we fall back to sync writes.
1680  *
 * We need this to know our allocation was granted in the presence of signals */
1682 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
1683 {
1684         int rc;
1685         ENTRY;
1686         spin_lock(&cli->cl_loi_list_lock);
1687         rc = list_empty(&ocw->ocw_entry) || cli->cl_brw_in_flight == 0;
1688         spin_unlock(&cli->cl_loi_list_lock);
1689         RETURN(rc);
1690 };
1691
1692 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
1693  * grant or cache space. */
1694 static int osc_enter_cache(struct client_obd *cli, struct lov_oinfo *loi,
1695                            struct osc_async_page *oap)
1696 {
1697         struct osc_cache_waiter ocw;
1698         struct l_wait_info lwi = { 0 };
1699
1700         CDEBUG(D_CACHE, "dirty: %ld dirty_max: %ld dropped: %lu grant: %lu\n",
1701                cli->cl_dirty, cli->cl_dirty_max, cli->cl_lost_grant,
1702                cli->cl_avail_grant);
1703
1704         if (cli->cl_dirty_max < PAGE_SIZE)
1705                 return(-EDQUOT);
1706
1707         /* Hopefully normal case - cache space and write credits available */
1708         if (cli->cl_dirty + PAGE_SIZE <= cli->cl_dirty_max &&
1709             cli->cl_avail_grant >= PAGE_SIZE) {
1710                 /* account for ourselves */
1711                 osc_consume_write_grant(cli, oap);
1712                 return(0);
1713         }
1714
1715         /* Make sure that there are write rpcs in flight to wait for.  This
1716          * is a little silly as this object may not have any pending but
1717          * other objects sure might. */
1718         if (cli->cl_brw_in_flight) {
1719                 list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
1720                 init_waitqueue_head(&ocw.ocw_waitq);
1721                 ocw.ocw_oap = oap;
1722                 ocw.ocw_rc = 0;
1723
1724                 loi_list_maint(cli, loi);
1725                 osc_check_rpcs(cli);
1726                 spin_unlock(&cli->cl_loi_list_lock);
1727
1728                 CDEBUG(0, "sleeping for cache space\n");
1729                 l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
1730
1731                 spin_lock(&cli->cl_loi_list_lock);
1732                 if (!list_empty(&ocw.ocw_entry)) {
1733                         list_del(&ocw.ocw_entry);
1734                         RETURN(-EINTR);
1735                 }
1736                 RETURN(ocw.ocw_rc);
1737         }
1738
1739         RETURN(-EDQUOT);
1740 }
1741
1742 /* the companion to enter_cache, called when an oap is no longer part of the
1743  * dirty accounting.. so writeback completes or truncate happens before writing
1744  * starts.  must be called with the loi lock held. */
1745 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
1746                            int sent)
1747 {
1748         ENTRY;
1749
1750         if (!(oap->oap_brw_flags & OBD_BRW_FROM_GRANT)) {
1751                 EXIT;
1752                 return;
1753         }
1754
1755         oap->oap_brw_flags &= ~OBD_BRW_FROM_GRANT;
1756         cli->cl_dirty -= PAGE_SIZE;
1757         if (!sent) {
1758                 cli->cl_lost_grant += PAGE_SIZE;
1759                 CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
1760                        cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
1761         }
1762
1763         EXIT;
1764 }
1765
1766 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
1767                         struct lov_oinfo *loi, struct page *page,
1768                         obd_off offset, struct obd_async_page_ops *ops,
1769                         void *data, void **res)
1770 {
1771         struct osc_async_page *oap;
1772         ENTRY;
1773
1774         OBD_ALLOC(oap, sizeof(*oap));
1775         if (oap == NULL)
1776                 return -ENOMEM;
1777
1778         oap->oap_magic = OAP_MAGIC;
1779         oap->oap_cli = &exp->exp_obd->u.cli;
1780         oap->oap_loi = loi;
1781
1782         oap->oap_caller_ops = ops;
1783         oap->oap_caller_data = data;
1784
1785         oap->oap_page = page;
1786         oap->oap_obj_off = offset;
1787
1788         INIT_LIST_HEAD(&oap->oap_pending_item);
1789         INIT_LIST_HEAD(&oap->oap_urgent_item);
1790         INIT_LIST_HEAD(&oap->oap_rpc_item);
1791
1792         oap->oap_occ.occ_interrupted = osc_occ_interrupted;
1793
1794         CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
1795         *res = oap;
1796         RETURN(0);
1797 }
1798
1799 struct osc_async_page *oap_from_cookie(void *cookie)
1800 {
1801         struct osc_async_page *oap = cookie;
1802         if (oap->oap_magic != OAP_MAGIC)
1803                 return ERR_PTR(-EINVAL);
1804         return oap;
1805 };
1806
/* Queue a prepared async page (cookie from osc_prep_async_page) for read
 * or write io on @loi, defaulting to the first stripe when loi is NULL.
 * Writes first reserve dirty-cache space and grant via osc_enter_cache(),
 * which may drop the loi list lock and sleep.
 *
 * Returns 0 on success, -EIO when the import is unusable, -EBUSY when the
 * page is already queued somewhere, or the osc_enter_cache() error. */
static int osc_queue_async_io(struct obd_export *exp, struct lov_stripe_md *lsm,
                              struct lov_oinfo *loi, void *cookie,
                              int cmd, obd_off off, int count,
                              obd_flag brw_flags, enum async_flags async_flags)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct osc_async_page *oap;
        struct loi_oap_pages *lop;
        int rc;
        ENTRY;

        oap = oap_from_cookie(cookie);
        if (IS_ERR(oap))
                RETURN(PTR_ERR(oap));

        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
                RETURN(-EIO);

        /* a page may only be on one queue at a time */
        if (!list_empty(&oap->oap_pending_item) ||
            !list_empty(&oap->oap_urgent_item) ||
            !list_empty(&oap->oap_rpc_item))
                RETURN(-EBUSY);

        if (loi == NULL)
                loi = &lsm->lsm_oinfo[0];

        spin_lock(&cli->cl_loi_list_lock);

        oap->oap_cmd = cmd;
        oap->oap_async_flags = async_flags;
        oap->oap_page_off = off;
        oap->oap_count = count;
        oap->oap_brw_flags = brw_flags;

        if (cmd == OBD_BRW_WRITE) {
                /* may drop and retake the loi list lock while sleeping */
                rc = osc_enter_cache(cli, loi, oap);
                if (rc) {
                        spin_unlock(&cli->cl_loi_list_lock);
                        RETURN(rc);
                }
                lop = &loi->loi_write_lop;
        } else {
                lop = &loi->loi_read_lop;
        }

        if (oap->oap_async_flags & ASYNC_URGENT)
                list_add(&oap->oap_urgent_item, &lop->lop_urgent);
        list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
        lop_update_pending(cli, lop, cmd, 1);

        loi_list_maint(cli, loi);

        LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
                  cmd);

        /* the new page may have made an rpc worth sending */
        osc_check_rpcs(cli);
        spin_unlock(&cli->cl_loi_list_lock);

        RETURN(0);
}
1867
/* aka (~was & now & flag), but this is more clear :)
 * arguments are fully parenthesized so expressions with lower-precedence
 * operators (e.g. "a | b") expand correctly */
#define SETTING(was, now, flag) (!((was) & (flag)) && ((now) & (flag)))
1870
/* Raise async flags on an already-queued page; flags are only ever set
 * here, never cleared.  Returns 0 on success, -EIO when the import is
 * unusable, -EINVAL when the page is not on a pending list. */
static int osc_set_async_flags(struct obd_export *exp,
                               struct lov_stripe_md *lsm,
                               struct lov_oinfo *loi, void *cookie,
                               obd_flag async_flags)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct loi_oap_pages *lop;
        struct osc_async_page *oap;
        int rc = 0;
        ENTRY;

        oap = oap_from_cookie(cookie);
        if (IS_ERR(oap))
                RETURN(PTR_ERR(oap));

        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
                RETURN(-EIO);

        if (loi == NULL)
                loi = &lsm->lsm_oinfo[0];

        if (oap->oap_cmd == OBD_BRW_WRITE) {
                lop = &loi->loi_write_lop;
        } else {
                lop = &loi->loi_read_lop;
        }

        spin_lock(&cli->cl_loi_list_lock);

        if (list_empty(&oap->oap_pending_item))
                GOTO(out, rc = -EINVAL);

        /* nothing to do when every requested flag is already set */
        if ((oap->oap_async_flags & async_flags) == async_flags)
                GOTO(out, rc = 0);

        if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
                oap->oap_async_flags |= ASYNC_READY;

        /* NOTE(review): ASYNC_URGENT is acted on (the page is moved onto
         * the urgent list) but, unlike ASYNC_READY above, the bit is never
         * stored into oap_async_flags - confirm whether that asymmetry is
         * intentional */
        if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT)) {
                if (list_empty(&oap->oap_rpc_item)) {
                        list_add(&oap->oap_urgent_item, &lop->lop_urgent);
                        loi_list_maint(cli, loi);
                }
        }

        LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
                        oap->oap_async_flags);
out:
        /* kick the rpc engine on all exit paths while we hold the lock */
        osc_check_rpcs(cli);
        spin_unlock(&cli->cl_loi_list_lock);
        RETURN(rc);
}
1923
1924 static int osc_queue_group_io(struct obd_export *exp, struct lov_stripe_md *lsm,
1925                              struct lov_oinfo *loi,
1926                              struct obd_io_group *oig, void *cookie,
1927                              int cmd, obd_off off, int count,
1928                              obd_flag brw_flags,
1929                              obd_flag async_flags)
1930 {
1931         struct client_obd *cli = &exp->exp_obd->u.cli;
1932         struct osc_async_page *oap;
1933         struct loi_oap_pages *lop;
1934         ENTRY;
1935
1936         oap = oap_from_cookie(cookie);
1937         if (IS_ERR(oap))
1938                 RETURN(PTR_ERR(oap));
1939
1940         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1941                 RETURN(-EIO);
1942
1943         if (!list_empty(&oap->oap_pending_item) ||
1944             !list_empty(&oap->oap_urgent_item) ||
1945             !list_empty(&oap->oap_rpc_item))
1946                 RETURN(-EBUSY);
1947
1948         if (loi == NULL)
1949                 loi = &lsm->lsm_oinfo[0];
1950
1951         spin_lock(&cli->cl_loi_list_lock);
1952
1953         oap->oap_cmd = cmd;
1954         oap->oap_page_off = off;
1955         oap->oap_count = count;
1956         oap->oap_brw_flags = brw_flags;
1957         oap->oap_async_flags = async_flags;
1958
1959         if (cmd == OBD_BRW_WRITE)
1960                 lop = &loi->loi_write_lop;
1961         else
1962                 lop = &loi->loi_read_lop;
1963
1964         list_add_tail(&oap->oap_pending_item, &lop->lop_pending_group);
1965         if (oap->oap_async_flags & ASYNC_GROUP_SYNC) {
1966                 oap->oap_oig = oig;
1967                 oig_add_one(oig, &oap->oap_occ);
1968         }
1969
1970         LOI_DEBUG(loi, "oap %p page %p on group pending\n", oap, oap->oap_page);
1971
1972         spin_unlock(&cli->cl_loi_list_lock);
1973
1974         RETURN(0);
1975 }
1976
1977 static void osc_group_to_pending(struct client_obd *cli, struct lov_oinfo *loi,
1978                                  struct loi_oap_pages *lop, int cmd)
1979 {
1980         struct list_head *pos, *tmp;
1981         struct osc_async_page *oap;
1982
1983         list_for_each_safe(pos, tmp, &lop->lop_pending_group) {
1984                 oap = list_entry(pos, struct osc_async_page, oap_pending_item);
1985                 list_del(&oap->oap_pending_item);
1986                 list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
1987                 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
1988                 lop_update_pending(cli, lop, cmd, 1);
1989         }
1990         loi_list_maint(cli, loi);
1991 }
1992
1993 static int osc_trigger_group_io(struct obd_export *exp,
1994                                 struct lov_stripe_md *lsm,
1995                                 struct lov_oinfo *loi,
1996                                 struct obd_io_group *oig)
1997 {
1998         struct client_obd *cli = &exp->exp_obd->u.cli;
1999         ENTRY;
2000
2001         if (loi == NULL)
2002                 loi = &lsm->lsm_oinfo[0];
2003
2004         spin_lock(&cli->cl_loi_list_lock);
2005
2006         osc_group_to_pending(cli, loi, &loi->loi_write_lop, OBD_BRW_WRITE);
2007         osc_group_to_pending(cli, loi, &loi->loi_read_lop, OBD_BRW_READ);
2008
2009         osc_check_rpcs(cli);
2010         spin_unlock(&cli->cl_loi_list_lock);
2011
2012         RETURN(0);
2013 }
2014
/* Undo the queueing of a page that has not yet made it into an rpc:
 * remove it from the urgent and pending lists, release its dirty-cache
 * accounting, and free the oap.  Fails with -EBUSY (without freeing) when
 * the page is already part of an rpc in flight. */
static int osc_teardown_async_page(struct obd_export *exp,
                                   struct lov_stripe_md *lsm,
                                   struct lov_oinfo *loi, void *cookie)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct loi_oap_pages *lop;
        struct osc_async_page *oap;
        int rc = 0;
        ENTRY;

        oap = oap_from_cookie(cookie);
        if (IS_ERR(oap))
                RETURN(PTR_ERR(oap));

        if (loi == NULL)
                loi = &lsm->lsm_oinfo[0];

        if (oap->oap_cmd == OBD_BRW_WRITE) {
                lop = &loi->loi_write_lop;
        } else {
                lop = &loi->loi_read_lop;
        }

        spin_lock(&cli->cl_loi_list_lock);

        /* can't tear down a page an rpc currently owns */
        if (!list_empty(&oap->oap_rpc_item))
                GOTO(out, rc = -EBUSY);

        /* give back the cache accounting (sent == 0: never written) and
         * let anyone blocked in osc_enter_cache() retry */
        osc_exit_cache(cli, oap, 0);
        osc_wake_cache_waiters(cli);

        if (!list_empty(&oap->oap_urgent_item)) {
                list_del_init(&oap->oap_urgent_item);
                oap->oap_async_flags &= ~ASYNC_URGENT;
        }
        if (!list_empty(&oap->oap_pending_item)) {
                list_del_init(&oap->oap_pending_item);
                lop_update_pending(cli, lop, oap->oap_cmd, -1);
        }
        loi_list_maint(cli, loi);

        LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
out:
        spin_unlock(&cli->cl_loi_list_lock);
        /* free only on success; on -EBUSY the caller still owns the cookie */
        if (rc == 0)
                OBD_FREE(oap, sizeof(*oap));
        RETURN(rc);
}
2063
2064 #ifdef __KERNEL__
2065 /* Note: caller will lock/unlock, and set uptodate on the pages */
2066 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
/* SAN read path (2.4 kernels): send OST_SAN_READ to ask the OST where each
 * page's data lives on the shared block device, then read those blocks
 * directly through the local buffer cache instead of moving the data over
 * the network.  A reply offset of 0 marks a hole (page reads as zeroes). */
static int sanosc_brw_read(struct obd_export *exp, struct obdo *oa,
                           struct lov_stripe_md *lsm, obd_count page_count,
                           struct brw_page *pga)
{
        struct ptlrpc_request *request = NULL;
        struct ost_body *body;
        struct niobuf_remote *nioptr;
        struct obd_ioobj *iooptr;
        int rc, size[3] = {sizeof(*body)}, mapped = 0;
        int swab;
        ENTRY;

        /* XXX does not handle 'new' brw protocol */

        size[1] = sizeof(struct obd_ioobj);
        size[2] = page_count * sizeof(*nioptr);

        request = ptlrpc_prep_req(class_exp2cliimp(exp), OST_SAN_READ, 3,
                                  size, NULL);
        if (!request)
                RETURN(-ENOMEM);

        body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof(*body));
        iooptr = lustre_msg_buf(request->rq_reqmsg, 1, sizeof(*iooptr));
        nioptr = lustre_msg_buf(request->rq_reqmsg, 2,
                                sizeof(*nioptr) * page_count);

        memcpy(&body->oa, oa, sizeof(body->oa));

        obdo_to_ioobj(oa, iooptr);
        iooptr->ioo_bufcnt = page_count;

        /* pack one remote niobuf per page; callers must pass the pages
         * locked and in strictly ascending offset order */
        for (mapped = 0; mapped < page_count; mapped++, nioptr++) {
                LASSERT(PageLocked(pga[mapped].pg));
                LASSERT(mapped == 0 || pga[mapped].off > pga[mapped - 1].off);

                nioptr->offset = pga[mapped].off;
                nioptr->len    = pga[mapped].count;
                nioptr->flags  = pga[mapped].flag;
        }

        size[1] = page_count * sizeof(*nioptr);
        request->rq_replen = lustre_msg_size(2, size);

        rc = ptlrpc_queue_wait(request);
        if (rc)
                GOTO(out_req, rc);

        body = lustre_swab_repbuf(request, 0, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body == NULL) {
                CERROR("Can't unpack body\n");
                GOTO(out_req, rc = -EPROTO);
        }

        memcpy(oa, &body->oa, sizeof(*oa));

        swab = lustre_msg_swabbed(request->rq_repmsg);
        LASSERT_REPSWAB(request, 1);
        nioptr = lustre_msg_buf(request->rq_repmsg, 1, size[1]);
        if (!nioptr) {
                /* nioptr missing or short */
                GOTO(out_req, rc = -EPROTO);
        }

        /* actual read: the reply niobufs now carry device block numbers
         * in their offset field rather than object offsets */
        for (mapped = 0; mapped < page_count; mapped++, nioptr++) {
                struct page *page = pga[mapped].pg;
                struct buffer_head *bh;
                kdev_t dev;

                if (swab)
                        lustre_swab_niobuf_remote (nioptr);

                /* got san device associated */
                LASSERT(exp->exp_obd != NULL);
                dev = exp->exp_obd->u.cli.cl_sandev;

                /* hole: no block allocated, page is all zeroes */
                if (!nioptr->offset) {
                        CDEBUG(D_PAGE, "hole at ino %lu; index %ld\n",
                                        page->mapping->host->i_ino,
                                        page->index);
                        memset(page_address(page), 0, PAGE_SIZE);
                        continue;
                }

                if (!page->buffers) {
                        create_empty_buffers(page, dev, PAGE_SIZE);
                        bh = page->buffers;

                        clear_bit(BH_New, &bh->b_state);
                        set_bit(BH_Mapped, &bh->b_state);
                        bh->b_blocknr = (unsigned long)nioptr->offset;

                        clear_bit(BH_Uptodate, &bh->b_state);

                        ll_rw_block(READ, 1, &bh);
                } else {
                        bh = page->buffers;

                        /* if buffer already existed, it must be the
                         * one we mapped before, check it */
                        LASSERT(!test_bit(BH_New, &bh->b_state));
                        LASSERT(test_bit(BH_Mapped, &bh->b_state));
                        LASSERT(bh->b_blocknr == (unsigned long)nioptr->offset);

                        /* wait for its io completion */
                        if (test_bit(BH_Lock, &bh->b_state))
                                wait_on_buffer(bh);

                        if (!test_bit(BH_Uptodate, &bh->b_state))
                                ll_rw_block(READ, 1, &bh);
                }


                /* must do synchronous read here */
                wait_on_buffer(bh);
                if (!buffer_uptodate(bh)) {
                        /* I/O error */
                        rc = -EIO;
                        goto out_req;
                }
        }

out_req:
        ptlrpc_req_finished(request);
        RETURN(rc);
}
2196
/* SAN write path (2.4 kernels): send OST_SAN_WRITE to get the block
 * allocation for each page from the OST, then write the pages straight to
 * those blocks on the shared device through the local buffer cache. */
static int sanosc_brw_write(struct obd_export *exp, struct obdo *oa,
                            struct lov_stripe_md *lsm, obd_count page_count,
                            struct brw_page *pga)
{
        struct ptlrpc_request *request = NULL;
        struct ost_body *body;
        struct niobuf_remote *nioptr;
        struct obd_ioobj *iooptr;
        int rc, size[3] = {sizeof(*body)}, mapped = 0;
        int swab;
        ENTRY;

        size[1] = sizeof(struct obd_ioobj);
        size[2] = page_count * sizeof(*nioptr);

        request = ptlrpc_prep_req(class_exp2cliimp(exp), OST_SAN_WRITE,
                                  3, size, NULL);
        if (!request)
                RETURN(-ENOMEM);

        body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof (*body));
        iooptr = lustre_msg_buf(request->rq_reqmsg, 1, sizeof (*iooptr));
        nioptr = lustre_msg_buf(request->rq_reqmsg, 2,
                                sizeof (*nioptr) * page_count);

        memcpy(&body->oa, oa, sizeof(body->oa));

        obdo_to_ioobj(oa, iooptr);
        iooptr->ioo_bufcnt = page_count;

        /* pack request: pages must arrive locked, in ascending order */
        for (mapped = 0; mapped < page_count; mapped++, nioptr++) {
                LASSERT(PageLocked(pga[mapped].pg));
                LASSERT(mapped == 0 || pga[mapped].off > pga[mapped - 1].off);

                nioptr->offset = pga[mapped].off;
                nioptr->len    = pga[mapped].count;
                nioptr->flags  = pga[mapped].flag;
        }

        size[1] = page_count * sizeof(*nioptr);
        request->rq_replen = lustre_msg_size(2, size);

        rc = ptlrpc_queue_wait(request);
        if (rc)
                GOTO(out_req, rc);

        swab = lustre_msg_swabbed (request->rq_repmsg);
        LASSERT_REPSWAB (request, 1);
        nioptr = lustre_msg_buf(request->rq_repmsg, 1, size[1]);
        if (!nioptr) {
                CERROR("absent/short niobuf array\n");
                GOTO(out_req, rc = -EPROTO);
        }

        /* actual write: reply niobufs carry the device block numbers the
         * OST allocated for each page */
        for (mapped = 0; mapped < page_count; mapped++, nioptr++) {
                struct page *page = pga[mapped].pg;
                struct buffer_head *bh;
                kdev_t dev;

                if (swab)
                        lustre_swab_niobuf_remote (nioptr);

                /* got san device associated */
                LASSERT(exp->exp_obd != NULL);
                dev = exp->exp_obd->u.cli.cl_sandev;

                if (!page->buffers) {
                        create_empty_buffers(page, dev, PAGE_SIZE);
                } else {
                        /* checking */
                        LASSERT(!test_bit(BH_New, &page->buffers->b_state));
                        LASSERT(test_bit(BH_Mapped, &page->buffers->b_state));
                        LASSERT(page->buffers->b_blocknr ==
                                (unsigned long)nioptr->offset);
                }
                bh = page->buffers;

                LASSERT(bh);

                /* if buffer locked, wait for its io completion */
                if (test_bit(BH_Lock, &bh->b_state))
                        wait_on_buffer(bh);

                clear_bit(BH_New, &bh->b_state);
                set_bit(BH_Mapped, &bh->b_state);

                /* override the block nr */
                bh->b_blocknr = (unsigned long)nioptr->offset;

                /* we are about to write it, so set it
                 * uptodate/dirty
                 * page lock should guarantee no race condition here */
                set_bit(BH_Uptodate, &bh->b_state);
                set_bit(BH_Dirty, &bh->b_state);

                ll_rw_block(WRITE, 1, &bh);

                /* must do synchronous write here */
                wait_on_buffer(bh);
                if (!buffer_uptodate(bh) || test_bit(BH_Dirty, &bh->b_state)) {
                        /* I/O error */
                        rc = -EIO;
                        goto out_req;
                }
        }

out_req:
        ptlrpc_req_finished(request);
        RETURN(rc);
}
2309
2310 static int sanosc_brw(int cmd, struct obd_export *exp, struct obdo *oa,
2311                       struct lov_stripe_md *lsm, obd_count page_count,
2312                       struct brw_page *pga, struct obd_trans_info *oti)
2313 {
2314         ENTRY;
2315
2316         while (page_count) {
2317                 obd_count pages_per_brw;
2318                 int rc;
2319
2320                 if (page_count > PTLRPC_MAX_BRW_PAGES)
2321                         pages_per_brw = PTLRPC_MAX_BRW_PAGES;
2322                 else
2323                         pages_per_brw = page_count;
2324
2325                 if (cmd & OBD_BRW_WRITE)
2326                         rc = sanosc_brw_write(exp, oa, lsm, pages_per_brw,pga);
2327                 else
2328                         rc = sanosc_brw_read(exp, oa, lsm, pages_per_brw, pga);
2329
2330                 if (rc != 0)
2331                         RETURN(rc);
2332
2333                 page_count -= pages_per_brw;
2334                 pga += pages_per_brw;
2335         }
2336         RETURN(0);
2337 }
2338 #endif
2339 #endif
2340
/* Attach @data to the lock named by @lockh as its l_ast_data.  On the
 * kernel client @data is an inode, and if the lock already carries a
 * different inode we assert that the old one is being freed - otherwise
 * two live inodes would be sharing one lock. */
static void osc_set_data_with_check(struct lustre_handle *lockh, void *data)
{
        struct ldlm_lock *lock = ldlm_handle2lock(lockh);

        if (lock == NULL) {
                CERROR("lockh %p, data %p - client evicted?\n", lockh, data);
                return;
        }
        l_lock(&lock->l_resource->lr_namespace->ns_lock);
#ifdef __KERNEL__
        if (lock->l_ast_data && lock->l_ast_data != data) {
                struct inode *new_inode = data;
                struct inode *old_inode = lock->l_ast_data;
                LASSERTF(old_inode->i_state & I_FREEING,
                         "Found existing inode %p/%lu/%u state %lu in lock: "
                         "setting data to %p/%lu/%u\n", old_inode,
                         old_inode->i_ino, old_inode->i_generation,
                         old_inode->i_state,
                         new_inode, new_inode->i_ino, new_inode->i_generation);
        }
#endif
        lock->l_ast_data = data;
        l_unlock(&lock->l_resource->lr_namespace->ns_lock);
        /* drop the reference taken by ldlm_handle2lock() above */
        LDLM_LOCK_PUT(lock);
}
2366
2367 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2368                              ldlm_iterator_t replace, void *data)
2369 {
2370         struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
2371         struct obd_device *obd = class_exp2obd(exp);
2372
2373         ldlm_change_cbdata(obd->obd_namespace, &res_id, replace, data);
2374         return 0;
2375 }
2376
/* Acquire an extent lock on the object described by @lsm.
 *
 * Before asking the server, try to reuse a compatible lock already held
 * locally; for PR requests an existing PW lock is also reused, since the
 * VFS/page cache serializes local users under one writer lock.  Returns
 * ELDLM_OK with a referenced lock in @lockh, or an ldlm_cli_enqueue()
 * error.  Note @lvb_len/@lvb_swabber are ignored here: the OST LVB
 * format is passed to ldlm_cli_enqueue() unconditionally.
 */
static int osc_enqueue(struct obd_export *exp, struct lov_stripe_md *lsm,
                       __u32 type, ldlm_policy_data_t *policy, __u32 mode,
                       int *flags, void *bl_cb, void *cp_cb, void *gl_cb,
                       void *data, __u32 lvb_len, void *lvb_swabber,
                       struct lustre_handle *lockh)
{
        struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
        struct obd_device *obd = exp->exp_obd;
        struct ost_lvb lvb;
        int rc;
        ENTRY;

        /* Filesystem lock extents are extended to page boundaries so that
         * dealing with the page cache is a little smoother.  */
        policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
        policy->l_extent.end |= ~PAGE_MASK;

        /* No valid kms yet: skip local matching and enqueue directly. */
        if (lsm->lsm_oinfo->loi_kms_valid == 0)
                goto no_match;

        /* Next, search for already existing extent locks that will cover us */
        rc = ldlm_lock_match(obd->obd_namespace, 0, &res_id, type, policy, mode,
                             lockh);
        if (rc == 1) {
                osc_set_data_with_check(lockh, data);
                if (*flags & LDLM_FL_HAS_INTENT) {
                        /* I would like to be able to ASSERT here that rss <=
                         * kms, but I can't, for reasons which are explained in
                         * lov_enqueue() */
                }
                /* We already have a lock, and it's referenced */
                RETURN(ELDLM_OK);
        }

        /* If we're trying to read, we also search for an existing PW lock.  The
         * VFS and page cache already protect us locally, so lots of readers/
         * writers can share a single PW lock.
         *
         * There are problems with conversion deadlocks, so instead of
         * converting a read lock to a write lock, we'll just enqueue a new
         * one.
         *
         * At some point we should cancel the read lock instead of making them
         * send us a blocking callback, but there are problems with canceling
         * locks out from other users right now, too. */

        if (mode == LCK_PR) {
                rc = ldlm_lock_match(obd->obd_namespace, 0, &res_id, type,
                                     policy, LCK_PW, lockh);
                if (rc == 1) {
                        /* FIXME: This is not incredibly elegant, but it might
                         * be more elegant than adding another parameter to
                         * lock_match.  I want a second opinion. */
                        /* Take a PR reference on the matched PW lock, then
                         * drop the PW reference that lock_match took. */
                        ldlm_lock_addref(lockh, LCK_PR);
                        ldlm_lock_decref(lockh, LCK_PW);
                        osc_set_data_with_check(lockh, data);
                        RETURN(ELDLM_OK);
                }
        }

 no_match:
        /* Go to the server; @lvb comes back with the OST's current
         * size/blocks for this object. */
        rc = ldlm_cli_enqueue(exp, NULL, obd->obd_namespace, res_id, type,
                              policy, mode, flags, bl_cb, cp_cb, gl_cb, data,
                              &lvb, sizeof(lvb), lustre_swab_ost_lvb, lockh);

        /* The LVB is valid on success and also on an intent-aborted
         * enqueue, so update the cached attributes in both cases. */
        if ((*flags & LDLM_FL_HAS_INTENT && rc == ELDLM_LOCK_ABORTED) || !rc) {
                CDEBUG(D_INODE, "received kms == "LPU64", blocks == "LPU64"\n",
                       lvb.lvb_size, lvb.lvb_blocks);
                lsm->lsm_oinfo->loi_rss = lvb.lvb_size;
                lsm->lsm_oinfo->loi_blocks = lvb.lvb_blocks;
        }

        RETURN(rc);
}
2451
/* Check whether a lock covering @policy is already held locally.
 *
 * Like osc_enqueue() but purely local: no RPC is ever sent.  An existing
 * PW lock also satisfies a PR request.  Returns the ldlm_lock_match()
 * result (non-zero if a matching lock was found and, unless only
 * testing, referenced in @lockh).
 */
static int osc_match(struct obd_export *exp, struct lov_stripe_md *lsm,
                     __u32 type, ldlm_policy_data_t *policy, __u32 mode,
                     int *flags, void *data, struct lustre_handle *lockh)
{
        struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
        struct obd_device *obd = exp->exp_obd;
        int rc;
        ENTRY;

        OBD_FAIL_RETURN(OBD_FAIL_OSC_MATCH, -EIO);

        /* Filesystem lock extents are extended to page boundaries so that
         * dealing with the page cache is a little smoother */
        policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
        policy->l_extent.end |= ~PAGE_MASK;

        /* Next, search for already existing extent locks that will cover us */
        rc = ldlm_lock_match(obd->obd_namespace, *flags, &res_id, type,
                             policy, mode, lockh);
        if (rc) {
                /* NOTE(review): the TEST_LOCK guard is commented out here, so
                 * l_ast_data gets set even for LDLM_FL_TEST_LOCK matches,
                 * unlike the PW-reuse path below -- confirm this is intended. */
                //if (!(*flags & LDLM_FL_TEST_LOCK))
                        osc_set_data_with_check(lockh, data);
                RETURN(rc);
        }
        /* If we're trying to read, we also search for an existing PW lock.  The
         * VFS and page cache already protect us locally, so lots of readers/
         * writers can share a single PW lock. */
        if (mode == LCK_PR) {
                rc = ldlm_lock_match(obd->obd_namespace, *flags, &res_id, type,
                                     policy, LCK_PW, lockh);
                if (rc == 1 && !(*flags & LDLM_FL_TEST_LOCK)) {
                        /* FIXME: This is not incredibly elegant, but it might
                         * be more elegant than adding another parameter to
                         * lock_match.  I want a second opinion. */
                        osc_set_data_with_check(lockh, data);
                        /* Swap the PW reference for a PR one, as requested. */
                        ldlm_lock_addref(lockh, LCK_PR);
                        ldlm_lock_decref(lockh, LCK_PW);
                }
        }
        RETURN(rc);
}
2493
2494 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
2495                       __u32 mode, struct lustre_handle *lockh)
2496 {
2497         ENTRY;
2498
2499         ldlm_lock_decref(lockh, mode);
2500
2501         RETURN(0);
2502 }
2503
2504 static int osc_cancel_unused(struct obd_export *exp,
2505                              struct lov_stripe_md *lsm, int flags, void *opaque)
2506 {
2507         struct obd_device *obd = class_exp2obd(exp);
2508         struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
2509
2510         return ldlm_cli_cancel_unused(obd->obd_namespace, &res_id, flags,
2511                                       opaque);
2512 }
2513
/* Fetch filesystem usage statistics from the OST into @osfs.
 *
 * @max_age is currently unused (see the comment below); every call
 * issues an OST_STATFS RPC.  Returns 0 on success or a negative errno.
 */
static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
                      unsigned long max_age)
{
        struct obd_statfs *msfs;
        struct ptlrpc_request *request;
        int rc, size = sizeof(*osfs);
        ENTRY;

        /* We could possibly pass max_age in the request (as an absolute
         * timestamp or a "seconds.usec ago") so the target can avoid doing
         * extra calls into the filesystem if that isn't necessary (e.g.
         * during mount that would help a bit).  Having relative timestamps
         * is not so great if request processing is slow, while absolute
         * timestamps are not ideal because they need time synchronization. */
        request = ptlrpc_prep_req(obd->u.cli.cl_import, OST_STATFS,0,NULL,NULL);
        if (!request)
                RETURN(-ENOMEM);

        request->rq_replen = lustre_msg_size(1, &size);
        request->rq_request_portal = OST_CREATE_PORTAL; //XXX FIXME bug 249

        rc = ptlrpc_queue_wait(request);
        if (rc)
                GOTO(out, rc);

        /* Reply buffer 0 carries the obd_statfs; swab it if the peer has
         * different endianness. */
        msfs = lustre_swab_repbuf(request, 0, sizeof(*msfs),
                                  lustre_swab_obd_statfs);
        if (msfs == NULL) {
                CERROR("Can't unpack obd_statfs\n");
                GOTO(out, rc = -EPROTO);
        }

        memcpy(osfs, msfs, sizeof(*osfs));

        EXIT;
 out:
        ptlrpc_req_finished(request);
        return rc;
}
2553
/* Retrieve object striping information for an ioctl caller.
 *
 * @lump is a USER-SPACE pointer to a struct lov_user_md (the header is
 * read with copy_from_user below); lmm_magic must be LOV_USER_MAGIC.
 * An OSC holds exactly one object, so at most one stripe entry is ever
 * reported and lmm_stripe_count is always set to 1 on return.
 */
static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
{
        struct lov_user_md lum, *lumk;
        int rc, lum_size;
        ENTRY;

        if (!lsm)
                RETURN(-ENODATA);

        /* Read the caller's header to learn how many entries it wants. */
        rc = copy_from_user(&lum, lump, sizeof(lum));
        if (rc)
                RETURN(-EFAULT);

        if (lum.lmm_magic != LOV_USER_MAGIC)
                RETURN(-EINVAL);

        if (lum.lmm_stripe_count > 0) {
                /* Caller has room: allocate header plus one object entry. */
                lum_size = sizeof(lum) + sizeof(lum.lmm_objects[0]);
                OBD_ALLOC(lumk, lum_size);
                if (!lumk)
                        RETURN(-ENOMEM);

                lumk->lmm_objects[0].l_object_id = lsm->lsm_object_id;
        } else {
                /* No room for object entries: reply with the header only. */
                lum_size = sizeof(lum);
                lumk = &lum;
        }

        lumk->lmm_object_id = lsm->lsm_object_id;
        lumk->lmm_stripe_count = 1;

        /* rc is 0 here (copy_from_user succeeded above). */
        if (copy_to_user(lump, lumk, lum_size))
                rc = -EFAULT;

        if (lumk != &lum)
                OBD_FREE(lumk, lum_size);

        RETURN(rc);
}
2599
/* Handle device ioctls forwarded from the obd layer.
 *
 * @karg is the kernel copy of the ioctl argument; @uarg is the original
 * user pointer for commands doing their own copy_{from,to}_user.  A
 * module reference is held across the call so the OSC module cannot be
 * unloaded while an ioctl is in flight.
 */
static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                         void *karg, void *uarg)
{
        struct obd_device *obd = exp->exp_obd;
        struct obd_ioctl_data *data = karg;
        int err = 0;
        ENTRY;

#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
        MOD_INC_USE_COUNT;
#else
        if (!try_module_get(THIS_MODULE)) {
                CERROR("Can't get module. Is it alive?");
                return -EINVAL;
        }
#endif
        switch (cmd) {
        case OBD_IOC_LOV_GET_CONFIG: {
                char *buf;
                struct lov_desc *desc;
                struct obd_uuid uuid;

                buf = NULL;
                len = 0;
                /* Pull the ioctl header and inline buffers from userspace. */
                if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
                        GOTO(out, err = -EINVAL);

                data = (struct obd_ioctl_data *)buf;

                /* Inline buffer 1 must fit a lov_desc, buffer 2 a uuid. */
                if (sizeof(*desc) > data->ioc_inllen1) {
                        OBD_FREE(buf, len);
                        GOTO(out, err = -EINVAL);
                }

                if (data->ioc_inllen2 < sizeof(uuid)) {
                        OBD_FREE(buf, len);
                        GOTO(out, err = -EINVAL);
                }

                /* Present this OSC as a one-target, one-stripe LOV. */
                desc = (struct lov_desc *)data->ioc_inlbuf1;
                desc->ld_tgt_count = 1;
                desc->ld_active_tgt_count = 1;
                desc->ld_default_stripe_count = 1;
                desc->ld_default_stripe_size = 0;
                desc->ld_default_stripe_offset = 0;
                desc->ld_pattern = 0;
                memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));

                memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));

                err = copy_to_user((void *)uarg, buf, len);
                if (err)
                        err = -EFAULT;
                obd_ioctl_freedata(buf, len);
                GOTO(out, err);
        }
        case LL_IOC_LOV_SETSTRIPE:
                err = obd_alloc_memmd(exp, karg);
                if (err > 0)
                        err = 0;
                GOTO(out, err);
        case LL_IOC_LOV_GETSTRIPE:
                err = osc_getstripe(karg, uarg);
                GOTO(out, err);
        case OBD_IOC_CLIENT_RECOVER:
                err = ptlrpc_recover_import(obd->u.cli.cl_import,
                                            data->ioc_inlbuf1);
                if (err > 0)
                        err = 0;
                GOTO(out, err);
        case IOC_OSC_SET_ACTIVE:
                err = ptlrpc_set_import_active(obd->u.cli.cl_import,
                                               data->ioc_offset);
                GOTO(out, err);
        default:
                CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n", cmd, current->comm);
                GOTO(out, err = -ENOTTY);
        }
out:
#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
        MOD_DEC_USE_COUNT;
#else
        module_put(THIS_MODULE);
#endif
        return err;
}
2686
/* Answer key/value queries about this OSC.
 *
 * "lock_to_stripe": answered locally, always stripe 0 (an OSC covers a
 * single object).  "last_id": forwarded to the OST as an OST_GET_INFO
 * RPC, storing the returned obd_id in @val.
 * NOTE(review): @keylen presumably includes the terminating NUL (the
 * length checks use '>' / '>=' against strlen while strcmp does the
 * exact match) -- verify against callers.
 */
static int osc_get_info(struct obd_export *exp, obd_count keylen,
                        void *key, __u32 *vallen, void *val)
{
        ENTRY;
        if (!vallen || !val)
                RETURN(-EFAULT);

        if (keylen > strlen("lock_to_stripe") &&
            strcmp(key, "lock_to_stripe") == 0) {
                __u32 *stripe = val;
                *vallen = sizeof(*stripe);
                *stripe = 0;
                RETURN(0);
        } else if (keylen >= strlen("last_id") && strcmp(key, "last_id") == 0) {
                struct ptlrpc_request *req;
                obd_id *reply;
                char *bufs[1] = {key};
                int rc;
                req = ptlrpc_prep_req(class_exp2cliimp(exp), OST_GET_INFO, 1,
                                      &keylen, bufs);
                if (req == NULL)
                        RETURN(-ENOMEM);

                req->rq_replen = lustre_msg_size(1, vallen);
                rc = ptlrpc_queue_wait(req);
                if (rc)
                        GOTO(out, rc);

                /* Reply buffer 0 holds the last allocated object id. */
                reply = lustre_swab_repbuf(req, 0, sizeof(*reply),
                                           lustre_swab_ost_last_id);
                if (reply == NULL) {
                        CERROR("Can't unpack OST last ID\n");
                        GOTO(out, rc = -EPROTO);
                }
                *((obd_id *)val) = *reply;
        out:
                ptlrpc_req_finished(req);
                RETURN(rc);
        }
        RETURN(-EINVAL);
}
2728
/* Apply key/value settings to this OSC.
 *
 * Handled locally: "next_id" (object precreation counter),
 * "growth_count", "unlinked" (clears the creator's NOSPC flag) and
 * "initial_recov".  A key starting with "mds_conn" is forwarded to the
 * OST via OST_SET_INFO and additionally marks this import as MDS-style
 * (server timeout + pingable) and connects the unlink llog context.
 * Any other key returns -EINVAL.
 */
static int osc_set_info(struct obd_export *exp, obd_count keylen,
                        void *key, obd_count vallen, void *val)
{
        struct ptlrpc_request *req;
        struct obd_device  *obd = exp->exp_obd;
        struct obd_import *imp = class_exp2cliimp(exp);
        struct llog_ctxt *ctxt;
        int rc, size = keylen;
        char *bufs[1] = {key};
        ENTRY;

        if (keylen == strlen("next_id") &&
            memcmp(key, "next_id", strlen("next_id")) == 0) {
                if (vallen != sizeof(obd_id))
                        RETURN(-EINVAL);
                /* Next id to create is one past the last-known id. */
                obd->u.cli.cl_oscc.oscc_next_id = *((obd_id*)val) + 1;
                CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
                       exp->exp_obd->obd_name,
                       obd->u.cli.cl_oscc.oscc_next_id);

                RETURN(0);
        }

        if (keylen == strlen("growth_count") &&
            memcmp(key, "growth_count", strlen("growth_count")) == 0) {
                if (vallen != sizeof(int))
                        RETURN(-EINVAL);
                obd->u.cli.cl_oscc.oscc_grow_count = *((int*)val);
                RETURN(0);
        }

        if (keylen == strlen("unlinked") &&
            memcmp(key, "unlinked", keylen) == 0) {
                /* Objects were freed: clear NOSPC so creation can resume. */
                struct osc_creator *oscc = &obd->u.cli.cl_oscc;
                spin_lock(&oscc->oscc_lock);
                oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
                spin_unlock(&oscc->oscc_lock);
                RETURN(0);
        }


        if (keylen == strlen("initial_recov") &&
            memcmp(key, "initial_recov", strlen("initial_recov")) == 0) {
                /* NOTE(review): this declaration shadows the function-scope
                 * 'imp' above; both presumably refer to the same import --
                 * confirm and consider removing the shadow. */
                struct obd_import *imp = exp->exp_obd->u.cli.cl_import;
                if (vallen != sizeof(int))
                        RETURN(-EINVAL);
                imp->imp_initial_recov = *(int *)val;
                CDEBUG(D_HA, "%s: set imp_no_init_recov = %d\n",
                       exp->exp_obd->obd_name,
                       imp->imp_initial_recov);
                RETURN(0);
        }

        if (keylen < strlen("mds_conn") ||
            memcmp(key, "mds_conn", strlen("mds_conn")) != 0)
                RETURN(-EINVAL);


        /* "mds_conn": tell the OST this import belongs to an MDS. */
        req = ptlrpc_prep_req(imp, OST_SET_INFO, 1, &size, bufs);
        if (req == NULL)
                RETURN(-ENOMEM);

        req->rq_replen = lustre_msg_size(0, NULL);
        rc = ptlrpc_queue_wait(req);
        ptlrpc_req_finished(req);

        ctxt = llog_get_context(exp->exp_obd, LLOG_UNLINK_ORIG_CTXT);
        if (ctxt) {
                rc = llog_initiator_connect(ctxt);
                if (rc)
                        RETURN(rc);
        }

        imp->imp_server_timeout = 1;
        CDEBUG(D_HA, "pinging OST %s\n", imp->imp_target_uuid.uuid);
        imp->imp_pingable = 1;

        RETURN(rc);
}
2808
2809
2810 static struct llog_operations osc_size_repl_logops = {
2811         lop_cancel: llog_obd_repl_cancel
2812 };
2813
/* Ops for the unlink-originator context; populated in osc_llog_init()
 * from the generic lvfs ops with origin-side handlers overridden. */
static struct llog_operations osc_unlink_orig_logops;

/* Set up the two OSC llog contexts (unlink originator and size
 * replicator) against target device @tgt.
 * NOTE(review): osc_unlink_orig_logops is re-initialized on every call;
 * the values are always identical, but this is not safe if two devices
 * could run setup concurrently -- confirm setup is serialized.
 */
static int osc_llog_init(struct obd_device *obd, struct obd_device *tgt,
                        int count, struct llog_catid *catid)
{
        int rc;
        ENTRY;

        osc_unlink_orig_logops = llog_lvfs_ops;
        osc_unlink_orig_logops.lop_setup = llog_obd_origin_setup;
        osc_unlink_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
        osc_unlink_orig_logops.lop_add = llog_obd_origin_add;
        osc_unlink_orig_logops.lop_connect = llog_origin_connect;

        rc = llog_setup(obd, LLOG_UNLINK_ORIG_CTXT, tgt, count,
                        &catid->lci_logid, &osc_unlink_orig_logops);
        if (rc)
                RETURN(rc);

        rc = llog_setup(obd, LLOG_SIZE_REPL_CTXT, tgt, count, NULL,
                        &osc_size_repl_logops);
        RETURN(rc);
}
2836
2837 static int osc_llog_finish(struct obd_device *obd, int count)
2838 {
2839         int rc;
2840         ENTRY;
2841
2842         rc = llog_cleanup(llog_get_context(obd, LLOG_UNLINK_ORIG_CTXT));
2843         if (rc)
2844                 RETURN(rc);
2845
2846         rc = llog_cleanup(llog_get_context(obd, LLOG_SIZE_REPL_CTXT));
2847         RETURN(rc);
2848 }
2849
2850
/* Thin wrapper: an OSC connects like any other import-based client. */
static int osc_connect(struct lustre_handle *exph,
                       struct obd_device *obd, struct obd_uuid *cluuid)
{
        return client_connect_import(exph, obd, cluuid);
}
2860
2861 static int osc_disconnect(struct obd_export *exp, int flags)
2862 {
2863         struct obd_device *obd = class_exp2obd(exp);
2864         struct llog_ctxt *ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
2865         int rc;
2866
2867         if (obd->u.cli.cl_conn_count == 1)
2868                 /* flush any remaining cancel messages out to the target */
2869                 llog_sync(ctxt, exp);
2870
2871         rc = client_disconnect_export(exp, flags);
2872         return rc;
2873 }
2874
2875 static int osc_import_event(struct obd_device *obd,
2876                             struct obd_import *imp, 
2877                             enum obd_import_event event)
2878 {
2879         struct client_obd *cli;
2880         int rc = 0;
2881
2882         LASSERT(imp->imp_obd == obd);
2883
2884         switch (event) {
2885         case IMP_EVENT_DISCON: {
2886                 /* Only do this on the MDS OSC's */
2887                 if (imp->imp_server_timeout) {
2888                         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
2889                         
2890                         spin_lock(&oscc->oscc_lock);
2891                         oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
2892                         spin_unlock(&oscc->oscc_lock);
2893                 }
2894                 break;
2895         }
2896         case IMP_EVENT_INACTIVE: {
2897                 if (obd->obd_observer)
2898                         rc = obd_notify(obd->obd_observer, obd, 0);
2899                 break;
2900         }
2901         case IMP_EVENT_INVALIDATE: {
2902                 struct ldlm_namespace *ns = obd->obd_namespace;
2903
2904                 /* Reset grants */
2905                 cli = &obd->u.cli;
2906                 spin_lock(&cli->cl_loi_list_lock);
2907                 cli->cl_avail_grant = 0;
2908                 cli->cl_lost_grant = 0;
2909                 /* all pages go to failing rpcs due to the invalid import */
2910                 osc_check_rpcs(cli);
2911                 spin_unlock(&cli->cl_loi_list_lock);
2912                 
2913                 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
2914
2915                 break;
2916         }
2917         case IMP_EVENT_ACTIVE: {
2918                 if (obd->obd_observer)
2919                         rc = obd_notify(obd->obd_observer, obd, 1);
2920                 break;
2921         }
2922         default:
2923                 CERROR("Unknown import event %d\n", event);
2924                 LBUG();
2925         }
2926         RETURN(rc);
2927 }
2928
2929 int osc_setup(struct obd_device *obd, obd_count len, void *buf)
2930 {
2931         int rc;
2932
2933         rc = ptlrpcd_addref();
2934         if (rc)
2935                 return rc;
2936
2937         rc = client_obd_setup(obd, len, buf);
2938         if (rc) {
2939                 ptlrpcd_decref();
2940         } else {
2941                 struct lprocfs_static_vars lvars;
2942
2943                 lprocfs_init_vars(osc, &lvars);
2944                 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
2945                         lproc_osc_attach_seqstat(obd);
2946                         ptlrpc_lprocfs_register_obd(obd);
2947                 }
2948
2949                 oscc_init(obd);
2950         }
2951
2952         RETURN(rc);
2953 }
2954
2955 int osc_cleanup(struct obd_device *obd, int flags)
2956 {
2957         int rc;
2958
2959         ptlrpc_lprocfs_unregister_obd(obd);
2960         lprocfs_obd_cleanup(obd);
2961
2962         rc = client_obd_cleanup(obd, flags);
2963         ptlrpcd_decref();
2964         RETURN(rc);
2965 }
2966
2967
/* Method table for the regular OSC device type (LUSTRE_OSC_NAME). */
struct obd_ops osc_obd_ops = {
        .o_owner                = THIS_MODULE,
        .o_setup                = osc_setup,
        .o_cleanup              = osc_cleanup,
        .o_connect              = osc_connect,
        .o_disconnect           = osc_disconnect,
        .o_statfs               = osc_statfs,
        .o_packmd               = osc_packmd,
        .o_unpackmd             = osc_unpackmd,
        .o_create               = osc_create,
        .o_destroy              = osc_destroy,
        .o_getattr              = osc_getattr,
        .o_getattr_async        = osc_getattr_async,
        .o_setattr              = osc_setattr,
        .o_brw                  = osc_brw,
        .o_brw_async            = osc_brw_async,
        .o_prep_async_page      = osc_prep_async_page,
        .o_queue_async_io       = osc_queue_async_io,
        .o_set_async_flags      = osc_set_async_flags,
        .o_queue_group_io       = osc_queue_group_io,
        .o_trigger_group_io     = osc_trigger_group_io,
        .o_teardown_async_page  = osc_teardown_async_page,
        .o_punch                = osc_punch,
        .o_sync                 = osc_sync,
        .o_enqueue              = osc_enqueue,
        .o_match                = osc_match,
        .o_change_cbdata        = osc_change_cbdata,
        .o_cancel               = osc_cancel,
        .o_cancel_unused        = osc_cancel_unused,
        .o_iocontrol            = osc_iocontrol,
        .o_get_info             = osc_get_info,
        .o_set_info             = osc_set_info,
        .o_import_event         = osc_import_event,
        .o_llog_init            = osc_llog_init,
        .o_llog_finish          = osc_llog_finish,
};
3004
#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
/* Method table for the SAN-attached OSC variant (2.4 kernels only).
 * Shares most methods with osc_obd_ops but uses sanosc_brw for bulk
 * I/O, generic client setup/cleanup/disconnect, osc_real_create for
 * creation, and omits the async-page and get/set-info methods. */
struct obd_ops sanosc_obd_ops = {
        .o_owner                = THIS_MODULE,
        .o_cleanup              = client_obd_cleanup,
        .o_connect              = osc_connect,
        .o_disconnect           = client_disconnect_export,
        .o_statfs               = osc_statfs,
        .o_packmd               = osc_packmd,
        .o_unpackmd             = osc_unpackmd,
        .o_create               = osc_real_create,
        .o_destroy              = osc_destroy,
        .o_getattr              = osc_getattr,
        .o_getattr_async        = osc_getattr_async,
        .o_setattr              = osc_setattr,
        .o_setup                = client_sanobd_setup,
        .o_brw                  = sanosc_brw,
        .o_punch                = osc_punch,
        .o_sync                 = osc_sync,
        .o_enqueue              = osc_enqueue,
        .o_match                = osc_match,
        .o_change_cbdata        = osc_change_cbdata,
        .o_cancel               = osc_cancel,
        .o_cancel_unused        = osc_cancel_unused,
        .o_iocontrol            = osc_iocontrol,
        .o_import_event         = osc_import_event,
        .o_llog_init            = osc_llog_init,
        .o_llog_finish          = osc_llog_finish,
};
#endif
3034
/* Module init: register the OSC obd type (and, on 2.4 kernels, the
 * SANOSC type) together with their lprocfs variables. */
int __init osc_init(void)
{
        struct lprocfs_static_vars lvars;
#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
        struct lprocfs_static_vars sanlvars;
#endif
        int rc;
        ENTRY;

        lprocfs_init_vars(osc, &lvars);
#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
        lprocfs_init_vars(osc, &sanlvars);
#endif

        rc = class_register_type(&osc_obd_ops, lvars.module_vars,
                                 LUSTRE_OSC_NAME);
        if (rc)
                RETURN(rc);

#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
        /* Roll back the OSC registration if SANOSC registration fails. */
        rc = class_register_type(&sanosc_obd_ops, sanlvars.module_vars,
                                 LUSTRE_SANOSC_NAME);
        if (rc)
                class_unregister_type(LUSTRE_OSC_NAME);
#endif

        RETURN(rc);
}
3063
#ifdef __KERNEL__
/* Module exit: unregister every obd type osc_init() registered. */
static void /*__exit*/ osc_exit(void)
{
#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
        class_unregister_type(LUSTRE_SANOSC_NAME);
#endif
        class_unregister_type(LUSTRE_OSC_NAME);
}

MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
MODULE_LICENSE("GPL");

module_init(osc_init);
module_exit(osc_exit);
#endif