Whamcloud - gitweb
land v0.9.1 on HEAD, in preparation for a 1.0.x branch
[fs/lustre-release.git] / lustre / portals / router / router.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002 Cluster File Systems, Inc.
5  *
6  *   This file is part of Portals
7  *   http://sourceforge.net/projects/sandiaportals/
8  *
9  *   Portals is free software; you can redistribute it and/or
10  *   modify it under the terms of version 2 of the GNU General Public
11  *   License as published by the Free Software Foundation.
12  *
13  *   Portals is distributed in the hope that it will be useful,
14  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
15  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  *   GNU General Public License for more details.
17  *
18  *   You should have received a copy of the GNU General Public License
19  *   along with Portals; if not, write to the Free Software
20  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21  *
22  */
23
24 #include "router.h"
25
26 LIST_HEAD(kpr_routes);
27 LIST_HEAD(kpr_gateways);
28 LIST_HEAD(kpr_nals);
29
30 unsigned long long kpr_fwd_bytes;
31 unsigned long      kpr_fwd_packets;
32 unsigned long      kpr_fwd_errors;
33 atomic_t           kpr_queue_depth;
34
35 /* Mostly the tables are read-only (thread and interrupt context)
36  *
37  * Once in a blue moon we register/deregister NALs and add/remove routing
38  * entries (thread context only)... */
39 rwlock_t         kpr_rwlock = RW_LOCK_UNLOCKED;
40
41 kpr_router_interface_t kpr_router_interface = {
42         kprri_register:         kpr_register_nal,
43         kprri_lookup:           kpr_lookup_target,
44         kprri_fwd_start:        kpr_forward_packet,
45         kprri_fwd_done:         kpr_complete_packet,
46         kprri_notify:           kpr_nal_notify,
47         kprri_shutdown:         kpr_shutdown_nal,
48         kprri_deregister:       kpr_deregister_nal,
49 };
50
51 kpr_control_interface_t kpr_control_interface = {
52         kprci_add_route:        kpr_add_route,
53         kprci_del_route:        kpr_del_route,
54         kprci_get_route:        kpr_get_route,
55         kprci_notify:           kpr_sys_notify,
56 };
57
58 int
59 kpr_register_nal (kpr_nal_interface_t *nalif, void **argp)
60 {
61         unsigned long      flags;
62         struct list_head  *e;
63         kpr_nal_entry_t   *ne;
64
65         CDEBUG (D_NET, "Registering NAL %d\n", nalif->kprni_nalid);
66
67         PORTAL_ALLOC (ne, sizeof (*ne));
68         if (ne == NULL)
69                 return (-ENOMEM);
70
71         memset (ne, 0, sizeof (*ne));
72         memcpy ((void *)&ne->kpne_interface, (void *)nalif, sizeof (*nalif));
73
74         LASSERT (!in_interrupt());
75         write_lock_irqsave (&kpr_rwlock, flags);
76
77         for (e = kpr_nals.next; e != &kpr_nals; e = e->next)
78         {
79                 kpr_nal_entry_t *ne2 = list_entry (e, kpr_nal_entry_t, kpne_list);
80
81                 if (ne2->kpne_interface.kprni_nalid == ne->kpne_interface.kprni_nalid)
82                 {
83                         write_unlock_irqrestore (&kpr_rwlock, flags);
84
85                         CERROR ("Attempt to register same NAL %d twice\n", ne->kpne_interface.kprni_nalid);
86
87                         PORTAL_FREE (ne, sizeof (*ne));
88                         return (-EEXIST);
89                 }
90         }
91
92         list_add (&ne->kpne_list, &kpr_nals);
93
94         write_unlock_irqrestore (&kpr_rwlock, flags);
95
96         *argp = ne;
97         PORTAL_MODULE_USE;
98         return (0);
99 }
100
101 void
102 kpr_do_upcall (void *arg)
103 {
104         kpr_upcall_t *u = (kpr_upcall_t *)arg;
105         char          nalstr[10];
106         char          nidstr[36];
107         char          whenstr[36];
108         char         *argv[] = {
109                 NULL,
110                 "ROUTER_NOTIFY",
111                 nalstr,
112                 nidstr,
113                 u->kpru_alive ? "up" : "down",
114                 whenstr,
115                 NULL};
116         
117         snprintf (nalstr, sizeof(nalstr), "%d", u->kpru_nal_id);
118         snprintf (nidstr, sizeof(nidstr), LPX64, u->kpru_nid);
119         snprintf (whenstr, sizeof(whenstr), "%ld", u->kpru_when);
120
121         portals_run_upcall (argv);
122
123         kfree (u);
124 }
125
126 void
127 kpr_upcall (int gw_nalid, ptl_nid_t gw_nid, int alive, time_t when)
128 {
129         char str[PTL_NALFMT_SIZE];
130         
131         /* May be in arbitrary context */
132         kpr_upcall_t  *u = kmalloc (sizeof (kpr_upcall_t), GFP_ATOMIC);
133
134         if (u == NULL) {
135                 CERROR ("Upcall out of memory: nal %d nid "LPX64" (%s) %s\n",
136                         gw_nalid, gw_nid,
137                         portals_nid2str(gw_nalid, gw_nid, str),
138                         alive ? "up" : "down");
139                 return;
140         }
141
142         u->kpru_nal_id     = gw_nalid;
143         u->kpru_nid        = gw_nid;
144         u->kpru_alive      = alive;
145         u->kpru_when       = when;
146
147         prepare_work (&u->kpru_tq, kpr_do_upcall, u);
148         schedule_work (&u->kpru_tq);
149 }
150
151 int
152 kpr_do_notify (int byNal, int gateway_nalid, ptl_nid_t gateway_nid,
153                int alive, time_t when)
154 {
155         unsigned long        flags;
156         int                  found;
157         kpr_nal_entry_t     *ne = NULL;
158         kpr_gateway_entry_t *ge = NULL;
159         struct timeval       now;
160         struct list_head    *e;
161         struct list_head    *n;
162         char                 str[PTL_NALFMT_SIZE];
163
164         CDEBUG (D_NET, "%s notifying [%d] "LPX64": %s\n", 
165                 byNal ? "NAL" : "userspace", 
166                 gateway_nalid, gateway_nid, alive ? "up" : "down");
167
168         /* can't do predictions... */
169         do_gettimeofday (&now);
170         if (when > now.tv_sec) {
171                 CWARN ("Ignoring prediction from %s of [%d] "LPX64" %s "
172                        "%ld seconds in the future\n", 
173                        byNal ? "NAL" : "userspace", 
174                        gateway_nalid, gateway_nid, 
175                        alive ? "up" : "down",
176                        when - now.tv_sec);
177                 return (EINVAL);
178         }
179
180         LASSERT (when <= now.tv_sec);
181
182         /* Serialise with lookups (i.e. write lock) */
183         write_lock_irqsave(&kpr_rwlock, flags);
184
185         found = 0;
186         list_for_each_safe (e, n, &kpr_gateways) {
187
188                 ge = list_entry(e, kpr_gateway_entry_t, kpge_list);
189                 if ((gateway_nalid != 0 &&
190                      ge->kpge_nalid != gateway_nalid) ||
191                     ge->kpge_nid != gateway_nid)
192                         continue;
193
194                 found = 1;
195                 break;
196         }
197
198         if (!found) {
199                 /* gateway not found */
200                 write_unlock_irqrestore(&kpr_rwlock, flags);
201                 CDEBUG (D_NET, "Gateway not found\n");
202                 return (0);
203         }
204         
205         if (when < ge->kpge_timestamp) {
206                 /* out of date information */
207                 write_unlock_irqrestore (&kpr_rwlock, flags);
208                 CDEBUG (D_NET, "Out of date\n");
209                 return (0);
210         }
211
212         /* update timestamp */
213         ge->kpge_timestamp = when;
214
215         if ((!ge->kpge_alive) == (!alive)) {
216                 /* new date for old news */
217                 write_unlock_irqrestore (&kpr_rwlock, flags);
218                 CDEBUG (D_NET, "Old news\n");
219                 return (0);
220         }
221
222         ge->kpge_alive = alive;
223         CDEBUG(D_NET, "set "LPX64" [%p] %d\n", gateway_nid, ge, alive);
224
225         if (alive) {
226                 /* Reset all gateway weights so the newly-enabled gateway
227                  * doesn't have to play catch-up */
228                 list_for_each_safe (e, n, &kpr_gateways) {
229                         kpr_gateway_entry_t *ge = list_entry(e, kpr_gateway_entry_t,
230                                                              kpge_list);
231                         atomic_set (&ge->kpge_weight, 0);
232                 }
233         }
234
235         found = 0;
236         if (!byNal) {
237                 /* userland notified me: notify NAL? */
238                 ne = kpr_find_nal_entry_locked (ge->kpge_nalid);
239                 if (ne != NULL) {
240                         if (!ne->kpne_shutdown &&
241                             ne->kpne_interface.kprni_notify != NULL) {
242                                 /* take a ref on this NAL until notifying
243                                  * it has completed... */
244                                 atomic_inc (&ne->kpne_refcount);
245                                 found = 1;
246                         }
247                 }
248         }
249
250         write_unlock_irqrestore(&kpr_rwlock, flags);
251
252         if (found) {
253                 ne->kpne_interface.kprni_notify (ne->kpne_interface.kprni_arg,
254                                                  gateway_nid, alive);
255                 /* 'ne' can disappear now... */
256                 atomic_dec (&ne->kpne_refcount);
257         }
258         
259         if (byNal) {
260                 /* It wasn't userland that notified me... */
261                 CWARN ("Upcall: NAL %d NID "LPX64" (%s) is %s\n",
262                        gateway_nalid, gateway_nid,
263                        portals_nid2str(gateway_nalid, gateway_nid, str),
264                        alive ? "alive" : "dead");
265                 kpr_upcall (gateway_nalid, gateway_nid, alive, when);
266         } else {
267                 CDEBUG (D_NET, " NOT Doing upcall\n");
268         }
269         
270         return (0);
271 }
272
273 void
274 kpr_nal_notify (void *arg, ptl_nid_t peer, int alive, time_t when)
275 {
276         kpr_nal_entry_t *ne = (kpr_nal_entry_t *)arg;
277         
278         kpr_do_notify (1, ne->kpne_interface.kprni_nalid, peer, alive, when);
279 }
280
281 void
282 kpr_shutdown_nal (void *arg)
283 {
284         unsigned long    flags;
285         kpr_nal_entry_t *ne = (kpr_nal_entry_t *)arg;
286
287         CDEBUG (D_NET, "Shutting down NAL %d\n", ne->kpne_interface.kprni_nalid);
288
289         LASSERT (!ne->kpne_shutdown);
290         LASSERT (!in_interrupt());
291
292         write_lock_irqsave (&kpr_rwlock, flags); /* locking a bit spurious... */
293         ne->kpne_shutdown = 1;
294         write_unlock_irqrestore (&kpr_rwlock, flags); /* except it's a memory barrier */
295
296         while (atomic_read (&ne->kpne_refcount) != 0)
297         {
298                 CDEBUG (D_NET, "Waiting for refcount on NAL %d to reach zero (%d)\n",
299                         ne->kpne_interface.kprni_nalid, atomic_read (&ne->kpne_refcount));
300
301                 set_current_state (TASK_UNINTERRUPTIBLE);
302                 schedule_timeout (HZ);
303         }
304 }
305
306 void
307 kpr_deregister_nal (void *arg)
308 {
309         unsigned long     flags;
310         kpr_nal_entry_t  *ne = (kpr_nal_entry_t *)arg;
311
312         CDEBUG (D_NET, "Deregister NAL %d\n", ne->kpne_interface.kprni_nalid);
313
314         LASSERT (ne->kpne_shutdown);            /* caller must have issued shutdown already */
315         LASSERT (atomic_read (&ne->kpne_refcount) == 0); /* can't be busy */
316         LASSERT (!in_interrupt());
317
318         write_lock_irqsave (&kpr_rwlock, flags);
319
320         list_del (&ne->kpne_list);
321
322         write_unlock_irqrestore (&kpr_rwlock, flags);
323
324         PORTAL_FREE (ne, sizeof (*ne));
325         PORTAL_MODULE_UNUSE;
326 }
327
328 int
329 kpr_ge_isbetter (kpr_gateway_entry_t *ge1, kpr_gateway_entry_t *ge2)
330 {
331         const int significant_bits = 0x00ffffff;
332         /* We use atomic_t to record/compare route weights for
333          * load-balancing.  Here we limit ourselves to only using
334          * 'significant_bits' when we do an 'after' comparison */
335
336         int    diff = (atomic_read (&ge1->kpge_weight) -
337                        atomic_read (&ge2->kpge_weight)) & significant_bits;
338         int    rc = (diff > (significant_bits >> 1));
339
340         CDEBUG(D_NET, "[%p]"LPX64"=%d %s [%p]"LPX64"=%d\n",
341                ge1, ge1->kpge_nid, atomic_read (&ge1->kpge_weight),
342                rc ? ">" : "<",
343                ge2, ge2->kpge_nid, atomic_read (&ge2->kpge_weight));
344
345         return (rc);
346 }
347
348 void
349 kpr_update_weight (kpr_gateway_entry_t *ge, int nob)
350 {
351         int weight = 1 + (nob + sizeof (ptl_hdr_t)/2)/sizeof (ptl_hdr_t);
352
353         /* We've chosen this route entry (i.e. gateway) to forward payload
354          * of length 'nob'; update the route's weight to make it less
355          * favoured.  Note that the weight is 1 plus the payload size
356          * rounded and scaled to the portals header size, so we get better
357          * use of the significant bits in kpge_weight. */
358
359         CDEBUG(D_NET, "gateway [%p]"LPX64" += %d\n", ge,
360                ge->kpge_nid, weight);
361         
362         atomic_add (weight, &ge->kpge_weight);
363 }
364
365 int
366 kpr_lookup_target (void *arg, ptl_nid_t target_nid, int nob,
367                    ptl_nid_t *gateway_nidp)
368 {
369         kpr_nal_entry_t     *ne = (kpr_nal_entry_t *)arg;
370         struct list_head    *e;
371         kpr_route_entry_t   *re;
372         kpr_gateway_entry_t *ge = NULL;
373         int                  rc = -ENOENT;
374
375         /* Caller wants to know if 'target_nid' can be reached via a gateway
376          * ON HER OWN NETWORK */
377
378         CDEBUG (D_NET, "lookup "LPX64" from NAL %d\n", target_nid, 
379                 ne->kpne_interface.kprni_nalid);
380
381         if (ne->kpne_shutdown)          /* caller is shutting down */
382                 return (-ENOENT);
383
384         read_lock (&kpr_rwlock);
385
386         /* Search routes for one that has a gateway to target_nid on the callers network */
387
388         list_for_each (e, &kpr_routes) {
389                 re = list_entry (e, kpr_route_entry_t, kpre_list);
390
391                 if (re->kpre_lo_nid > target_nid ||
392                     re->kpre_hi_nid < target_nid)
393                         continue;
394
395                 /* found table entry */
396
397                 if (re->kpre_gateway->kpge_nalid != ne->kpne_interface.kprni_nalid ||
398                     !re->kpre_gateway->kpge_alive) {
399                         /* different NAL or gateway down */
400                         rc = -EHOSTUNREACH;
401                         continue;
402                 }
403                 
404                 if (ge == NULL ||
405                     kpr_ge_isbetter (re->kpre_gateway, ge))
406                     ge = re->kpre_gateway;
407         }
408
409         if (ge != NULL) {
410                 kpr_update_weight (ge, nob);
411                 *gateway_nidp = ge->kpge_nid;
412                 rc = 0;
413         }
414         
415         read_unlock (&kpr_rwlock);
416
417         /* NB can't deref 're' now; it might have been removed! */
418
419         CDEBUG (D_NET, "lookup "LPX64" from NAL %d: %d ("LPX64")\n",
420                 target_nid, ne->kpne_interface.kprni_nalid, rc,
421                 (rc == 0) ? *gateway_nidp : (ptl_nid_t)0);
422         return (rc);
423 }
424
425 kpr_nal_entry_t *
426 kpr_find_nal_entry_locked (int nal_id)
427 {
428         struct list_head    *e;
429         
430         /* Called with kpr_rwlock held */
431
432         list_for_each (e, &kpr_nals) {
433                 kpr_nal_entry_t *ne = list_entry (e, kpr_nal_entry_t, kpne_list);
434
435                 if (nal_id != ne->kpne_interface.kprni_nalid) /* no match */
436                         continue;
437
438                 return (ne);
439         }
440         
441         return (NULL);
442 }
443
444 void
445 kpr_forward_packet (void *arg, kpr_fwd_desc_t *fwd)
446 {
447         kpr_nal_entry_t     *src_ne = (kpr_nal_entry_t *)arg;
448         ptl_nid_t            target_nid = fwd->kprfd_target_nid;
449         int                  nob = fwd->kprfd_nob;
450         kpr_gateway_entry_t *ge = NULL;
451         kpr_nal_entry_t     *dst_ne = NULL;
452         struct list_head    *e;
453         kpr_route_entry_t   *re;
454         kpr_nal_entry_t     *tmp_ne;
455
456         CDEBUG (D_NET, "forward [%p] "LPX64" from NAL %d\n", fwd,
457                 target_nid, src_ne->kpne_interface.kprni_nalid);
458
459         LASSERT (nob >= sizeof (ptl_hdr_t)); /* at least got a packet header */
460         LASSERT (nob == lib_iov_nob (fwd->kprfd_niov, fwd->kprfd_iov));
461         
462         atomic_inc (&kpr_queue_depth);
463         atomic_inc (&src_ne->kpne_refcount); /* source nal is busy until fwd completes */
464
465         kpr_fwd_packets++;                   /* (loose) stats accounting */
466         kpr_fwd_bytes += nob;
467
468         if (src_ne->kpne_shutdown)           /* caller is shutting down */
469                 goto out;
470
471         fwd->kprfd_router_arg = src_ne;      /* stash caller's nal entry */
472
473         read_lock (&kpr_rwlock);
474
475         /* Search routes for one that has a gateway to target_nid NOT on the caller's network */
476
477         list_for_each (e, &kpr_routes) {
478                 re = list_entry (e, kpr_route_entry_t, kpre_list);
479
480                 if (re->kpre_lo_nid > target_nid || /* no match */
481                     re->kpre_hi_nid < target_nid)
482                         continue;
483
484                 if (re->kpre_gateway->kpge_nalid == src_ne->kpne_interface.kprni_nalid)
485                         continue;               /* don't route to same NAL */
486
487                 if (!re->kpre_gateway->kpge_alive)
488                         continue;               /* gateway is dead */
489                 
490                 tmp_ne = kpr_find_nal_entry_locked (re->kpre_gateway->kpge_nalid);
491
492                 if (tmp_ne == NULL ||
493                     tmp_ne->kpne_shutdown) {
494                         /* NAL must be registered and not shutting down */
495                         continue;
496                 }
497
498                 if (ge == NULL ||
499                     kpr_ge_isbetter (re->kpre_gateway, ge)) {
500                         ge = re->kpre_gateway;
501                         dst_ne = tmp_ne;
502                 }
503         }
504         
505         if (ge != NULL) {
506                 LASSERT (dst_ne != NULL);
507                 
508                 kpr_update_weight (ge, nob);
509
510                 fwd->kprfd_gateway_nid = ge->kpge_nid;
511                 atomic_inc (&dst_ne->kpne_refcount); /* dest nal is busy until fwd completes */
512
513                 read_unlock (&kpr_rwlock);
514
515                 CDEBUG (D_NET, "forward [%p] "LPX64" from NAL %d: "
516                         "to "LPX64" on NAL %d\n", 
517                         fwd, target_nid, src_ne->kpne_interface.kprni_nalid,
518                         fwd->kprfd_gateway_nid, dst_ne->kpne_interface.kprni_nalid);
519
520                 dst_ne->kpne_interface.kprni_fwd (dst_ne->kpne_interface.kprni_arg, fwd);
521                 return;
522         }
523
524         read_unlock (&kpr_rwlock);
525  out:
526         kpr_fwd_errors++;
527
528         CDEBUG (D_NET, "Failed to forward [%p] "LPX64" from NAL %d\n", fwd,
529                 target_nid, src_ne->kpne_interface.kprni_nalid);
530
531         /* Can't find anywhere to forward to */
532         (fwd->kprfd_callback)(fwd->kprfd_callback_arg, -EHOSTUNREACH);
533
534         atomic_dec (&kpr_queue_depth);
535         atomic_dec (&src_ne->kpne_refcount);
536 }
537
538 void
539 kpr_complete_packet (void *arg, kpr_fwd_desc_t *fwd, int error)
540 {
541         kpr_nal_entry_t *dst_ne = (kpr_nal_entry_t *)arg;
542         kpr_nal_entry_t *src_ne = (kpr_nal_entry_t *)fwd->kprfd_router_arg;
543
544         CDEBUG (D_NET, "complete(1) [%p] from NAL %d to NAL %d: %d\n", fwd,
545                 src_ne->kpne_interface.kprni_nalid, dst_ne->kpne_interface.kprni_nalid, error);
546
547         atomic_dec (&dst_ne->kpne_refcount);    /* CAVEAT EMPTOR dst_ne can disappear now!!! */
548
549         (fwd->kprfd_callback)(fwd->kprfd_callback_arg, error);
550
551         CDEBUG (D_NET, "complete(2) [%p] from NAL %d: %d\n", fwd,
552                 src_ne->kpne_interface.kprni_nalid, error);
553
554         atomic_dec (&kpr_queue_depth);
555         atomic_dec (&src_ne->kpne_refcount);    /* CAVEAT EMPTOR src_ne can disappear now!!! */
556 }
557
558 int
559 kpr_add_route (int gateway_nalid, ptl_nid_t gateway_nid, 
560                ptl_nid_t lo_nid, ptl_nid_t hi_nid)
561 {
562         unsigned long        flags;
563         struct list_head    *e;
564         kpr_route_entry_t   *re;
565         kpr_gateway_entry_t *ge;
566         int                  dup = 0;
567
568         CDEBUG(D_NET, "Add route: %d "LPX64" : "LPX64" - "LPX64"\n",
569                gateway_nalid, gateway_nid, lo_nid, hi_nid);
570
571         if (gateway_nalid == PTL_NID_ANY ||
572             lo_nid == PTL_NID_ANY ||
573             hi_nid == PTL_NID_ANY ||
574             lo_nid > hi_nid)
575                 return (-EINVAL);
576
577         PORTAL_ALLOC (ge, sizeof (*ge));
578         if (ge == NULL)
579                 return (-ENOMEM);
580
581         ge->kpge_nalid = gateway_nalid;
582         ge->kpge_nid   = gateway_nid;
583         ge->kpge_alive = 1;
584         ge->kpge_timestamp = 0;
585         ge->kpge_refcount = 0;
586         atomic_set (&ge->kpge_weight, 0);
587
588         PORTAL_ALLOC (re, sizeof (*re));
589         if (re == NULL) {
590                 PORTAL_FREE (ge, sizeof (*ge));
591                 return (-ENOMEM);
592         }
593
594         re->kpre_lo_nid = lo_nid;
595         re->kpre_hi_nid = hi_nid;
596
597         LASSERT(!in_interrupt());
598         write_lock_irqsave (&kpr_rwlock, flags);
599
600         list_for_each (e, &kpr_gateways) {
601                 kpr_gateway_entry_t *ge2 = list_entry(e, kpr_gateway_entry_t,
602                                                       kpge_list);
603                 
604                 if (ge2->kpge_nalid == gateway_nalid &&
605                     ge2->kpge_nid == gateway_nid) {
606                         PORTAL_FREE (ge, sizeof (*ge));
607                         ge = ge2;
608                         dup = 1;
609                         break;
610                 }
611         }
612
613         if (!dup) {
614                 /* Adding a new gateway... */
615  
616                 list_add (&ge->kpge_list, &kpr_gateways);
617
618                 /* ...zero all gateway weights so this one doesn't have to
619                  * play catch-up */
620
621                 list_for_each (e, &kpr_gateways) {
622                         kpr_gateway_entry_t *ge2 = list_entry(e, kpr_gateway_entry_t,
623                                                               kpge_list);
624                         atomic_set (&ge2->kpge_weight, 0);
625                 }
626                 
627         }
628
629         re->kpre_gateway = ge;
630         ge->kpge_refcount++;
631         list_add (&re->kpre_list, &kpr_routes);
632
633         write_unlock_irqrestore (&kpr_rwlock, flags);
634         return (0);
635 }
636
637 int
638 kpr_sys_notify (int gateway_nalid, ptl_nid_t gateway_nid,
639             int alive, time_t when)
640 {
641         return (kpr_do_notify (0, gateway_nalid, gateway_nid, alive, when));
642 }
643
644 int
645 kpr_del_route (int gw_nalid, ptl_nid_t gw_nid,
646                ptl_nid_t lo, ptl_nid_t hi)
647 {
648         int                specific = (lo != PTL_NID_ANY);
649         unsigned long      flags;
650         int                rc = -ENOENT;
651         struct list_head  *e;
652         struct list_head  *n;
653
654         CDEBUG(D_NET, "Del route [%d] "LPX64" : "LPX64" - "LPX64"\n", 
655                gw_nalid, gw_nid, lo, hi);
656
657         LASSERT(!in_interrupt());
658
659         /* NB Caller may specify either all routes via the given gateway
660          * (lo/hi == PTL_NID_ANY) or a specific route entry (lo/hi are
661          * actual NIDs) */
662         
663         if (specific ? (hi == PTL_NID_ANY || hi < lo) : (hi != PTL_NID_ANY))
664                 return (-EINVAL);
665
666         write_lock_irqsave(&kpr_rwlock, flags);
667
668         list_for_each_safe (e, n, &kpr_routes) {
669                 kpr_route_entry_t   *re = list_entry(e, kpr_route_entry_t,
670                                                    kpre_list);
671                 kpr_gateway_entry_t *ge = re->kpre_gateway;
672                 
673                 if (ge->kpge_nalid != gw_nalid ||
674                     ge->kpge_nid != gw_nid ||
675                     (specific && 
676                      (lo != re->kpre_lo_nid || hi != re->kpre_hi_nid)))
677                         continue;
678
679                 rc = 0;
680
681                 if (--ge->kpge_refcount == 0) {
682                         list_del (&ge->kpge_list);
683                         PORTAL_FREE (ge, sizeof (*ge));
684                 }
685
686                 list_del (&re->kpre_list);
687                 PORTAL_FREE(re, sizeof (*re));
688
689                 if (specific)
690                         break;
691         }
692
693         write_unlock_irqrestore(&kpr_rwlock, flags);
694         return (rc);
695 }
696
697 int
698 kpr_get_route (int idx, int *gateway_nalid, ptl_nid_t *gateway_nid,
699                ptl_nid_t *lo_nid, ptl_nid_t *hi_nid, int *alive)
700 {
701         struct list_head  *e;
702
703         read_lock(&kpr_rwlock);
704
705         for (e = kpr_routes.next; e != &kpr_routes; e = e->next) {
706                 kpr_route_entry_t   *re = list_entry(e, kpr_route_entry_t,
707                                                      kpre_list);
708                 kpr_gateway_entry_t *ge = re->kpre_gateway;
709                 
710                 if (idx-- == 0) {
711                         *gateway_nalid = ge->kpge_nalid;
712                         *gateway_nid = ge->kpge_nid;
713                         *alive = ge->kpge_alive;
714                         *lo_nid = re->kpre_lo_nid;
715                         *hi_nid = re->kpre_hi_nid;
716
717                         read_unlock(&kpr_rwlock);
718                         return (0);
719                 }
720         }
721
722         read_unlock (&kpr_rwlock);
723         return (-ENOENT);
724 }
725
726 static void /*__exit*/
727 kpr_finalise (void)
728 {
729         LASSERT (list_empty (&kpr_nals));
730
731         while (!list_empty (&kpr_routes)) {
732                 kpr_route_entry_t *re = list_entry(kpr_routes.next,
733                                                    kpr_route_entry_t,
734                                                    kpre_list);
735
736                 list_del(&re->kpre_list);
737                 PORTAL_FREE(re, sizeof (*re));
738         }
739
740         kpr_proc_fini();
741
742         PORTAL_SYMBOL_UNREGISTER(kpr_router_interface);
743         PORTAL_SYMBOL_UNREGISTER(kpr_control_interface);
744
745         CDEBUG(D_MALLOC, "kpr_finalise: kmem back to %d\n",
746                atomic_read(&portal_kmemory));
747 }
748
749 static int __init
750 kpr_initialise (void)
751 {
752         CDEBUG(D_MALLOC, "kpr_initialise: kmem %d\n",
753                atomic_read(&portal_kmemory));
754
755         kpr_proc_init();
756
757         PORTAL_SYMBOL_REGISTER(kpr_router_interface);
758         PORTAL_SYMBOL_REGISTER(kpr_control_interface);
759         return (0);
760 }
761
762 MODULE_AUTHOR("Eric Barton");
763 MODULE_DESCRIPTION("Kernel Portals Router v0.01");
764 MODULE_LICENSE("GPL");
765
766 module_init (kpr_initialise);
767 module_exit (kpr_finalise);
768
769 EXPORT_SYMBOL (kpr_control_interface);
770 EXPORT_SYMBOL (kpr_router_interface);