1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Based on ksocknal and qswnal
6 * Copyright (C) 2002 Cluster File Systems, Inc.
7 * Author: Robert Read <rread@datarithm.net>
9 * This file is part of Portals, http://www.sf.net/projects/sandiaportals/
11 * Portals is free software; you can redistribute it and/or
12 * modify it under the terms of version 2 of the GNU General Public
13 * License as published by the Free Software Foundation.
15 * Portals is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 * GNU General Public License for more details.
20 * You should have received a copy of the GNU General Public License
21 * along with Portals; if not, write to the Free Software
22 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
26 * preallocate send buffers, store on list
27 * put receive buffers on queue, handle with receive threads
33 extern kgmnal_rx_t *kgm_add_recv(kgmnal_data_t *,int);
/*
 * Allocates one kgmnal_tx_t-sized object into `t` with PORTAL_ALLOC.
 * NOTE(review): the enclosing function header and the rest of its body are
 * not visible in this excerpt - presumably the transmit-descriptor getter
 * that pairs with put_trans(); confirm against the full file.
 */
39 PORTAL_ALLOC(t, (sizeof(kgmnal_tx_t)));
/*
 * put_trans: release a transmit descriptor.
 *
 * t: descriptor to release; it is handed to PORTAL_FREE together with
 *    sizeof(kgmnal_tx_t) as the size of the object being freed.
 */
44 put_trans(kgmnal_tx_t *t)
46 PORTAL_FREE(t, sizeof(kgmnal_tx_t));
/*
 * kgmnal_ispeer: decide whether `nid` can be a node on the local GM network.
 *
 * The nid is narrowed to unsigned int and compared against the value GM
 * reports through gm_max_node_id_in_use().  The result is non-zero only when
 * the narrowing lost no bits and the narrowed id is below that value.
 *
 * NOTE(review): if gm_max_node_id_in_use() yields the highest id in use
 * (rather than a count), `<` excludes that node - confirm against the GM API.
 */
50 kgmnal_ispeer (ptl_nid_t nid)
52 unsigned int gmnid = (unsigned int)nid;
/* nnids is filled in by GM for our port (its declaration is not visible here) */
55 gm_max_node_id_in_use(kgmnal_data.kgm_port, &nnids);
57 return ((ptl_nid_t)gmnid == nid &&/* didn't lose high bits on conversion ? */
58 gmnid < nnids); /* it's in this machine */
62 * LIB functions follow
/*
 * kgmnal_read: copy `len` bytes from src_addr to dst_addr with memcpy(),
 * logging the transfer (our nid, length, both addresses) at D_NET.
 * The tail of the parameter list is on a line not visible in this excerpt.
 */
66 kgmnal_read (nal_cb_t *nal, void *private, void *dst_addr, user_ptr src_addr,
69 CDEBUG(D_NET, "0x%Lx: reading %ld bytes from %p -> %p\n",
70 nal->ni.nid, (long)len, src_addr, dst_addr );
71 memcpy( dst_addr, src_addr, len );
/*
 * kgmnal_write: copy `len` bytes from src_addr to dst_addr with memcpy(),
 * logging the transfer (our nid, length, both addresses) at D_NET.
 * The tail of the parameter list is on a line not visible in this excerpt.
 */
76 kgmnal_write(nal_cb_t *nal, void *private, user_ptr dst_addr, void *src_addr,
79 CDEBUG(D_NET, "0x%Lx: writing %ld bytes from %p -> %p\n",
80 nal->ni.nid, (long)len, src_addr, dst_addr );
81 memcpy( dst_addr, src_addr, len );
/*
 * kgmnal_malloc: allocation callback.
 * Allocates `len` bytes into `buf` with PORTAL_ALLOC; the declaration of
 * `buf` and the return statement are not visible in this excerpt.
 */
86 kgmnal_malloc(nal_cb_t *nal, size_t len)
90 PORTAL_ALLOC(buf, len);
/*
 * kgmnal_free: deallocation callback.
 * Releases `buf` with PORTAL_FREE, passing `len` as the size of the
 * allocation being returned.
 */
95 kgmnal_free(nal_cb_t *nal, void *buf, size_t len)
97 PORTAL_FREE(buf, len);
/*
 * kgmnal_printf: printf-style debug output callback.
 *
 * Output is produced only when D_NET is set in portal_debug.  The message is
 * formatted into `msg` (bounded by sizeof(msg)) and then emitted through
 * printk, prefixed with the id of the CPU we are running on.
 */
101 kgmnal_printf(nal_cb_t *nal, const char *fmt, ...)
106 if (portal_debug & D_NET) {
108 vsnprintf( msg, sizeof(msg), fmt, ap );
111 printk("CPUId: %d %s",smp_processor_id(), msg);
/*
 * kgmnal_cli: enter the NAL critical section.
 * Takes kgm_dispatch_lock with spin_lock_irqsave(); the saved interrupt
 * state is stored in *flags for the matching kgmnal_sti() call.
 */
117 kgmnal_cli(nal_cb_t *nal, unsigned long *flags)
119 kgmnal_data_t *data= nal->nal_data;
121 spin_lock_irqsave(&data->kgm_dispatch_lock,*flags);
/*
 * kgmnal_sti: leave the NAL critical section.
 * Drops kgm_dispatch_lock with spin_unlock_irqrestore(), restoring the
 * interrupt state held in *flags.
 */
126 kgmnal_sti(nal_cb_t *nal, unsigned long *flags)
128 kgmnal_data_t *data= nal->nal_data;
130 spin_unlock_irqrestore(&data->kgm_dispatch_lock,*flags);
/*
 * kgmnal_dist: distance callback.
 * The visible code only distinguishes "nid is this node" from "nid is some
 * other node"; the values stored through `dist` in each case are on lines
 * not visible in this excerpt.
 */
135 kgmnal_dist(nal_cb_t *nal, ptl_nid_t nid, unsigned long *dist)
137 /* network distance doesn't mean much for this nal */
138 if ( nal->ni.nid == nid ) {
147 /* FIXME rmr: add routing code here */
/*
 * kgmnal_tx_done: complete a transmit.
 *
 * trans: descriptor of the send that has finished.
 * error: completion status from the caller; none of the visible lines
 *        reads it.
 *
 * Finalizes the message with lib_finalize(), returns the DMA buffer to GM
 * and clears the descriptor's buffer pointer so it cannot be reused stale.
 */
149 kgmnal_tx_done(kgmnal_tx_t *trans, int error)
151 lib_finalize(trans->ktx_nal, trans->ktx_private, trans->ktx_cookie);
153 gm_dma_free(kgmnal_data.kgm_port, trans->ktx_buffer);
155 trans->ktx_buffer = NULL;
/*
 * Printable names for GM status codes, indexed by the status value and
 * sized GM_NUM_STATUS_CODES.  Only the codes listed below are given a name;
 * every other slot is left without an initializer and is therefore NULL,
 * which get_error() reports as an unknown error.
 */
160 static char * gm_error_strings[GM_NUM_STATUS_CODES] = {
161 [GM_SUCCESS] = "GM_SUCCESS",
162 [GM_SEND_TIMED_OUT] = "GM_SEND_TIMED_OUT",
163 [GM_SEND_REJECTED] = "GM_SEND_REJECTED",
164 [GM_SEND_TARGET_PORT_CLOSED] = "GM_SEND_TARGET_PORT_CLOSED",
165 [GM_SEND_TARGET_NODE_UNREACHABLE] = "GM_SEND_TARGET_NODE_UNREACHABLE",
166 [GM_SEND_DROPPED] = "GM_SEND_DROPPED",
167 [GM_SEND_PORT_CLOSED] = "GM_SEND_PORT_CLOSED",
170 inline char * get_error(int status)
172 if (gm_error_strings[status] != NULL)
173 return gm_error_strings[status];
175 return "Unknown error";
/*
 * kgmnal_errhandler: completion callback handed to gm_resume_sending() and
 * gm_drop_sends() by kgmnal_txhandler().  The visible body only logs the
 * context pointer and the status at D_NET.
 */
179 kgmnal_errhandler(struct gm_port *p, void *context, gm_status_t status)
181 CDEBUG(D_NET,"error callback: ktx %p status %d\n", context, status);
/*
 * kgmnal_txhandler: send-completion callback registered with
 * gm_send_with_callback() in kgmnal_send().
 *
 * p:       GM port the send was issued on.
 * context: the kgmnal_tx_t describing the send (must not be NULL).
 * status:  GM completion status.
 *
 * Failed sends are logged and, depending on the status, the port is either
 * resumed or told to drop queued sends for the target.  In every case the
 * transmit is then completed through kgmnal_tx_done().  The assignments to
 * `err` are on lines not visible in this excerpt.
 */
185 kgmnal_txhandler(struct gm_port *p, void *context, gm_status_t status)
187 kgmnal_tx_t *ktx = (kgmnal_tx_t *)context;
191 LASSERT (ktx != NULL);
193 CDEBUG(D_NET,"ktx %p status %d nid 0x%x pid %d\n", ktx, status,
194 ktx->ktx_tgt_node, ktx->ktx_tgt_port_id);
196 switch((int)status) {
197 case GM_SUCCESS: /* normal */
/* these failures are handled by asking GM to resume sending to the target */
199 case GM_SEND_TIMED_OUT: /* application error */
200 case GM_SEND_REJECTED: /* size of msg unacceptable */
201 case GM_SEND_TARGET_PORT_CLOSED:
202 CERROR("%s (%d):\n", get_error(status), status);
203 gm_resume_sending(kgmnal_data.kgm_port, ktx->ktx_priority,
204 ktx->ktx_tgt_node, ktx->ktx_tgt_port_id,
205 kgmnal_errhandler, NULL);
/* these failures are handled by dropping the sends queued for the target */
208 case GM_SEND_TARGET_NODE_UNREACHABLE:
209 case GM_SEND_PORT_CLOSED:
210 CERROR("%s (%d):\n", get_error(status), status);
211 gm_drop_sends(kgmnal_data.kgm_port, ktx->ktx_priority,
212 ktx->ktx_tgt_node, ktx->ktx_tgt_port_id,
213 kgmnal_errhandler, NULL);
216 case GM_SEND_DROPPED:
217 CERROR("%s (%d):\n", get_error(status), status);
/* presumably the default label precedes this line - not visible here */
221 CERROR("Unknown status: %d\n", status);
/* hand the descriptor back: finalize the message and free the DMA buffer */
226 kgmnal_tx_done(ktx, err);
/*
 * kgmnal_send: send callback - transmit a portals header plus payload to
 * `nid` over GM.
 *
 * The header and the iov payload are coalesced into one DMA-able buffer,
 * the transaction details are recorded in a kgmnal_tx_t, and the buffer is
 * queued with gm_send_with_callback(); kgmnal_txhandler() runs when GM
 * completes the send.  Page-based (PTL_MD_KIOV) payloads are not accepted.
 *
 * Most of the parameter list, the local declarations and the error/return
 * paths are on lines not visible in this excerpt.
 *
 * NOTE(review): the buffer is allocated with buf_len bytes while the send is
 * issued with buf_size as well; confirm that this matches what GM expects
 * for the chosen size class, and that a failed gm_dma_malloc() is handled.
 */
233 kgmnal_send(nal_cb_t *nal,
246 * ipnal assumes that this is the private as passed to lib_dispatch..
249 kgmnal_tx_t *ktx=NULL;
/* total bytes on the wire: portals header followed by the payload */
252 int buf_len = sizeof(ptl_hdr_t) + len;
255 LASSERT ((options & PTL_MD_KIOV) == 0);
257 PROF_START(gmnal_send);
260 CDEBUG(D_NET, "sending %d bytes from %p to nid: 0x%Lx pid %d\n",
261 len, iov, nid, KGM_PORT_NUM);
263 /* ensure there is an available tx handle */
265 /* save transaction info to trans for later finalize and cleanup */
272 /* hmmm... GM doesn't support vectored write, so need to allocate buffer to coalesce
274 Also, memory must be dma'able or registered with GM. */
/* pick the GM size class from the message length; anything larger is refused */
276 if (buf_len <= MSG_LEN_SMALL) {
277 buf_size = MSG_SIZE_SMALL;
278 } else if (buf_len <= MSG_LEN_LARGE) {
279 buf_size = MSG_SIZE_LARGE;
281 printk("kgmnal:request exceeds TX MTU size (%d).\n",
287 buf = gm_dma_malloc(kgmnal_data.kgm_port, buf_len);
/* lay out the message: header first, payload directly behind it */
292 memcpy(buf, hdr, sizeof(ptl_hdr_t));
295 lib_copy_iov2buf(((char *)buf) + sizeof (ptl_hdr_t),
296 options, niov, iov, len);
/* everything kgmnal_txhandler()/kgmnal_tx_done() need at completion time */
299 ktx->ktx_private = private;
300 ktx->ktx_cookie = cookie;
301 ktx->ktx_len = buf_len;
302 ktx->ktx_size = buf_size;
303 ktx->ktx_buffer = buf;
304 ktx->ktx_priority = GM_LOW_PRIORITY;
305 ktx->ktx_tgt_node = nid;
306 ktx->ktx_tgt_port_id = KGM_PORT_NUM;
308 CDEBUG(D_NET, "gm_send %d bytes (size %d) from %p to nid: 0x%Lx "
309 "pid %d pri %d\n", buf_len, buf_size, iov, nid, KGM_PORT_NUM,
312 gm_send_with_callback(kgmnal_data.kgm_port, buf, buf_size,
313 buf_len, GM_LOW_PRIORITY,
315 kgmnal_txhandler, ktx);
317 PROF_FINISH(gmnal_send);
/*
 * kgmnal_fwd_packet: packet-forwarding entry point.
 * Forwarding is not supported by this NAL; the visible body only logs an
 * error.
 */
322 kgmnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd)
324 CERROR ("forwarding not implemented\n");
/*
 * kqswnal_fwd_callback: forwarding-completion callback.
 * Forwarding is not supported by this NAL; the visible body only logs an
 * error.
 * NOTE(review): the kqswnal_ prefix looks inherited from the qswnal code
 * this file is based on - confirm whether kgmnal_ was intended.
 */
328 kqswnal_fwd_callback (void *arg, int error)
330 CERROR ("forwarding not implemented\n");
/*
 * kgmnal_requeue_rx: give a receive buffer back to GM.
 *
 * krx: receive descriptor whose buffer, size and priority are passed to
 *      gm_provide_receive_buffer() on the NAL's port.
 */
335 kgmnal_requeue_rx(kgmnal_rx_t *krx)
337 gm_provide_receive_buffer(kgmnal_data.kgm_port, krx->krx_buffer,
338 krx->krx_size, krx->krx_priority);
/*
 * kgmnal_rx: handle one message received from GM.
 *
 * kgm:  NAL state.
 * len:  number of bytes received.
 * size: GM size of the receive buffer.
 * buf:  receive buffer; expected to start with a ptl_hdr_t.
 * pri:  GM priority the message arrived at.
 *
 * Messages shorter than a portals header are logged and their buffer is
 * returned to GM.  Otherwise the message is described in a local krx and
 * dispatched on its destination nid: ours goes to lib_parse(); anything
 * else is logged and the buffer is requeued, because forwarding is not
 * implemented.
 */
341 /* Process a received portals packet */
343 /* Receive Interrupt Handler */
344 static void kgmnal_rx(kgmnal_data_t *kgm, unsigned long len, unsigned int size,
345 void * buf, unsigned int pri)
347 ptl_hdr_t *hdr = buf;
350 CDEBUG(D_NET,"buf %p, len %ld\n", buf, len);
352 if ( len < sizeof( ptl_hdr_t ) ) {
353 /* XXX what's this for? */
354 if (kgm->kgm_shuttingdown)
356 CERROR("kgmnal: did not receive complete portal header, "
358 gm_provide_receive_buffer(kgm->kgm_port, buf, size, pri);
362 /* might want to use separate threads to handle receive */
363 krx.krx_buffer = buf;
366 krx.krx_priority = pri;
368 if ( hdr->dest_nid == kgmnal_lib.ni.nid ) {
369 PROF_START(lib_parse);
370 lib_parse(&kgmnal_lib, (ptl_hdr_t *)krx.krx_buffer, &krx);
371 PROF_FINISH(lib_parse);
372 } else if (kgmnal_ispeer(hdr->dest_nid)) {
373 /* should have gone direct to peer */
374 CERROR("dropping packet from 0x%llx to 0x%llx: target is "
375 "a peer", hdr->src_nid, hdr->dest_nid);
376 kgmnal_requeue_rx(&krx);
378 /* forward to gateway */
379 CERROR("forwarding not implemented yet");
380 kgmnal_requeue_rx(&krx);
/*
 * kgmnal_recv: receive callback - deliver the payload of a received message
 * into the caller's iov.
 *
 * `private` is the kgmnal_rx_t describing the received buffer.  mlen bytes
 * located directly after the portals header are copied into the iov, the
 * message is finalized, and the receive buffer is handed back to GM.
 * Page-based (PTL_MD_KIOV) destinations are not accepted.
 *
 * Most of the parameter list and the return statement are on lines not
 * visible in this excerpt.
 */
387 static int kgmnal_recv(nal_cb_t *nal,
396 kgmnal_rx_t *krx = private;
398 LASSERT ((options & PTL_MD_KIOV) == 0);
400 CDEBUG(D_NET,"mlen=%d, rlen=%d\n", mlen, rlen);
402 /* What was actually received must be >= what sender claims to
403 * have sent. This is an LASSERT, since lib-move doesn't
404 * check cb return code yet. */
405 LASSERT (krx->krx_len >= sizeof (ptl_hdr_t) + rlen);
406 LASSERT (mlen <= rlen);
408 PROF_START(gmnal_recv);
/* the payload starts right behind the header in the receive buffer */
412 lib_copy_buf2iov (options, niov, iov,
413 krx->krx_buffer + sizeof (ptl_hdr_t), mlen);
417 PROF_START(lib_finalize);
418 lib_finalize(nal, private, cookie);
419 PROF_FINISH(lib_finalize);
/* the data has been copied out, so GM can have the buffer back */
421 kgmnal_requeue_rx(krx);
423 PROF_FINISH(gmnal_recv);
/*
 * kgmnal_shutdown: alarm callback that recv_shutdown() installs through
 * gm_set_alarm().  Its body is not visible in this excerpt.
 */
429 static void kgmnal_shutdown(void * none)
/*
 * recv_shutdown: ask the receive thread to stop.
 * Sets kgm_shuttingdown, then arms a GM alarm on the port with
 * kgmnal_shutdown() as its callback so that a thread blocked in
 * gm_blocking_receive_no_spin() gets an event to wake up on.
 */
436 * Set terminate and use alarm to wake up the recv thread.
438 static void recv_shutdown(kgmnal_data_t *kgm)
442 kgm->kgm_shuttingdown = 1;
443 gm_initialize_alarm(&alarm);
444 gm_set_alarm(kgm->kgm_port, &alarm, 1, kgmnal_shutdown, NULL);
/*
 * kgmnal_end: tear-down entry point for the NAL.
 * The visible body consists only of comments outlining the intended steps;
 * any statements carrying them out are on lines not visible in this excerpt.
 */
447 int kgmnal_end(kgmnal_data_t *kgm)
450 /* wait for sends to finish ? */
451 /* remove receive buffers */
452 /* shutdown receive thread */
/*
 * kgmnal_recv_thread: kernel thread that pulls events off the GM port.
 *
 * arg: the kgmnal_data_t for the NAL (must not be NULL).
 *
 * The thread daemonizes itself as "kgmnal_rx" and then repeatedly blocks in
 * gm_blocking_receive_no_spin().  Receive events are passed to kgmnal_rx()
 * with the length, size and buffer taken from the event; other events are
 * handed to gm_unknown().  kgm_shuttingdown is checked before each blocking
 * receive.  The loop construct, several case labels and the return statement
 * are on lines not visible in this excerpt.
 */
459 /* Used only for the spinner */
460 int kgmnal_recv_thread(void *arg)
462 kgmnal_data_t *kgm = arg;
464 LASSERT(kgm != NULL);
466 kportal_daemonize("kgmnal_rx");
/* low priority unless the event says otherwise */
470 int priority = GM_LOW_PRIORITY;
471 if (kgm->kgm_shuttingdown)
474 e = gm_blocking_receive_no_spin(kgm->kgm_port);
476 CERROR("gm_blocking_receive returned NULL\n");
/* event fields are converted from GM byte order with the gm_ntoh* helpers */
480 switch(gm_ntohc(e->recv.type)) {
481 case GM_HIGH_RECV_EVENT:
482 priority = GM_HIGH_PRIORITY;
485 kgmnal_rx(kgm, gm_ntohl(e->recv.length),
486 gm_ntohc(e->recv.size),
487 gm_ntohp(e->recv.buffer), priority);
490 CERROR("received alarm");
491 gm_unknown(kgm->kgm_port, e);
493 case GM_BAD_SEND_DETECTED_EVENT: /* ?? */
494 CERROR("received bad send!\n");
497 gm_unknown(kgm->kgm_port, e);
501 CERROR("shuttting down.\n");
505 nal_cb_t kgmnal_lib = {
506 nal_data: &kgmnal_data, /* NAL private data */
507 cb_send: kgmnal_send,
508 cb_recv: kgmnal_recv,
509 cb_read: kgmnal_read,
510 cb_write: kgmnal_write,
511 cb_malloc: kgmnal_malloc,
512 cb_free: kgmnal_free,
513 cb_printf: kgmnal_printf,