1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
5 * Message decoding, parsing and finalizing routines
7 * Copyright (c) 2001-2003 Cluster File Systems, Inc.
9 * This file is part of Lustre, http://www.lustre.org
11 * Lustre is free software; you can redistribute it and/or
12 * modify it under the terms of version 2 of the GNU General Public
13 * License as published by the Free Software Foundation.
15 * Lustre is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 * GNU General Public License for more details.
20 * You should have received a copy of the GNU General Public License
21 * along with Lustre; if not, write to the Free Software
22 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
25 #define DEBUG_SUBSYSTEM S_PORTALS
30 # include <libcfs/kp30.h>
33 #include <portals/lib-p30.h>
36 lib_enq_event_locked (lib_nal_t *nal, void *private,
37 lib_eq_t *eq, ptl_event_t *ev)
41 /* Allocate the next queue slot */
42 ev->link = ev->sequence = eq->eq_enq_seq++;
43 /* NB we don't support START events yet and we don't create a separate
44 * UNLINK event unless an explicit unlink succeeds, so the link
45 * sequence is pretty useless */
47 /* We don't support different uid/jids yet */
51 /* size must be a power of 2 to handle sequence # overflow */
52 LASSERT (eq->eq_size != 0 &&
53 eq->eq_size == LOWEST_BIT_SET (eq->eq_size));
54 eq_slot = eq->eq_events + (ev->sequence & (eq->eq_size - 1));
56 /* There is no race since both event consumers and event producers
57 * take the LIB_LOCK(), so we don't screw around with memory
58 * barriers, setting the sequence number last or wierd structure
59 * layout assertions. */
62 /* Call the callback handler (if any) */
63 if (eq->eq_callback != NULL)
64 eq->eq_callback (eq_slot);
66 /* Wake anyone sleeping for an event (see lib-eq.c) */
68 if (cfs_waitq_active(&nal->libnal_ni.ni_waitq))
69 cfs_waitq_broadcast(&nal->libnal_ni.ni_waitq);
71 pthread_cond_broadcast(&nal->libnal_ni.ni_cond);
76 lib_finalize (lib_nal_t *nal, void *private, lib_msg_t *msg, ptl_err_t status)
87 /* Only send an ACK if the PUT completed successfully */
88 if (status == PTL_OK &&
89 !ptl_is_wire_handle_none(&msg->ack_wmd)) {
91 LASSERT(msg->ev.type == PTL_EVENT_PUT_END);
93 memset (&ack, 0, sizeof (ack));
94 ack.type = cpu_to_le32(PTL_MSG_ACK);
95 ack.dest_nid = cpu_to_le64(msg->ev.initiator.nid);
96 ack.dest_pid = cpu_to_le32(msg->ev.initiator.pid);
97 ack.src_nid = cpu_to_le64(nal->libnal_ni.ni_pid.nid);
98 ack.src_pid = cpu_to_le32(nal->libnal_ni.ni_pid.pid);
99 ack.payload_length = 0;
101 ack.msg.ack.dst_wmd = msg->ack_wmd;
102 ack.msg.ack.match_bits = msg->ev.match_bits;
103 ack.msg.ack.mlength = cpu_to_le32(msg->ev.mlength);
105 rc = lib_send (nal, private, NULL, &ack, PTL_MSG_ACK,
106 msg->ev.initiator.nid, msg->ev.initiator.pid,
109 /* send failed: there's nothing else to clean up. */
110 CERROR("Error %d sending ACK to "LPX64"\n",
111 rc, msg->ev.initiator.nid);
117 LIB_LOCK(nal, flags);
119 /* Now it's safe to drop my caller's ref */
121 LASSERT (md->pending >= 0);
123 /* Should I unlink this MD? */
124 if (md->pending != 0) /* other refs */
126 else if ((md->md_flags & PTL_MD_FLAG_ZOMBIE) != 0)
128 else if ((md->md_flags & PTL_MD_FLAG_AUTO_UNLINK) == 0)
131 unlink = lib_md_exhausted(md);
133 msg->ev.ni_fail_type = status;
134 msg->ev.unlinked = unlink;
137 lib_enq_event_locked(nal, private, md->eq, &msg->ev);
140 lib_md_unlink(nal, md);
142 list_del (&msg->msg_list);
143 nal->libnal_ni.ni_counters.msgs_alloc--;
144 lib_msg_free(nal, msg);
146 LIB_UNLOCK(nal, flags);