Whamcloud - gitweb
LU-1346 libcfs: replace libcfs wrappers with kernel API
[fs/lustre-release.git] / lnet / klnds / ptllnd / ptllnd.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  */
30 /*
31  * This file is part of Lustre, http://www.lustre.org/
32  * Lustre is a trademark of Sun Microsystems, Inc.
33  *
34  * lnet/klnds/ptllnd/ptllnd.c
35  *
36  * Author: PJ Kirner <pjkirner@clusterfs.com>
37  */
38
39 #include "ptllnd.h"
40
41 lnd_t kptllnd_lnd = {
42         .lnd_type       = PTLLND,
43         .lnd_startup    = kptllnd_startup,
44         .lnd_shutdown   = kptllnd_shutdown,
45         .lnd_ctl        = kptllnd_ctl,
46         .lnd_query      = kptllnd_query,
47         .lnd_send       = kptllnd_send,
48         .lnd_recv       = kptllnd_recv,
49         .lnd_eager_recv = kptllnd_eager_recv,
50 };
51
52 kptl_data_t kptllnd_data;
53
54 char *
55 kptllnd_ptlid2str(ptl_process_id_t id)
56 {
57         static char    strs[64][32];
58         static int     idx = 0;
59
60         unsigned long  flags;
61         char          *str;
62
63         spin_lock_irqsave(&kptllnd_data.kptl_ptlid2str_lock, flags);
64         str = strs[idx++];
65         if (idx >= sizeof(strs)/sizeof(strs[0]))
66                 idx = 0;
67         spin_unlock_irqrestore(&kptllnd_data.kptl_ptlid2str_lock, flags);
68
69         snprintf(str, sizeof(strs[0]), FMT_PTLID, id.pid, id.nid);
70         return str;
71 }
72
73 void 
74 kptllnd_assert_wire_constants (void)
75 {
76         /* Wire protocol assertions generated by 'wirecheck'
77          * running on Linux fedora 2.6.11-co-0.6.4 #1 Mon Jun 19 05:36:13 UTC 2006 i686 i686 i386 GNU
78          * with gcc version 4.1.1 20060525 (Red Hat 4.1.1-1) */
79
80
81         /* Constants... */
82         CLASSERT (PTL_RESERVED_MATCHBITS == 0x100);
83         CLASSERT (LNET_MSG_MATCHBITS == 0);
84         CLASSERT (PTLLND_MSG_MAGIC == 0x50746C4E);
85         CLASSERT (PTLLND_MSG_VERSION == 0x04);
86         CLASSERT (PTLLND_RDMA_OK == 0x00);
87         CLASSERT (PTLLND_RDMA_FAIL == 0x01);
88         CLASSERT (PTLLND_MSG_TYPE_INVALID == 0x00);
89         CLASSERT (PTLLND_MSG_TYPE_PUT == 0x01);
90         CLASSERT (PTLLND_MSG_TYPE_GET == 0x02);
91         CLASSERT (PTLLND_MSG_TYPE_IMMEDIATE == 0x03);
92         CLASSERT (PTLLND_MSG_TYPE_NOOP == 0x04);
93         CLASSERT (PTLLND_MSG_TYPE_HELLO == 0x05);
94         CLASSERT (PTLLND_MSG_TYPE_NAK == 0x06);
95
96         /* Checks for struct kptl_msg_t */
97         CLASSERT ((int)sizeof(kptl_msg_t) == 136);
98         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_magic) == 0);
99         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_magic) == 4);
100         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_version) == 4);
101         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_version) == 2);
102         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_type) == 6);
103         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_type) == 1);
104         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_credits) == 7);
105         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_credits) == 1);
106         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_nob) == 8);
107         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_nob) == 4);
108         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_cksum) == 12);
109         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_cksum) == 4);
110         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_srcnid) == 16);
111         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_srcnid) == 8);
112         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_srcstamp) == 24);
113         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_srcstamp) == 8);
114         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_dstnid) == 32);
115         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_dstnid) == 8);
116         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_dststamp) == 40);
117         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_dststamp) == 8);
118         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_srcpid) == 48);
119         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_srcpid) == 4);
120         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_dstpid) == 52);
121         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_dstpid) == 4);
122         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_u.immediate) == 56);
123         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_u.immediate) == 72);
124         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_u.rdma) == 56);
125         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_u.rdma) == 80);
126         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_u.hello) == 56);
127         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_u.hello) == 12);
128
129         /* Checks for struct kptl_immediate_msg_t */
130         CLASSERT ((int)sizeof(kptl_immediate_msg_t) == 72);
131         CLASSERT ((int)offsetof(kptl_immediate_msg_t, kptlim_hdr) == 0);
132         CLASSERT ((int)sizeof(((kptl_immediate_msg_t *)0)->kptlim_hdr) == 72);
133         CLASSERT ((int)offsetof(kptl_immediate_msg_t, kptlim_payload[13]) == 85);
134         CLASSERT ((int)sizeof(((kptl_immediate_msg_t *)0)->kptlim_payload[13]) == 1);
135
136         /* Checks for struct kptl_rdma_msg_t */
137         CLASSERT ((int)sizeof(kptl_rdma_msg_t) == 80);
138         CLASSERT ((int)offsetof(kptl_rdma_msg_t, kptlrm_hdr) == 0);
139         CLASSERT ((int)sizeof(((kptl_rdma_msg_t *)0)->kptlrm_hdr) == 72);
140         CLASSERT ((int)offsetof(kptl_rdma_msg_t, kptlrm_matchbits) == 72);
141         CLASSERT ((int)sizeof(((kptl_rdma_msg_t *)0)->kptlrm_matchbits) == 8);
142
143         /* Checks for struct kptl_hello_msg_t */
144         CLASSERT ((int)sizeof(kptl_hello_msg_t) == 12);
145         CLASSERT ((int)offsetof(kptl_hello_msg_t, kptlhm_matchbits) == 0);
146         CLASSERT ((int)sizeof(((kptl_hello_msg_t *)0)->kptlhm_matchbits) == 8);
147         CLASSERT ((int)offsetof(kptl_hello_msg_t, kptlhm_max_msg_size) == 8);
148         CLASSERT ((int)sizeof(((kptl_hello_msg_t *)0)->kptlhm_max_msg_size) == 4);
149 }
150
151 const char *kptllnd_evtype2str(int type)
152 {
153 #define DO_TYPE(x) case x: return #x;
154         switch(type)
155         {
156                 DO_TYPE(PTL_EVENT_GET_START);
157                 DO_TYPE(PTL_EVENT_GET_END);
158                 DO_TYPE(PTL_EVENT_PUT_START);
159                 DO_TYPE(PTL_EVENT_PUT_END);
160                 DO_TYPE(PTL_EVENT_REPLY_START);
161                 DO_TYPE(PTL_EVENT_REPLY_END);
162                 DO_TYPE(PTL_EVENT_ACK);
163                 DO_TYPE(PTL_EVENT_SEND_START);
164                 DO_TYPE(PTL_EVENT_SEND_END);
165                 DO_TYPE(PTL_EVENT_UNLINK);
166         default:
167                 return "<unknown event type>";
168         }
169 #undef DO_TYPE
170 }
171
172 const char *kptllnd_msgtype2str(int type)
173 {
174 #define DO_TYPE(x) case x: return #x;
175         switch(type)
176         {
177                 DO_TYPE(PTLLND_MSG_TYPE_INVALID);
178                 DO_TYPE(PTLLND_MSG_TYPE_PUT);
179                 DO_TYPE(PTLLND_MSG_TYPE_GET);
180                 DO_TYPE(PTLLND_MSG_TYPE_IMMEDIATE);
181                 DO_TYPE(PTLLND_MSG_TYPE_HELLO);
182                 DO_TYPE(PTLLND_MSG_TYPE_NOOP);
183                 DO_TYPE(PTLLND_MSG_TYPE_NAK);
184         default:
185                 return "<unknown msg type>";
186         }
187 #undef DO_TYPE
188 }
189
190 const char *kptllnd_errtype2str(int type)
191 {
192 #define DO_TYPE(x) case x: return #x;
193         switch(type)
194         {
195                 DO_TYPE(PTL_OK);
196                 DO_TYPE(PTL_SEGV);
197                 DO_TYPE(PTL_NO_SPACE);
198                 DO_TYPE(PTL_ME_IN_USE);
199                 DO_TYPE(PTL_NAL_FAILED);
200                 DO_TYPE(PTL_NO_INIT);
201                 DO_TYPE(PTL_IFACE_DUP);
202                 DO_TYPE(PTL_IFACE_INVALID);
203                 DO_TYPE(PTL_HANDLE_INVALID);
204                 DO_TYPE(PTL_MD_INVALID);
205                 DO_TYPE(PTL_ME_INVALID);
206                 DO_TYPE(PTL_PROCESS_INVALID);
207                 DO_TYPE(PTL_PT_INDEX_INVALID);
208                 DO_TYPE(PTL_SR_INDEX_INVALID);
209                 DO_TYPE(PTL_EQ_INVALID);
210                 DO_TYPE(PTL_EQ_DROPPED);
211                 DO_TYPE(PTL_EQ_EMPTY);
212                 DO_TYPE(PTL_MD_NO_UPDATE);
213                 DO_TYPE(PTL_FAIL);
214                 DO_TYPE(PTL_AC_INDEX_INVALID);
215                 DO_TYPE(PTL_MD_ILLEGAL);
216                 DO_TYPE(PTL_ME_LIST_TOO_LONG);
217                 DO_TYPE(PTL_MD_IN_USE);
218                 DO_TYPE(PTL_NI_INVALID);
219                 DO_TYPE(PTL_PID_INVALID);
220                 DO_TYPE(PTL_PT_FULL);
221                 DO_TYPE(PTL_VAL_FAILED);
222                 DO_TYPE(PTL_NOT_IMPLEMENTED);
223                 DO_TYPE(PTL_NO_ACK);
224                 DO_TYPE(PTL_EQ_IN_USE);
225                 DO_TYPE(PTL_PID_IN_USE);
226                 DO_TYPE(PTL_INV_EQ_SIZE);
227                 DO_TYPE(PTL_AGAIN);
228         default:
229                 return "<unknown event type>";
230         }
231 #undef DO_TYPE
232 }
233
234 __u32
235 kptllnd_cksum (void *ptr, int nob)
236 {
237         char  *c  = ptr;
238         __u32  sum = 0;
239
240         while (nob-- > 0)
241                 sum = ((sum << 1) | (sum >> 31)) + *c++;
242
243         /* ensure I don't return 0 (== no checksum) */
244         return (sum == 0) ? 1 : sum;
245 }
246
247 void
248 kptllnd_init_msg(kptl_msg_t *msg, int type,
249                  lnet_process_id_t target, int body_nob)
250 {
251         msg->ptlm_type = type;
252         msg->ptlm_nob  = (offsetof(kptl_msg_t, ptlm_u) + body_nob + 7) & ~7;
253         msg->ptlm_dstpid = target.pid;
254         msg->ptlm_dstnid = target.nid;
255         msg->ptlm_srcpid = the_lnet.ln_pid;
256         msg->ptlm_srcnid = kptllnd_ptl2lnetnid(target.nid,
257                                                kptllnd_data.kptl_portals_id.nid);
258         
259         LASSERT(msg->ptlm_nob <= *kptllnd_tunables.kptl_max_msg_size);
260 }
261
262 void
263 kptllnd_msg_pack(kptl_msg_t *msg, kptl_peer_t *peer)
264 {
265         msg->ptlm_magic    = PTLLND_MSG_MAGIC;
266         msg->ptlm_version  = PTLLND_MSG_VERSION;
267         /* msg->ptlm_type  Filled in kptllnd_init_msg()  */
268         msg->ptlm_credits  = peer->peer_outstanding_credits;
269         /* msg->ptlm_nob   Filled in kptllnd_init_msg()  */
270         msg->ptlm_cksum    = 0;
271         /* msg->ptlm_{src|dst}[pn]id Filled in kptllnd_init_msg */
272         msg->ptlm_srcstamp = peer->peer_myincarnation;
273         msg->ptlm_dststamp = peer->peer_incarnation;
274
275         if (*kptllnd_tunables.kptl_checksum) {
276                 /* NB ptlm_cksum zero while computing cksum */
277                 msg->ptlm_cksum = kptllnd_cksum(msg, 
278                                                 offsetof(kptl_msg_t, ptlm_u));
279         }
280 }
281
282 int
283 kptllnd_msg_unpack(kptl_msg_t *msg, int nob)
284 {
285         const int hdr_size = offsetof(kptl_msg_t, ptlm_u);
286         __u32     msg_cksum;
287         __u16     msg_version;
288         int       flip;
289
290         /* 6 bytes are enough to have received magic + version */
291         if (nob < 6) {
292                 CERROR("Very Short message: %d\n", nob);
293                 return -EPROTO;
294         }
295
296         /*
297          * Determine if we need to flip
298          */
299         if (msg->ptlm_magic == PTLLND_MSG_MAGIC) {
300                 flip = 0;
301         } else if (msg->ptlm_magic == __swab32(PTLLND_MSG_MAGIC)) {
302                 flip = 1;
303         } else {
304                 CERROR("Bad magic: %08x\n", msg->ptlm_magic);
305                 return -EPROTO;
306         }
307
308         msg_version = flip ? __swab16(msg->ptlm_version) : msg->ptlm_version;
309
310         if (msg_version != PTLLND_MSG_VERSION) {
311                 CERROR("Bad version: got %04x expected %04x\n",
312                         (__u32)msg_version, PTLLND_MSG_VERSION);
313                 return -EPROTO;
314         }
315
316         if (nob < hdr_size) {
317                 CERROR("Short message: got %d, wanted at least %d\n",
318                        nob, hdr_size);
319                 return -EPROTO;
320         }
321
322         /* checksum must be computed with
323          * 1) ptlm_cksum zero and
324          * 2) BEFORE anything gets modified/flipped
325          */
326         msg_cksum = flip ? __swab32(msg->ptlm_cksum) : msg->ptlm_cksum;
327         msg->ptlm_cksum = 0;
328         if (msg_cksum != 0 &&
329             msg_cksum != kptllnd_cksum(msg, hdr_size)) {
330                 CERROR("Bad checksum\n");
331                 return -EPROTO;
332         }
333
334         msg->ptlm_version = msg_version;
335         msg->ptlm_cksum = msg_cksum;
336         
337         if (flip) {
338                 /* These two are 1 byte long so we don't swap them
339                    But check this assumtion*/
340                 CLASSERT (sizeof(msg->ptlm_type) == 1);
341                 CLASSERT (sizeof(msg->ptlm_credits) == 1);
342                 /* src & dst stamps are opaque cookies */
343                 __swab32s(&msg->ptlm_nob);
344                 __swab64s(&msg->ptlm_srcnid);
345                 __swab64s(&msg->ptlm_dstnid);
346                 __swab32s(&msg->ptlm_srcpid);
347                 __swab32s(&msg->ptlm_dstpid);
348         }
349
350         if (msg->ptlm_nob != nob) {
351                 CERROR("msg_nob corrupt: got 0x%08x, wanted %08x\n",
352                        msg->ptlm_nob, nob);
353                 return -EPROTO;
354         }
355
356         switch(msg->ptlm_type)
357         {
358         case PTLLND_MSG_TYPE_PUT:
359         case PTLLND_MSG_TYPE_GET:
360                 if (nob < hdr_size + sizeof(kptl_rdma_msg_t)) {
361                         CERROR("Short rdma request: got %d, want %d\n",
362                                nob, hdr_size + (int)sizeof(kptl_rdma_msg_t));
363                         return -EPROTO;
364                 }
365
366                 if (flip)
367                         __swab64s(&msg->ptlm_u.rdma.kptlrm_matchbits);
368
369                 if (msg->ptlm_u.rdma.kptlrm_matchbits < PTL_RESERVED_MATCHBITS) {
370                         CERROR("Bad matchbits "LPX64"\n",
371                                msg->ptlm_u.rdma.kptlrm_matchbits);
372                         return -EPROTO;
373                 }
374                 break;
375
376         case PTLLND_MSG_TYPE_IMMEDIATE:
377                 if (nob < offsetof(kptl_msg_t, 
378                                    ptlm_u.immediate.kptlim_payload)) {
379                         CERROR("Short immediate: got %d, want %d\n", nob,
380                                (int)offsetof(kptl_msg_t, 
381                                              ptlm_u.immediate.kptlim_payload));
382                         return -EPROTO;
383                 }
384                 /* Do nothing */
385                 break;
386                         
387         case PTLLND_MSG_TYPE_NOOP:
388         case PTLLND_MSG_TYPE_NAK:
389                 /* Do nothing */
390                 break;
391
392         case PTLLND_MSG_TYPE_HELLO:
393                 if (nob < hdr_size + sizeof(kptl_hello_msg_t)) {
394                         CERROR("Short hello: got %d want %d\n",
395                                nob, hdr_size + (int)sizeof(kptl_hello_msg_t));
396                         return -EPROTO;
397                 }
398                 if (flip) {
399                         __swab64s(&msg->ptlm_u.hello.kptlhm_matchbits);
400                         __swab32s(&msg->ptlm_u.hello.kptlhm_max_msg_size);
401                 }
402                 break;
403
404         default:
405                 CERROR("Bad message type: 0x%02x\n", (__u32)msg->ptlm_type);
406                 return -EPROTO;
407         }
408
409         return 0;
410 }
411
412 int
413 kptllnd_ctl(lnet_ni_t *ni, unsigned int cmd, void *arg)
414 {
415         kptl_net_t  *net = ni->ni_data;
416         struct libcfs_ioctl_data *data = arg;
417         int          rc = -EINVAL;
418
419         CDEBUG(D_NET, ">>> kptllnd_ctl cmd=%u arg=%p\n", cmd, arg);
420
421         /*
422          * Validate that the context block is actually
423          * pointing to this interface
424          */
425         LASSERT (ni == net->net_ni);
426
427         switch(cmd) {
428         case IOC_LIBCFS_DEL_PEER: {
429                 lnet_process_id_t id;
430                 
431                 id.nid = data->ioc_nid;
432                 id.pid = data->ioc_u32[1];
433                 
434                 rc = kptllnd_peer_del(id);
435                 break;
436         }
437
438         case IOC_LIBCFS_GET_PEER: {
439                 lnet_process_id_t   id = {.nid = LNET_NID_ANY,
440                                           .pid = LNET_PID_ANY};
441                 __u64               incarnation = 0;
442                 __u64               next_matchbits = 0;
443                 __u64               last_matchbits_seen = 0;
444                 int                 state = 0;
445                 int                 sent_hello = 0;
446                 int                 refcount = 0;
447                 int                 nsendq = 0;
448                 int                 nactiveq = 0;
449                 int                 credits = 0;
450                 int                 outstanding_credits = 0;
451
452                 rc = kptllnd_get_peer_info(data->ioc_count, &id,
453                                            &state, &sent_hello,
454                                            &refcount, &incarnation,
455                                            &next_matchbits, &last_matchbits_seen,
456                                            &nsendq, &nactiveq,
457                                            &credits, &outstanding_credits);
458                 /* wince... */
459                 data->ioc_nid = id.nid;
460                 data->ioc_net = state;
461                 data->ioc_flags  = sent_hello;
462                 data->ioc_count = refcount;
463                 data->ioc_u64[0] = incarnation;
464                 data->ioc_u32[0] = (__u32)next_matchbits;
465                 data->ioc_u32[1] = (__u32)(next_matchbits >> 32);
466                 data->ioc_u32[2] = (__u32)last_matchbits_seen;
467                 data->ioc_u32[3] = (__u32)(last_matchbits_seen >> 32);
468                 data->ioc_u32[4] = id.pid;
469                 data->ioc_u32[5] = (nsendq << 16) | nactiveq;
470                 data->ioc_u32[6] = (credits << 16) | outstanding_credits;
471                 break;
472         }
473                 
474         default:
475                 rc=-EINVAL;
476                 break;
477         }
478         CDEBUG(D_NET, "<<< kptllnd_ctl rc=%d\n", rc);
479         return rc;
480 }
481
482 void
483 kptllnd_query (lnet_ni_t *ni, lnet_nid_t nid, cfs_time_t *when)
484 {
485         kptl_net_t        *net = ni->ni_data;
486         kptl_peer_t       *peer = NULL;
487         lnet_process_id_t  id = {.nid = nid, .pid = LUSTRE_SRV_LNET_PID};
488         unsigned long      flags;
489
490         /* NB: kptllnd_find_target connects to peer if necessary */
491         if (kptllnd_find_target(net, id, &peer) != 0)
492                 return;
493
494         spin_lock_irqsave(&peer->peer_lock, flags);
495         if (peer->peer_last_alive != 0)
496                 *when = peer->peer_last_alive;
497         spin_unlock_irqrestore(&peer->peer_lock, flags);
498         kptllnd_peer_decref(peer);
499         return;
500 }
501
502 void
503 kptllnd_base_shutdown (void)
504 {
505         int               i;
506         ptl_err_t         prc;
507         unsigned long     flags;
508         lnet_process_id_t process_id;
509
510         read_lock(&kptllnd_data.kptl_net_rw_lock);
511         LASSERT (cfs_list_empty(&kptllnd_data.kptl_nets));
512         read_unlock(&kptllnd_data.kptl_net_rw_lock);
513
514         switch (kptllnd_data.kptl_init) {
515         default:
516                 LBUG();
517
518         case PTLLND_INIT_ALL:
519         case PTLLND_INIT_DATA:
520                 /* stop receiving */
521                 kptllnd_rx_buffer_pool_fini(&kptllnd_data.kptl_rx_buffer_pool);
522                 LASSERT (cfs_list_empty(&kptllnd_data.kptl_sched_rxq));
523                 LASSERT (cfs_list_empty(&kptllnd_data.kptl_sched_rxbq));
524
525                 /* lock to interleave cleanly with peer birth/death */
526                 write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
527                 LASSERT (kptllnd_data.kptl_shutdown == 0);
528                 kptllnd_data.kptl_shutdown = 1; /* phase 1 == destroy peers */
529                 /* no new peers possible now */
530                 write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock,
531                                             flags);
532
533                 /* nuke all existing peers */
534                 process_id.nid = LNET_NID_ANY;
535                 process_id.pid = LNET_PID_ANY;
536                 kptllnd_peer_del(process_id);
537
538                 read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
539
540                 LASSERT (kptllnd_data.kptl_n_active_peers == 0);
541
542                 i = 2;
543                 while (kptllnd_data.kptl_npeers != 0) {
544                         i++;
545                         CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET,
546                                "Waiting for %d peers to terminate\n",
547                                kptllnd_data.kptl_npeers);
548
549                         read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock,
550                                                    flags);
551
552                         cfs_pause(cfs_time_seconds(1));
553
554                         read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock,
555                                               flags);
556                 }
557
558                 LASSERT (cfs_list_empty(&kptllnd_data.kptl_closing_peers));
559                 LASSERT (cfs_list_empty(&kptllnd_data.kptl_zombie_peers));
560                 LASSERT (kptllnd_data.kptl_peers != NULL);
561                 for (i = 0; i < kptllnd_data.kptl_peer_hash_size; i++)
562                         LASSERT (cfs_list_empty (&kptllnd_data.kptl_peers[i]));
563
564                 read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock,
565                                            flags);
566                 CDEBUG(D_NET, "All peers deleted\n");
567
568                 /* Shutdown phase 2: kill the daemons... */
569                 kptllnd_data.kptl_shutdown = 2;
570                 cfs_mb();
571
572                 i = 2;
573                 while (cfs_atomic_read (&kptllnd_data.kptl_nthreads) != 0) {
574                         /* Wake up all threads*/
575                         cfs_waitq_broadcast(&kptllnd_data.kptl_sched_waitq);
576                         cfs_waitq_broadcast(&kptllnd_data.kptl_watchdog_waitq);
577
578                         i++;
579                         CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
580                                "Waiting for %d threads to terminate\n",
581                                cfs_atomic_read(&kptllnd_data.kptl_nthreads));
582                         cfs_pause(cfs_time_seconds(1));
583                 }
584
585                 CDEBUG(D_NET, "All Threads stopped\n");
586                 LASSERT(cfs_list_empty(&kptllnd_data.kptl_sched_txq));
587
588                 kptllnd_cleanup_tx_descs();
589
590                 /* Nothing here now, but libcfs might soon require
591                  * us to explicitly destroy wait queues and semaphores
592                  * that would be done here */
593
594                 /* fall through */
595
596         case PTLLND_INIT_NOTHING:
597                 CDEBUG(D_NET, "PTLLND_INIT_NOTHING\n");
598                 break;
599         }
600
601         if (!PtlHandleIsEqual(kptllnd_data.kptl_eqh, PTL_INVALID_HANDLE)) {
602                 prc = PtlEQFree(kptllnd_data.kptl_eqh);
603                 if (prc != PTL_OK)
604                         CERROR("Error %s(%d) freeing portals EQ\n",
605                                kptllnd_errtype2str(prc), prc);
606         }
607
608         if (!PtlHandleIsEqual(kptllnd_data.kptl_nih, PTL_INVALID_HANDLE)) {
609                 prc = PtlNIFini(kptllnd_data.kptl_nih);
610                 if (prc != PTL_OK)
611                         CERROR("Error %s(%d) finalizing portals NI\n",
612                                kptllnd_errtype2str(prc), prc);
613         }
614
615         LASSERT (cfs_atomic_read(&kptllnd_data.kptl_ntx) == 0);
616         LASSERT (cfs_list_empty(&kptllnd_data.kptl_idle_txs));
617
618         if (kptllnd_data.kptl_rx_cache != NULL)
619                 cfs_mem_cache_destroy(kptllnd_data.kptl_rx_cache);
620
621         if (kptllnd_data.kptl_peers != NULL)
622                 LIBCFS_FREE(kptllnd_data.kptl_peers,
623                             sizeof (cfs_list_t) *
624                             kptllnd_data.kptl_peer_hash_size);
625
626         if (kptllnd_data.kptl_nak_msg != NULL)
627                 LIBCFS_FREE(kptllnd_data.kptl_nak_msg,
628                             offsetof(kptl_msg_t, ptlm_u));
629
630         memset(&kptllnd_data, 0, sizeof(kptllnd_data));
631         PORTAL_MODULE_UNUSE;
632         return;
633 }
634
635 int
636 kptllnd_base_startup (void)
637 {
638         int             i;
639         int                rc;
640         int             spares;
641         struct timeval  tv;
642         lnet_process_id_t  target;
643         ptl_err_t       ptl_rc;
644
645         if (*kptllnd_tunables.kptl_max_procs_per_node < 1) {
646                 CERROR("max_procs_per_node must be >= 1\n");
647                 return -EINVAL;
648         }
649
650         if (*kptllnd_tunables.kptl_peertxcredits > PTLLND_MSG_MAX_CREDITS) {
651                 CERROR("peercredits must be <= %d\n", PTLLND_MSG_MAX_CREDITS);
652                 return -EINVAL;
653         }
654
655         *kptllnd_tunables.kptl_max_msg_size &= ~7;
656         if (*kptllnd_tunables.kptl_max_msg_size < PTLLND_MIN_BUFFER_SIZE)
657                 *kptllnd_tunables.kptl_max_msg_size = PTLLND_MIN_BUFFER_SIZE;
658
659         CLASSERT ((PTLLND_MIN_BUFFER_SIZE & 7) == 0);
660         CLASSERT (sizeof(kptl_msg_t) <= PTLLND_MIN_BUFFER_SIZE);
661
662         /* Zero pointers, flags etc; put everything into a known state. */
663         memset (&kptllnd_data, 0, sizeof (kptllnd_data));
664
665         LIBCFS_ALLOC(kptllnd_data.kptl_nak_msg, offsetof(kptl_msg_t, ptlm_u));
666         if (kptllnd_data.kptl_nak_msg == NULL) {
667                 CERROR("Can't allocate NAK msg\n");
668                 return -ENOMEM;
669         }
670         memset(kptllnd_data.kptl_nak_msg, 0, offsetof(kptl_msg_t, ptlm_u));
671
672         kptllnd_data.kptl_eqh = PTL_INVALID_HANDLE;
673         kptllnd_data.kptl_nih = PTL_INVALID_HANDLE;
674
675         rwlock_init(&kptllnd_data.kptl_net_rw_lock);
676         CFS_INIT_LIST_HEAD(&kptllnd_data.kptl_nets);
677
678         /* Setup the sched locks/lists/waitq */
679         spin_lock_init(&kptllnd_data.kptl_sched_lock);
680         cfs_waitq_init(&kptllnd_data.kptl_sched_waitq);
681         CFS_INIT_LIST_HEAD(&kptllnd_data.kptl_sched_txq);
682         CFS_INIT_LIST_HEAD(&kptllnd_data.kptl_sched_rxq);
683         CFS_INIT_LIST_HEAD(&kptllnd_data.kptl_sched_rxbq);
684
685         /* Init kptl_ptlid2str_lock before any call to kptllnd_ptlid2str */
686         spin_lock_init(&kptllnd_data.kptl_ptlid2str_lock);
687
688         /* Setup the tx locks/lists */
689         spin_lock_init(&kptllnd_data.kptl_tx_lock);
690         CFS_INIT_LIST_HEAD(&kptllnd_data.kptl_idle_txs);
691         cfs_atomic_set(&kptllnd_data.kptl_ntx, 0);
692
693         /* Uptick the module reference count */
694         PORTAL_MODULE_USE;
695
696         kptllnd_data.kptl_expected_peers =
697                 *kptllnd_tunables.kptl_max_nodes *
698                 *kptllnd_tunables.kptl_max_procs_per_node;
699
700         /*
701          * Initialize the Network interface instance
702          * We use the default because we don't have any
703          * way to choose a better interface.
704          * Requested and actual limits are ignored.
705          */
706         ptl_rc = PtlNIInit(
707 #ifdef _USING_LUSTRE_PORTALS_
708                 PTL_IFACE_DEFAULT,
709 #else
710                 CRAY_KERN_NAL,
711 #endif
712                 *kptllnd_tunables.kptl_pid, NULL, NULL,
713                 &kptllnd_data.kptl_nih);
714
715         /*
716          * Note: PTL_IFACE_DUP simply means that the requested
717          * interface was already inited and that we're sharing it.
718          * Which is ok.
719          */
720         if (ptl_rc != PTL_OK && ptl_rc != PTL_IFACE_DUP) {
721                 CERROR ("PtlNIInit: error %s(%d)\n",
722                         kptllnd_errtype2str(ptl_rc), ptl_rc);
723                 rc = -EINVAL;
724                 goto failed;
725         }
726
727         /* NB eq size irrelevant if using a callback */
728         ptl_rc = PtlEQAlloc(kptllnd_data.kptl_nih,
729                             8,                       /* size */
730                             kptllnd_eq_callback,     /* handler callback */
731                             &kptllnd_data.kptl_eqh); /* output handle */
732         if (ptl_rc != PTL_OK) {
733                 CERROR("PtlEQAlloc failed %s(%d)\n",
734                        kptllnd_errtype2str(ptl_rc), ptl_rc);
735                 rc = -ENOMEM;
736                 goto failed;
737         }
738
739         /* Fetch the lower NID */
740         ptl_rc = PtlGetId(kptllnd_data.kptl_nih,
741                           &kptllnd_data.kptl_portals_id);
742         if (ptl_rc != PTL_OK) {
743                 CERROR ("PtlGetID: error %s(%d)\n",
744                         kptllnd_errtype2str(ptl_rc), ptl_rc);
745                 rc = -EINVAL;
746                 goto failed;
747         }
748
749         if (kptllnd_data.kptl_portals_id.pid != *kptllnd_tunables.kptl_pid) {
750                 /* The kernel ptllnd must have the expected PID */
751                 CERROR("Unexpected PID: %u (%u expected)\n",
752                        kptllnd_data.kptl_portals_id.pid,
753                        *kptllnd_tunables.kptl_pid);
754                 rc = -EINVAL;
755                 goto failed;
756         }
757
758         /* Initialized the incarnation - it must be for-all-time unique, even
759          * accounting for the fact that we increment it when we disconnect a
760          * peer that's using it */
761         cfs_gettimeofday(&tv);
762         kptllnd_data.kptl_incarnation = (((__u64)tv.tv_sec) * 1000000) +
763                                         tv.tv_usec;
764         CDEBUG(D_NET, "Incarnation="LPX64"\n", kptllnd_data.kptl_incarnation);
765
766         target.nid = LNET_NID_ANY;
767         target.pid = LNET_PID_ANY; /* NB target for NAK doesn't matter */
768         kptllnd_init_msg(kptllnd_data.kptl_nak_msg, PTLLND_MSG_TYPE_NAK, target, 0);
769         kptllnd_data.kptl_nak_msg->ptlm_magic    = PTLLND_MSG_MAGIC;
770         kptllnd_data.kptl_nak_msg->ptlm_version  = PTLLND_MSG_VERSION;
771         kptllnd_data.kptl_nak_msg->ptlm_srcpid   = the_lnet.ln_pid;
772         kptllnd_data.kptl_nak_msg->ptlm_srcstamp = kptllnd_data.kptl_incarnation;
773
774         rwlock_init(&kptllnd_data.kptl_peer_rw_lock);
775         cfs_waitq_init(&kptllnd_data.kptl_watchdog_waitq);
776         CFS_INIT_LIST_HEAD(&kptllnd_data.kptl_closing_peers);
777         CFS_INIT_LIST_HEAD(&kptllnd_data.kptl_zombie_peers);
778
779         /* Allocate and setup the peer hash table */
780         kptllnd_data.kptl_peer_hash_size =
781                 *kptllnd_tunables.kptl_peer_hash_table_size;
782         LIBCFS_ALLOC(kptllnd_data.kptl_peers,
783                      sizeof(cfs_list_t) *
784                      kptllnd_data.kptl_peer_hash_size);
785         if (kptllnd_data.kptl_peers == NULL) {
786                 CERROR("Failed to allocate space for peer hash table size=%d\n",
787                         kptllnd_data.kptl_peer_hash_size);
788                 rc = -ENOMEM;
789                 goto failed;
790         }
791         for (i = 0; i < kptllnd_data.kptl_peer_hash_size; i++)
792                 CFS_INIT_LIST_HEAD(&kptllnd_data.kptl_peers[i]);
793
794         kptllnd_rx_buffer_pool_init(&kptllnd_data.kptl_rx_buffer_pool);
795
796         kptllnd_data.kptl_rx_cache =
797                 cfs_mem_cache_create("ptllnd_rx",
798                                      sizeof(kptl_rx_t) + 
799                                      *kptllnd_tunables.kptl_max_msg_size,
800                                      0,    /* offset */
801                                      0);   /* flags */
802         if (kptllnd_data.kptl_rx_cache == NULL) {
803                 CERROR("Can't create slab for RX descriptors\n");
804                 rc = -ENOMEM;
805                 goto failed;
806         }
807
808         /* lists/ptrs/locks initialised */
809         kptllnd_data.kptl_init = PTLLND_INIT_DATA;
810
811         /*****************************************************/
812
813         rc = kptllnd_setup_tx_descs();
814         if (rc != 0) {
815                 CERROR("Can't pre-allocate %d TX descriptors: %d\n",
816                        *kptllnd_tunables.kptl_ntx, rc);
817                 goto failed;
818         }
819         
820         /* Start the scheduler threads for handling incoming requests.  No need
821          * to advance the state because this will be automatically cleaned up
822          * now that PTLLND_INIT_DATA state has been entered */
823         CDEBUG(D_NET, "starting %d scheduler threads\n", PTLLND_N_SCHED);
824         for (i = 0; i < PTLLND_N_SCHED; i++) {
825                 rc = kptllnd_thread_start(kptllnd_scheduler, (void *)((long)i));
826                 if (rc != 0) {
827                         CERROR("Can't spawn scheduler[%d]: %d\n", i, rc);
828                         goto failed;
829                 }
830         }
831
832         rc = kptllnd_thread_start(kptllnd_watchdog, NULL);
833         if (rc != 0) {
834                 CERROR("Can't spawn watchdog: %d\n", rc);
835                 goto failed;
836         }
837
838         /* Ensure that 'rxb_nspare' buffers can be off the net (being emptied)
839          * and we will still have enough buffers posted for all our peers */
840         spares = *kptllnd_tunables.kptl_rxb_nspare *
841                  ((*kptllnd_tunables.kptl_rxb_npages * PAGE_SIZE)/
842                   *kptllnd_tunables.kptl_max_msg_size);
843
844         /* reserve and post the buffers */
845         rc = kptllnd_rx_buffer_pool_reserve(&kptllnd_data.kptl_rx_buffer_pool,
846                                             kptllnd_data.kptl_expected_peers +
847                                             spares);
848         if (rc != 0) {
849                 CERROR("Can't reserve RX Buffer pool: %d\n", rc);
850                 goto failed;
851         }
852
853         /* flag everything initialised */
854         kptllnd_data.kptl_init = PTLLND_INIT_ALL;
855
856         /*****************************************************/
857
858         if (*kptllnd_tunables.kptl_checksum)
859                 CWARN("Checksumming enabled\n");
860
861         CDEBUG(D_NET, "<<< kptllnd_base_startup SUCCESS\n");
862         return 0;
863
864  failed:
865         CERROR("kptllnd_base_startup failed: %d\n", rc);
866         kptllnd_base_shutdown();
867         return rc;
868 }
869
870 int
871 kptllnd_startup (lnet_ni_t *ni)
872 {
873         int         rc;
874         kptl_net_t *net;
875
876         LASSERT (ni->ni_lnd == &kptllnd_lnd);
877
878         if (kptllnd_data.kptl_init == PTLLND_INIT_NOTHING) {
879                 rc = kptllnd_base_startup();
880                 if (rc != 0)
881                         return rc;
882         }
883
884         LIBCFS_ALLOC(net, sizeof(*net));
885         ni->ni_data = net;
886         if (net == NULL) {
887                 CERROR("Can't allocate kptl_net_t\n");
888                 rc = -ENOMEM;
889                 goto failed;
890         }
891         memset(net, 0, sizeof(*net));
892         net->net_ni = ni;
893
894         ni->ni_maxtxcredits   = *kptllnd_tunables.kptl_credits;
895         ni->ni_peertxcredits  = *kptllnd_tunables.kptl_peertxcredits;
896         ni->ni_peerrtrcredits = *kptllnd_tunables.kptl_peerrtrcredits;
897         ni->ni_nid = kptllnd_ptl2lnetnid(ni->ni_nid,
898                                          kptllnd_data.kptl_portals_id.nid);
899         CDEBUG(D_NET, "ptl id=%s, lnet id=%s\n",
900                kptllnd_ptlid2str(kptllnd_data.kptl_portals_id),
901                libcfs_nid2str(ni->ni_nid));
902
903         /* NB LNET_NIDNET(ptlm_srcnid) of NAK doesn't matter in case of
904          * multiple NIs */
905         kptllnd_data.kptl_nak_msg->ptlm_srcnid = ni->ni_nid;
906
907         cfs_atomic_set(&net->net_refcount, 1);
908         write_lock(&kptllnd_data.kptl_net_rw_lock);
909         cfs_list_add_tail(&net->net_list, &kptllnd_data.kptl_nets);
910         write_unlock(&kptllnd_data.kptl_net_rw_lock);
911         return 0;
912
913  failed:
914         kptllnd_shutdown(ni);
915         return rc;
916 }
917
918 void
919 kptllnd_shutdown (lnet_ni_t *ni)
920 {
921         kptl_net_t    *net = ni->ni_data;
922         int               i;
923         unsigned long     flags;
924
925         LASSERT (kptllnd_data.kptl_init == PTLLND_INIT_ALL);
926
927         CDEBUG(D_MALLOC, "before LND cleanup: kmem %d\n",
928                cfs_atomic_read (&libcfs_kmemory));
929
930         if (net == NULL)
931                 goto out;
932
933         LASSERT (ni == net->net_ni);
934         LASSERT (!net->net_shutdown);
935         LASSERT (!cfs_list_empty(&net->net_list));
936         LASSERT (cfs_atomic_read(&net->net_refcount) != 0);
937         ni->ni_data = NULL;
938         net->net_ni = NULL;
939
940         write_lock(&kptllnd_data.kptl_net_rw_lock);
941         kptllnd_net_decref(net);
942         cfs_list_del_init(&net->net_list);
943         write_unlock(&kptllnd_data.kptl_net_rw_lock);
944
945         /* Can't nuke peers here - they are shared among all NIs */
946         write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
947         net->net_shutdown = 1;   /* Order with peer creation */
948         write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
949
950         i = 2;
951         while (cfs_atomic_read(&net->net_refcount) != 0) {
952                         i++;
953                         CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET,
954                        "Waiting for %d references to drop\n",
955                        cfs_atomic_read(&net->net_refcount));
956
957                        cfs_pause(cfs_time_seconds(1));
958                 }
959
960         LIBCFS_FREE(net, sizeof(*net));
961 out:
962         /* NB no locking since I don't race with writers */
963         if (cfs_list_empty(&kptllnd_data.kptl_nets))
964                 kptllnd_base_shutdown();
965         CDEBUG(D_MALLOC, "after LND cleanup: kmem %d\n",
966                cfs_atomic_read (&libcfs_kmemory));
967         return;
968 }
969
970 int __init
971 kptllnd_module_init (void)
972 {
973         int    rc;
974
975         kptllnd_assert_wire_constants();
976
977         rc = kptllnd_tunables_init();
978         if (rc != 0)
979                 return rc;
980
981         kptllnd_init_ptltrace();
982
983         lnet_register_lnd(&kptllnd_lnd);
984
985         return 0;
986 }
987
988 void __exit
989 kptllnd_module_fini (void)
990 {
991         lnet_unregister_lnd(&kptllnd_lnd);
992         kptllnd_tunables_fini();
993 }
994
995 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
996 MODULE_DESCRIPTION("Kernel Portals LND v1.00");
997 MODULE_LICENSE("GPL");
998
999 module_init(kptllnd_module_init);
1000 module_exit(kptllnd_module_fini);