Whamcloud - gitweb
b=191
[fs/lustre-release.git] / lustre / obdecho / echo.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2001, 2002 Cluster File Systems, Inc.
5  *   Author: Peter Braam <braam@clusterfs.com>
6  *   Author: Andreas Dilger <adilger@clusterfs.com>
7  *
8  *   This file is part of Lustre, http://www.lustre.org.
9  *
10  *   Lustre is free software; you can redistribute it and/or
11  *   modify it under the terms of version 2 of the GNU General Public
12  *   License as published by the Free Software Foundation.
13  *
14  *   Lustre is distributed in the hope that it will be useful,
15  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
16  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  *   GNU General Public License for more details.
18  *
19  *   You should have received a copy of the GNU General Public License
20  *   along with Lustre; if not, write to the Free Software
21  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22  */
23
24 #define OBDECHO_VERSION "1.0"
25
26 #define EXPORT_SYMTAB
27
28 #include <linux/version.h>
29 #include <linux/module.h>
30 #include <linux/mm.h>
31 #include <linux/highmem.h>
32 #include <linux/fs.h>
33 #include <linux/stat.h>
34 #include <linux/sched.h>
35 #include <linux/smp_lock.h>
36 #include <linux/ext2_fs.h>
37 #include <linux/quotaops.h>
38 #include <linux/proc_fs.h>
39 #include <linux/init.h>
40 #include <asm/unistd.h>
41
42 #define DEBUG_SUBSYSTEM S_ECHO
43
44 #include <linux/obd_support.h>
45 #include <linux/obd_class.h>
46 #include <linux/obd_echo.h>
47 #include <linux/lustre_debug.h>
48 #include <linux/lustre_dlm.h>
49 #include <linux/lprocfs_status.h>
50
51 static atomic_t echo_page_rws;
52 static atomic_t echo_getattrs;
53
54 #define ECHO_PROC_STAT "sys/obdecho"
55 #define ECHO_INIT_OBJID 0x1000000000000000ULL
56
57 extern struct lprocfs_vars status_var_nm_1[];
58 extern struct lprocfs_vars status_class_var[];
59
60 int echo_proc_read(char *page, char **start, off_t off, int count, int *eof,
61                    void *data)
62 {
63         long long attrs = atomic_read(&echo_getattrs);
64         long long pages = atomic_read(&echo_page_rws);
65         int len;
66
67         *eof = 1;
68         if (off != 0)
69                 return (0);
70
71         len = sprintf(page, "%Ld %Ld\n", attrs, pages);
72
73         *start = page;
74         return (len);
75 }
76
77 int echo_proc_write(struct file *file, const char *ubuffer,
78                     unsigned long count, void *data)
79 {
80         /* Ignore what we've been asked to write, and just zero the counters */
81         atomic_set (&echo_page_rws, 0);
82         atomic_set (&echo_getattrs, 0);
83
84         return (count);
85 }
86
87 void echo_proc_init(void)
88 {
89         struct proc_dir_entry *entry;
90
91         entry = create_proc_entry(ECHO_PROC_STAT, S_IFREG|S_IRUGO|S_IWUSR,NULL);
92
93         if (entry == NULL) {
94                 CERROR("couldn't create proc entry %s\n", ECHO_PROC_STAT);
95                 return;
96         }
97
98         entry->data = NULL;
99         entry->read_proc = echo_proc_read;
100         entry->write_proc = echo_proc_write;
101 }
102
103 void echo_proc_fini(void)
104 {
105         remove_proc_entry(ECHO_PROC_STAT, 0);
106 }
107
108 static int echo_connect(struct lustre_handle *conn, struct obd_device *obd,
109                         obd_uuid_t cluuid, struct recovd_obd *recovd,
110                         ptlrpc_recovery_cb_t recover)
111 {
112         int rc;
113
114         MOD_INC_USE_COUNT;
115         rc = class_connect(conn, obd, cluuid);
116
117         if (rc)
118                 MOD_DEC_USE_COUNT;
119
120         return rc;
121 }
122
123 static int echo_disconnect(struct lustre_handle *conn)
124 {
125         int rc;
126
127         rc = class_disconnect(conn);
128         if (!rc)
129                 MOD_DEC_USE_COUNT;
130
131         return rc;
132 }
133
134 static __u64 echo_next_id(struct obd_device *obddev)
135 {
136         obd_id id;
137
138         spin_lock(&obddev->u.echo.eo_lock);
139         id = ++obddev->u.echo.eo_lastino;
140         spin_unlock(&obddev->u.echo.eo_lock);
141
142         return id;
143 }
144
145 int echo_create(struct lustre_handle *conn, struct obdo *oa,
146                 struct lov_stripe_md **ea)
147 {
148         struct obd_device *obd = class_conn2obd(conn);
149
150         if (!obd) {
151                 CERROR("invalid client %Lx\n", conn->addr);
152                 return -EINVAL;
153         }
154
155         if (!(oa->o_mode && S_IFMT)) {
156                 CERROR("filter obd: no type!\n");
157                 return -ENOENT;
158         }
159
160         if (!(oa->o_valid & OBD_MD_FLTYPE)) {
161                 CERROR("invalid o_valid %08x\n", oa->o_valid);
162                 return -EINVAL;
163         }
164
165         oa->o_id = echo_next_id(obd);
166         oa->o_valid = OBD_MD_FLID;
167         atomic_inc(&obd->u.echo.eo_create);
168
169         return 0;
170 }
171
172 int echo_destroy(struct lustre_handle *conn, struct obdo *oa,
173                  struct lov_stripe_md *ea)
174 {
175         struct obd_device *obd = class_conn2obd(conn);
176
177         if (!obd) {
178                 CERROR("invalid client "LPX64"\n", conn->addr);
179                 RETURN(-EINVAL);
180         }
181
182         if (!(oa->o_valid & OBD_MD_FLID)) {
183                 CERROR("obdo missing FLID valid flag: %08x\n", oa->o_valid);
184                 RETURN(-EINVAL);
185         }
186
187         if (oa->o_id > obd->u.echo.eo_lastino || oa->o_id < ECHO_INIT_OBJID) {
188                 CERROR("bad destroy objid: "LPX64"\n", oa->o_id);
189                 RETURN(-EINVAL);
190         }
191
192         atomic_inc(&obd->u.echo.eo_destroy);
193
194         return 0;
195 }
196
197 static int echo_open(struct lustre_handle *conn, struct obdo *oa,
198                      struct lov_stripe_md *md)
199 {
200         return 0;
201 }
202
203 static int echo_close(struct lustre_handle *conn, struct obdo *oa,
204                       struct lov_stripe_md *md)
205 {
206         return 0;
207 }
208
209 static int echo_getattr(struct lustre_handle *conn, struct obdo *oa,
210                         struct lov_stripe_md *md)
211 {
212         struct obd_device *obd = class_conn2obd(conn);
213         obd_id id = oa->o_id;
214
215         if (!obd) {
216                 CERROR("invalid client "LPX64"\n", conn->addr);
217                 RETURN(-EINVAL);
218         }
219
220         if (!(oa->o_valid & OBD_MD_FLID)) {
221                 CERROR("obdo missing FLID valid flag: %08x\n", oa->o_valid);
222                 RETURN(-EINVAL);
223         }
224
225         memcpy(oa, &obd->u.echo.oa, sizeof(*oa));
226         oa->o_id = id;
227         oa->o_valid |= OBD_MD_FLID;
228
229         atomic_inc(&echo_getattrs);
230
231         return 0;
232 }
233
234 static int echo_setattr(struct lustre_handle *conn, struct obdo *oa,
235                         struct lov_stripe_md *md)
236 {
237         struct obd_device *obd = class_conn2obd(conn);
238
239         if (!obd) {
240                 CERROR("invalid client "LPX64"\n", conn->addr);
241                 RETURN(-EINVAL);
242         }
243
244         if (!(oa->o_valid & OBD_MD_FLID)) {
245                 CERROR("obdo missing FLID valid flag: %08x\n", oa->o_valid);
246                 RETURN(-EINVAL);
247         }
248
249         memcpy(&obd->u.echo.oa, oa, sizeof(*oa));
250
251         atomic_inc(&obd->u.echo.eo_setattr);
252
253         return 0;
254 }
255
256 /* This allows us to verify that desc_private is passed unmolested */
257 #define DESC_PRIV 0x10293847
258
259 int echo_preprw(int cmd, struct lustre_handle *conn, int objcount,
260                 struct obd_ioobj *obj, int niocount, struct niobuf_remote *nb,
261                 struct niobuf_local *res, void **desc_private)
262 {
263         struct obd_device *obd;
264         struct niobuf_local *r = res;
265         int rc = 0;
266         int i;
267
268         ENTRY;
269
270         obd = class_conn2obd(conn);
271         if (!obd) {
272                 CERROR("invalid client "LPX64"\n", conn->addr);
273                 RETURN(-EINVAL);
274         }
275
276         memset(res, 0, sizeof(*res) * niocount);
277
278         CDEBUG(D_PAGE, "%s %d obdos with %d IOs\n",
279                cmd == OBD_BRW_READ ? "reading" : "writing", objcount, niocount);
280
281         *desc_private = (void *)DESC_PRIV;
282
283         obd_kmap_get(niocount, 1);
284
285         for (i = 0; i < objcount; i++, obj++) {
286                 int gfp_mask = (obj->ioo_id & 1) ? GFP_HIGHUSER : GFP_KERNEL;
287                 int verify = obj->ioo_id != 0;
288                 int j;
289
290                 for (j = 0 ; j < obj->ioo_bufcnt ; j++, nb++, r++) {
291                         r->page = alloc_pages(gfp_mask, 0);
292                         if (!r->page) {
293                                 CERROR("can't get page %d/%d for id "LPU64"\n",
294                                        j, obj->ioo_bufcnt, obj->ioo_id);
295                                 GOTO(preprw_cleanup, rc = -ENOMEM);
296                         }
297                         atomic_inc(&obd->u.echo.eo_prep);
298
299                         r->offset = nb->offset;
300                         r->addr = kmap(r->page);
301                         r->len = nb->len;
302
303                         CDEBUG(D_PAGE, "$$$$ get page %p, addr %p@"LPU64"\n",
304                                r->page, r->addr, r->offset);
305
306                         if (verify && cmd == OBD_BRW_READ)
307                                 page_debug_setup(r->addr, r->len, r->offset,
308                                                  obj->ioo_id);
309                         else if (verify)
310                                 page_debug_setup(r->addr, r->len,
311                                                  0xecc0ecc0ecc0ecc0,
312                                                  0xecc0ecc0ecc0ecc0);
313                 }
314         }
315         CDEBUG(D_PAGE, "%d pages allocated after prep\n",
316                atomic_read(&obd->u.echo.eo_prep));
317
318         RETURN(0);
319
320 preprw_cleanup:
321         /* It is possible that we would rather handle errors by  allow
322          * any already-set-up pages to complete, rather than tearing them
323          * all down again.  I believe that this is what the in-kernel
324          * prep/commit operations do.
325          */
326         CERROR("cleaning up %ld pages (%d obdos)\n", (long)(r - res), objcount);
327         while (r-- > res) {
328                 kunmap(r->page);
329                 __free_pages(r->page, 0);
330                 atomic_dec(&obd->u.echo.eo_prep);
331         }
332         obd_kmap_put(niocount);
333         memset(res, 0, sizeof(*res) * niocount);
334
335         return rc;
336 }
337
338 int echo_commitrw(int cmd, struct lustre_handle *conn, int objcount,
339                   struct obd_ioobj *obj, int niocount, struct niobuf_local *res,
340                   void *desc_private)
341 {
342         struct obd_device *obd;
343         struct niobuf_local *r = res;
344         int rc = 0;
345         int i;
346         ENTRY;
347
348         obd = class_conn2obd(conn);
349         if (!obd) {
350                 CERROR("invalid client "LPX64"\n", conn->addr);
351                 RETURN(-EINVAL);
352         }
353
354         if ((cmd & OBD_BRW_RWMASK) == OBD_BRW_READ) {
355                 CDEBUG(D_PAGE, "reading %d obdos with %d IOs\n",
356                        objcount, niocount);
357         } else {
358                 CDEBUG(D_PAGE, "writing %d obdos with %d IOs\n",
359                        objcount, niocount);
360         }
361
362         if (niocount && !r) {
363                 CERROR("NULL res niobuf with niocount %d\n", niocount);
364                 RETURN(-EINVAL);
365         }
366
367         LASSERT(desc_private == (void *)DESC_PRIV);
368
369         for (i = 0; i < objcount; i++, obj++) {
370                 int verify = obj->ioo_id != 0;
371                 int j;
372
373                 for (j = 0 ; j < obj->ioo_bufcnt ; j++, r++) {
374                         struct page *page = r->page;
375                         void *addr;
376
377                         if (!page || !(addr = page_address(page)) ||
378                             !kern_addr_valid(addr)) {
379
380                                 CERROR("bad page objid "LPU64":%p, buf %d/%d\n",
381                                        obj->ioo_id, page, j, obj->ioo_bufcnt);
382                                 GOTO(commitrw_cleanup, rc = -EFAULT);
383                         }
384
385                         atomic_inc(&echo_page_rws);
386
387                         CDEBUG(D_PAGE, "$$$$ use page %p, addr %p@"LPU64"\n",
388                                r->page, addr, r->offset);
389
390                         if (verify)
391                                 page_debug_check("echo", addr, r->len,
392                                                  r->offset, obj->ioo_id);
393
394                         kunmap(page);
395                         obd_kmap_put(1);
396                         __free_pages(page, 0);
397                         atomic_dec(&obd->u.echo.eo_prep);
398                 }
399         }
400         CDEBUG(D_PAGE, "%d pages remain after commit\n",
401                atomic_read(&obd->u.echo.eo_prep));
402         RETURN(0);
403
404 commitrw_cleanup:
405         CERROR("cleaning up %ld pages (%d obdos)\n",
406                niocount - (long)(r - res) - 1, objcount);
407         while (++r < res + niocount) {
408                 struct page *page = r->page;
409
410                 kunmap(page);
411                 obd_kmap_put(1);
412                 __free_pages(page, 0);
413                 atomic_dec(&obd->u.echo.eo_prep);
414         }
415         return rc;
416 }
417
418 static int echo_setup(struct obd_device *obddev, obd_count len, void *buf)
419 {
420         ENTRY;
421
422         obddev->obd_namespace =
423                 ldlm_namespace_new("echo-tgt", LDLM_NAMESPACE_SERVER);
424         if (obddev->obd_namespace == NULL) {
425                 LBUG();
426                 RETURN(-ENOMEM);
427         }
428         spin_lock_init(&obddev->u.echo.eo_lock);
429         obddev->u.echo.eo_lastino = ECHO_INIT_OBJID;
430
431         RETURN(0);
432 }
433
434 static int echo_cleanup(struct obd_device *obddev)
435 {
436         ENTRY;
437
438         ldlm_namespace_free(obddev->obd_namespace);
439         CERROR("%d prep/commitrw pages leaked\n",
440                atomic_read(&obddev->u.echo.eo_prep));
441
442         RETURN(0);
443 }
444
445 int echo_attach(struct obd_device *dev, obd_count len, void *data)
446 {
447         return lprocfs_reg_obd(dev, status_var_nm_1, dev);
448 }
449
450 int echo_detach(struct obd_device *dev)
451 {
452         return lprocfs_dereg_obd(dev);
453 }
454
455 static struct obd_ops echo_obd_ops = {
456         o_attach:       echo_attach,
457         o_detach:       echo_detach,
458         o_connect:      echo_connect,
459         o_disconnect:   echo_disconnect,
460         o_create:       echo_create,
461         o_destroy:      echo_destroy,
462         o_open:         echo_open,
463         o_close:        echo_close,
464         o_getattr:      echo_getattr,
465         o_setattr:      echo_setattr,
466         o_preprw:       echo_preprw,
467         o_commitrw:     echo_commitrw,
468         o_setup:        echo_setup,
469         o_cleanup:      echo_cleanup
470 };
471
472 extern int echo_client_init(void);
473 extern void echo_client_cleanup(void);
474
475 static int __init obdecho_init(void)
476 {
477         int rc;
478
479         printk(KERN_INFO "Echo OBD driver " OBDECHO_VERSION
480                " info@clusterfs.com\n");
481
482         echo_proc_init();
483         rc = class_register_type(&echo_obd_ops, status_class_var,
484                                  OBD_ECHO_DEVICENAME);
485         if (rc)
486                 RETURN(rc);
487
488         rc = echo_client_init();
489         if (rc)
490                 class_unregister_type(OBD_ECHO_DEVICENAME);
491
492         RETURN(rc);
493 }
494
495 static void __exit obdecho_exit(void)
496 {
497         echo_proc_fini();
498         echo_client_cleanup();
499         class_unregister_type(OBD_ECHO_DEVICENAME);
500 }
501
502 MODULE_AUTHOR("Cluster Filesystems Inc. <info@clusterfs.com>");
503 MODULE_DESCRIPTION("Lustre Testing Echo OBD driver " OBDECHO_VERSION);
504 MODULE_LICENSE("GPL");
505
506 module_init(obdecho_init);
507 module_exit(obdecho_exit);