/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * Copyright (C) 2005 Cluster File Systems, Inc. All rights reserved.
 * Author: PJ Kirner <pjkirner@clusterfs.com>
 *
 * This file is part of the Lustre file system, http://www.lustre.org
 * Lustre is a trademark of Cluster File Systems, Inc.
 *
 * This file is confidential source code owned by Cluster File Systems.
 * No viewing, modification, compilation, redistribution, or any other
 * form of use is permitted except through a signed license agreement.
 *
 * If you have not signed such an agreement, then you have no rights to
 * this file. Please destroy it immediately and contact CFS.
 */
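
/* LND operations table: the entry points this driver exports to LNet.
 * It is registered with lnet_register_lnd() in kptllnd_module_init()
 * below. */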
        .lnd_startup    = kptllnd_startup,
        .lnd_shutdown   = kptllnd_shutdown,
        .lnd_ctl        = kptllnd_ctl,
        .lnd_send       = kptllnd_send,
        .lnd_recv       = kptllnd_recv,
        .lnd_eager_recv = kptllnd_eager_recv,

kptl_data_t kptllnd_data;
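
/* Format a portals process id (via FMT_PTLID) for logging.  The result is
 * written into one of a small ring of static buffers guarded by
 * kptl_ptlid2str_lock, so several results can be live at once without the
 * caller having to free anything. */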
kptllnd_ptlid2str(ptl_process_id_t id)
        static char strs[64][32];

        spin_lock_irqsave(&kptllnd_data.kptl_ptlid2str_lock, flags);
        if (idx >= sizeof(strs)/sizeof(strs[0]))
        spin_unlock_irqrestore(&kptllnd_data.kptl_ptlid2str_lock, flags);

        snprintf(str, sizeof(strs[0]), FMT_PTLID, id.pid, id.nid);
kptllnd_assert_wire_constants (void)
        /* Wire protocol assertions generated by 'wirecheck'
         * running on Linux fedora 2.6.11-co-0.6.4 #1 Mon Jun 19 05:36:13 UTC 2006 i686 i686 i386 GNU
         * with gcc version 4.1.1 20060525 (Red Hat 4.1.1-1) */
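
        /* Each CLASSERT() below is a compile-time check: if the compiled
         * layout of these structures ever drifts from the offsets and sizes
         * recorded here, the module fails to build instead of silently
         * breaking wire compatibility. */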
        CLASSERT (PTL_RESERVED_MATCHBITS == 0x100);
        CLASSERT (LNET_MSG_MATCHBITS == 0);
        CLASSERT (PTLLND_MSG_MAGIC == 0x50746C4E);
        CLASSERT (PTLLND_MSG_VERSION == 0x04);
        CLASSERT (PTLLND_RDMA_OK == 0x00);
        CLASSERT (PTLLND_RDMA_FAIL == 0x01);
        CLASSERT (PTLLND_MSG_TYPE_INVALID == 0x00);
        CLASSERT (PTLLND_MSG_TYPE_PUT == 0x01);
        CLASSERT (PTLLND_MSG_TYPE_GET == 0x02);
        CLASSERT (PTLLND_MSG_TYPE_IMMEDIATE == 0x03);
        CLASSERT (PTLLND_MSG_TYPE_NOOP == 0x04);
        CLASSERT (PTLLND_MSG_TYPE_HELLO == 0x05);
        CLASSERT (PTLLND_MSG_TYPE_NAK == 0x06);
        /* Checks for struct kptl_msg_t */
        CLASSERT ((int)sizeof(kptl_msg_t) == 136);
        CLASSERT ((int)offsetof(kptl_msg_t, ptlm_magic) == 0);
        CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_magic) == 4);
        CLASSERT ((int)offsetof(kptl_msg_t, ptlm_version) == 4);
        CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_version) == 2);
        CLASSERT ((int)offsetof(kptl_msg_t, ptlm_type) == 6);
        CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_type) == 1);
        CLASSERT ((int)offsetof(kptl_msg_t, ptlm_credits) == 7);
        CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_credits) == 1);
        CLASSERT ((int)offsetof(kptl_msg_t, ptlm_nob) == 8);
        CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_nob) == 4);
        CLASSERT ((int)offsetof(kptl_msg_t, ptlm_cksum) == 12);
        CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_cksum) == 4);
        CLASSERT ((int)offsetof(kptl_msg_t, ptlm_srcnid) == 16);
        CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_srcnid) == 8);
        CLASSERT ((int)offsetof(kptl_msg_t, ptlm_srcstamp) == 24);
        CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_srcstamp) == 8);
        CLASSERT ((int)offsetof(kptl_msg_t, ptlm_dstnid) == 32);
        CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_dstnid) == 8);
        CLASSERT ((int)offsetof(kptl_msg_t, ptlm_dststamp) == 40);
        CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_dststamp) == 8);
        CLASSERT ((int)offsetof(kptl_msg_t, ptlm_srcpid) == 48);
        CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_srcpid) == 4);
        CLASSERT ((int)offsetof(kptl_msg_t, ptlm_dstpid) == 52);
        CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_dstpid) == 4);
        CLASSERT ((int)offsetof(kptl_msg_t, ptlm_u.immediate) == 56);
        CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_u.immediate) == 72);
        CLASSERT ((int)offsetof(kptl_msg_t, ptlm_u.rdma) == 56);
        CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_u.rdma) == 80);
        CLASSERT ((int)offsetof(kptl_msg_t, ptlm_u.hello) == 56);
        CLASSERT ((int)sizeof(((kptl_msg_t *)0)->ptlm_u.hello) == 12);
        /* Checks for struct kptl_immediate_msg_t */
        CLASSERT ((int)sizeof(kptl_immediate_msg_t) == 72);
        CLASSERT ((int)offsetof(kptl_immediate_msg_t, kptlim_hdr) == 0);
        CLASSERT ((int)sizeof(((kptl_immediate_msg_t *)0)->kptlim_hdr) == 72);
        CLASSERT ((int)offsetof(kptl_immediate_msg_t, kptlim_payload[13]) == 85);
        CLASSERT ((int)sizeof(((kptl_immediate_msg_t *)0)->kptlim_payload[13]) == 1);

        /* Checks for struct kptl_rdma_msg_t */
        CLASSERT ((int)sizeof(kptl_rdma_msg_t) == 80);
        CLASSERT ((int)offsetof(kptl_rdma_msg_t, kptlrm_hdr) == 0);
        CLASSERT ((int)sizeof(((kptl_rdma_msg_t *)0)->kptlrm_hdr) == 72);
        CLASSERT ((int)offsetof(kptl_rdma_msg_t, kptlrm_matchbits) == 72);
        CLASSERT ((int)sizeof(((kptl_rdma_msg_t *)0)->kptlrm_matchbits) == 8);

        /* Checks for struct kptl_hello_msg_t */
        CLASSERT ((int)sizeof(kptl_hello_msg_t) == 12);
        CLASSERT ((int)offsetof(kptl_hello_msg_t, kptlhm_matchbits) == 0);
        CLASSERT ((int)sizeof(((kptl_hello_msg_t *)0)->kptlhm_matchbits) == 8);
        CLASSERT ((int)offsetof(kptl_hello_msg_t, kptlhm_max_msg_size) == 8);
        CLASSERT ((int)sizeof(((kptl_hello_msg_t *)0)->kptlhm_max_msg_size) == 4);
const char *kptllnd_evtype2str(int type)
#define DO_TYPE(x) case x: return #x;
        DO_TYPE(PTL_EVENT_GET_START);
        DO_TYPE(PTL_EVENT_GET_END);
        DO_TYPE(PTL_EVENT_PUT_START);
        DO_TYPE(PTL_EVENT_PUT_END);
        DO_TYPE(PTL_EVENT_REPLY_START);
        DO_TYPE(PTL_EVENT_REPLY_END);
        DO_TYPE(PTL_EVENT_ACK);
        DO_TYPE(PTL_EVENT_SEND_START);
        DO_TYPE(PTL_EVENT_SEND_END);
        DO_TYPE(PTL_EVENT_UNLINK);
        return "<unknown event type>";
const char *kptllnd_msgtype2str(int type)
#define DO_TYPE(x) case x: return #x;
        DO_TYPE(PTLLND_MSG_TYPE_INVALID);
        DO_TYPE(PTLLND_MSG_TYPE_PUT);
        DO_TYPE(PTLLND_MSG_TYPE_GET);
        DO_TYPE(PTLLND_MSG_TYPE_IMMEDIATE);
        DO_TYPE(PTLLND_MSG_TYPE_HELLO);
        DO_TYPE(PTLLND_MSG_TYPE_NOOP);
        DO_TYPE(PTLLND_MSG_TYPE_NAK);
        return "<unknown msg type>";
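
/* Simple 32-bit checksum over 'nob' bytes at 'ptr': rotate the running sum
 * left one bit and add each byte.  Zero is reserved to mean "no checksum",
 * so a sum of zero is reported as 1. */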
kptllnd_cksum (void *ptr, int nob)
                sum = ((sum << 1) | (sum >> 31)) + *c++;

        /* ensure I don't return 0 (== no checksum) */
        return (sum == 0) ? 1 : sum;
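
/* Fill in the type and on-wire length of a message.  ptlm_nob is the header
 * (everything up to ptlm_u) plus the body, rounded up to a multiple of 8
 * bytes.  For example, using the wire sizes asserted above, a HELLO body of
 * 12 bytes gives (56 + 12 + 7) & ~7 == 72 bytes on the wire. */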
kptllnd_init_msg(kptl_msg_t *msg, int type, int body_nob)
        msg->ptlm_type = type;
        msg->ptlm_nob  = (offsetof(kptl_msg_t, ptlm_u) + body_nob + 7) & ~7;

        LASSERT(msg->ptlm_nob <= *kptllnd_tunables.kptl_max_msg_size);
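
/* Stamp a message just before it is sent: magic, protocol version,
 * source/destination NIDs, PIDs and incarnation stamps, plus the credits
 * being returned to this peer.  If the 'checksum' tunable is set, the header
 * is checksummed with ptlm_cksum still zero (see the matching check in
 * kptllnd_msg_unpack() below). */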
kptllnd_msg_pack(kptl_msg_t *msg, kptl_peer_t *peer)
        msg->ptlm_magic    = PTLLND_MSG_MAGIC;
        msg->ptlm_version  = PTLLND_MSG_VERSION;
        /* msg->ptlm_type     Filled in kptllnd_init_msg() */
        msg->ptlm_credits  = peer->peer_outstanding_credits;
        /* msg->ptlm_nob      Filled in kptllnd_init_msg() */

        msg->ptlm_srcnid   = kptllnd_data.kptl_ni->ni_nid;
        msg->ptlm_srcstamp = kptllnd_data.kptl_incarnation;
        msg->ptlm_dstnid   = peer->peer_id.nid;
        msg->ptlm_dststamp = peer->peer_incarnation;
        msg->ptlm_srcpid   = the_lnet.ln_pid;
        msg->ptlm_dstpid   = peer->peer_id.pid;

        if (*kptllnd_tunables.kptl_checksum) {
                /* NB ptlm_cksum zero while computing cksum */
                msg->ptlm_cksum = kptllnd_cksum(msg,
                                                offsetof(kptl_msg_t, ptlm_u));
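
/* Validate and byte-swap a message received from the wire.  The magic is
 * used to detect whether the sender has opposite endianness ("flip"); the
 * version, length and (optional) checksum are then verified before any
 * fields are swapped, and finally the per-type payload is checked and
 * swapped as needed. */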
kptllnd_msg_unpack(kptl_msg_t *msg, int nob)
        const int hdr_size = offsetof(kptl_msg_t, ptlm_u);

        /* 6 bytes are enough to have received magic + version */
                CERROR("Very Short message: %d\n", nob);

        /*
         * Determine if we need to flip
         */
        if (msg->ptlm_magic == PTLLND_MSG_MAGIC) {
        } else if (msg->ptlm_magic == __swab32(PTLLND_MSG_MAGIC)) {
                CERROR("Bad magic: %08x\n", msg->ptlm_magic);

        msg_version = flip ? __swab16(msg->ptlm_version) : msg->ptlm_version;

        if (msg_version != PTLLND_MSG_VERSION) {
                CERROR("Bad version: got %04x expected %04x\n",
                       (__u32)msg_version, PTLLND_MSG_VERSION);

        if (nob < hdr_size) {
                CERROR("Short message: got %d, wanted at least %d\n",

        /* checksum must be computed with
         * 1) ptlm_cksum zero and
         * 2) BEFORE anything gets modified/flipped
         */
        msg_cksum = flip ? __swab32(msg->ptlm_cksum) : msg->ptlm_cksum;

        if (msg_cksum != 0 &&
            msg_cksum != kptllnd_cksum(msg, hdr_size)) {
                CERROR("Bad checksum\n");

        msg->ptlm_version = msg_version;
        msg->ptlm_cksum = msg_cksum;
        /* These two are 1 byte long so we don't swap them,
         * but check this assumption */
        CLASSERT (sizeof(msg->ptlm_type) == 1);
        CLASSERT (sizeof(msg->ptlm_credits) == 1);
        /* src & dst stamps are opaque cookies */
        __swab32s(&msg->ptlm_nob);
        __swab64s(&msg->ptlm_srcnid);
        __swab64s(&msg->ptlm_dstnid);
        __swab32s(&msg->ptlm_srcpid);
        __swab32s(&msg->ptlm_dstpid);

        if (msg->ptlm_nob != nob) {
                CERROR("msg_nob corrupt: got 0x%08x, wanted %08x\n",

        switch(msg->ptlm_type)
        case PTLLND_MSG_TYPE_PUT:
        case PTLLND_MSG_TYPE_GET:
                if (nob < hdr_size + sizeof(kptl_rdma_msg_t)) {
                        CERROR("Short rdma request: got %d, want %d\n",
                               nob, hdr_size + (int)sizeof(kptl_rdma_msg_t));

                __swab64s(&msg->ptlm_u.rdma.kptlrm_matchbits);

                if (msg->ptlm_u.rdma.kptlrm_matchbits < PTL_RESERVED_MATCHBITS) {
                        CERROR("Bad matchbits "LPX64"\n",
                               msg->ptlm_u.rdma.kptlrm_matchbits);

        case PTLLND_MSG_TYPE_IMMEDIATE:
                if (nob < offsetof(kptl_msg_t,
                                   ptlm_u.immediate.kptlim_payload)) {
                        CERROR("Short immediate: got %d, want %d\n", nob,
                               (int)offsetof(kptl_msg_t,
                                             ptlm_u.immediate.kptlim_payload));

        case PTLLND_MSG_TYPE_NOOP:
        case PTLLND_MSG_TYPE_NAK:

        case PTLLND_MSG_TYPE_HELLO:
                if (nob < hdr_size + sizeof(kptl_hello_msg_t)) {
                        CERROR("Short hello: got %d want %d\n",
                               nob, hdr_size + (int)sizeof(kptl_hello_msg_t));

                __swab64s(&msg->ptlm_u.hello.kptlhm_matchbits);
                __swab32s(&msg->ptlm_u.hello.kptlhm_max_msg_size);

                CERROR("Bad message type: 0x%02x\n", (__u32)msg->ptlm_type);
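
/* Handle libcfs ioctls directed at this NI: IOC_LIBCFS_DEL_PEER removes the
 * peer identified in the request, and IOC_LIBCFS_GET_PEER returns a snapshot
 * of one peer's state packed into the libcfs_ioctl_data fields. */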
kptllnd_ctl(lnet_ni_t *ni, unsigned int cmd, void *arg)
        struct libcfs_ioctl_data *data = arg;

        CDEBUG(D_NET, ">>> kptllnd_ctl cmd=%u arg=%p\n", cmd, arg);

        /*
         * Validate that the context block is actually
         * pointing to this interface
         */
        LASSERT (ni == kptllnd_data.kptl_ni);

        case IOC_LIBCFS_DEL_PEER: {
                lnet_process_id_t id;

                id.nid = data->ioc_nid;
                id.pid = data->ioc_u32[1];

                rc = kptllnd_peer_del(id);

        case IOC_LIBCFS_GET_PEER: {
                lnet_process_id_t id = {.nid = LNET_NID_ANY,
                                        .pid = LNET_PID_ANY};
                __u64             incarnation = 0;
                __u64             next_matchbits = 0;
                __u64             last_matchbits_seen = 0;
                int               outstanding_credits = 0;

                rc = kptllnd_get_peer_info(data->ioc_count, &id,
                                           &refcount, &incarnation,
                                           &next_matchbits, &last_matchbits_seen,
                                           &credits, &outstanding_credits);
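
                /* Pack the result for user space: each 64-bit match-bits
                 * value is split across two adjacent ioc_u32 slots (low word
                 * first), and the queue lengths and credit counts are packed
                 * two per word in the high/low 16 bits. */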
                data->ioc_nid    = id.nid;
                data->ioc_net    = state;
                data->ioc_flags  = sent_hello;
                data->ioc_count  = refcount;
                data->ioc_u64[0] = incarnation;
                data->ioc_u32[0] = (__u32)next_matchbits;
                data->ioc_u32[1] = (__u32)(next_matchbits >> 32);
                data->ioc_u32[2] = (__u32)last_matchbits_seen;
                data->ioc_u32[3] = (__u32)(last_matchbits_seen >> 32);
                data->ioc_u32[4] = id.pid;
                data->ioc_u32[5] = (nsendq << 16) | nactiveq;
                data->ioc_u32[6] = (credits << 16) | outstanding_credits;

        CDEBUG(D_NET, "<<< kptllnd_ctl rc=%d\n", rc);
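
/* Bring the LND up on 'ni': validate the tunables, initialise kptllnd_data,
 * attach to the Portals network interface and event queue, create the peer
 * table, pre-allocate TX descriptors, start the scheduler and watchdog
 * threads and post the initial RX buffers.  Any failure falls through to
 * kptllnd_shutdown() to undo whatever was completed. */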
kptllnd_startup (lnet_ni_t *ni)
        LASSERT (ni->ni_lnd == &kptllnd_lnd);

        if (kptllnd_data.kptl_init != PTLLND_INIT_NOTHING) {
                CERROR("Only 1 instance supported\n");
        if (*kptllnd_tunables.kptl_max_procs_per_node < 1) {
                CERROR("max_procs_per_node must be >= 1\n");
        *kptllnd_tunables.kptl_max_msg_size &= ~7;
        if (*kptllnd_tunables.kptl_max_msg_size < sizeof(kptl_msg_t))
                *kptllnd_tunables.kptl_max_msg_size =
                        (sizeof(kptl_msg_t) + 7) & ~7;

        /*
         * zero pointers, flags etc
         * put everything into a known state.
         */
        memset (&kptllnd_data, 0, sizeof (kptllnd_data));
        kptllnd_data.kptl_eqh = PTL_INVALID_HANDLE;
        kptllnd_data.kptl_nih = PTL_INVALID_HANDLE;

        /*
         * Uptick the module reference count
         */

        /*
         * Setup pointers between the ni and context data block
         */
        kptllnd_data.kptl_ni = ni;
        ni->ni_data = &kptllnd_data;

        ni->ni_maxtxcredits  = *kptllnd_tunables.kptl_credits;
        ni->ni_peertxcredits = *kptllnd_tunables.kptl_peercredits;

        kptllnd_data.kptl_expected_peers =
                *kptllnd_tunables.kptl_max_nodes *
                *kptllnd_tunables.kptl_max_procs_per_node;

        /*
         * Initialize the Network interface instance
         * We use the default because we don't have any
         * way to choose a better interface.
         * Requested and actual limits are ignored.
         */
#ifdef _USING_LUSTRE_PORTALS_
                            *kptllnd_tunables.kptl_pid, NULL, NULL,
                            &kptllnd_data.kptl_nih);

        /*
         * Note: PTL_IFACE_DUP simply means that the requested
         * interface was already inited and that we're sharing it.
         */
        if (ptl_rc != PTL_OK && ptl_rc != PTL_IFACE_DUP) {
                CERROR ("PtlNIInit: error %d\n", ptl_rc);

        /* NB eq size irrelevant if using a callback */
        ptl_rc = PtlEQAlloc(kptllnd_data.kptl_nih,
                            kptllnd_eq_callback,       /* handler callback */
                            &kptllnd_data.kptl_eqh);   /* output handle */
        if (ptl_rc != PTL_OK) {
                CERROR("PtlEQAlloc failed %d\n", ptl_rc);

        /*
         * Fetch the lower NID
         */
        ptl_rc = PtlGetId(kptllnd_data.kptl_nih,
                          &kptllnd_data.kptl_portals_id);
        if (ptl_rc != PTL_OK) {
                CERROR ("PtlGetID: error %d\n", ptl_rc);

        if (kptllnd_data.kptl_portals_id.pid != *kptllnd_tunables.kptl_pid) {
                /* The kernel ptllnd must have the expected PID */
                CERROR("Unexpected PID: %u (%u expected)\n",
                       kptllnd_data.kptl_portals_id.pid,
                       *kptllnd_tunables.kptl_pid);

        ni->ni_nid = kptllnd_ptl2lnetnid(kptllnd_data.kptl_portals_id.nid);

        CDEBUG(D_NET, "ptl id=%s, lnet id=%s\n",
               kptllnd_ptlid2str(kptllnd_data.kptl_portals_id),
               libcfs_nid2str(ni->ni_nid));
        /*
         * Initialise the incarnation
         */
        do_gettimeofday(&tv);
        kptllnd_data.kptl_incarnation = (((__u64)tv.tv_sec) * 1000000) +
                                        tv.tv_usec;
        CDEBUG(D_NET, "Incarnation="LPX64"\n", kptllnd_data.kptl_incarnation);
        /*
         * Setup the sched locks/lists/waitq
         */
        spin_lock_init(&kptllnd_data.kptl_sched_lock);
        init_waitqueue_head(&kptllnd_data.kptl_sched_waitq);
        INIT_LIST_HEAD(&kptllnd_data.kptl_sched_txq);
        INIT_LIST_HEAD(&kptllnd_data.kptl_sched_rxq);
        INIT_LIST_HEAD(&kptllnd_data.kptl_sched_rxbq);

        /*
         * Setup the tx locks/lists
         */
        spin_lock_init(&kptllnd_data.kptl_tx_lock);
        INIT_LIST_HEAD(&kptllnd_data.kptl_idle_txs);
        atomic_set(&kptllnd_data.kptl_ntx, 0);

        /*
         * Allocate and setup the peer hash table
         */
        rwlock_init(&kptllnd_data.kptl_peer_rw_lock);
        init_waitqueue_head(&kptllnd_data.kptl_watchdog_waitq);
        INIT_LIST_HEAD(&kptllnd_data.kptl_closing_peers);
        INIT_LIST_HEAD(&kptllnd_data.kptl_zombie_peers);

        spin_lock_init(&kptllnd_data.kptl_ptlid2str_lock);

        kptllnd_data.kptl_peer_hash_size =
                *kptllnd_tunables.kptl_peer_hash_table_size;
        LIBCFS_ALLOC(kptllnd_data.kptl_peers,
                     (kptllnd_data.kptl_peer_hash_size *
                      sizeof(struct list_head)));
        if (kptllnd_data.kptl_peers == NULL) {
                CERROR("Failed to allocate space for peer hash table size=%d\n",
                       kptllnd_data.kptl_peer_hash_size);

        for (i = 0; i < kptllnd_data.kptl_peer_hash_size; i++)
                INIT_LIST_HEAD(&kptllnd_data.kptl_peers[i]);
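
        /* Pre-build a single NAK message (header only, no body).  The source
         * fields are filled in now; the destination is left as LNET_NID_ANY /
         * LNET_PID_ANY, presumably to be patched in when the NAK is actually
         * sent to a peer. */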
        LIBCFS_ALLOC(kptllnd_data.kptl_nak_msg, offsetof(kptl_msg_t, ptlm_u));
        if (kptllnd_data.kptl_nak_msg == NULL) {
                CERROR("Can't allocate NAK msg\n");

        memset(kptllnd_data.kptl_nak_msg, 0, offsetof(kptl_msg_t, ptlm_u));
        kptllnd_init_msg(kptllnd_data.kptl_nak_msg, PTLLND_MSG_TYPE_NAK, 0);
        kptllnd_data.kptl_nak_msg->ptlm_magic    = PTLLND_MSG_MAGIC;
        kptllnd_data.kptl_nak_msg->ptlm_version  = PTLLND_MSG_VERSION;
        kptllnd_data.kptl_nak_msg->ptlm_srcpid   = the_lnet.ln_pid;
        kptllnd_data.kptl_nak_msg->ptlm_srcnid   = ni->ni_nid;
        kptllnd_data.kptl_nak_msg->ptlm_srcstamp = kptllnd_data.kptl_incarnation;
        kptllnd_data.kptl_nak_msg->ptlm_dstpid   = LNET_PID_ANY;
        kptllnd_data.kptl_nak_msg->ptlm_dstnid   = LNET_NID_ANY;

        kptllnd_rx_buffer_pool_init(&kptllnd_data.kptl_rx_buffer_pool);

        kptllnd_data.kptl_rx_cache =
                cfs_mem_cache_create("ptllnd_rx",
                                     *kptllnd_tunables.kptl_max_msg_size,
        if (kptllnd_data.kptl_rx_cache == NULL) {
                CERROR("Can't create slab for RX descriptors\n");

        /* lists/ptrs/locks initialised */
        kptllnd_data.kptl_init = PTLLND_INIT_DATA;
        /*****************************************************/

        rc = kptllnd_setup_tx_descs();
                CERROR("Can't pre-allocate %d TX descriptors: %d\n",
                       *kptllnd_tunables.kptl_ntx, rc);
        /* Start the scheduler threads for handling incoming requests.  No need
         * to advance the state because this will be automatically cleaned up
         * now that PTLLND_INIT_DATA state has been entered */
        CDEBUG(D_NET, "starting %d scheduler threads\n", PTLLND_N_SCHED);
        for (i = 0; i < PTLLND_N_SCHED; i++) {
                rc = kptllnd_thread_start(kptllnd_scheduler, (void *)((long)i));
                        CERROR("Can't spawn scheduler[%d]: %d\n", i, rc);

        rc = kptllnd_thread_start(kptllnd_watchdog, NULL);
                CERROR("Can't spawn watchdog: %d\n", rc);

        /* Ensure that 'rxb_nspare' buffers can be off the net (being emptied)
         * and we will still have enough buffers posted for all our peers */
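        /* 'spares' is that many whole buffers expressed as a message count:
         * each RX buffer spans kptl_rxb_npages pages and holds
         * (pages * PAGE_SIZE) / max_msg_size messages. */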
        spares = *kptllnd_tunables.kptl_rxb_nspare *
                 ((*kptllnd_tunables.kptl_rxb_npages * PAGE_SIZE)/
                  *kptllnd_tunables.kptl_max_msg_size);

        /* reserve and post the buffers */
        rc = kptllnd_rx_buffer_pool_reserve(&kptllnd_data.kptl_rx_buffer_pool,
                                            kptllnd_data.kptl_expected_peers +
                CERROR("Can't reserve RX Buffer pool: %d\n", rc);

        /* flag everything initialised */
        kptllnd_data.kptl_init = PTLLND_INIT_ALL;
        /*****************************************************/

        if (*kptllnd_tunables.kptl_checksum)
                CWARN("Checksumming enabled\n");

        CDEBUG(D_NET, "<<< kptllnd_startup SUCCESS\n");

        CDEBUG(D_NET, "kptllnd_startup failed rc=%d\n", rc);
        kptllnd_shutdown(ni);
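
/* Tear the LND down in two phases: phase 1 (kptl_shutdown = 1) deletes every
 * peer and waits for them all to drain; phase 2 (kptl_shutdown = 2) wakes and
 * waits out the scheduler and watchdog threads before the Portals EQ and NI
 * handles and the remaining allocations are released.  The switch on
 * kptl_init lets this routine clean up after a partially completed
 * kptllnd_startup(). */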
kptllnd_shutdown (lnet_ni_t *ni)
        lnet_process_id_t process_id;

        CDEBUG(D_MALLOC, "before LND cleanup: kmem %d\n",
               atomic_read (&libcfs_kmemory));

        LASSERT (ni == kptllnd_data.kptl_ni);

        switch (kptllnd_data.kptl_init) {

        case PTLLND_INIT_ALL:
        case PTLLND_INIT_DATA:
                kptllnd_rx_buffer_pool_fini(&kptllnd_data.kptl_rx_buffer_pool);
                LASSERT (list_empty(&kptllnd_data.kptl_sched_rxq));
                LASSERT (list_empty(&kptllnd_data.kptl_sched_rxbq));

                /* Hold peertable lock to interleave cleanly with peer birth/death */
                write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);

                LASSERT (kptllnd_data.kptl_shutdown == 0);
                kptllnd_data.kptl_shutdown = 1; /* phase 1 == destroy peers */

                /* no new peers possible now */
                write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock,

                /* nuke all existing peers */
                process_id.nid = LNET_NID_ANY;
                process_id.pid = LNET_PID_ANY;
                kptllnd_peer_del(process_id);

                read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);

                LASSERT (kptllnd_data.kptl_n_active_peers == 0);

                while (kptllnd_data.kptl_npeers != 0) {
                        CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET,
                               "Waiting for %d peers to terminate\n",
                               kptllnd_data.kptl_npeers);

                        read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock,

                        cfs_pause(cfs_time_seconds(1));

                        read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock,

                LASSERT(list_empty(&kptllnd_data.kptl_closing_peers));
                LASSERT(list_empty(&kptllnd_data.kptl_zombie_peers));
                LASSERT (kptllnd_data.kptl_peers != NULL);
                for (i = 0; i < kptllnd_data.kptl_peer_hash_size; i++)
                        LASSERT (list_empty (&kptllnd_data.kptl_peers[i]));

                read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
                CDEBUG(D_NET, "All peers deleted\n");

                /* Shutdown phase 2: kill the daemons... */
                kptllnd_data.kptl_shutdown = 2;

                while (atomic_read (&kptllnd_data.kptl_nthreads) != 0) {
                        /* Wake up all threads */
                        wake_up_all(&kptllnd_data.kptl_sched_waitq);
                        wake_up_all(&kptllnd_data.kptl_watchdog_waitq);

                        CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
                               "Waiting for %d threads to terminate\n",
                               atomic_read(&kptllnd_data.kptl_nthreads));
                        cfs_pause(cfs_time_seconds(1));

                CDEBUG(D_NET, "All Threads stopped\n");
                LASSERT(list_empty(&kptllnd_data.kptl_sched_txq));

                kptllnd_cleanup_tx_descs();
                /* Nothing to do here for now, but if libcfs ever requires
                 * wait queues and semaphores to be explicitly destroyed,
                 * that would be done here */
        case PTLLND_INIT_NOTHING:
                CDEBUG(D_NET, "PTLLND_INIT_NOTHING\n");

        if (!PtlHandleIsEqual(kptllnd_data.kptl_eqh, PTL_INVALID_HANDLE)) {
                prc = PtlEQFree(kptllnd_data.kptl_eqh);
                        CERROR("Error %d freeing portals EQ\n", prc);

        if (!PtlHandleIsEqual(kptllnd_data.kptl_nih, PTL_INVALID_HANDLE)) {
                prc = PtlNIFini(kptllnd_data.kptl_nih);
                        CERROR("Error %d finalizing portals NI\n", prc);

        LASSERT (atomic_read(&kptllnd_data.kptl_ntx) == 0);
        LASSERT (list_empty(&kptllnd_data.kptl_idle_txs));

        if (kptllnd_data.kptl_rx_cache != NULL)
                cfs_mem_cache_destroy(kptllnd_data.kptl_rx_cache);

        if (kptllnd_data.kptl_peers != NULL)
                LIBCFS_FREE (kptllnd_data.kptl_peers,
                             sizeof (struct list_head) *
                             kptllnd_data.kptl_peer_hash_size);

        if (kptllnd_data.kptl_nak_msg != NULL)
                LIBCFS_FREE (kptllnd_data.kptl_nak_msg,
                             offsetof(kptl_msg_t, ptlm_u));

        memset(&kptllnd_data, 0, sizeof(kptllnd_data));

        CDEBUG(D_MALLOC, "after LND cleanup: kmem %d\n",
               atomic_read (&libcfs_kmemory));
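
/* Module entry point: check the wire-protocol constants, initialise the
 * module tunables and register this LND with LNet.  kptllnd_module_fini()
 * undoes the registration on unload. */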
kptllnd_module_init (void)
        kptllnd_assert_wire_constants();

        rc = kptllnd_tunables_init();

        kptllnd_init_ptltrace();

        lnet_register_lnd(&kptllnd_lnd);

kptllnd_module_fini (void)
        lnet_unregister_lnd(&kptllnd_lnd);
        kptllnd_tunables_fini();

MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
MODULE_DESCRIPTION("Kernel Portals LND v1.00");
MODULE_LICENSE("GPL");

module_init(kptllnd_module_init);
module_exit(kptllnd_module_fini);