Whamcloud - gitweb
5723c8aaec07e2bec939c5c5608b8159c6e39545
[fs/lustre-release.git] / lnet / klnds / ptllnd / ptllnd.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2005 Cluster File Systems, Inc. All rights reserved.
5  *   Author: PJ Kirner <pjkirner@clusterfs.com>
6  *
7  *   This file is part of the Lustre file system, http://www.lustre.org
8  *   Lustre is a trademark of Cluster File Systems, Inc.
9  *
10  *   This file is confidential source code owned by Cluster File Systems.
11  *   No viewing, modification, compilation, redistribution, or any other
12  *   form of use is permitted except through a signed license agreement.
13  *
14  *   If you have not signed such an agreement, then you have no rights to
15  *   this file.  Please destroy it immediately and contact CFS.
16  *
17  */
18
19 #include "ptllnd.h"
20
21 lnd_t kptllnd_lnd = {
22         .lnd_type       = PTLLND,
23         .lnd_startup    = kptllnd_startup,
24         .lnd_shutdown   = kptllnd_shutdown,
25         .lnd_ctl        = kptllnd_ctl,
26         .lnd_send       = kptllnd_send,
27         .lnd_recv       = kptllnd_recv,
28         .lnd_eager_recv = kptllnd_eager_recv,
29 };
30
31 kptl_data_t kptllnd_data;
32
33 char *
34 kptllnd_ptlid2str(ptl_process_id_t id)
35 {
36         static char    strs[64][32];
37         static int     idx = 0;
38
39         unsigned long  flags;
40         char          *str;
41         
42         spin_lock_irqsave(&kptllnd_data.kptl_ptlid2str_lock, flags);
43         str = strs[idx++];
44         if (idx >= sizeof(strs)/sizeof(strs[0]))
45                 idx = 0;
46         spin_unlock_irqrestore(&kptllnd_data.kptl_ptlid2str_lock, flags);
47
48         snprintf(str, sizeof(strs[0]), FMT_PTLID, id.pid, id.nid);
49         return str;
50 }
51
52 void 
53 kptllnd_assert_wire_constants (void)
54 {
55         /* Wire protocol assertions generated by 'wirecheck'
56          * running on Linux fedora 2.6.11-co-0.6.4 #1 Mon Jun 19 05:36:13 UTC 2006 i686 i686 i386 GNU
57          * with gcc version 4.1.1 20060525 (Red Hat 4.1.1-1) */
58
59
60         /* Constants... */
61         CLASSERT (PTL_RESERVED_MATCHBITS == 0x100);
62         CLASSERT (LNET_MSG_MATCHBITS == 0);
63         CLASSERT (PTLLND_MSG_MAGIC == 0x50746C4E);
64         CLASSERT (PTLLND_MSG_VERSION == 0x04);
65         CLASSERT (PTLLND_RDMA_OK == 0x00);
66         CLASSERT (PTLLND_RDMA_FAIL == 0x01);
67         CLASSERT (PTLLND_MSG_TYPE_INVALID == 0x00);
68         CLASSERT (PTLLND_MSG_TYPE_PUT == 0x01);
69         CLASSERT (PTLLND_MSG_TYPE_GET == 0x02);
70         CLASSERT (PTLLND_MSG_TYPE_IMMEDIATE == 0x03);
71         CLASSERT (PTLLND_MSG_TYPE_NOOP == 0x04);
72         CLASSERT (PTLLND_MSG_TYPE_HELLO == 0x05);
73         CLASSERT (PTLLND_MSG_TYPE_NAK == 0x06);
74
75         /* Checks for struct kptl_msg_t */
76         CLASSERT ((int)sizeof(kptl_msg_t) == 136);
77         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_magic) == 0);
78         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_magic) == 4);
79         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_version) == 4);
80         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_version) == 2);
81         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_type) == 6);
82         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_type) == 1);
83         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_credits) == 7);
84         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_credits) == 1);
85         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_nob) == 8);
86         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_nob) == 4);
87         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_cksum) == 12);
88         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_cksum) == 4);
89         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_srcnid) == 16);
90         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_srcnid) == 8);
91         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_srcstamp) == 24);
92         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_srcstamp) == 8);
93         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_dstnid) == 32);
94         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_dstnid) == 8);
95         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_dststamp) == 40);
96         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_dststamp) == 8);
97         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_srcpid) == 48);
98         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_srcpid) == 4);
99         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_dstpid) == 52);
100         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_dstpid) == 4);
101         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_u.immediate) == 56);
102         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_u.immediate) == 72);
103         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_u.rdma) == 56);
104         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_u.rdma) == 80);
105         CLASSERT ((int)offsetof(kptl_msg_t, ptlm_u.hello) == 56);
106         CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_u.hello) == 12);
107
108         /* Checks for struct kptl_immediate_msg_t */
109         CLASSERT ((int)sizeof(kptl_immediate_msg_t) == 72);
110         CLASSERT ((int)offsetof(kptl_immediate_msg_t, kptlim_hdr) == 0);
111         CLASSERT ((int)sizeof(((kptl_immediate_msg_t *)0)->kptlim_hdr) == 72);
112         CLASSERT ((int)offsetof(kptl_immediate_msg_t, kptlim_payload[13]) == 85);
113         CLASSERT ((int)sizeof(((kptl_immediate_msg_t *)0)->kptlim_payload[13]) == 1);
114
115         /* Checks for struct kptl_rdma_msg_t */
116         CLASSERT ((int)sizeof(kptl_rdma_msg_t) == 80);
117         CLASSERT ((int)offsetof(kptl_rdma_msg_t, kptlrm_hdr) == 0);
118         CLASSERT ((int)sizeof(((kptl_rdma_msg_t *)0)->kptlrm_hdr) == 72);
119         CLASSERT ((int)offsetof(kptl_rdma_msg_t, kptlrm_matchbits) == 72);
120         CLASSERT ((int)sizeof(((kptl_rdma_msg_t *)0)->kptlrm_matchbits) == 8);
121
122         /* Checks for struct kptl_hello_msg_t */
123         CLASSERT ((int)sizeof(kptl_hello_msg_t) == 12);
124         CLASSERT ((int)offsetof(kptl_hello_msg_t, kptlhm_matchbits) == 0);
125         CLASSERT ((int)sizeof(((kptl_hello_msg_t *)0)->kptlhm_matchbits) == 8);
126         CLASSERT ((int)offsetof(kptl_hello_msg_t, kptlhm_max_msg_size) == 8);
127         CLASSERT ((int)sizeof(((kptl_hello_msg_t *)0)->kptlhm_max_msg_size) == 4);
128 }
129
130 const char *kptllnd_evtype2str(int type)
131 {
132 #define DO_TYPE(x) case x: return #x;
133         switch(type)
134         {
135                 DO_TYPE(PTL_EVENT_GET_START);
136                 DO_TYPE(PTL_EVENT_GET_END);
137                 DO_TYPE(PTL_EVENT_PUT_START);
138                 DO_TYPE(PTL_EVENT_PUT_END);
139                 DO_TYPE(PTL_EVENT_REPLY_START);
140                 DO_TYPE(PTL_EVENT_REPLY_END);
141                 DO_TYPE(PTL_EVENT_ACK);
142                 DO_TYPE(PTL_EVENT_SEND_START);
143                 DO_TYPE(PTL_EVENT_SEND_END);
144                 DO_TYPE(PTL_EVENT_UNLINK);
145         default:
146                 return "<unknown event type>";
147         }
148 #undef DO_TYPE
149 }
150
151 const char *kptllnd_msgtype2str(int type)
152 {
153 #define DO_TYPE(x) case x: return #x;
154         switch(type)
155         {
156                 DO_TYPE(PTLLND_MSG_TYPE_INVALID);
157                 DO_TYPE(PTLLND_MSG_TYPE_PUT);
158                 DO_TYPE(PTLLND_MSG_TYPE_GET);
159                 DO_TYPE(PTLLND_MSG_TYPE_IMMEDIATE);
160                 DO_TYPE(PTLLND_MSG_TYPE_HELLO);
161                 DO_TYPE(PTLLND_MSG_TYPE_NOOP);
162                 DO_TYPE(PTLLND_MSG_TYPE_NAK);
163         default:
164                 return "<unknown msg type>";
165         }
166 #undef DO_TYPE
167 }
168
169 __u32
170 kptllnd_cksum (void *ptr, int nob)
171 {
172         char  *c  = ptr;
173         __u32  sum = 0;
174
175         while (nob-- > 0)
176                 sum = ((sum << 1) | (sum >> 31)) + *c++;
177
178         /* ensure I don't return 0 (== no checksum) */
179         return (sum == 0) ? 1 : sum;
180 }
181
182 void
183 kptllnd_init_msg(kptl_msg_t *msg, int type, int body_nob)
184 {
185         msg->ptlm_type = type;
186         msg->ptlm_nob  = (offsetof(kptl_msg_t, ptlm_u) + body_nob + 7) & ~7;
187         
188         LASSERT(msg->ptlm_nob <= *kptllnd_tunables.kptl_max_msg_size);
189 }
190
191 void
192 kptllnd_msg_pack(kptl_msg_t *msg, kptl_peer_t *peer)
193 {
194         msg->ptlm_magic    = PTLLND_MSG_MAGIC;
195         msg->ptlm_version  = PTLLND_MSG_VERSION;
196         /* msg->ptlm_type  Filled in kptllnd_init_msg()  */
197         msg->ptlm_credits  = peer->peer_outstanding_credits;
198         /* msg->ptlm_nob   Filled in kptllnd_init_msg()  */
199         msg->ptlm_cksum    = 0;
200         msg->ptlm_srcnid   = kptllnd_data.kptl_ni->ni_nid;
201         msg->ptlm_srcstamp = peer->peer_myincarnation;
202         msg->ptlm_dstnid   = peer->peer_id.nid;
203         msg->ptlm_dststamp = peer->peer_incarnation;
204         msg->ptlm_srcpid   = the_lnet.ln_pid;
205         msg->ptlm_dstpid   = peer->peer_id.pid;
206
207         if (*kptllnd_tunables.kptl_checksum) {
208                 /* NB ptlm_cksum zero while computing cksum */
209                 msg->ptlm_cksum = kptllnd_cksum(msg, 
210                                                 offsetof(kptl_msg_t, ptlm_u));
211         }
212 }
213
214 int
215 kptllnd_msg_unpack(kptl_msg_t *msg, int nob)
216 {
217         const int hdr_size = offsetof(kptl_msg_t, ptlm_u);
218         __u32     msg_cksum;
219         __u16     msg_version;
220         int       flip;
221
222         /* 6 bytes are enough to have received magic + version */
223         if (nob < 6) {
224                 CERROR("Very Short message: %d\n", nob);
225                 return -EPROTO;
226         }
227
228         /*
229          * Determine if we need to flip
230          */
231         if (msg->ptlm_magic == PTLLND_MSG_MAGIC) {
232                 flip = 0;
233         } else if (msg->ptlm_magic == __swab32(PTLLND_MSG_MAGIC)) {
234                 flip = 1;
235         } else {
236                 CERROR("Bad magic: %08x\n", msg->ptlm_magic);
237                 return -EPROTO;
238         }
239
240         msg_version = flip ? __swab16(msg->ptlm_version) : msg->ptlm_version;
241
242         if (msg_version != PTLLND_MSG_VERSION) {
243                 CERROR("Bad version: got %04x expected %04x\n",
244                         (__u32)msg_version, PTLLND_MSG_VERSION);
245                 return -EPROTO;
246         }
247
248         if (nob < hdr_size) {
249                 CERROR("Short message: got %d, wanted at least %d\n",
250                        nob, hdr_size);
251                 return -EPROTO;
252         }
253
254         /* checksum must be computed with
255          * 1) ptlm_cksum zero and
256          * 2) BEFORE anything gets modified/flipped
257          */
258         msg_cksum = flip ? __swab32(msg->ptlm_cksum) : msg->ptlm_cksum;
259         msg->ptlm_cksum = 0;
260         if (msg_cksum != 0 &&
261             msg_cksum != kptllnd_cksum(msg, hdr_size)) {
262                 CERROR("Bad checksum\n");
263                 return -EPROTO;
264         }
265
266         msg->ptlm_version = msg_version;
267         msg->ptlm_cksum = msg_cksum;
268         
269         if (flip) {
270                 /* These two are 1 byte long so we don't swap them
271                    But check this assumtion*/
272                 CLASSERT (sizeof(msg->ptlm_type) == 1);
273                 CLASSERT (sizeof(msg->ptlm_credits) == 1);
274                 /* src & dst stamps are opaque cookies */
275                 __swab32s(&msg->ptlm_nob);
276                 __swab64s(&msg->ptlm_srcnid);
277                 __swab64s(&msg->ptlm_dstnid);
278                 __swab32s(&msg->ptlm_srcpid);
279                 __swab32s(&msg->ptlm_dstpid);
280         }
281
282         if (msg->ptlm_nob != nob) {
283                 CERROR("msg_nob corrupt: got 0x%08x, wanted %08x\n",
284                        msg->ptlm_nob, nob);
285                 return -EPROTO;
286         }
287
288         switch(msg->ptlm_type)
289         {
290         case PTLLND_MSG_TYPE_PUT:
291         case PTLLND_MSG_TYPE_GET:
292                 if (nob < hdr_size + sizeof(kptl_rdma_msg_t)) {
293                         CERROR("Short rdma request: got %d, want %d\n",
294                                nob, hdr_size + (int)sizeof(kptl_rdma_msg_t));
295                         return -EPROTO;
296                 }
297
298                 if (flip)
299                         __swab64s(&msg->ptlm_u.rdma.kptlrm_matchbits);
300
301                 if (msg->ptlm_u.rdma.kptlrm_matchbits < PTL_RESERVED_MATCHBITS) {
302                         CERROR("Bad matchbits "LPX64"\n",
303                                msg->ptlm_u.rdma.kptlrm_matchbits);
304                         return -EPROTO;
305                 }
306                 break;
307
308         case PTLLND_MSG_TYPE_IMMEDIATE:
309                 if (nob < offsetof(kptl_msg_t, 
310                                    ptlm_u.immediate.kptlim_payload)) {
311                         CERROR("Short immediate: got %d, want %d\n", nob,
312                                (int)offsetof(kptl_msg_t, 
313                                              ptlm_u.immediate.kptlim_payload));
314                         return -EPROTO;
315                 }
316                 /* Do nothing */
317                 break;
318                         
319         case PTLLND_MSG_TYPE_NOOP:
320         case PTLLND_MSG_TYPE_NAK:
321                 /* Do nothing */
322                 break;
323
324         case PTLLND_MSG_TYPE_HELLO:
325                 if (nob < hdr_size + sizeof(kptl_hello_msg_t)) {
326                         CERROR("Short hello: got %d want %d\n",
327                                nob, hdr_size + (int)sizeof(kptl_hello_msg_t));
328                         return -EPROTO;
329                 }
330                 if (flip) {
331                         __swab64s(&msg->ptlm_u.hello.kptlhm_matchbits);
332                         __swab32s(&msg->ptlm_u.hello.kptlhm_max_msg_size);
333                 }
334                 break;
335
336         default:
337                 CERROR("Bad message type: 0x%02x\n", (__u32)msg->ptlm_type);
338                 return -EPROTO;
339         }
340
341         return 0;
342 }
343
344 int
345 kptllnd_ctl(lnet_ni_t *ni, unsigned int cmd, void *arg)
346 {
347         struct libcfs_ioctl_data *data = arg;
348         int          rc = -EINVAL;
349
350         CDEBUG(D_NET, ">>> kptllnd_ctl cmd=%u arg=%p\n", cmd, arg);
351
352         /*
353          * Validate that the context block is actually
354          * pointing to this interface
355          */
356         LASSERT (ni == kptllnd_data.kptl_ni);
357
358         switch(cmd) {
359         case IOC_LIBCFS_DEL_PEER: {
360                 lnet_process_id_t id;
361                 
362                 id.nid = data->ioc_nid;
363                 id.pid = data->ioc_u32[1];
364                 
365                 rc = kptllnd_peer_del(id);
366                 break;
367         }
368
369         case IOC_LIBCFS_GET_PEER: {
370                 lnet_process_id_t   id = {.nid = LNET_NID_ANY,
371                                           .pid = LNET_PID_ANY};
372                 __u64               incarnation = 0;
373                 __u64               next_matchbits = 0;
374                 __u64               last_matchbits_seen = 0;
375                 int                 state = 0;
376                 int                 sent_hello = 0;
377                 int                 refcount = 0;
378                 int                 nsendq = 0;
379                 int                 nactiveq = 0;
380                 int                 credits = 0;
381                 int                 outstanding_credits = 0;
382
383                 rc = kptllnd_get_peer_info(data->ioc_count, &id,
384                                            &state, &sent_hello,
385                                            &refcount, &incarnation,
386                                            &next_matchbits, &last_matchbits_seen,
387                                            &nsendq, &nactiveq,
388                                            &credits, &outstanding_credits);
389                 /* wince... */
390                 data->ioc_nid = id.nid;
391                 data->ioc_net = state;
392                 data->ioc_flags  = sent_hello;
393                 data->ioc_count = refcount;
394                 data->ioc_u64[0] = incarnation;
395                 data->ioc_u32[0] = (__u32)next_matchbits;
396                 data->ioc_u32[1] = (__u32)(next_matchbits >> 32);
397                 data->ioc_u32[2] = (__u32)last_matchbits_seen;
398                 data->ioc_u32[3] = (__u32)(last_matchbits_seen >> 32);
399                 data->ioc_u32[4] = id.pid;
400                 data->ioc_u32[5] = (nsendq << 16) | nactiveq;
401                 data->ioc_u32[6] = (credits << 16) | outstanding_credits;
402                 break;
403         }
404                 
405         default:
406                 rc=-EINVAL;
407                 break;
408         }
409         CDEBUG(D_NET, "<<< kptllnd_ctl rc=%d\n", rc);
410         return rc;
411 }
412
413 int
414 kptllnd_startup (lnet_ni_t *ni)
415 {
416         int             rc;
417         int             i;
418         int             spares;
419         struct timeval  tv;
420         ptl_err_t       ptl_rc;
421
422         LASSERT (ni->ni_lnd == &kptllnd_lnd);
423
424         if (kptllnd_data.kptl_init != PTLLND_INIT_NOTHING) {
425                 CERROR("Only 1 instance supported\n");
426                 return -EPERM;
427         }
428
429         if (*kptllnd_tunables.kptl_max_procs_per_node < 1) {
430                 CERROR("max_procs_per_node must be > 1\n");
431                 return -EINVAL;
432         }
433
434         *kptllnd_tunables.kptl_max_msg_size &= ~7;
435         if (*kptllnd_tunables.kptl_max_msg_size < sizeof(kptl_msg_t))
436                 *kptllnd_tunables.kptl_max_msg_size =
437                         (sizeof(kptl_msg_t) + 7) & ~7;
438         /*
439          * zero pointers, flags etc
440          * put everything into a known state.
441          */
442         memset (&kptllnd_data, 0, sizeof (kptllnd_data));
443         kptllnd_data.kptl_eqh = PTL_INVALID_HANDLE;
444         kptllnd_data.kptl_nih = PTL_INVALID_HANDLE;
445
446         /*
447          * Uptick the module reference count
448          */
449         PORTAL_MODULE_USE;
450
451         /*
452          * Setup pointers between the ni and context data block
453          */
454         kptllnd_data.kptl_ni = ni;
455         ni->ni_data = &kptllnd_data;
456
457         /*
458          * Setup Credits
459          */
460         ni->ni_maxtxcredits = *kptllnd_tunables.kptl_credits;
461         ni->ni_peertxcredits = *kptllnd_tunables.kptl_peercredits;
462
463         kptllnd_data.kptl_expected_peers =
464                 *kptllnd_tunables.kptl_max_nodes *
465                 *kptllnd_tunables.kptl_max_procs_per_node;
466         
467         /*
468          * Initialize the Network interface instance
469          * We use the default because we don't have any
470          * way to choose a better interface.
471          * Requested and actual limits are ignored.
472          */
473         ptl_rc = PtlNIInit(
474 #ifdef _USING_LUSTRE_PORTALS_
475                 PTL_IFACE_DEFAULT,
476 #else
477                 CRAY_KERN_NAL,
478 #endif
479                 *kptllnd_tunables.kptl_pid, NULL, NULL,
480                 &kptllnd_data.kptl_nih);
481
482         /*
483          * Note: PTL_IFACE_DUP simply means that the requested
484          * interface was already inited and that we're sharing it.
485          * Which is ok.
486          */
487         if (ptl_rc != PTL_OK && ptl_rc != PTL_IFACE_DUP) {
488                 CERROR ("PtlNIInit: error %d\n", ptl_rc);
489                 rc = -EINVAL;
490                 goto failed;
491         }
492
493         /* NB eq size irrelevant if using a callback */
494         ptl_rc = PtlEQAlloc(kptllnd_data.kptl_nih,
495                             8,                       /* size */
496                             kptllnd_eq_callback,     /* handler callback */
497                             &kptllnd_data.kptl_eqh); /* output handle */
498         if (ptl_rc != PTL_OK) {
499                 CERROR("PtlEQAlloc failed %d\n", ptl_rc);
500                 rc = -ENOMEM;
501                 goto failed;
502         }
503
504         /*
505          * Fetch the lower NID
506          */
507         ptl_rc = PtlGetId(kptllnd_data.kptl_nih,
508                           &kptllnd_data.kptl_portals_id);
509         if (ptl_rc != PTL_OK) {
510                 CERROR ("PtlGetID: error %d\n", ptl_rc);
511                 rc = -EINVAL;
512                 goto failed;
513         }
514
515         if (kptllnd_data.kptl_portals_id.pid != *kptllnd_tunables.kptl_pid) {
516                 /* The kernel ptllnd must have the expected PID */
517                 CERROR("Unexpected PID: %u (%u expected)\n",
518                        kptllnd_data.kptl_portals_id.pid,
519                        *kptllnd_tunables.kptl_pid);
520                 rc = -EINVAL;
521                 goto failed;
522         }
523
524         ni->ni_nid = kptllnd_ptl2lnetnid(kptllnd_data.kptl_portals_id.nid);
525
526         CDEBUG(D_NET, "ptl id=%s, lnet id=%s\n", 
527                kptllnd_ptlid2str(kptllnd_data.kptl_portals_id),
528                libcfs_nid2str(ni->ni_nid));
529
530         /* Initialized the incarnation - it must be for-all-time unique, even
531          * accounting for the fact that we increment it when we disconnect a
532          * peer that's using it */
533         do_gettimeofday(&tv);
534         kptllnd_data.kptl_incarnation = (((__u64)tv.tv_sec) * 1000000) +
535                                         tv.tv_usec;
536         CDEBUG(D_NET, "Incarnation="LPX64"\n", kptllnd_data.kptl_incarnation);
537
538         /*
539          * Setup the sched locks/lists/waitq
540          */
541         spin_lock_init(&kptllnd_data.kptl_sched_lock);
542         init_waitqueue_head(&kptllnd_data.kptl_sched_waitq);
543         INIT_LIST_HEAD(&kptllnd_data.kptl_sched_txq);
544         INIT_LIST_HEAD(&kptllnd_data.kptl_sched_rxq);
545         INIT_LIST_HEAD(&kptllnd_data.kptl_sched_rxbq);
546
547         /*
548          * Setup the tx locks/lists
549          */
550         spin_lock_init(&kptllnd_data.kptl_tx_lock);
551         INIT_LIST_HEAD(&kptllnd_data.kptl_idle_txs);
552         atomic_set(&kptllnd_data.kptl_ntx, 0);
553
554         /*
555          * Allocate and setup the peer hash table
556          */
557         rwlock_init(&kptllnd_data.kptl_peer_rw_lock);
558         init_waitqueue_head(&kptllnd_data.kptl_watchdog_waitq);
559         INIT_LIST_HEAD(&kptllnd_data.kptl_closing_peers);
560         INIT_LIST_HEAD(&kptllnd_data.kptl_zombie_peers);
561
562         spin_lock_init(&kptllnd_data.kptl_ptlid2str_lock);
563
564         kptllnd_data.kptl_peer_hash_size =
565                 *kptllnd_tunables.kptl_peer_hash_table_size;
566         LIBCFS_ALLOC(kptllnd_data.kptl_peers,
567                      (kptllnd_data.kptl_peer_hash_size * 
568                       sizeof(struct list_head)));
569         if (kptllnd_data.kptl_peers == NULL) {
570                 CERROR("Failed to allocate space for peer hash table size=%d\n",
571                         kptllnd_data.kptl_peer_hash_size);
572                 rc = -ENOMEM;
573                 goto failed;
574         }
575         for (i = 0; i < kptllnd_data.kptl_peer_hash_size; i++)
576                 INIT_LIST_HEAD(&kptllnd_data.kptl_peers[i]);
577
578         LIBCFS_ALLOC(kptllnd_data.kptl_nak_msg, offsetof(kptl_msg_t, ptlm_u));
579         if (kptllnd_data.kptl_nak_msg == NULL) {
580                 CERROR("Can't allocate NAK msg\n");
581                 rc = -ENOMEM;
582                 goto failed;
583         }
584         memset(kptllnd_data.kptl_nak_msg, 0, offsetof(kptl_msg_t, ptlm_u));
585         kptllnd_init_msg(kptllnd_data.kptl_nak_msg, PTLLND_MSG_TYPE_NAK, 0);
586         kptllnd_data.kptl_nak_msg->ptlm_magic    = PTLLND_MSG_MAGIC;
587         kptllnd_data.kptl_nak_msg->ptlm_version  = PTLLND_MSG_VERSION;
588         kptllnd_data.kptl_nak_msg->ptlm_srcpid   = the_lnet.ln_pid;
589         kptllnd_data.kptl_nak_msg->ptlm_srcnid   = ni->ni_nid;
590         kptllnd_data.kptl_nak_msg->ptlm_srcstamp = kptllnd_data.kptl_incarnation;
591         kptllnd_data.kptl_nak_msg->ptlm_dstpid   = LNET_PID_ANY;
592         kptllnd_data.kptl_nak_msg->ptlm_dstnid   = LNET_NID_ANY;
593
594         kptllnd_rx_buffer_pool_init(&kptllnd_data.kptl_rx_buffer_pool);
595
596         kptllnd_data.kptl_rx_cache = 
597                 cfs_mem_cache_create("ptllnd_rx",
598                                      sizeof(kptl_rx_t) + 
599                                      *kptllnd_tunables.kptl_max_msg_size,
600                                      0,    /* offset */
601                                      0);   /* flags */
602         if (kptllnd_data.kptl_rx_cache == NULL) {
603                 CERROR("Can't create slab for RX descriptors\n");
604                 rc = -ENOMEM;
605                 goto failed;
606         }
607
608         /* lists/ptrs/locks initialised */
609         kptllnd_data.kptl_init = PTLLND_INIT_DATA;
610
611         /*****************************************************/
612
613         rc = kptllnd_setup_tx_descs();
614         if (rc != 0) {
615                 CERROR("Can't pre-allocate %d TX descriptors: %d\n",
616                        *kptllnd_tunables.kptl_ntx, rc);
617                 goto failed;
618         }
619         
620         /* Start the scheduler threads for handling incoming requests.  No need
621          * to advance the state because this will be automatically cleaned up
622          * now that PTLNAT_INIT_DATA state has been entered */
623         CDEBUG(D_NET, "starting %d scheduler threads\n", PTLLND_N_SCHED);
624         for (i = 0; i < PTLLND_N_SCHED; i++) {
625                 rc = kptllnd_thread_start(kptllnd_scheduler, (void *)((long)i));
626                 if (rc != 0) {
627                         CERROR("Can't spawn scheduler[%d]: %d\n", i, rc);
628                         goto failed;
629                 }
630         }
631
632         rc = kptllnd_thread_start(kptllnd_watchdog, NULL);
633         if (rc != 0) {
634                 CERROR("Can't spawn watchdog: %d\n", rc);
635                 goto failed;
636         }
637
638         /* Ensure that 'rxb_nspare' buffers can be off the net (being emptied)
639          * and we will still have enough buffers posted for all our peers */
640         spares = *kptllnd_tunables.kptl_rxb_nspare *
641                  ((*kptllnd_tunables.kptl_rxb_npages * PAGE_SIZE)/
642                   *kptllnd_tunables.kptl_max_msg_size);
643
644         /* reserve and post the buffers */
645         rc = kptllnd_rx_buffer_pool_reserve(&kptllnd_data.kptl_rx_buffer_pool,
646                                             kptllnd_data.kptl_expected_peers +
647                                             spares);
648         if (rc != 0) {
649                 CERROR("Can't reserve RX Buffer pool: %d\n", rc);
650                 goto failed;
651         }
652
653         /* flag everything initialised */
654         kptllnd_data.kptl_init = PTLLND_INIT_ALL;
655
656         /*****************************************************/
657
658         if (*kptllnd_tunables.kptl_checksum)
659                 CWARN("Checksumming enabled\n");
660         
661         CDEBUG(D_NET, "<<< kptllnd_startup SUCCESS\n");
662         return 0;
663
664  failed:
665         CDEBUG(D_NET, "kptllnd_startup failed rc=%d\n", rc);
666         kptllnd_shutdown(ni);
667         return rc;
668 }
669
670 void
671 kptllnd_shutdown (lnet_ni_t *ni)
672 {
673         int               i;
674         ptl_err_t         prc;
675         lnet_process_id_t process_id;
676         unsigned long     flags;
677
678         CDEBUG(D_MALLOC, "before LND cleanup: kmem %d\n",
679                atomic_read (&libcfs_kmemory));
680
681         LASSERT (ni == kptllnd_data.kptl_ni);
682
683         switch (kptllnd_data.kptl_init) {
684         default:
685                 LBUG();
686
687         case PTLLND_INIT_ALL:
688         case PTLLND_INIT_DATA:
689                 /* Stop receiving */
690                 kptllnd_rx_buffer_pool_fini(&kptllnd_data.kptl_rx_buffer_pool);
691                 LASSERT (list_empty(&kptllnd_data.kptl_sched_rxq));
692                 LASSERT (list_empty(&kptllnd_data.kptl_sched_rxbq));
693
694                 /* Hold peertable lock to interleave cleanly with peer birth/death */
695                 write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
696
697                 LASSERT (kptllnd_data.kptl_shutdown == 0);
698                 kptllnd_data.kptl_shutdown = 1; /* phase 1 == destroy peers */
699
700                 /* no new peers possible now */
701                 write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, 
702                                         flags);
703
704                 /* nuke all existing peers */
705                 process_id.nid = LNET_NID_ANY;
706                 process_id.pid = LNET_PID_ANY;
707                 kptllnd_peer_del(process_id);
708
709                 read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
710
711                 LASSERT (kptllnd_data.kptl_n_active_peers == 0);
712
713                 i = 2;
714                 while (kptllnd_data.kptl_npeers != 0) {
715                         i++;
716                         CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET,
717                                "Waiting for %d peers to terminate\n",
718                                kptllnd_data.kptl_npeers);
719
720                         read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, 
721                                                flags);
722
723                         cfs_pause(cfs_time_seconds(1));
724
725                         read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, 
726                                           flags);
727                 }
728
729                 LASSERT(list_empty(&kptllnd_data.kptl_closing_peers));
730                 LASSERT(list_empty(&kptllnd_data.kptl_zombie_peers));
731                 LASSERT (kptllnd_data.kptl_peers != NULL);
732                 for (i = 0; i < kptllnd_data.kptl_peer_hash_size; i++)
733                         LASSERT (list_empty (&kptllnd_data.kptl_peers[i]));
734
735                 read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
736                 CDEBUG(D_NET, "All peers deleted\n");
737
738                 /* Shutdown phase 2: kill the daemons... */
739                 kptllnd_data.kptl_shutdown = 2;
740                 mb();
741                 
742                 i = 2;
743                 while (atomic_read (&kptllnd_data.kptl_nthreads) != 0) {
744                         /* Wake up all threads*/
745                         wake_up_all(&kptllnd_data.kptl_sched_waitq);
746                         wake_up_all(&kptllnd_data.kptl_watchdog_waitq);
747
748                         i++;
749                         CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
750                                "Waiting for %d threads to terminate\n",
751                                atomic_read(&kptllnd_data.kptl_nthreads));
752                         cfs_pause(cfs_time_seconds(1));
753                 }
754
755                 CDEBUG(D_NET, "All Threads stopped\n");
756                 LASSERT(list_empty(&kptllnd_data.kptl_sched_txq));
757
758                 kptllnd_cleanup_tx_descs();
759
760                 /* Nothing here now, but libcfs might soon require
761                  * us to explicitly destroy wait queues and semaphores
762                  * that would be done here */
763
764                 /* fall through */
765
766         case PTLLND_INIT_NOTHING:
767                 CDEBUG(D_NET, "PTLLND_INIT_NOTHING\n");
768                 break;
769         }
770
771         if (!PtlHandleIsEqual(kptllnd_data.kptl_eqh, PTL_INVALID_HANDLE)) {
772                 prc = PtlEQFree(kptllnd_data.kptl_eqh);
773                 if (prc != PTL_OK)
774                         CERROR("Error %d freeing portals EQ\n", prc);
775         }
776
777         if (!PtlHandleIsEqual(kptllnd_data.kptl_nih, PTL_INVALID_HANDLE)) {
778                 prc = PtlNIFini(kptllnd_data.kptl_nih);
779                 if (prc != PTL_OK)
780                         CERROR("Error %d finalizing portals NI\n", prc);
781         }
782         
783         LASSERT (atomic_read(&kptllnd_data.kptl_ntx) == 0);
784         LASSERT (list_empty(&kptllnd_data.kptl_idle_txs));
785
786         if (kptllnd_data.kptl_rx_cache != NULL)
787                 cfs_mem_cache_destroy(kptllnd_data.kptl_rx_cache);
788
789         if (kptllnd_data.kptl_peers != NULL)
790                 LIBCFS_FREE (kptllnd_data.kptl_peers,
791                              sizeof (struct list_head) *
792                              kptllnd_data.kptl_peer_hash_size);
793
794         if (kptllnd_data.kptl_nak_msg != NULL)
795                 LIBCFS_FREE (kptllnd_data.kptl_nak_msg,
796                              offsetof(kptl_msg_t, ptlm_u));
797
798         memset(&kptllnd_data, 0, sizeof(kptllnd_data));
799
800         CDEBUG(D_MALLOC, "after LND cleanup: kmem %d\n",
801                atomic_read (&libcfs_kmemory));
802
803         PORTAL_MODULE_UNUSE;
804 }
805
806 int __init
807 kptllnd_module_init (void)
808 {
809         int    rc;
810
811         kptllnd_assert_wire_constants();
812
813         rc = kptllnd_tunables_init();
814         if (rc != 0)
815                 return rc;
816
817         kptllnd_init_ptltrace();
818
819         lnet_register_lnd(&kptllnd_lnd);
820
821         return 0;
822 }
823
824 void __exit
825 kptllnd_module_fini (void)
826 {
827         lnet_unregister_lnd(&kptllnd_lnd);
828         kptllnd_tunables_fini();
829 }
830
831 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
832 MODULE_DESCRIPTION("Kernel Portals LND v1.00");
833 MODULE_LICENSE("GPL");
834
835 module_init(kptllnd_module_init);
836 module_exit(kptllnd_module_fini);