Whamcloud - gitweb
5292cad8b8cbbe8f5988b713fc9b084718d263de
[fs/lustre-release.git] / lustre / obdecho / echo.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/fs/obdecho/echo.c
5  *
6  * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
7  *
8  * This code is issued under the GNU General Public License.
9  * See the file COPYING in this distribution
10  *
11  * by Peter Braam <braam@clusterfs.com>
12  * and Andreas Dilger <adilger@clusterfs.com>
13  */
14
15 static char rcsid[] __attribute ((unused)) = "$Id: echo.c,v 1.30 2002/09/03 20:19:20 adilger Exp $";
16 #define OBDECHO_VERSION "$Revision: 1.30 $"
17
18 #define EXPORT_SYMTAB
19
20 #include <linux/version.h>
21 #include <linux/module.h>
22 #include <linux/fs.h>
23 #include <linux/stat.h>
24 #include <linux/locks.h>
25 #include <linux/ext2_fs.h>
26 #include <linux/quotaops.h>
27 #include <linux/proc_fs.h>
28 #include <linux/init.h>
29 #include <asm/unistd.h>
30
31 #define DEBUG_SUBSYSTEM S_ECHO
32
33 #include <linux/obd_support.h>
34 #include <linux/obd_class.h>
35 #include <linux/obd_echo.h>
36 #include <linux/lustre_debug.h>
37 #include <linux/lustre_dlm.h>
38
39 extern struct obd_device obd_dev[MAX_OBD_DEVICES];
40
41 #define ECHO_PROC_STAT "sys/obdecho"
42
43 int echo_proc_read(char *page, char **start, off_t off, int count, int *eof,
44                    void *data)
45 {
46         struct obd_device *obd;
47         long long attrs = 0;
48         long long pages = 0;
49         int len;
50         int i;
51
52         *eof = 1;
53         if (off != 0)
54                 return (0);
55
56         for (i = 0, obd = obd_dev; i < MAX_OBD_DEVICES; i++, obd++) {
57                 if (strcmp(obd->obd_type->typ_name, OBD_ECHO_DEVICENAME))
58                         continue;
59                 attrs += atomic_read(&obd->u.echo.eo_getattr) +
60                          atomic_read(&obd->u.echo.eo_setattr);
61                 pages += atomic_read(&obd->u.echo.eo_read) +
62                          atomic_read(&obd->u.echo.eo_write);
63         }
64
65         len = sprintf(page, "%Ld %Ld\n", attrs, pages);
66
67         *start = page;
68         return (len);
69 }
70
71 int echo_proc_write(struct file *file, const char *ubuffer,
72                     unsigned long count, void *data)
73 {
74         struct obd_device *obd;
75         int i;
76
77         for (i = 0, obd = obd_dev; i < MAX_OBD_DEVICES; i++, obd++) {
78                 if (strcmp(obd->obd_type->typ_name, OBD_ECHO_DEVICENAME))
79                         continue;
80
81                 atomic_set(&obd->u.echo.eo_getattr, 0);
82                 atomic_set(&obd->u.echo.eo_setattr, 0);
83                 atomic_set(&obd->u.echo.eo_read, 0);
84                 atomic_set(&obd->u.echo.eo_write, 0);
85         }
86
87         /* Ignore what we've been asked to write, and just zero the counters */
88
89         return (count);
90 }
91
92 void echo_proc_init(void)
93 {
94         struct proc_dir_entry *entry;
95
96         entry = create_proc_entry(ECHO_PROC_STAT, S_IFREG|S_IRUGO|S_IWUSR,NULL);
97
98         if (entry == NULL) {
99                 CERROR("couldn't create proc entry %s\n", ECHO_PROC_STAT);
100                 return;
101         }
102
103         entry->data = NULL;
104         entry->read_proc = echo_proc_read;
105         entry->write_proc = echo_proc_write;
106 }
107
108 void echo_proc_fini(void)
109 {
110         remove_proc_entry(ECHO_PROC_STAT, 0);
111 }
112
113 static int echo_connect(struct lustre_handle *conn, struct obd_device *obd,
114                         char *cluuid)
115 {
116         int rc;
117
118         MOD_INC_USE_COUNT;
119         rc = class_connect(conn, obd, cluuid);
120
121         if (rc)
122                 MOD_DEC_USE_COUNT;
123
124         return rc;
125 }
126
127 static int echo_disconnect(struct lustre_handle *conn)
128 {
129         int rc;
130
131         rc = class_disconnect(conn);
132         if (!rc)
133                 MOD_DEC_USE_COUNT;
134
135         return rc;
136 }
137
138 static __u64 echo_next_id(struct obd_device *obddev)
139 {
140         obd_id id;
141
142         spin_lock(&obddev->u.echo.eo_lock);
143         id = ++obddev->u.echo.eo_lastino;
144         spin_unlock(&obddev->u.echo.eo_lock);
145
146         return id;
147 }
148
149 int echo_create(struct lustre_handle *conn, struct obdo *oa,
150                 struct lov_stripe_md **ea)
151 {
152         struct obd_device *obd = class_conn2obd(conn);
153
154         if (!obd) {
155                 CERROR("invalid client %Lx\n", conn->addr);
156                 return -EINVAL;
157         }
158
159         if (!(oa->o_mode && S_IFMT)) {
160                 CERROR("filter obd: no type!\n");
161                 return -ENOENT;
162         }
163
164         if (!(oa->o_valid & OBD_MD_FLMODE)) {
165                 CERROR("invalid o_valid %08x\n", oa->o_valid);
166                 return -EINVAL;
167         }
168
169         oa->o_id = echo_next_id(obd);
170         oa->o_valid = OBD_MD_FLID;
171         atomic_inc(&obd->u.echo.eo_create);
172
173         return 0;
174 }
175
176 int echo_destroy(struct lustre_handle *conn, struct obdo *oa,
177                  struct lov_stripe_md *ea)
178 {
179         struct obd_device *obd = class_conn2obd(conn);
180
181         if (!obd) {
182                 CERROR("invalid client "LPX64"\n", conn->addr);
183                 RETURN(-EINVAL);
184         }
185
186         if (!(oa->o_valid & OBD_MD_FLID)) {
187                 CERROR("obdo missing FLID valid flag: %08x\n", oa->o_valid);
188                 RETURN(-EINVAL);
189         }
190
191         if (oa->o_id > obd->u.echo.eo_lastino) {
192                 CERROR("bad destroy objid: %Ld\n", (long long)oa->o_id);
193                 RETURN(-EINVAL);
194         }
195
196         atomic_inc(&obd->u.echo.eo_destroy);
197
198         return 0;
199 }
200
201 static int echo_getattr(struct lustre_handle *conn, struct obdo *oa,
202                         struct lov_stripe_md *md)
203 {
204         struct obd_device *obd = class_conn2obd(conn);
205         obd_id id = oa->o_id;
206
207         if (!obd) {
208                 CERROR("invalid client "LPX64"\n", conn->addr);
209                 RETURN(-EINVAL);
210         }
211
212         if (!(oa->o_valid & OBD_MD_FLID)) {
213                 CERROR("obdo missing FLID valid flag: %08x\n", oa->o_valid);
214                 RETURN(-EINVAL);
215         }
216
217         memcpy(oa, &obd->u.echo.oa, sizeof(*oa));
218         oa->o_id = id;
219         oa->o_valid |= OBD_MD_FLID;
220
221         atomic_inc(&obd->u.echo.eo_getattr);
222
223         return 0;
224 }
225
226 static int echo_setattr(struct lustre_handle *conn, struct obdo *oa,
227                         struct lov_stripe_md *md)
228 {
229         struct obd_device *obd = class_conn2obd(conn);
230
231         if (!obd) {
232                 CERROR("invalid client "LPX64"\n", conn->addr);
233                 RETURN(-EINVAL);
234         }
235
236         if (!(oa->o_valid & OBD_MD_FLID)) {
237                 CERROR("obdo missing FLID valid flag: %08x\n", oa->o_valid);
238                 RETURN(-EINVAL);
239         }
240
241         memcpy(&obd->u.echo.oa, oa, sizeof(*oa));
242
243         atomic_inc(&obd->u.echo.eo_setattr);
244
245         return 0;
246 }
247
248 /* This allows us to verify that desc_private is passed unmolested */
249 #define DESC_PRIV 0x10293847
250
251 int echo_preprw(int cmd, struct lustre_handle *conn, int objcount,
252                 struct obd_ioobj *obj, int niocount, struct niobuf_remote *nb,
253                 struct niobuf_local *res, void **desc_private)
254 {
255         struct obd_device *obd;
256         struct niobuf_local *r = res;
257         int rc = 0;
258         int i;
259
260         ENTRY;
261
262         obd = class_conn2obd(conn);
263         if (!obd) {
264                 CERROR("invalid client "LPX64"\n", conn->addr);
265                 RETURN(-EINVAL);
266         }
267
268         memset(res, 0, sizeof(*res) * niocount);
269
270         CDEBUG(D_PAGE, "%s %d obdos with %d IOs\n",
271                cmd == OBD_BRW_READ ? "reading" : "writing", objcount, niocount);
272
273         *desc_private = (void *)DESC_PRIV;
274
275         for (i = 0; i < objcount; i++, obj++) {
276                 int gfp_mask = (obj->ioo_id & 1) ? GFP_HIGHUSER : GFP_KERNEL;
277                 int verify = obj->ioo_id != 0;
278                 int j;
279
280                 for (j = 0 ; j < obj->ioo_bufcnt ; j++, nb++, r++) {
281                         r->page = alloc_pages(gfp_mask, 0);
282                         if (!r->page) {
283                                 CERROR("can't get page %d/%d for id "LPU64"\n",
284                                        j, obj->ioo_bufcnt, obj->ioo_id);
285                                 GOTO(preprw_cleanup, rc = -ENOMEM);
286                         }
287                         atomic_inc(&obd->u.echo.eo_prep);
288
289                         r->offset = nb->offset;
290                         r->addr = kmap(r->page);
291                         r->len = nb->len;
292
293                         CDEBUG(D_PAGE, "$$$$ get page %p, addr %p@"LPU64"\n",
294                                r->page, r->addr, r->offset);
295
296                         if (verify && cmd == OBD_BRW_READ)
297                                 page_debug_setup(r->addr, r->len, r->offset,
298                                                  obj->ioo_id);
299                         else if (verify)
300                                 page_debug_setup(r->addr, r->len,
301                                                  0xecc0ecc0ecc0ecc0,
302                                                  0xecc0ecc0ecc0ecc0);
303                 }
304         }
305         CDEBUG(D_PAGE, "%d pages allocated after prep\n",
306                atomic_read(&obd->u.echo.eo_prep));
307
308         RETURN(0);
309
310 preprw_cleanup:
311         /* It is possible that we would rather handle errors by  allow
312          * any already-set-up pages to complete, rather than tearing them
313          * all down again.  I believe that this is what the in-kernel
314          * prep/commit operations do.
315          */
316         CERROR("cleaning up %ld pages (%d obdos)\n", (long)(r - res), objcount);
317         while (r-- > res) {
318                 kunmap(r->page);
319                 __free_pages(r->page, 0);
320                 atomic_dec(&obd->u.echo.eo_prep);
321         }
322         memset(res, 0, sizeof(*res) * niocount);
323
324         return rc;
325 }
326
327 int echo_commitrw(int cmd, struct lustre_handle *conn, int objcount,
328                   struct obd_ioobj *obj, int niocount, struct niobuf_local *res,
329                   void *desc_private)
330 {
331         struct obd_device *obd;
332         struct niobuf_local *r = res;
333         int rc = 0;
334         int i;
335         ENTRY;
336
337         obd = class_conn2obd(conn);
338         if (!obd) {
339                 CERROR("invalid client "LPX64"\n", conn->addr);
340                 RETURN(-EINVAL);
341         }
342
343         if ((cmd & OBD_BRW_RWMASK) == OBD_BRW_READ) {
344                 CDEBUG(D_PAGE, "reading %d obdos with %d IOs\n",
345                        objcount, niocount);
346                 atomic_inc(&obd->u.echo.eo_read);
347         } else {
348                 CDEBUG(D_PAGE, "writing %d obdos with %d IOs\n",
349                        objcount, niocount);
350                 atomic_inc(&obd->u.echo.eo_write);
351         }
352
353         if (niocount && !r) {
354                 CERROR("NULL res niobuf with niocount %d\n", niocount);
355                 RETURN(-EINVAL);
356         }
357
358         LASSERT(desc_private == (void *)DESC_PRIV);
359
360         for (i = 0; i < objcount; i++, obj++) {
361                 int verify = obj->ioo_id != 0;
362                 int j;
363
364                 for (j = 0 ; j < obj->ioo_bufcnt ; j++, r++) {
365                         struct page *page = r->page;
366                         void *addr;
367
368                         if (!page || !(addr = page_address(page)) ||
369                             !kern_addr_valid(addr)) {
370
371                                 CERROR("bad page objid "LPU64":%p, buf %d/%d\n",
372                                        obj->ioo_id, page, j, obj->ioo_bufcnt);
373                                 GOTO(commitrw_cleanup, rc = -EFAULT);
374                         }
375
376                         atomic_inc(&obd->u.echo.eo_read);
377
378                         CDEBUG(D_PAGE, "$$$$ use page %p, addr %p@"LPU64"\n",
379                                r->page, addr, r->offset);
380
381                         if (verify)
382                                 page_debug_check("echo", addr, r->len,
383                                                  r->offset, obj->ioo_id);
384
385                         kunmap(page);
386                         __free_pages(page, 0);
387                         atomic_dec(&obd->u.echo.eo_prep);
388                 }
389         }
390         CDEBUG(D_PAGE, "%d pages remain after commit\n",
391                atomic_read(&obd->u.echo.eo_prep));
392         RETURN(0);
393
394 commitrw_cleanup:
395         CERROR("cleaning up %ld pages (%d obdos)\n",
396                niocount - (long)(r - res) - 1, objcount);
397         while (++r < res + niocount) {
398                 struct page *page = r->page;
399
400                 kunmap(page);
401                 __free_pages(page, 0);
402                 atomic_dec(&obd->u.echo.eo_prep);
403         }
404         return rc;
405 }
406
407 static int echo_setup(struct obd_device *obddev, obd_count len, void *buf)
408 {
409         ENTRY;
410
411         obddev->obd_namespace =
412                 ldlm_namespace_new("echo-tgt", LDLM_NAMESPACE_SERVER);
413         if (obddev->obd_namespace == NULL) {
414                 LBUG();
415                 RETURN(-ENOMEM);
416         }
417
418         RETURN(0);
419 }
420
421 static int echo_cleanup(struct obd_device *obddev)
422 {
423         ENTRY;
424
425         ldlm_namespace_free(obddev->obd_namespace);
426         printk(KERN_INFO "%s: %u getattrs, %u setattrs\n", OBD_ECHO_DEVICENAME,
427                atomic_read(&obddev->u.echo.eo_getattr),
428                atomic_read(&obddev->u.echo.eo_setattr));
429         printk(KERN_INFO "%s: %u reads, %u writes (%d leaked)\n",
430                OBD_ECHO_DEVICENAME,
431                atomic_read(&obddev->u.echo.eo_read),
432                atomic_read(&obddev->u.echo.eo_write),
433                atomic_read(&obddev->u.echo.eo_prep));
434         printk(KERN_INFO "%s: %u creates, %u destroys\n", OBD_ECHO_DEVICENAME,
435                atomic_read(&obddev->u.echo.eo_create),
436                atomic_read(&obddev->u.echo.eo_destroy));
437
438         RETURN(0);
439 }
440
441 struct obd_ops echo_obd_ops = {
442         o_connect:      echo_connect,
443         o_disconnect:   echo_disconnect,
444         o_create:       echo_create,
445         o_destroy:      echo_destroy,
446         o_getattr:      echo_getattr,
447         o_setattr:      echo_setattr,
448         o_preprw:       echo_preprw,
449         o_commitrw:     echo_commitrw,
450         o_setup:        echo_setup,
451         o_cleanup:      echo_cleanup
452 };
453
454 static int __init obdecho_init(void)
455 {
456         printk(KERN_INFO "Echo OBD driver " OBDECHO_VERSION
457                " info@clusterfs.com\n");
458
459         echo_proc_init();
460
461         return class_register_type(&echo_obd_ops, OBD_ECHO_DEVICENAME);
462 }
463
464 static void __exit obdecho_exit(void)
465 {
466         echo_proc_fini();
467
468         class_unregister_type(OBD_ECHO_DEVICENAME);
469 }
470
471 MODULE_AUTHOR("Cluster Filesystems Inc. <info@clusterfs.com>");
472 MODULE_DESCRIPTION("Lustre Testing Echo OBD driver " OBDECHO_VERSION);
473 MODULE_LICENSE("GPL");
474
475 module_init(obdecho_init);
476 module_exit(obdecho_exit);