Whamcloud - gitweb
* lctl set_route <nid> <up/down> enables or disables particular portals
[fs/lustre-release.git] / lnet / router / router.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002 Cluster File Systems, Inc.
5  *
6  *   This file is part of Portals
7  *   http://sourceforge.net/projects/sandiaportals/
8  *
9  *   Portals is free software; you can redistribute it and/or
10  *   modify it under the terms of version 2 of the GNU General Public
11  *   License as published by the Free Software Foundation.
12  *
13  *   Portals is distributed in the hope that it will be useful,
14  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
15  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  *   GNU General Public License for more details.
17  *
18  *   You should have received a copy of the GNU General Public License
19  *   along with Portals; if not, write to the Free Software
20  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21  *
22  */
23
24 #include "router.h"
25
26 LIST_HEAD(kpr_routes);
27 LIST_HEAD(kpr_gateways);
28 LIST_HEAD(kpr_nals);
29
30 unsigned long long kpr_fwd_bytes;
31 unsigned long      kpr_fwd_packets;
32 unsigned long      kpr_fwd_errors;
33 atomic_t           kpr_queue_depth;
34
35 /* Mostly the tables are read-only (thread and interrupt context)
36  *
37  * Once in a blue moon we register/deregister NALs and add/remove routing
38  * entries (thread context only)... */
39 rwlock_t         kpr_rwlock = RW_LOCK_UNLOCKED;
40
41 kpr_router_interface_t kpr_router_interface = {
42         kprri_register:         kpr_register_nal,
43         kprri_lookup:           kpr_lookup_target,
44         kprri_fwd_start:        kpr_forward_packet,
45         kprri_fwd_done:         kpr_complete_packet,
46         kprri_notify:           kpr_nal_notify,
47         kprri_shutdown:         kpr_shutdown_nal,
48         kprri_deregister:       kpr_deregister_nal,
49 };
50
51 kpr_control_interface_t kpr_control_interface = {
52         kprci_add_route:        kpr_add_route,
53         kprci_del_route:        kpr_del_route,
54         kprci_get_route:        kpr_get_route,
55         kprci_notify:           kpr_sys_notify,
56 };
57
58 int
59 kpr_register_nal (kpr_nal_interface_t *nalif, void **argp)
60 {
61         unsigned long      flags;
62         struct list_head  *e;
63         kpr_nal_entry_t   *ne;
64
65         CDEBUG (D_NET, "Registering NAL %d\n", nalif->kprni_nalid);
66
67         PORTAL_ALLOC (ne, sizeof (*ne));
68         if (ne == NULL)
69                 return (-ENOMEM);
70
71         memset (ne, 0, sizeof (*ne));
72         memcpy ((void *)&ne->kpne_interface, (void *)nalif, sizeof (*nalif));
73
74         LASSERT (!in_interrupt());
75         write_lock_irqsave (&kpr_rwlock, flags);
76
77         for (e = kpr_nals.next; e != &kpr_nals; e = e->next)
78         {
79                 kpr_nal_entry_t *ne2 = list_entry (e, kpr_nal_entry_t, kpne_list);
80
81                 if (ne2->kpne_interface.kprni_nalid == ne->kpne_interface.kprni_nalid)
82                 {
83                         write_unlock_irqrestore (&kpr_rwlock, flags);
84
85                         CERROR ("Attempt to register same NAL %d twice\n", ne->kpne_interface.kprni_nalid);
86
87                         PORTAL_FREE (ne, sizeof (*ne));
88                         return (-EEXIST);
89                 }
90         }
91
92         list_add (&ne->kpne_list, &kpr_nals);
93
94         write_unlock_irqrestore (&kpr_rwlock, flags);
95
96         *argp = ne;
97         PORTAL_MODULE_USE;
98         return (0);
99 }
100
101 void
102 kpr_do_upcall (void *arg)
103 {
104         kpr_upcall_t *u = (kpr_upcall_t *)arg;
105         char          nalstr[10];
106         char          nidstr[36];
107         char          whenstr[36];
108         char         *argv[] = {
109                 NULL,
110                 "ROUTER_NOTIFY",
111                 nalstr,
112                 nidstr,
113                 u->kpru_alive ? "up" : "down",
114                 whenstr,
115                 NULL};
116         
117         snprintf (nalstr, sizeof(nalstr), "%d", u->kpru_nal_id);
118         snprintf (nidstr, sizeof(nidstr), LPX64, u->kpru_nid);
119         snprintf (whenstr, sizeof(whenstr), "%ld", u->kpru_when);
120
121         portals_run_upcall (argv);
122 }
123
124 void
125 kpr_upcall (int gw_nalid, ptl_nid_t gw_nid, int alive, time_t when)
126 {
127         /* May be in arbitrary context */
128         kpr_upcall_t  *u = kmalloc (sizeof (kpr_upcall_t), GFP_ATOMIC);
129
130         if (u == NULL) {
131                 CERROR ("Upcall out of memory: nal %d nid "LPX64" %s\n",
132                         gw_nalid, gw_nid, alive ? "up" : "down");
133                 return;
134         }
135
136         u->kpru_nal_id     = gw_nalid;
137         u->kpru_nid        = gw_nid;
138         u->kpru_alive      = alive;
139         u->kpru_when       = when;
140
141         prepare_work (&u->kpru_tq, kpr_do_upcall, u);
142         schedule_work (&u->kpru_tq);
143 }
144
145 int
146 kpr_do_notify (int byNal, int gateway_nalid, ptl_nid_t gateway_nid,
147                int alive, time_t when)
148 {
149         unsigned long        flags;
150         int                  rc = -ENOENT;
151         kpr_nal_entry_t     *ne = NULL;
152         kpr_gateway_entry_t *ge = NULL;
153         struct timeval       now;
154         struct list_head    *e;
155         struct list_head    *n;
156
157         CDEBUG (D_ERROR, "%s notifying [%d] "LPX64": %s\n", 
158                 byNal ? "NAL" : "userspace", 
159                 gateway_nalid, gateway_nid, alive ? "up" : "down");
160
161         /* can't do predictions... */
162         do_gettimeofday (&now);
163         if (when > now.tv_sec) {
164                 CERROR ("Ignoring prediction from %s of [%d] "LPX64" %s "
165                         "%ld seconds in the future\n", 
166                 byNal ? "NAL" : "userspace", 
167                 gateway_nalid, gateway_nid, alive ? "up" : "down",
168                         when - now.tv_sec);
169                 return (EINVAL);
170         }
171
172         LASSERT (when <= now.tv_sec);
173
174         /* Serialise with lookups (i.e. write lock) */
175         write_lock_irqsave(&kpr_rwlock, flags);
176
177         list_for_each_safe (e, n, &kpr_gateways) {
178
179                 ge = list_entry(e, kpr_gateway_entry_t, kpge_list);
180                 if ((gateway_nalid != 0 &&
181                      ge->kpge_nalid != gateway_nalid) ||
182                     ge->kpge_nid != gateway_nid)
183                         continue;
184
185                 rc = 0;
186                 break;
187         }
188
189         if (rc != 0) {
190                 /* gateway not found */
191                 write_unlock_irqrestore(&kpr_rwlock, flags);
192                 CERROR ("Gateway not found\n");
193                 return (rc);
194         }
195         
196         if (when < ge->kpge_timestamp) {
197                 /* out of date information */
198                 write_unlock_irqrestore (&kpr_rwlock, flags);
199                 CERROR ("Out of date\n");
200                 return (0);
201         }
202
203         /* update timestamp */
204         ge->kpge_timestamp = when;
205
206         if ((!ge->kpge_alive) == (!alive)) {
207                 /* new date for old news */
208                 write_unlock_irqrestore (&kpr_rwlock, flags);
209                 CERROR ("Old news\n");
210                 return (0);
211         }
212
213         ge->kpge_alive = alive;
214         CDEBUG(D_NET, "set "LPX64" [%p] %d\n", gateway_nid, ge, alive);
215
216         if (alive) {
217                 /* Reset all gateway weights so the newly-enabled gateway
218                  * doesn't have to play catch-up */
219                 list_for_each_safe (e, n, &kpr_gateways) {
220                         kpr_gateway_entry_t *ge = list_entry(e, kpr_gateway_entry_t,
221                                                              kpge_list);
222                         atomic_set (&ge->kpge_weight, 0);
223                 }
224         }
225
226         if (!byNal) {
227                 /* userland notified me: notify NAL? */
228                 ne = kpr_find_nal_entry_locked (ge->kpge_nalid);
229                 if (ne != NULL) {
230                         if (ne->kpne_shutdown ||
231                             ne->kpne_interface.kprni_notify == NULL) {
232                                 /* no need to notify */
233                                 ne = NULL;
234                         } else {
235                                 /* take a ref on this NAL until notifying
236                                  * it has completed... */
237                                 atomic_inc (&ne->kpne_refcount);
238                         }
239                 }
240         }
241
242         write_unlock_irqrestore(&kpr_rwlock, flags);
243
244         if (ne != NULL) {
245                 ne->kpne_interface.kprni_notify (ne->kpne_interface.kprni_arg,
246                                                  gateway_nid, alive);
247                 /* 'ne' can disappear now... */
248                 atomic_dec (&ne->kpne_refcount);
249         }
250         
251         if (byNal) {
252                 /* It wasn't userland that notified me... */
253                 CERROR ("Doing upcall\n");
254                 kpr_upcall (gateway_nalid, gateway_nid, alive, when);
255         } else {
256                 CERROR (" NOT Doing upcall\n");
257         }
258         
259         return (0);
260 }
261
262 void
263 kpr_nal_notify (void *arg, ptl_nid_t peer, int alive, time_t when)
264 {
265         kpr_nal_entry_t *ne = (kpr_nal_entry_t *)arg;
266         
267         kpr_do_notify (1, ne->kpne_interface.kprni_nalid, peer, alive, when);
268 }
269
270 void
271 kpr_shutdown_nal (void *arg)
272 {
273         unsigned long    flags;
274         kpr_nal_entry_t *ne = (kpr_nal_entry_t *)arg;
275
276         CDEBUG (D_NET, "Shutting down NAL %d\n", ne->kpne_interface.kprni_nalid);
277
278         LASSERT (!ne->kpne_shutdown);
279         LASSERT (!in_interrupt());
280
281         write_lock_irqsave (&kpr_rwlock, flags); /* locking a bit spurious... */
282         ne->kpne_shutdown = 1;
283         write_unlock_irqrestore (&kpr_rwlock, flags); /* except it's a memory barrier */
284
285         while (atomic_read (&ne->kpne_refcount) != 0)
286         {
287                 CDEBUG (D_NET, "Waiting for refcount on NAL %d to reach zero (%d)\n",
288                         ne->kpne_interface.kprni_nalid, atomic_read (&ne->kpne_refcount));
289
290                 set_current_state (TASK_UNINTERRUPTIBLE);
291                 schedule_timeout (HZ);
292         }
293 }
294
295 void
296 kpr_deregister_nal (void *arg)
297 {
298         unsigned long     flags;
299         kpr_nal_entry_t  *ne = (kpr_nal_entry_t *)arg;
300
301         CDEBUG (D_NET, "Deregister NAL %d\n", ne->kpne_interface.kprni_nalid);
302
303         LASSERT (ne->kpne_shutdown);            /* caller must have issued shutdown already */
304         LASSERT (atomic_read (&ne->kpne_refcount) == 0); /* can't be busy */
305         LASSERT (!in_interrupt());
306
307         write_lock_irqsave (&kpr_rwlock, flags);
308
309         list_del (&ne->kpne_list);
310
311         write_unlock_irqrestore (&kpr_rwlock, flags);
312
313         PORTAL_FREE (ne, sizeof (*ne));
314         PORTAL_MODULE_UNUSE;
315 }
316
317 int
318 kpr_ge_isbetter (kpr_gateway_entry_t *ge1, kpr_gateway_entry_t *ge2)
319 {
320         const int significant_bits = 0x00ffffff;
321         /* We use atomic_t to record/compare route weights for
322          * load-balancing.  Here we limit ourselves to only using
323          * 'significant_bits' when we do an 'after' comparison */
324
325         int    diff = (atomic_read (&ge1->kpge_weight) -
326                        atomic_read (&ge2->kpge_weight)) & significant_bits;
327         int    rc = (diff > (significant_bits >> 1));
328
329         CDEBUG(D_NET, "[%p]"LPX64"=%d %s [%p]"LPX64"=%d\n",
330                ge1, ge1->kpge_nid, atomic_read (&ge1->kpge_weight),
331                rc ? ">" : "<",
332                ge2, ge2->kpge_nid, atomic_read (&ge2->kpge_weight));
333
334         return (rc);
335 }
336
337 void
338 kpr_update_weight (kpr_gateway_entry_t *ge, int nob)
339 {
340         int weight = 1 + (nob + sizeof (ptl_hdr_t)/2)/sizeof (ptl_hdr_t);
341
342         /* We've chosen this route entry (i.e. gateway) to forward payload
343          * of length 'nob'; update the route's weight to make it less
344          * favoured.  Note that the weight is 1 plus the payload size
345          * rounded and scaled to the portals header size, so we get better
346          * use of the significant bits in kpge_weight. */
347
348         CDEBUG(D_NET, "gateway [%p]"LPX64" += %d\n", ge,
349                ge->kpge_nid, weight);
350         
351         atomic_add (weight, &ge->kpge_weight);
352 }
353
354 int
355 kpr_lookup_target (void *arg, ptl_nid_t target_nid, int nob,
356                    ptl_nid_t *gateway_nidp)
357 {
358         kpr_nal_entry_t     *ne = (kpr_nal_entry_t *)arg;
359         struct list_head    *e;
360         kpr_route_entry_t   *re;
361         kpr_gateway_entry_t *ge = NULL;
362         int                  rc = -ENOENT;
363
364         /* Caller wants to know if 'target_nid' can be reached via a gateway
365          * ON HER OWN NETWORK */
366
367         CDEBUG (D_NET, "lookup "LPX64" from NAL %d\n", target_nid, 
368                 ne->kpne_interface.kprni_nalid);
369
370         if (ne->kpne_shutdown)          /* caller is shutting down */
371                 return (-ENOENT);
372
373         read_lock (&kpr_rwlock);
374
375         /* Search routes for one that has a gateway to target_nid on the callers network */
376
377         list_for_each (e, &kpr_routes) {
378                 re = list_entry (e, kpr_route_entry_t, kpre_list);
379
380                 if (re->kpre_lo_nid > target_nid ||
381                     re->kpre_hi_nid < target_nid)
382                         continue;
383
384                 /* found table entry */
385
386                 if (re->kpre_gateway->kpge_nalid != ne->kpne_interface.kprni_nalid ||
387                     !re->kpre_gateway->kpge_alive) {
388                         /* different NAL or gateway down */
389                         rc = -EHOSTUNREACH;
390                         continue;
391                 }
392                 
393                 if (ge == NULL ||
394                     kpr_ge_isbetter (re->kpre_gateway, ge))
395                     ge = re->kpre_gateway;
396         }
397
398         if (ge != NULL) {
399                 kpr_update_weight (ge, nob);
400                 *gateway_nidp = ge->kpge_nid;
401                 rc = 0;
402         }
403         
404         read_unlock (&kpr_rwlock);
405
406         /* NB can't deref 're' now; it might have been removed! */
407
408         CDEBUG (D_NET, "lookup "LPX64" from NAL %d: %d ("LPX64")\n",
409                 target_nid, ne->kpne_interface.kprni_nalid, rc,
410                 (rc == 0) ? *gateway_nidp : (ptl_nid_t)0);
411         return (rc);
412 }
413
414 kpr_nal_entry_t *
415 kpr_find_nal_entry_locked (int nal_id)
416 {
417         struct list_head    *e;
418         
419         /* Called with kpr_rwlock held */
420
421         list_for_each (e, &kpr_nals) {
422                 kpr_nal_entry_t *ne = list_entry (e, kpr_nal_entry_t, kpne_list);
423
424                 if (nal_id != ne->kpne_interface.kprni_nalid) /* no match */
425                         continue;
426
427                 return (ne);
428         }
429         
430         return (NULL);
431 }
432
433 void
434 kpr_forward_packet (void *arg, kpr_fwd_desc_t *fwd)
435 {
436         kpr_nal_entry_t     *src_ne = (kpr_nal_entry_t *)arg;
437         ptl_nid_t            target_nid = fwd->kprfd_target_nid;
438         int                  nob = fwd->kprfd_nob;
439         kpr_gateway_entry_t *ge = NULL;
440         kpr_nal_entry_t     *dst_ne = NULL;
441         struct list_head    *e;
442         kpr_route_entry_t   *re;
443         kpr_nal_entry_t     *tmp_ne;
444
445         CDEBUG (D_NET, "forward [%p] "LPX64" from NAL %d\n", fwd,
446                 target_nid, src_ne->kpne_interface.kprni_nalid);
447
448         LASSERT (nob >= sizeof (ptl_hdr_t)); /* at least got a packet header */
449         LASSERT (nob == lib_iov_nob (fwd->kprfd_niov, fwd->kprfd_iov));
450         
451         atomic_inc (&kpr_queue_depth);
452         atomic_inc (&src_ne->kpne_refcount); /* source nal is busy until fwd completes */
453
454         kpr_fwd_packets++;                   /* (loose) stats accounting */
455         kpr_fwd_bytes += nob;
456
457         if (src_ne->kpne_shutdown)           /* caller is shutting down */
458                 goto out;
459
460         fwd->kprfd_router_arg = src_ne;      /* stash caller's nal entry */
461
462         read_lock (&kpr_rwlock);
463
464         /* Search routes for one that has a gateway to target_nid NOT on the caller's network */
465
466         list_for_each (e, &kpr_routes) {
467                 re = list_entry (e, kpr_route_entry_t, kpre_list);
468
469                 if (re->kpre_lo_nid > target_nid || /* no match */
470                     re->kpre_hi_nid < target_nid)
471                         continue;
472
473                 if (re->kpre_gateway->kpge_nalid == src_ne->kpne_interface.kprni_nalid)
474                         continue;               /* don't route to same NAL */
475
476                 if (!re->kpre_gateway->kpge_alive)
477                         continue;               /* gateway is dead */
478                 
479                 tmp_ne = kpr_find_nal_entry_locked (re->kpre_gateway->kpge_nalid);
480
481                 if (tmp_ne == NULL ||
482                     tmp_ne->kpne_shutdown) {
483                         /* NAL must be registered and not shutting down */
484                         continue;
485                 }
486
487                 if (ge == NULL ||
488                     kpr_ge_isbetter (re->kpre_gateway, ge)) {
489                         ge = re->kpre_gateway;
490                         dst_ne = tmp_ne;
491                 }
492         }
493         
494         if (ge != NULL) {
495                 LASSERT (dst_ne != NULL);
496                 
497                 kpr_update_weight (ge, nob);
498
499                 fwd->kprfd_gateway_nid = ge->kpge_nid;
500                 atomic_inc (&dst_ne->kpne_refcount); /* dest nal is busy until fwd completes */
501
502                 read_unlock (&kpr_rwlock);
503
504                 CDEBUG (D_NET, "forward [%p] "LPX64" from NAL %d: "
505                         "to "LPX64" on NAL %d\n", 
506                         fwd, target_nid, src_ne->kpne_interface.kprni_nalid,
507                         fwd->kprfd_gateway_nid, dst_ne->kpne_interface.kprni_nalid);
508
509                 dst_ne->kpne_interface.kprni_fwd (dst_ne->kpne_interface.kprni_arg, fwd);
510                 return;
511         }
512
513         read_unlock (&kpr_rwlock);
514  out:
515         kpr_fwd_errors++;
516
517         CDEBUG (D_NET, "Failed to forward [%p] "LPX64" from NAL %d\n", fwd,
518                 target_nid, src_ne->kpne_interface.kprni_nalid);
519
520         /* Can't find anywhere to forward to */
521         (fwd->kprfd_callback)(fwd->kprfd_callback_arg, -EHOSTUNREACH);
522
523         atomic_dec (&kpr_queue_depth);
524         atomic_dec (&src_ne->kpne_refcount);
525 }
526
527 void
528 kpr_complete_packet (void *arg, kpr_fwd_desc_t *fwd, int error)
529 {
530         kpr_nal_entry_t *dst_ne = (kpr_nal_entry_t *)arg;
531         kpr_nal_entry_t *src_ne = (kpr_nal_entry_t *)fwd->kprfd_router_arg;
532
533         CDEBUG (D_NET, "complete(1) [%p] from NAL %d to NAL %d: %d\n", fwd,
534                 src_ne->kpne_interface.kprni_nalid, dst_ne->kpne_interface.kprni_nalid, error);
535
536         atomic_dec (&dst_ne->kpne_refcount);    /* CAVEAT EMPTOR dst_ne can disappear now!!! */
537
538         (fwd->kprfd_callback)(fwd->kprfd_callback_arg, error);
539
540         CDEBUG (D_NET, "complete(2) [%p] from NAL %d: %d\n", fwd,
541                 src_ne->kpne_interface.kprni_nalid, error);
542
543         atomic_dec (&kpr_queue_depth);
544         atomic_dec (&src_ne->kpne_refcount);    /* CAVEAT EMPTOR src_ne can disappear now!!! */
545 }
546
547 int
548 kpr_add_route (int gateway_nalid, ptl_nid_t gateway_nid, 
549                ptl_nid_t lo_nid, ptl_nid_t hi_nid)
550 {
551         unsigned long        flags;
552         struct list_head    *e;
553         kpr_route_entry_t   *re;
554         kpr_gateway_entry_t *ge;
555         int                  dup = 0;
556
557         CDEBUG(D_NET, "Add route: %d "LPX64" : "LPX64" - "LPX64"\n",
558                gateway_nalid, gateway_nid, lo_nid, hi_nid);
559
560         if (gateway_nalid == PTL_NID_ANY ||
561             lo_nid == PTL_NID_ANY ||
562             hi_nid == PTL_NID_ANY ||
563             lo_nid > hi_nid)
564                 return (-EINVAL);
565
566         PORTAL_ALLOC (ge, sizeof (*ge));
567         if (ge == NULL)
568                 return (-ENOMEM);
569
570         ge->kpge_nalid = gateway_nalid;
571         ge->kpge_nid   = gateway_nid;
572         ge->kpge_alive = 1;
573         ge->kpge_timestamp = 0;
574         ge->kpge_refcount = 0;
575         atomic_set (&ge->kpge_weight, 0);
576
577         PORTAL_ALLOC (re, sizeof (*re));
578         if (re == NULL)
579                 return (-ENOMEM);
580
581         re->kpre_lo_nid = lo_nid;
582         re->kpre_hi_nid = hi_nid;
583
584         LASSERT(!in_interrupt());
585         write_lock_irqsave (&kpr_rwlock, flags);
586
587         list_for_each (e, &kpr_gateways) {
588                 kpr_gateway_entry_t *ge2 = list_entry(e, kpr_gateway_entry_t,
589                                                       kpge_list);
590                 
591                 if (ge2->kpge_nalid == gateway_nalid &&
592                     ge2->kpge_nid == gateway_nid) {
593                         PORTAL_FREE (ge, sizeof (*ge));
594                         ge = ge2;
595                         dup = 1;
596                         break;
597                 }
598         }
599
600         if (!dup) {
601                 /* Adding a new gateway... */
602  
603                 list_add (&ge->kpge_list, &kpr_gateways);
604
605                 /* ...zero all gateway weights so this one doesn't have to
606                  * play catch-up */
607
608                 list_for_each (e, &kpr_gateways) {
609                         kpr_gateway_entry_t *ge2 = list_entry(e, kpr_gateway_entry_t,
610                                                               kpge_list);
611                         atomic_set (&ge2->kpge_weight, 0);
612                 }
613                 
614         }
615
616         re->kpre_gateway = ge;
617         ge->kpge_refcount++;
618         list_add (&re->kpre_list, &kpr_routes);
619
620         write_unlock_irqrestore (&kpr_rwlock, flags);
621         return (0);
622 }
623
624 int
625 kpr_sys_notify (int gateway_nalid, ptl_nid_t gateway_nid,
626             int alive, time_t when)
627 {
628         return (kpr_do_notify (0, gateway_nalid, gateway_nid, alive, when));
629 }
630
631 int
632 kpr_del_route (int gw_nalid, ptl_nid_t gw_nid,
633                ptl_nid_t lo, ptl_nid_t hi)
634 {
635         int                specific = (lo != PTL_NID_ANY);
636         unsigned long      flags;
637         int                rc = -ENOENT;
638         struct list_head  *e;
639         struct list_head  *n;
640
641         CDEBUG(D_NET, "Del route [%d] "LPX64" : "LPX64" - "LPX64"\n", 
642                gw_nalid, gw_nid, lo, hi);
643
644         LASSERT(!in_interrupt());
645
646         /* NB Caller may specify either all routes via the given gateway
647          * (lo/hi == PTL_NID_ANY) or a specific route entry (lo/hi are
648          * actual NIDs) */
649         
650         if (specific ? (hi == PTL_NID_ANY || hi < lo) : (hi != PTL_NID_ANY))
651                 return (-EINVAL);
652
653         write_lock_irqsave(&kpr_rwlock, flags);
654
655         list_for_each_safe (e, n, &kpr_routes) {
656                 kpr_route_entry_t   *re = list_entry(e, kpr_route_entry_t,
657                                                    kpre_list);
658                 kpr_gateway_entry_t *ge = re->kpre_gateway;
659                 
660                 if (ge->kpge_nalid != gw_nalid ||
661                     ge->kpge_nid != gw_nid ||
662                     (specific && 
663                      (lo != re->kpre_lo_nid || hi != re->kpre_hi_nid)))
664                         continue;
665
666                 rc = 0;
667
668                 if (--ge->kpge_refcount == 0) {
669                         list_del (&ge->kpge_list);
670                         PORTAL_FREE (ge, sizeof (*ge));
671                 }
672
673                 list_del (&re->kpre_list);
674                 PORTAL_FREE(re, sizeof (*re));
675
676                 if (specific)
677                         break;
678         }
679
680         write_unlock_irqrestore(&kpr_rwlock, flags);
681         return (rc);
682 }
683
684 int
685 kpr_get_route (int idx, int *gateway_nalid, ptl_nid_t *gateway_nid,
686                ptl_nid_t *lo_nid, ptl_nid_t *hi_nid, int *alive)
687 {
688         struct list_head  *e;
689
690         read_lock(&kpr_rwlock);
691
692         for (e = kpr_routes.next; e != &kpr_routes; e = e->next) {
693                 kpr_route_entry_t   *re = list_entry(e, kpr_route_entry_t,
694                                                      kpre_list);
695                 kpr_gateway_entry_t *ge = re->kpre_gateway;
696                 
697                 if (idx-- == 0) {
698                         *gateway_nalid = ge->kpge_nalid;
699                         *gateway_nid = ge->kpge_nid;
700                         *alive = ge->kpge_alive;
701                         *lo_nid = re->kpre_lo_nid;
702                         *hi_nid = re->kpre_hi_nid;
703
704                         read_unlock(&kpr_rwlock);
705                         return (0);
706                 }
707         }
708
709         read_unlock (&kpr_rwlock);
710         return (-ENOENT);
711 }
712
713 static void /*__exit*/
714 kpr_finalise (void)
715 {
716         LASSERT (list_empty (&kpr_nals));
717
718         while (!list_empty (&kpr_routes)) {
719                 kpr_route_entry_t *re = list_entry(kpr_routes.next,
720                                                    kpr_route_entry_t,
721                                                    kpre_list);
722
723                 list_del(&re->kpre_list);
724                 PORTAL_FREE(re, sizeof (*re));
725         }
726
727         kpr_proc_fini();
728
729         PORTAL_SYMBOL_UNREGISTER(kpr_router_interface);
730         PORTAL_SYMBOL_UNREGISTER(kpr_control_interface);
731
732         CDEBUG(D_MALLOC, "kpr_finalise: kmem back to %d\n",
733                atomic_read(&portal_kmemory));
734 }
735
736 static int __init
737 kpr_initialise (void)
738 {
739         CDEBUG(D_MALLOC, "kpr_initialise: kmem %d\n",
740                atomic_read(&portal_kmemory));
741
742         kpr_proc_init();
743
744         PORTAL_SYMBOL_REGISTER(kpr_router_interface);
745         PORTAL_SYMBOL_REGISTER(kpr_control_interface);
746         return (0);
747 }
748
749 MODULE_AUTHOR("Eric Barton");
750 MODULE_DESCRIPTION("Kernel Portals Router v0.01");
751 MODULE_LICENSE("GPL");
752
753 module_init (kpr_initialise);
754 module_exit (kpr_finalise);
755
756 EXPORT_SYMBOL (kpr_control_interface);
757 EXPORT_SYMBOL (kpr_router_interface);