Whamcloud - gitweb
LU-244 Cray portals compile fixes for Lustre 2.X
[fs/lustre-release.git] / lnet / klnds / ptllnd / ptllnd.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lnet/klnds/ptllnd/ptllnd.c
37  *
38  * Author: PJ Kirner <pjkirner@clusterfs.com>
39  */
40
41 #include "ptllnd.h"
42
43 lnd_t kptllnd_lnd = {
44         .lnd_type       = PTLLND,
45         .lnd_startup    = kptllnd_startup,
46         .lnd_shutdown   = kptllnd_shutdown,
47         .lnd_ctl        = kptllnd_ctl,
48         .lnd_query      = kptllnd_query,
49         .lnd_send       = kptllnd_send,
50         .lnd_recv       = kptllnd_recv,
51         .lnd_eager_recv = kptllnd_eager_recv,
52 };
53
54 kptl_data_t kptllnd_data;
55
56 char *
57 kptllnd_ptlid2str(ptl_process_id_t id)
58 {
59         static char    strs[64][32];
60         static int     idx = 0;
61
62         unsigned long  flags;
63         char          *str;
64         
65         cfs_spin_lock_irqsave(&kptllnd_data.kptl_ptlid2str_lock, flags);
66         str = strs[idx++];
67         if (idx >= sizeof(strs)/sizeof(strs[0]))
68                 idx = 0;
69         cfs_spin_unlock_irqrestore(&kptllnd_data.kptl_ptlid2str_lock, flags);
70
71         snprintf(str, sizeof(strs[0]), FMT_PTLID, id.pid, id.nid);
72         return str;
73 }
74
75 void 
76 kptllnd_assert_wire_constants (void)
77 {
78         /* Wire protocol assertions generated by 'wirecheck'
79          * running on Linux fedora 2.6.11-co-0.6.4 #1 Mon Jun 19 05:36:13 UTC 2006 i686 i686 i386 GNU
80          * with gcc version 4.1.1 20060525 (Red Hat 4.1.1-1) */
81
82
83         /* Constants... */
84         CLASSERT (PTL_RESERVED_MATCHBITS == 0x100);
85         CLASSERT (LNET_MSG_MATCHBITS == 0);
86         CLASSERT (PTLLND_MSG_MAGIC == 0x50746C4E);
87         CLASSERT (PTLLND_MSG_VERSION == 0x04);
88         CLASSERT (PTLLND_RDMA_OK == 0x00);
89         CLASSERT (PTLLND_RDMA_FAIL == 0x01);
90         CLASSERT (PTLLND_MSG_TYPE_INVALID == 0x00);
91         CLASSERT (PTLLND_MSG_TYPE_PUT == 0x01);
92         CLASSERT (PTLLND_MSG_TYPE_GET == 0x02);
93         CLASSERT (PTLLND_MSG_TYPE_IMMEDIATE == 0x03);
94         CLASSERT (PTLLND_MSG_TYPE_NOOP == 0x04);
95         CLASSERT (PTLLND_MSG_TYPE_HELLO == 0x05);
96         CLASSERT (PTLLND_MSG_TYPE_NAK == 0x06);
97
98         /* Checks for struct kptl_msg_t */
99         CLASSERT ((int)sizeof(kptl_msg_t) == 136);
100         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_magic) == 0);
101         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_magic) == 4);
102         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_version) == 4);
103         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_version) == 2);
104         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_type) == 6);
105         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_type) == 1);
106         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_credits) == 7);
107         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_credits) == 1);
108         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_nob) == 8);
109         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_nob) == 4);
110         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_cksum) == 12);
111         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_cksum) == 4);
112         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_srcnid) == 16);
113         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_srcnid) == 8);
114         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_srcstamp) == 24);
115         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_srcstamp) == 8);
116         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_dstnid) == 32);
117         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_dstnid) == 8);
118         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_dststamp) == 40);
119         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_dststamp) == 8);
120         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_srcpid) == 48);
121         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_srcpid) == 4);
122         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_dstpid) == 52);
123         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_dstpid) == 4);
124         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_u.immediate) == 56);
125         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_u.immediate) == 72);
126         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_u.rdma) == 56);
127         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_u.rdma) == 80);
128         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_u.hello) == 56);
129         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_u.hello) == 12);
130
131         /* Checks for struct kptl_immediate_msg_t */
132         CLASSERT ((int)sizeof(kptl_immediate_msg_t) == 72);
133         CLASSERT ((int)offsetof(kptl_immediate_msg_t, kptlim_hdr) == 0);
134         CLASSERT ((int)sizeof(((kptl_immediate_msg_t *)0)->kptlim_hdr) == 72);
135         CLASSERT ((int)offsetof(kptl_immediate_msg_t, kptlim_payload[13]) == 85);
136         CLASSERT ((int)sizeof(((kptl_immediate_msg_t *)0)->kptlim_payload[13]) == 1);
137
138         /* Checks for struct kptl_rdma_msg_t */
139         CLASSERT ((int)sizeof(kptl_rdma_msg_t) == 80);
140         CLASSERT ((int)offsetof(kptl_rdma_msg_t, kptlrm_hdr) == 0);
141         CLASSERT ((int)sizeof(((kptl_rdma_msg_t *)0)->kptlrm_hdr) == 72);
142         CLASSERT ((int)offsetof(kptl_rdma_msg_t, kptlrm_matchbits) == 72);
143         CLASSERT ((int)sizeof(((kptl_rdma_msg_t *)0)->kptlrm_matchbits) == 8);
144
145         /* Checks for struct kptl_hello_msg_t */
146         CLASSERT ((int)sizeof(kptl_hello_msg_t) == 12);
147         CLASSERT ((int)offsetof(kptl_hello_msg_t, kptlhm_matchbits) == 0);
148         CLASSERT ((int)sizeof(((kptl_hello_msg_t *)0)->kptlhm_matchbits) == 8);
149         CLASSERT ((int)offsetof(kptl_hello_msg_t, kptlhm_max_msg_size) == 8);
150         CLASSERT ((int)sizeof(((kptl_hello_msg_t *)0)->kptlhm_max_msg_size) == 4);
151 }
152
153 const char *kptllnd_evtype2str(int type)
154 {
155 #define DO_TYPE(x) case x: return #x;
156         switch(type)
157         {
158                 DO_TYPE(PTL_EVENT_GET_START);
159                 DO_TYPE(PTL_EVENT_GET_END);
160                 DO_TYPE(PTL_EVENT_PUT_START);
161                 DO_TYPE(PTL_EVENT_PUT_END);
162                 DO_TYPE(PTL_EVENT_REPLY_START);
163                 DO_TYPE(PTL_EVENT_REPLY_END);
164                 DO_TYPE(PTL_EVENT_ACK);
165                 DO_TYPE(PTL_EVENT_SEND_START);
166                 DO_TYPE(PTL_EVENT_SEND_END);
167                 DO_TYPE(PTL_EVENT_UNLINK);
168         default:
169                 return "<unknown event type>";
170         }
171 #undef DO_TYPE
172 }
173
174 const char *kptllnd_msgtype2str(int type)
175 {
176 #define DO_TYPE(x) case x: return #x;
177         switch(type)
178         {
179                 DO_TYPE(PTLLND_MSG_TYPE_INVALID);
180                 DO_TYPE(PTLLND_MSG_TYPE_PUT);
181                 DO_TYPE(PTLLND_MSG_TYPE_GET);
182                 DO_TYPE(PTLLND_MSG_TYPE_IMMEDIATE);
183                 DO_TYPE(PTLLND_MSG_TYPE_HELLO);
184                 DO_TYPE(PTLLND_MSG_TYPE_NOOP);
185                 DO_TYPE(PTLLND_MSG_TYPE_NAK);
186         default:
187                 return "<unknown msg type>";
188         }
189 #undef DO_TYPE
190 }
191
192 const char *kptllnd_errtype2str(int type)
193 {
194 #define DO_TYPE(x) case x: return #x;
195         switch(type)
196         {
197                 DO_TYPE(PTL_OK);
198                 DO_TYPE(PTL_SEGV);
199                 DO_TYPE(PTL_NO_SPACE);
200                 DO_TYPE(PTL_ME_IN_USE);
201                 DO_TYPE(PTL_NAL_FAILED);
202                 DO_TYPE(PTL_NO_INIT);
203                 DO_TYPE(PTL_IFACE_DUP);
204                 DO_TYPE(PTL_IFACE_INVALID);
205                 DO_TYPE(PTL_HANDLE_INVALID);
206                 DO_TYPE(PTL_MD_INVALID);
207                 DO_TYPE(PTL_ME_INVALID);
208                 DO_TYPE(PTL_PROCESS_INVALID);
209                 DO_TYPE(PTL_PT_INDEX_INVALID);
210                 DO_TYPE(PTL_SR_INDEX_INVALID);
211                 DO_TYPE(PTL_EQ_INVALID);
212                 DO_TYPE(PTL_EQ_DROPPED);
213                 DO_TYPE(PTL_EQ_EMPTY);
214                 DO_TYPE(PTL_MD_NO_UPDATE);
215                 DO_TYPE(PTL_FAIL);
216                 DO_TYPE(PTL_AC_INDEX_INVALID);
217                 DO_TYPE(PTL_MD_ILLEGAL);
218                 DO_TYPE(PTL_ME_LIST_TOO_LONG);
219                 DO_TYPE(PTL_MD_IN_USE);
220                 DO_TYPE(PTL_NI_INVALID);
221                 DO_TYPE(PTL_PID_INVALID);
222                 DO_TYPE(PTL_PT_FULL);
223                 DO_TYPE(PTL_VAL_FAILED);
224                 DO_TYPE(PTL_NOT_IMPLEMENTED);
225                 DO_TYPE(PTL_NO_ACK);
226                 DO_TYPE(PTL_EQ_IN_USE);
227                 DO_TYPE(PTL_PID_IN_USE);
228                 DO_TYPE(PTL_INV_EQ_SIZE);
229                 DO_TYPE(PTL_AGAIN);
230         default:
231                 return "<unknown event type>";
232         }
233 #undef DO_TYPE
234 }
235
236 __u32
237 kptllnd_cksum (void *ptr, int nob)
238 {
239         char  *c  = ptr;
240         __u32  sum = 0;
241
242         while (nob-- > 0)
243                 sum = ((sum << 1) | (sum >> 31)) + *c++;
244
245         /* ensure I don't return 0 (== no checksum) */
246         return (sum == 0) ? 1 : sum;
247 }
248
249 void
250 kptllnd_init_msg(kptl_msg_t *msg, int type,
251                  lnet_process_id_t target, int body_nob)
252 {
253         msg->ptlm_type = type;
254         msg->ptlm_nob  = (offsetof(kptl_msg_t, ptlm_u) + body_nob + 7) & ~7;
255         msg->ptlm_dstpid = target.pid;
256         msg->ptlm_dstnid = target.nid;
257         msg->ptlm_srcpid = the_lnet.ln_pid;
258         msg->ptlm_srcnid = kptllnd_ptl2lnetnid(target.nid,
259                                                kptllnd_data.kptl_portals_id.nid);
260         
261         LASSERT(msg->ptlm_nob <= *kptllnd_tunables.kptl_max_msg_size);
262 }
263
264 void
265 kptllnd_msg_pack(kptl_msg_t *msg, kptl_peer_t *peer)
266 {
267         msg->ptlm_magic    = PTLLND_MSG_MAGIC;
268         msg->ptlm_version  = PTLLND_MSG_VERSION;
269         /* msg->ptlm_type  Filled in kptllnd_init_msg()  */
270         msg->ptlm_credits  = peer->peer_outstanding_credits;
271         /* msg->ptlm_nob   Filled in kptllnd_init_msg()  */
272         msg->ptlm_cksum    = 0;
273         /* msg->ptlm_{src|dst}[pn]id Filled in kptllnd_init_msg */
274         msg->ptlm_srcstamp = peer->peer_myincarnation;
275         msg->ptlm_dststamp = peer->peer_incarnation;
276
277         if (*kptllnd_tunables.kptl_checksum) {
278                 /* NB ptlm_cksum zero while computing cksum */
279                 msg->ptlm_cksum = kptllnd_cksum(msg, 
280                                                 offsetof(kptl_msg_t, ptlm_u));
281         }
282 }
283
284 int
285 kptllnd_msg_unpack(kptl_msg_t *msg, int nob)
286 {
287         const int hdr_size = offsetof(kptl_msg_t, ptlm_u);
288         __u32     msg_cksum;
289         __u16     msg_version;
290         int       flip;
291
292         /* 6 bytes are enough to have received magic + version */
293         if (nob < 6) {
294                 CERROR("Very Short message: %d\n", nob);
295                 return -EPROTO;
296         }
297
298         /*
299          * Determine if we need to flip
300          */
301         if (msg->ptlm_magic == PTLLND_MSG_MAGIC) {
302                 flip = 0;
303         } else if (msg->ptlm_magic == __swab32(PTLLND_MSG_MAGIC)) {
304                 flip = 1;
305         } else {
306                 CERROR("Bad magic: %08x\n", msg->ptlm_magic);
307                 return -EPROTO;
308         }
309
310         msg_version = flip ? __swab16(msg->ptlm_version) : msg->ptlm_version;
311
312         if (msg_version != PTLLND_MSG_VERSION) {
313                 CERROR("Bad version: got %04x expected %04x\n",
314                         (__u32)msg_version, PTLLND_MSG_VERSION);
315                 return -EPROTO;
316         }
317
318         if (nob < hdr_size) {
319                 CERROR("Short message: got %d, wanted at least %d\n",
320                        nob, hdr_size);
321                 return -EPROTO;
322         }
323
324         /* checksum must be computed with
325          * 1) ptlm_cksum zero and
326          * 2) BEFORE anything gets modified/flipped
327          */
328         msg_cksum = flip ? __swab32(msg->ptlm_cksum) : msg->ptlm_cksum;
329         msg->ptlm_cksum = 0;
330         if (msg_cksum != 0 &&
331             msg_cksum != kptllnd_cksum(msg, hdr_size)) {
332                 CERROR("Bad checksum\n");
333                 return -EPROTO;
334         }
335
336         msg->ptlm_version = msg_version;
337         msg->ptlm_cksum = msg_cksum;
338         
339         if (flip) {
340                 /* These two are 1 byte long so we don't swap them
341                    But check this assumtion*/
342                 CLASSERT (sizeof(msg->ptlm_type) == 1);
343                 CLASSERT (sizeof(msg->ptlm_credits) == 1);
344                 /* src & dst stamps are opaque cookies */
345                 __swab32s(&msg->ptlm_nob);
346                 __swab64s(&msg->ptlm_srcnid);
347                 __swab64s(&msg->ptlm_dstnid);
348                 __swab32s(&msg->ptlm_srcpid);
349                 __swab32s(&msg->ptlm_dstpid);
350         }
351
352         if (msg->ptlm_nob != nob) {
353                 CERROR("msg_nob corrupt: got 0x%08x, wanted %08x\n",
354                        msg->ptlm_nob, nob);
355                 return -EPROTO;
356         }
357
358         switch(msg->ptlm_type)
359         {
360         case PTLLND_MSG_TYPE_PUT:
361         case PTLLND_MSG_TYPE_GET:
362                 if (nob < hdr_size + sizeof(kptl_rdma_msg_t)) {
363                         CERROR("Short rdma request: got %d, want %d\n",
364                                nob, hdr_size + (int)sizeof(kptl_rdma_msg_t));
365                         return -EPROTO;
366                 }
367
368                 if (flip)
369                         __swab64s(&msg->ptlm_u.rdma.kptlrm_matchbits);
370
371                 if (msg->ptlm_u.rdma.kptlrm_matchbits < PTL_RESERVED_MATCHBITS) {
372                         CERROR("Bad matchbits "LPX64"\n",
373                                msg->ptlm_u.rdma.kptlrm_matchbits);
374                         return -EPROTO;
375                 }
376                 break;
377
378         case PTLLND_MSG_TYPE_IMMEDIATE:
379                 if (nob < offsetof(kptl_msg_t, 
380                                    ptlm_u.immediate.kptlim_payload)) {
381                         CERROR("Short immediate: got %d, want %d\n", nob,
382                                (int)offsetof(kptl_msg_t, 
383                                              ptlm_u.immediate.kptlim_payload));
384                         return -EPROTO;
385                 }
386                 /* Do nothing */
387                 break;
388                         
389         case PTLLND_MSG_TYPE_NOOP:
390         case PTLLND_MSG_TYPE_NAK:
391                 /* Do nothing */
392                 break;
393
394         case PTLLND_MSG_TYPE_HELLO:
395                 if (nob < hdr_size + sizeof(kptl_hello_msg_t)) {
396                         CERROR("Short hello: got %d want %d\n",
397                                nob, hdr_size + (int)sizeof(kptl_hello_msg_t));
398                         return -EPROTO;
399                 }
400                 if (flip) {
401                         __swab64s(&msg->ptlm_u.hello.kptlhm_matchbits);
402                         __swab32s(&msg->ptlm_u.hello.kptlhm_max_msg_size);
403                 }
404                 break;
405
406         default:
407                 CERROR("Bad message type: 0x%02x\n", (__u32)msg->ptlm_type);
408                 return -EPROTO;
409         }
410
411         return 0;
412 }
413
414 int
415 kptllnd_ctl(lnet_ni_t *ni, unsigned int cmd, void *arg)
416 {
417         kptl_net_t  *net = ni->ni_data;
418         struct libcfs_ioctl_data *data = arg;
419         int          rc = -EINVAL;
420
421         CDEBUG(D_NET, ">>> kptllnd_ctl cmd=%u arg=%p\n", cmd, arg);
422
423         /*
424          * Validate that the context block is actually
425          * pointing to this interface
426          */
427         LASSERT (ni == net->net_ni);
428
429         switch(cmd) {
430         case IOC_LIBCFS_DEL_PEER: {
431                 lnet_process_id_t id;
432                 
433                 id.nid = data->ioc_nid;
434                 id.pid = data->ioc_u32[1];
435                 
436                 rc = kptllnd_peer_del(id);
437                 break;
438         }
439
440         case IOC_LIBCFS_GET_PEER: {
441                 lnet_process_id_t   id = {.nid = LNET_NID_ANY,
442                                           .pid = LNET_PID_ANY};
443                 __u64               incarnation = 0;
444                 __u64               next_matchbits = 0;
445                 __u64               last_matchbits_seen = 0;
446                 int                 state = 0;
447                 int                 sent_hello = 0;
448                 int                 refcount = 0;
449                 int                 nsendq = 0;
450                 int                 nactiveq = 0;
451                 int                 credits = 0;
452                 int                 outstanding_credits = 0;
453
454                 rc = kptllnd_get_peer_info(data->ioc_count, &id,
455                                            &state, &sent_hello,
456                                            &refcount, &incarnation,
457                                            &next_matchbits, &last_matchbits_seen,
458                                            &nsendq, &nactiveq,
459                                            &credits, &outstanding_credits);
460                 /* wince... */
461                 data->ioc_nid = id.nid;
462                 data->ioc_net = state;
463                 data->ioc_flags  = sent_hello;
464                 data->ioc_count = refcount;
465                 data->ioc_u64[0] = incarnation;
466                 data->ioc_u32[0] = (__u32)next_matchbits;
467                 data->ioc_u32[1] = (__u32)(next_matchbits >> 32);
468                 data->ioc_u32[2] = (__u32)last_matchbits_seen;
469                 data->ioc_u32[3] = (__u32)(last_matchbits_seen >> 32);
470                 data->ioc_u32[4] = id.pid;
471                 data->ioc_u32[5] = (nsendq << 16) | nactiveq;
472                 data->ioc_u32[6] = (credits << 16) | outstanding_credits;
473                 break;
474         }
475                 
476         default:
477                 rc=-EINVAL;
478                 break;
479         }
480         CDEBUG(D_NET, "<<< kptllnd_ctl rc=%d\n", rc);
481         return rc;
482 }
483
484 void
485 kptllnd_query (lnet_ni_t *ni, lnet_nid_t nid, cfs_time_t *when)
486 {
487         kptl_net_t        *net = ni->ni_data;
488         kptl_peer_t       *peer = NULL;
489         lnet_process_id_t  id = {.nid = nid, .pid = LUSTRE_SRV_LNET_PID};
490         unsigned long      flags;
491
492         /* NB: kptllnd_find_target connects to peer if necessary */
493         if (kptllnd_find_target(net, id, &peer) != 0)
494                 return;
495
496         cfs_spin_lock_irqsave(&peer->peer_lock, flags);
497         if (peer->peer_last_alive != 0)
498                 *when = peer->peer_last_alive;
499         cfs_spin_unlock_irqrestore(&peer->peer_lock, flags);
500         kptllnd_peer_decref(peer);
501         return;
502 }
503
504 void
505 kptllnd_base_shutdown (void)
506 {
507         int               i;
508         ptl_err_t         prc;
509         unsigned long     flags;
510         lnet_process_id_t process_id;
511
512         cfs_read_lock(&kptllnd_data.kptl_net_rw_lock);
513         LASSERT (cfs_list_empty(&kptllnd_data.kptl_nets));
514         cfs_read_unlock(&kptllnd_data.kptl_net_rw_lock);
515
516         switch (kptllnd_data.kptl_init) {
517         default:
518                 LBUG();
519
520         case PTLLND_INIT_ALL:
521         case PTLLND_INIT_DATA:
522                 /* stop receiving */
523                 kptllnd_rx_buffer_pool_fini(&kptllnd_data.kptl_rx_buffer_pool);
524                 LASSERT (cfs_list_empty(&kptllnd_data.kptl_sched_rxq));
525                 LASSERT (cfs_list_empty(&kptllnd_data.kptl_sched_rxbq));
526
527                 /* lock to interleave cleanly with peer birth/death */
528                 cfs_write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
529                 LASSERT (kptllnd_data.kptl_shutdown == 0);
530                 kptllnd_data.kptl_shutdown = 1; /* phase 1 == destroy peers */
531                 /* no new peers possible now */
532                 cfs_write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock,
533                                             flags);
534
535                 /* nuke all existing peers */
536                 process_id.nid = LNET_NID_ANY;
537                 process_id.pid = LNET_PID_ANY;
538                 kptllnd_peer_del(process_id);
539
540                 cfs_read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
541
542                 LASSERT (kptllnd_data.kptl_n_active_peers == 0);
543
544                 i = 2;
545                 while (kptllnd_data.kptl_npeers != 0) {
546                         i++;
547                         CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET,
548                                "Waiting for %d peers to terminate\n",
549                                kptllnd_data.kptl_npeers);
550
551                         cfs_read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock,
552                                                    flags);
553
554                         cfs_pause(cfs_time_seconds(1));
555
556                         cfs_read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock,
557                                               flags);
558                 }
559
560                 LASSERT (cfs_list_empty(&kptllnd_data.kptl_closing_peers));
561                 LASSERT (cfs_list_empty(&kptllnd_data.kptl_zombie_peers));
562                 LASSERT (kptllnd_data.kptl_peers != NULL);
563                 for (i = 0; i < kptllnd_data.kptl_peer_hash_size; i++)
564                         LASSERT (cfs_list_empty (&kptllnd_data.kptl_peers[i]));
565
566                 cfs_read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock,
567                                            flags);
568                 CDEBUG(D_NET, "All peers deleted\n");
569
570                 /* Shutdown phase 2: kill the daemons... */
571                 kptllnd_data.kptl_shutdown = 2;
572                 cfs_mb();
573
574                 i = 2;
575                 while (cfs_atomic_read (&kptllnd_data.kptl_nthreads) != 0) {
576                         /* Wake up all threads*/
577                         cfs_waitq_broadcast(&kptllnd_data.kptl_sched_waitq);
578                         cfs_waitq_broadcast(&kptllnd_data.kptl_watchdog_waitq);
579
580                         i++;
581                         CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
582                                "Waiting for %d threads to terminate\n",
583                                cfs_atomic_read(&kptllnd_data.kptl_nthreads));
584                         cfs_pause(cfs_time_seconds(1));
585                 }
586
587                 CDEBUG(D_NET, "All Threads stopped\n");
588                 LASSERT(cfs_list_empty(&kptllnd_data.kptl_sched_txq));
589
590                 kptllnd_cleanup_tx_descs();
591
592                 /* Nothing here now, but libcfs might soon require
593                  * us to explicitly destroy wait queues and semaphores
594                  * that would be done here */
595
596                 /* fall through */
597
598         case PTLLND_INIT_NOTHING:
599                 CDEBUG(D_NET, "PTLLND_INIT_NOTHING\n");
600                 break;
601         }
602
603         if (!PtlHandleIsEqual(kptllnd_data.kptl_eqh, PTL_INVALID_HANDLE)) {
604                 prc = PtlEQFree(kptllnd_data.kptl_eqh);
605                 if (prc != PTL_OK)
606                         CERROR("Error %s(%d) freeing portals EQ\n",
607                                kptllnd_errtype2str(prc), prc);
608         }
609
610         if (!PtlHandleIsEqual(kptllnd_data.kptl_nih, PTL_INVALID_HANDLE)) {
611                 prc = PtlNIFini(kptllnd_data.kptl_nih);
612                 if (prc != PTL_OK)
613                         CERROR("Error %s(%d) finalizing portals NI\n",
614                                kptllnd_errtype2str(prc), prc);
615         }
616
617         LASSERT (cfs_atomic_read(&kptllnd_data.kptl_ntx) == 0);
618         LASSERT (cfs_list_empty(&kptllnd_data.kptl_idle_txs));
619
620         if (kptllnd_data.kptl_rx_cache != NULL)
621                 cfs_mem_cache_destroy(kptllnd_data.kptl_rx_cache);
622
623         if (kptllnd_data.kptl_peers != NULL)
624                 LIBCFS_FREE(kptllnd_data.kptl_peers,
625                             sizeof (cfs_list_t) *
626                             kptllnd_data.kptl_peer_hash_size);
627
628         if (kptllnd_data.kptl_nak_msg != NULL)
629                 LIBCFS_FREE(kptllnd_data.kptl_nak_msg,
630                             offsetof(kptl_msg_t, ptlm_u));
631
632         memset(&kptllnd_data, 0, sizeof(kptllnd_data));
633         PORTAL_MODULE_UNUSE;
634         return;
635 }
636
637 int
638 kptllnd_base_startup (void)
639 {
640         int             i;
641         int                rc;
642         int             spares;
643         struct timeval  tv;
644         lnet_process_id_t  target;
645         ptl_err_t       ptl_rc;
646
647         if (*kptllnd_tunables.kptl_max_procs_per_node < 1) {
648                 CERROR("max_procs_per_node must be >= 1\n");
649                 return -EINVAL;
650         }
651
652         if (*kptllnd_tunables.kptl_peertxcredits > PTLLND_MSG_MAX_CREDITS) {
653                 CERROR("peercredits must be <= %d\n", PTLLND_MSG_MAX_CREDITS);
654                 return -EINVAL;
655         }
656
657         *kptllnd_tunables.kptl_max_msg_size &= ~7;
658         if (*kptllnd_tunables.kptl_max_msg_size < PTLLND_MIN_BUFFER_SIZE)
659                 *kptllnd_tunables.kptl_max_msg_size = PTLLND_MIN_BUFFER_SIZE;
660
661         CLASSERT ((PTLLND_MIN_BUFFER_SIZE & 7) == 0);
662         CLASSERT (sizeof(kptl_msg_t) <= PTLLND_MIN_BUFFER_SIZE);
663
664         /* Zero pointers, flags etc; put everything into a known state. */
665         memset (&kptllnd_data, 0, sizeof (kptllnd_data));
666
667         LIBCFS_ALLOC(kptllnd_data.kptl_nak_msg, offsetof(kptl_msg_t, ptlm_u));
668         if (kptllnd_data.kptl_nak_msg == NULL) {
669                 CERROR("Can't allocate NAK msg\n");
670                 return -ENOMEM;
671         }
672         memset(kptllnd_data.kptl_nak_msg, 0, offsetof(kptl_msg_t, ptlm_u));
673
674         kptllnd_data.kptl_eqh = PTL_INVALID_HANDLE;
675         kptllnd_data.kptl_nih = PTL_INVALID_HANDLE;
676
677         cfs_rwlock_init(&kptllnd_data.kptl_net_rw_lock);
678         CFS_INIT_LIST_HEAD(&kptllnd_data.kptl_nets);
679
680         /* Setup the sched locks/lists/waitq */
681         cfs_spin_lock_init(&kptllnd_data.kptl_sched_lock);
682         cfs_waitq_init(&kptllnd_data.kptl_sched_waitq);
683         CFS_INIT_LIST_HEAD(&kptllnd_data.kptl_sched_txq);
684         CFS_INIT_LIST_HEAD(&kptllnd_data.kptl_sched_rxq);
685         CFS_INIT_LIST_HEAD(&kptllnd_data.kptl_sched_rxbq);
686
687         /* Init kptl_ptlid2str_lock before any call to kptllnd_ptlid2str */
688         cfs_spin_lock_init(&kptllnd_data.kptl_ptlid2str_lock);
689
690         /* Setup the tx locks/lists */
691         cfs_spin_lock_init(&kptllnd_data.kptl_tx_lock);
692         CFS_INIT_LIST_HEAD(&kptllnd_data.kptl_idle_txs);
693         cfs_atomic_set(&kptllnd_data.kptl_ntx, 0);
694
695         /* Uptick the module reference count */
696         PORTAL_MODULE_USE;
697
698         kptllnd_data.kptl_expected_peers =
699                 *kptllnd_tunables.kptl_max_nodes *
700                 *kptllnd_tunables.kptl_max_procs_per_node;
701
702         /*
703          * Initialize the Network interface instance
704          * We use the default because we don't have any
705          * way to choose a better interface.
706          * Requested and actual limits are ignored.
707          */
708         ptl_rc = PtlNIInit(
709 #ifdef _USING_LUSTRE_PORTALS_
710                 PTL_IFACE_DEFAULT,
711 #else
712                 CRAY_KERN_NAL,
713 #endif
714                 *kptllnd_tunables.kptl_pid, NULL, NULL,
715                 &kptllnd_data.kptl_nih);
716
717         /*
718          * Note: PTL_IFACE_DUP simply means that the requested
719          * interface was already inited and that we're sharing it.
720          * Which is ok.
721          */
722         if (ptl_rc != PTL_OK && ptl_rc != PTL_IFACE_DUP) {
723                 CERROR ("PtlNIInit: error %s(%d)\n",
724                         kptllnd_errtype2str(ptl_rc), ptl_rc);
725                 rc = -EINVAL;
726                 goto failed;
727         }
728
729         /* NB eq size irrelevant if using a callback */
730         ptl_rc = PtlEQAlloc(kptllnd_data.kptl_nih,
731                             8,                       /* size */
732                             kptllnd_eq_callback,     /* handler callback */
733                             &kptllnd_data.kptl_eqh); /* output handle */
734         if (ptl_rc != PTL_OK) {
735                 CERROR("PtlEQAlloc failed %s(%d)\n",
736                        kptllnd_errtype2str(ptl_rc), ptl_rc);
737                 rc = -ENOMEM;
738                 goto failed;
739         }
740
741         /* Fetch the lower NID */
742         ptl_rc = PtlGetId(kptllnd_data.kptl_nih,
743                           &kptllnd_data.kptl_portals_id);
744         if (ptl_rc != PTL_OK) {
745                 CERROR ("PtlGetID: error %s(%d)\n",
746                         kptllnd_errtype2str(ptl_rc), ptl_rc);
747                 rc = -EINVAL;
748                 goto failed;
749         }
750
751         if (kptllnd_data.kptl_portals_id.pid != *kptllnd_tunables.kptl_pid) {
752                 /* The kernel ptllnd must have the expected PID */
753                 CERROR("Unexpected PID: %u (%u expected)\n",
754                        kptllnd_data.kptl_portals_id.pid,
755                        *kptllnd_tunables.kptl_pid);
756                 rc = -EINVAL;
757                 goto failed;
758         }
759
760         /* Initialized the incarnation - it must be for-all-time unique, even
761          * accounting for the fact that we increment it when we disconnect a
762          * peer that's using it */
763         cfs_gettimeofday(&tv);
764         kptllnd_data.kptl_incarnation = (((__u64)tv.tv_sec) * 1000000) +
765                                         tv.tv_usec;
766         CDEBUG(D_NET, "Incarnation="LPX64"\n", kptllnd_data.kptl_incarnation);
767
768         target.nid = LNET_NID_ANY;
769         target.pid = LNET_PID_ANY; /* NB target for NAK doesn't matter */
770         kptllnd_init_msg(kptllnd_data.kptl_nak_msg, PTLLND_MSG_TYPE_NAK, target, 0);
771         kptllnd_data.kptl_nak_msg->ptlm_magic    = PTLLND_MSG_MAGIC;
772         kptllnd_data.kptl_nak_msg->ptlm_version  = PTLLND_MSG_VERSION;
773         kptllnd_data.kptl_nak_msg->ptlm_srcpid   = the_lnet.ln_pid;
774         kptllnd_data.kptl_nak_msg->ptlm_srcstamp = kptllnd_data.kptl_incarnation;
775
776         cfs_rwlock_init(&kptllnd_data.kptl_peer_rw_lock);
777         cfs_waitq_init(&kptllnd_data.kptl_watchdog_waitq);
778         cfs_atomic_set(&kptllnd_data.kptl_needs_ptltrace, 0);
779         CFS_INIT_LIST_HEAD(&kptllnd_data.kptl_closing_peers);
780         CFS_INIT_LIST_HEAD(&kptllnd_data.kptl_zombie_peers);
781
782         /* Allocate and setup the peer hash table */
783         kptllnd_data.kptl_peer_hash_size =
784                 *kptllnd_tunables.kptl_peer_hash_table_size;
785         LIBCFS_ALLOC(kptllnd_data.kptl_peers,
786                      sizeof(cfs_list_t) *
787                      kptllnd_data.kptl_peer_hash_size);
788         if (kptllnd_data.kptl_peers == NULL) {
789                 CERROR("Failed to allocate space for peer hash table size=%d\n",
790                         kptllnd_data.kptl_peer_hash_size);
791                 rc = -ENOMEM;
792                 goto failed;
793         }
794         for (i = 0; i < kptllnd_data.kptl_peer_hash_size; i++)
795                 CFS_INIT_LIST_HEAD(&kptllnd_data.kptl_peers[i]);
796
797         kptllnd_rx_buffer_pool_init(&kptllnd_data.kptl_rx_buffer_pool);
798
799         kptllnd_data.kptl_rx_cache =
800                 cfs_mem_cache_create("ptllnd_rx",
801                                      sizeof(kptl_rx_t) + 
802                                      *kptllnd_tunables.kptl_max_msg_size,
803                                      0,    /* offset */
804                                      0);   /* flags */
805         if (kptllnd_data.kptl_rx_cache == NULL) {
806                 CERROR("Can't create slab for RX descriptors\n");
807                 rc = -ENOMEM;
808                 goto failed;
809         }
810
811         /* lists/ptrs/locks initialised */
812         kptllnd_data.kptl_init = PTLLND_INIT_DATA;
813
814         /*****************************************************/
815
816         rc = kptllnd_setup_tx_descs();
817         if (rc != 0) {
818                 CERROR("Can't pre-allocate %d TX descriptors: %d\n",
819                        *kptllnd_tunables.kptl_ntx, rc);
820                 goto failed;
821         }
822         
823         /* Start the scheduler threads for handling incoming requests.  No need
824          * to advance the state because this will be automatically cleaned up
825          * now that PTLLND_INIT_DATA state has been entered */
826         CDEBUG(D_NET, "starting %d scheduler threads\n", PTLLND_N_SCHED);
827         for (i = 0; i < PTLLND_N_SCHED; i++) {
828                 rc = kptllnd_thread_start(kptllnd_scheduler, (void *)((long)i));
829                 if (rc != 0) {
830                         CERROR("Can't spawn scheduler[%d]: %d\n", i, rc);
831                         goto failed;
832                 }
833         }
834
835         rc = kptllnd_thread_start(kptllnd_watchdog, NULL);
836         if (rc != 0) {
837                 CERROR("Can't spawn watchdog: %d\n", rc);
838                 goto failed;
839         }
840
841         /* Ensure that 'rxb_nspare' buffers can be off the net (being emptied)
842          * and we will still have enough buffers posted for all our peers */
843         spares = *kptllnd_tunables.kptl_rxb_nspare *
844                  ((*kptllnd_tunables.kptl_rxb_npages * PAGE_SIZE)/
845                   *kptllnd_tunables.kptl_max_msg_size);
846
847         /* reserve and post the buffers */
848         rc = kptllnd_rx_buffer_pool_reserve(&kptllnd_data.kptl_rx_buffer_pool,
849                                             kptllnd_data.kptl_expected_peers +
850                                             spares);
851         if (rc != 0) {
852                 CERROR("Can't reserve RX Buffer pool: %d\n", rc);
853                 goto failed;
854         }
855
856         /* flag everything initialised */
857         kptllnd_data.kptl_init = PTLLND_INIT_ALL;
858
859         /*****************************************************/
860
861         if (*kptllnd_tunables.kptl_checksum)
862                 CWARN("Checksumming enabled\n");
863
864         CDEBUG(D_NET, "<<< kptllnd_base_startup SUCCESS\n");
865         return 0;
866
867  failed:
868         CERROR("kptllnd_base_startup failed: %d\n", rc);
869         kptllnd_base_shutdown();
870         return rc;
871 }
872
873 int
874 kptllnd_startup (lnet_ni_t *ni)
875 {
876         int         rc;
877         kptl_net_t *net;
878
879         LASSERT (ni->ni_lnd == &kptllnd_lnd);
880
881         if (kptllnd_data.kptl_init == PTLLND_INIT_NOTHING) {
882                 rc = kptllnd_base_startup();
883                 if (rc != 0)
884                         return rc;
885         }
886
887         LIBCFS_ALLOC(net, sizeof(*net));
888         ni->ni_data = net;
889         if (net == NULL) {
890                 CERROR("Can't allocate kptl_net_t\n");
891                 rc = -ENOMEM;
892                 goto failed;
893         }
894         memset(net, 0, sizeof(*net));
895         net->net_ni = ni;
896
897         ni->ni_maxtxcredits   = *kptllnd_tunables.kptl_credits;
898         ni->ni_peertxcredits  = *kptllnd_tunables.kptl_peertxcredits;
899         ni->ni_peerrtrcredits = *kptllnd_tunables.kptl_peerrtrcredits;
900         ni->ni_nid = kptllnd_ptl2lnetnid(ni->ni_nid,
901                                          kptllnd_data.kptl_portals_id.nid);
902         CDEBUG(D_NET, "ptl id=%s, lnet id=%s\n",
903                kptllnd_ptlid2str(kptllnd_data.kptl_portals_id),
904                libcfs_nid2str(ni->ni_nid));
905
906         /* NB LNET_NIDNET(ptlm_srcnid) of NAK doesn't matter in case of
907          * multiple NIs */
908         kptllnd_data.kptl_nak_msg->ptlm_srcnid = ni->ni_nid;
909
910         cfs_atomic_set(&net->net_refcount, 1);
911         cfs_write_lock(&kptllnd_data.kptl_net_rw_lock);
912         cfs_list_add_tail(&net->net_list, &kptllnd_data.kptl_nets);
913         cfs_write_unlock(&kptllnd_data.kptl_net_rw_lock);
914         return 0;
915
916  failed:
917         kptllnd_shutdown(ni);
918         return rc;
919 }
920
921 void
922 kptllnd_shutdown (lnet_ni_t *ni)
923 {
924         kptl_net_t    *net = ni->ni_data;
925         int               i;
926         unsigned long     flags;
927
928         LASSERT (kptllnd_data.kptl_init == PTLLND_INIT_ALL);
929
930         CDEBUG(D_MALLOC, "before LND cleanup: kmem %d\n",
931                cfs_atomic_read (&libcfs_kmemory));
932
933         if (net == NULL)
934                 goto out;
935
936         LASSERT (ni == net->net_ni);
937         LASSERT (!net->net_shutdown);
938         LASSERT (!cfs_list_empty(&net->net_list));
939         LASSERT (cfs_atomic_read(&net->net_refcount) != 0);
940         ni->ni_data = NULL;
941         net->net_ni = NULL;
942
943         cfs_write_lock(&kptllnd_data.kptl_net_rw_lock);
944         kptllnd_net_decref(net);
945         cfs_list_del_init(&net->net_list);
946         cfs_write_unlock(&kptllnd_data.kptl_net_rw_lock);
947
948         /* Can't nuke peers here - they are shared among all NIs */
949         cfs_write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
950         net->net_shutdown = 1;   /* Order with peer creation */
951         cfs_write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
952
953         i = 2;
954         while (cfs_atomic_read(&net->net_refcount) != 0) {
955                         i++;
956                         CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET,
957                        "Waiting for %d references to drop\n",
958                        cfs_atomic_read(&net->net_refcount));
959
960                        cfs_pause(cfs_time_seconds(1));
961                 }
962
963         LIBCFS_FREE(net, sizeof(*net));
964 out:
965         /* NB no locking since I don't race with writers */
966         if (cfs_list_empty(&kptllnd_data.kptl_nets))
967                 kptllnd_base_shutdown();
968         CDEBUG(D_MALLOC, "after LND cleanup: kmem %d\n",
969                cfs_atomic_read (&libcfs_kmemory));
970         return;
971 }
972
973 int __init
974 kptllnd_module_init (void)
975 {
976         int    rc;
977
978         kptllnd_assert_wire_constants();
979
980         rc = kptllnd_tunables_init();
981         if (rc != 0)
982                 return rc;
983
984         kptllnd_init_ptltrace();
985
986         lnet_register_lnd(&kptllnd_lnd);
987
988         return 0;
989 }
990
991 void __exit
992 kptllnd_module_fini (void)
993 {
994         lnet_unregister_lnd(&kptllnd_lnd);
995         kptllnd_tunables_fini();
996 }
997
998 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
999 MODULE_DESCRIPTION("Kernel Portals LND v1.00");
1000 MODULE_LICENSE("GPL");
1001
1002 module_init(kptllnd_module_init);
1003 module_exit(kptllnd_module_fini);