Whamcloud - gitweb
7f2e959b10855626f8cfe0d052c4a7558bf07453
[fs/lustre-release.git] / lnet / klnds / ptllnd / ptllnd.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lnet/klnds/ptllnd/ptllnd.c
37  *
38  * Author: PJ Kirner <pjkirner@clusterfs.com>
39  */
40
41 #include "ptllnd.h"
42
43 lnd_t kptllnd_lnd = {
44         .lnd_type       = PTLLND,
45         .lnd_startup    = kptllnd_startup,
46         .lnd_shutdown   = kptllnd_shutdown,
47         .lnd_ctl        = kptllnd_ctl,
48         .lnd_send       = kptllnd_send,
49         .lnd_recv       = kptllnd_recv,
50         .lnd_eager_recv = kptllnd_eager_recv,
51 };
52
53 kptl_data_t kptllnd_data;
54
55 char *
56 kptllnd_ptlid2str(ptl_process_id_t id)
57 {
58         static char    strs[64][32];
59         static int     idx = 0;
60
61         unsigned long  flags;
62         char          *str;
63         
64         spin_lock_irqsave(&kptllnd_data.kptl_ptlid2str_lock, flags);
65         str = strs[idx++];
66         if (idx >= sizeof(strs)/sizeof(strs[0]))
67                 idx = 0;
68         spin_unlock_irqrestore(&kptllnd_data.kptl_ptlid2str_lock, flags);
69
70         snprintf(str, sizeof(strs[0]), FMT_PTLID, id.pid, id.nid);
71         return str;
72 }
73
74 void 
75 kptllnd_assert_wire_constants (void)
76 {
77         /* Wire protocol assertions generated by 'wirecheck'
78          * running on Linux fedora 2.6.11-co-0.6.4 #1 Mon Jun 19 05:36:13 UTC 2006 i686 i686 i386 GNU
79          * with gcc version 4.1.1 20060525 (Red Hat 4.1.1-1) */
80
81
82         /* Constants... */
83         CLASSERT (PTL_RESERVED_MATCHBITS == 0x100);
84         CLASSERT (LNET_MSG_MATCHBITS == 0);
85         CLASSERT (PTLLND_MSG_MAGIC == 0x50746C4E);
86         CLASSERT (PTLLND_MSG_VERSION == 0x04);
87         CLASSERT (PTLLND_RDMA_OK == 0x00);
88         CLASSERT (PTLLND_RDMA_FAIL == 0x01);
89         CLASSERT (PTLLND_MSG_TYPE_INVALID == 0x00);
90         CLASSERT (PTLLND_MSG_TYPE_PUT == 0x01);
91         CLASSERT (PTLLND_MSG_TYPE_GET == 0x02);
92         CLASSERT (PTLLND_MSG_TYPE_IMMEDIATE == 0x03);
93         CLASSERT (PTLLND_MSG_TYPE_NOOP == 0x04);
94         CLASSERT (PTLLND_MSG_TYPE_HELLO == 0x05);
95         CLASSERT (PTLLND_MSG_TYPE_NAK == 0x06);
96
97         /* Checks for struct kptl_msg_t */
98         CLASSERT ((int)sizeof(kptl_msg_t) == 136);
99         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_magic) == 0);
100         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_magic) == 4);
101         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_version) == 4);
102         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_version) == 2);
103         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_type) == 6);
104         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_type) == 1);
105         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_credits) == 7);
106         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_credits) == 1);
107         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_nob) == 8);
108         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_nob) == 4);
109         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_cksum) == 12);
110         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_cksum) == 4);
111         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_srcnid) == 16);
112         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_srcnid) == 8);
113         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_srcstamp) == 24);
114         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_srcstamp) == 8);
115         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_dstnid) == 32);
116         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_dstnid) == 8);
117         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_dststamp) == 40);
118         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_dststamp) == 8);
119         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_srcpid) == 48);
120         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_srcpid) == 4);
121         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_dstpid) == 52);
122         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_dstpid) == 4);
123         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_u.immediate) == 56);
124         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_u.immediate) == 72);
125         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_u.rdma) == 56);
126         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_u.rdma) == 80);
127         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_u.hello) == 56);
128         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_u.hello) == 12);
129
130         /* Checks for struct kptl_immediate_msg_t */
131         CLASSERT ((int)sizeof(kptl_immediate_msg_t) == 72);
132         CLASSERT ((int)offsetof(kptl_immediate_msg_t, kptlim_hdr) == 0);
133         CLASSERT ((int)sizeof(((kptl_immediate_msg_t *)0)->kptlim_hdr) == 72);
134         CLASSERT ((int)offsetof(kptl_immediate_msg_t, kptlim_payload[13]) == 85);
135         CLASSERT ((int)sizeof(((kptl_immediate_msg_t *)0)->kptlim_payload[13]) == 1);
136
137         /* Checks for struct kptl_rdma_msg_t */
138         CLASSERT ((int)sizeof(kptl_rdma_msg_t) == 80);
139         CLASSERT ((int)offsetof(kptl_rdma_msg_t, kptlrm_hdr) == 0);
140         CLASSERT ((int)sizeof(((kptl_rdma_msg_t *)0)->kptlrm_hdr) == 72);
141         CLASSERT ((int)offsetof(kptl_rdma_msg_t, kptlrm_matchbits) == 72);
142         CLASSERT ((int)sizeof(((kptl_rdma_msg_t *)0)->kptlrm_matchbits) == 8);
143
144         /* Checks for struct kptl_hello_msg_t */
145         CLASSERT ((int)sizeof(kptl_hello_msg_t) == 12);
146         CLASSERT ((int)offsetof(kptl_hello_msg_t, kptlhm_matchbits) == 0);
147         CLASSERT ((int)sizeof(((kptl_hello_msg_t *)0)->kptlhm_matchbits) == 8);
148         CLASSERT ((int)offsetof(kptl_hello_msg_t, kptlhm_max_msg_size) == 8);
149         CLASSERT ((int)sizeof(((kptl_hello_msg_t *)0)->kptlhm_max_msg_size) == 4);
150 }
151
152 const char *kptllnd_evtype2str(int type)
153 {
154 #define DO_TYPE(x) case x: return #x;
155         switch(type)
156         {
157                 DO_TYPE(PTL_EVENT_GET_START);
158                 DO_TYPE(PTL_EVENT_GET_END);
159                 DO_TYPE(PTL_EVENT_PUT_START);
160                 DO_TYPE(PTL_EVENT_PUT_END);
161                 DO_TYPE(PTL_EVENT_REPLY_START);
162                 DO_TYPE(PTL_EVENT_REPLY_END);
163                 DO_TYPE(PTL_EVENT_ACK);
164                 DO_TYPE(PTL_EVENT_SEND_START);
165                 DO_TYPE(PTL_EVENT_SEND_END);
166                 DO_TYPE(PTL_EVENT_UNLINK);
167         default:
168                 return "<unknown event type>";
169         }
170 #undef DO_TYPE
171 }
172
173 const char *kptllnd_msgtype2str(int type)
174 {
175 #define DO_TYPE(x) case x: return #x;
176         switch(type)
177         {
178                 DO_TYPE(PTLLND_MSG_TYPE_INVALID);
179                 DO_TYPE(PTLLND_MSG_TYPE_PUT);
180                 DO_TYPE(PTLLND_MSG_TYPE_GET);
181                 DO_TYPE(PTLLND_MSG_TYPE_IMMEDIATE);
182                 DO_TYPE(PTLLND_MSG_TYPE_HELLO);
183                 DO_TYPE(PTLLND_MSG_TYPE_NOOP);
184                 DO_TYPE(PTLLND_MSG_TYPE_NAK);
185         default:
186                 return "<unknown msg type>";
187         }
188 #undef DO_TYPE
189 }
190
191 const char *kptllnd_errtype2str(int type)
192 {
193 #define DO_TYPE(x) case x: return #x;
194         switch(type)
195         {
196                 DO_TYPE(PTL_OK);
197                 DO_TYPE(PTL_SEGV);
198                 DO_TYPE(PTL_NO_SPACE);
199                 DO_TYPE(PTL_ME_IN_USE);
200                 DO_TYPE(PTL_NAL_FAILED);
201                 DO_TYPE(PTL_NO_INIT);
202                 DO_TYPE(PTL_IFACE_DUP);
203                 DO_TYPE(PTL_IFACE_INVALID);
204                 DO_TYPE(PTL_HANDLE_INVALID);
205                 DO_TYPE(PTL_MD_INVALID);
206                 DO_TYPE(PTL_ME_INVALID);
207                 DO_TYPE(PTL_PROCESS_INVALID);
208                 DO_TYPE(PTL_PT_INDEX_INVALID);
209                 DO_TYPE(PTL_SR_INDEX_INVALID);
210                 DO_TYPE(PTL_EQ_INVALID);
211                 DO_TYPE(PTL_EQ_DROPPED);
212                 DO_TYPE(PTL_EQ_EMPTY);
213                 DO_TYPE(PTL_MD_NO_UPDATE);
214                 DO_TYPE(PTL_FAIL);
215                 DO_TYPE(PTL_AC_INDEX_INVALID);
216                 DO_TYPE(PTL_MD_ILLEGAL);
217                 DO_TYPE(PTL_ME_LIST_TOO_LONG);
218                 DO_TYPE(PTL_MD_IN_USE);
219                 DO_TYPE(PTL_NI_INVALID);
220                 DO_TYPE(PTL_PID_INVALID);
221                 DO_TYPE(PTL_PT_FULL);
222                 DO_TYPE(PTL_VAL_FAILED);
223                 DO_TYPE(PTL_NOT_IMPLEMENTED);
224                 DO_TYPE(PTL_NO_ACK);
225                 DO_TYPE(PTL_EQ_IN_USE);
226                 DO_TYPE(PTL_PID_IN_USE);
227                 DO_TYPE(PTL_INV_EQ_SIZE);
228                 DO_TYPE(PTL_AGAIN);
229         default:
230                 return "<unknown event type>";
231         }
232 #undef DO_TYPE
233 }
234
235 __u32
236 kptllnd_cksum (void *ptr, int nob)
237 {
238         char  *c  = ptr;
239         __u32  sum = 0;
240
241         while (nob-- > 0)
242                 sum = ((sum << 1) | (sum >> 31)) + *c++;
243
244         /* ensure I don't return 0 (== no checksum) */
245         return (sum == 0) ? 1 : sum;
246 }
247
248 void
249 kptllnd_init_msg(kptl_msg_t *msg, int type, int body_nob)
250 {
251         msg->ptlm_type = type;
252         msg->ptlm_nob  = (offsetof(kptl_msg_t, ptlm_u) + body_nob + 7) & ~7;
253         
254         LASSERT(msg->ptlm_nob <= *kptllnd_tunables.kptl_max_msg_size);
255 }
256
257 void
258 kptllnd_msg_pack(kptl_msg_t *msg, kptl_peer_t *peer)
259 {
260         msg->ptlm_magic    = PTLLND_MSG_MAGIC;
261         msg->ptlm_version  = PTLLND_MSG_VERSION;
262         /* msg->ptlm_type  Filled in kptllnd_init_msg()  */
263         msg->ptlm_credits  = peer->peer_outstanding_credits;
264         /* msg->ptlm_nob   Filled in kptllnd_init_msg()  */
265         msg->ptlm_cksum    = 0;
266         msg->ptlm_srcnid   = kptllnd_data.kptl_ni->ni_nid;
267         msg->ptlm_srcstamp = peer->peer_myincarnation;
268         msg->ptlm_dstnid   = peer->peer_id.nid;
269         msg->ptlm_dststamp = peer->peer_incarnation;
270         msg->ptlm_srcpid   = the_lnet.ln_pid;
271         msg->ptlm_dstpid   = peer->peer_id.pid;
272
273         if (*kptllnd_tunables.kptl_checksum) {
274                 /* NB ptlm_cksum zero while computing cksum */
275                 msg->ptlm_cksum = kptllnd_cksum(msg, 
276                                                 offsetof(kptl_msg_t, ptlm_u));
277         }
278 }
279
280 int
281 kptllnd_msg_unpack(kptl_msg_t *msg, int nob)
282 {
283         const int hdr_size = offsetof(kptl_msg_t, ptlm_u);
284         __u32     msg_cksum;
285         __u16     msg_version;
286         int       flip;
287
288         /* 6 bytes are enough to have received magic + version */
289         if (nob < 6) {
290                 CERROR("Very Short message: %d\n", nob);
291                 return -EPROTO;
292         }
293
294         /*
295          * Determine if we need to flip
296          */
297         if (msg->ptlm_magic == PTLLND_MSG_MAGIC) {
298                 flip = 0;
299         } else if (msg->ptlm_magic == __swab32(PTLLND_MSG_MAGIC)) {
300                 flip = 1;
301         } else {
302                 CERROR("Bad magic: %08x\n", msg->ptlm_magic);
303                 return -EPROTO;
304         }
305
306         msg_version = flip ? __swab16(msg->ptlm_version) : msg->ptlm_version;
307
308         if (msg_version != PTLLND_MSG_VERSION) {
309                 CERROR("Bad version: got %04x expected %04x\n",
310                         (__u32)msg_version, PTLLND_MSG_VERSION);
311                 return -EPROTO;
312         }
313
314         if (nob < hdr_size) {
315                 CERROR("Short message: got %d, wanted at least %d\n",
316                        nob, hdr_size);
317                 return -EPROTO;
318         }
319
320         /* checksum must be computed with
321          * 1) ptlm_cksum zero and
322          * 2) BEFORE anything gets modified/flipped
323          */
324         msg_cksum = flip ? __swab32(msg->ptlm_cksum) : msg->ptlm_cksum;
325         msg->ptlm_cksum = 0;
326         if (msg_cksum != 0 &&
327             msg_cksum != kptllnd_cksum(msg, hdr_size)) {
328                 CERROR("Bad checksum\n");
329                 return -EPROTO;
330         }
331
332         msg->ptlm_version = msg_version;
333         msg->ptlm_cksum = msg_cksum;
334         
335         if (flip) {
336                 /* These two are 1 byte long so we don't swap them
337                    But check this assumtion*/
338                 CLASSERT (sizeof(msg->ptlm_type) == 1);
339                 CLASSERT (sizeof(msg->ptlm_credits) == 1);
340                 /* src & dst stamps are opaque cookies */
341                 __swab32s(&msg->ptlm_nob);
342                 __swab64s(&msg->ptlm_srcnid);
343                 __swab64s(&msg->ptlm_dstnid);
344                 __swab32s(&msg->ptlm_srcpid);
345                 __swab32s(&msg->ptlm_dstpid);
346         }
347
348         if (msg->ptlm_nob != nob) {
349                 CERROR("msg_nob corrupt: got 0x%08x, wanted %08x\n",
350                        msg->ptlm_nob, nob);
351                 return -EPROTO;
352         }
353
354         switch(msg->ptlm_type)
355         {
356         case PTLLND_MSG_TYPE_PUT:
357         case PTLLND_MSG_TYPE_GET:
358                 if (nob < hdr_size + sizeof(kptl_rdma_msg_t)) {
359                         CERROR("Short rdma request: got %d, want %d\n",
360                                nob, hdr_size + (int)sizeof(kptl_rdma_msg_t));
361                         return -EPROTO;
362                 }
363
364                 if (flip)
365                         __swab64s(&msg->ptlm_u.rdma.kptlrm_matchbits);
366
367                 if (msg->ptlm_u.rdma.kptlrm_matchbits < PTL_RESERVED_MATCHBITS) {
368                         CERROR("Bad matchbits "LPX64"\n",
369                                msg->ptlm_u.rdma.kptlrm_matchbits);
370                         return -EPROTO;
371                 }
372                 break;
373
374         case PTLLND_MSG_TYPE_IMMEDIATE:
375                 if (nob < offsetof(kptl_msg_t, 
376                                    ptlm_u.immediate.kptlim_payload)) {
377                         CERROR("Short immediate: got %d, want %d\n", nob,
378                                (int)offsetof(kptl_msg_t, 
379                                              ptlm_u.immediate.kptlim_payload));
380                         return -EPROTO;
381                 }
382                 /* Do nothing */
383                 break;
384                         
385         case PTLLND_MSG_TYPE_NOOP:
386         case PTLLND_MSG_TYPE_NAK:
387                 /* Do nothing */
388                 break;
389
390         case PTLLND_MSG_TYPE_HELLO:
391                 if (nob < hdr_size + sizeof(kptl_hello_msg_t)) {
392                         CERROR("Short hello: got %d want %d\n",
393                                nob, hdr_size + (int)sizeof(kptl_hello_msg_t));
394                         return -EPROTO;
395                 }
396                 if (flip) {
397                         __swab64s(&msg->ptlm_u.hello.kptlhm_matchbits);
398                         __swab32s(&msg->ptlm_u.hello.kptlhm_max_msg_size);
399                 }
400                 break;
401
402         default:
403                 CERROR("Bad message type: 0x%02x\n", (__u32)msg->ptlm_type);
404                 return -EPROTO;
405         }
406
407         return 0;
408 }
409
410 int
411 kptllnd_ctl(lnet_ni_t *ni, unsigned int cmd, void *arg)
412 {
413         struct libcfs_ioctl_data *data = arg;
414         int          rc = -EINVAL;
415
416         CDEBUG(D_NET, ">>> kptllnd_ctl cmd=%u arg=%p\n", cmd, arg);
417
418         /*
419          * Validate that the context block is actually
420          * pointing to this interface
421          */
422         LASSERT (ni == kptllnd_data.kptl_ni);
423
424         switch(cmd) {
425         case IOC_LIBCFS_DEL_PEER: {
426                 lnet_process_id_t id;
427                 
428                 id.nid = data->ioc_nid;
429                 id.pid = data->ioc_u32[1];
430                 
431                 rc = kptllnd_peer_del(id);
432                 break;
433         }
434
435         case IOC_LIBCFS_GET_PEER: {
436                 lnet_process_id_t   id = {.nid = LNET_NID_ANY,
437                                           .pid = LNET_PID_ANY};
438                 __u64               incarnation = 0;
439                 __u64               next_matchbits = 0;
440                 __u64               last_matchbits_seen = 0;
441                 int                 state = 0;
442                 int                 sent_hello = 0;
443                 int                 refcount = 0;
444                 int                 nsendq = 0;
445                 int                 nactiveq = 0;
446                 int                 credits = 0;
447                 int                 outstanding_credits = 0;
448
449                 rc = kptllnd_get_peer_info(data->ioc_count, &id,
450                                            &state, &sent_hello,
451                                            &refcount, &incarnation,
452                                            &next_matchbits, &last_matchbits_seen,
453                                            &nsendq, &nactiveq,
454                                            &credits, &outstanding_credits);
455                 /* wince... */
456                 data->ioc_nid = id.nid;
457                 data->ioc_net = state;
458                 data->ioc_flags  = sent_hello;
459                 data->ioc_count = refcount;
460                 data->ioc_u64[0] = incarnation;
461                 data->ioc_u32[0] = (__u32)next_matchbits;
462                 data->ioc_u32[1] = (__u32)(next_matchbits >> 32);
463                 data->ioc_u32[2] = (__u32)last_matchbits_seen;
464                 data->ioc_u32[3] = (__u32)(last_matchbits_seen >> 32);
465                 data->ioc_u32[4] = id.pid;
466                 data->ioc_u32[5] = (nsendq << 16) | nactiveq;
467                 data->ioc_u32[6] = (credits << 16) | outstanding_credits;
468                 break;
469         }
470                 
471         default:
472                 rc=-EINVAL;
473                 break;
474         }
475         CDEBUG(D_NET, "<<< kptllnd_ctl rc=%d\n", rc);
476         return rc;
477 }
478
479 int
480 kptllnd_startup (lnet_ni_t *ni)
481 {
482         int             rc;
483         int             i;
484         int             spares;
485         struct timeval  tv;
486         ptl_err_t       ptl_rc;
487
488         LASSERT (ni->ni_lnd == &kptllnd_lnd);
489
490         if (kptllnd_data.kptl_init != PTLLND_INIT_NOTHING) {
491                 CERROR("Only 1 instance supported\n");
492                 return -EPERM;
493         }
494
495         if (*kptllnd_tunables.kptl_max_procs_per_node < 1) {
496                 CERROR("max_procs_per_node must be >= 1\n");
497                 return -EINVAL;
498         }
499
500         /* kptl_msg_t::ptlm_credits is only a __u8 */
501         if (*kptllnd_tunables.kptl_peercredits > 255) {
502                 CERROR("kptl_peercredits must be <= 255\n");
503                 return -EINVAL;
504         }
505
506         *kptllnd_tunables.kptl_max_msg_size &= ~7;
507         if (*kptllnd_tunables.kptl_max_msg_size < PTLLND_MIN_BUFFER_SIZE)
508                 *kptllnd_tunables.kptl_max_msg_size = PTLLND_MIN_BUFFER_SIZE;
509
510         CLASSERT ((PTLLND_MIN_BUFFER_SIZE & 7) == 0);
511         CLASSERT (sizeof(kptl_msg_t) <= PTLLND_MIN_BUFFER_SIZE);
512
513         /*
514          * zero pointers, flags etc
515          * put everything into a known state.
516          */
517         memset (&kptllnd_data, 0, sizeof (kptllnd_data));
518         kptllnd_data.kptl_eqh = PTL_INVALID_HANDLE;
519         kptllnd_data.kptl_nih = PTL_INVALID_HANDLE;
520
521         /*
522          * Setup the sched locks/lists/waitq
523          */
524         spin_lock_init(&kptllnd_data.kptl_sched_lock);
525         init_waitqueue_head(&kptllnd_data.kptl_sched_waitq);
526         INIT_LIST_HEAD(&kptllnd_data.kptl_sched_txq);
527         INIT_LIST_HEAD(&kptllnd_data.kptl_sched_rxq);
528         INIT_LIST_HEAD(&kptllnd_data.kptl_sched_rxbq);
529
530         /* init kptl_ptlid2str_lock before any call to kptllnd_ptlid2str */
531         spin_lock_init(&kptllnd_data.kptl_ptlid2str_lock);
532
533         /*
534          * Setup the tx locks/lists
535          */
536         spin_lock_init(&kptllnd_data.kptl_tx_lock);
537         INIT_LIST_HEAD(&kptllnd_data.kptl_idle_txs);
538         atomic_set(&kptllnd_data.kptl_ntx, 0);
539
540         /*
541          * Uptick the module reference count
542          */
543         PORTAL_MODULE_USE;
544
545         /*
546          * Setup pointers between the ni and context data block
547          */
548         kptllnd_data.kptl_ni = ni;
549         ni->ni_data = &kptllnd_data;
550
551         /*
552          * Setup Credits
553          */
554         ni->ni_maxtxcredits = *kptllnd_tunables.kptl_credits;
555         ni->ni_peertxcredits = *kptllnd_tunables.kptl_peercredits;
556
557         kptllnd_data.kptl_expected_peers =
558                 *kptllnd_tunables.kptl_max_nodes *
559                 *kptllnd_tunables.kptl_max_procs_per_node;
560         
561         /*
562          * Initialize the Network interface instance
563          * We use the default because we don't have any
564          * way to choose a better interface.
565          * Requested and actual limits are ignored.
566          */
567         ptl_rc = PtlNIInit(
568 #ifdef _USING_LUSTRE_PORTALS_
569                 PTL_IFACE_DEFAULT,
570 #else
571                 CRAY_KERN_NAL,
572 #endif
573                 *kptllnd_tunables.kptl_pid, NULL, NULL,
574                 &kptllnd_data.kptl_nih);
575
576         /*
577          * Note: PTL_IFACE_DUP simply means that the requested
578          * interface was already inited and that we're sharing it.
579          * Which is ok.
580          */
581         if (ptl_rc != PTL_OK && ptl_rc != PTL_IFACE_DUP) {
582                 CERROR ("PtlNIInit: error %s(%d)\n",
583                         kptllnd_errtype2str(ptl_rc), ptl_rc);
584                 rc = -EINVAL;
585                 goto failed;
586         }
587
588         /* NB eq size irrelevant if using a callback */
589         ptl_rc = PtlEQAlloc(kptllnd_data.kptl_nih,
590                             8,                       /* size */
591                             kptllnd_eq_callback,     /* handler callback */
592                             &kptllnd_data.kptl_eqh); /* output handle */
593         if (ptl_rc != PTL_OK) {
594                 CERROR("PtlEQAlloc failed %s(%d)\n",
595                        kptllnd_errtype2str(ptl_rc), ptl_rc);
596                 rc = -ENOMEM;
597                 goto failed;
598         }
599
600         /*
601          * Fetch the lower NID
602          */
603         ptl_rc = PtlGetId(kptllnd_data.kptl_nih,
604                           &kptllnd_data.kptl_portals_id);
605         if (ptl_rc != PTL_OK) {
606                 CERROR ("PtlGetID: error %s(%d)\n",
607                         kptllnd_errtype2str(ptl_rc), ptl_rc);
608                 rc = -EINVAL;
609                 goto failed;
610         }
611
612         if (kptllnd_data.kptl_portals_id.pid != *kptllnd_tunables.kptl_pid) {
613                 /* The kernel ptllnd must have the expected PID */
614                 CERROR("Unexpected PID: %u (%u expected)\n",
615                        kptllnd_data.kptl_portals_id.pid,
616                        *kptllnd_tunables.kptl_pid);
617                 rc = -EINVAL;
618                 goto failed;
619         }
620
621         ni->ni_nid = kptllnd_ptl2lnetnid(kptllnd_data.kptl_portals_id.nid);
622
623         CDEBUG(D_NET, "ptl id=%s, lnet id=%s\n", 
624                kptllnd_ptlid2str(kptllnd_data.kptl_portals_id),
625                libcfs_nid2str(ni->ni_nid));
626
627         /* Initialized the incarnation - it must be for-all-time unique, even
628          * accounting for the fact that we increment it when we disconnect a
629          * peer that's using it */
630         do_gettimeofday(&tv);
631         kptllnd_data.kptl_incarnation = (((__u64)tv.tv_sec) * 1000000) +
632                                         tv.tv_usec;
633         CDEBUG(D_NET, "Incarnation="LPX64"\n", kptllnd_data.kptl_incarnation);
634
635         /*
636          * Allocate and setup the peer hash table
637          */
638         rwlock_init(&kptllnd_data.kptl_peer_rw_lock);
639         init_waitqueue_head(&kptllnd_data.kptl_watchdog_waitq);
640         INIT_LIST_HEAD(&kptllnd_data.kptl_closing_peers);
641         INIT_LIST_HEAD(&kptllnd_data.kptl_zombie_peers);
642
643         kptllnd_data.kptl_peer_hash_size =
644                 *kptllnd_tunables.kptl_peer_hash_table_size;
645         LIBCFS_ALLOC(kptllnd_data.kptl_peers,
646                      (kptllnd_data.kptl_peer_hash_size * 
647                       sizeof(struct list_head)));
648         if (kptllnd_data.kptl_peers == NULL) {
649                 CERROR("Failed to allocate space for peer hash table size=%d\n",
650                         kptllnd_data.kptl_peer_hash_size);
651                 rc = -ENOMEM;
652                 goto failed;
653         }
654         for (i = 0; i < kptllnd_data.kptl_peer_hash_size; i++)
655                 INIT_LIST_HEAD(&kptllnd_data.kptl_peers[i]);
656
657         LIBCFS_ALLOC(kptllnd_data.kptl_nak_msg, offsetof(kptl_msg_t, ptlm_u));
658         if (kptllnd_data.kptl_nak_msg == NULL) {
659                 CERROR("Can't allocate NAK msg\n");
660                 rc = -ENOMEM;
661                 goto failed;
662         }
663         memset(kptllnd_data.kptl_nak_msg, 0, offsetof(kptl_msg_t, ptlm_u));
664         kptllnd_init_msg(kptllnd_data.kptl_nak_msg, PTLLND_MSG_TYPE_NAK, 0);
665         kptllnd_data.kptl_nak_msg->ptlm_magic    = PTLLND_MSG_MAGIC;
666         kptllnd_data.kptl_nak_msg->ptlm_version  = PTLLND_MSG_VERSION;
667         kptllnd_data.kptl_nak_msg->ptlm_srcpid   = the_lnet.ln_pid;
668         kptllnd_data.kptl_nak_msg->ptlm_srcnid   = ni->ni_nid;
669         kptllnd_data.kptl_nak_msg->ptlm_srcstamp = kptllnd_data.kptl_incarnation;
670         kptllnd_data.kptl_nak_msg->ptlm_dstpid   = LNET_PID_ANY;
671         kptllnd_data.kptl_nak_msg->ptlm_dstnid   = LNET_NID_ANY;
672
673         kptllnd_rx_buffer_pool_init(&kptllnd_data.kptl_rx_buffer_pool);
674
675         kptllnd_data.kptl_rx_cache = 
676                 cfs_mem_cache_create("ptllnd_rx",
677                                      sizeof(kptl_rx_t) + 
678                                      *kptllnd_tunables.kptl_max_msg_size,
679                                      0,    /* offset */
680                                      0);   /* flags */
681         if (kptllnd_data.kptl_rx_cache == NULL) {
682                 CERROR("Can't create slab for RX descriptors\n");
683                 rc = -ENOMEM;
684                 goto failed;
685         }
686
687         /* lists/ptrs/locks initialised */
688         kptllnd_data.kptl_init = PTLLND_INIT_DATA;
689
690         /*****************************************************/
691
692         rc = kptllnd_setup_tx_descs();
693         if (rc != 0) {
694                 CERROR("Can't pre-allocate %d TX descriptors: %d\n",
695                        *kptllnd_tunables.kptl_ntx, rc);
696                 goto failed;
697         }
698         
699         /* Start the scheduler threads for handling incoming requests.  No need
700          * to advance the state because this will be automatically cleaned up
701          * now that PTLNAT_INIT_DATA state has been entered */
702         CDEBUG(D_NET, "starting %d scheduler threads\n", PTLLND_N_SCHED);
703         for (i = 0; i < PTLLND_N_SCHED; i++) {
704                 rc = kptllnd_thread_start(kptllnd_scheduler, (void *)((long)i));
705                 if (rc != 0) {
706                         CERROR("Can't spawn scheduler[%d]: %d\n", i, rc);
707                         goto failed;
708                 }
709         }
710
711         rc = kptllnd_thread_start(kptllnd_watchdog, NULL);
712         if (rc != 0) {
713                 CERROR("Can't spawn watchdog: %d\n", rc);
714                 goto failed;
715         }
716
717         /* Ensure that 'rxb_nspare' buffers can be off the net (being emptied)
718          * and we will still have enough buffers posted for all our peers */
719         spares = *kptllnd_tunables.kptl_rxb_nspare *
720                  ((*kptllnd_tunables.kptl_rxb_npages * PAGE_SIZE)/
721                   *kptllnd_tunables.kptl_max_msg_size);
722
723         /* reserve and post the buffers */
724         rc = kptllnd_rx_buffer_pool_reserve(&kptllnd_data.kptl_rx_buffer_pool,
725                                             kptllnd_data.kptl_expected_peers +
726                                             spares);
727         if (rc != 0) {
728                 CERROR("Can't reserve RX Buffer pool: %d\n", rc);
729                 goto failed;
730         }
731
732         /* flag everything initialised */
733         kptllnd_data.kptl_init = PTLLND_INIT_ALL;
734
735         /*****************************************************/
736
737         if (*kptllnd_tunables.kptl_checksum)
738                 CWARN("Checksumming enabled\n");
739         
740         CDEBUG(D_NET, "<<< kptllnd_startup SUCCESS\n");
741         return 0;
742
743  failed:
744         CDEBUG(D_NET, "kptllnd_startup failed rc=%d\n", rc);
745         kptllnd_shutdown(ni);
746         return rc;
747 }
748
749 void
750 kptllnd_shutdown (lnet_ni_t *ni)
751 {
752         int               i;
753         ptl_err_t         prc;
754         lnet_process_id_t process_id;
755         unsigned long     flags;
756
757         CDEBUG(D_MALLOC, "before LND cleanup: kmem %d\n",
758                atomic_read (&libcfs_kmemory));
759
760         LASSERT (ni == kptllnd_data.kptl_ni);
761
762         switch (kptllnd_data.kptl_init) {
763         default:
764                 LBUG();
765
766         case PTLLND_INIT_ALL:
767         case PTLLND_INIT_DATA:
768                 /* Stop receiving */
769                 kptllnd_rx_buffer_pool_fini(&kptllnd_data.kptl_rx_buffer_pool);
770                 LASSERT (list_empty(&kptllnd_data.kptl_sched_rxq));
771                 LASSERT (list_empty(&kptllnd_data.kptl_sched_rxbq));
772
773                 /* Hold peertable lock to interleave cleanly with peer birth/death */
774                 write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
775
776                 LASSERT (kptllnd_data.kptl_shutdown == 0);
777                 kptllnd_data.kptl_shutdown = 1; /* phase 1 == destroy peers */
778
779                 /* no new peers possible now */
780                 write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, 
781                                         flags);
782
783                 /* nuke all existing peers */
784                 process_id.nid = LNET_NID_ANY;
785                 process_id.pid = LNET_PID_ANY;
786                 kptllnd_peer_del(process_id);
787
788                 read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
789
790                 LASSERT (kptllnd_data.kptl_n_active_peers == 0);
791
792                 i = 2;
793                 while (kptllnd_data.kptl_npeers != 0) {
794                         i++;
795                         CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET,
796                                "Waiting for %d peers to terminate\n",
797                                kptllnd_data.kptl_npeers);
798
799                         read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, 
800                                                flags);
801
802                         cfs_pause(cfs_time_seconds(1));
803
804                         read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, 
805                                           flags);
806                 }
807
808                 LASSERT(list_empty(&kptllnd_data.kptl_closing_peers));
809                 LASSERT(list_empty(&kptllnd_data.kptl_zombie_peers));
810                 LASSERT (kptllnd_data.kptl_peers != NULL);
811                 for (i = 0; i < kptllnd_data.kptl_peer_hash_size; i++)
812                         LASSERT (list_empty (&kptllnd_data.kptl_peers[i]));
813
814                 read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
815                 CDEBUG(D_NET, "All peers deleted\n");
816
817                 /* Shutdown phase 2: kill the daemons... */
818                 kptllnd_data.kptl_shutdown = 2;
819                 mb();
820                 
821                 i = 2;
822                 while (atomic_read (&kptllnd_data.kptl_nthreads) != 0) {
823                         /* Wake up all threads*/
824                         wake_up_all(&kptllnd_data.kptl_sched_waitq);
825                         wake_up_all(&kptllnd_data.kptl_watchdog_waitq);
826
827                         i++;
828                         CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
829                                "Waiting for %d threads to terminate\n",
830                                atomic_read(&kptllnd_data.kptl_nthreads));
831                         cfs_pause(cfs_time_seconds(1));
832                 }
833
834                 CDEBUG(D_NET, "All Threads stopped\n");
835                 LASSERT(list_empty(&kptllnd_data.kptl_sched_txq));
836
837                 kptllnd_cleanup_tx_descs();
838
839                 /* Nothing here now, but libcfs might soon require
840                  * us to explicitly destroy wait queues and semaphores
841                  * that would be done here */
842
843                 /* fall through */
844
845         case PTLLND_INIT_NOTHING:
846                 CDEBUG(D_NET, "PTLLND_INIT_NOTHING\n");
847                 break;
848         }
849
850         if (!PtlHandleIsEqual(kptllnd_data.kptl_eqh, PTL_INVALID_HANDLE)) {
851                 prc = PtlEQFree(kptllnd_data.kptl_eqh);
852                 if (prc != PTL_OK)
853                         CERROR("Error %s(%d) freeing portals EQ\n",
854                                kptllnd_errtype2str(prc), prc);
855         }
856
857         if (!PtlHandleIsEqual(kptllnd_data.kptl_nih, PTL_INVALID_HANDLE)) {
858                 prc = PtlNIFini(kptllnd_data.kptl_nih);
859                 if (prc != PTL_OK)
860                         CERROR("Error %s(%d) finalizing portals NI\n",
861                                kptllnd_errtype2str(prc), prc);
862         }
863         
864         LASSERT (atomic_read(&kptllnd_data.kptl_ntx) == 0);
865         LASSERT (list_empty(&kptllnd_data.kptl_idle_txs));
866
867         if (kptllnd_data.kptl_rx_cache != NULL)
868                 cfs_mem_cache_destroy(kptllnd_data.kptl_rx_cache);
869
870         if (kptllnd_data.kptl_peers != NULL)
871                 LIBCFS_FREE (kptllnd_data.kptl_peers,
872                              sizeof (struct list_head) *
873                              kptllnd_data.kptl_peer_hash_size);
874
875         if (kptllnd_data.kptl_nak_msg != NULL)
876                 LIBCFS_FREE (kptllnd_data.kptl_nak_msg,
877                              offsetof(kptl_msg_t, ptlm_u));
878
879         memset(&kptllnd_data, 0, sizeof(kptllnd_data));
880
881         CDEBUG(D_MALLOC, "after LND cleanup: kmem %d\n",
882                atomic_read (&libcfs_kmemory));
883
884         PORTAL_MODULE_UNUSE;
885 }
886
887 int __init
888 kptllnd_module_init (void)
889 {
890         int    rc;
891
892         kptllnd_assert_wire_constants();
893
894         rc = kptllnd_tunables_init();
895         if (rc != 0)
896                 return rc;
897
898         kptllnd_init_ptltrace();
899
900         lnet_register_lnd(&kptllnd_lnd);
901
902         return 0;
903 }
904
905 void __exit
906 kptllnd_module_fini (void)
907 {
908         lnet_unregister_lnd(&kptllnd_lnd);
909         kptllnd_tunables_fini();
910 }
911
912 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
913 MODULE_DESCRIPTION("Kernel Portals LND v1.00");
914 MODULE_LICENSE("GPL");
915
916 module_init(kptllnd_module_init);
917 module_exit(kptllnd_module_fini);