Whamcloud - gitweb
Severity : enhancement
[fs/lustre-release.git] / lnet / ulnds / ptllnd / ptllnd.c
1
2 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
3  * vim:expandtab:shiftwidth=8:tabstop=8:
4  *
5  * Copyright (C) 2005 Cluster File Systems, Inc. All rights reserved.
6  *   Author: Eric Barton <eeb@bartonsoftware.com>
7  *
8  *   This file is part of the Lustre file system, http://www.lustre.org
9  *   Lustre is a trademark of Cluster File Systems, Inc.
10  *
11  *   This file is confidential source code owned by Cluster File Systems.
12  *   No viewing, modification, compilation, redistribution, or any other
13  *   form of use is permitted except through a signed license agreement.
14  *
15  *   If you have not signed such an agreement, then you have no rights to
16  *   this file.  Please destroy it immediately and contact CFS.
17  *
18  */
19
20 #include "ptllnd.h"
21
22 lnd_t               the_ptllnd = {
23         .lnd_type       = PTLLND,
24         .lnd_startup    = ptllnd_startup,
25         .lnd_shutdown   = ptllnd_shutdown,
26         .lnd_ctl        = ptllnd_ctl,
27         .lnd_send       = ptllnd_send,
28         .lnd_recv       = ptllnd_recv,
29         .lnd_eager_recv = ptllnd_eager_recv,
30         .lnd_notify     = ptllnd_notify,
31         .lnd_wait       = ptllnd_wait,
32 };
33
34 static int ptllnd_ni_count = 0;
35
36 static struct list_head ptllnd_idle_history;
37 static struct list_head ptllnd_history_list;
38
39 void
40 ptllnd_history_fini(void)
41 {
42         ptllnd_he_t *he;
43
44         while (!list_empty(&ptllnd_idle_history)) {
45                 he = list_entry(ptllnd_idle_history.next,
46                                 ptllnd_he_t, he_list);
47                 
48                 list_del(&he->he_list);
49                 LIBCFS_FREE(he, sizeof(*he));
50         }
51         
52         while (!list_empty(&ptllnd_history_list)) {
53                 he = list_entry(ptllnd_history_list.next,
54                                 ptllnd_he_t, he_list);
55                 
56                 list_del(&he->he_list);
57                 LIBCFS_FREE(he, sizeof(*he));
58         }
59 }
60
61 int
62 ptllnd_history_init(void)
63 {
64         int          i;
65         ptllnd_he_t *he;
66         int          n;
67         int          rc;
68         
69         CFS_INIT_LIST_HEAD(&ptllnd_idle_history);
70         CFS_INIT_LIST_HEAD(&ptllnd_history_list);
71         
72         rc = ptllnd_parse_int_tunable(&n, "PTLLND_HISTORY", 0);
73         if (rc != 0)
74                 return rc;
75         
76         for (i = 0; i < n; i++) {
77                 LIBCFS_ALLOC(he, sizeof(*he));
78                 if (he == NULL) {
79                         ptllnd_history_fini();
80                         return -ENOMEM;
81                 }
82                 
83                 list_add(&he->he_list, &ptllnd_idle_history);
84         }
85
86         return 0;
87 }
88
89 void
90 ptllnd_history(const char *fn, const char *file, const int line,
91                const char *fmt, ...)
92 {
93         static int     seq;
94         
95         va_list        ap;
96         ptllnd_he_t   *he;
97         
98         if (!list_empty(&ptllnd_idle_history)) {
99                 he = list_entry(ptllnd_idle_history.next,
100                                 ptllnd_he_t, he_list);
101         } else if (!list_empty(&ptllnd_history_list)) {
102                 he = list_entry(ptllnd_history_list.next,
103                                 ptllnd_he_t, he_list);
104         } else {
105                 return;
106         }
107
108         list_del(&he->he_list);
109         list_add_tail(&he->he_list, &ptllnd_history_list);
110
111         he->he_seq = seq++;
112         he->he_fn = fn;
113         he->he_file = file;
114         he->he_line = line;
115         gettimeofday(&he->he_time, NULL);
116         
117         va_start(ap, fmt);
118         vsnprintf(he->he_msg, sizeof(he->he_msg), fmt, ap);
119         va_end(ap);
120 }
121
122 void
123 ptllnd_dump_history(void)
124 {
125         ptllnd_he_t    *he;
126         
127         while (!list_empty(&ptllnd_history_list)) {
128                 he = list_entry(ptllnd_history_list.next,
129                                 ptllnd_he_t, he_list);
130
131                 list_del(&he->he_list);
132                 
133                 CDEBUG(D_WARNING, "%d %d.%06d (%s:%d:%s()) %s\n", he->he_seq,
134                        (int)he->he_time.tv_sec, (int)he->he_time.tv_usec,
135                        he->he_file, he->he_line, he->he_fn, he->he_msg);
136
137                 list_add_tail(&he->he_list, &ptllnd_idle_history);
138         }
139 }
140
141 void 
142 ptllnd_assert_wire_constants (void)
143 {
144         /* Wire protocol assertions generated by 'wirecheck'
145          * running on Linux fedora 2.6.11-co-0.6.4 #1 Mon Jun 19 05:36:13 UTC 2006 i686 i686 i386 GNU
146          * with gcc version 4.1.1 20060525 (Red Hat 4.1.1-1) */
147
148
149         /* Constants... */
150         CLASSERT (PTL_RESERVED_MATCHBITS == 0x100);
151         CLASSERT (LNET_MSG_MATCHBITS == 0);
152         CLASSERT (PTLLND_MSG_MAGIC == 0x50746C4E);
153         CLASSERT (PTLLND_MSG_VERSION == 0x04);
154         CLASSERT (PTLLND_RDMA_OK == 0x00);
155         CLASSERT (PTLLND_RDMA_FAIL == 0x01);
156         CLASSERT (PTLLND_MSG_TYPE_INVALID == 0x00);
157         CLASSERT (PTLLND_MSG_TYPE_PUT == 0x01);
158         CLASSERT (PTLLND_MSG_TYPE_GET == 0x02);
159         CLASSERT (PTLLND_MSG_TYPE_IMMEDIATE == 0x03);
160         CLASSERT (PTLLND_MSG_TYPE_NOOP == 0x04);
161         CLASSERT (PTLLND_MSG_TYPE_HELLO == 0x05);
162         CLASSERT (PTLLND_MSG_TYPE_NAK == 0x06);
163
164         /* Checks for struct kptl_msg_t */
165         CLASSERT ((int)sizeof(kptl_msg_t) == 136);
166         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_magic) == 0);
167         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_magic) == 4);
168         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_version) == 4);
169         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_version) == 2);
170         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_type) == 6);
171         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_type) == 1);
172         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_credits) == 7);
173         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_credits) == 1);
174         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_nob) == 8);
175         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_nob) == 4);
176         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_cksum) == 12);
177         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_cksum) == 4);
178         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_srcnid) == 16);
179         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_srcnid) == 8);
180         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_srcstamp) == 24);
181         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_srcstamp) == 8);
182         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_dstnid) == 32);
183         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_dstnid) == 8);
184         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_dststamp) == 40);
185         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_dststamp) == 8);
186         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_srcpid) == 48);
187         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_srcpid) == 4);
188         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_dstpid) == 52);
189         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_dstpid) == 4);
190         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_u.immediate) == 56);
191         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_u.immediate) == 72);
192         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_u.rdma) == 56);
193         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_u.rdma) == 80);
194         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_u.hello) == 56);
195         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_u.hello) == 12);
196
197         /* Checks for struct kptl_immediate_msg_t */
198         CLASSERT ((int)sizeof(kptl_immediate_msg_t) == 72);
199         CLASSERT ((int)offsetof(kptl_immediate_msg_t, kptlim_hdr) == 0);
200         CLASSERT ((int)sizeof(((kptl_immediate_msg_t *)0)->kptlim_hdr) == 72);
201         CLASSERT ((int)offsetof(kptl_immediate_msg_t, kptlim_payload[13]) == 85);
202         CLASSERT ((int)sizeof(((kptl_immediate_msg_t *)0)->kptlim_payload[13]) == 1);
203
204         /* Checks for struct kptl_rdma_msg_t */
205         CLASSERT ((int)sizeof(kptl_rdma_msg_t) == 80);
206         CLASSERT ((int)offsetof(kptl_rdma_msg_t, kptlrm_hdr) == 0);
207         CLASSERT ((int)sizeof(((kptl_rdma_msg_t *)0)->kptlrm_hdr) == 72);
208         CLASSERT ((int)offsetof(kptl_rdma_msg_t, kptlrm_matchbits) == 72);
209         CLASSERT ((int)sizeof(((kptl_rdma_msg_t *)0)->kptlrm_matchbits) == 8);
210
211         /* Checks for struct kptl_hello_msg_t */
212         CLASSERT ((int)sizeof(kptl_hello_msg_t) == 12);
213         CLASSERT ((int)offsetof(kptl_hello_msg_t, kptlhm_matchbits) == 0);
214         CLASSERT ((int)sizeof(((kptl_hello_msg_t *)0)->kptlhm_matchbits) == 8);
215         CLASSERT ((int)offsetof(kptl_hello_msg_t, kptlhm_max_msg_size) == 8);
216         CLASSERT ((int)sizeof(((kptl_hello_msg_t *)0)->kptlhm_max_msg_size) == 4);
217 }
218
219 int
220 ptllnd_parse_int_tunable(int *value, char *name, int dflt)
221 {
222         char    *env = getenv(name);
223         char    *end;
224
225         if (env == NULL) {
226                 *value = dflt;
227                 return 0;
228         }
229
230         *value = strtoull(env, &end, 0);
231         if (*end == 0)
232                 return 0;
233
234         CERROR("Can't parse tunable %s=%s\n", name, env);
235         return -EINVAL;
236 }
237
238 int
239 ptllnd_get_tunables(lnet_ni_t *ni)
240 {
241         ptllnd_ni_t *plni = ni->ni_data;
242         int          max_msg_size;
243         int          msgs_per_buffer;
244         int          rc;
245         int          temp;
246
247         rc = ptllnd_parse_int_tunable(&plni->plni_portal,
248                                       "PTLLND_PORTAL", PTLLND_PORTAL);
249         if (rc != 0)
250                 return rc;
251
252         rc = ptllnd_parse_int_tunable(&temp,
253                                       "PTLLND_PID", PTLLND_PID);
254         if (rc != 0)
255                 return rc;
256         plni->plni_ptllnd_pid = (ptl_pid_t)temp;
257
258         rc = ptllnd_parse_int_tunable(&plni->plni_peer_credits,
259                                       "PTLLND_PEERCREDITS", PTLLND_PEERCREDITS);
260         if (rc != 0)
261                 return rc;
262
263         rc = ptllnd_parse_int_tunable(&max_msg_size,
264                                       "PTLLND_MAX_MSG_SIZE",
265                                       PTLLND_MAX_MSG_SIZE);
266         if (rc != 0)
267                 return rc;
268
269         rc = ptllnd_parse_int_tunable(&msgs_per_buffer,
270                                       "PTLLND_MSGS_PER_BUFFER",
271                                       PTLLND_MSGS_PER_BUFFER);
272         if (rc != 0)
273                 return rc;
274
275         rc = ptllnd_parse_int_tunable(&plni->plni_msgs_spare,
276                                       "PTLLND_MSGS_SPARE",
277                                       PTLLND_MSGS_SPARE);
278         if (rc != 0)
279                 return rc;
280
281         rc = ptllnd_parse_int_tunable(&plni->plni_peer_hash_size,
282                                       "PTLLND_PEER_HASH_SIZE",
283                                       PTLLND_PEER_HASH_SIZE);
284         if (rc != 0)
285                 return rc;
286
287
288         rc = ptllnd_parse_int_tunable(&plni->plni_eq_size,
289                                       "PTLLND_EQ_SIZE", PTLLND_EQ_SIZE);
290         if (rc != 0)
291                 return rc;
292
293         rc = ptllnd_parse_int_tunable(&plni->plni_checksum,
294                                       "PTLLND_CHECKSUM", 0);
295         if (rc != 0)
296                 return rc;
297
298         rc = ptllnd_parse_int_tunable(&plni->plni_max_tx_history,
299                                       "PTLLND_TX_HISTORY", PTLLND_TX_HISTORY);
300         if (rc != 0)
301                 return rc;
302
303         rc = ptllnd_parse_int_tunable(&plni->plni_abort_on_nak,
304                                       "PTLLND_ABORT_ON_NAK",
305                                       PTLLND_ABORT_ON_NAK);
306         if (rc != 0)
307                 return rc;
308
309         plni->plni_max_msg_size = max_msg_size & ~7;
310         if (plni->plni_max_msg_size < sizeof(kptl_msg_t))
311                 plni->plni_max_msg_size = (sizeof(kptl_msg_t) + 7) & ~7;
312
313         plni->plni_buffer_size = plni->plni_max_msg_size * msgs_per_buffer;
314
315         CDEBUG(D_NET, "portal          = %d\n",plni->plni_portal);
316         CDEBUG(D_NET, "ptllnd_pid      = %d\n",plni->plni_ptllnd_pid);
317         CDEBUG(D_NET, "max_msg_size    = %d\n",max_msg_size);
318         CDEBUG(D_NET, "msgs_per_buffer = %d\n",msgs_per_buffer);
319         CDEBUG(D_NET, "msgs_spare      = %d\n",plni->plni_msgs_spare);
320         CDEBUG(D_NET, "peer_hash_size  = %d\n",plni->plni_peer_hash_size);
321         CDEBUG(D_NET, "eq_size         = %d\n",plni->plni_eq_size);
322         CDEBUG(D_NET, "max_msg_size    = %d\n",plni->plni_max_msg_size);
323         CDEBUG(D_NET, "buffer_size     = %d\n",plni->plni_buffer_size);
324
325         return 0;
326 }
327
328 ptllnd_buffer_t *
329 ptllnd_create_buffer (lnet_ni_t *ni)
330 {
331         ptllnd_ni_t     *plni = ni->ni_data;
332         ptllnd_buffer_t *buf;
333
334         LIBCFS_ALLOC(buf, sizeof(*buf));
335         if (buf == NULL) {
336                 CERROR("Can't allocate buffer descriptor\n");
337                 return NULL;
338         }
339
340         buf->plb_ni = ni;
341         buf->plb_posted = 0;
342         CFS_INIT_LIST_HEAD(&buf->plb_list);
343
344         LIBCFS_ALLOC(buf->plb_buffer, plni->plni_buffer_size);
345         if (buf->plb_buffer == NULL) {
346                 CERROR("Can't allocate buffer size %d\n",
347                        plni->plni_buffer_size);
348                 LIBCFS_FREE(buf, sizeof(*buf));
349                 return NULL;
350         }
351
352         list_add(&buf->plb_list, &plni->plni_buffers);
353         plni->plni_nbuffers++;
354
355         return buf;
356 }
357
358 void
359 ptllnd_destroy_buffer (ptllnd_buffer_t *buf)
360 {
361         ptllnd_ni_t     *plni = buf->plb_ni->ni_data;
362
363         LASSERT (!buf->plb_posted);
364
365         plni->plni_nbuffers--;
366         list_del(&buf->plb_list);
367         LIBCFS_FREE(buf->plb_buffer, plni->plni_buffer_size);
368         LIBCFS_FREE(buf, sizeof(*buf));
369 }
370
371 int
372 ptllnd_grow_buffers (lnet_ni_t *ni)
373 {
374         ptllnd_ni_t     *plni = ni->ni_data;
375         ptllnd_buffer_t *buf;
376         int              nmsgs;
377         int              nbufs;
378         int              rc;
379
380         CDEBUG(D_NET, "nposted_buffers = %d (before)\n",plni->plni_nposted_buffers);
381         CDEBUG(D_NET, "nbuffers = %d (before)\n",plni->plni_nbuffers);
382
383         nmsgs = plni->plni_npeers * plni->plni_peer_credits +
384                 plni->plni_msgs_spare;
385
386         nbufs = (nmsgs * plni->plni_max_msg_size + plni->plni_buffer_size - 1) /
387                 plni->plni_buffer_size;
388
389         while (nbufs > plni->plni_nbuffers) {
390                 buf = ptllnd_create_buffer(ni);
391
392                 if (buf == NULL)
393                         return -ENOMEM;
394
395                 rc = ptllnd_post_buffer(buf);
396                 if (rc != 0){
397                         /* TODO - this path seems to orpahn the buffer
398                          * in a state where its not posted and will never be
399                          * However it does not leak the buffer as it's
400                          * already been put onto the global buffer list
401                          * and will be cleaned up
402                          */
403                         return rc;
404                 }
405         }
406
407         CDEBUG(D_NET, "nposted_buffers = %d (after)\n",plni->plni_nposted_buffers);
408         CDEBUG(D_NET, "nbuffers = %d (after)\n",plni->plni_nbuffers);
409         return 0;
410 }
411
412 void
413 ptllnd_destroy_buffers (lnet_ni_t *ni)
414 {
415         ptllnd_ni_t       *plni = ni->ni_data;
416         ptllnd_buffer_t   *buf;
417         struct list_head  *tmp;
418         struct list_head  *nxt;
419
420         CDEBUG(D_NET, "nposted_buffers = %d (before)\n",plni->plni_nposted_buffers);
421         CDEBUG(D_NET, "nbuffers = %d (before)\n",plni->plni_nbuffers);
422
423         list_for_each_safe(tmp, nxt, &plni->plni_buffers) {
424                 buf = list_entry(tmp, ptllnd_buffer_t, plb_list);
425
426                 //CDEBUG(D_NET, "buf=%p posted=%d\n",buf,buf->plb_posted);
427
428                 LASSERT (plni->plni_nbuffers > 0);
429                 if (buf->plb_posted) {
430                         time_t   start = cfs_time_current_sec();
431                         int      w = PTLLND_WARN_LONG_WAIT;
432                         
433                         LASSERT (plni->plni_nposted_buffers > 0);
434
435 #ifdef LUSTRE_PORTALS_UNLINK_SEMANTICS
436                         (void) PtlMDUnlink(buf->plb_md);
437
438                         while (buf->plb_posted) {
439                                 if (cfs_time_current_sec() > start + w) {
440                                         CWARN("Waited %ds to unlink buffer\n", w);
441                                         w *= 2;
442                                 }
443                                 ptllnd_wait(ni, w*1000);
444                         }
445 #else
446                         while (buf->plb_posted) {
447                                 rc = PtlMDUnlink(buf->plb_md);
448                                 if (rc == PTL_OK) {
449                                         buf->plb_posted = 0;
450                                         plni->plni_nposted_buffers--;
451                                         break;
452                                 }
453                                 LASSERT (rc == PTL_MD_IN_USE);
454                                 if (cfs_time_current_sec() > start + w) {
455                                         CWARN("Waited %ds to unlink buffer\n", w);
456                                         w *= 2;
457                                 }
458                                 ptllnd_wait(ni, w*1000);
459                         }
460 #endif
461                 }
462                 ptllnd_destroy_buffer(buf);
463         }
464
465         CDEBUG(D_NET, "nposted_buffers = %d (after)\n",plni->plni_nposted_buffers);
466         CDEBUG(D_NET, "nbuffers = %d (after)\n",plni->plni_nbuffers);
467
468         LASSERT (plni->plni_nposted_buffers == 0);
469         LASSERT (plni->plni_nbuffers == 0);
470 }
471
472 int
473 ptllnd_create_peer_hash (lnet_ni_t *ni)
474 {
475         ptllnd_ni_t *plni = ni->ni_data;
476         int          i;
477
478         plni->plni_npeers = 0;
479
480         LIBCFS_ALLOC(plni->plni_peer_hash,
481                      plni->plni_peer_hash_size * sizeof(*plni->plni_peer_hash));
482         if (plni->plni_peer_hash == NULL) {
483                 CERROR("Can't allocate ptllnd peer hash (size %d)\n",
484                        plni->plni_peer_hash_size);
485                 return -ENOMEM;
486         }
487
488         for (i = 0; i < plni->plni_peer_hash_size; i++)
489                 CFS_INIT_LIST_HEAD(&plni->plni_peer_hash[i]);
490
491         return 0;
492 }
493
494 void
495 ptllnd_destroy_peer_hash (lnet_ni_t *ni)
496 {
497         ptllnd_ni_t    *plni = ni->ni_data;
498         int             i;
499
500         LASSERT( plni->plni_npeers == 0);
501
502         for (i = 0; i < plni->plni_peer_hash_size; i++)
503                 LASSERT (list_empty(&plni->plni_peer_hash[i]));
504
505         LIBCFS_FREE(plni->plni_peer_hash,
506                     plni->plni_peer_hash_size * sizeof(*plni->plni_peer_hash));
507 }
508
509 void
510 ptllnd_close_peers (lnet_ni_t *ni)
511 {
512         ptllnd_ni_t    *plni = ni->ni_data;
513         ptllnd_peer_t  *plp;
514         int             i;
515
516         for (i = 0; i < plni->plni_peer_hash_size; i++)
517                 while (!list_empty(&plni->plni_peer_hash[i])) {
518                         plp = list_entry(plni->plni_peer_hash[i].next,
519                                          ptllnd_peer_t, plp_list);
520
521                         ptllnd_close_peer(plp, 0);
522                 }
523 }
524
525 int
526 ptllnd_ctl(lnet_ni_t *ni, unsigned int cmd, void *arg)
527 {
528         switch (cmd) {
529         case IOC_LIBCFS_DEBUG_PEER:
530                 ptllnd_debug_peer(ni, *((lnet_process_id_t *)arg));
531                 return 0;
532                 
533         default:
534                 return -EINVAL;
535         }
536 }
537
538 __u64
539 ptllnd_get_timestamp(void)
540 {
541         struct timeval  tv;
542         int             rc = gettimeofday(&tv, NULL);
543
544         LASSERT (rc == 0);
545         return ((__u64)tv.tv_sec) * 1000000 + tv.tv_usec;
546 }
547
548 void
549 ptllnd_shutdown (lnet_ni_t *ni)
550 {
551         ptllnd_ni_t *plni = ni->ni_data;
552         int          rc;
553         time_t       start = cfs_time_current_sec();
554         int          w = PTLLND_WARN_LONG_WAIT;
555
556         LASSERT (ptllnd_ni_count == 1);
557         plni->plni_max_tx_history = 0;
558
559         ptllnd_cull_tx_history(plni);
560
561         ptllnd_destroy_buffers(ni);
562         ptllnd_close_peers(ni);
563
564         while (plni->plni_npeers > 0) {
565                 if (cfs_time_current_sec() > start + w) {
566                         CWARN("Waited %ds for peers to shutdown\n", w);
567                         w *= 2;
568                 }
569                 ptllnd_wait(ni, w*1000);
570         }
571
572         LASSERT (plni->plni_ntxs == 0);
573         LASSERT (plni->plni_nrxs == 0);
574
575         rc = PtlEQFree(plni->plni_eqh);
576         LASSERT (rc == PTL_OK);
577
578         rc = PtlNIFini(plni->plni_nih);
579         LASSERT (rc == PTL_OK);
580
581         ptllnd_destroy_peer_hash(ni);
582         LIBCFS_FREE(plni, sizeof(*plni));
583         ptllnd_ni_count--;
584 }
585
586 int
587 ptllnd_startup (lnet_ni_t *ni)
588 {
589         ptllnd_ni_t *plni;
590         int          rc;
591
592         /* could get limits from portals I guess... */
593         ni->ni_maxtxcredits =
594         ni->ni_peertxcredits = 1000;
595
596         if (ptllnd_ni_count != 0) {
597                 CERROR("Can't have > 1 instance of ptllnd\n");
598                 return -EPERM;
599         }
600
601         ptllnd_ni_count++;
602
603         rc = ptllnd_history_init();
604         if (rc != 0) {
605                 CERROR("Can't init history\n");
606                 goto failed0;
607         }
608         
609         LIBCFS_ALLOC(plni, sizeof(*plni));
610         if (plni == NULL) {
611                 CERROR("Can't allocate ptllnd state\n");
612                 rc = -ENOMEM;
613                 goto failed0;
614         }
615
616         ni->ni_data = plni;
617
618         plni->plni_stamp = ptllnd_get_timestamp();
619         plni->plni_nrxs = 0;
620         plni->plni_ntxs = 0;
621         plni->plni_ntx_history = 0;
622         CFS_INIT_LIST_HEAD(&plni->plni_zombie_txs);
623         CFS_INIT_LIST_HEAD(&plni->plni_tx_history);
624
625         /*
626          *  Initilize buffer related data structures
627          */
628         CFS_INIT_LIST_HEAD(&plni->plni_buffers);
629         plni->plni_nbuffers = 0;
630         plni->plni_nposted_buffers = 0;
631
632         rc = ptllnd_get_tunables(ni);
633         if (rc != 0)
634                 goto failed1;
635
636         rc = ptllnd_create_peer_hash(ni);
637         if (rc != 0)
638                 goto failed1;
639
640         /* NB I most probably won't get the PID I requested here.  It doesn't
641          * matter because I don't need a fixed PID (only connection acceptors
642          * need a "well known" PID). */
643
644         rc = PtlNIInit(PTL_IFACE_DEFAULT, plni->plni_ptllnd_pid,
645                        NULL, NULL, &plni->plni_nih);
646         if (rc != PTL_OK && rc != PTL_IFACE_DUP) {
647                 CERROR("PtlNIInit failed: %d\n", rc);
648                 rc = -ENODEV;
649                 goto failed2;
650         }
651
652         rc = PtlEQAlloc(plni->plni_nih, plni->plni_eq_size,
653                         PTL_EQ_HANDLER_NONE, &plni->plni_eqh);
654         if (rc != PTL_OK) {
655                 CERROR("PtlEQAlloc failed: %d\n", rc);
656                 rc = -ENODEV;
657                 goto failed3;
658         }
659
660         /*
661          * Fetch the Portals NID
662          */
663         if(rc != PtlGetId(plni->plni_nih,&plni->plni_portals_id)){
664                 CERROR ("PtlGetID failed : %d\n", rc);
665                 rc = -EINVAL;
666                 goto failed4;
667         }
668
669         CDEBUG(D_NET, "lnet nid=" LPX64 " (passed in)\n",ni->ni_nid);
670
671         /*
672          * Create the new NID.  Based on the LND network type
673          * and the lower ni's address data.
674          */
675         ni->ni_nid = ptllnd_ptl2lnetnid(ni, plni->plni_portals_id.nid);
676
677         CDEBUG(D_NET, "ptl id  =%s\n", ptllnd_ptlid2str(plni->plni_portals_id));
678         CDEBUG(D_NET, "lnet id =%s (passed back)\n",
679                libcfs_id2str((lnet_process_id_t) {
680                        .nid = ni->ni_nid, .pid = the_lnet.ln_pid}));
681
682         rc = ptllnd_grow_buffers(ni);
683         if (rc != 0)
684                 goto failed4;
685
686         return 0;
687
688  failed4:
689         ptllnd_destroy_buffers(ni);
690         PtlEQFree(plni->plni_eqh);
691  failed3:
692         PtlNIFini(plni->plni_nih);
693  failed2:
694         ptllnd_destroy_peer_hash(ni);
695  failed1:
696         LIBCFS_FREE(plni, sizeof(*plni));
697  failed0:
698         ptllnd_history_fini();
699         ptllnd_ni_count--;
700         CDEBUG(D_NET, "<<< rc=%d\n",rc);
701         return rc;
702 }
703
704 const char *ptllnd_evtype2str(int type)
705 {
706 #define DO_TYPE(x) case x: return #x;
707         switch(type)
708         {
709                 DO_TYPE(PTL_EVENT_GET_START);
710                 DO_TYPE(PTL_EVENT_GET_END);
711                 DO_TYPE(PTL_EVENT_PUT_START);
712                 DO_TYPE(PTL_EVENT_PUT_END);
713                 DO_TYPE(PTL_EVENT_REPLY_START);
714                 DO_TYPE(PTL_EVENT_REPLY_END);
715                 DO_TYPE(PTL_EVENT_ACK);
716                 DO_TYPE(PTL_EVENT_SEND_START);
717                 DO_TYPE(PTL_EVENT_SEND_END);
718                 DO_TYPE(PTL_EVENT_UNLINK);
719         default:
720                 return "";
721         }
722 #undef DO_TYPE
723 }
724
725 const char *ptllnd_msgtype2str(int type)
726 {
727 #define DO_TYPE(x) case x: return #x;
728         switch(type)
729         {
730                 DO_TYPE(PTLLND_MSG_TYPE_INVALID);
731                 DO_TYPE(PTLLND_MSG_TYPE_PUT);
732                 DO_TYPE(PTLLND_MSG_TYPE_GET);
733                 DO_TYPE(PTLLND_MSG_TYPE_IMMEDIATE);
734                 DO_TYPE(PTLLND_MSG_TYPE_HELLO);
735                 DO_TYPE(PTLLND_MSG_TYPE_NOOP);
736                 DO_TYPE(PTLLND_MSG_TYPE_NAK);
737         default:
738                 return "";
739         }
740 #undef DO_TYPE
741 }