Whamcloud - gitweb
b=16098
[fs/lustre-release.git] / lnet / ulnds / ptllnd / ptllnd.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see [sun.com URL with a
20  * copy of GPLv2].
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lnet/ulnds/ptllnd/ptllnd.c
37  *
38  * Author: Eric Barton <eeb@bartonsoftware.com>
39  */
40  */
41
42 #include "ptllnd.h"
43
44 lnd_t               the_ptllnd = {
45         .lnd_type       = PTLLND,
46         .lnd_startup    = ptllnd_startup,
47         .lnd_shutdown   = ptllnd_shutdown,
48         .lnd_ctl        = ptllnd_ctl,
49         .lnd_send       = ptllnd_send,
50         .lnd_recv       = ptllnd_recv,
51         .lnd_eager_recv = ptllnd_eager_recv,
52         .lnd_notify     = ptllnd_notify,
53         .lnd_wait       = ptllnd_wait,
54         .lnd_setasync   = ptllnd_setasync,
55 };
56
57 static int ptllnd_ni_count = 0;
58
59 static struct list_head ptllnd_idle_history;
60 static struct list_head ptllnd_history_list;
61
62 void
63 ptllnd_history_fini(void)
64 {
65         ptllnd_he_t *he;
66
67         while (!list_empty(&ptllnd_idle_history)) {
68                 he = list_entry(ptllnd_idle_history.next,
69                                 ptllnd_he_t, he_list);
70                 
71                 list_del(&he->he_list);
72                 LIBCFS_FREE(he, sizeof(*he));
73         }
74         
75         while (!list_empty(&ptllnd_history_list)) {
76                 he = list_entry(ptllnd_history_list.next,
77                                 ptllnd_he_t, he_list);
78                 
79                 list_del(&he->he_list);
80                 LIBCFS_FREE(he, sizeof(*he));
81         }
82 }
83
84 int
85 ptllnd_history_init(void)
86 {
87         int          i;
88         ptllnd_he_t *he;
89         int          n;
90         int          rc;
91         
92         CFS_INIT_LIST_HEAD(&ptllnd_idle_history);
93         CFS_INIT_LIST_HEAD(&ptllnd_history_list);
94         
95         rc = ptllnd_parse_int_tunable(&n, "PTLLND_HISTORY", 0);
96         if (rc != 0)
97                 return rc;
98         
99         for (i = 0; i < n; i++) {
100                 LIBCFS_ALLOC(he, sizeof(*he));
101                 if (he == NULL) {
102                         ptllnd_history_fini();
103                         return -ENOMEM;
104                 }
105                 
106                 list_add(&he->he_list, &ptllnd_idle_history);
107         }
108
109         PTLLND_HISTORY("Init");
110
111         return 0;
112 }
113
114 void
115 ptllnd_history(const char *fn, const char *file, const int line,
116                const char *fmt, ...)
117 {
118         static int     seq;
119         
120         va_list        ap;
121         ptllnd_he_t   *he;
122         
123         if (!list_empty(&ptllnd_idle_history)) {
124                 he = list_entry(ptllnd_idle_history.next,
125                                 ptllnd_he_t, he_list);
126         } else if (!list_empty(&ptllnd_history_list)) {
127                 he = list_entry(ptllnd_history_list.next,
128                                 ptllnd_he_t, he_list);
129         } else {
130                 return;
131         }
132
133         list_del(&he->he_list);
134         list_add_tail(&he->he_list, &ptllnd_history_list);
135
136         he->he_seq = seq++;
137         he->he_fn = fn;
138         he->he_file = file;
139         he->he_line = line;
140         gettimeofday(&he->he_time, NULL);
141         
142         va_start(ap, fmt);
143         vsnprintf(he->he_msg, sizeof(he->he_msg), fmt, ap);
144         va_end(ap);
145 }
146
147 void
148 ptllnd_dump_history(void)
149 {
150         ptllnd_he_t    *he;
151
152         PTLLND_HISTORY("dumping...");
153         
154         while (!list_empty(&ptllnd_history_list)) {
155                 he = list_entry(ptllnd_history_list.next,
156                                 ptllnd_he_t, he_list);
157
158                 list_del(&he->he_list);
159                 
160                 CDEBUG(D_WARNING, "%d %d.%06d (%s:%d:%s()) %s\n", he->he_seq,
161                        (int)he->he_time.tv_sec, (int)he->he_time.tv_usec,
162                        he->he_file, he->he_line, he->he_fn, he->he_msg);
163
164                 list_add_tail(&he->he_list, &ptllnd_idle_history);
165         }
166
167         PTLLND_HISTORY("complete");
168 }
169
170 void 
171 ptllnd_assert_wire_constants (void)
172 {
173         /* Wire protocol assertions generated by 'wirecheck'
174          * running on Linux fedora 2.6.11-co-0.6.4 #1 Mon Jun 19 05:36:13 UTC 2006 i686 i686 i386 GNU
175          * with gcc version 4.1.1 20060525 (Red Hat 4.1.1-1) */
176
177
178         /* Constants... */
179         CLASSERT (PTL_RESERVED_MATCHBITS == 0x100);
180         CLASSERT (LNET_MSG_MATCHBITS == 0);
181         CLASSERT (PTLLND_MSG_MAGIC == 0x50746C4E);
182         CLASSERT (PTLLND_MSG_VERSION == 0x04);
183         CLASSERT (PTLLND_RDMA_OK == 0x00);
184         CLASSERT (PTLLND_RDMA_FAIL == 0x01);
185         CLASSERT (PTLLND_MSG_TYPE_INVALID == 0x00);
186         CLASSERT (PTLLND_MSG_TYPE_PUT == 0x01);
187         CLASSERT (PTLLND_MSG_TYPE_GET == 0x02);
188         CLASSERT (PTLLND_MSG_TYPE_IMMEDIATE == 0x03);
189         CLASSERT (PTLLND_MSG_TYPE_NOOP == 0x04);
190         CLASSERT (PTLLND_MSG_TYPE_HELLO == 0x05);
191         CLASSERT (PTLLND_MSG_TYPE_NAK == 0x06);
192
193         /* Checks for struct kptl_msg_t */
194         CLASSERT ((int)sizeof(kptl_msg_t) == 136);
195         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_magic) == 0);
196         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_magic) == 4);
197         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_version) == 4);
198         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_version) == 2);
199         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_type) == 6);
200         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_type) == 1);
201         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_credits) == 7);
202         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_credits) == 1);
203         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_nob) == 8);
204         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_nob) == 4);
205         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_cksum) == 12);
206         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_cksum) == 4);
207         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_srcnid) == 16);
208         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_srcnid) == 8);
209         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_srcstamp) == 24);
210         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_srcstamp) == 8);
211         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_dstnid) == 32);
212         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_dstnid) == 8);
213         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_dststamp) == 40);
214         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_dststamp) == 8);
215         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_srcpid) == 48);
216         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_srcpid) == 4);
217         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_dstpid) == 52);
218         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_dstpid) == 4);
219         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_u.immediate) == 56);
220         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_u.immediate) == 72);
221         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_u.rdma) == 56);
222         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_u.rdma) == 80);
223         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_u.hello) == 56);
224         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_u.hello) == 12);
225
226         /* Checks for struct kptl_immediate_msg_t */
227         CLASSERT ((int)sizeof(kptl_immediate_msg_t) == 72);
228         CLASSERT ((int)offsetof(kptl_immediate_msg_t, kptlim_hdr) == 0);
229         CLASSERT ((int)sizeof(((kptl_immediate_msg_t *)0)->kptlim_hdr) == 72);
230         CLASSERT ((int)offsetof(kptl_immediate_msg_t, kptlim_payload[13]) == 85);
231         CLASSERT ((int)sizeof(((kptl_immediate_msg_t *)0)->kptlim_payload[13]) == 1);
232
233         /* Checks for struct kptl_rdma_msg_t */
234         CLASSERT ((int)sizeof(kptl_rdma_msg_t) == 80);
235         CLASSERT ((int)offsetof(kptl_rdma_msg_t, kptlrm_hdr) == 0);
236         CLASSERT ((int)sizeof(((kptl_rdma_msg_t *)0)->kptlrm_hdr) == 72);
237         CLASSERT ((int)offsetof(kptl_rdma_msg_t, kptlrm_matchbits) == 72);
238         CLASSERT ((int)sizeof(((kptl_rdma_msg_t *)0)->kptlrm_matchbits) == 8);
239
240         /* Checks for struct kptl_hello_msg_t */
241         CLASSERT ((int)sizeof(kptl_hello_msg_t) == 12);
242         CLASSERT ((int)offsetof(kptl_hello_msg_t, kptlhm_matchbits) == 0);
243         CLASSERT ((int)sizeof(((kptl_hello_msg_t *)0)->kptlhm_matchbits) == 8);
244         CLASSERT ((int)offsetof(kptl_hello_msg_t, kptlhm_max_msg_size) == 8);
245         CLASSERT ((int)sizeof(((kptl_hello_msg_t *)0)->kptlhm_max_msg_size) == 4);
246 }
247
248 int
249 ptllnd_parse_int_tunable(int *value, char *name, int dflt)
250 {
251         char    *env = getenv(name);
252         char    *end;
253
254         if (env == NULL) {
255                 *value = dflt;
256                 return 0;
257         }
258
259         *value = strtoull(env, &end, 0);
260         if (*end == 0)
261                 return 0;
262
263         CERROR("Can't parse tunable %s=%s\n", name, env);
264         return -EINVAL;
265 }
266
267 int
268 ptllnd_get_tunables(lnet_ni_t *ni)
269 {
270         ptllnd_ni_t *plni = ni->ni_data;
271         int          max_msg_size;
272         int          msgs_per_buffer;
273         int          rc;
274         int          temp;
275
276         /*  Other tunable defaults depend on this */
277         rc = ptllnd_parse_int_tunable(&plni->plni_debug, "PTLLND_DEBUG", 0);
278         if (rc != 0)
279                 return rc;
280
281         rc = ptllnd_parse_int_tunable(&plni->plni_portal,
282                                       "PTLLND_PORTAL", PTLLND_PORTAL);
283         if (rc != 0)
284                 return rc;
285
286         rc = ptllnd_parse_int_tunable(&temp,
287                                       "PTLLND_PID", PTLLND_PID);
288         if (rc != 0)
289                 return rc;
290         plni->plni_ptllnd_pid = (ptl_pid_t)temp;
291
292         rc = ptllnd_parse_int_tunable(&plni->plni_peer_credits,
293                                       "PTLLND_PEERCREDITS", PTLLND_PEERCREDITS);
294         if (rc != 0)
295                 return rc;
296
297         rc = ptllnd_parse_int_tunable(&max_msg_size,
298                                       "PTLLND_MAX_MSG_SIZE",
299                                       PTLLND_MAX_ULND_MSG_SIZE);
300         if (rc != 0)
301                 return rc;
302
303         rc = ptllnd_parse_int_tunable(&msgs_per_buffer,
304                                       "PTLLND_MSGS_PER_BUFFER", 64);
305         if (rc != 0)
306                 return rc;
307
308         rc = ptllnd_parse_int_tunable(&plni->plni_msgs_spare,
309                                       "PTLLND_MSGS_SPARE", 256);
310         if (rc != 0)
311                 return rc;
312
313         rc = ptllnd_parse_int_tunable(&plni->plni_peer_hash_size,
314                                       "PTLLND_PEER_HASH_SIZE", 101);
315         if (rc != 0)
316                 return rc;
317
318
319         rc = ptllnd_parse_int_tunable(&plni->plni_eq_size,
320                                       "PTLLND_EQ_SIZE", 1024);
321         if (rc != 0)
322                 return rc;
323
324         rc = ptllnd_parse_int_tunable(&plni->plni_checksum,
325                                       "PTLLND_CHECKSUM", 0);
326         if (rc != 0)
327                 return rc;
328
329         rc = ptllnd_parse_int_tunable(&plni->plni_max_tx_history,
330                                       "PTLLND_TX_HISTORY",
331                                       plni->plni_debug ? 1024 : 0);
332         if (rc != 0)
333                 return rc;
334
335         rc = ptllnd_parse_int_tunable(&plni->plni_abort_on_protocol_mismatch,
336                                       "PTLLND_ABORT_ON_PROTOCOL_MISMATCH", 1);
337         if (rc != 0)
338                 return rc;
339
340         rc = ptllnd_parse_int_tunable(&plni->plni_abort_on_nak,
341                                       "PTLLND_ABORT_ON_NAK", 0);
342         if (rc != 0)
343                 return rc;
344
345         rc = ptllnd_parse_int_tunable(&plni->plni_dump_on_nak,
346                                       "PTLLND_DUMP_ON_NAK", plni->plni_debug);
347         if (rc != 0)
348                 return rc;
349
350         rc = ptllnd_parse_int_tunable(&plni->plni_watchdog_interval,
351                                       "PTLLND_WATCHDOG_INTERVAL", 1);
352         if (rc != 0)
353                 return rc;
354         if (plni->plni_watchdog_interval <= 0)
355                 plni->plni_watchdog_interval = 1;
356
357         rc = ptllnd_parse_int_tunable(&plni->plni_timeout,
358                                       "PTLLND_TIMEOUT", 50);
359         if (rc != 0)
360                 return rc;
361
362         rc = ptllnd_parse_int_tunable(&plni->plni_long_wait,
363                                       "PTLLND_LONG_WAIT",
364                                       plni->plni_debug ? 5 : plni->plni_timeout);
365         if (rc != 0)
366                 return rc;
367         plni->plni_long_wait *= 1000;           /* convert to mS */
368
369         plni->plni_max_msg_size = max_msg_size & ~7;
370         if (plni->plni_max_msg_size < PTLLND_MIN_BUFFER_SIZE)
371                 plni->plni_max_msg_size = PTLLND_MIN_BUFFER_SIZE;
372         CLASSERT ((PTLLND_MIN_BUFFER_SIZE & 7) == 0);
373         CLASSERT (sizeof(kptl_msg_t) <= PTLLND_MIN_BUFFER_SIZE);
374
375         plni->plni_buffer_size = plni->plni_max_msg_size * msgs_per_buffer;
376
377         CDEBUG(D_NET, "portal          = %d\n",plni->plni_portal);
378         CDEBUG(D_NET, "ptllnd_pid      = %d\n",plni->plni_ptllnd_pid);
379         CDEBUG(D_NET, "max_msg_size    = %d\n",max_msg_size);
380         CDEBUG(D_NET, "msgs_per_buffer = %d\n",msgs_per_buffer);
381         CDEBUG(D_NET, "msgs_spare      = %d\n",plni->plni_msgs_spare);
382         CDEBUG(D_NET, "peer_hash_size  = %d\n",plni->plni_peer_hash_size);
383         CDEBUG(D_NET, "eq_size         = %d\n",plni->plni_eq_size);
384         CDEBUG(D_NET, "max_msg_size    = %d\n",plni->plni_max_msg_size);
385         CDEBUG(D_NET, "buffer_size     = %d\n",plni->plni_buffer_size);
386
387         return 0;
388 }
389
390 ptllnd_buffer_t *
391 ptllnd_create_buffer (lnet_ni_t *ni)
392 {
393         ptllnd_ni_t     *plni = ni->ni_data;
394         ptllnd_buffer_t *buf;
395
396         LIBCFS_ALLOC(buf, sizeof(*buf));
397         if (buf == NULL) {
398                 CERROR("Can't allocate buffer descriptor\n");
399                 return NULL;
400         }
401
402         buf->plb_ni = ni;
403         buf->plb_posted = 0;
404         CFS_INIT_LIST_HEAD(&buf->plb_list);
405
406         LIBCFS_ALLOC(buf->plb_buffer, plni->plni_buffer_size);
407         if (buf->plb_buffer == NULL) {
408                 CERROR("Can't allocate buffer size %d\n",
409                        plni->plni_buffer_size);
410                 LIBCFS_FREE(buf, sizeof(*buf));
411                 return NULL;
412         }
413
414         list_add(&buf->plb_list, &plni->plni_buffers);
415         plni->plni_nbuffers++;
416
417         return buf;
418 }
419
420 void
421 ptllnd_destroy_buffer (ptllnd_buffer_t *buf)
422 {
423         ptllnd_ni_t     *plni = buf->plb_ni->ni_data;
424
425         LASSERT (!buf->plb_posted);
426
427         plni->plni_nbuffers--;
428         list_del(&buf->plb_list);
429         LIBCFS_FREE(buf->plb_buffer, plni->plni_buffer_size);
430         LIBCFS_FREE(buf, sizeof(*buf));
431 }
432
433 int
434 ptllnd_size_buffers (lnet_ni_t *ni, int delta)
435 {
436         ptllnd_ni_t     *plni = ni->ni_data;
437         ptllnd_buffer_t *buf;
438         int              nmsgs;
439         int              nbufs;
440         int              rc;
441
442         CDEBUG(D_NET, "nposted_buffers = %d (before)\n",plni->plni_nposted_buffers);
443         CDEBUG(D_NET, "nbuffers = %d (before)\n",plni->plni_nbuffers);
444
445         plni->plni_nmsgs += delta;
446         LASSERT(plni->plni_nmsgs >= 0);
447         
448         nmsgs = plni->plni_nmsgs + plni->plni_msgs_spare;
449
450         nbufs = (nmsgs * plni->plni_max_msg_size + plni->plni_buffer_size - 1) /
451                 plni->plni_buffer_size;
452
453         while (nbufs > plni->plni_nbuffers) {
454                 buf = ptllnd_create_buffer(ni);
455
456                 if (buf == NULL)
457                         return -ENOMEM;
458
459                 rc = ptllnd_post_buffer(buf);
460                 if (rc != 0) {
461                         /* TODO - this path seems to orpahn the buffer
462                          * in a state where its not posted and will never be
463                          * However it does not leak the buffer as it's
464                          * already been put onto the global buffer list
465                          * and will be cleaned up
466                          */
467                         return rc;
468                 }
469         }
470
471         CDEBUG(D_NET, "nposted_buffers = %d (after)\n",plni->plni_nposted_buffers);
472         CDEBUG(D_NET, "nbuffers = %d (after)\n",plni->plni_nbuffers);
473         return 0;
474 }
475
476 void
477 ptllnd_destroy_buffers (lnet_ni_t *ni)
478 {
479         ptllnd_ni_t       *plni = ni->ni_data;
480         ptllnd_buffer_t   *buf;
481         struct list_head  *tmp;
482         struct list_head  *nxt;
483
484         CDEBUG(D_NET, "nposted_buffers = %d (before)\n",plni->plni_nposted_buffers);
485         CDEBUG(D_NET, "nbuffers = %d (before)\n",plni->plni_nbuffers);
486
487         list_for_each_safe(tmp, nxt, &plni->plni_buffers) {
488                 buf = list_entry(tmp, ptllnd_buffer_t, plb_list);
489
490                 //CDEBUG(D_NET, "buf=%p posted=%d\n",buf,buf->plb_posted);
491
492                 LASSERT (plni->plni_nbuffers > 0);
493                 if (buf->plb_posted) {
494                         time_t   start = cfs_time_current_sec();
495                         int      w = plni->plni_long_wait;
496
497                         LASSERT (plni->plni_nposted_buffers > 0);
498
499 #ifdef LUSTRE_PORTALS_UNLINK_SEMANTICS
500                         (void) PtlMDUnlink(buf->plb_md);
501
502                         while (buf->plb_posted) {
503                                 if (w > 0 && cfs_time_current_sec() > start + w/1000) {
504                                         CWARN("Waited %ds to unlink buffer\n",
505                                               (int)(cfs_time_current_sec() - start));
506                                         w *= 2;
507                                 }
508                                 ptllnd_wait(ni, w);
509                         }
510 #else
511                         while (buf->plb_posted) {
512                                 rc = PtlMDUnlink(buf->plb_md);
513                                 if (rc == PTL_OK) {
514                                         buf->plb_posted = 0;
515                                         plni->plni_nposted_buffers--;
516                                         break;
517                                 }
518                                 LASSERT (rc == PTL_MD_IN_USE);
519                                 if (w > 0 && cfs_time_current_sec() > start + w/1000) {
520                                         CWARN("Waited %ds to unlink buffer\n",
521                                               cfs_time_current_sec() - start);
522                                         w *= 2;
523                                 }
524                                 ptllnd_wait(ni, w);
525                         }
526 #endif
527                 }
528                 ptllnd_destroy_buffer(buf);
529         }
530
531         CDEBUG(D_NET, "nposted_buffers = %d (after)\n",plni->plni_nposted_buffers);
532         CDEBUG(D_NET, "nbuffers = %d (after)\n",plni->plni_nbuffers);
533
534         LASSERT (plni->plni_nposted_buffers == 0);
535         LASSERT (plni->plni_nbuffers == 0);
536 }
537
538 int
539 ptllnd_create_peer_hash (lnet_ni_t *ni)
540 {
541         ptllnd_ni_t *plni = ni->ni_data;
542         int          i;
543
544         plni->plni_npeers = 0;
545
546         LIBCFS_ALLOC(plni->plni_peer_hash,
547                      plni->plni_peer_hash_size * sizeof(*plni->plni_peer_hash));
548         if (plni->plni_peer_hash == NULL) {
549                 CERROR("Can't allocate ptllnd peer hash (size %d)\n",
550                        plni->plni_peer_hash_size);
551                 return -ENOMEM;
552         }
553
554         for (i = 0; i < plni->plni_peer_hash_size; i++)
555                 CFS_INIT_LIST_HEAD(&plni->plni_peer_hash[i]);
556
557         return 0;
558 }
559
560 void
561 ptllnd_destroy_peer_hash (lnet_ni_t *ni)
562 {
563         ptllnd_ni_t    *plni = ni->ni_data;
564         int             i;
565
566         LASSERT( plni->plni_npeers == 0);
567
568         for (i = 0; i < plni->plni_peer_hash_size; i++)
569                 LASSERT (list_empty(&plni->plni_peer_hash[i]));
570
571         LIBCFS_FREE(plni->plni_peer_hash,
572                     plni->plni_peer_hash_size * sizeof(*plni->plni_peer_hash));
573 }
574
575 void
576 ptllnd_close_peers (lnet_ni_t *ni)
577 {
578         ptllnd_ni_t    *plni = ni->ni_data;
579         ptllnd_peer_t  *plp;
580         int             i;
581
582         for (i = 0; i < plni->plni_peer_hash_size; i++)
583                 while (!list_empty(&plni->plni_peer_hash[i])) {
584                         plp = list_entry(plni->plni_peer_hash[i].next,
585                                          ptllnd_peer_t, plp_list);
586
587                         ptllnd_close_peer(plp, 0);
588                 }
589 }
590
591 int
592 ptllnd_ctl(lnet_ni_t *ni, unsigned int cmd, void *arg)
593 {
594         switch (cmd) {
595         case IOC_LIBCFS_DEBUG_PEER:
596                 ptllnd_dump_debug(ni, *((lnet_process_id_t *)arg));
597                 return 0;
598                 
599         default:
600                 return -EINVAL;
601         }
602 }
603
604 __u64
605 ptllnd_get_timestamp(void)
606 {
607         struct timeval  tv;
608         int             rc = gettimeofday(&tv, NULL);
609
610         LASSERT (rc == 0);
611         return ((__u64)tv.tv_sec) * 1000000 + tv.tv_usec;
612 }
613
614 void
615 ptllnd_shutdown (lnet_ni_t *ni)
616 {
617         ptllnd_ni_t *plni = ni->ni_data;
618         int          rc;
619         time_t       start = cfs_time_current_sec();
620         int          w = plni->plni_long_wait;
621
622         LASSERT (ptllnd_ni_count == 1);
623         plni->plni_max_tx_history = 0;
624
625         ptllnd_cull_tx_history(plni);
626
627         ptllnd_close_peers(ni);
628         ptllnd_destroy_buffers(ni);
629
630         while (plni->plni_npeers > 0) {
631                 if (w > 0 && cfs_time_current_sec() > start + w/1000) {
632                         CWARN("Waited %ds for peers to shutdown\n",
633                               (int)(cfs_time_current_sec() - start));
634                         w *= 2;
635                 }
636                 ptllnd_wait(ni, w);
637         }
638
639         LASSERT (plni->plni_ntxs == 0);
640         LASSERT (plni->plni_nrxs == 0);
641
642         rc = PtlEQFree(plni->plni_eqh);
643         LASSERT (rc == PTL_OK);
644
645         rc = PtlNIFini(plni->plni_nih);
646         LASSERT (rc == PTL_OK);
647
648         ptllnd_destroy_peer_hash(ni);
649         LIBCFS_FREE(plni, sizeof(*plni));
650         ptllnd_ni_count--;
651 }
652
653 int
654 ptllnd_startup (lnet_ni_t *ni)
655 {
656         ptllnd_ni_t *plni;
657         int          rc;
658
659         /* could get limits from portals I guess... */
660         ni->ni_maxtxcredits =
661         ni->ni_peertxcredits = 1000;
662
663         if (ptllnd_ni_count != 0) {
664                 CERROR("Can't have > 1 instance of ptllnd\n");
665                 return -EPERM;
666         }
667
668         ptllnd_ni_count++;
669
670         rc = ptllnd_history_init();
671         if (rc != 0) {
672                 CERROR("Can't init history\n");
673                 goto failed0;
674         }
675         
676         LIBCFS_ALLOC(plni, sizeof(*plni));
677         if (plni == NULL) {
678                 CERROR("Can't allocate ptllnd state\n");
679                 rc = -ENOMEM;
680                 goto failed0;
681         }
682
683         ni->ni_data = plni;
684
685         plni->plni_stamp = ptllnd_get_timestamp();
686         plni->plni_nrxs = 0;
687         plni->plni_ntxs = 0;
688         plni->plni_ntx_history = 0;
689         plni->plni_watchdog_peeridx = 0;
690         plni->plni_watchdog_nextt = cfs_time_current_sec();
691         CFS_INIT_LIST_HEAD(&plni->plni_zombie_txs);
692         CFS_INIT_LIST_HEAD(&plni->plni_tx_history);
693
694         /*
695          *  Initilize buffer related data structures
696          */
697         CFS_INIT_LIST_HEAD(&plni->plni_buffers);
698         plni->plni_nbuffers = 0;
699         plni->plni_nposted_buffers = 0;
700
701         rc = ptllnd_get_tunables(ni);
702         if (rc != 0)
703                 goto failed1;
704
705         rc = ptllnd_create_peer_hash(ni);
706         if (rc != 0)
707                 goto failed1;
708
709         /* NB I most probably won't get the PID I requested here.  It doesn't
710          * matter because I don't need a fixed PID (only connection acceptors
711          * need a "well known" PID). */
712
713         rc = PtlNIInit(PTL_IFACE_DEFAULT, plni->plni_ptllnd_pid,
714                        NULL, NULL, &plni->plni_nih);
715         if (rc != PTL_OK && rc != PTL_IFACE_DUP) {
716                 CERROR("PtlNIInit failed: %s(%d)\n",
717                        ptllnd_errtype2str(rc), rc);
718                 rc = -ENODEV;
719                 goto failed2;
720         }
721
722         rc = PtlEQAlloc(plni->plni_nih, plni->plni_eq_size,
723                         PTL_EQ_HANDLER_NONE, &plni->plni_eqh);
724         if (rc != PTL_OK) {
725                 CERROR("PtlEQAlloc failed: %s(%d)\n",
726                        ptllnd_errtype2str(rc), rc);
727                 rc = -ENODEV;
728                 goto failed3;
729         }
730
731         /*
732          * Fetch the Portals NID
733          */
734         rc = PtlGetId(plni->plni_nih, &plni->plni_portals_id);
735         if (rc != PTL_OK) {
736                 CERROR ("PtlGetID failed : %s(%d)\n",
737                         ptllnd_errtype2str(rc), rc);
738                 rc = -EINVAL;
739                 goto failed4;
740         }
741
742         /*
743          * Create the new NID.  Based on the LND network type
744          * and the lower ni's address data.
745          */
746         ni->ni_nid = ptllnd_ptl2lnetnid(ni, plni->plni_portals_id.nid);
747
748         CDEBUG(D_NET, "ptl id  =%s\n", ptllnd_ptlid2str(plni->plni_portals_id));
749         CDEBUG(D_NET, "lnet id =%s (passed back)\n",
750                libcfs_id2str((lnet_process_id_t) {
751                        .nid = ni->ni_nid, .pid = the_lnet.ln_pid}));
752
753         rc = ptllnd_size_buffers(ni, 0);
754         if (rc != 0)
755                 goto failed4;
756
757         return 0;
758
759  failed4:
760         ptllnd_destroy_buffers(ni);
761         PtlEQFree(plni->plni_eqh);
762  failed3:
763         PtlNIFini(plni->plni_nih);
764  failed2:
765         ptllnd_destroy_peer_hash(ni);
766  failed1:
767         LIBCFS_FREE(plni, sizeof(*plni));
768  failed0:
769         ptllnd_history_fini();
770         ptllnd_ni_count--;
771         CDEBUG(D_NET, "<<< rc=%d\n",rc);
772         return rc;
773 }
774
775 const char *ptllnd_evtype2str(int type)
776 {
777 #define DO_TYPE(x) case x: return #x;
778         switch(type)
779         {
780                 DO_TYPE(PTL_EVENT_GET_START);
781                 DO_TYPE(PTL_EVENT_GET_END);
782                 DO_TYPE(PTL_EVENT_PUT_START);
783                 DO_TYPE(PTL_EVENT_PUT_END);
784                 DO_TYPE(PTL_EVENT_REPLY_START);
785                 DO_TYPE(PTL_EVENT_REPLY_END);
786                 DO_TYPE(PTL_EVENT_ACK);
787                 DO_TYPE(PTL_EVENT_SEND_START);
788                 DO_TYPE(PTL_EVENT_SEND_END);
789                 DO_TYPE(PTL_EVENT_UNLINK);
790         default:
791                 return "<unknown event type>";
792         }
793 #undef DO_TYPE
794 }
795
796 const char *ptllnd_msgtype2str(int type)
797 {
798 #define DO_TYPE(x) case x: return #x;
799         switch(type)
800         {
801                 DO_TYPE(PTLLND_MSG_TYPE_INVALID);
802                 DO_TYPE(PTLLND_MSG_TYPE_PUT);
803                 DO_TYPE(PTLLND_MSG_TYPE_GET);
804                 DO_TYPE(PTLLND_MSG_TYPE_IMMEDIATE);
805                 DO_TYPE(PTLLND_MSG_TYPE_HELLO);
806                 DO_TYPE(PTLLND_MSG_TYPE_NOOP);
807                 DO_TYPE(PTLLND_MSG_TYPE_NAK);
808         default:
809                 return "<unknown msg type>";
810         }
811 #undef DO_TYPE
812 }
813
814 const char *ptllnd_errtype2str(int type)
815 {
816 #define DO_TYPE(x) case x: return #x;
817         switch(type)
818         {
819                 DO_TYPE(PTL_OK);
820                 DO_TYPE(PTL_SEGV);
821                 DO_TYPE(PTL_NO_SPACE);
822                 DO_TYPE(PTL_ME_IN_USE);
823                 DO_TYPE(PTL_NAL_FAILED);
824                 DO_TYPE(PTL_NO_INIT);
825                 DO_TYPE(PTL_IFACE_DUP);
826                 DO_TYPE(PTL_IFACE_INVALID);
827                 DO_TYPE(PTL_HANDLE_INVALID);
828                 DO_TYPE(PTL_MD_INVALID);
829                 DO_TYPE(PTL_ME_INVALID);
830                 DO_TYPE(PTL_PROCESS_INVALID);
831                 DO_TYPE(PTL_PT_INDEX_INVALID);
832                 DO_TYPE(PTL_SR_INDEX_INVALID);
833                 DO_TYPE(PTL_EQ_INVALID);
834                 DO_TYPE(PTL_EQ_DROPPED);
835                 DO_TYPE(PTL_EQ_EMPTY);
836                 DO_TYPE(PTL_MD_NO_UPDATE);
837                 DO_TYPE(PTL_FAIL);
838                 DO_TYPE(PTL_AC_INDEX_INVALID);
839                 DO_TYPE(PTL_MD_ILLEGAL);
840                 DO_TYPE(PTL_ME_LIST_TOO_LONG);
841                 DO_TYPE(PTL_MD_IN_USE);
842                 DO_TYPE(PTL_NI_INVALID);
843                 DO_TYPE(PTL_PID_INVALID);
844                 DO_TYPE(PTL_PT_FULL);
845                 DO_TYPE(PTL_VAL_FAILED);
846                 DO_TYPE(PTL_NOT_IMPLEMENTED);
847                 DO_TYPE(PTL_NO_ACK);
848                 DO_TYPE(PTL_EQ_IN_USE);
849                 DO_TYPE(PTL_PID_IN_USE);
850                 DO_TYPE(PTL_INV_EQ_SIZE);
851                 DO_TYPE(PTL_AGAIN);
852         default:
853                 return "<unknown error type>";
854         }
855 #undef DO_TYPE
856 }