Whamcloud - gitweb
- should init kptllnd_data.kptl_idle_txs before calling kptllnd_shutdown.
[fs/lustre-release.git] / lnet / klnds / ptllnd / ptllnd.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2005 Cluster File Systems, Inc. All rights reserved.
5  *   Author: PJ Kirner <pjkirner@clusterfs.com>
6  *
7  *   This file is part of the Lustre file system, http://www.lustre.org
8  *   Lustre is a trademark of Cluster File Systems, Inc.
9  *
10  *   This file is confidential source code owned by Cluster File Systems.
11  *   No viewing, modification, compilation, redistribution, or any other
12  *   form of use is permitted except through a signed license agreement.
13  *
14  *   If you have not signed such an agreement, then you have no rights to
15  *   this file.  Please destroy it immediately and contact CFS.
16  *
17  */
18
19 #include "ptllnd.h"
20
21 lnd_t kptllnd_lnd = {
22         .lnd_type       = PTLLND,
23         .lnd_startup    = kptllnd_startup,
24         .lnd_shutdown   = kptllnd_shutdown,
25         .lnd_ctl        = kptllnd_ctl,
26         .lnd_send       = kptllnd_send,
27         .lnd_recv       = kptllnd_recv,
28         .lnd_eager_recv = kptllnd_eager_recv,
29 };
30
31 kptl_data_t kptllnd_data;
32
33 char *
34 kptllnd_ptlid2str(ptl_process_id_t id)
35 {
36         static char    strs[64][32];
37         static int     idx = 0;
38
39         unsigned long  flags;
40         char          *str;
41         
42         spin_lock_irqsave(&kptllnd_data.kptl_ptlid2str_lock, flags);
43         str = strs[idx++];
44         if (idx >= sizeof(strs)/sizeof(strs[0]))
45                 idx = 0;
46         spin_unlock_irqrestore(&kptllnd_data.kptl_ptlid2str_lock, flags);
47
48         snprintf(str, sizeof(strs[0]), FMT_PTLID, id.pid, id.nid);
49         return str;
50 }
51
52 void 
53 kptllnd_assert_wire_constants (void)
54 {
55         /* Wire protocol assertions generated by 'wirecheck'
56          * running on Linux fedora 2.6.11-co-0.6.4 #1 Mon Jun 19 05:36:13 UTC 2006 i686 i686 i386 GNU
57          * with gcc version 4.1.1 20060525 (Red Hat 4.1.1-1) */
58
59
60         /* Constants... */
61         CLASSERT (PTL_RESERVED_MATCHBITS == 0x100);
62         CLASSERT (LNET_MSG_MATCHBITS == 0);
63         CLASSERT (PTLLND_MSG_MAGIC == 0x50746C4E);
64         CLASSERT (PTLLND_MSG_VERSION == 0x04);
65         CLASSERT (PTLLND_RDMA_OK == 0x00);
66         CLASSERT (PTLLND_RDMA_FAIL == 0x01);
67         CLASSERT (PTLLND_MSG_TYPE_INVALID == 0x00);
68         CLASSERT (PTLLND_MSG_TYPE_PUT == 0x01);
69         CLASSERT (PTLLND_MSG_TYPE_GET == 0x02);
70         CLASSERT (PTLLND_MSG_TYPE_IMMEDIATE == 0x03);
71         CLASSERT (PTLLND_MSG_TYPE_NOOP == 0x04);
72         CLASSERT (PTLLND_MSG_TYPE_HELLO == 0x05);
73         CLASSERT (PTLLND_MSG_TYPE_NAK == 0x06);
74
75         /* Checks for struct kptl_msg_t */
76         CLASSERT ((int)sizeof(kptl_msg_t) == 136);
77         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_magic) == 0);
78         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_magic) == 4);
79         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_version) == 4);
80         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_version) == 2);
81         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_type) == 6);
82         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_type) == 1);
83         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_credits) == 7);
84         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_credits) == 1);
85         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_nob) == 8);
86         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_nob) == 4);
87         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_cksum) == 12);
88         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_cksum) == 4);
89         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_srcnid) == 16);
90         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_srcnid) == 8);
91         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_srcstamp) == 24);
92         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_srcstamp) == 8);
93         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_dstnid) == 32);
94         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_dstnid) == 8);
95         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_dststamp) == 40);
96         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_dststamp) == 8);
97         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_srcpid) == 48);
98         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_srcpid) == 4);
99         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_dstpid) == 52);
100         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_dstpid) == 4);
101         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_u.immediate) == 56);
102         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_u.immediate) == 72);
103         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_u.rdma) == 56);
104         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_u.rdma) == 80);
105         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_u.hello) == 56);
106         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_u.hello) == 12);
107
108         /* Checks for struct kptl_immediate_msg_t */
109         CLASSERT ((int)sizeof(kptl_immediate_msg_t) == 72);
110         CLASSERT ((int)offsetof(kptl_immediate_msg_t, kptlim_hdr) == 0);
111         CLASSERT ((int)sizeof(((kptl_immediate_msg_t *)0)->kptlim_hdr) == 72);
112         CLASSERT ((int)offsetof(kptl_immediate_msg_t, kptlim_payload[13]) == 85);
113         CLASSERT ((int)sizeof(((kptl_immediate_msg_t *)0)->kptlim_payload[13]) == 1);
114
115         /* Checks for struct kptl_rdma_msg_t */
116         CLASSERT ((int)sizeof(kptl_rdma_msg_t) == 80);
117         CLASSERT ((int)offsetof(kptl_rdma_msg_t, kptlrm_hdr) == 0);
118         CLASSERT ((int)sizeof(((kptl_rdma_msg_t *)0)->kptlrm_hdr) == 72);
119         CLASSERT ((int)offsetof(kptl_rdma_msg_t, kptlrm_matchbits) == 72);
120         CLASSERT ((int)sizeof(((kptl_rdma_msg_t *)0)->kptlrm_matchbits) == 8);
121
122         /* Checks for struct kptl_hello_msg_t */
123         CLASSERT ((int)sizeof(kptl_hello_msg_t) == 12);
124         CLASSERT ((int)offsetof(kptl_hello_msg_t, kptlhm_matchbits) == 0);
125         CLASSERT ((int)sizeof(((kptl_hello_msg_t *)0)->kptlhm_matchbits) == 8);
126         CLASSERT ((int)offsetof(kptl_hello_msg_t, kptlhm_max_msg_size) == 8);
127         CLASSERT ((int)sizeof(((kptl_hello_msg_t *)0)->kptlhm_max_msg_size) == 4);
128 }
129
130 const char *kptllnd_evtype2str(int type)
131 {
132 #define DO_TYPE(x) case x: return #x;
133         switch(type)
134         {
135                 DO_TYPE(PTL_EVENT_GET_START);
136                 DO_TYPE(PTL_EVENT_GET_END);
137                 DO_TYPE(PTL_EVENT_PUT_START);
138                 DO_TYPE(PTL_EVENT_PUT_END);
139                 DO_TYPE(PTL_EVENT_REPLY_START);
140                 DO_TYPE(PTL_EVENT_REPLY_END);
141                 DO_TYPE(PTL_EVENT_ACK);
142                 DO_TYPE(PTL_EVENT_SEND_START);
143                 DO_TYPE(PTL_EVENT_SEND_END);
144                 DO_TYPE(PTL_EVENT_UNLINK);
145         default:
146                 return "<unknown event type>";
147         }
148 #undef DO_TYPE
149 }
150
151 const char *kptllnd_msgtype2str(int type)
152 {
153 #define DO_TYPE(x) case x: return #x;
154         switch(type)
155         {
156                 DO_TYPE(PTLLND_MSG_TYPE_INVALID);
157                 DO_TYPE(PTLLND_MSG_TYPE_PUT);
158                 DO_TYPE(PTLLND_MSG_TYPE_GET);
159                 DO_TYPE(PTLLND_MSG_TYPE_IMMEDIATE);
160                 DO_TYPE(PTLLND_MSG_TYPE_HELLO);
161                 DO_TYPE(PTLLND_MSG_TYPE_NOOP);
162                 DO_TYPE(PTLLND_MSG_TYPE_NAK);
163         default:
164                 return "<unknown msg type>";
165         }
166 #undef DO_TYPE
167 }
168
169 const char *kptllnd_errtype2str(int type)
170 {
171 #define DO_TYPE(x) case x: return #x;
172         switch(type)
173         {
174                 DO_TYPE(PTL_OK);
175                 DO_TYPE(PTL_SEGV);
176                 DO_TYPE(PTL_NO_SPACE);
177                 DO_TYPE(PTL_ME_IN_USE);
178                 DO_TYPE(PTL_NAL_FAILED);
179                 DO_TYPE(PTL_NO_INIT);
180                 DO_TYPE(PTL_IFACE_DUP);
181                 DO_TYPE(PTL_IFACE_INVALID);
182                 DO_TYPE(PTL_HANDLE_INVALID);
183                 DO_TYPE(PTL_MD_INVALID);
184                 DO_TYPE(PTL_ME_INVALID);
185                 DO_TYPE(PTL_PROCESS_INVALID);
186                 DO_TYPE(PTL_PT_INDEX_INVALID);
187                 DO_TYPE(PTL_SR_INDEX_INVALID);
188                 DO_TYPE(PTL_EQ_INVALID);
189                 DO_TYPE(PTL_EQ_DROPPED);
190                 DO_TYPE(PTL_EQ_EMPTY);
191                 DO_TYPE(PTL_MD_NO_UPDATE);
192                 DO_TYPE(PTL_FAIL);
193                 DO_TYPE(PTL_AC_INDEX_INVALID);
194                 DO_TYPE(PTL_MD_ILLEGAL);
195                 DO_TYPE(PTL_ME_LIST_TOO_LONG);
196                 DO_TYPE(PTL_MD_IN_USE);
197                 DO_TYPE(PTL_NI_INVALID);
198                 DO_TYPE(PTL_PID_INVALID);
199                 DO_TYPE(PTL_PT_FULL);
200                 DO_TYPE(PTL_VAL_FAILED);
201                 DO_TYPE(PTL_NOT_IMPLEMENTED);
202                 DO_TYPE(PTL_NO_ACK);
203                 DO_TYPE(PTL_EQ_IN_USE);
204                 DO_TYPE(PTL_PID_IN_USE);
205                 DO_TYPE(PTL_INV_EQ_SIZE);
206                 DO_TYPE(PTL_AGAIN);
207         default:
208                 return "<unknown event type>";
209         }
210 #undef DO_TYPE
211 }
212
213 __u32
214 kptllnd_cksum (void *ptr, int nob)
215 {
216         char  *c  = ptr;
217         __u32  sum = 0;
218
219         while (nob-- > 0)
220                 sum = ((sum << 1) | (sum >> 31)) + *c++;
221
222         /* ensure I don't return 0 (== no checksum) */
223         return (sum == 0) ? 1 : sum;
224 }
225
226 void
227 kptllnd_init_msg(kptl_msg_t *msg, int type, int body_nob)
228 {
229         msg->ptlm_type = type;
230         msg->ptlm_nob  = (offsetof(kptl_msg_t, ptlm_u) + body_nob + 7) & ~7;
231         
232         LASSERT(msg->ptlm_nob <= *kptllnd_tunables.kptl_max_msg_size);
233 }
234
235 void
236 kptllnd_msg_pack(kptl_msg_t *msg, kptl_peer_t *peer)
237 {
238         msg->ptlm_magic    = PTLLND_MSG_MAGIC;
239         msg->ptlm_version  = PTLLND_MSG_VERSION;
240         /* msg->ptlm_type  Filled in kptllnd_init_msg()  */
241         msg->ptlm_credits  = peer->peer_outstanding_credits;
242         /* msg->ptlm_nob   Filled in kptllnd_init_msg()  */
243         msg->ptlm_cksum    = 0;
244         msg->ptlm_srcnid   = kptllnd_data.kptl_ni->ni_nid;
245         msg->ptlm_srcstamp = peer->peer_myincarnation;
246         msg->ptlm_dstnid   = peer->peer_id.nid;
247         msg->ptlm_dststamp = peer->peer_incarnation;
248         msg->ptlm_srcpid   = the_lnet.ln_pid;
249         msg->ptlm_dstpid   = peer->peer_id.pid;
250
251         if (*kptllnd_tunables.kptl_checksum) {
252                 /* NB ptlm_cksum zero while computing cksum */
253                 msg->ptlm_cksum = kptllnd_cksum(msg, 
254                                                 offsetof(kptl_msg_t, ptlm_u));
255         }
256 }
257
258 int
259 kptllnd_msg_unpack(kptl_msg_t *msg, int nob)
260 {
261         const int hdr_size = offsetof(kptl_msg_t, ptlm_u);
262         __u32     msg_cksum;
263         __u16     msg_version;
264         int       flip;
265
266         /* 6 bytes are enough to have received magic + version */
267         if (nob < 6) {
268                 CERROR("Very Short message: %d\n", nob);
269                 return -EPROTO;
270         }
271
272         /*
273          * Determine if we need to flip
274          */
275         if (msg->ptlm_magic == PTLLND_MSG_MAGIC) {
276                 flip = 0;
277         } else if (msg->ptlm_magic == __swab32(PTLLND_MSG_MAGIC)) {
278                 flip = 1;
279         } else {
280                 CERROR("Bad magic: %08x\n", msg->ptlm_magic);
281                 return -EPROTO;
282         }
283
284         msg_version = flip ? __swab16(msg->ptlm_version) : msg->ptlm_version;
285
286         if (msg_version != PTLLND_MSG_VERSION) {
287                 CERROR("Bad version: got %04x expected %04x\n",
288                         (__u32)msg_version, PTLLND_MSG_VERSION);
289                 return -EPROTO;
290         }
291
292         if (nob < hdr_size) {
293                 CERROR("Short message: got %d, wanted at least %d\n",
294                        nob, hdr_size);
295                 return -EPROTO;
296         }
297
298         /* checksum must be computed with
299          * 1) ptlm_cksum zero and
300          * 2) BEFORE anything gets modified/flipped
301          */
302         msg_cksum = flip ? __swab32(msg->ptlm_cksum) : msg->ptlm_cksum;
303         msg->ptlm_cksum = 0;
304         if (msg_cksum != 0 &&
305             msg_cksum != kptllnd_cksum(msg, hdr_size)) {
306                 CERROR("Bad checksum\n");
307                 return -EPROTO;
308         }
309
310         msg->ptlm_version = msg_version;
311         msg->ptlm_cksum = msg_cksum;
312         
313         if (flip) {
314                 /* These two are 1 byte long so we don't swap them
315                    But check this assumtion*/
316                 CLASSERT (sizeof(msg->ptlm_type) == 1);
317                 CLASSERT (sizeof(msg->ptlm_credits) == 1);
318                 /* src & dst stamps are opaque cookies */
319                 __swab32s(&msg->ptlm_nob);
320                 __swab64s(&msg->ptlm_srcnid);
321                 __swab64s(&msg->ptlm_dstnid);
322                 __swab32s(&msg->ptlm_srcpid);
323                 __swab32s(&msg->ptlm_dstpid);
324         }
325
326         if (msg->ptlm_nob != nob) {
327                 CERROR("msg_nob corrupt: got 0x%08x, wanted %08x\n",
328                        msg->ptlm_nob, nob);
329                 return -EPROTO;
330         }
331
332         switch(msg->ptlm_type)
333         {
334         case PTLLND_MSG_TYPE_PUT:
335         case PTLLND_MSG_TYPE_GET:
336                 if (nob < hdr_size + sizeof(kptl_rdma_msg_t)) {
337                         CERROR("Short rdma request: got %d, want %d\n",
338                                nob, hdr_size + (int)sizeof(kptl_rdma_msg_t));
339                         return -EPROTO;
340                 }
341
342                 if (flip)
343                         __swab64s(&msg->ptlm_u.rdma.kptlrm_matchbits);
344
345                 if (msg->ptlm_u.rdma.kptlrm_matchbits < PTL_RESERVED_MATCHBITS) {
346                         CERROR("Bad matchbits "LPX64"\n",
347                                msg->ptlm_u.rdma.kptlrm_matchbits);
348                         return -EPROTO;
349                 }
350                 break;
351
352         case PTLLND_MSG_TYPE_IMMEDIATE:
353                 if (nob < offsetof(kptl_msg_t, 
354                                    ptlm_u.immediate.kptlim_payload)) {
355                         CERROR("Short immediate: got %d, want %d\n", nob,
356                                (int)offsetof(kptl_msg_t, 
357                                              ptlm_u.immediate.kptlim_payload));
358                         return -EPROTO;
359                 }
360                 /* Do nothing */
361                 break;
362                         
363         case PTLLND_MSG_TYPE_NOOP:
364         case PTLLND_MSG_TYPE_NAK:
365                 /* Do nothing */
366                 break;
367
368         case PTLLND_MSG_TYPE_HELLO:
369                 if (nob < hdr_size + sizeof(kptl_hello_msg_t)) {
370                         CERROR("Short hello: got %d want %d\n",
371                                nob, hdr_size + (int)sizeof(kptl_hello_msg_t));
372                         return -EPROTO;
373                 }
374                 if (flip) {
375                         __swab64s(&msg->ptlm_u.hello.kptlhm_matchbits);
376                         __swab32s(&msg->ptlm_u.hello.kptlhm_max_msg_size);
377                 }
378                 break;
379
380         default:
381                 CERROR("Bad message type: 0x%02x\n", (__u32)msg->ptlm_type);
382                 return -EPROTO;
383         }
384
385         return 0;
386 }
387
388 int
389 kptllnd_ctl(lnet_ni_t *ni, unsigned int cmd, void *arg)
390 {
391         struct libcfs_ioctl_data *data = arg;
392         int          rc = -EINVAL;
393
394         CDEBUG(D_NET, ">>> kptllnd_ctl cmd=%u arg=%p\n", cmd, arg);
395
396         /*
397          * Validate that the context block is actually
398          * pointing to this interface
399          */
400         LASSERT (ni == kptllnd_data.kptl_ni);
401
402         switch(cmd) {
403         case IOC_LIBCFS_DEL_PEER: {
404                 lnet_process_id_t id;
405                 
406                 id.nid = data->ioc_nid;
407                 id.pid = data->ioc_u32[1];
408                 
409                 rc = kptllnd_peer_del(id);
410                 break;
411         }
412
413         case IOC_LIBCFS_GET_PEER: {
414                 lnet_process_id_t   id = {.nid = LNET_NID_ANY,
415                                           .pid = LNET_PID_ANY};
416                 __u64               incarnation = 0;
417                 __u64               next_matchbits = 0;
418                 __u64               last_matchbits_seen = 0;
419                 int                 state = 0;
420                 int                 sent_hello = 0;
421                 int                 refcount = 0;
422                 int                 nsendq = 0;
423                 int                 nactiveq = 0;
424                 int                 credits = 0;
425                 int                 outstanding_credits = 0;
426
427                 rc = kptllnd_get_peer_info(data->ioc_count, &id,
428                                            &state, &sent_hello,
429                                            &refcount, &incarnation,
430                                            &next_matchbits, &last_matchbits_seen,
431                                            &nsendq, &nactiveq,
432                                            &credits, &outstanding_credits);
433                 /* wince... */
434                 data->ioc_nid = id.nid;
435                 data->ioc_net = state;
436                 data->ioc_flags  = sent_hello;
437                 data->ioc_count = refcount;
438                 data->ioc_u64[0] = incarnation;
439                 data->ioc_u32[0] = (__u32)next_matchbits;
440                 data->ioc_u32[1] = (__u32)(next_matchbits >> 32);
441                 data->ioc_u32[2] = (__u32)last_matchbits_seen;
442                 data->ioc_u32[3] = (__u32)(last_matchbits_seen >> 32);
443                 data->ioc_u32[4] = id.pid;
444                 data->ioc_u32[5] = (nsendq << 16) | nactiveq;
445                 data->ioc_u32[6] = (credits << 16) | outstanding_credits;
446                 break;
447         }
448                 
449         default:
450                 rc=-EINVAL;
451                 break;
452         }
453         CDEBUG(D_NET, "<<< kptllnd_ctl rc=%d\n", rc);
454         return rc;
455 }
456
457 int
458 kptllnd_startup (lnet_ni_t *ni)
459 {
460         int             rc;
461         int             i;
462         int             spares;
463         struct timeval  tv;
464         ptl_err_t       ptl_rc;
465
466         LASSERT (ni->ni_lnd == &kptllnd_lnd);
467
468         if (kptllnd_data.kptl_init != PTLLND_INIT_NOTHING) {
469                 CERROR("Only 1 instance supported\n");
470                 return -EPERM;
471         }
472
473         if (*kptllnd_tunables.kptl_max_procs_per_node < 1) {
474                 CERROR("max_procs_per_node must be >= 1\n");
475                 return -EINVAL;
476         }
477
478         *kptllnd_tunables.kptl_max_msg_size &= ~7;
479         if (*kptllnd_tunables.kptl_max_msg_size < PTLLND_MIN_BUFFER_SIZE)
480                 *kptllnd_tunables.kptl_max_msg_size = PTLLND_MIN_BUFFER_SIZE;
481
482         CLASSERT ((PTLLND_MIN_BUFFER_SIZE & 7) == 0);
483         CLASSERT (sizeof(kptl_msg_t) <= PTLLND_MIN_BUFFER_SIZE);
484
485         /*
486          * zero pointers, flags etc
487          * put everything into a known state.
488          */
489         memset (&kptllnd_data, 0, sizeof (kptllnd_data));
490         kptllnd_data.kptl_eqh = PTL_INVALID_HANDLE;
491         kptllnd_data.kptl_nih = PTL_INVALID_HANDLE;
492
493         /*
494          * Setup the sched locks/lists/waitq
495          */
496         spin_lock_init(&kptllnd_data.kptl_sched_lock);
497         init_waitqueue_head(&kptllnd_data.kptl_sched_waitq);
498         INIT_LIST_HEAD(&kptllnd_data.kptl_sched_txq);
499         INIT_LIST_HEAD(&kptllnd_data.kptl_sched_rxq);
500         INIT_LIST_HEAD(&kptllnd_data.kptl_sched_rxbq);
501
502         /* init kptl_ptlid2str_lock before any call to kptllnd_ptlid2str */
503         spin_lock_init(&kptllnd_data.kptl_ptlid2str_lock);
504
505         /*
506          * Setup the tx locks/lists
507          */
508         spin_lock_init(&kptllnd_data.kptl_tx_lock);
509         INIT_LIST_HEAD(&kptllnd_data.kptl_idle_txs);
510         atomic_set(&kptllnd_data.kptl_ntx, 0);
511
512         /*
513          * Uptick the module reference count
514          */
515         PORTAL_MODULE_USE;
516
517         /*
518          * Setup pointers between the ni and context data block
519          */
520         kptllnd_data.kptl_ni = ni;
521         ni->ni_data = &kptllnd_data;
522
523         /*
524          * Setup Credits
525          */
526         ni->ni_maxtxcredits = *kptllnd_tunables.kptl_credits;
527         ni->ni_peertxcredits = *kptllnd_tunables.kptl_peercredits;
528
529         kptllnd_data.kptl_expected_peers =
530                 *kptllnd_tunables.kptl_max_nodes *
531                 *kptllnd_tunables.kptl_max_procs_per_node;
532         
533         /*
534          * Initialize the Network interface instance
535          * We use the default because we don't have any
536          * way to choose a better interface.
537          * Requested and actual limits are ignored.
538          */
539         ptl_rc = PtlNIInit(
540 #ifdef _USING_LUSTRE_PORTALS_
541                 PTL_IFACE_DEFAULT,
542 #else
543                 CRAY_KERN_NAL,
544 #endif
545                 *kptllnd_tunables.kptl_pid, NULL, NULL,
546                 &kptllnd_data.kptl_nih);
547
548         /*
549          * Note: PTL_IFACE_DUP simply means that the requested
550          * interface was already inited and that we're sharing it.
551          * Which is ok.
552          */
553         if (ptl_rc != PTL_OK && ptl_rc != PTL_IFACE_DUP) {
554                 CERROR ("PtlNIInit: error %s(%d)\n",
555                         kptllnd_errtype2str(ptl_rc), ptl_rc);
556                 rc = -EINVAL;
557                 goto failed;
558         }
559
560         /* NB eq size irrelevant if using a callback */
561         ptl_rc = PtlEQAlloc(kptllnd_data.kptl_nih,
562                             8,                       /* size */
563                             kptllnd_eq_callback,     /* handler callback */
564                             &kptllnd_data.kptl_eqh); /* output handle */
565         if (ptl_rc != PTL_OK) {
566                 CERROR("PtlEQAlloc failed %s(%d)\n",
567                        kptllnd_errtype2str(ptl_rc), ptl_rc);
568                 rc = -ENOMEM;
569                 goto failed;
570         }
571
572         /*
573          * Fetch the lower NID
574          */
575         ptl_rc = PtlGetId(kptllnd_data.kptl_nih,
576                           &kptllnd_data.kptl_portals_id);
577         if (ptl_rc != PTL_OK) {
578                 CERROR ("PtlGetID: error %s(%d)\n",
579                         kptllnd_errtype2str(ptl_rc), ptl_rc);
580                 rc = -EINVAL;
581                 goto failed;
582         }
583
584         if (kptllnd_data.kptl_portals_id.pid != *kptllnd_tunables.kptl_pid) {
585                 /* The kernel ptllnd must have the expected PID */
586                 CERROR("Unexpected PID: %u (%u expected)\n",
587                        kptllnd_data.kptl_portals_id.pid,
588                        *kptllnd_tunables.kptl_pid);
589                 rc = -EINVAL;
590                 goto failed;
591         }
592
593         ni->ni_nid = kptllnd_ptl2lnetnid(kptllnd_data.kptl_portals_id.nid);
594
595         CDEBUG(D_NET, "ptl id=%s, lnet id=%s\n", 
596                kptllnd_ptlid2str(kptllnd_data.kptl_portals_id),
597                libcfs_nid2str(ni->ni_nid));
598
599         /* Initialized the incarnation - it must be for-all-time unique, even
600          * accounting for the fact that we increment it when we disconnect a
601          * peer that's using it */
602         do_gettimeofday(&tv);
603         kptllnd_data.kptl_incarnation = (((__u64)tv.tv_sec) * 1000000) +
604                                         tv.tv_usec;
605         CDEBUG(D_NET, "Incarnation="LPX64"\n", kptllnd_data.kptl_incarnation);
606
607         /*
608          * Allocate and setup the peer hash table
609          */
610         rwlock_init(&kptllnd_data.kptl_peer_rw_lock);
611         init_waitqueue_head(&kptllnd_data.kptl_watchdog_waitq);
612         INIT_LIST_HEAD(&kptllnd_data.kptl_closing_peers);
613         INIT_LIST_HEAD(&kptllnd_data.kptl_zombie_peers);
614
615         kptllnd_data.kptl_peer_hash_size =
616                 *kptllnd_tunables.kptl_peer_hash_table_size;
617         LIBCFS_ALLOC(kptllnd_data.kptl_peers,
618                      (kptllnd_data.kptl_peer_hash_size * 
619                       sizeof(struct list_head)));
620         if (kptllnd_data.kptl_peers == NULL) {
621                 CERROR("Failed to allocate space for peer hash table size=%d\n",
622                         kptllnd_data.kptl_peer_hash_size);
623                 rc = -ENOMEM;
624                 goto failed;
625         }
626         for (i = 0; i < kptllnd_data.kptl_peer_hash_size; i++)
627                 INIT_LIST_HEAD(&kptllnd_data.kptl_peers[i]);
628
629         LIBCFS_ALLOC(kptllnd_data.kptl_nak_msg, offsetof(kptl_msg_t, ptlm_u));
630         if (kptllnd_data.kptl_nak_msg == NULL) {
631                 CERROR("Can't allocate NAK msg\n");
632                 rc = -ENOMEM;
633                 goto failed;
634         }
635         memset(kptllnd_data.kptl_nak_msg, 0, offsetof(kptl_msg_t, ptlm_u));
636         kptllnd_init_msg(kptllnd_data.kptl_nak_msg, PTLLND_MSG_TYPE_NAK, 0);
637         kptllnd_data.kptl_nak_msg->ptlm_magic    = PTLLND_MSG_MAGIC;
638         kptllnd_data.kptl_nak_msg->ptlm_version  = PTLLND_MSG_VERSION;
639         kptllnd_data.kptl_nak_msg->ptlm_srcpid   = the_lnet.ln_pid;
640         kptllnd_data.kptl_nak_msg->ptlm_srcnid   = ni->ni_nid;
641         kptllnd_data.kptl_nak_msg->ptlm_srcstamp = kptllnd_data.kptl_incarnation;
642         kptllnd_data.kptl_nak_msg->ptlm_dstpid   = LNET_PID_ANY;
643         kptllnd_data.kptl_nak_msg->ptlm_dstnid   = LNET_NID_ANY;
644
645         kptllnd_rx_buffer_pool_init(&kptllnd_data.kptl_rx_buffer_pool);
646
647         kptllnd_data.kptl_rx_cache = 
648                 cfs_mem_cache_create("ptllnd_rx",
649                                      sizeof(kptl_rx_t) + 
650                                      *kptllnd_tunables.kptl_max_msg_size,
651                                      0,    /* offset */
652                                      0);   /* flags */
653         if (kptllnd_data.kptl_rx_cache == NULL) {
654                 CERROR("Can't create slab for RX descriptors\n");
655                 rc = -ENOMEM;
656                 goto failed;
657         }
658
659         /* lists/ptrs/locks initialised */
660         kptllnd_data.kptl_init = PTLLND_INIT_DATA;
661
662         /*****************************************************/
663
664         rc = kptllnd_setup_tx_descs();
665         if (rc != 0) {
666                 CERROR("Can't pre-allocate %d TX descriptors: %d\n",
667                        *kptllnd_tunables.kptl_ntx, rc);
668                 goto failed;
669         }
670         
671         /* Start the scheduler threads for handling incoming requests.  No need
672          * to advance the state because this will be automatically cleaned up
673          * now that PTLNAT_INIT_DATA state has been entered */
674         CDEBUG(D_NET, "starting %d scheduler threads\n", PTLLND_N_SCHED);
675         for (i = 0; i < PTLLND_N_SCHED; i++) {
676                 rc = kptllnd_thread_start(kptllnd_scheduler, (void *)((long)i));
677                 if (rc != 0) {
678                         CERROR("Can't spawn scheduler[%d]: %d\n", i, rc);
679                         goto failed;
680                 }
681         }
682
683         rc = kptllnd_thread_start(kptllnd_watchdog, NULL);
684         if (rc != 0) {
685                 CERROR("Can't spawn watchdog: %d\n", rc);
686                 goto failed;
687         }
688
689         /* Ensure that 'rxb_nspare' buffers can be off the net (being emptied)
690          * and we will still have enough buffers posted for all our peers */
691         spares = *kptllnd_tunables.kptl_rxb_nspare *
692                  ((*kptllnd_tunables.kptl_rxb_npages * PAGE_SIZE)/
693                   *kptllnd_tunables.kptl_max_msg_size);
694
695         /* reserve and post the buffers */
696         rc = kptllnd_rx_buffer_pool_reserve(&kptllnd_data.kptl_rx_buffer_pool,
697                                             kptllnd_data.kptl_expected_peers +
698                                             spares);
699         if (rc != 0) {
700                 CERROR("Can't reserve RX Buffer pool: %d\n", rc);
701                 goto failed;
702         }
703
704         /* flag everything initialised */
705         kptllnd_data.kptl_init = PTLLND_INIT_ALL;
706
707         /*****************************************************/
708
709         if (*kptllnd_tunables.kptl_checksum)
710                 CWARN("Checksumming enabled\n");
711         
712         CDEBUG(D_NET, "<<< kptllnd_startup SUCCESS\n");
713         return 0;
714
715  failed:
716         CDEBUG(D_NET, "kptllnd_startup failed rc=%d\n", rc);
717         kptllnd_shutdown(ni);
718         return rc;
719 }
720
721 void
722 kptllnd_shutdown (lnet_ni_t *ni)
723 {
724         int               i;
725         ptl_err_t         prc;
726         lnet_process_id_t process_id;
727         unsigned long     flags;
728
729         CDEBUG(D_MALLOC, "before LND cleanup: kmem %d\n",
730                atomic_read (&libcfs_kmemory));
731
732         LASSERT (ni == kptllnd_data.kptl_ni);
733
734         switch (kptllnd_data.kptl_init) {
735         default:
736                 LBUG();
737
738         case PTLLND_INIT_ALL:
739         case PTLLND_INIT_DATA:
740                 /* Stop receiving */
741                 kptllnd_rx_buffer_pool_fini(&kptllnd_data.kptl_rx_buffer_pool);
742                 LASSERT (list_empty(&kptllnd_data.kptl_sched_rxq));
743                 LASSERT (list_empty(&kptllnd_data.kptl_sched_rxbq));
744
745                 /* Hold peertable lock to interleave cleanly with peer birth/death */
746                 write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
747
748                 LASSERT (kptllnd_data.kptl_shutdown == 0);
749                 kptllnd_data.kptl_shutdown = 1; /* phase 1 == destroy peers */
750
751                 /* no new peers possible now */
752                 write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, 
753                                         flags);
754
755                 /* nuke all existing peers */
756                 process_id.nid = LNET_NID_ANY;
757                 process_id.pid = LNET_PID_ANY;
758                 kptllnd_peer_del(process_id);
759
760                 read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
761
762                 LASSERT (kptllnd_data.kptl_n_active_peers == 0);
763
764                 i = 2;
765                 while (kptllnd_data.kptl_npeers != 0) {
766                         i++;
767                         CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET,
768                                "Waiting for %d peers to terminate\n",
769                                kptllnd_data.kptl_npeers);
770
771                         read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, 
772                                                flags);
773
774                         cfs_pause(cfs_time_seconds(1));
775
776                         read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, 
777                                           flags);
778                 }
779
780                 LASSERT(list_empty(&kptllnd_data.kptl_closing_peers));
781                 LASSERT(list_empty(&kptllnd_data.kptl_zombie_peers));
782                 LASSERT (kptllnd_data.kptl_peers != NULL);
783                 for (i = 0; i < kptllnd_data.kptl_peer_hash_size; i++)
784                         LASSERT (list_empty (&kptllnd_data.kptl_peers[i]));
785
786                 read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
787                 CDEBUG(D_NET, "All peers deleted\n");
788
789                 /* Shutdown phase 2: kill the daemons... */
790                 kptllnd_data.kptl_shutdown = 2;
791                 mb();
792                 
793                 i = 2;
794                 while (atomic_read (&kptllnd_data.kptl_nthreads) != 0) {
795                         /* Wake up all threads*/
796                         wake_up_all(&kptllnd_data.kptl_sched_waitq);
797                         wake_up_all(&kptllnd_data.kptl_watchdog_waitq);
798
799                         i++;
800                         CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
801                                "Waiting for %d threads to terminate\n",
802                                atomic_read(&kptllnd_data.kptl_nthreads));
803                         cfs_pause(cfs_time_seconds(1));
804                 }
805
806                 CDEBUG(D_NET, "All Threads stopped\n");
807                 LASSERT(list_empty(&kptllnd_data.kptl_sched_txq));
808
809                 kptllnd_cleanup_tx_descs();
810
811                 /* Nothing here now, but libcfs might soon require
812                  * us to explicitly destroy wait queues and semaphores
813                  * that would be done here */
814
815                 /* fall through */
816
817         case PTLLND_INIT_NOTHING:
818                 CDEBUG(D_NET, "PTLLND_INIT_NOTHING\n");
819                 break;
820         }
821
822         if (!PtlHandleIsEqual(kptllnd_data.kptl_eqh, PTL_INVALID_HANDLE)) {
823                 prc = PtlEQFree(kptllnd_data.kptl_eqh);
824                 if (prc != PTL_OK)
825                         CERROR("Error %s(%d) freeing portals EQ\n",
826                                kptllnd_errtype2str(prc), prc);
827         }
828
829         if (!PtlHandleIsEqual(kptllnd_data.kptl_nih, PTL_INVALID_HANDLE)) {
830                 prc = PtlNIFini(kptllnd_data.kptl_nih);
831                 if (prc != PTL_OK)
832                         CERROR("Error %s(%d) finalizing portals NI\n",
833                                kptllnd_errtype2str(prc), prc);
834         }
835         
836         LASSERT (atomic_read(&kptllnd_data.kptl_ntx) == 0);
837         LASSERT (list_empty(&kptllnd_data.kptl_idle_txs));
838
839         if (kptllnd_data.kptl_rx_cache != NULL)
840                 cfs_mem_cache_destroy(kptllnd_data.kptl_rx_cache);
841
842         if (kptllnd_data.kptl_peers != NULL)
843                 LIBCFS_FREE (kptllnd_data.kptl_peers,
844                              sizeof (struct list_head) *
845                              kptllnd_data.kptl_peer_hash_size);
846
847         if (kptllnd_data.kptl_nak_msg != NULL)
848                 LIBCFS_FREE (kptllnd_data.kptl_nak_msg,
849                              offsetof(kptl_msg_t, ptlm_u));
850
851         memset(&kptllnd_data, 0, sizeof(kptllnd_data));
852
853         CDEBUG(D_MALLOC, "after LND cleanup: kmem %d\n",
854                atomic_read (&libcfs_kmemory));
855
856         PORTAL_MODULE_UNUSE;
857 }
858
859 int __init
860 kptllnd_module_init (void)
861 {
862         int    rc;
863
864         kptllnd_assert_wire_constants();
865
866         rc = kptllnd_tunables_init();
867         if (rc != 0)
868                 return rc;
869
870         kptllnd_init_ptltrace();
871
872         lnet_register_lnd(&kptllnd_lnd);
873
874         return 0;
875 }
876
877 void __exit
878 kptllnd_module_fini (void)
879 {
880         lnet_unregister_lnd(&kptllnd_lnd);
881         kptllnd_tunables_fini();
882 }
883
884 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
885 MODULE_DESCRIPTION("Kernel Portals LND v1.00");
886 MODULE_LICENSE("GPL");
887
888 module_init(kptllnd_module_init);
889 module_exit(kptllnd_module_fini);