Whamcloud - gitweb
Merge b_md to HEAD for 0.5.19 release.
[fs/lustre-release.git] / lustre / obdecho / echo.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2001, 2002 Cluster File Systems, Inc.
5  *   Author: Peter Braam <braam@clusterfs.com>
6  *   Author: Andreas Dilger <adilger@clusterfs.com>
7  *
8  *   This file is part of Lustre, http://www.lustre.org.
9  *
10  *   Lustre is free software; you can redistribute it and/or
11  *   modify it under the terms of version 2 of the GNU General Public
12  *   License as published by the Free Software Foundation.
13  *
14  *   Lustre is distributed in the hope that it will be useful,
15  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
16  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  *   GNU General Public License for more details.
18  *
19  *   You should have received a copy of the GNU General Public License
20  *   along with Lustre; if not, write to the Free Software
21  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22  */
23
24 #define OBDECHO_VERSION "1.0"
25
26 #define EXPORT_SYMTAB
27
28 #include <linux/version.h>
29 #include <linux/module.h>
30 #include <linux/mm.h>
31 #include <linux/highmem.h>
32 #include <linux/fs.h>
33 #include <linux/stat.h>
34 #include <linux/sched.h>
35 #include <linux/smp_lock.h>
36 #include <linux/ext2_fs.h>
37 #include <linux/quotaops.h>
38 #include <linux/proc_fs.h>
39 #include <linux/init.h>
40 #include <asm/unistd.h>
41
42 #define DEBUG_SUBSYSTEM S_ECHO
43
44 #include <linux/obd_support.h>
45 #include <linux/obd_class.h>
46 #include <linux/obd_echo.h>
47 #include <linux/lustre_debug.h>
48 #include <linux/lustre_dlm.h>
49 #include <linux/lprocfs_status.h>
50
51 static atomic_t echo_page_rws;
52 static atomic_t echo_getattrs;
53
54 #define ECHO_PROC_STAT "sys/obdecho"
55 #define ECHO_INIT_OBJID 0x1000000000000000ULL
56
57 extern struct lprocfs_vars status_var_nm_1[];
58 extern struct lprocfs_vars status_class_var[];
59
60 int echo_proc_read(char *page, char **start, off_t off, int count, int *eof,
61                    void *data)
62 {
63         long long attrs = atomic_read(&echo_getattrs);
64         long long pages = atomic_read(&echo_page_rws);
65         int len;
66
67         *eof = 1;
68         if (off != 0)
69                 return (0);
70
71         len = sprintf(page, "%Ld %Ld\n", attrs, pages);
72
73         *start = page;
74         return (len);
75 }
76
77 int echo_proc_write(struct file *file, const char *ubuffer,
78                     unsigned long count, void *data)
79 {
80         /* Ignore what we've been asked to write, and just zero the counters */
81         atomic_set (&echo_page_rws, 0);
82         atomic_set (&echo_getattrs, 0);
83
84         return (count);
85 }
86
87 void echo_proc_init(void)
88 {
89         struct proc_dir_entry *entry;
90
91         entry = create_proc_entry(ECHO_PROC_STAT, S_IFREG|S_IRUGO|S_IWUSR,NULL);
92
93         if (entry == NULL) {
94                 CERROR("couldn't create proc entry %s\n", ECHO_PROC_STAT);
95                 return;
96         }
97
98         entry->data = NULL;
99         entry->read_proc = echo_proc_read;
100         entry->write_proc = echo_proc_write;
101 }
102
103 void echo_proc_fini(void)
104 {
105         remove_proc_entry(ECHO_PROC_STAT, 0);
106 }
107
108 static int echo_connect(struct lustre_handle *conn, struct obd_device *obd,
109                         obd_uuid_t cluuid, struct recovd_obd *recovd,
110                         ptlrpc_recovery_cb_t recover)
111 {
112         return class_connect(conn, obd, cluuid);
113 }
114
115 static __u64 echo_next_id(struct obd_device *obddev)
116 {
117         obd_id id;
118
119         spin_lock(&obddev->u.echo.eo_lock);
120         id = ++obddev->u.echo.eo_lastino;
121         spin_unlock(&obddev->u.echo.eo_lock);
122
123         return id;
124 }
125
126 int echo_create(struct lustre_handle *conn, struct obdo *oa,
127                 struct lov_stripe_md **ea)
128 {
129         struct obd_device *obd = class_conn2obd(conn);
130
131         if (!obd) {
132                 CERROR("invalid client "LPX64"\n", conn->addr);
133                 return -EINVAL;
134         }
135
136         if (!(oa->o_mode && S_IFMT)) {
137                 CERROR("filter obd: no type!\n");
138                 return -ENOENT;
139         }
140
141         if (!(oa->o_valid & OBD_MD_FLTYPE)) {
142                 CERROR("invalid o_valid %08x\n", oa->o_valid);
143                 return -EINVAL;
144         }
145
146         oa->o_id = echo_next_id(obd);
147         oa->o_valid = OBD_MD_FLID;
148         atomic_inc(&obd->u.echo.eo_create);
149
150         return 0;
151 }
152
153 int echo_destroy(struct lustre_handle *conn, struct obdo *oa,
154                  struct lov_stripe_md *ea)
155 {
156         struct obd_device *obd = class_conn2obd(conn);
157
158         if (!obd) {
159                 CERROR("invalid client "LPX64"\n", conn->addr);
160                 RETURN(-EINVAL);
161         }
162
163         if (!(oa->o_valid & OBD_MD_FLID)) {
164                 CERROR("obdo missing FLID valid flag: %08x\n", oa->o_valid);
165                 RETURN(-EINVAL);
166         }
167
168         if (oa->o_id > obd->u.echo.eo_lastino || oa->o_id < ECHO_INIT_OBJID) {
169                 CERROR("bad destroy objid: "LPX64"\n", oa->o_id);
170                 RETURN(-EINVAL);
171         }
172
173         atomic_inc(&obd->u.echo.eo_destroy);
174
175         return 0;
176 }
177
178 static int echo_open(struct lustre_handle *conn, struct obdo *oa,
179                      struct lov_stripe_md *md)
180 {
181         return 0;
182 }
183
184 static int echo_close(struct lustre_handle *conn, struct obdo *oa,
185                       struct lov_stripe_md *md)
186 {
187         return 0;
188 }
189
190 static int echo_getattr(struct lustre_handle *conn, struct obdo *oa,
191                         struct lov_stripe_md *md)
192 {
193         struct obd_device *obd = class_conn2obd(conn);
194         obd_id id = oa->o_id;
195
196         if (!obd) {
197                 CERROR("invalid client "LPX64"\n", conn->addr);
198                 RETURN(-EINVAL);
199         }
200
201         if (!(oa->o_valid & OBD_MD_FLID)) {
202                 CERROR("obdo missing FLID valid flag: %08x\n", oa->o_valid);
203                 RETURN(-EINVAL);
204         }
205
206         memcpy(oa, &obd->u.echo.oa, sizeof(*oa));
207         oa->o_id = id;
208         oa->o_valid |= OBD_MD_FLID;
209
210         atomic_inc(&echo_getattrs);
211
212         return 0;
213 }
214
215 static int echo_setattr(struct lustre_handle *conn, struct obdo *oa,
216                         struct lov_stripe_md *md)
217 {
218         struct obd_device *obd = class_conn2obd(conn);
219
220         if (!obd) {
221                 CERROR("invalid client "LPX64"\n", conn->addr);
222                 RETURN(-EINVAL);
223         }
224
225         if (!(oa->o_valid & OBD_MD_FLID)) {
226                 CERROR("obdo missing FLID valid flag: %08x\n", oa->o_valid);
227                 RETURN(-EINVAL);
228         }
229
230         memcpy(&obd->u.echo.oa, oa, sizeof(*oa));
231
232         atomic_inc(&obd->u.echo.eo_setattr);
233
234         return 0;
235 }
236
237 /* This allows us to verify that desc_private is passed unmolested */
238 #define DESC_PRIV 0x10293847
239
240 int echo_preprw(int cmd, struct lustre_handle *conn, int objcount,
241                 struct obd_ioobj *obj, int niocount, struct niobuf_remote *nb,
242                 struct niobuf_local *res, void **desc_private)
243 {
244         struct obd_device *obd;
245         struct niobuf_local *r = res;
246         int rc = 0;
247         int i;
248
249         ENTRY;
250
251         obd = class_conn2obd(conn);
252         if (!obd) {
253                 CERROR("invalid client "LPX64"\n", conn->addr);
254                 RETURN(-EINVAL);
255         }
256
257         memset(res, 0, sizeof(*res) * niocount);
258
259         CDEBUG(D_PAGE, "%s %d obdos with %d IOs\n",
260                cmd == OBD_BRW_READ ? "reading" : "writing", objcount, niocount);
261
262         *desc_private = (void *)DESC_PRIV;
263
264         obd_kmap_get(niocount, 1);
265
266         for (i = 0; i < objcount; i++, obj++) {
267                 int gfp_mask = (obj->ioo_id & 1) ? GFP_HIGHUSER : GFP_KERNEL;
268                 int verify = obj->ioo_id != 0;
269                 int j;
270
271                 for (j = 0 ; j < obj->ioo_bufcnt ; j++, nb++, r++) {
272                         r->page = alloc_pages(gfp_mask, 0);
273                         if (!r->page) {
274                                 CERROR("can't get page %d/%d for id "LPU64"\n",
275                                        j, obj->ioo_bufcnt, obj->ioo_id);
276                                 GOTO(preprw_cleanup, rc = -ENOMEM);
277                         }
278                         atomic_inc(&obd->u.echo.eo_prep);
279
280                         r->offset = nb->offset;
281                         r->addr = kmap(r->page);
282                         r->len = nb->len;
283
284                         CDEBUG(D_PAGE, "$$$$ get page %p, addr %p@"LPU64"\n",
285                                r->page, r->addr, r->offset);
286
287                         if (verify && cmd == OBD_BRW_READ)
288                                 page_debug_setup(r->addr, r->len, r->offset,
289                                                  obj->ioo_id);
290                         else if (verify)
291                                 page_debug_setup(r->addr, r->len,
292                                                  0xecc0ecc0ecc0ecc0,
293                                                  0xecc0ecc0ecc0ecc0);
294                 }
295         }
296         CDEBUG(D_PAGE, "%d pages allocated after prep\n",
297                atomic_read(&obd->u.echo.eo_prep));
298
299         RETURN(0);
300
301 preprw_cleanup:
302         /* It is possible that we would rather handle errors by  allow
303          * any already-set-up pages to complete, rather than tearing them
304          * all down again.  I believe that this is what the in-kernel
305          * prep/commit operations do.
306          */
307         CERROR("cleaning up %ld pages (%d obdos)\n", (long)(r - res), objcount);
308         while (r-- > res) {
309                 kunmap(r->page);
310                 __free_pages(r->page, 0);
311                 atomic_dec(&obd->u.echo.eo_prep);
312         }
313         obd_kmap_put(niocount);
314         memset(res, 0, sizeof(*res) * niocount);
315
316         return rc;
317 }
318
319 int echo_commitrw(int cmd, struct lustre_handle *conn, int objcount,
320                   struct obd_ioobj *obj, int niocount, struct niobuf_local *res,
321                   void *desc_private)
322 {
323         struct obd_device *obd;
324         struct niobuf_local *r = res;
325         int rc = 0;
326         int i;
327         ENTRY;
328
329         obd = class_conn2obd(conn);
330         if (!obd) {
331                 CERROR("invalid client "LPX64"\n", conn->addr);
332                 RETURN(-EINVAL);
333         }
334
335         if ((cmd & OBD_BRW_RWMASK) == OBD_BRW_READ) {
336                 CDEBUG(D_PAGE, "reading %d obdos with %d IOs\n",
337                        objcount, niocount);
338         } else {
339                 CDEBUG(D_PAGE, "writing %d obdos with %d IOs\n",
340                        objcount, niocount);
341         }
342
343         if (niocount && !r) {
344                 CERROR("NULL res niobuf with niocount %d\n", niocount);
345                 RETURN(-EINVAL);
346         }
347
348         LASSERT(desc_private == (void *)DESC_PRIV);
349
350         for (i = 0; i < objcount; i++, obj++) {
351                 int verify = obj->ioo_id != 0;
352                 int j;
353
354                 for (j = 0 ; j < obj->ioo_bufcnt ; j++, r++) {
355                         struct page *page = r->page;
356                         void *addr;
357
358                         if (!page || !(addr = page_address(page)) ||
359                             !kern_addr_valid(addr)) {
360
361                                 CERROR("bad page objid "LPU64":%p, buf %d/%d\n",
362                                        obj->ioo_id, page, j, obj->ioo_bufcnt);
363                                 GOTO(commitrw_cleanup, rc = -EFAULT);
364                         }
365
366                         atomic_inc(&echo_page_rws);
367
368                         CDEBUG(D_PAGE, "$$$$ use page %p, addr %p@"LPU64"\n",
369                                r->page, addr, r->offset);
370
371                         if (verify)
372                                 page_debug_check("echo", addr, r->len,
373                                                  r->offset, obj->ioo_id);
374
375                         kunmap(page);
376                         obd_kmap_put(1);
377                         __free_pages(page, 0);
378                         atomic_dec(&obd->u.echo.eo_prep);
379                 }
380         }
381         CDEBUG(D_PAGE, "%d pages remain after commit\n",
382                atomic_read(&obd->u.echo.eo_prep));
383         RETURN(0);
384
385 commitrw_cleanup:
386         CERROR("cleaning up %ld pages (%d obdos)\n",
387                niocount - (long)(r - res) - 1, objcount);
388         while (++r < res + niocount) {
389                 struct page *page = r->page;
390
391                 kunmap(page);
392                 obd_kmap_put(1);
393                 __free_pages(page, 0);
394                 atomic_dec(&obd->u.echo.eo_prep);
395         }
396         return rc;
397 }
398
399 static int echo_setup(struct obd_device *obddev, obd_count len, void *buf)
400 {
401         ENTRY;
402
403         obddev->obd_namespace =
404                 ldlm_namespace_new("echo-tgt", LDLM_NAMESPACE_SERVER);
405         if (obddev->obd_namespace == NULL) {
406                 LBUG();
407                 RETURN(-ENOMEM);
408         }
409         spin_lock_init(&obddev->u.echo.eo_lock);
410         obddev->u.echo.eo_lastino = ECHO_INIT_OBJID;
411
412         RETURN(0);
413 }
414
415 static int echo_cleanup(struct obd_device *obddev)
416 {
417         ENTRY;
418
419         ldlm_namespace_free(obddev->obd_namespace);
420         CERROR("%d prep/commitrw pages leaked\n",
421                atomic_read(&obddev->u.echo.eo_prep));
422
423         RETURN(0);
424 }
425
426 int echo_attach(struct obd_device *dev, obd_count len, void *data)
427 {
428         return lprocfs_reg_obd(dev, status_var_nm_1, dev);
429 }
430
431 int echo_detach(struct obd_device *dev)
432 {
433         return lprocfs_dereg_obd(dev);
434 }
435
436 static struct obd_ops echo_obd_ops = {
437         o_owner:       THIS_MODULE,
438         o_attach:      echo_attach,
439         o_detach:      echo_detach,
440         o_connect:     echo_connect,
441         o_disconnect:  class_disconnect,
442         o_create:      echo_create,
443         o_destroy:     echo_destroy,
444         o_open:        echo_open,
445         o_close:       echo_close,
446         o_getattr:     echo_getattr,
447         o_setattr:     echo_setattr,
448         o_preprw:      echo_preprw,
449         o_commitrw:    echo_commitrw,
450         o_setup:       echo_setup,
451         o_cleanup:     echo_cleanup
452 };
453
454 extern int echo_client_init(void);
455 extern void echo_client_cleanup(void);
456
457 static int __init obdecho_init(void)
458 {
459         int rc;
460
461         printk(KERN_INFO "Echo OBD driver " OBDECHO_VERSION
462                " info@clusterfs.com\n");
463
464         echo_proc_init();
465         rc = class_register_type(&echo_obd_ops, status_class_var,
466                                  OBD_ECHO_DEVICENAME);
467         if (rc)
468                 RETURN(rc);
469
470         rc = echo_client_init();
471         if (rc)
472                 class_unregister_type(OBD_ECHO_DEVICENAME);
473
474         RETURN(rc);
475 }
476
477 static void __exit obdecho_exit(void)
478 {
479         echo_proc_fini();
480         echo_client_cleanup();
481         class_unregister_type(OBD_ECHO_DEVICENAME);
482 }
483
484 MODULE_AUTHOR("Cluster Filesystems Inc. <info@clusterfs.com>");
485 MODULE_DESCRIPTION("Lustre Testing Echo OBD driver " OBDECHO_VERSION);
486 MODULE_LICENSE("GPL");
487
488 module_init(obdecho_init);
489 module_exit(obdecho_exit);