Whamcloud - gitweb
* Stopped outputting error messages on lctl set_route, when <nid> isn't
[fs/lustre-release.git] / lustre / portals / router / router.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002 Cluster File Systems, Inc.
5  *
6  *   This file is part of Portals
7  *   http://sourceforge.net/projects/sandiaportals/
8  *
9  *   Portals is free software; you can redistribute it and/or
10  *   modify it under the terms of version 2 of the GNU General Public
11  *   License as published by the Free Software Foundation.
12  *
13  *   Portals is distributed in the hope that it will be useful,
14  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
15  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  *   GNU General Public License for more details.
17  *
18  *   You should have received a copy of the GNU General Public License
19  *   along with Portals; if not, write to the Free Software
20  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21  *
22  */
23
24 #include "router.h"
25
26 LIST_HEAD(kpr_routes);
27 LIST_HEAD(kpr_gateways);
28 LIST_HEAD(kpr_nals);
29
30 unsigned long long kpr_fwd_bytes;
31 unsigned long      kpr_fwd_packets;
32 unsigned long      kpr_fwd_errors;
33 atomic_t           kpr_queue_depth;
34
35 /* Mostly the tables are read-only (thread and interrupt context)
36  *
37  * Once in a blue moon we register/deregister NALs and add/remove routing
38  * entries (thread context only)... */
39 rwlock_t         kpr_rwlock = RW_LOCK_UNLOCKED;
40
41 kpr_router_interface_t kpr_router_interface = {
42         kprri_register:         kpr_register_nal,
43         kprri_lookup:           kpr_lookup_target,
44         kprri_fwd_start:        kpr_forward_packet,
45         kprri_fwd_done:         kpr_complete_packet,
46         kprri_notify:           kpr_nal_notify,
47         kprri_shutdown:         kpr_shutdown_nal,
48         kprri_deregister:       kpr_deregister_nal,
49 };
50
51 kpr_control_interface_t kpr_control_interface = {
52         kprci_add_route:        kpr_add_route,
53         kprci_del_route:        kpr_del_route,
54         kprci_get_route:        kpr_get_route,
55         kprci_notify:           kpr_sys_notify,
56 };
57
58 int
59 kpr_register_nal (kpr_nal_interface_t *nalif, void **argp)
60 {
61         unsigned long      flags;
62         struct list_head  *e;
63         kpr_nal_entry_t   *ne;
64
65         CDEBUG (D_NET, "Registering NAL %d\n", nalif->kprni_nalid);
66
67         PORTAL_ALLOC (ne, sizeof (*ne));
68         if (ne == NULL)
69                 return (-ENOMEM);
70
71         memset (ne, 0, sizeof (*ne));
72         memcpy ((void *)&ne->kpne_interface, (void *)nalif, sizeof (*nalif));
73
74         LASSERT (!in_interrupt());
75         write_lock_irqsave (&kpr_rwlock, flags);
76
77         for (e = kpr_nals.next; e != &kpr_nals; e = e->next)
78         {
79                 kpr_nal_entry_t *ne2 = list_entry (e, kpr_nal_entry_t, kpne_list);
80
81                 if (ne2->kpne_interface.kprni_nalid == ne->kpne_interface.kprni_nalid)
82                 {
83                         write_unlock_irqrestore (&kpr_rwlock, flags);
84
85                         CERROR ("Attempt to register same NAL %d twice\n", ne->kpne_interface.kprni_nalid);
86
87                         PORTAL_FREE (ne, sizeof (*ne));
88                         return (-EEXIST);
89                 }
90         }
91
92         list_add (&ne->kpne_list, &kpr_nals);
93
94         write_unlock_irqrestore (&kpr_rwlock, flags);
95
96         *argp = ne;
97         PORTAL_MODULE_USE;
98         return (0);
99 }
100
101 void
102 kpr_do_upcall (void *arg)
103 {
104         kpr_upcall_t *u = (kpr_upcall_t *)arg;
105         char          nalstr[10];
106         char          nidstr[36];
107         char          whenstr[36];
108         char         *argv[] = {
109                 NULL,
110                 "ROUTER_NOTIFY",
111                 nalstr,
112                 nidstr,
113                 u->kpru_alive ? "up" : "down",
114                 whenstr,
115                 NULL};
116         
117         snprintf (nalstr, sizeof(nalstr), "%d", u->kpru_nal_id);
118         snprintf (nidstr, sizeof(nidstr), LPX64, u->kpru_nid);
119         snprintf (whenstr, sizeof(whenstr), "%ld", u->kpru_when);
120
121         portals_run_upcall (argv);
122
123         kfree (u);
124 }
125
126 void
127 kpr_upcall (int gw_nalid, ptl_nid_t gw_nid, int alive, time_t when)
128 {
129         /* May be in arbitrary context */
130         kpr_upcall_t  *u = kmalloc (sizeof (kpr_upcall_t), GFP_ATOMIC);
131
132         if (u == NULL) {
133                 CERROR ("Upcall out of memory: nal %d nid "LPX64" %s\n",
134                         gw_nalid, gw_nid, alive ? "up" : "down");
135                 return;
136         }
137
138         u->kpru_nal_id     = gw_nalid;
139         u->kpru_nid        = gw_nid;
140         u->kpru_alive      = alive;
141         u->kpru_when       = when;
142
143         prepare_work (&u->kpru_tq, kpr_do_upcall, u);
144         schedule_work (&u->kpru_tq);
145 }
146
147 int
148 kpr_do_notify (int byNal, int gateway_nalid, ptl_nid_t gateway_nid,
149                int alive, time_t when)
150 {
151         unsigned long        flags;
152         int                  found;
153         kpr_nal_entry_t     *ne = NULL;
154         kpr_gateway_entry_t *ge = NULL;
155         struct timeval       now;
156         struct list_head    *e;
157         struct list_head    *n;
158
159         CDEBUG (D_NET, "%s notifying [%d] "LPX64": %s\n", 
160                 byNal ? "NAL" : "userspace", 
161                 gateway_nalid, gateway_nid, alive ? "up" : "down");
162
163         /* can't do predictions... */
164         do_gettimeofday (&now);
165         if (when > now.tv_sec) {
166                 CWARN ("Ignoring prediction from %s of [%d] "LPX64" %s "
167                        "%ld seconds in the future\n", 
168                        byNal ? "NAL" : "userspace", 
169                        gateway_nalid, gateway_nid, 
170                        alive ? "up" : "down",
171                        when - now.tv_sec);
172                 return (EINVAL);
173         }
174
175         LASSERT (when <= now.tv_sec);
176
177         /* Serialise with lookups (i.e. write lock) */
178         write_lock_irqsave(&kpr_rwlock, flags);
179
180         found = 0;
181         list_for_each_safe (e, n, &kpr_gateways) {
182
183                 ge = list_entry(e, kpr_gateway_entry_t, kpge_list);
184                 if ((gateway_nalid != 0 &&
185                      ge->kpge_nalid != gateway_nalid) ||
186                     ge->kpge_nid != gateway_nid)
187                         continue;
188
189                 found = 1;
190                 break;
191         }
192
193         if (!found) {
194                 /* gateway not found */
195                 write_unlock_irqrestore(&kpr_rwlock, flags);
196                 CDEBUG (D_NET, "Gateway not found\n");
197                 return (0);
198         }
199         
200         if (when < ge->kpge_timestamp) {
201                 /* out of date information */
202                 write_unlock_irqrestore (&kpr_rwlock, flags);
203                 CDEBUG (D_NET, "Out of date\n");
204                 return (0);
205         }
206
207         /* update timestamp */
208         ge->kpge_timestamp = when;
209
210         if ((!ge->kpge_alive) == (!alive)) {
211                 /* new date for old news */
212                 write_unlock_irqrestore (&kpr_rwlock, flags);
213                 CDEBUG (D_NET, "Old news\n");
214                 return (0);
215         }
216
217         ge->kpge_alive = alive;
218         CDEBUG(D_NET, "set "LPX64" [%p] %d\n", gateway_nid, ge, alive);
219
220         if (alive) {
221                 /* Reset all gateway weights so the newly-enabled gateway
222                  * doesn't have to play catch-up */
223                 list_for_each_safe (e, n, &kpr_gateways) {
224                         kpr_gateway_entry_t *ge = list_entry(e, kpr_gateway_entry_t,
225                                                              kpge_list);
226                         atomic_set (&ge->kpge_weight, 0);
227                 }
228         }
229
230         found = 0;
231         if (!byNal) {
232                 /* userland notified me: notify NAL? */
233                 ne = kpr_find_nal_entry_locked (ge->kpge_nalid);
234                 if (ne != NULL) {
235                         if (!ne->kpne_shutdown &&
236                             ne->kpne_interface.kprni_notify != NULL) {
237                                 /* take a ref on this NAL until notifying
238                                  * it has completed... */
239                                 atomic_inc (&ne->kpne_refcount);
240                                 found = 1;
241                         }
242                 }
243         }
244
245         write_unlock_irqrestore(&kpr_rwlock, flags);
246
247         if (found) {
248                 ne->kpne_interface.kprni_notify (ne->kpne_interface.kprni_arg,
249                                                  gateway_nid, alive);
250                 /* 'ne' can disappear now... */
251                 atomic_dec (&ne->kpne_refcount);
252         }
253         
254         if (byNal) {
255                 /* It wasn't userland that notified me... */
256                 CWARN ("Upcall: NAL %d NID "LPX64" is %s\n",
257                        gateway_nalid, gateway_nid,
258                        alive ? "alive" : "dead");
259                 kpr_upcall (gateway_nalid, gateway_nid, alive, when);
260         } else {
261                 CDEBUG (D_NET, " NOT Doing upcall\n");
262         }
263         
264         return (0);
265 }
266
267 void
268 kpr_nal_notify (void *arg, ptl_nid_t peer, int alive, time_t when)
269 {
270         kpr_nal_entry_t *ne = (kpr_nal_entry_t *)arg;
271         
272         kpr_do_notify (1, ne->kpne_interface.kprni_nalid, peer, alive, when);
273 }
274
275 void
276 kpr_shutdown_nal (void *arg)
277 {
278         unsigned long    flags;
279         kpr_nal_entry_t *ne = (kpr_nal_entry_t *)arg;
280
281         CDEBUG (D_NET, "Shutting down NAL %d\n", ne->kpne_interface.kprni_nalid);
282
283         LASSERT (!ne->kpne_shutdown);
284         LASSERT (!in_interrupt());
285
286         write_lock_irqsave (&kpr_rwlock, flags); /* locking a bit spurious... */
287         ne->kpne_shutdown = 1;
288         write_unlock_irqrestore (&kpr_rwlock, flags); /* except it's a memory barrier */
289
290         while (atomic_read (&ne->kpne_refcount) != 0)
291         {
292                 CDEBUG (D_NET, "Waiting for refcount on NAL %d to reach zero (%d)\n",
293                         ne->kpne_interface.kprni_nalid, atomic_read (&ne->kpne_refcount));
294
295                 set_current_state (TASK_UNINTERRUPTIBLE);
296                 schedule_timeout (HZ);
297         }
298 }
299
300 void
301 kpr_deregister_nal (void *arg)
302 {
303         unsigned long     flags;
304         kpr_nal_entry_t  *ne = (kpr_nal_entry_t *)arg;
305
306         CDEBUG (D_NET, "Deregister NAL %d\n", ne->kpne_interface.kprni_nalid);
307
308         LASSERT (ne->kpne_shutdown);            /* caller must have issued shutdown already */
309         LASSERT (atomic_read (&ne->kpne_refcount) == 0); /* can't be busy */
310         LASSERT (!in_interrupt());
311
312         write_lock_irqsave (&kpr_rwlock, flags);
313
314         list_del (&ne->kpne_list);
315
316         write_unlock_irqrestore (&kpr_rwlock, flags);
317
318         PORTAL_FREE (ne, sizeof (*ne));
319         PORTAL_MODULE_UNUSE;
320 }
321
322 int
323 kpr_ge_isbetter (kpr_gateway_entry_t *ge1, kpr_gateway_entry_t *ge2)
324 {
325         const int significant_bits = 0x00ffffff;
326         /* We use atomic_t to record/compare route weights for
327          * load-balancing.  Here we limit ourselves to only using
328          * 'significant_bits' when we do an 'after' comparison */
329
330         int    diff = (atomic_read (&ge1->kpge_weight) -
331                        atomic_read (&ge2->kpge_weight)) & significant_bits;
332         int    rc = (diff > (significant_bits >> 1));
333
334         CDEBUG(D_NET, "[%p]"LPX64"=%d %s [%p]"LPX64"=%d\n",
335                ge1, ge1->kpge_nid, atomic_read (&ge1->kpge_weight),
336                rc ? ">" : "<",
337                ge2, ge2->kpge_nid, atomic_read (&ge2->kpge_weight));
338
339         return (rc);
340 }
341
342 void
343 kpr_update_weight (kpr_gateway_entry_t *ge, int nob)
344 {
345         int weight = 1 + (nob + sizeof (ptl_hdr_t)/2)/sizeof (ptl_hdr_t);
346
347         /* We've chosen this route entry (i.e. gateway) to forward payload
348          * of length 'nob'; update the route's weight to make it less
349          * favoured.  Note that the weight is 1 plus the payload size
350          * rounded and scaled to the portals header size, so we get better
351          * use of the significant bits in kpge_weight. */
352
353         CDEBUG(D_NET, "gateway [%p]"LPX64" += %d\n", ge,
354                ge->kpge_nid, weight);
355         
356         atomic_add (weight, &ge->kpge_weight);
357 }
358
359 int
360 kpr_lookup_target (void *arg, ptl_nid_t target_nid, int nob,
361                    ptl_nid_t *gateway_nidp)
362 {
363         kpr_nal_entry_t     *ne = (kpr_nal_entry_t *)arg;
364         struct list_head    *e;
365         kpr_route_entry_t   *re;
366         kpr_gateway_entry_t *ge = NULL;
367         int                  rc = -ENOENT;
368
369         /* Caller wants to know if 'target_nid' can be reached via a gateway
370          * ON HER OWN NETWORK */
371
372         CDEBUG (D_NET, "lookup "LPX64" from NAL %d\n", target_nid, 
373                 ne->kpne_interface.kprni_nalid);
374
375         if (ne->kpne_shutdown)          /* caller is shutting down */
376                 return (-ENOENT);
377
378         read_lock (&kpr_rwlock);
379
380         /* Search routes for one that has a gateway to target_nid on the callers network */
381
382         list_for_each (e, &kpr_routes) {
383                 re = list_entry (e, kpr_route_entry_t, kpre_list);
384
385                 if (re->kpre_lo_nid > target_nid ||
386                     re->kpre_hi_nid < target_nid)
387                         continue;
388
389                 /* found table entry */
390
391                 if (re->kpre_gateway->kpge_nalid != ne->kpne_interface.kprni_nalid ||
392                     !re->kpre_gateway->kpge_alive) {
393                         /* different NAL or gateway down */
394                         rc = -EHOSTUNREACH;
395                         continue;
396                 }
397                 
398                 if (ge == NULL ||
399                     kpr_ge_isbetter (re->kpre_gateway, ge))
400                     ge = re->kpre_gateway;
401         }
402
403         if (ge != NULL) {
404                 kpr_update_weight (ge, nob);
405                 *gateway_nidp = ge->kpge_nid;
406                 rc = 0;
407         }
408         
409         read_unlock (&kpr_rwlock);
410
411         /* NB can't deref 're' now; it might have been removed! */
412
413         CDEBUG (D_NET, "lookup "LPX64" from NAL %d: %d ("LPX64")\n",
414                 target_nid, ne->kpne_interface.kprni_nalid, rc,
415                 (rc == 0) ? *gateway_nidp : (ptl_nid_t)0);
416         return (rc);
417 }
418
419 kpr_nal_entry_t *
420 kpr_find_nal_entry_locked (int nal_id)
421 {
422         struct list_head    *e;
423         
424         /* Called with kpr_rwlock held */
425
426         list_for_each (e, &kpr_nals) {
427                 kpr_nal_entry_t *ne = list_entry (e, kpr_nal_entry_t, kpne_list);
428
429                 if (nal_id != ne->kpne_interface.kprni_nalid) /* no match */
430                         continue;
431
432                 return (ne);
433         }
434         
435         return (NULL);
436 }
437
438 void
439 kpr_forward_packet (void *arg, kpr_fwd_desc_t *fwd)
440 {
441         kpr_nal_entry_t     *src_ne = (kpr_nal_entry_t *)arg;
442         ptl_nid_t            target_nid = fwd->kprfd_target_nid;
443         int                  nob = fwd->kprfd_nob;
444         kpr_gateway_entry_t *ge = NULL;
445         kpr_nal_entry_t     *dst_ne = NULL;
446         struct list_head    *e;
447         kpr_route_entry_t   *re;
448         kpr_nal_entry_t     *tmp_ne;
449
450         CDEBUG (D_NET, "forward [%p] "LPX64" from NAL %d\n", fwd,
451                 target_nid, src_ne->kpne_interface.kprni_nalid);
452
453         LASSERT (nob >= sizeof (ptl_hdr_t)); /* at least got a packet header */
454         LASSERT (nob == lib_iov_nob (fwd->kprfd_niov, fwd->kprfd_iov));
455         
456         atomic_inc (&kpr_queue_depth);
457         atomic_inc (&src_ne->kpne_refcount); /* source nal is busy until fwd completes */
458
459         kpr_fwd_packets++;                   /* (loose) stats accounting */
460         kpr_fwd_bytes += nob;
461
462         if (src_ne->kpne_shutdown)           /* caller is shutting down */
463                 goto out;
464
465         fwd->kprfd_router_arg = src_ne;      /* stash caller's nal entry */
466
467         read_lock (&kpr_rwlock);
468
469         /* Search routes for one that has a gateway to target_nid NOT on the caller's network */
470
471         list_for_each (e, &kpr_routes) {
472                 re = list_entry (e, kpr_route_entry_t, kpre_list);
473
474                 if (re->kpre_lo_nid > target_nid || /* no match */
475                     re->kpre_hi_nid < target_nid)
476                         continue;
477
478                 if (re->kpre_gateway->kpge_nalid == src_ne->kpne_interface.kprni_nalid)
479                         continue;               /* don't route to same NAL */
480
481                 if (!re->kpre_gateway->kpge_alive)
482                         continue;               /* gateway is dead */
483                 
484                 tmp_ne = kpr_find_nal_entry_locked (re->kpre_gateway->kpge_nalid);
485
486                 if (tmp_ne == NULL ||
487                     tmp_ne->kpne_shutdown) {
488                         /* NAL must be registered and not shutting down */
489                         continue;
490                 }
491
492                 if (ge == NULL ||
493                     kpr_ge_isbetter (re->kpre_gateway, ge)) {
494                         ge = re->kpre_gateway;
495                         dst_ne = tmp_ne;
496                 }
497         }
498         
499         if (ge != NULL) {
500                 LASSERT (dst_ne != NULL);
501                 
502                 kpr_update_weight (ge, nob);
503
504                 fwd->kprfd_gateway_nid = ge->kpge_nid;
505                 atomic_inc (&dst_ne->kpne_refcount); /* dest nal is busy until fwd completes */
506
507                 read_unlock (&kpr_rwlock);
508
509                 CDEBUG (D_NET, "forward [%p] "LPX64" from NAL %d: "
510                         "to "LPX64" on NAL %d\n", 
511                         fwd, target_nid, src_ne->kpne_interface.kprni_nalid,
512                         fwd->kprfd_gateway_nid, dst_ne->kpne_interface.kprni_nalid);
513
514                 dst_ne->kpne_interface.kprni_fwd (dst_ne->kpne_interface.kprni_arg, fwd);
515                 return;
516         }
517
518         read_unlock (&kpr_rwlock);
519  out:
520         kpr_fwd_errors++;
521
522         CDEBUG (D_NET, "Failed to forward [%p] "LPX64" from NAL %d\n", fwd,
523                 target_nid, src_ne->kpne_interface.kprni_nalid);
524
525         /* Can't find anywhere to forward to */
526         (fwd->kprfd_callback)(fwd->kprfd_callback_arg, -EHOSTUNREACH);
527
528         atomic_dec (&kpr_queue_depth);
529         atomic_dec (&src_ne->kpne_refcount);
530 }
531
532 void
533 kpr_complete_packet (void *arg, kpr_fwd_desc_t *fwd, int error)
534 {
535         kpr_nal_entry_t *dst_ne = (kpr_nal_entry_t *)arg;
536         kpr_nal_entry_t *src_ne = (kpr_nal_entry_t *)fwd->kprfd_router_arg;
537
538         CDEBUG (D_NET, "complete(1) [%p] from NAL %d to NAL %d: %d\n", fwd,
539                 src_ne->kpne_interface.kprni_nalid, dst_ne->kpne_interface.kprni_nalid, error);
540
541         atomic_dec (&dst_ne->kpne_refcount);    /* CAVEAT EMPTOR dst_ne can disappear now!!! */
542
543         (fwd->kprfd_callback)(fwd->kprfd_callback_arg, error);
544
545         CDEBUG (D_NET, "complete(2) [%p] from NAL %d: %d\n", fwd,
546                 src_ne->kpne_interface.kprni_nalid, error);
547
548         atomic_dec (&kpr_queue_depth);
549         atomic_dec (&src_ne->kpne_refcount);    /* CAVEAT EMPTOR src_ne can disappear now!!! */
550 }
551
552 int
553 kpr_add_route (int gateway_nalid, ptl_nid_t gateway_nid, 
554                ptl_nid_t lo_nid, ptl_nid_t hi_nid)
555 {
556         unsigned long        flags;
557         struct list_head    *e;
558         kpr_route_entry_t   *re;
559         kpr_gateway_entry_t *ge;
560         int                  dup = 0;
561
562         CDEBUG(D_NET, "Add route: %d "LPX64" : "LPX64" - "LPX64"\n",
563                gateway_nalid, gateway_nid, lo_nid, hi_nid);
564
565         if (gateway_nalid == PTL_NID_ANY ||
566             lo_nid == PTL_NID_ANY ||
567             hi_nid == PTL_NID_ANY ||
568             lo_nid > hi_nid)
569                 return (-EINVAL);
570
571         PORTAL_ALLOC (ge, sizeof (*ge));
572         if (ge == NULL)
573                 return (-ENOMEM);
574
575         ge->kpge_nalid = gateway_nalid;
576         ge->kpge_nid   = gateway_nid;
577         ge->kpge_alive = 1;
578         ge->kpge_timestamp = 0;
579         ge->kpge_refcount = 0;
580         atomic_set (&ge->kpge_weight, 0);
581
582         PORTAL_ALLOC (re, sizeof (*re));
583         if (re == NULL)
584                 return (-ENOMEM);
585
586         re->kpre_lo_nid = lo_nid;
587         re->kpre_hi_nid = hi_nid;
588
589         LASSERT(!in_interrupt());
590         write_lock_irqsave (&kpr_rwlock, flags);
591
592         list_for_each (e, &kpr_gateways) {
593                 kpr_gateway_entry_t *ge2 = list_entry(e, kpr_gateway_entry_t,
594                                                       kpge_list);
595                 
596                 if (ge2->kpge_nalid == gateway_nalid &&
597                     ge2->kpge_nid == gateway_nid) {
598                         PORTAL_FREE (ge, sizeof (*ge));
599                         ge = ge2;
600                         dup = 1;
601                         break;
602                 }
603         }
604
605         if (!dup) {
606                 /* Adding a new gateway... */
607  
608                 list_add (&ge->kpge_list, &kpr_gateways);
609
610                 /* ...zero all gateway weights so this one doesn't have to
611                  * play catch-up */
612
613                 list_for_each (e, &kpr_gateways) {
614                         kpr_gateway_entry_t *ge2 = list_entry(e, kpr_gateway_entry_t,
615                                                               kpge_list);
616                         atomic_set (&ge2->kpge_weight, 0);
617                 }
618                 
619         }
620
621         re->kpre_gateway = ge;
622         ge->kpge_refcount++;
623         list_add (&re->kpre_list, &kpr_routes);
624
625         write_unlock_irqrestore (&kpr_rwlock, flags);
626         return (0);
627 }
628
629 int
630 kpr_sys_notify (int gateway_nalid, ptl_nid_t gateway_nid,
631             int alive, time_t when)
632 {
633         return (kpr_do_notify (0, gateway_nalid, gateway_nid, alive, when));
634 }
635
636 int
637 kpr_del_route (int gw_nalid, ptl_nid_t gw_nid,
638                ptl_nid_t lo, ptl_nid_t hi)
639 {
640         int                specific = (lo != PTL_NID_ANY);
641         unsigned long      flags;
642         int                rc = -ENOENT;
643         struct list_head  *e;
644         struct list_head  *n;
645
646         CDEBUG(D_NET, "Del route [%d] "LPX64" : "LPX64" - "LPX64"\n", 
647                gw_nalid, gw_nid, lo, hi);
648
649         LASSERT(!in_interrupt());
650
651         /* NB Caller may specify either all routes via the given gateway
652          * (lo/hi == PTL_NID_ANY) or a specific route entry (lo/hi are
653          * actual NIDs) */
654         
655         if (specific ? (hi == PTL_NID_ANY || hi < lo) : (hi != PTL_NID_ANY))
656                 return (-EINVAL);
657
658         write_lock_irqsave(&kpr_rwlock, flags);
659
660         list_for_each_safe (e, n, &kpr_routes) {
661                 kpr_route_entry_t   *re = list_entry(e, kpr_route_entry_t,
662                                                    kpre_list);
663                 kpr_gateway_entry_t *ge = re->kpre_gateway;
664                 
665                 if (ge->kpge_nalid != gw_nalid ||
666                     ge->kpge_nid != gw_nid ||
667                     (specific && 
668                      (lo != re->kpre_lo_nid || hi != re->kpre_hi_nid)))
669                         continue;
670
671                 rc = 0;
672
673                 if (--ge->kpge_refcount == 0) {
674                         list_del (&ge->kpge_list);
675                         PORTAL_FREE (ge, sizeof (*ge));
676                 }
677
678                 list_del (&re->kpre_list);
679                 PORTAL_FREE(re, sizeof (*re));
680
681                 if (specific)
682                         break;
683         }
684
685         write_unlock_irqrestore(&kpr_rwlock, flags);
686         return (rc);
687 }
688
689 int
690 kpr_get_route (int idx, int *gateway_nalid, ptl_nid_t *gateway_nid,
691                ptl_nid_t *lo_nid, ptl_nid_t *hi_nid, int *alive)
692 {
693         struct list_head  *e;
694
695         read_lock(&kpr_rwlock);
696
697         for (e = kpr_routes.next; e != &kpr_routes; e = e->next) {
698                 kpr_route_entry_t   *re = list_entry(e, kpr_route_entry_t,
699                                                      kpre_list);
700                 kpr_gateway_entry_t *ge = re->kpre_gateway;
701                 
702                 if (idx-- == 0) {
703                         *gateway_nalid = ge->kpge_nalid;
704                         *gateway_nid = ge->kpge_nid;
705                         *alive = ge->kpge_alive;
706                         *lo_nid = re->kpre_lo_nid;
707                         *hi_nid = re->kpre_hi_nid;
708
709                         read_unlock(&kpr_rwlock);
710                         return (0);
711                 }
712         }
713
714         read_unlock (&kpr_rwlock);
715         return (-ENOENT);
716 }
717
718 static void /*__exit*/
719 kpr_finalise (void)
720 {
721         LASSERT (list_empty (&kpr_nals));
722
723         while (!list_empty (&kpr_routes)) {
724                 kpr_route_entry_t *re = list_entry(kpr_routes.next,
725                                                    kpr_route_entry_t,
726                                                    kpre_list);
727
728                 list_del(&re->kpre_list);
729                 PORTAL_FREE(re, sizeof (*re));
730         }
731
732         kpr_proc_fini();
733
734         PORTAL_SYMBOL_UNREGISTER(kpr_router_interface);
735         PORTAL_SYMBOL_UNREGISTER(kpr_control_interface);
736
737         CDEBUG(D_MALLOC, "kpr_finalise: kmem back to %d\n",
738                atomic_read(&portal_kmemory));
739 }
740
741 static int __init
742 kpr_initialise (void)
743 {
744         CDEBUG(D_MALLOC, "kpr_initialise: kmem %d\n",
745                atomic_read(&portal_kmemory));
746
747         kpr_proc_init();
748
749         PORTAL_SYMBOL_REGISTER(kpr_router_interface);
750         PORTAL_SYMBOL_REGISTER(kpr_control_interface);
751         return (0);
752 }
753
754 MODULE_AUTHOR("Eric Barton");
755 MODULE_DESCRIPTION("Kernel Portals Router v0.01");
756 MODULE_LICENSE("GPL");
757
758 module_init (kpr_initialise);
759 module_exit (kpr_finalise);
760
761 EXPORT_SYMBOL (kpr_control_interface);
762 EXPORT_SYMBOL (kpr_router_interface);