Whamcloud - gitweb
f31068771dc8826a1556c62c595288118a69d6ef
[fs/lustre-release.git] / lustre / obdecho / echo.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/fs/obdecho/echo.c
5  *
6  * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
7  *
8  * This code is issued under the GNU General Public License.
9  * See the file COPYING in this distribution
10  *
11  * by Peter Braam <braam@clusterfs.com>
12  * and Andreas Dilger <adilger@clusterfs.com>
13  */
14
15 static char rcsid[] __attribute ((unused)) = "$Id: echo.c,v 1.42 2002/11/01 17:23:13 thantry Exp $";
16 #define OBDECHO_VERSION "$Revision: 1.42 $"
17
18 #define EXPORT_SYMTAB
19
20 #include <linux/version.h>
21 #include <linux/module.h>
22 #include <linux/mm.h>
23 #include <linux/highmem.h>
24 #include <linux/fs.h>
25 #include <linux/stat.h>
26 #include <linux/sched.h>
27 #include <linux/smp_lock.h>
28 #include <linux/ext2_fs.h>
29 #include <linux/quotaops.h>
30 #include <linux/proc_fs.h>
31 #include <linux/init.h>
32 #include <asm/unistd.h>
33
34 #define DEBUG_SUBSYSTEM S_ECHO
35
36 #include <linux/obd_support.h>
37 #include <linux/obd_class.h>
38 #include <linux/obd_echo.h>
39 #include <linux/lustre_debug.h>
40 #include <linux/lustre_dlm.h>
41 #include <linux/lprocfs_status.h>
42
43 static atomic_t echo_page_rws;
44 static atomic_t echo_getattrs;
45
46 #define ECHO_PROC_STAT "sys/obdecho"
47 #define ECHO_INIT_OBJID 0x1000000000000000ULL
48
49 extern lprocfs_vars_t status_var_nm_1[];
50 extern lprocfs_vars_t status_class_var[];
51
52 int echo_proc_read(char *page, char **start, off_t off, int count, int *eof,
53                    void *data)
54 {
55         long long attrs = atomic_read(&echo_getattrs);
56         long long pages = atomic_read(&echo_page_rws);
57         int len;
58
59         *eof = 1;
60         if (off != 0)
61                 return (0);
62
63         len = sprintf(page, "%Ld %Ld\n", attrs, pages);
64
65         *start = page;
66         return (len);
67 }
68
69 int echo_proc_write(struct file *file, const char *ubuffer,
70                     unsigned long count, void *data)
71 {
72         /* Ignore what we've been asked to write, and just zero the counters */
73         atomic_set (&echo_page_rws, 0);
74         atomic_set (&echo_getattrs, 0);
75
76         return (count);
77 }
78
79 void echo_proc_init(void)
80 {
81         struct proc_dir_entry *entry;
82
83         entry = create_proc_entry(ECHO_PROC_STAT, S_IFREG|S_IRUGO|S_IWUSR,NULL);
84
85         if (entry == NULL) {
86                 CERROR("couldn't create proc entry %s\n", ECHO_PROC_STAT);
87                 return;
88         }
89
90         entry->data = NULL;
91         entry->read_proc = echo_proc_read;
92         entry->write_proc = echo_proc_write;
93 }
94
95 void echo_proc_fini(void)
96 {
97         remove_proc_entry(ECHO_PROC_STAT, 0);
98 }
99
100 static int echo_connect(struct lustre_handle *conn, struct obd_device *obd,
101                         obd_uuid_t cluuid, struct recovd_obd *recovd,
102                         ptlrpc_recovery_cb_t recover)
103 {
104         int rc;
105
106         MOD_INC_USE_COUNT;
107         rc = class_connect(conn, obd, cluuid);
108
109         if (rc)
110                 MOD_DEC_USE_COUNT;
111
112         return rc;
113 }
114
115 static int echo_disconnect(struct lustre_handle *conn)
116 {
117         int rc;
118
119         rc = class_disconnect(conn);
120         if (!rc)
121                 MOD_DEC_USE_COUNT;
122
123         return rc;
124 }
125
126 static __u64 echo_next_id(struct obd_device *obddev)
127 {
128         obd_id id;
129
130         spin_lock(&obddev->u.echo.eo_lock);
131         id = ++obddev->u.echo.eo_lastino;
132         spin_unlock(&obddev->u.echo.eo_lock);
133
134         return id;
135 }
136
137 int echo_create(struct lustre_handle *conn, struct obdo *oa,
138                 struct lov_stripe_md **ea)
139 {
140         struct obd_device *obd = class_conn2obd(conn);
141
142         if (!obd) {
143                 CERROR("invalid client %Lx\n", conn->addr);
144                 return -EINVAL;
145         }
146
147         if (!(oa->o_mode && S_IFMT)) {
148                 CERROR("filter obd: no type!\n");
149                 return -ENOENT;
150         }
151
152         if (!(oa->o_valid & OBD_MD_FLTYPE)) {
153                 CERROR("invalid o_valid %08x\n", oa->o_valid);
154                 return -EINVAL;
155         }
156
157         oa->o_id = echo_next_id(obd);
158         oa->o_valid = OBD_MD_FLID;
159         atomic_inc(&obd->u.echo.eo_create);
160
161         return 0;
162 }
163
164 int echo_destroy(struct lustre_handle *conn, struct obdo *oa,
165                  struct lov_stripe_md *ea)
166 {
167         struct obd_device *obd = class_conn2obd(conn);
168
169         if (!obd) {
170                 CERROR("invalid client "LPX64"\n", conn->addr);
171                 RETURN(-EINVAL);
172         }
173
174         if (!(oa->o_valid & OBD_MD_FLID)) {
175                 CERROR("obdo missing FLID valid flag: %08x\n", oa->o_valid);
176                 RETURN(-EINVAL);
177         }
178
179         if (oa->o_id > obd->u.echo.eo_lastino || oa->o_id < ECHO_INIT_OBJID) {
180                 CERROR("bad destroy objid: "LPX64"\n", oa->o_id);
181                 RETURN(-EINVAL);
182         }
183
184         atomic_inc(&obd->u.echo.eo_destroy);
185
186         return 0;
187 }
188
189 static int echo_open(struct lustre_handle *conn, struct obdo *oa,
190                      struct lov_stripe_md *md)
191 {
192         return 0;
193 }
194
195 static int echo_close(struct lustre_handle *conn, struct obdo *oa,
196                       struct lov_stripe_md *md)
197 {
198         return 0;
199 }
200
201 static int echo_getattr(struct lustre_handle *conn, struct obdo *oa,
202                         struct lov_stripe_md *md)
203 {
204         struct obd_device *obd = class_conn2obd(conn);
205         obd_id id = oa->o_id;
206
207         if (!obd) {
208                 CERROR("invalid client "LPX64"\n", conn->addr);
209                 RETURN(-EINVAL);
210         }
211
212         if (!(oa->o_valid & OBD_MD_FLID)) {
213                 CERROR("obdo missing FLID valid flag: %08x\n", oa->o_valid);
214                 RETURN(-EINVAL);
215         }
216
217         memcpy(oa, &obd->u.echo.oa, sizeof(*oa));
218         oa->o_id = id;
219         oa->o_valid |= OBD_MD_FLID;
220
221         atomic_inc(&echo_getattrs);
222
223         return 0;
224 }
225
226 static int echo_setattr(struct lustre_handle *conn, struct obdo *oa,
227                         struct lov_stripe_md *md)
228 {
229         struct obd_device *obd = class_conn2obd(conn);
230
231         if (!obd) {
232                 CERROR("invalid client "LPX64"\n", conn->addr);
233                 RETURN(-EINVAL);
234         }
235
236         if (!(oa->o_valid & OBD_MD_FLID)) {
237                 CERROR("obdo missing FLID valid flag: %08x\n", oa->o_valid);
238                 RETURN(-EINVAL);
239         }
240
241         memcpy(&obd->u.echo.oa, oa, sizeof(*oa));
242
243         atomic_inc(&obd->u.echo.eo_setattr);
244
245         return 0;
246 }
247
248 /* This allows us to verify that desc_private is passed unmolested */
249 #define DESC_PRIV 0x10293847
250
251 int echo_preprw(int cmd, struct lustre_handle *conn, int objcount,
252                 struct obd_ioobj *obj, int niocount, struct niobuf_remote *nb,
253                 struct niobuf_local *res, void **desc_private)
254 {
255         struct obd_device *obd;
256         struct niobuf_local *r = res;
257         int rc = 0;
258         int i;
259
260         ENTRY;
261
262         obd = class_conn2obd(conn);
263         if (!obd) {
264                 CERROR("invalid client "LPX64"\n", conn->addr);
265                 RETURN(-EINVAL);
266         }
267
268         memset(res, 0, sizeof(*res) * niocount);
269
270         CDEBUG(D_PAGE, "%s %d obdos with %d IOs\n",
271                cmd == OBD_BRW_READ ? "reading" : "writing", objcount, niocount);
272
273         *desc_private = (void *)DESC_PRIV;
274
275         for (i = 0; i < objcount; i++, obj++) {
276                 int gfp_mask = (obj->ioo_id & 1) ? GFP_HIGHUSER : GFP_KERNEL;
277                 int verify = obj->ioo_id != 0;
278                 int j;
279
280                 for (j = 0 ; j < obj->ioo_bufcnt ; j++, nb++, r++) {
281                         r->page = alloc_pages(gfp_mask, 0);
282                         if (!r->page) {
283                                 CERROR("can't get page %d/%d for id "LPU64"\n",
284                                        j, obj->ioo_bufcnt, obj->ioo_id);
285                                 GOTO(preprw_cleanup, rc = -ENOMEM);
286                         }
287                         atomic_inc(&obd->u.echo.eo_prep);
288
289                         r->offset = nb->offset;
290                         r->addr = kmap(r->page);
291                         r->len = nb->len;
292
293                         CDEBUG(D_PAGE, "$$$$ get page %p, addr %p@"LPU64"\n",
294                                r->page, r->addr, r->offset);
295
296                         if (verify && cmd == OBD_BRW_READ)
297                                 page_debug_setup(r->addr, r->len, r->offset,
298                                                  obj->ioo_id);
299                         else if (verify)
300                                 page_debug_setup(r->addr, r->len,
301                                                  0xecc0ecc0ecc0ecc0,
302                                                  0xecc0ecc0ecc0ecc0);
303                 }
304         }
305         CDEBUG(D_PAGE, "%d pages allocated after prep\n",
306                atomic_read(&obd->u.echo.eo_prep));
307
308         RETURN(0);
309
310 preprw_cleanup:
311         /* It is possible that we would rather handle errors by  allow
312          * any already-set-up pages to complete, rather than tearing them
313          * all down again.  I believe that this is what the in-kernel
314          * prep/commit operations do.
315          */
316         CERROR("cleaning up %ld pages (%d obdos)\n", (long)(r - res), objcount);
317         while (r-- > res) {
318                 kunmap(r->page);
319                 __free_pages(r->page, 0);
320                 atomic_dec(&obd->u.echo.eo_prep);
321         }
322         memset(res, 0, sizeof(*res) * niocount);
323
324         return rc;
325 }
326
327 int echo_commitrw(int cmd, struct lustre_handle *conn, int objcount,
328                   struct obd_ioobj *obj, int niocount, struct niobuf_local *res,
329                   void *desc_private)
330 {
331         struct obd_device *obd;
332         struct niobuf_local *r = res;
333         int rc = 0;
334         int i;
335         ENTRY;
336
337         obd = class_conn2obd(conn);
338         if (!obd) {
339                 CERROR("invalid client "LPX64"\n", conn->addr);
340                 RETURN(-EINVAL);
341         }
342
343         if ((cmd & OBD_BRW_RWMASK) == OBD_BRW_READ) {
344                 CDEBUG(D_PAGE, "reading %d obdos with %d IOs\n",
345                        objcount, niocount);
346         } else {
347                 CDEBUG(D_PAGE, "writing %d obdos with %d IOs\n",
348                        objcount, niocount);
349         }
350
351         if (niocount && !r) {
352                 CERROR("NULL res niobuf with niocount %d\n", niocount);
353                 RETURN(-EINVAL);
354         }
355
356         LASSERT(desc_private == (void *)DESC_PRIV);
357
358         for (i = 0; i < objcount; i++, obj++) {
359                 int verify = obj->ioo_id != 0;
360                 int j;
361
362                 for (j = 0 ; j < obj->ioo_bufcnt ; j++, r++) {
363                         struct page *page = r->page;
364                         void *addr;
365
366                         if (!page || !(addr = page_address(page)) ||
367                             !kern_addr_valid(addr)) {
368
369                                 CERROR("bad page objid "LPU64":%p, buf %d/%d\n",
370                                        obj->ioo_id, page, j, obj->ioo_bufcnt);
371                                 GOTO(commitrw_cleanup, rc = -EFAULT);
372                         }
373
374                         atomic_inc(&echo_page_rws);
375
376                         CDEBUG(D_PAGE, "$$$$ use page %p, addr %p@"LPU64"\n",
377                                r->page, addr, r->offset);
378
379                         if (verify)
380                                 page_debug_check("echo", addr, r->len,
381                                                  r->offset, obj->ioo_id);
382
383                         kunmap(page);
384                         __free_pages(page, 0);
385                         atomic_dec(&obd->u.echo.eo_prep);
386                 }
387         }
388         CDEBUG(D_PAGE, "%d pages remain after commit\n",
389                atomic_read(&obd->u.echo.eo_prep));
390         RETURN(0);
391
392 commitrw_cleanup:
393         CERROR("cleaning up %ld pages (%d obdos)\n",
394                niocount - (long)(r - res) - 1, objcount);
395         while (++r < res + niocount) {
396                 struct page *page = r->page;
397
398                 kunmap(page);
399                 __free_pages(page, 0);
400                 atomic_dec(&obd->u.echo.eo_prep);
401         }
402         return rc;
403 }
404
405 static int echo_setup(struct obd_device *obddev, obd_count len, void *buf)
406 {
407         ENTRY;
408
409         obddev->obd_namespace =
410                 ldlm_namespace_new("echo-tgt", LDLM_NAMESPACE_SERVER);
411         if (obddev->obd_namespace == NULL) {
412                 LBUG();
413                 RETURN(-ENOMEM);
414         }
415         spin_lock_init(&obddev->u.echo.eo_lock);
416         obddev->u.echo.eo_lastino = ECHO_INIT_OBJID;
417
418         RETURN(0);
419 }
420
421 static int echo_cleanup(struct obd_device *obddev)
422 {
423         ENTRY;
424
425         ldlm_namespace_free(obddev->obd_namespace);
426         CERROR("%d prep/commitrw pages leaked\n",
427                atomic_read(&obddev->u.echo.eo_prep));
428
429         RETURN(0);
430 }
431
432 int echo_attach(struct obd_device *dev, 
433                    obd_count len, void *data)
434 {
435         int rc;
436         rc = lprocfs_reg_obd(dev, (lprocfs_vars_t*)status_var_nm_1, (void*)dev);
437         return rc; 
438 }
439
440 int echo_detach(struct obd_device *dev)
441 {
442         int rc;
443         rc = lprocfs_dereg_obd(dev);
444         return rc;
445
446 }
447
448
449 struct obd_ops echo_obd_ops = {
450         o_attach:       echo_attach,
451         o_detach:       echo_detach,
452         o_connect:      echo_connect,
453         o_disconnect:   echo_disconnect,
454         o_create:       echo_create,
455         o_destroy:      echo_destroy,
456         o_open:         echo_open,
457         o_close:        echo_close,
458         o_getattr:      echo_getattr,
459         o_setattr:      echo_setattr,
460         o_preprw:       echo_preprw,
461         o_commitrw:     echo_commitrw,
462         o_setup:        echo_setup,
463         o_cleanup:      echo_cleanup
464 };
465
466 static int __init obdecho_init(void)
467 {
468         int rc;
469         
470
471         printk(KERN_INFO "Echo OBD driver " OBDECHO_VERSION
472                " info@clusterfs.com\n");
473
474         echo_proc_init();
475         rc = class_register_type(&echo_obd_ops, 
476                                  (lprocfs_vars_t*)status_class_var, 
477                                  OBD_ECHO_DEVICENAME);
478         if (rc) RETURN(rc);
479         
480         return 0;
481
482 }
483
484 static void __exit obdecho_exit(void)
485 {
486                 
487         echo_proc_fini();
488
489         class_unregister_type(OBD_ECHO_DEVICENAME);
490 }
491
492 MODULE_AUTHOR("Cluster Filesystems Inc. <info@clusterfs.com>");
493 MODULE_DESCRIPTION("Lustre Testing Echo OBD driver " OBDECHO_VERSION);
494 MODULE_LICENSE("GPL");
495
496 module_init(obdecho_init);
497 module_exit(obdecho_exit);