Whamcloud - gitweb
file ext3-largefile.diff was initially added on branch b_devel.
[fs/lustre-release.git] / lustre / obdecho / echo.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2001-2003 Cluster File Systems, Inc.
5  *   Author: Peter Braam <braam@clusterfs.com>
6  *   Author: Andreas Dilger <adilger@clusterfs.com>
7  *
8  *   This file is part of Lustre, http://www.lustre.org.
9  *
10  *   Lustre is free software; you can redistribute it and/or
11  *   modify it under the terms of version 2 of the GNU General Public
12  *   License as published by the Free Software Foundation.
13  *
14  *   Lustre is distributed in the hope that it will be useful,
15  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
16  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  *   GNU General Public License for more details.
18  *
19  *   You should have received a copy of the GNU General Public License
20  *   along with Lustre; if not, write to the Free Software
21  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22  */
23
24 #define EXPORT_SYMTAB
25
26 #include <linux/version.h>
27 #include <linux/module.h>
28 #include <linux/mm.h>
29 #include <linux/highmem.h>
30 #include <linux/fs.h>
31 #include <linux/stat.h>
32 #include <linux/sched.h>
33 #include <linux/smp_lock.h>
34 #include <linux/ext2_fs.h>
35 #include <linux/quotaops.h>
36 #include <linux/proc_fs.h>
37 #include <linux/init.h>
38 #include <asm/unistd.h>
39
40 #define DEBUG_SUBSYSTEM S_ECHO
41
42 #include <linux/obd_support.h>
43 #include <linux/obd_class.h>
44 #include <linux/obd_echo.h>
45 #include <linux/lustre_debug.h>
46 #include <linux/lustre_dlm.h>
47 #include <linux/lprocfs_status.h>
48
49 #define ECHO_INIT_OBJID      0x1000000000000000ULL
50 #define ECHO_HANDLE_MAGIC    0xabcd0123fedc9876ULL
51
52 #define ECHO_OBJECT0_NPAGES  16
53 static struct page *echo_object0_pages[ECHO_OBJECT0_NPAGES];
54
55 /* should be generic per-obd stats... */
56 struct xprocfs_io_stat {
57         __u64    st_read_bytes;
58         __u64    st_read_reqs;
59         __u64    st_write_bytes;
60         __u64    st_write_reqs;
61         __u64    st_getattr_reqs;
62         __u64    st_setattr_reqs;
63         __u64    st_create_reqs;
64         __u64    st_destroy_reqs;
65         __u64    st_statfs_reqs;
66         __u64    st_open_reqs;
67         __u64    st_close_reqs;
68         __u64    st_punch_reqs;
69 };
70
71 static struct xprocfs_io_stat xprocfs_iostats[NR_CPUS];
72 static struct proc_dir_entry *xprocfs_dir;
73
74 #define XPROCFS_BUMP_MYCPU_IOSTAT(field, count)                 \
75 do {                                                            \
76         xprocfs_iostats[smp_processor_id()].field += (count);   \
77 } while (0)
78
79 #define DECLARE_XPROCFS_SUM_STAT(field)                 \
80 static long long                                        \
81 xprocfs_sum_##field (void)                              \
82 {                                                       \
83         long long stat = 0;                             \
84         int       i;                                    \
85                                                         \
86         for (i = 0; i < smp_num_cpus; i++)              \
87                 stat += xprocfs_iostats[i].field;       \
88         return (stat);                                  \
89 }
90
91 DECLARE_XPROCFS_SUM_STAT (st_read_bytes)
92 DECLARE_XPROCFS_SUM_STAT (st_read_reqs)
93 DECLARE_XPROCFS_SUM_STAT (st_write_bytes)
94 DECLARE_XPROCFS_SUM_STAT (st_write_reqs)
95 DECLARE_XPROCFS_SUM_STAT (st_getattr_reqs)
96 DECLARE_XPROCFS_SUM_STAT (st_setattr_reqs)
97 DECLARE_XPROCFS_SUM_STAT (st_create_reqs)
98 DECLARE_XPROCFS_SUM_STAT (st_destroy_reqs)
99 DECLARE_XPROCFS_SUM_STAT (st_statfs_reqs)
100 DECLARE_XPROCFS_SUM_STAT (st_open_reqs)
101 DECLARE_XPROCFS_SUM_STAT (st_close_reqs)
102 DECLARE_XPROCFS_SUM_STAT (st_punch_reqs)
103
104 static int
105 xprocfs_rd_stat (char *page, char **start, off_t off, int count,
106                  int  *eof, void *data)
107 {
108         long long (*fn)(void) = (long long(*)(void))data;
109         int         len;
110         
111         *eof = 1;
112         if (off != 0)
113                 return (0);
114
115         len = snprintf (page, count, "%Ld\n", fn());
116         *start = page;
117         return (len);
118 }
119         
120
121 static void
122 xprocfs_add_stat(char *name, long long (*fn)(void))
123 {
124         struct proc_dir_entry *entry;
125
126         entry = create_proc_entry (name, S_IFREG|S_IRUGO, xprocfs_dir);
127         if (entry == NULL) {
128                 CERROR ("Can't add procfs stat %s\n", name);
129                 return;
130         }
131
132         entry->data = fn;
133         entry->read_proc = xprocfs_rd_stat;
134         entry->write_proc = NULL;
135 }
136
137 static void
138 xprocfs_init (char *name)
139 {
140         char  dirname[64];
141         
142         snprintf (dirname, sizeof (dirname), "sys/%s", name);
143
144         xprocfs_dir = proc_mkdir (dirname, NULL);
145         if (xprocfs_dir == NULL) {
146                 CERROR ("Can't make dir\n");
147                 return;
148         }
149
150         xprocfs_add_stat ("read_bytes",   xprocfs_sum_st_read_bytes);
151         xprocfs_add_stat ("read_reqs",    xprocfs_sum_st_read_reqs);
152         xprocfs_add_stat ("write_bytes",  xprocfs_sum_st_write_bytes);
153         xprocfs_add_stat ("write_reqs",   xprocfs_sum_st_write_reqs);
154         xprocfs_add_stat ("getattr_reqs", xprocfs_sum_st_getattr_reqs);
155         xprocfs_add_stat ("setattr_reqs", xprocfs_sum_st_setattr_reqs);
156         xprocfs_add_stat ("create_reqs",  xprocfs_sum_st_create_reqs);
157         xprocfs_add_stat ("destroy_reqs", xprocfs_sum_st_destroy_reqs);
158         xprocfs_add_stat ("statfs_reqs",  xprocfs_sum_st_statfs_reqs);
159         xprocfs_add_stat ("open_reqs",    xprocfs_sum_st_open_reqs);
160         xprocfs_add_stat ("close_reqs",   xprocfs_sum_st_close_reqs);
161         xprocfs_add_stat ("punch_reqs",   xprocfs_sum_st_punch_reqs);
162 }
163
164 void xprocfs_fini (void)
165 {
166         if (xprocfs_dir == NULL)
167                 return;
168
169         remove_proc_entry ("read_bytes",   xprocfs_dir);
170         remove_proc_entry ("read_reqs",    xprocfs_dir);
171         remove_proc_entry ("write_bytes",  xprocfs_dir);
172         remove_proc_entry ("write_reqs",   xprocfs_dir);
173         remove_proc_entry ("getattr_reqs", xprocfs_dir);
174         remove_proc_entry ("setattr_reqs", xprocfs_dir);
175         remove_proc_entry ("create_reqs",  xprocfs_dir);
176         remove_proc_entry ("destroy_reqs", xprocfs_dir);
177         remove_proc_entry ("statfs_reqs",  xprocfs_dir);
178         remove_proc_entry ("open_reqs",    xprocfs_dir);
179         remove_proc_entry ("close_reqs",   xprocfs_dir);
180         remove_proc_entry ("punch_reqs",   xprocfs_dir);
181
182         remove_proc_entry (xprocfs_dir->name, xprocfs_dir->parent);
183         xprocfs_dir = NULL;
184 }
185
186 static int echo_connect(struct lustre_handle *conn, struct obd_device *obd,
187                         struct obd_uuid *cluuid, struct recovd_obd *recovd,
188                         ptlrpc_recovery_cb_t recover)
189 {
190         return class_connect(conn, obd, cluuid);
191 }
192
193 static int echo_disconnect(struct lustre_handle *conn)
194 {
195         struct obd_export *exp = class_conn2export(conn);
196         
197         LASSERT (exp != NULL);
198         
199         ldlm_cancel_locks_for_export (exp);
200         return (class_disconnect (conn));
201 }
202
203 static __u64 echo_next_id(struct obd_device *obddev)
204 {
205         obd_id id;
206
207         spin_lock(&obddev->u.echo.eo_lock);
208         id = ++obddev->u.echo.eo_lastino;
209         spin_unlock(&obddev->u.echo.eo_lock);
210
211         return id;
212 }
213
214 int echo_create(struct lustre_handle *conn, struct obdo *oa,
215                 struct lov_stripe_md **ea, struct obd_trans_info *oti)
216 {
217         struct obd_device *obd = class_conn2obd(conn);
218
219         XPROCFS_BUMP_MYCPU_IOSTAT (st_create_reqs, 1);
220
221         if (!obd) {
222                 CERROR("invalid client "LPX64"\n", conn->addr);
223                 return -EINVAL;
224         }
225
226         if (!(oa->o_mode && S_IFMT)) {
227                 CERROR("echo obd: no type!\n");
228                 return -ENOENT;
229         }
230
231         if (!(oa->o_valid & OBD_MD_FLTYPE)) {
232                 CERROR("invalid o_valid %08x\n", oa->o_valid);
233                 return -EINVAL;
234         }
235
236         oa->o_id = echo_next_id(obd);
237         oa->o_valid = OBD_MD_FLID;
238         atomic_inc(&obd->u.echo.eo_create);
239
240         return 0;
241 }
242
243 int echo_destroy(struct lustre_handle *conn, struct obdo *oa,
244                  struct lov_stripe_md *ea, struct obd_trans_info *oti)
245 {
246         struct obd_device *obd = class_conn2obd(conn);
247
248         XPROCFS_BUMP_MYCPU_IOSTAT (st_destroy_reqs, 1);
249
250         if (!obd) {
251                 CERROR("invalid client "LPX64"\n", conn->addr);
252                 RETURN(-EINVAL);
253         }
254
255         if (!(oa->o_valid & OBD_MD_FLID)) {
256                 CERROR("obdo missing FLID valid flag: %08x\n", oa->o_valid);
257                 RETURN(-EINVAL);
258         }
259
260         if (oa->o_id > obd->u.echo.eo_lastino || oa->o_id < ECHO_INIT_OBJID) {
261                 CERROR("bad destroy objid: "LPX64"\n", oa->o_id);
262                 RETURN(-EINVAL);
263         }
264
265         atomic_inc(&obd->u.echo.eo_destroy);
266
267         return 0;
268 }
269
270 static int echo_open(struct lustre_handle *conn, struct obdo *oa,
271                      struct lov_stripe_md *md, struct obd_trans_info *oti)
272 {
273         struct lustre_handle *fh = obdo_handle (oa);
274         struct obd_device    *obd = class_conn2obd (conn);
275
276         XPROCFS_BUMP_MYCPU_IOSTAT (st_open_reqs, 1);
277
278         if (!obd) {
279                 CERROR ("invalid client "LPX64"\n", conn->addr);
280                 return (-EINVAL);
281         }
282
283         if (!(oa->o_valid & OBD_MD_FLID)) {
284                 CERROR ("obdo missing FLID valid flag: %08x\n", oa->o_valid);
285                 return (-EINVAL);
286         }
287
288         fh->addr = oa->o_id;
289         fh->cookie = ECHO_HANDLE_MAGIC;
290         
291         oa->o_valid |= OBD_MD_FLHANDLE;
292         return 0;
293 }
294
295 static int echo_close(struct lustre_handle *conn, struct obdo *oa,
296                       struct lov_stripe_md *md, struct obd_trans_info *oti)
297 {
298         struct lustre_handle *fh = obdo_handle (oa);
299         struct obd_device    *obd = class_conn2obd(conn);
300
301         XPROCFS_BUMP_MYCPU_IOSTAT (st_close_reqs, 1);
302
303         if (!obd) {
304                 CERROR("invalid client "LPX64"\n", conn->addr);
305                 return (-EINVAL);
306         }
307
308         if (!(oa->o_valid & OBD_MD_FLHANDLE)) {
309                 CERROR("obdo missing FLHANDLE valid flag: %08x\n", oa->o_valid);
310                 return (-EINVAL);
311         }
312
313         if (fh->cookie != ECHO_HANDLE_MAGIC) {
314                 CERROR ("invalid file handle on close: "LPX64"\n", fh->cookie);
315                 return (-EINVAL);
316         }
317         
318         return 0;
319 }
320
321 static int echo_getattr(struct lustre_handle *conn, struct obdo *oa,
322                         struct lov_stripe_md *md)
323 {
324         struct obd_device *obd = class_conn2obd(conn);
325         obd_id id = oa->o_id;
326
327         XPROCFS_BUMP_MYCPU_IOSTAT (st_getattr_reqs, 1);
328         
329         if (!obd) {
330                 CERROR("invalid client "LPX64"\n", conn->addr);
331                 RETURN(-EINVAL);
332         }
333
334         if (!(oa->o_valid & OBD_MD_FLID)) {
335                 CERROR("obdo missing FLID valid flag: %08x\n", oa->o_valid);
336                 RETURN(-EINVAL);
337         }
338
339         obdo_cpy_md(oa, &obd->u.echo.oa, oa->o_valid);
340         oa->o_id = id;
341
342         return 0;
343 }
344
345 static int echo_setattr(struct lustre_handle *conn, struct obdo *oa,
346                         struct lov_stripe_md *md, struct obd_trans_info *oti)
347 {
348         struct obd_device *obd = class_conn2obd(conn);
349
350         XPROCFS_BUMP_MYCPU_IOSTAT (st_setattr_reqs, 1);
351         
352         if (!obd) {
353                 CERROR("invalid client "LPX64"\n", conn->addr);
354                 RETURN(-EINVAL);
355         }
356
357         if (!(oa->o_valid & OBD_MD_FLID)) {
358                 CERROR("obdo missing FLID valid flag: %08x\n", oa->o_valid);
359                 RETURN(-EINVAL);
360         }
361
362         memcpy(&obd->u.echo.oa, oa, sizeof(*oa));
363
364         atomic_inc(&obd->u.echo.eo_setattr);
365
366         return 0;
367 }
368
369 /* This allows us to verify that desc_private is passed unmolested */
370 #define DESC_PRIV 0x10293847
371
372 int echo_preprw(int cmd, struct lustre_handle *conn, int objcount,
373                 struct obd_ioobj *obj, int niocount, struct niobuf_remote *nb,
374                 struct niobuf_local *res, void **desc_private, struct obd_trans_info *oti)
375 {
376         struct obd_device *obd;
377         struct niobuf_local *r = res;
378         int rc = 0;
379         int i;
380         ENTRY;
381
382         if ((cmd & OBD_BRW_WRITE) != 0)
383                 XPROCFS_BUMP_MYCPU_IOSTAT (st_write_reqs, 1);
384         else
385                 XPROCFS_BUMP_MYCPU_IOSTAT (st_read_reqs, 1);
386
387         obd = class_conn2obd(conn);
388         if (!obd) {
389                 CERROR("invalid client "LPX64"\n", conn->addr);
390                 RETURN(-EINVAL);
391         }
392
393         memset(res, 0, sizeof(*res) * niocount);
394
395         CDEBUG(D_PAGE, "%s %d obdos with %d IOs\n",
396                cmd == OBD_BRW_READ ? "reading" : "writing", objcount, niocount);
397
398         *desc_private = (void *)DESC_PRIV;
399
400         obd_kmap_get(niocount, 1);
401
402         for (i = 0; i < objcount; i++, obj++) {
403                 int gfp_mask = (obj->ioo_id & 1) ? GFP_HIGHUSER : GFP_KERNEL;
404                 int isobj0 = obj->ioo_id == 0;
405                 int verify = !isobj0;
406                 int j;
407
408                 for (j = 0 ; j < obj->ioo_bufcnt ; j++, nb++, r++) {
409
410                         if (isobj0 &&
411                             (nb->offset >> PAGE_SHIFT) < ECHO_OBJECT0_NPAGES) {
412                                 r->page = echo_object0_pages[nb->offset >> PAGE_SHIFT];
413                                 /* Take extra ref so __free_pages() can be called OK */
414                                 get_page (r->page);
415                         } else {
416                                 r->page = alloc_pages(gfp_mask, 0);
417                                 if (r->page == NULL) {
418                                         CERROR("can't get page %d/%d for id "LPU64"\n",
419                                                j, obj->ioo_bufcnt, obj->ioo_id);
420                                         GOTO(preprw_cleanup, rc = -ENOMEM);
421                                 }
422                         }
423
424                         atomic_inc(&obd->u.echo.eo_prep);
425
426                         r->offset = nb->offset;
427                         r->addr = kmap(r->page);
428                         r->len = nb->len;
429
430                         CDEBUG(D_PAGE, "$$$$ get page %p, addr %p@"LPU64"\n",
431                                r->page, r->addr, r->offset);
432
433                         if (cmd == OBD_BRW_READ) {
434                                 XPROCFS_BUMP_MYCPU_IOSTAT (st_read_bytes, r->len);
435                                 if (verify)
436                                         page_debug_setup(r->addr, r->len, r->offset,
437                                                          obj->ioo_id);
438                         } else {
439                                 XPROCFS_BUMP_MYCPU_IOSTAT (st_write_bytes, r->len);
440                                 if (verify)
441                                         page_debug_setup(r->addr, r->len,
442                                                          0xecc0ecc0ecc0ecc0,
443                                                          0xecc0ecc0ecc0ecc0);
444                         }
445                 }
446         }
447         CDEBUG(D_PAGE, "%d pages allocated after prep\n",
448                atomic_read(&obd->u.echo.eo_prep));
449
450         RETURN(0);
451
452 preprw_cleanup:
453         /* It is possible that we would rather handle errors by  allow
454          * any already-set-up pages to complete, rather than tearing them
455          * all down again.  I believe that this is what the in-kernel
456          * prep/commit operations do.
457          */
458         CERROR("cleaning up %ld pages (%d obdos)\n", (long)(r - res), objcount);
459         while (r-- > res) {
460                 kunmap(r->page);
461                 /* NB if this is an 'object0' page, __free_pages will just
462                  * lose the extra ref gained above */
463                 __free_pages(r->page, 0);
464                 atomic_dec(&obd->u.echo.eo_prep);
465         }
466         obd_kmap_put(niocount);
467         memset(res, 0, sizeof(*res) * niocount);
468
469         return rc;
470 }
471
472 int echo_commitrw(int cmd, struct lustre_handle *conn, int objcount,
473                   struct obd_ioobj *obj, int niocount, struct niobuf_local *res,
474                   void *desc_private, struct obd_trans_info *oti)
475 {
476         struct obd_device *obd;
477         struct niobuf_local *r = res;
478         int rc = 0;
479         int vrc = 0;
480         int i;
481         ENTRY;
482
483         obd = class_conn2obd(conn);
484         if (!obd) {
485                 CERROR("invalid client "LPX64"\n", conn->addr);
486                 RETURN(-EINVAL);
487         }
488
489         if ((cmd & OBD_BRW_RWMASK) == OBD_BRW_READ) {
490                 CDEBUG(D_PAGE, "reading %d obdos with %d IOs\n",
491                        objcount, niocount);
492         } else {
493                 CDEBUG(D_PAGE, "writing %d obdos with %d IOs\n",
494                        objcount, niocount);
495         }
496
497         if (niocount && !r) {
498                 CERROR("NULL res niobuf with niocount %d\n", niocount);
499                 RETURN(-EINVAL);
500         }
501
502         LASSERT(desc_private == (void *)DESC_PRIV);
503
504         for (i = 0; i < objcount; i++, obj++) {
505                 int verify = obj->ioo_id != 0;
506                 int j;
507
508                 for (j = 0 ; j < obj->ioo_bufcnt ; j++, r++) {
509                         struct page *page = r->page;
510                         void *addr;
511
512                         if (!page || !(addr = page_address(page)) ||
513                             !kern_addr_valid(addr)) {
514
515                                 CERROR("bad page objid "LPU64":%p, buf %d/%d\n",
516                                        obj->ioo_id, page, j, obj->ioo_bufcnt);
517                                 GOTO(commitrw_cleanup, rc = -EFAULT);
518                         }
519
520                         CDEBUG(D_PAGE, "$$$$ use page %p, addr %p@"LPU64"\n",
521                                r->page, addr, r->offset);
522
523                         if (verify) {
524                                 vrc = page_debug_check("echo", addr, r->len,
525                                                        r->offset, obj->ioo_id);
526                                 /* check all the pages always */
527                                 if (vrc != 0 && rc == 0)
528                                         rc = vrc;
529                         }
530                         
531                         kunmap(page);
532                         /* NB see comment above regarding object0 pages */
533                         obd_kmap_put(1);
534                         __free_pages(page, 0);
535                         atomic_dec(&obd->u.echo.eo_prep);
536                 }
537         }
538         CDEBUG(D_PAGE, "%d pages remain after commit\n",
539                atomic_read(&obd->u.echo.eo_prep));
540         RETURN(rc);
541
542 commitrw_cleanup:
543         CERROR("cleaning up %ld pages (%d obdos)\n",
544                niocount - (long)(r - res) - 1, objcount);
545         while (++r < res + niocount) {
546                 struct page *page = r->page;
547
548                 kunmap(page);
549                 obd_kmap_put(1);
550                 /* NB see comment above regarding object0 pages */
551                 __free_pages(page, 0);
552                 atomic_dec(&obd->u.echo.eo_prep);
553         }
554         return rc;
555 }
556
557 static int echo_setup(struct obd_device *obddev, obd_count len, void *buf)
558 {
559         ENTRY;
560
561         spin_lock_init(&obddev->u.echo.eo_lock);
562         obddev->u.echo.eo_lastino = ECHO_INIT_OBJID;
563
564         obddev->obd_namespace =
565                 ldlm_namespace_new("echo-tgt", LDLM_NAMESPACE_SERVER);
566         if (obddev->obd_namespace == NULL) {
567                 LBUG();
568                 RETURN(-ENOMEM);
569         }
570
571         ptlrpc_init_client (LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
572                             "echo_ldlm_cb_client", &obddev->obd_ldlm_client);
573         RETURN(0);
574 }
575
576 static int echo_cleanup(struct obd_device *obddev)
577 {
578         ENTRY;
579
580         ldlm_namespace_free(obddev->obd_namespace);
581         CERROR("%d prep/commitrw pages leaked\n",
582                atomic_read(&obddev->u.echo.eo_prep));
583
584         RETURN(0);
585 }
586
587 int echo_attach(struct obd_device *dev, obd_count len, void *data)
588 {
589         struct lprocfs_static_vars lvars;
590
591         lprocfs_init_vars(&lvars);
592         return lprocfs_obd_attach(dev, lvars.obd_vars);
593 }
594
595 int echo_detach(struct obd_device *dev)
596 {
597         return lprocfs_obd_detach(dev);
598 }
599
600 static struct obd_ops echo_obd_ops = {
601         o_owner:       THIS_MODULE,
602         o_attach:      echo_attach,
603         o_detach:      echo_detach,
604         o_connect:     echo_connect,
605         o_disconnect:  echo_disconnect,
606         o_create:      echo_create,
607         o_destroy:     echo_destroy,
608         o_open:        echo_open,
609         o_close:       echo_close,
610         o_getattr:     echo_getattr,
611         o_setattr:     echo_setattr,
612         o_preprw:      echo_preprw,
613         o_commitrw:    echo_commitrw,
614         o_setup:       echo_setup,
615         o_cleanup:     echo_cleanup
616 };
617
618 extern int echo_client_init(void);
619 extern void echo_client_cleanup(void);
620
621 static void
622 echo_object0_pages_fini (void) 
623 {
624         int     i;
625         
626         for (i = 0; i < ECHO_OBJECT0_NPAGES; i++) 
627                 if (echo_object0_pages[i] != NULL) {
628                         __free_pages (echo_object0_pages[i], 0);
629                         echo_object0_pages[i] = NULL;
630                 }
631 }
632
633 static int
634 echo_object0_pages_init (void)
635 {
636         struct page *pg;
637         int          i;
638         
639         for (i = 0; i < ECHO_OBJECT0_NPAGES; i++) {
640                 int gfp_mask = (i < ECHO_OBJECT0_NPAGES/2) ? GFP_KERNEL : GFP_HIGHUSER;
641                 
642                 pg = alloc_pages (gfp_mask, 0);
643                 if (pg == NULL) {
644                         echo_object0_pages_fini ();
645                         return (-ENOMEM);
646                 }
647                 
648                 memset (kmap (pg), 0, PAGE_SIZE);
649                 kunmap (pg);
650
651                 echo_object0_pages[i] = pg;
652         }
653         
654         return (0);
655 }
656
657 static int __init obdecho_init(void)
658 {
659         struct lprocfs_static_vars lvars;
660         int rc;
661
662         printk(KERN_INFO "Lustre Echo OBD driver; info@clusterfs.com\n");
663
664         lprocfs_init_vars(&lvars);
665
666         xprocfs_init ("echo");
667
668         rc = echo_object0_pages_init ();
669         if (rc != 0)
670                 goto failed_0;
671         
672         rc = class_register_type(&echo_obd_ops, lvars.module_vars,
673                                  OBD_ECHO_DEVICENAME);
674         if (rc != 0)
675                 goto failed_1;
676
677         rc = echo_client_init();
678         if (rc == 0)
679                 RETURN (0);
680
681         class_unregister_type(OBD_ECHO_DEVICENAME);
682  failed_1:
683         echo_object0_pages_fini ();
684  failed_0:
685         xprocfs_fini ();
686         
687         RETURN(rc);
688 }
689
690 static void __exit obdecho_exit(void)
691 {
692         echo_client_cleanup();
693         class_unregister_type(OBD_ECHO_DEVICENAME);
694         echo_object0_pages_fini ();
695         xprocfs_fini ();
696 }
697
698 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
699 MODULE_DESCRIPTION("Lustre Testing Echo OBD driver");
700 MODULE_LICENSE("GPL");
701
702 module_init(obdecho_init);
703 module_exit(obdecho_exit);