Whamcloud - gitweb
Header changes needed to compile under 2.5 (compiled also with 2.4 to verify).
[fs/lustre-release.git] / lustre / obdecho / echo.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/fs/obdecho/echo.c
5  *
6  * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
7  *
8  * This code is issued under the GNU General Public License.
9  * See the file COPYING in this distribution
10  *
11  * by Peter Braam <braam@clusterfs.com>
12  * and Andreas Dilger <adilger@clusterfs.com>
13  */
14
15 static char rcsid[] __attribute ((unused)) = "$Id: echo.c,v 1.40 2002/10/18 21:19:57 adilger Exp $";
16 #define OBDECHO_VERSION "$Revision: 1.40 $"
17
18 #define EXPORT_SYMTAB
19
20 #include <linux/version.h>
21 #include <linux/module.h>
22 #include <linux/highmem.h>
23 #include <linux/fs.h>
24 #include <linux/stat.h>
25 #include <linux/smp_lock.h>
26 #include <linux/ext2_fs.h>
27 #include <linux/quotaops.h>
28 #include <linux/proc_fs.h>
29 #include <linux/init.h>
30 #include <asm/unistd.h>
31
32 #define DEBUG_SUBSYSTEM S_ECHO
33
34 #include <linux/obd_support.h>
35 #include <linux/obd_class.h>
36 #include <linux/obd_echo.h>
37 #include <linux/lustre_debug.h>
38 #include <linux/lustre_dlm.h>
39
40 static atomic_t echo_page_rws;
41 static atomic_t echo_getattrs;
42
43 #define ECHO_PROC_STAT "sys/obdecho"
44 #define ECHO_INIT_OBJID 0x1000000000000000ULL
45
46 int echo_proc_read(char *page, char **start, off_t off, int count, int *eof,
47                    void *data)
48 {
49         long long attrs = atomic_read(&echo_getattrs);
50         long long pages = atomic_read(&echo_page_rws);
51         int len;
52
53         *eof = 1;
54         if (off != 0)
55                 return (0);
56
57         len = sprintf(page, "%Ld %Ld\n", attrs, pages);
58
59         *start = page;
60         return (len);
61 }
62
63 int echo_proc_write(struct file *file, const char *ubuffer,
64                     unsigned long count, void *data)
65 {
66         /* Ignore what we've been asked to write, and just zero the counters */
67         atomic_set (&echo_page_rws, 0);
68         atomic_set (&echo_getattrs, 0);
69
70         return (count);
71 }
72
73 void echo_proc_init(void)
74 {
75         struct proc_dir_entry *entry;
76
77         entry = create_proc_entry(ECHO_PROC_STAT, S_IFREG|S_IRUGO|S_IWUSR,NULL);
78
79         if (entry == NULL) {
80                 CERROR("couldn't create proc entry %s\n", ECHO_PROC_STAT);
81                 return;
82         }
83
84         entry->data = NULL;
85         entry->read_proc = echo_proc_read;
86         entry->write_proc = echo_proc_write;
87 }
88
89 void echo_proc_fini(void)
90 {
91         remove_proc_entry(ECHO_PROC_STAT, 0);
92 }
93
94 static int echo_connect(struct lustre_handle *conn, struct obd_device *obd,
95                         obd_uuid_t cluuid, struct recovd_obd *recovd,
96                         ptlrpc_recovery_cb_t recover)
97 {
98         int rc;
99
100         MOD_INC_USE_COUNT;
101         rc = class_connect(conn, obd, cluuid);
102
103         if (rc)
104                 MOD_DEC_USE_COUNT;
105
106         return rc;
107 }
108
109 static int echo_disconnect(struct lustre_handle *conn)
110 {
111         int rc;
112
113         rc = class_disconnect(conn);
114         if (!rc)
115                 MOD_DEC_USE_COUNT;
116
117         return rc;
118 }
119
120 static __u64 echo_next_id(struct obd_device *obddev)
121 {
122         obd_id id;
123
124         spin_lock(&obddev->u.echo.eo_lock);
125         id = ++obddev->u.echo.eo_lastino;
126         spin_unlock(&obddev->u.echo.eo_lock);
127
128         return id;
129 }
130
131 int echo_create(struct lustre_handle *conn, struct obdo *oa,
132                 struct lov_stripe_md **ea)
133 {
134         struct obd_device *obd = class_conn2obd(conn);
135
136         if (!obd) {
137                 CERROR("invalid client %Lx\n", conn->addr);
138                 return -EINVAL;
139         }
140
141         if (!(oa->o_mode && S_IFMT)) {
142                 CERROR("filter obd: no type!\n");
143                 return -ENOENT;
144         }
145
146         if (!(oa->o_valid & OBD_MD_FLTYPE)) {
147                 CERROR("invalid o_valid %08x\n", oa->o_valid);
148                 return -EINVAL;
149         }
150
151         oa->o_id = echo_next_id(obd);
152         oa->o_valid = OBD_MD_FLID;
153         atomic_inc(&obd->u.echo.eo_create);
154
155         return 0;
156 }
157
158 int echo_destroy(struct lustre_handle *conn, struct obdo *oa,
159                  struct lov_stripe_md *ea)
160 {
161         struct obd_device *obd = class_conn2obd(conn);
162
163         if (!obd) {
164                 CERROR("invalid client "LPX64"\n", conn->addr);
165                 RETURN(-EINVAL);
166         }
167
168         if (!(oa->o_valid & OBD_MD_FLID)) {
169                 CERROR("obdo missing FLID valid flag: %08x\n", oa->o_valid);
170                 RETURN(-EINVAL);
171         }
172
173         if (oa->o_id > obd->u.echo.eo_lastino || oa->o_id < ECHO_INIT_OBJID) {
174                 CERROR("bad destroy objid: "LPX64"\n", oa->o_id);
175                 RETURN(-EINVAL);
176         }
177
178         atomic_inc(&obd->u.echo.eo_destroy);
179
180         return 0;
181 }
182
183 static int echo_open(struct lustre_handle *conn, struct obdo *oa,
184                      struct lov_stripe_md *md)
185 {
186         return 0;
187 }
188
189 static int echo_close(struct lustre_handle *conn, struct obdo *oa,
190                       struct lov_stripe_md *md)
191 {
192         return 0;
193 }
194
195 static int echo_getattr(struct lustre_handle *conn, struct obdo *oa,
196                         struct lov_stripe_md *md)
197 {
198         struct obd_device *obd = class_conn2obd(conn);
199         obd_id id = oa->o_id;
200
201         if (!obd) {
202                 CERROR("invalid client "LPX64"\n", conn->addr);
203                 RETURN(-EINVAL);
204         }
205
206         if (!(oa->o_valid & OBD_MD_FLID)) {
207                 CERROR("obdo missing FLID valid flag: %08x\n", oa->o_valid);
208                 RETURN(-EINVAL);
209         }
210
211         memcpy(oa, &obd->u.echo.oa, sizeof(*oa));
212         oa->o_id = id;
213         oa->o_valid |= OBD_MD_FLID;
214
215         atomic_inc(&echo_getattrs);
216
217         return 0;
218 }
219
220 static int echo_setattr(struct lustre_handle *conn, struct obdo *oa,
221                         struct lov_stripe_md *md)
222 {
223         struct obd_device *obd = class_conn2obd(conn);
224
225         if (!obd) {
226                 CERROR("invalid client "LPX64"\n", conn->addr);
227                 RETURN(-EINVAL);
228         }
229
230         if (!(oa->o_valid & OBD_MD_FLID)) {
231                 CERROR("obdo missing FLID valid flag: %08x\n", oa->o_valid);
232                 RETURN(-EINVAL);
233         }
234
235         memcpy(&obd->u.echo.oa, oa, sizeof(*oa));
236
237         atomic_inc(&obd->u.echo.eo_setattr);
238
239         return 0;
240 }
241
242 /* This allows us to verify that desc_private is passed unmolested */
243 #define DESC_PRIV 0x10293847
244
245 int echo_preprw(int cmd, struct lustre_handle *conn, int objcount,
246                 struct obd_ioobj *obj, int niocount, struct niobuf_remote *nb,
247                 struct niobuf_local *res, void **desc_private)
248 {
249         struct obd_device *obd;
250         struct niobuf_local *r = res;
251         int rc = 0;
252         int i;
253
254         ENTRY;
255
256         obd = class_conn2obd(conn);
257         if (!obd) {
258                 CERROR("invalid client "LPX64"\n", conn->addr);
259                 RETURN(-EINVAL);
260         }
261
262         memset(res, 0, sizeof(*res) * niocount);
263
264         CDEBUG(D_PAGE, "%s %d obdos with %d IOs\n",
265                cmd == OBD_BRW_READ ? "reading" : "writing", objcount, niocount);
266
267         *desc_private = (void *)DESC_PRIV;
268
269         for (i = 0; i < objcount; i++, obj++) {
270                 int gfp_mask = (obj->ioo_id & 1) ? GFP_HIGHUSER : GFP_KERNEL;
271                 int verify = obj->ioo_id != 0;
272                 int j;
273
274                 for (j = 0 ; j < obj->ioo_bufcnt ; j++, nb++, r++) {
275                         r->page = alloc_pages(gfp_mask, 0);
276                         if (!r->page) {
277                                 CERROR("can't get page %d/%d for id "LPU64"\n",
278                                        j, obj->ioo_bufcnt, obj->ioo_id);
279                                 GOTO(preprw_cleanup, rc = -ENOMEM);
280                         }
281                         atomic_inc(&obd->u.echo.eo_prep);
282
283                         r->offset = nb->offset;
284                         r->addr = kmap(r->page);
285                         r->len = nb->len;
286
287                         CDEBUG(D_PAGE, "$$$$ get page %p, addr %p@"LPU64"\n",
288                                r->page, r->addr, r->offset);
289
290                         if (verify && cmd == OBD_BRW_READ)
291                                 page_debug_setup(r->addr, r->len, r->offset,
292                                                  obj->ioo_id);
293                         else if (verify)
294                                 page_debug_setup(r->addr, r->len,
295                                                  0xecc0ecc0ecc0ecc0,
296                                                  0xecc0ecc0ecc0ecc0);
297                 }
298         }
299         CDEBUG(D_PAGE, "%d pages allocated after prep\n",
300                atomic_read(&obd->u.echo.eo_prep));
301
302         RETURN(0);
303
304 preprw_cleanup:
305         /* It is possible that we would rather handle errors by  allow
306          * any already-set-up pages to complete, rather than tearing them
307          * all down again.  I believe that this is what the in-kernel
308          * prep/commit operations do.
309          */
310         CERROR("cleaning up %ld pages (%d obdos)\n", (long)(r - res), objcount);
311         while (r-- > res) {
312                 kunmap(r->page);
313                 __free_pages(r->page, 0);
314                 atomic_dec(&obd->u.echo.eo_prep);
315         }
316         memset(res, 0, sizeof(*res) * niocount);
317
318         return rc;
319 }
320
321 int echo_commitrw(int cmd, struct lustre_handle *conn, int objcount,
322                   struct obd_ioobj *obj, int niocount, struct niobuf_local *res,
323                   void *desc_private)
324 {
325         struct obd_device *obd;
326         struct niobuf_local *r = res;
327         int rc = 0;
328         int i;
329         ENTRY;
330
331         obd = class_conn2obd(conn);
332         if (!obd) {
333                 CERROR("invalid client "LPX64"\n", conn->addr);
334                 RETURN(-EINVAL);
335         }
336
337         if ((cmd & OBD_BRW_RWMASK) == OBD_BRW_READ) {
338                 CDEBUG(D_PAGE, "reading %d obdos with %d IOs\n",
339                        objcount, niocount);
340         } else {
341                 CDEBUG(D_PAGE, "writing %d obdos with %d IOs\n",
342                        objcount, niocount);
343         }
344
345         if (niocount && !r) {
346                 CERROR("NULL res niobuf with niocount %d\n", niocount);
347                 RETURN(-EINVAL);
348         }
349
350         LASSERT(desc_private == (void *)DESC_PRIV);
351
352         for (i = 0; i < objcount; i++, obj++) {
353                 int verify = obj->ioo_id != 0;
354                 int j;
355
356                 for (j = 0 ; j < obj->ioo_bufcnt ; j++, r++) {
357                         struct page *page = r->page;
358                         void *addr;
359
360                         if (!page || !(addr = page_address(page)) ||
361                             !kern_addr_valid(addr)) {
362
363                                 CERROR("bad page objid "LPU64":%p, buf %d/%d\n",
364                                        obj->ioo_id, page, j, obj->ioo_bufcnt);
365                                 GOTO(commitrw_cleanup, rc = -EFAULT);
366                         }
367
368                         atomic_inc(&echo_page_rws);
369
370                         CDEBUG(D_PAGE, "$$$$ use page %p, addr %p@"LPU64"\n",
371                                r->page, addr, r->offset);
372
373                         if (verify)
374                                 page_debug_check("echo", addr, r->len,
375                                                  r->offset, obj->ioo_id);
376
377                         kunmap(page);
378                         __free_pages(page, 0);
379                         atomic_dec(&obd->u.echo.eo_prep);
380                 }
381         }
382         CDEBUG(D_PAGE, "%d pages remain after commit\n",
383                atomic_read(&obd->u.echo.eo_prep));
384         RETURN(0);
385
386 commitrw_cleanup:
387         CERROR("cleaning up %ld pages (%d obdos)\n",
388                niocount - (long)(r - res) - 1, objcount);
389         while (++r < res + niocount) {
390                 struct page *page = r->page;
391
392                 kunmap(page);
393                 __free_pages(page, 0);
394                 atomic_dec(&obd->u.echo.eo_prep);
395         }
396         return rc;
397 }
398
399 static int echo_setup(struct obd_device *obddev, obd_count len, void *buf)
400 {
401         ENTRY;
402
403         obddev->obd_namespace =
404                 ldlm_namespace_new("echo-tgt", LDLM_NAMESPACE_SERVER);
405         if (obddev->obd_namespace == NULL) {
406                 LBUG();
407                 RETURN(-ENOMEM);
408         }
409         spin_lock_init(&obddev->u.echo.eo_lock);
410         obddev->u.echo.eo_lastino = ECHO_INIT_OBJID;
411
412         RETURN(0);
413 }
414
415 static int echo_cleanup(struct obd_device *obddev)
416 {
417         ENTRY;
418
419         ldlm_namespace_free(obddev->obd_namespace);
420         CERROR("%d prep/commitrw pages leaked\n",
421                atomic_read(&obddev->u.echo.eo_prep));
422
423         RETURN(0);
424 }
425
426 struct obd_ops echo_obd_ops = {
427         o_connect:      echo_connect,
428         o_disconnect:   echo_disconnect,
429         o_create:       echo_create,
430         o_destroy:      echo_destroy,
431         o_open:         echo_open,
432         o_close:        echo_close,
433         o_getattr:      echo_getattr,
434         o_setattr:      echo_setattr,
435         o_preprw:       echo_preprw,
436         o_commitrw:     echo_commitrw,
437         o_setup:        echo_setup,
438         o_cleanup:      echo_cleanup
439 };
440
441 static int __init obdecho_init(void)
442 {
443         printk(KERN_INFO "Echo OBD driver " OBDECHO_VERSION
444                " info@clusterfs.com\n");
445
446         echo_proc_init();
447
448         return class_register_type(&echo_obd_ops, OBD_ECHO_DEVICENAME);
449 }
450
451 static void __exit obdecho_exit(void)
452 {
453         echo_proc_fini();
454
455         class_unregister_type(OBD_ECHO_DEVICENAME);
456 }
457
458 MODULE_AUTHOR("Cluster Filesystems Inc. <info@clusterfs.com>");
459 MODULE_DESCRIPTION("Lustre Testing Echo OBD driver " OBDECHO_VERSION);
460 MODULE_LICENSE("GPL");
461
462 module_init(obdecho_init);
463 module_exit(obdecho_exit);