Whamcloud - gitweb
* Fix for 2895
[fs/lustre-release.git] / lnet / router / router.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002 Cluster File Systems, Inc.
5  *
6  *   This file is part of Portals
7  *   http://sourceforge.net/projects/sandiaportals/
8  *
9  *   Portals is free software; you can redistribute it and/or
10  *   modify it under the terms of version 2 of the GNU General Public
11  *   License as published by the Free Software Foundation.
12  *
13  *   Portals is distributed in the hope that it will be useful,
14  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
15  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  *   GNU General Public License for more details.
17  *
18  *   You should have received a copy of the GNU General Public License
19  *   along with Portals; if not, write to the Free Software
20  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21  *
22  */
23
24 #include "router.h"
25
26 LIST_HEAD(kpr_routes);
27 LIST_HEAD(kpr_gateways);
28 LIST_HEAD(kpr_nals);
29
30 unsigned long long kpr_fwd_bytes;
31 unsigned long      kpr_fwd_packets;
32 unsigned long      kpr_fwd_errors;
33 atomic_t           kpr_queue_depth;
34
35 /* Mostly the tables are read-only (thread and interrupt context)
36  *
37  * Once in a blue moon we register/deregister NALs and add/remove routing
38  * entries (thread context only)... */
39 rwlock_t         kpr_rwlock = RW_LOCK_UNLOCKED;
40
41 kpr_router_interface_t kpr_router_interface = {
42         kprri_register:         kpr_register_nal,
43         kprri_lookup:           kpr_lookup_target,
44         kprri_fwd_start:        kpr_forward_packet,
45         kprri_fwd_done:         kpr_complete_packet,
46         kprri_notify:           kpr_nal_notify,
47         kprri_shutdown:         kpr_shutdown_nal,
48         kprri_deregister:       kpr_deregister_nal,
49 };
50
51 kpr_control_interface_t kpr_control_interface = {
52         kprci_add_route:        kpr_add_route,
53         kprci_del_route:        kpr_del_route,
54         kprci_get_route:        kpr_get_route,
55         kprci_notify:           kpr_sys_notify,
56 };
57
58 int
59 kpr_register_nal (kpr_nal_interface_t *nalif, void **argp)
60 {
61         unsigned long      flags;
62         struct list_head  *e;
63         kpr_nal_entry_t   *ne;
64
65         CDEBUG (D_NET, "Registering NAL %d\n", nalif->kprni_nalid);
66
67         PORTAL_ALLOC (ne, sizeof (*ne));
68         if (ne == NULL)
69                 return (-ENOMEM);
70
71         memset (ne, 0, sizeof (*ne));
72         memcpy ((void *)&ne->kpne_interface, (void *)nalif, sizeof (*nalif));
73
74         LASSERT (!in_interrupt());
75         write_lock_irqsave (&kpr_rwlock, flags);
76
77         for (e = kpr_nals.next; e != &kpr_nals; e = e->next)
78         {
79                 kpr_nal_entry_t *ne2 = list_entry (e, kpr_nal_entry_t, kpne_list);
80
81                 if (ne2->kpne_interface.kprni_nalid == ne->kpne_interface.kprni_nalid)
82                 {
83                         write_unlock_irqrestore (&kpr_rwlock, flags);
84
85                         CERROR ("Attempt to register same NAL %d twice\n", ne->kpne_interface.kprni_nalid);
86
87                         PORTAL_FREE (ne, sizeof (*ne));
88                         return (-EEXIST);
89                 }
90         }
91
92         list_add (&ne->kpne_list, &kpr_nals);
93
94         write_unlock_irqrestore (&kpr_rwlock, flags);
95
96         *argp = ne;
97         PORTAL_MODULE_USE;
98         return (0);
99 }
100
101 void
102 kpr_do_upcall (void *arg)
103 {
104         kpr_upcall_t *u = (kpr_upcall_t *)arg;
105         char          nalstr[10];
106         char          nidstr[36];
107         char          whenstr[36];
108         char         *argv[] = {
109                 NULL,
110                 "ROUTER_NOTIFY",
111                 nalstr,
112                 nidstr,
113                 u->kpru_alive ? "up" : "down",
114                 whenstr,
115                 NULL};
116         
117         snprintf (nalstr, sizeof(nalstr), "%d", u->kpru_nal_id);
118         snprintf (nidstr, sizeof(nidstr), LPX64, u->kpru_nid);
119         snprintf (whenstr, sizeof(whenstr), "%ld", u->kpru_when);
120
121         portals_run_upcall (argv);
122
123         kfree (u);
124 }
125
126 void
127 kpr_upcall (int gw_nalid, ptl_nid_t gw_nid, int alive, time_t when)
128 {
129         char str[PTL_NALFMT_SIZE];
130         
131         /* May be in arbitrary context */
132         kpr_upcall_t  *u = kmalloc (sizeof (kpr_upcall_t), GFP_ATOMIC);
133
134         if (u == NULL) {
135                 CERROR ("Upcall out of memory: nal %d nid "LPX64" (%s) %s\n",
136                         gw_nalid, gw_nid,
137                         portals_nid2str(gw_nalid, gw_nid, str),
138                         alive ? "up" : "down");
139                 return;
140         }
141
142         u->kpru_nal_id     = gw_nalid;
143         u->kpru_nid        = gw_nid;
144         u->kpru_alive      = alive;
145         u->kpru_when       = when;
146
147         prepare_work (&u->kpru_tq, kpr_do_upcall, u);
148         schedule_work (&u->kpru_tq);
149 }
150
151 int
152 kpr_do_notify (int byNal, int gateway_nalid, ptl_nid_t gateway_nid,
153                int alive, time_t when)
154 {
155         unsigned long        flags;
156         int                  found;
157         kpr_nal_entry_t     *ne = NULL;
158         kpr_gateway_entry_t *ge = NULL;
159         struct timeval       now;
160         struct list_head    *e;
161         struct list_head    *n;
162         char                 str[PTL_NALFMT_SIZE];
163
164         CDEBUG (D_NET, "%s notifying [%d] "LPX64": %s\n", 
165                 byNal ? "NAL" : "userspace", 
166                 gateway_nalid, gateway_nid, alive ? "up" : "down");
167
168         /* can't do predictions... */
169         do_gettimeofday (&now);
170         if (when > now.tv_sec) {
171                 CWARN ("Ignoring prediction from %s of [%d] "LPX64" %s "
172                        "%ld seconds in the future\n", 
173                        byNal ? "NAL" : "userspace", 
174                        gateway_nalid, gateway_nid, 
175                        alive ? "up" : "down",
176                        when - now.tv_sec);
177                 return (EINVAL);
178         }
179
180         LASSERT (when <= now.tv_sec);
181
182         /* Serialise with lookups (i.e. write lock) */
183         write_lock_irqsave(&kpr_rwlock, flags);
184
185         found = 0;
186         list_for_each_safe (e, n, &kpr_gateways) {
187
188                 ge = list_entry(e, kpr_gateway_entry_t, kpge_list);
189                 if ((gateway_nalid != 0 &&
190                      ge->kpge_nalid != gateway_nalid) ||
191                     ge->kpge_nid != gateway_nid)
192                         continue;
193
194                 found = 1;
195                 break;
196         }
197
198         if (!found) {
199                 /* gateway not found */
200                 write_unlock_irqrestore(&kpr_rwlock, flags);
201                 CDEBUG (D_NET, "Gateway not found\n");
202                 return (0);
203         }
204         
205         if (when < ge->kpge_timestamp) {
206                 /* out of date information */
207                 write_unlock_irqrestore (&kpr_rwlock, flags);
208                 CDEBUG (D_NET, "Out of date\n");
209                 return (0);
210         }
211
212         /* update timestamp */
213         ge->kpge_timestamp = when;
214
215         if ((!ge->kpge_alive) == (!alive)) {
216                 /* new date for old news */
217                 write_unlock_irqrestore (&kpr_rwlock, flags);
218                 CDEBUG (D_NET, "Old news\n");
219                 return (0);
220         }
221
222         ge->kpge_alive = alive;
223         CDEBUG(D_NET, "set "LPX64" [%p] %d\n", gateway_nid, ge, alive);
224
225         if (alive) {
226                 /* Reset all gateway weights so the newly-enabled gateway
227                  * doesn't have to play catch-up */
228                 list_for_each_safe (e, n, &kpr_gateways) {
229                         kpr_gateway_entry_t *ge = list_entry(e, kpr_gateway_entry_t,
230                                                              kpge_list);
231                         atomic_set (&ge->kpge_weight, 0);
232                 }
233         }
234
235         found = 0;
236         if (!byNal) {
237                 /* userland notified me: notify NAL? */
238                 ne = kpr_find_nal_entry_locked (ge->kpge_nalid);
239                 if (ne != NULL) {
240                         if (!ne->kpne_shutdown &&
241                             ne->kpne_interface.kprni_notify != NULL) {
242                                 /* take a ref on this NAL until notifying
243                                  * it has completed... */
244                                 atomic_inc (&ne->kpne_refcount);
245                                 found = 1;
246                         }
247                 }
248         }
249
250         write_unlock_irqrestore(&kpr_rwlock, flags);
251
252         if (found) {
253                 ne->kpne_interface.kprni_notify (ne->kpne_interface.kprni_arg,
254                                                  gateway_nid, alive);
255                 /* 'ne' can disappear now... */
256                 atomic_dec (&ne->kpne_refcount);
257         }
258         
259         if (byNal) {
260                 /* It wasn't userland that notified me... */
261                 CWARN ("Upcall: NAL %d NID "LPX64" (%s) is %s\n",
262                        gateway_nalid, gateway_nid,
263                        portals_nid2str(gateway_nalid, gateway_nid, str),
264                        alive ? "alive" : "dead");
265                 kpr_upcall (gateway_nalid, gateway_nid, alive, when);
266         } else {
267                 CDEBUG (D_NET, " NOT Doing upcall\n");
268         }
269         
270         return (0);
271 }
272
273 void
274 kpr_nal_notify (void *arg, ptl_nid_t peer, int alive, time_t when)
275 {
276         kpr_nal_entry_t *ne = (kpr_nal_entry_t *)arg;
277         
278         kpr_do_notify (1, ne->kpne_interface.kprni_nalid, peer, alive, when);
279 }
280
281 void
282 kpr_shutdown_nal (void *arg)
283 {
284         unsigned long    flags;
285         kpr_nal_entry_t *ne = (kpr_nal_entry_t *)arg;
286
287         CDEBUG (D_NET, "Shutting down NAL %d\n", ne->kpne_interface.kprni_nalid);
288
289         LASSERT (!ne->kpne_shutdown);
290         LASSERT (!in_interrupt());
291
292         write_lock_irqsave (&kpr_rwlock, flags);
293         ne->kpne_shutdown = 1;
294         write_unlock_irqrestore (&kpr_rwlock, flags);
295 }
296
297 void
298 kpr_deregister_nal (void *arg)
299 {
300         unsigned long     flags;
301         kpr_nal_entry_t  *ne = (kpr_nal_entry_t *)arg;
302
303         CDEBUG (D_NET, "Deregister NAL %d\n", ne->kpne_interface.kprni_nalid);
304
305         LASSERT (ne->kpne_shutdown);            /* caller must have issued shutdown already */
306         LASSERT (!in_interrupt());
307
308         write_lock_irqsave (&kpr_rwlock, flags);
309         list_del (&ne->kpne_list);
310         write_unlock_irqrestore (&kpr_rwlock, flags);
311
312         /* Wait until all outstanding messages/notifications have completed */
313         while (atomic_read (&ne->kpne_refcount) != 0)
314         {
315                 CDEBUG (D_NET, "Waiting for refcount on NAL %d to reach zero (%d)\n",
316                         ne->kpne_interface.kprni_nalid, atomic_read (&ne->kpne_refcount));
317
318                 set_current_state (TASK_UNINTERRUPTIBLE);
319                 schedule_timeout (HZ);
320         }
321
322         PORTAL_FREE (ne, sizeof (*ne));
323         PORTAL_MODULE_UNUSE;
324 }
325
326 int
327 kpr_ge_isbetter (kpr_gateway_entry_t *ge1, kpr_gateway_entry_t *ge2)
328 {
329         const int significant_bits = 0x00ffffff;
330         /* We use atomic_t to record/compare route weights for
331          * load-balancing.  Here we limit ourselves to only using
332          * 'significant_bits' when we do an 'after' comparison */
333
334         int    diff = (atomic_read (&ge1->kpge_weight) -
335                        atomic_read (&ge2->kpge_weight)) & significant_bits;
336         int    rc = (diff > (significant_bits >> 1));
337
338         CDEBUG(D_NET, "[%p]"LPX64"=%d %s [%p]"LPX64"=%d\n",
339                ge1, ge1->kpge_nid, atomic_read (&ge1->kpge_weight),
340                rc ? ">" : "<",
341                ge2, ge2->kpge_nid, atomic_read (&ge2->kpge_weight));
342
343         return (rc);
344 }
345
346 void
347 kpr_update_weight (kpr_gateway_entry_t *ge, int nob)
348 {
349         int weight = 1 + (nob + sizeof (ptl_hdr_t)/2)/sizeof (ptl_hdr_t);
350
351         /* We've chosen this route entry (i.e. gateway) to forward payload
352          * of length 'nob'; update the route's weight to make it less
353          * favoured.  Note that the weight is 1 plus the payload size
354          * rounded and scaled to the portals header size, so we get better
355          * use of the significant bits in kpge_weight. */
356
357         CDEBUG(D_NET, "gateway [%p]"LPX64" += %d\n", ge,
358                ge->kpge_nid, weight);
359         
360         atomic_add (weight, &ge->kpge_weight);
361 }
362
363 int
364 kpr_lookup_target (void *arg, ptl_nid_t target_nid, int nob,
365                    ptl_nid_t *gateway_nidp)
366 {
367         kpr_nal_entry_t     *ne = (kpr_nal_entry_t *)arg;
368         struct list_head    *e;
369         kpr_route_entry_t   *re;
370         kpr_gateway_entry_t *ge = NULL;
371         int                  rc = -ENOENT;
372
373         /* Caller wants to know if 'target_nid' can be reached via a gateway
374          * ON HER OWN NETWORK */
375
376         CDEBUG (D_NET, "lookup "LPX64" from NAL %d\n", target_nid, 
377                 ne->kpne_interface.kprni_nalid);
378         LASSERT (!in_interrupt());
379
380         read_lock (&kpr_rwlock);
381
382         if (ne->kpne_shutdown) {        /* caller is shutting down */
383                 read_unlock (&kpr_rwlock);
384                 return (-ENOENT);
385         }
386
387         /* Search routes for one that has a gateway to target_nid on the callers network */
388
389         list_for_each (e, &kpr_routes) {
390                 re = list_entry (e, kpr_route_entry_t, kpre_list);
391
392                 if (re->kpre_lo_nid > target_nid ||
393                     re->kpre_hi_nid < target_nid)
394                         continue;
395
396                 /* found table entry */
397
398                 if (re->kpre_gateway->kpge_nalid != ne->kpne_interface.kprni_nalid ||
399                     !re->kpre_gateway->kpge_alive) {
400                         /* different NAL or gateway down */
401                         rc = -EHOSTUNREACH;
402                         continue;
403                 }
404                 
405                 if (ge == NULL ||
406                     kpr_ge_isbetter (re->kpre_gateway, ge))
407                     ge = re->kpre_gateway;
408         }
409
410         if (ge != NULL) {
411                 kpr_update_weight (ge, nob);
412                 *gateway_nidp = ge->kpge_nid;
413                 rc = 0;
414         }
415         
416         read_unlock (&kpr_rwlock);
417
418         /* NB can't deref 're' now; it might have been removed! */
419
420         CDEBUG (D_NET, "lookup "LPX64" from NAL %d: %d ("LPX64")\n",
421                 target_nid, ne->kpne_interface.kprni_nalid, rc,
422                 (rc == 0) ? *gateway_nidp : (ptl_nid_t)0);
423         return (rc);
424 }
425
426 kpr_nal_entry_t *
427 kpr_find_nal_entry_locked (int nal_id)
428 {
429         struct list_head    *e;
430         
431         /* Called with kpr_rwlock held */
432
433         list_for_each (e, &kpr_nals) {
434                 kpr_nal_entry_t *ne = list_entry (e, kpr_nal_entry_t, kpne_list);
435
436                 if (nal_id != ne->kpne_interface.kprni_nalid) /* no match */
437                         continue;
438
439                 return (ne);
440         }
441         
442         return (NULL);
443 }
444
445 void
446 kpr_forward_packet (void *arg, kpr_fwd_desc_t *fwd)
447 {
448         kpr_nal_entry_t     *src_ne = (kpr_nal_entry_t *)arg;
449         ptl_nid_t            target_nid = fwd->kprfd_target_nid;
450         int                  nob = fwd->kprfd_nob;
451         kpr_gateway_entry_t *ge = NULL;
452         kpr_nal_entry_t     *dst_ne = NULL;
453         struct list_head    *e;
454         kpr_route_entry_t   *re;
455         kpr_nal_entry_t     *tmp_ne;
456         int                  rc;
457
458         CDEBUG (D_NET, "forward [%p] "LPX64" from NAL %d\n", fwd,
459                 target_nid, src_ne->kpne_interface.kprni_nalid);
460
461         LASSERT (nob == lib_kiov_nob (fwd->kprfd_niov, fwd->kprfd_kiov));
462         LASSERT (!in_interrupt());
463
464         read_lock (&kpr_rwlock);
465
466         kpr_fwd_packets++;                   /* (loose) stats accounting */
467         kpr_fwd_bytes += nob + sizeof(ptl_hdr_t);
468
469         if (src_ne->kpne_shutdown) {         /* caller is shutting down */
470                 rc = -ESHUTDOWN;
471                 goto out;
472         }
473
474         fwd->kprfd_router_arg = src_ne;      /* stash caller's nal entry */
475
476         /* Search routes for one that has a gateway to target_nid NOT on the caller's network */
477
478         list_for_each (e, &kpr_routes) {
479                 re = list_entry (e, kpr_route_entry_t, kpre_list);
480
481                 if (re->kpre_lo_nid > target_nid || /* no match */
482                     re->kpre_hi_nid < target_nid)
483                         continue;
484
485                 if (re->kpre_gateway->kpge_nalid == src_ne->kpne_interface.kprni_nalid)
486                         continue;               /* don't route to same NAL */
487
488                 if (!re->kpre_gateway->kpge_alive)
489                         continue;               /* gateway is dead */
490                 
491                 tmp_ne = kpr_find_nal_entry_locked (re->kpre_gateway->kpge_nalid);
492
493                 if (tmp_ne == NULL ||
494                     tmp_ne->kpne_shutdown) {
495                         /* NAL must be registered and not shutting down */
496                         continue;
497                 }
498
499                 if (ge == NULL ||
500                     kpr_ge_isbetter (re->kpre_gateway, ge)) {
501                         ge = re->kpre_gateway;
502                         dst_ne = tmp_ne;
503                 }
504         }
505         
506         if (ge != NULL) {
507                 LASSERT (dst_ne != NULL);
508                 
509                 kpr_update_weight (ge, nob);
510
511                 fwd->kprfd_gateway_nid = ge->kpge_nid;
512                 atomic_inc (&src_ne->kpne_refcount); /* source and dest nals are */
513                 atomic_inc (&dst_ne->kpne_refcount); /* busy until fwd completes */
514                 atomic_inc (&kpr_queue_depth);
515
516                 read_unlock (&kpr_rwlock);
517
518                 CDEBUG (D_NET, "forward [%p] "LPX64" from NAL %d: "
519                         "to "LPX64" on NAL %d\n", 
520                         fwd, target_nid, src_ne->kpne_interface.kprni_nalid,
521                         fwd->kprfd_gateway_nid, dst_ne->kpne_interface.kprni_nalid);
522
523                 dst_ne->kpne_interface.kprni_fwd (dst_ne->kpne_interface.kprni_arg, fwd);
524                 return;
525         }
526
527         rc = -EHOSTUNREACH;
528  out:
529         kpr_fwd_errors++;
530
531         CDEBUG (D_NET, "Failed to forward [%p] "LPX64" from NAL %d: %d\n", 
532                 fwd, target_nid, src_ne->kpne_interface.kprni_nalid, rc);
533
534         (fwd->kprfd_callback)(fwd->kprfd_callback_arg, rc);
535
536         read_unlock (&kpr_rwlock);
537 }
538
539 void
540 kpr_complete_packet (void *arg, kpr_fwd_desc_t *fwd, int error)
541 {
542         kpr_nal_entry_t *dst_ne = (kpr_nal_entry_t *)arg;
543         kpr_nal_entry_t *src_ne = (kpr_nal_entry_t *)fwd->kprfd_router_arg;
544
545         CDEBUG (D_NET, "complete(1) [%p] from NAL %d to NAL %d: %d\n", fwd,
546                 src_ne->kpne_interface.kprni_nalid, dst_ne->kpne_interface.kprni_nalid, error);
547
548         atomic_dec (&dst_ne->kpne_refcount);    /* CAVEAT EMPTOR dst_ne can disappear now!!! */
549
550         (fwd->kprfd_callback)(fwd->kprfd_callback_arg, error);
551
552         CDEBUG (D_NET, "complete(2) [%p] from NAL %d: %d\n", fwd,
553                 src_ne->kpne_interface.kprni_nalid, error);
554
555         atomic_dec (&kpr_queue_depth);
556         atomic_dec (&src_ne->kpne_refcount);    /* CAVEAT EMPTOR src_ne can disappear now!!! */
557 }
558
559 int
560 kpr_add_route (int gateway_nalid, ptl_nid_t gateway_nid, 
561                ptl_nid_t lo_nid, ptl_nid_t hi_nid)
562 {
563         unsigned long        flags;
564         struct list_head    *e;
565         kpr_route_entry_t   *re;
566         kpr_gateway_entry_t *ge;
567         int                  dup = 0;
568
569         CDEBUG(D_NET, "Add route: %d "LPX64" : "LPX64" - "LPX64"\n",
570                gateway_nalid, gateway_nid, lo_nid, hi_nid);
571
572         if (gateway_nalid == PTL_NID_ANY ||
573             lo_nid == PTL_NID_ANY ||
574             hi_nid == PTL_NID_ANY ||
575             lo_nid > hi_nid)
576                 return (-EINVAL);
577
578         PORTAL_ALLOC (ge, sizeof (*ge));
579         if (ge == NULL)
580                 return (-ENOMEM);
581
582         ge->kpge_nalid = gateway_nalid;
583         ge->kpge_nid   = gateway_nid;
584         ge->kpge_alive = 1;
585         ge->kpge_timestamp = 0;
586         ge->kpge_refcount = 0;
587         atomic_set (&ge->kpge_weight, 0);
588
589         PORTAL_ALLOC (re, sizeof (*re));
590         if (re == NULL) {
591                 PORTAL_FREE (ge, sizeof (*ge));
592                 return (-ENOMEM);
593         }
594
595         re->kpre_lo_nid = lo_nid;
596         re->kpre_hi_nid = hi_nid;
597
598         LASSERT(!in_interrupt());
599         write_lock_irqsave (&kpr_rwlock, flags);
600
601         list_for_each (e, &kpr_gateways) {
602                 kpr_gateway_entry_t *ge2 = list_entry(e, kpr_gateway_entry_t,
603                                                       kpge_list);
604                 
605                 if (ge2->kpge_nalid == gateway_nalid &&
606                     ge2->kpge_nid == gateway_nid) {
607                         PORTAL_FREE (ge, sizeof (*ge));
608                         ge = ge2;
609                         dup = 1;
610                         break;
611                 }
612         }
613
614         if (!dup) {
615                 /* Adding a new gateway... */
616  
617                 list_add (&ge->kpge_list, &kpr_gateways);
618
619                 /* ...zero all gateway weights so this one doesn't have to
620                  * play catch-up */
621
622                 list_for_each (e, &kpr_gateways) {
623                         kpr_gateway_entry_t *ge2 = list_entry(e, kpr_gateway_entry_t,
624                                                               kpge_list);
625                         atomic_set (&ge2->kpge_weight, 0);
626                 }
627                 
628         }
629
630         re->kpre_gateway = ge;
631         ge->kpge_refcount++;
632         list_add (&re->kpre_list, &kpr_routes);
633
634         write_unlock_irqrestore (&kpr_rwlock, flags);
635         return (0);
636 }
637
638 int
639 kpr_sys_notify (int gateway_nalid, ptl_nid_t gateway_nid,
640             int alive, time_t when)
641 {
642         return (kpr_do_notify (0, gateway_nalid, gateway_nid, alive, when));
643 }
644
645 int
646 kpr_del_route (int gw_nalid, ptl_nid_t gw_nid,
647                ptl_nid_t lo, ptl_nid_t hi)
648 {
649         int                specific = (lo != PTL_NID_ANY);
650         unsigned long      flags;
651         int                rc = -ENOENT;
652         struct list_head  *e;
653         struct list_head  *n;
654
655         CDEBUG(D_NET, "Del route [%d] "LPX64" : "LPX64" - "LPX64"\n", 
656                gw_nalid, gw_nid, lo, hi);
657
658         LASSERT(!in_interrupt());
659
660         /* NB Caller may specify either all routes via the given gateway
661          * (lo/hi == PTL_NID_ANY) or a specific route entry (lo/hi are
662          * actual NIDs) */
663         
664         if (specific ? (hi == PTL_NID_ANY || hi < lo) : (hi != PTL_NID_ANY))
665                 return (-EINVAL);
666
667         write_lock_irqsave(&kpr_rwlock, flags);
668
669         list_for_each_safe (e, n, &kpr_routes) {
670                 kpr_route_entry_t   *re = list_entry(e, kpr_route_entry_t,
671                                                    kpre_list);
672                 kpr_gateway_entry_t *ge = re->kpre_gateway;
673                 
674                 if (ge->kpge_nalid != gw_nalid ||
675                     ge->kpge_nid != gw_nid ||
676                     (specific && 
677                      (lo != re->kpre_lo_nid || hi != re->kpre_hi_nid)))
678                         continue;
679
680                 rc = 0;
681
682                 if (--ge->kpge_refcount == 0) {
683                         list_del (&ge->kpge_list);
684                         PORTAL_FREE (ge, sizeof (*ge));
685                 }
686
687                 list_del (&re->kpre_list);
688                 PORTAL_FREE(re, sizeof (*re));
689
690                 if (specific)
691                         break;
692         }
693
694         write_unlock_irqrestore(&kpr_rwlock, flags);
695         return (rc);
696 }
697
698 int
699 kpr_get_route (int idx, int *gateway_nalid, ptl_nid_t *gateway_nid,
700                ptl_nid_t *lo_nid, ptl_nid_t *hi_nid, int *alive)
701 {
702         struct list_head  *e;
703
704         LASSERT (!in_interrupt());
705         read_lock(&kpr_rwlock);
706
707         for (e = kpr_routes.next; e != &kpr_routes; e = e->next) {
708                 kpr_route_entry_t   *re = list_entry(e, kpr_route_entry_t,
709                                                      kpre_list);
710                 kpr_gateway_entry_t *ge = re->kpre_gateway;
711                 
712                 if (idx-- == 0) {
713                         *gateway_nalid = ge->kpge_nalid;
714                         *gateway_nid = ge->kpge_nid;
715                         *alive = ge->kpge_alive;
716                         *lo_nid = re->kpre_lo_nid;
717                         *hi_nid = re->kpre_hi_nid;
718
719                         read_unlock(&kpr_rwlock);
720                         return (0);
721                 }
722         }
723
724         read_unlock (&kpr_rwlock);
725         return (-ENOENT);
726 }
727
728 static void /*__exit*/
729 kpr_finalise (void)
730 {
731         LASSERT (list_empty (&kpr_nals));
732
733         while (!list_empty (&kpr_routes)) {
734                 kpr_route_entry_t *re = list_entry(kpr_routes.next,
735                                                    kpr_route_entry_t,
736                                                    kpre_list);
737
738                 list_del(&re->kpre_list);
739                 PORTAL_FREE(re, sizeof (*re));
740         }
741
742         kpr_proc_fini();
743
744         PORTAL_SYMBOL_UNREGISTER(kpr_router_interface);
745         PORTAL_SYMBOL_UNREGISTER(kpr_control_interface);
746
747         CDEBUG(D_MALLOC, "kpr_finalise: kmem back to %d\n",
748                atomic_read(&portal_kmemory));
749 }
750
751 static int __init
752 kpr_initialise (void)
753 {
754         CDEBUG(D_MALLOC, "kpr_initialise: kmem %d\n",
755                atomic_read(&portal_kmemory));
756
757         kpr_proc_init();
758
759         PORTAL_SYMBOL_REGISTER(kpr_router_interface);
760         PORTAL_SYMBOL_REGISTER(kpr_control_interface);
761         return (0);
762 }
763
764 MODULE_AUTHOR("Eric Barton");
765 MODULE_DESCRIPTION("Kernel Portals Router v0.01");
766 MODULE_LICENSE("GPL");
767
768 module_init (kpr_initialise);
769 module_exit (kpr_finalise);
770
771 EXPORT_SYMBOL (kpr_control_interface);
772 EXPORT_SYMBOL (kpr_router_interface);