Whamcloud - gitweb
b=16098
[fs/lustre-release.git] / lnet / klnds / ptllnd / ptllnd.c
1 /*
2  * -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
3  * vim:expandtab:shiftwidth=8:tabstop=8:
4  *
5  * GPL HEADER START
6  *
7  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8  *
9  * This program is free software; you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License version 2 only,
11  * as published by the Free Software Foundation.
12  *
13  * This program is distributed in the hope that it will be useful, but
14  * WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
16  * General Public License version 2 for more details (a copy is included
17  * in the LICENSE file that accompanied this code).
18  *
19  * You should have received a copy of the GNU General Public License
20  * version 2 along with this program; If not, see [sun.com URL with a
21  * copy of GPLv2].
22  *
23  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
24  * CA 95054 USA or visit www.sun.com if you need additional information or
25  * have any questions.
26  *
27  * GPL HEADER END
28  */
29 /*
30  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
31  * Use is subject to license terms.
32  */
33 /*
34  * This file is part of Lustre, http://www.lustre.org/
35  * Lustre is a trademark of Sun Microsystems, Inc.
36  *
37  * lnet/klnds/ptllnd/ptllnd.c
38  *
39  * Author: PJ Kirner <pjkirner@clusterfs.com>
40  */
41
42 #include "ptllnd.h"
43
44 lnd_t kptllnd_lnd = {
45         .lnd_type       = PTLLND,
46         .lnd_startup    = kptllnd_startup,
47         .lnd_shutdown   = kptllnd_shutdown,
48         .lnd_ctl        = kptllnd_ctl,
49         .lnd_send       = kptllnd_send,
50         .lnd_recv       = kptllnd_recv,
51         .lnd_eager_recv = kptllnd_eager_recv,
52 };
53
54 kptl_data_t kptllnd_data;
55
56 char *
57 kptllnd_ptlid2str(ptl_process_id_t id)
58 {
59         static char    strs[64][32];
60         static int     idx = 0;
61
62         unsigned long  flags;
63         char          *str;
64         
65         spin_lock_irqsave(&kptllnd_data.kptl_ptlid2str_lock, flags);
66         str = strs[idx++];
67         if (idx >= sizeof(strs)/sizeof(strs[0]))
68                 idx = 0;
69         spin_unlock_irqrestore(&kptllnd_data.kptl_ptlid2str_lock, flags);
70
71         snprintf(str, sizeof(strs[0]), FMT_PTLID, id.pid, id.nid);
72         return str;
73 }
74
75 void 
76 kptllnd_assert_wire_constants (void)
77 {
78         /* Wire protocol assertions generated by 'wirecheck'
79          * running on Linux fedora 2.6.11-co-0.6.4 #1 Mon Jun 19 05:36:13 UTC 2006 i686 i686 i386 GNU
80          * with gcc version 4.1.1 20060525 (Red Hat 4.1.1-1) */
81
82
83         /* Constants... */
84         CLASSERT (PTL_RESERVED_MATCHBITS == 0x100);
85         CLASSERT (LNET_MSG_MATCHBITS == 0);
86         CLASSERT (PTLLND_MSG_MAGIC == 0x50746C4E);
87         CLASSERT (PTLLND_MSG_VERSION == 0x04);
88         CLASSERT (PTLLND_RDMA_OK == 0x00);
89         CLASSERT (PTLLND_RDMA_FAIL == 0x01);
90         CLASSERT (PTLLND_MSG_TYPE_INVALID == 0x00);
91         CLASSERT (PTLLND_MSG_TYPE_PUT == 0x01);
92         CLASSERT (PTLLND_MSG_TYPE_GET == 0x02);
93         CLASSERT (PTLLND_MSG_TYPE_IMMEDIATE == 0x03);
94         CLASSERT (PTLLND_MSG_TYPE_NOOP == 0x04);
95         CLASSERT (PTLLND_MSG_TYPE_HELLO == 0x05);
96         CLASSERT (PTLLND_MSG_TYPE_NAK == 0x06);
97
98         /* Checks for struct kptl_msg_t */
99         CLASSERT ((int)sizeof(kptl_msg_t) == 136);
100         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_magic) == 0);
101         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_magic) == 4);
102         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_version) == 4);
103         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_version) == 2);
104         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_type) == 6);
105         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_type) == 1);
106         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_credits) == 7);
107         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_credits) == 1);
108         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_nob) == 8);
109         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_nob) == 4);
110         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_cksum) == 12);
111         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_cksum) == 4);
112         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_srcnid) == 16);
113         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_srcnid) == 8);
114         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_srcstamp) == 24);
115         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_srcstamp) == 8);
116         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_dstnid) == 32);
117         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_dstnid) == 8);
118         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_dststamp) == 40);
119         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_dststamp) == 8);
120         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_srcpid) == 48);
121         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_srcpid) == 4);
122         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_dstpid) == 52);
123         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_dstpid) == 4);
124         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_u.immediate) == 56);
125         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_u.immediate) == 72);
126         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_u.rdma) == 56);
127         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_u.rdma) == 80);
128         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_u.hello) == 56);
129         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_u.hello) == 12);
130
131         /* Checks for struct kptl_immediate_msg_t */
132         CLASSERT ((int)sizeof(kptl_immediate_msg_t) == 72);
133         CLASSERT ((int)offsetof(kptl_immediate_msg_t, kptlim_hdr) == 0);
134         CLASSERT ((int)sizeof(((kptl_immediate_msg_t *)0)->kptlim_hdr) == 72);
135         CLASSERT ((int)offsetof(kptl_immediate_msg_t, kptlim_payload[13]) == 85);
136         CLASSERT ((int)sizeof(((kptl_immediate_msg_t *)0)->kptlim_payload[13]) == 1);
137
138         /* Checks for struct kptl_rdma_msg_t */
139         CLASSERT ((int)sizeof(kptl_rdma_msg_t) == 80);
140         CLASSERT ((int)offsetof(kptl_rdma_msg_t, kptlrm_hdr) == 0);
141         CLASSERT ((int)sizeof(((kptl_rdma_msg_t *)0)->kptlrm_hdr) == 72);
142         CLASSERT ((int)offsetof(kptl_rdma_msg_t, kptlrm_matchbits) == 72);
143         CLASSERT ((int)sizeof(((kptl_rdma_msg_t *)0)->kptlrm_matchbits) == 8);
144
145         /* Checks for struct kptl_hello_msg_t */
146         CLASSERT ((int)sizeof(kptl_hello_msg_t) == 12);
147         CLASSERT ((int)offsetof(kptl_hello_msg_t, kptlhm_matchbits) == 0);
148         CLASSERT ((int)sizeof(((kptl_hello_msg_t *)0)->kptlhm_matchbits) == 8);
149         CLASSERT ((int)offsetof(kptl_hello_msg_t, kptlhm_max_msg_size) == 8);
150         CLASSERT ((int)sizeof(((kptl_hello_msg_t *)0)->kptlhm_max_msg_size) == 4);
151 }
152
153 const char *kptllnd_evtype2str(int type)
154 {
155 #define DO_TYPE(x) case x: return #x;
156         switch(type)
157         {
158                 DO_TYPE(PTL_EVENT_GET_START);
159                 DO_TYPE(PTL_EVENT_GET_END);
160                 DO_TYPE(PTL_EVENT_PUT_START);
161                 DO_TYPE(PTL_EVENT_PUT_END);
162                 DO_TYPE(PTL_EVENT_REPLY_START);
163                 DO_TYPE(PTL_EVENT_REPLY_END);
164                 DO_TYPE(PTL_EVENT_ACK);
165                 DO_TYPE(PTL_EVENT_SEND_START);
166                 DO_TYPE(PTL_EVENT_SEND_END);
167                 DO_TYPE(PTL_EVENT_UNLINK);
168         default:
169                 return "<unknown event type>";
170         }
171 #undef DO_TYPE
172 }
173
174 const char *kptllnd_msgtype2str(int type)
175 {
176 #define DO_TYPE(x) case x: return #x;
177         switch(type)
178         {
179                 DO_TYPE(PTLLND_MSG_TYPE_INVALID);
180                 DO_TYPE(PTLLND_MSG_TYPE_PUT);
181                 DO_TYPE(PTLLND_MSG_TYPE_GET);
182                 DO_TYPE(PTLLND_MSG_TYPE_IMMEDIATE);
183                 DO_TYPE(PTLLND_MSG_TYPE_HELLO);
184                 DO_TYPE(PTLLND_MSG_TYPE_NOOP);
185                 DO_TYPE(PTLLND_MSG_TYPE_NAK);
186         default:
187                 return "<unknown msg type>";
188         }
189 #undef DO_TYPE
190 }
191
192 const char *kptllnd_errtype2str(int type)
193 {
194 #define DO_TYPE(x) case x: return #x;
195         switch(type)
196         {
197                 DO_TYPE(PTL_OK);
198                 DO_TYPE(PTL_SEGV);
199                 DO_TYPE(PTL_NO_SPACE);
200                 DO_TYPE(PTL_ME_IN_USE);
201                 DO_TYPE(PTL_NAL_FAILED);
202                 DO_TYPE(PTL_NO_INIT);
203                 DO_TYPE(PTL_IFACE_DUP);
204                 DO_TYPE(PTL_IFACE_INVALID);
205                 DO_TYPE(PTL_HANDLE_INVALID);
206                 DO_TYPE(PTL_MD_INVALID);
207                 DO_TYPE(PTL_ME_INVALID);
208                 DO_TYPE(PTL_PROCESS_INVALID);
209                 DO_TYPE(PTL_PT_INDEX_INVALID);
210                 DO_TYPE(PTL_SR_INDEX_INVALID);
211                 DO_TYPE(PTL_EQ_INVALID);
212                 DO_TYPE(PTL_EQ_DROPPED);
213                 DO_TYPE(PTL_EQ_EMPTY);
214                 DO_TYPE(PTL_MD_NO_UPDATE);
215                 DO_TYPE(PTL_FAIL);
216                 DO_TYPE(PTL_AC_INDEX_INVALID);
217                 DO_TYPE(PTL_MD_ILLEGAL);
218                 DO_TYPE(PTL_ME_LIST_TOO_LONG);
219                 DO_TYPE(PTL_MD_IN_USE);
220                 DO_TYPE(PTL_NI_INVALID);
221                 DO_TYPE(PTL_PID_INVALID);
222                 DO_TYPE(PTL_PT_FULL);
223                 DO_TYPE(PTL_VAL_FAILED);
224                 DO_TYPE(PTL_NOT_IMPLEMENTED);
225                 DO_TYPE(PTL_NO_ACK);
226                 DO_TYPE(PTL_EQ_IN_USE);
227                 DO_TYPE(PTL_PID_IN_USE);
228                 DO_TYPE(PTL_INV_EQ_SIZE);
229                 DO_TYPE(PTL_AGAIN);
230         default:
231                 return "<unknown event type>";
232         }
233 #undef DO_TYPE
234 }
235
236 __u32
237 kptllnd_cksum (void *ptr, int nob)
238 {
239         char  *c  = ptr;
240         __u32  sum = 0;
241
242         while (nob-- > 0)
243                 sum = ((sum << 1) | (sum >> 31)) + *c++;
244
245         /* ensure I don't return 0 (== no checksum) */
246         return (sum == 0) ? 1 : sum;
247 }
248
249 void
250 kptllnd_init_msg(kptl_msg_t *msg, int type, int body_nob)
251 {
252         msg->ptlm_type = type;
253         msg->ptlm_nob  = (offsetof(kptl_msg_t, ptlm_u) + body_nob + 7) & ~7;
254         
255         LASSERT(msg->ptlm_nob <= *kptllnd_tunables.kptl_max_msg_size);
256 }
257
258 void
259 kptllnd_msg_pack(kptl_msg_t *msg, kptl_peer_t *peer)
260 {
261         msg->ptlm_magic    = PTLLND_MSG_MAGIC;
262         msg->ptlm_version  = PTLLND_MSG_VERSION;
263         /* msg->ptlm_type  Filled in kptllnd_init_msg()  */
264         msg->ptlm_credits  = peer->peer_outstanding_credits;
265         /* msg->ptlm_nob   Filled in kptllnd_init_msg()  */
266         msg->ptlm_cksum    = 0;
267         msg->ptlm_srcnid   = kptllnd_data.kptl_ni->ni_nid;
268         msg->ptlm_srcstamp = peer->peer_myincarnation;
269         msg->ptlm_dstnid   = peer->peer_id.nid;
270         msg->ptlm_dststamp = peer->peer_incarnation;
271         msg->ptlm_srcpid   = the_lnet.ln_pid;
272         msg->ptlm_dstpid   = peer->peer_id.pid;
273
274         if (*kptllnd_tunables.kptl_checksum) {
275                 /* NB ptlm_cksum zero while computing cksum */
276                 msg->ptlm_cksum = kptllnd_cksum(msg, 
277                                                 offsetof(kptl_msg_t, ptlm_u));
278         }
279 }
280
281 int
282 kptllnd_msg_unpack(kptl_msg_t *msg, int nob)
283 {
284         const int hdr_size = offsetof(kptl_msg_t, ptlm_u);
285         __u32     msg_cksum;
286         __u16     msg_version;
287         int       flip;
288
289         /* 6 bytes are enough to have received magic + version */
290         if (nob < 6) {
291                 CERROR("Very Short message: %d\n", nob);
292                 return -EPROTO;
293         }
294
295         /*
296          * Determine if we need to flip
297          */
298         if (msg->ptlm_magic == PTLLND_MSG_MAGIC) {
299                 flip = 0;
300         } else if (msg->ptlm_magic == __swab32(PTLLND_MSG_MAGIC)) {
301                 flip = 1;
302         } else {
303                 CERROR("Bad magic: %08x\n", msg->ptlm_magic);
304                 return -EPROTO;
305         }
306
307         msg_version = flip ? __swab16(msg->ptlm_version) : msg->ptlm_version;
308
309         if (msg_version != PTLLND_MSG_VERSION) {
310                 CERROR("Bad version: got %04x expected %04x\n",
311                         (__u32)msg_version, PTLLND_MSG_VERSION);
312                 return -EPROTO;
313         }
314
315         if (nob < hdr_size) {
316                 CERROR("Short message: got %d, wanted at least %d\n",
317                        nob, hdr_size);
318                 return -EPROTO;
319         }
320
321         /* checksum must be computed with
322          * 1) ptlm_cksum zero and
323          * 2) BEFORE anything gets modified/flipped
324          */
325         msg_cksum = flip ? __swab32(msg->ptlm_cksum) : msg->ptlm_cksum;
326         msg->ptlm_cksum = 0;
327         if (msg_cksum != 0 &&
328             msg_cksum != kptllnd_cksum(msg, hdr_size)) {
329                 CERROR("Bad checksum\n");
330                 return -EPROTO;
331         }
332
333         msg->ptlm_version = msg_version;
334         msg->ptlm_cksum = msg_cksum;
335         
336         if (flip) {
337                 /* These two are 1 byte long so we don't swap them
338                    But check this assumtion*/
339                 CLASSERT (sizeof(msg->ptlm_type) == 1);
340                 CLASSERT (sizeof(msg->ptlm_credits) == 1);
341                 /* src & dst stamps are opaque cookies */
342                 __swab32s(&msg->ptlm_nob);
343                 __swab64s(&msg->ptlm_srcnid);
344                 __swab64s(&msg->ptlm_dstnid);
345                 __swab32s(&msg->ptlm_srcpid);
346                 __swab32s(&msg->ptlm_dstpid);
347         }
348
349         if (msg->ptlm_nob != nob) {
350                 CERROR("msg_nob corrupt: got 0x%08x, wanted %08x\n",
351                        msg->ptlm_nob, nob);
352                 return -EPROTO;
353         }
354
355         switch(msg->ptlm_type)
356         {
357         case PTLLND_MSG_TYPE_PUT:
358         case PTLLND_MSG_TYPE_GET:
359                 if (nob < hdr_size + sizeof(kptl_rdma_msg_t)) {
360                         CERROR("Short rdma request: got %d, want %d\n",
361                                nob, hdr_size + (int)sizeof(kptl_rdma_msg_t));
362                         return -EPROTO;
363                 }
364
365                 if (flip)
366                         __swab64s(&msg->ptlm_u.rdma.kptlrm_matchbits);
367
368                 if (msg->ptlm_u.rdma.kptlrm_matchbits < PTL_RESERVED_MATCHBITS) {
369                         CERROR("Bad matchbits "LPX64"\n",
370                                msg->ptlm_u.rdma.kptlrm_matchbits);
371                         return -EPROTO;
372                 }
373                 break;
374
375         case PTLLND_MSG_TYPE_IMMEDIATE:
376                 if (nob < offsetof(kptl_msg_t, 
377                                    ptlm_u.immediate.kptlim_payload)) {
378                         CERROR("Short immediate: got %d, want %d\n", nob,
379                                (int)offsetof(kptl_msg_t, 
380                                              ptlm_u.immediate.kptlim_payload));
381                         return -EPROTO;
382                 }
383                 /* Do nothing */
384                 break;
385                         
386         case PTLLND_MSG_TYPE_NOOP:
387         case PTLLND_MSG_TYPE_NAK:
388                 /* Do nothing */
389                 break;
390
391         case PTLLND_MSG_TYPE_HELLO:
392                 if (nob < hdr_size + sizeof(kptl_hello_msg_t)) {
393                         CERROR("Short hello: got %d want %d\n",
394                                nob, hdr_size + (int)sizeof(kptl_hello_msg_t));
395                         return -EPROTO;
396                 }
397                 if (flip) {
398                         __swab64s(&msg->ptlm_u.hello.kptlhm_matchbits);
399                         __swab32s(&msg->ptlm_u.hello.kptlhm_max_msg_size);
400                 }
401                 break;
402
403         default:
404                 CERROR("Bad message type: 0x%02x\n", (__u32)msg->ptlm_type);
405                 return -EPROTO;
406         }
407
408         return 0;
409 }
410
411 int
412 kptllnd_ctl(lnet_ni_t *ni, unsigned int cmd, void *arg)
413 {
414         struct libcfs_ioctl_data *data = arg;
415         int          rc = -EINVAL;
416
417         CDEBUG(D_NET, ">>> kptllnd_ctl cmd=%u arg=%p\n", cmd, arg);
418
419         /*
420          * Validate that the context block is actually
421          * pointing to this interface
422          */
423         LASSERT (ni == kptllnd_data.kptl_ni);
424
425         switch(cmd) {
426         case IOC_LIBCFS_DEL_PEER: {
427                 lnet_process_id_t id;
428                 
429                 id.nid = data->ioc_nid;
430                 id.pid = data->ioc_u32[1];
431                 
432                 rc = kptllnd_peer_del(id);
433                 break;
434         }
435
436         case IOC_LIBCFS_GET_PEER: {
437                 lnet_process_id_t   id = {.nid = LNET_NID_ANY,
438                                           .pid = LNET_PID_ANY};
439                 __u64               incarnation = 0;
440                 __u64               next_matchbits = 0;
441                 __u64               last_matchbits_seen = 0;
442                 int                 state = 0;
443                 int                 sent_hello = 0;
444                 int                 refcount = 0;
445                 int                 nsendq = 0;
446                 int                 nactiveq = 0;
447                 int                 credits = 0;
448                 int                 outstanding_credits = 0;
449
450                 rc = kptllnd_get_peer_info(data->ioc_count, &id,
451                                            &state, &sent_hello,
452                                            &refcount, &incarnation,
453                                            &next_matchbits, &last_matchbits_seen,
454                                            &nsendq, &nactiveq,
455                                            &credits, &outstanding_credits);
456                 /* wince... */
457                 data->ioc_nid = id.nid;
458                 data->ioc_net = state;
459                 data->ioc_flags  = sent_hello;
460                 data->ioc_count = refcount;
461                 data->ioc_u64[0] = incarnation;
462                 data->ioc_u32[0] = (__u32)next_matchbits;
463                 data->ioc_u32[1] = (__u32)(next_matchbits >> 32);
464                 data->ioc_u32[2] = (__u32)last_matchbits_seen;
465                 data->ioc_u32[3] = (__u32)(last_matchbits_seen >> 32);
466                 data->ioc_u32[4] = id.pid;
467                 data->ioc_u32[5] = (nsendq << 16) | nactiveq;
468                 data->ioc_u32[6] = (credits << 16) | outstanding_credits;
469                 break;
470         }
471                 
472         default:
473                 rc=-EINVAL;
474                 break;
475         }
476         CDEBUG(D_NET, "<<< kptllnd_ctl rc=%d\n", rc);
477         return rc;
478 }
479
480 int
481 kptllnd_startup (lnet_ni_t *ni)
482 {
483         int             rc;
484         int             i;
485         int             spares;
486         struct timeval  tv;
487         ptl_err_t       ptl_rc;
488
489         LASSERT (ni->ni_lnd == &kptllnd_lnd);
490
491         if (kptllnd_data.kptl_init != PTLLND_INIT_NOTHING) {
492                 CERROR("Only 1 instance supported\n");
493                 return -EPERM;
494         }
495
496         if (*kptllnd_tunables.kptl_max_procs_per_node < 1) {
497                 CERROR("max_procs_per_node must be >= 1\n");
498                 return -EINVAL;
499         }
500
501         /* kptl_msg_t::ptlm_credits is only a __u8 */
502         if (*kptllnd_tunables.kptl_peercredits > 255) {
503                 CERROR("kptl_peercredits must be <= 255\n");
504                 return -EINVAL;
505         }
506
507         *kptllnd_tunables.kptl_max_msg_size &= ~7;
508         if (*kptllnd_tunables.kptl_max_msg_size < PTLLND_MIN_BUFFER_SIZE)
509                 *kptllnd_tunables.kptl_max_msg_size = PTLLND_MIN_BUFFER_SIZE;
510
511         CLASSERT ((PTLLND_MIN_BUFFER_SIZE & 7) == 0);
512         CLASSERT (sizeof(kptl_msg_t) <= PTLLND_MIN_BUFFER_SIZE);
513
514         /*
515          * zero pointers, flags etc
516          * put everything into a known state.
517          */
518         memset (&kptllnd_data, 0, sizeof (kptllnd_data));
519         kptllnd_data.kptl_eqh = PTL_INVALID_HANDLE;
520         kptllnd_data.kptl_nih = PTL_INVALID_HANDLE;
521
522         /*
523          * Setup the sched locks/lists/waitq
524          */
525         spin_lock_init(&kptllnd_data.kptl_sched_lock);
526         init_waitqueue_head(&kptllnd_data.kptl_sched_waitq);
527         INIT_LIST_HEAD(&kptllnd_data.kptl_sched_txq);
528         INIT_LIST_HEAD(&kptllnd_data.kptl_sched_rxq);
529         INIT_LIST_HEAD(&kptllnd_data.kptl_sched_rxbq);
530
531         /* init kptl_ptlid2str_lock before any call to kptllnd_ptlid2str */
532         spin_lock_init(&kptllnd_data.kptl_ptlid2str_lock);
533
534         /*
535          * Setup the tx locks/lists
536          */
537         spin_lock_init(&kptllnd_data.kptl_tx_lock);
538         INIT_LIST_HEAD(&kptllnd_data.kptl_idle_txs);
539         atomic_set(&kptllnd_data.kptl_ntx, 0);
540
541         /*
542          * Uptick the module reference count
543          */
544         PORTAL_MODULE_USE;
545
546         /*
547          * Setup pointers between the ni and context data block
548          */
549         kptllnd_data.kptl_ni = ni;
550         ni->ni_data = &kptllnd_data;
551
552         /*
553          * Setup Credits
554          */
555         ni->ni_maxtxcredits = *kptllnd_tunables.kptl_credits;
556         ni->ni_peertxcredits = *kptllnd_tunables.kptl_peercredits;
557
558         kptllnd_data.kptl_expected_peers =
559                 *kptllnd_tunables.kptl_max_nodes *
560                 *kptllnd_tunables.kptl_max_procs_per_node;
561         
562         /*
563          * Initialize the Network interface instance
564          * We use the default because we don't have any
565          * way to choose a better interface.
566          * Requested and actual limits are ignored.
567          */
568         ptl_rc = PtlNIInit(
569 #ifdef _USING_LUSTRE_PORTALS_
570                 PTL_IFACE_DEFAULT,
571 #else
572                 CRAY_KERN_NAL,
573 #endif
574                 *kptllnd_tunables.kptl_pid, NULL, NULL,
575                 &kptllnd_data.kptl_nih);
576
577         /*
578          * Note: PTL_IFACE_DUP simply means that the requested
579          * interface was already inited and that we're sharing it.
580          * Which is ok.
581          */
582         if (ptl_rc != PTL_OK && ptl_rc != PTL_IFACE_DUP) {
583                 CERROR ("PtlNIInit: error %s(%d)\n",
584                         kptllnd_errtype2str(ptl_rc), ptl_rc);
585                 rc = -EINVAL;
586                 goto failed;
587         }
588
589         /* NB eq size irrelevant if using a callback */
590         ptl_rc = PtlEQAlloc(kptllnd_data.kptl_nih,
591                             8,                       /* size */
592                             kptllnd_eq_callback,     /* handler callback */
593                             &kptllnd_data.kptl_eqh); /* output handle */
594         if (ptl_rc != PTL_OK) {
595                 CERROR("PtlEQAlloc failed %s(%d)\n",
596                        kptllnd_errtype2str(ptl_rc), ptl_rc);
597                 rc = -ENOMEM;
598                 goto failed;
599         }
600
601         /*
602          * Fetch the lower NID
603          */
604         ptl_rc = PtlGetId(kptllnd_data.kptl_nih,
605                           &kptllnd_data.kptl_portals_id);
606         if (ptl_rc != PTL_OK) {
607                 CERROR ("PtlGetID: error %s(%d)\n",
608                         kptllnd_errtype2str(ptl_rc), ptl_rc);
609                 rc = -EINVAL;
610                 goto failed;
611         }
612
613         if (kptllnd_data.kptl_portals_id.pid != *kptllnd_tunables.kptl_pid) {
614                 /* The kernel ptllnd must have the expected PID */
615                 CERROR("Unexpected PID: %u (%u expected)\n",
616                        kptllnd_data.kptl_portals_id.pid,
617                        *kptllnd_tunables.kptl_pid);
618                 rc = -EINVAL;
619                 goto failed;
620         }
621
622         ni->ni_nid = kptllnd_ptl2lnetnid(kptllnd_data.kptl_portals_id.nid);
623
624         CDEBUG(D_NET, "ptl id=%s, lnet id=%s\n", 
625                kptllnd_ptlid2str(kptllnd_data.kptl_portals_id),
626                libcfs_nid2str(ni->ni_nid));
627
628         /* Initialized the incarnation - it must be for-all-time unique, even
629          * accounting for the fact that we increment it when we disconnect a
630          * peer that's using it */
631         do_gettimeofday(&tv);
632         kptllnd_data.kptl_incarnation = (((__u64)tv.tv_sec) * 1000000) +
633                                         tv.tv_usec;
634         CDEBUG(D_NET, "Incarnation="LPX64"\n", kptllnd_data.kptl_incarnation);
635
636         /*
637          * Allocate and setup the peer hash table
638          */
639         rwlock_init(&kptllnd_data.kptl_peer_rw_lock);
640         init_waitqueue_head(&kptllnd_data.kptl_watchdog_waitq);
641         INIT_LIST_HEAD(&kptllnd_data.kptl_closing_peers);
642         INIT_LIST_HEAD(&kptllnd_data.kptl_zombie_peers);
643
644         kptllnd_data.kptl_peer_hash_size =
645                 *kptllnd_tunables.kptl_peer_hash_table_size;
646         LIBCFS_ALLOC(kptllnd_data.kptl_peers,
647                      (kptllnd_data.kptl_peer_hash_size * 
648                       sizeof(struct list_head)));
649         if (kptllnd_data.kptl_peers == NULL) {
650                 CERROR("Failed to allocate space for peer hash table size=%d\n",
651                         kptllnd_data.kptl_peer_hash_size);
652                 rc = -ENOMEM;
653                 goto failed;
654         }
655         for (i = 0; i < kptllnd_data.kptl_peer_hash_size; i++)
656                 INIT_LIST_HEAD(&kptllnd_data.kptl_peers[i]);
657
658         LIBCFS_ALLOC(kptllnd_data.kptl_nak_msg, offsetof(kptl_msg_t, ptlm_u));
659         if (kptllnd_data.kptl_nak_msg == NULL) {
660                 CERROR("Can't allocate NAK msg\n");
661                 rc = -ENOMEM;
662                 goto failed;
663         }
664         memset(kptllnd_data.kptl_nak_msg, 0, offsetof(kptl_msg_t, ptlm_u));
665         kptllnd_init_msg(kptllnd_data.kptl_nak_msg, PTLLND_MSG_TYPE_NAK, 0);
666         kptllnd_data.kptl_nak_msg->ptlm_magic    = PTLLND_MSG_MAGIC;
667         kptllnd_data.kptl_nak_msg->ptlm_version  = PTLLND_MSG_VERSION;
668         kptllnd_data.kptl_nak_msg->ptlm_srcpid   = the_lnet.ln_pid;
669         kptllnd_data.kptl_nak_msg->ptlm_srcnid   = ni->ni_nid;
670         kptllnd_data.kptl_nak_msg->ptlm_srcstamp = kptllnd_data.kptl_incarnation;
671         kptllnd_data.kptl_nak_msg->ptlm_dstpid   = LNET_PID_ANY;
672         kptllnd_data.kptl_nak_msg->ptlm_dstnid   = LNET_NID_ANY;
673
674         kptllnd_rx_buffer_pool_init(&kptllnd_data.kptl_rx_buffer_pool);
675
676         kptllnd_data.kptl_rx_cache = 
677                 cfs_mem_cache_create("ptllnd_rx",
678                                      sizeof(kptl_rx_t) + 
679                                      *kptllnd_tunables.kptl_max_msg_size,
680                                      0,    /* offset */
681                                      0);   /* flags */
682         if (kptllnd_data.kptl_rx_cache == NULL) {
683                 CERROR("Can't create slab for RX descriptors\n");
684                 rc = -ENOMEM;
685                 goto failed;
686         }
687
688         /* lists/ptrs/locks initialised */
689         kptllnd_data.kptl_init = PTLLND_INIT_DATA;
690
691         /*****************************************************/
692
693         rc = kptllnd_setup_tx_descs();
694         if (rc != 0) {
695                 CERROR("Can't pre-allocate %d TX descriptors: %d\n",
696                        *kptllnd_tunables.kptl_ntx, rc);
697                 goto failed;
698         }
699         
700         /* Start the scheduler threads for handling incoming requests.  No need
701          * to advance the state because this will be automatically cleaned up
702          * now that PTLNAT_INIT_DATA state has been entered */
703         CDEBUG(D_NET, "starting %d scheduler threads\n", PTLLND_N_SCHED);
704         for (i = 0; i < PTLLND_N_SCHED; i++) {
705                 rc = kptllnd_thread_start(kptllnd_scheduler, (void *)((long)i));
706                 if (rc != 0) {
707                         CERROR("Can't spawn scheduler[%d]: %d\n", i, rc);
708                         goto failed;
709                 }
710         }
711
712         rc = kptllnd_thread_start(kptllnd_watchdog, NULL);
713         if (rc != 0) {
714                 CERROR("Can't spawn watchdog: %d\n", rc);
715                 goto failed;
716         }
717
718         /* Ensure that 'rxb_nspare' buffers can be off the net (being emptied)
719          * and we will still have enough buffers posted for all our peers */
720         spares = *kptllnd_tunables.kptl_rxb_nspare *
721                  ((*kptllnd_tunables.kptl_rxb_npages * PAGE_SIZE)/
722                   *kptllnd_tunables.kptl_max_msg_size);
723
724         /* reserve and post the buffers */
725         rc = kptllnd_rx_buffer_pool_reserve(&kptllnd_data.kptl_rx_buffer_pool,
726                                             kptllnd_data.kptl_expected_peers +
727                                             spares);
728         if (rc != 0) {
729                 CERROR("Can't reserve RX Buffer pool: %d\n", rc);
730                 goto failed;
731         }
732
733         /* flag everything initialised */
734         kptllnd_data.kptl_init = PTLLND_INIT_ALL;
735
736         /*****************************************************/
737
738         if (*kptllnd_tunables.kptl_checksum)
739                 CWARN("Checksumming enabled\n");
740         
741         CDEBUG(D_NET, "<<< kptllnd_startup SUCCESS\n");
742         return 0;
743
744  failed:
745         CDEBUG(D_NET, "kptllnd_startup failed rc=%d\n", rc);
746         kptllnd_shutdown(ni);
747         return rc;
748 }
749
750 void
751 kptllnd_shutdown (lnet_ni_t *ni)
752 {
753         int               i;
754         ptl_err_t         prc;
755         lnet_process_id_t process_id;
756         unsigned long     flags;
757
758         CDEBUG(D_MALLOC, "before LND cleanup: kmem %d\n",
759                atomic_read (&libcfs_kmemory));
760
761         LASSERT (ni == kptllnd_data.kptl_ni);
762
763         switch (kptllnd_data.kptl_init) {
764         default:
765                 LBUG();
766
767         case PTLLND_INIT_ALL:
768         case PTLLND_INIT_DATA:
769                 /* Stop receiving */
770                 kptllnd_rx_buffer_pool_fini(&kptllnd_data.kptl_rx_buffer_pool);
771                 LASSERT (list_empty(&kptllnd_data.kptl_sched_rxq));
772                 LASSERT (list_empty(&kptllnd_data.kptl_sched_rxbq));
773
774                 /* Hold peertable lock to interleave cleanly with peer birth/death */
775                 write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
776
777                 LASSERT (kptllnd_data.kptl_shutdown == 0);
778                 kptllnd_data.kptl_shutdown = 1; /* phase 1 == destroy peers */
779
780                 /* no new peers possible now */
781                 write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, 
782                                         flags);
783
784                 /* nuke all existing peers */
785                 process_id.nid = LNET_NID_ANY;
786                 process_id.pid = LNET_PID_ANY;
787                 kptllnd_peer_del(process_id);
788
789                 read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
790
791                 LASSERT (kptllnd_data.kptl_n_active_peers == 0);
792
793                 i = 2;
794                 while (kptllnd_data.kptl_npeers != 0) {
795                         i++;
796                         CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET,
797                                "Waiting for %d peers to terminate\n",
798                                kptllnd_data.kptl_npeers);
799
800                         read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, 
801                                                flags);
802
803                         cfs_pause(cfs_time_seconds(1));
804
805                         read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, 
806                                           flags);
807                 }
808
809                 LASSERT(list_empty(&kptllnd_data.kptl_closing_peers));
810                 LASSERT(list_empty(&kptllnd_data.kptl_zombie_peers));
811                 LASSERT (kptllnd_data.kptl_peers != NULL);
812                 for (i = 0; i < kptllnd_data.kptl_peer_hash_size; i++)
813                         LASSERT (list_empty (&kptllnd_data.kptl_peers[i]));
814
815                 read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
816                 CDEBUG(D_NET, "All peers deleted\n");
817
818                 /* Shutdown phase 2: kill the daemons... */
819                 kptllnd_data.kptl_shutdown = 2;
820                 mb();
821                 
822                 i = 2;
823                 while (atomic_read (&kptllnd_data.kptl_nthreads) != 0) {
824                         /* Wake up all threads*/
825                         wake_up_all(&kptllnd_data.kptl_sched_waitq);
826                         wake_up_all(&kptllnd_data.kptl_watchdog_waitq);
827
828                         i++;
829                         CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
830                                "Waiting for %d threads to terminate\n",
831                                atomic_read(&kptllnd_data.kptl_nthreads));
832                         cfs_pause(cfs_time_seconds(1));
833                 }
834
835                 CDEBUG(D_NET, "All Threads stopped\n");
836                 LASSERT(list_empty(&kptllnd_data.kptl_sched_txq));
837
838                 kptllnd_cleanup_tx_descs();
839
840                 /* Nothing here now, but libcfs might soon require
841                  * us to explicitly destroy wait queues and semaphores
842                  * that would be done here */
843
844                 /* fall through */
845
846         case PTLLND_INIT_NOTHING:
847                 CDEBUG(D_NET, "PTLLND_INIT_NOTHING\n");
848                 break;
849         }
850
851         if (!PtlHandleIsEqual(kptllnd_data.kptl_eqh, PTL_INVALID_HANDLE)) {
852                 prc = PtlEQFree(kptllnd_data.kptl_eqh);
853                 if (prc != PTL_OK)
854                         CERROR("Error %s(%d) freeing portals EQ\n",
855                                kptllnd_errtype2str(prc), prc);
856         }
857
858         if (!PtlHandleIsEqual(kptllnd_data.kptl_nih, PTL_INVALID_HANDLE)) {
859                 prc = PtlNIFini(kptllnd_data.kptl_nih);
860                 if (prc != PTL_OK)
861                         CERROR("Error %s(%d) finalizing portals NI\n",
862                                kptllnd_errtype2str(prc), prc);
863         }
864         
865         LASSERT (atomic_read(&kptllnd_data.kptl_ntx) == 0);
866         LASSERT (list_empty(&kptllnd_data.kptl_idle_txs));
867
868         if (kptllnd_data.kptl_rx_cache != NULL)
869                 cfs_mem_cache_destroy(kptllnd_data.kptl_rx_cache);
870
871         if (kptllnd_data.kptl_peers != NULL)
872                 LIBCFS_FREE (kptllnd_data.kptl_peers,
873                              sizeof (struct list_head) *
874                              kptllnd_data.kptl_peer_hash_size);
875
876         if (kptllnd_data.kptl_nak_msg != NULL)
877                 LIBCFS_FREE (kptllnd_data.kptl_nak_msg,
878                              offsetof(kptl_msg_t, ptlm_u));
879
880         memset(&kptllnd_data, 0, sizeof(kptllnd_data));
881
882         CDEBUG(D_MALLOC, "after LND cleanup: kmem %d\n",
883                atomic_read (&libcfs_kmemory));
884
885         PORTAL_MODULE_UNUSE;
886 }
887
888 int __init
889 kptllnd_module_init (void)
890 {
891         int    rc;
892
893         kptllnd_assert_wire_constants();
894
895         rc = kptllnd_tunables_init();
896         if (rc != 0)
897                 return rc;
898
899         kptllnd_init_ptltrace();
900
901         lnet_register_lnd(&kptllnd_lnd);
902
903         return 0;
904 }
905
906 void __exit
907 kptllnd_module_fini (void)
908 {
909         lnet_unregister_lnd(&kptllnd_lnd);
910         kptllnd_tunables_fini();
911 }
912
913 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
914 MODULE_DESCRIPTION("Kernel Portals LND v1.00");
915 MODULE_LICENSE("GPL");
916
917 module_init(kptllnd_module_init);
918 module_exit(kptllnd_module_fini);