LU-8734 gnilnd: Handle dla credits exhaustion
fs/lustre-release.git: lnet/klnds/gnilnd/gnilnd_stack.c
/*
 * Copyright (C) 2012 Cray, Inc.
 *
 * Copyright (c) 2014, Intel Corporation.
 *
 *   Author: Nic Henke <nic@cray.com>
 *
 *   This file is part of Lustre, http://www.lustre.org.
 *
 *   Lustre is free software; you can redistribute it and/or
 *   modify it under the terms of version 2 of the GNU General Public
 *   License as published by the Free Software Foundation.
 *
 *   Lustre is distributed in the hope that it will be useful,
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *   GNU General Public License for more details.
 *
 *   You should have received a copy of the GNU General Public License
 *   along with Lustre; if not, write to the Free Software
 *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 *
 */
#include "gnilnd.h"
#if defined(GNILND_USE_RCA)
#include <rsms/rs_sm_states.h>
#endif
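
/*
 * Stack quiesce and reset handling: pause and resume the gnilnd threads
 * around GNI hardware quiesce events, reset the stack after critical
 * hardware errors, and (with GNILND_USE_RCA) watch RCA events to track
 * peer node up/down state.
 */
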
/* Advance all timeouts by nap_time seconds. */
void
kgnilnd_bump_timeouts(__u32 nap_time, char *reason)
{
        int                     i, j;
        kgn_peer_t             *peer;
        kgn_conn_t             *conn;
        kgn_tx_t               *tx;
        kgn_device_t           *dev;
        kgn_dgram_t            *dgram;

        CDEBUG(D_INFO, "%s: bumping all timeouts by %ds\n", reason, nap_time);

        LASSERTF(GNILND_IS_QUIESCED, "gnilnd not quiesced %d != %d\n",
                 atomic_read(&kgnilnd_data.kgn_nquiesce),
                 atomic_read(&kgnilnd_data.kgn_nthreads));

        /* requiring that the threads are paused ensures a couple of things:
         * - combined code paths for stack reset and quiesce event as stack reset
         *   runs with the threads paused
         * - prevents traffic to the Gemini during a quiesce period
         * - reduces the locking requirements
         */

        for (i = 0; i < *kgnilnd_tunables.kgn_peer_hash_size; i++) {
                list_for_each_entry(peer, &kgnilnd_data.kgn_peers[i], gnp_list) {

                        /* we can reconnect again at any time */
                        peer->gnp_reconnect_time = jiffies;
                        /* reset now that network is healthy */
                        peer->gnp_reconnect_interval = 0;
                        /* tell LNet dude is still alive */
                        kgnilnd_peer_alive(peer);
                        kgnilnd_peer_notify(peer, 0, 1);

                        list_for_each_entry(tx, &peer->gnp_tx_queue, tx_list) {
                                tx->tx_qtime = jiffies;
                        }

                        list_for_each_entry(conn, &peer->gnp_conns, gnc_list) {
                                unsigned long           timeout;

                                timeout = cfs_time_seconds(conn->gnc_timeout);

                                /* bump last_rx/last_rx_cq on all conns - including
                                 * closed ones, this will have the effect of
                                 * bumping the purgatory timers for those */
                                conn->gnc_last_rx = conn->gnc_last_rx_cq = jiffies;

                                /* we don't timeout based on old gnc_last_tx, so
                                 * we'll back it up and schedule the conn to trigger
                                 * a NOOP */
                                conn->gnc_last_tx = jiffies - timeout;
                                if (conn->gnc_state != GNILND_CONN_DONE)
                                        kgnilnd_schedule_conn(conn);
                        }
                }
        }

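        /* Also refresh the post time on every outstanding datagram so that
         * none of them look timed out after the nap. */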
        for (i = 0; i < kgnilnd_data.kgn_ndevs; i++) {
                dev = &kgnilnd_data.kgn_devices[i];
                for (j = 0; j < (*kgnilnd_tunables.kgn_peer_hash_size - 1); j++) {
                        list_for_each_entry(dgram, &dev->gnd_dgrams[j], gndg_list) {
                                dgram->gndg_post_time = jiffies;
                        }
                }
        }
}

/* Quiesce or wake up the stack.  The caller must hold the kgn_quiesce_mutex
 * on entry, which holds off any pending stack shutdown. */
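/* GNILND_IS_QUIESCED is true once every running thread has checked in, i.e.
 * kgn_nquiesce has caught up with kgn_nthreads; the waits below poll for that
 * (or, on wake-up, for kgn_nquiesce to drain back to zero). */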
void
kgnilnd_quiesce_wait(char *reason)
{
        int             i;

        if (kgnilnd_data.kgn_quiesce_trigger) {
                unsigned long   quiesce_deadline, quiesce_to;
                /* FREEZE TAG!!!! */

                /* morning sunshine */
                spin_lock(&kgnilnd_data.kgn_reaper_lock);
                wake_up_all(&kgnilnd_data.kgn_reaper_waitq);
                spin_unlock(&kgnilnd_data.kgn_reaper_lock);

                for (i = 0; i < kgnilnd_data.kgn_ndevs; i++) {
                        kgn_device_t *dev = &kgnilnd_data.kgn_devices[i];

                        wake_up_all(&dev->gnd_waitq);
                        wake_up_all(&dev->gnd_dgram_waitq);
                        wake_up_all(&dev->gnd_dgping_waitq);
                }

                kgnilnd_wakeup_rca_thread();

                /* we'll wait for 10x the timeout for the threads to pause */
                quiesce_to = cfs_time_seconds(*kgnilnd_tunables.kgn_timeout * 10);
                quiesce_deadline = (long) jiffies + quiesce_to;

                LCONSOLE_INFO("Quiesce start: %s\n", reason);
                /* wait for everyone to check-in as quiesced */
                while (!GNILND_IS_QUIESCED) {
                        CDEBUG(D_INFO,
                                 "%s: Waiting for %d threads to pause\n",
                                 reason,
                                 atomic_read(&kgnilnd_data.kgn_nthreads) -
                                 atomic_read(&kgnilnd_data.kgn_nquiesce));
                        CFS_RACE(CFS_FAIL_GNI_QUIESCE_RACE);
                        set_current_state(TASK_UNINTERRUPTIBLE);
                        schedule_timeout(cfs_time_seconds(1 * i));

                        LASSERTF(quiesce_deadline > jiffies,
                                 "couldn't quiesce threads in %lu seconds, falling over now\n",
                                 cfs_duration_sec(quiesce_to));
                }

                CDEBUG(D_INFO, "%s: All threads paused!\n", reason);
                /* XXX Nic: Is there a set of counters we can grab here to
                 * ensure that there is no traffic until quiesce is over ? */
        } else {
                LCONSOLE_INFO("Quiesce complete: %s\n", reason);

                for (i = 0; i < kgnilnd_data.kgn_ndevs; i++) {
                        kgn_device_t *dev = &kgnilnd_data.kgn_devices[i];
                        kgnilnd_schedule_dgram(dev);
                }

                /* wait for everyone to check-in as running - they will be spinning
                 * and looking, so no need to poke any waitq */
                while (atomic_read(&kgnilnd_data.kgn_nquiesce) > 0) {
                        CDEBUG(D_INFO,
                                 "%s: Waiting for %d threads to wake up\n",
                                 reason,
                                 atomic_read(&kgnilnd_data.kgn_nquiesce));
                        set_current_state(TASK_UNINTERRUPTIBLE);
                        schedule_timeout(cfs_time_seconds(1 * i));
                }

                CDEBUG(D_INFO, "%s: All threads awake!\n", reason);
        }
}

/* Reset the stack.  */
void
kgnilnd_reset_stack(void)
{
        int              i, rc = 0;
        kgn_net_t       *net;
        kgn_peer_t      *peer, *peerN;
        LIST_HEAD        (souls);
        char            *reason = "critical hardware error";
        __u32            seconds;
        unsigned long    start, end;
        ENTRY;

        /* Race with del_peer and its atomics */
        CFS_RACE(CFS_FAIL_GNI_RACE_RESET);

        if (kgnilnd_data.kgn_init != GNILND_INIT_ALL) {
                CERROR("can't reset the stack, gnilnd is not initialized\n");
                RETURN_EXIT;
        }

        /* First make sure we are not already quiesced - we panic if so,
         * as that could leave software in a bad state */
        LASSERTF(kgnilnd_data.kgn_quiesce_trigger == GNILND_QUIESCE_IDLE,
                "can't reset the stack, already doing so: trigger %d\n",
                 kgnilnd_data.kgn_quiesce_trigger);

        set_mb(kgnilnd_data.kgn_quiesce_trigger, GNILND_QUIESCE_RESET);

        /* wake up the dgram waitq thread - but after trigger set to make sure it
         * goes into quiesce */
        CFS_RACE(CFS_FAIL_GNI_WC_DGRAM_FREE);
        /* same for the scheduler that is dropping state transitions */
        CFS_RACE(CFS_FAIL_GNI_DROP_CLOSING);
        CFS_RACE(CFS_FAIL_GNI_DROP_DESTROY_EP);

        kgnilnd_quiesce_wait(reason);

        start = jiffies;

        kgnilnd_data.kgn_in_reset = 1;
        kgnilnd_data.kgn_nresets++;
        LCONSOLE_WARN("%s: resetting all resources (count %d)\n",
                      reason, kgnilnd_data.kgn_nresets);

        for (i = 0; i < *kgnilnd_tunables.kgn_net_hash_size; i++) {
                list_for_each_entry(net, &kgnilnd_data.kgn_nets[i], gnn_list) {
                        rc = kgnilnd_cancel_net_dgrams(net);
                        LASSERTF(rc == 0, "couldn't cleanup datagrams: %d\n", rc);
                }
        }

        /* error -ENOTRECOVERABLE is stack reset */
        kgnilnd_del_conn_or_peer(NULL, LNET_NID_ANY, GNILND_DEL_CONN, -ENOTRECOVERABLE);

        for (i = 0; i < kgnilnd_data.kgn_ndevs; i++) {
                kgn_device_t    *dev = &kgnilnd_data.kgn_devices[i];
                kgnilnd_cancel_wc_dgrams(dev);
                kgnilnd_wait_for_canceled_dgrams(dev);
        }

        /* manually do some conn processing ala kgnilnd_process_conns */
        for (i = 0; i < kgnilnd_data.kgn_ndevs; i++) {
                kgn_device_t    *dev = &kgnilnd_data.kgn_devices[i];
                kgn_conn_t      *conn;
                int              conn_sched;

                /* go find all the closed conns that need to be nuked - the
                 * scheduler thread isn't running to do this for us */

                CDEBUG(D_NET, "will try to clear up %d ready_conns\n",
                        kgnilnd_count_list(&dev->gnd_ready_conns));

                /* use while/list_first_entry loop to ensure we can handle any
                 * DESTROY_EP conns added from kgnilnd_complete_closed_conn */
                while (!list_empty(&dev->gnd_ready_conns)) {
                        conn = list_first_entry(&dev->gnd_ready_conns,
                                                kgn_conn_t, gnc_schedlist);
                        conn_sched = xchg(&conn->gnc_scheduled, GNILND_CONN_PROCESS);

                        LASSERTF(conn_sched != GNILND_CONN_IDLE &&
                                 conn_sched != GNILND_CONN_PROCESS,
                                 "conn %p on ready list but in bad state: %d\n",
                                 conn, conn_sched);

                        list_del_init(&conn->gnc_schedlist);

                        if (!list_empty(&conn->gnc_delaylist))
                                list_del_init(&conn->gnc_delaylist);

                        if (conn->gnc_state == GNILND_CONN_CLOSING) {
                                /* bump to CLOSED to fake out send of CLOSE */
                                conn->gnc_state = GNILND_CONN_CLOSED;
                                conn->gnc_close_sent = 1;
                        }

                        if (conn->gnc_state == GNILND_CONN_DESTROY_EP) {
                                kgnilnd_destroy_conn_ep(conn);
                        } else {
                                kgnilnd_complete_closed_conn(conn);
                        }

                        /* there really shouldn't be any other states here -
                         * they would have been cleared out in the kgnilnd_del_conn_or_peer
                         * or the dgram aborts above.
                         * there is an LASSERTF in kgnilnd_complete_closed_conn that will take
                         * care of catching anything else for us */

                        kgnilnd_schedule_process_conn(conn, -1);

                        kgnilnd_conn_decref(conn);
                }
        }

        /* don't let the little weaselly purgatory conns hide from us */
        for (i = 0; i < *kgnilnd_tunables.kgn_peer_hash_size; i++) {
                list_for_each_entry_safe(peer, peerN, &kgnilnd_data.kgn_peers[i], gnp_list) {
                        kgn_conn_t       *conn, *connN;

                        list_for_each_entry_safe(conn, connN, &peer->gnp_conns, gnc_list) {
                                kgnilnd_detach_purgatory_locked(conn, &souls);
                        }
                }
        }

        CDEBUG(D_NET, "about to release %d purgatory entries\n",
                kgnilnd_count_list(&souls));

        kgnilnd_release_purgatory_list(&souls);

        /* validate we are now clean */
        for (i = 0; i < kgnilnd_data.kgn_ndevs; i++) {
                kgn_device_t    *dev = &kgnilnd_data.kgn_devices[i];

                /* now all the conns/mboxes should be cleaned up, including purgatory
                 * so go through and release the MDDs for our persistent PHYS fma_blks
                 */
                kgnilnd_unmap_fma_blocks(dev);

                LASSERTF(atomic_read(&dev->gnd_nfmablk) == 0,
                        "reset failed: fma blocks still live %d\n",
                        atomic_read(&dev->gnd_nfmablk));

                LASSERTF(atomic_read(&dev->gnd_neps) == 0,
                        "reset failed: EP handles still live %d\n",
                        atomic_read(&dev->gnd_neps));
        }

        LASSERTF(atomic_read(&kgnilnd_data.kgn_nconns) == 0,
                "reset failed: conns left %d\n",
                atomic_read(&kgnilnd_data.kgn_nconns));

        /* fine to have peers left - they are waiting for new conns
         * but should not be holding any open HW resources */

        /* like the last part of kgnilnd_base_shutdown() */

        CFS_RACE(CFS_FAIL_GNI_SR_DOWN_RACE);

        for (i = 0; i < kgnilnd_data.kgn_ndevs; i++) {
                kgnilnd_dev_fini(&kgnilnd_data.kgn_devices[i]);
        }

        /* no need to free and recreate the TX descriptors
         * we nuked all the ones that could be using HW resources in
         * kgnilnd_close_matching_conns and asserted it worked in
         * kgnilnd_dev_fini */

        /* At this point, all HW is torn down, start to reset */

        /* only reset our known devs */
        for (i = 0; i < kgnilnd_data.kgn_ndevs; i++) {
                kgn_device_t    *dev = &kgnilnd_data.kgn_devices[i];
                rc = kgnilnd_dev_init(dev);
                LASSERTF(rc == 0, "dev_init failed for dev %d\n", i);
                rc = kgnilnd_map_phys_fmablk(dev);
                LASSERTF(rc == 0, "map_phys_fmablk failed for dev %d\n", i);
                rc = kgnilnd_setup_wildcard_dgram(dev);
                LASSERTF(rc == 0, "couldn't setup datagrams on dev %d: %d\n",
                        i, rc);
        }


        /* Now the fun restarts... - release the hounds! */

        end = jiffies;
        seconds = cfs_duration_sec((long)end - start);
        kgnilnd_bump_timeouts(seconds, reason);

        kgnilnd_data.kgn_in_reset = 0;
        set_mb(kgnilnd_data.kgn_quiesce_trigger, GNILND_QUIESCE_IDLE);
        kgnilnd_quiesce_wait(reason);
        LCONSOLE_WARN("%s reset of all hardware resources\n",
                rc ? "failed" : "successful");

        RETURN_EXIT;
}

/* A thread that handles quiesce and reset hardware events.
 * We do the same thing regardless of which device reported the event. */
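/* Requests arrive via kgn_needs_pause / kgn_needs_reset and the
 * kgn_ruhroh_waitq; each request is handled under kgn_quiesce_mutex so
 * quiesce/reset processing is serialized with driver startup and shutdown. */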
int
kgnilnd_ruhroh_thread(void *arg)
{
        int                i = 1;
        DEFINE_WAIT(wait);

        cfs_block_allsigs();
        set_user_nice(current, *kgnilnd_tunables.kgn_nice);
        kgnilnd_data.kgn_ruhroh_running = 1;

        while (1) {

                /* Block until there's a request.  A reset request could come in
                 * while we're handling a quiesce one, or vice versa.
                 * Keep processing requests until there are none. */
                prepare_to_wait(&kgnilnd_data.kgn_ruhroh_waitq, &wait, TASK_INTERRUPTIBLE);
                while (!(kgnilnd_data.kgn_ruhroh_shutdown ||
                                kgnilnd_data.kgn_needs_reset || kgnilnd_data.kgn_needs_pause))
                        schedule();
                finish_wait(&kgnilnd_data.kgn_ruhroh_waitq, &wait);

                /* Exit if the driver is shutting down. */
                if (kgnilnd_data.kgn_ruhroh_shutdown)
                        break;

                /* Serialize with driver startup and shutdown. */
                mutex_lock(&kgnilnd_data.kgn_quiesce_mutex);

                CDEBUG(D_NET, "trigger %d reset %d to_bump %d pause %d\n",
                        kgnilnd_data.kgn_quiesce_trigger,
                        kgnilnd_data.kgn_needs_reset,
                        kgnilnd_data.kgn_bump_info_rdy,
                        kgnilnd_data.kgn_needs_pause);

                /* Do we need to do a pause/quiesce? */
                if (kgnilnd_data.kgn_needs_pause) {

                        /* Pause all other kgnilnd threads. */
                        set_mb(kgnilnd_data.kgn_quiesce_trigger, GNILND_QUIESCE_HW_QUIESCE);
                        kgnilnd_quiesce_wait("hardware quiesce");

                        /* If the hardware quiesce flag is set, wait for it to clear.
                         * This should happen relatively quickly, so we wait for it.
                         * This will hold up the eventd thread, but on everything but
                         * the simulator, this is ok-- there is one thread per core.
                         *
                         * Handle (possibly multiple) quiesce events while we wait. The
                         * memory barrier ensures that the core doesn't start fetching
                         * kgn_bump_info_rdy before it fetches kgn_needs_pause, and
                         * matches the second mb in kgnilnd_quiesce_end_callback(). */
                        smp_rmb();
                        while (kgnilnd_hw_in_quiesce() || kgnilnd_data.kgn_bump_info_rdy) {

                                i++;
                                CDEBUG(D_INFO, "Waiting for hardware quiesce "
                                               "flag to clear\n");
                                set_current_state(TASK_UNINTERRUPTIBLE);
                                schedule_timeout(cfs_time_seconds(1 * i));

                                /* If we got a quiesce event with bump info, DO THE BUMP! */
                                if (kgnilnd_data.kgn_bump_info_rdy) {
                                        /* reset console rate limiting for each event */
                                        i = 1;

                                        /* Make sure the core doesn't start fetching
                                         * kgn_quiesce_secs until after it sees
                                         * kgn_bump_info_rdy set.  This is the match to the
                                         * first mb in kgnilnd_quiesce_end_callback(). */
                                        smp_rmb();
                                        kgnilnd_bump_timeouts(kgnilnd_data.kgn_quiesce_secs,
                                                              "hardware quiesce callback");
                                        set_mb(kgnilnd_data.kgn_quiesce_secs, 0);
                                        set_mb(kgnilnd_data.kgn_bump_info_rdy, 0);
                                }
                        }

                        /* Reset the kgn_needs_pause flag before coming out of
                         * the pause.  This ordering avoids a race with the
                         * setting of this flag in kgnilnd_pause_threads(). */
                        set_mb(kgnilnd_data.kgn_needs_pause, 0);

                        /* ok, let the kids back into the pool */
                        set_mb(kgnilnd_data.kgn_quiesce_trigger, GNILND_QUIESCE_IDLE);
                        kgnilnd_quiesce_wait("hardware quiesce");
                }

                /* Do a stack reset if needed. */
                if (kgnilnd_data.kgn_needs_reset) {
                        kgnilnd_reset_stack();
                        set_mb(kgnilnd_data.kgn_needs_reset, 0);
                }

                mutex_unlock(&kgnilnd_data.kgn_quiesce_mutex);
        }

        kgnilnd_data.kgn_ruhroh_running = 0;
        return 0;
}

/* Set pause request flag.  Any functions that
 * call this one are responsible for ensuring that
 * variables they set up are visible on other cores before
 * this flag setting.  This executes in interrupt or kernel
 * thread context. */
void
kgnilnd_pause_threads(void)
{
        /* only device 0 gets the handle, see kgnilnd_dev_init */
        kgn_device_t  *dev = &kgnilnd_data.kgn_devices[0];
        LASSERTF(dev != NULL, "dev 0 is NULL\n");

        /* If we're currently in a pause triggered by the pause flag,
         * there's no need to set it again.  We clear the kgn_needs_pause
         * flag before we reset kgn_quiesce_trigger to avoid a race.  The
         * read memory barrier matches the set_mb() on the trigger in
         * kgnilnd_ruhroh_thread(). */
        smp_rmb();
        if (!(kgnilnd_data.kgn_quiesce_trigger == GNILND_QUIESCE_HW_QUIESCE &&
                        GNILND_IS_QUIESCED)) {
                CDEBUG(D_NET, "requesting thread pause\n");

                kgnilnd_data.kgn_needs_pause = 1;

                wake_up(&kgnilnd_data.kgn_ruhroh_waitq);
        } else {
                CDEBUG(D_NET, "thread pause already underway\n");
        }
}

/* Return non-zero if the GNI hardware quiesce flag is set */
int
kgnilnd_hw_in_quiesce(void)
{
        /* only device 0 gets the handle, see kgnilnd_dev_init */
        kgn_device_t      *dev0 = &kgnilnd_data.kgn_devices[0];

        LASSERTF(dev0 != NULL, "dev 0 is NULL\n");

        smp_rmb();
        return kgnilnd_get_quiesce_status(dev0->gnd_handle) != 0;
}


/* If the GNI hardware quiesce flag is set, initiate our pause and
 * return non-zero.  Also return non-zero if the stack is shutting down. */
int
kgnilnd_check_hw_quiesce(void)
{
        if (likely(!kgnilnd_hw_in_quiesce()))
                return 0;

        if (!kgnilnd_data.kgn_ruhroh_shutdown) {
                CDEBUG(D_NET, "initiating thread pause\n");
                kgnilnd_pause_threads();
        } else {
                CDEBUG(D_NET, "thread pause bypassed because of shutdown\n");
        }

        return 1;
}

/* Callback from kgni with the quiesce duration.  This executes
 * in interrupt context. */
void
kgnilnd_quiesce_end_callback(gni_nic_handle_t nic_handle, uint64_t msecs)
{
        /* only device 0 gets the handle, see kgnilnd_dev_init */
        kgn_device_t  *dev = &kgnilnd_data.kgn_devices[0];
        LASSERTF(dev != NULL, "dev 0 is NULL\n");

        if (!kgnilnd_data.kgn_ruhroh_shutdown) {

                CDEBUG(D_NET, "requesting timeout bump by %lld msecs\n", msecs);

                /* Save the bump interval and request the bump.
                 * The memory barrier ensures that the interval is in place before
                 * the bump flag can be seen (in case a core is already running the
                 * ruhroh task), and that the bump request flag is in place before
                 * the pause request can be seen (to ensure a core doesn't miss the bump
                 * request flag). */
                /* If another callback occurred before the ruhroh task
                 * finished processing the first bump request, we'd over-write its info.
                 * Nic says that callbacks occur so slowly that this isn't an issue. */
                set_mb(kgnilnd_data.kgn_quiesce_secs, msecs / MSEC_PER_SEC);
                set_mb(kgnilnd_data.kgn_bump_info_rdy, 1);
                kgnilnd_pause_threads();
        } else {
                CDEBUG(D_NET, "timeout bump bypassed because of shutdown\n");
        }
}

void
kgnilnd_critical_error(struct gni_err *err_handle)
{
        /* only device 0 gets the handle, see kgnilnd_dev_init */
        kgn_device_t  *dev = &kgnilnd_data.kgn_devices[0];
        LASSERTF(dev != NULL, "dev 0 is NULL\n");

        if (!kgnilnd_data.kgn_ruhroh_shutdown) {
                CDEBUG(D_NET, "requesting stack reset\n");
                kgnilnd_data.kgn_needs_reset = 1;
                wake_up(&kgnilnd_data.kgn_ruhroh_waitq);
        } else {
                CDEBUG(D_NET, "stack reset bypassed because of shutdown\n");
        }
}

#if defined(GNILND_USE_RCA)
#include <krca_lib.h>
#define RCA_EVENTS 3
/* RCA ticket is needed for krca_wakeup_wait_event() */
static krca_ticket_t rca_krt = KRCA_NULL_TICKET;
struct rcadata {
        rca_ticket_t ticket;
        int subscribed;
        rs_event_code_t ec;
};
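/* One entry per RCA event code we subscribe to; ticket and subscribed are
 * filled in by kgnilnd_rca() as each subscription succeeds. */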
static struct rcadata rd[RCA_EVENTS] = {
        { .ec = ec_node_unavailable },
        { .ec = ec_node_available },
        { .ec = ec_node_failed } };

/* thread for receiving rca events */
int
kgnilnd_rca(void *arg)
{
        int        i, rc;
        int        retry_count;
        rs_event_t event;
        lnet_nid_t nid;

        cfs_block_allsigs();

        /* all gnilnd threads need to run fairly urgently */
        set_user_nice(current, *kgnilnd_tunables.kgn_nice);

        /*
         * Register our service with RCA and subscribe to events
         * of interest.
         */
        rca_krt = KRCA_NULL_TICKET;
        rc = krca_register(&rca_krt, RCA_SVCTYPE_GNILND, current->pid, 0);
        if (rc < 0) {
                CNETERR("krca_register(%x) returned %d\n", current->pid, rc);
                goto done;
        }

        for (i = 0; i < RCA_EVENTS; i++) {
                retry_count = 0;
subscribe_retry:
                rc = krca_subscribe(&rca_krt, rd[i].ec, RCA_RX_SVC_ANY,
                                    &rd[i].ticket);

                if ((rc == -EINTR) && !retry_count) {
                        retry_count++;
                        CNETERR("krca_subscribe returned %d - retrying\n", rc);
                        goto subscribe_retry;
                }

                if (rc < 0) {
                        CNETERR("rca subscription failed (%d)\n", rc);
                        goto done;
                }

                rd[i].subscribed = 1;
        }

        while (!kgnilnd_data.kgn_shutdown) {
                if (unlikely(kgnilnd_data.kgn_quiesce_trigger)) {
                        KGNILND_SPIN_QUIESCE;
                }
                /* wait here for a subscribed event */
                rc = krca_wait_event(&rca_krt);

                /* RCA return values:
                 * 0 indicates krca_wakeup_wait_event caused krca_wait_event
                 *   return.
                 * -ERESTARTSYS indicates krca_wait_event returned because of a
                 *   signal.
                 * -ENOSPC indicates no space available to create an rcad_reg_t
                 * 1 indicates a message is waiting.
                 */
                if (rc <= 0) {
                        continue;
                }

                if (krca_get_message(&rca_krt, &event) == 0) {
                        int node_down = GNILND_PEER_UNKNOWN;
                        rs_state_t state;
                        LIST_HEAD(zombies);

                        /* Compute nodes don't care about other compute nodes
                         * so we don't need to create a peer.
                         */
                        if (GNILND_COMPUTE &&
                            !RSN_GET_FLD(event.ev_gen.svid_node.rs_node_flat,
                                        IS_SVC)) {
                                continue;
                        }

                        /* Only care about node and accelerator components;
                         * ignore events for any other component type */
                        if (!(RSN_GET_FLD(event.ev_gen.svid_node.rs_node_flat,
                                        TYPE) == rt_node ||
                             RSN_GET_FLD(event.ev_gen.svid_node.rs_node_flat,
                                        TYPE) == rt_accel)) {
                                continue;
                        }

                        switch (event.ev_id) {
                        case ec_node_available:
                                CDEBUG(D_INFO, "ec_node_available\n");
                                node_down = GNILND_PEER_UP;
                                break;
                        case ec_node_failed:
                                CDEBUG(D_INFO, "ec_node_failed\n");
                                if (event.ev_len > 0) {
                                        CDEBUG(D_ERROR,
                                                "ec_node_failed ignored\n");
                                        break;
                                }
                                node_down = GNILND_PEER_DOWN;
                                break;
                        case ec_node_unavailable:
                                state = RSN_GET_FLD(event.ev_gen.svid_node.rsn_intval, STATE);

                                CDEBUG(D_INFO, "ec_node_unavailable\n");

                                /*
                                 * Ignore overloaded ec_node_unavailable events
                                 * generated by 'xtcli set_reserve'.
                                 */
                                if (RS_GET_CS_STATE(state) == RS_CS_READY) {
                                        CDEBUG(D_INFO, "ignoring "
                                                "ec_node_unavailable event with"
                                                " RS_CS_READY state\n");
                                        break;
                                }
                                node_down = GNILND_PEER_DOWN;
                                break;
                        default:
                                CDEBUG(D_INFO, "unknown event\n");
                                break;
                        }

                        /* if we get an event we don't know about, just go ahead
                         * and wait for another event */
                        if (node_down == GNILND_PEER_UNKNOWN)
                                continue;

                        nid = RSN_GET_FLD(event.ev_gen.svid_node.rs_node_flat,
                                          NID);
                        CDEBUG(D_INFO, "kgnilnd_rca() reporting nid %d %s\n",
                               (int)nid, node_down ? "down" : "up");
                        kgnilnd_report_node_state(nid, node_down);

                } else {
                        CNETERR("krca_get_message failed\n");
                }
        }

done:
        CDEBUG(D_INFO, "done\n");

        for (i = 0; i < RCA_EVENTS; i++) {
                if (rd[i].subscribed) {
                        rc = krca_unsubscribe(&rca_krt, rd[i].ticket);

                        if (rc) {
                                CNETERR("rca unsubscribe failed (%d)\n", rc);
                        }

                        rd[i].subscribed = 0;
                }
        }

        krca_unregister(&rca_krt);
        kgnilnd_thread_fini();
        return 0;
}

int
kgnilnd_start_rca_thread(void)
{
        return kgnilnd_thread_start(kgnilnd_rca, NULL, "kgnilnd_rca", 0);
}

void
kgnilnd_wakeup_rca_thread(void)
{
        int ret;

        ret = krca_wakeup_wait_event(&rca_krt);

        if (ret) {
                CDEBUG(D_ERROR, "krca_wakeup_wait_event failed\n");
        }
}

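/* Query RCA for the current state of nid: returns GNILND_PEER_UP if the node
 * is RS_CS_READY, GNILND_PEER_DOWN otherwise, and GNILND_PEER_UNKNOWN if the
 * node isn't found or the system node list can't be fetched. */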
int
kgnilnd_get_node_state(__u32 nid)
{
        int i;
        int rc = GNILND_PEER_UNKNOWN;
        int ret;
        rs_node_array_t nlist;
        rs_node_t       *na = NULL;

        if ((ret = krca_get_sysnodes(&nlist)) < 0) {
                CDEBUG(D_NETERROR, "krca_get_sysnodes failed %d\n", ret);
                goto ns_done;
        }

        na = nlist.na_ids;

        for (i = 0; i < nlist.na_len; i++) {
                if ((rca_nid_t)RSN_GET_FLD(na[i].rs_node_flat, NID) == nid) {
                        rc = RSN_GET_FLD(na[i].rs_node_flat, STATE) == RS_CS_READY ?
                                GNILND_PEER_UP : GNILND_PEER_DOWN;
                        break;
                }
        }

ns_done:
        kfree(na);
        CDEBUG(D_NET, "nid %d rc %d (0=up)\n", nid, rc);
        return rc;
}

#else /* GNILND_USE_RCA */

int
kgnilnd_start_rca_thread(void)
{
        return 0;
}

void
kgnilnd_wakeup_rca_thread(void)
{
}

int
kgnilnd_get_node_state(__u32 nid)
{
        return GNILND_PEER_UP;
}
#endif /* GNILND_USE_RCA */