1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
5 * Message decoding, parsing and finalizing routines
7 * Copyright (c) 2001-2003 Cluster File Systems, Inc.
8 * Copyright (c) 2001-2002 Sandia National Laboratories
10 * This file is part of Lustre, http://www.sf.net/projects/lustre/
12 * Lustre is free software; you can redistribute it and/or
13 * modify it under the terms of version 2 of the GNU General Public
14 * License as published by the Free Software Foundation.
16 * Lustre is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 * GNU General Public License for more details.
21 * You should have received a copy of the GNU General Public License
22 * along with Lustre; if not, write to the Free Software
23 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
29 # define DEBUG_SUBSYSTEM S_PORTALS
30 # include <linux/kp30.h>
33 #include <portals/lib-p30.h>
/*
 * lib_enq_event_locked - post event 'ev' to event queue 'eq'.
 *
 * The event is stamped with the queue's next sequence number, written into
 * the queue slot selected by that number through nal->cb_write (body first,
 * sequence field last), and then reported through nal->cb_callback if that
 * hook is set, otherwise through eq->event_callback if that is set.
 *
 *   nal     - NAL whose cb_write and cb_callback hooks are used
 *   private - opaque pointer handed unchanged to the NAL hooks
 *   eq      - destination queue; eq->size is asserted to be a non-zero
 *             power of 2
 *   ev      - event to post; ev->sequence is overwritten here
 *
 * Both cb_write calls are asserted to return PTL_OK.
 *
 * NOTE(review): lib_finalize calls this between state_lock() and
 * state_unlock(); presumably every caller must hold that lock (hence the
 * "_locked" suffix) -- confirm against the other call sites.
 * NOTE(review): this listing has gaps (the embedded line numbers skip); the
 * return type, the braces and the declarations of 'eq_slot' and 'rc' are
 * not visible here.
 */
36 lib_enq_event_locked (nal_cb_t *nal, void *private,
37 lib_eq_t *eq, ptl_event_t *ev)
/* Take the queue's current sequence number for this event and advance the
 * queue's counter for the next one. */
42 ev->sequence = eq->sequence++; /* Allocate the next queue slot */
44 /* size must be a power of 2 to handle a wrapped sequence # */
45 LASSERT (eq->size != 0 &&
46 eq->size == LOWEST_BIT_SET (eq->size));
/* Slot index is the sequence number masked with (size - 1); with a
 * power-of-2 size this is the sequence number modulo the queue size. */
47 eq_slot = eq->base + (ev->sequence & (eq->size - 1));
49 /* Copy the event into the allocated slot, ensuring all the rest of
50 * the event's contents have been copied _before_ the sequence
51 * number gets updated. A process 'getting' an event waits on
52 * the next queue slot's sequence to be 'new'. When it is, _all_
53 * other event fields had better be consistent. I assert
54 * 'sequence' is the last member, so I only need a 2 stage copy. */
/* Nothing (no member, no padding) may follow 'sequence' in ptl_event_t,
 * otherwise the two writes below would not cover the whole event. */
56 LASSERT(sizeof (ptl_event_t) ==
57 offsetof(ptl_event_t, sequence) + sizeof(ev->sequence));
/* Stage 1: write every byte of the event that precedes 'sequence'. */
59 rc = nal->cb_write (nal, private, (user_ptr)eq_slot, ev,
60 offsetof (ptl_event_t, sequence));
61 LASSERT (rc == PTL_OK);
/* Stage 2: write the 'sequence' field itself into the slot.
 * NOTE(review): the original comment that follows is cut off in this
 * listing (its final line and terminator are missing). */
66 /* Updating the sequence number is what makes the event 'new' NB if
67 * the cb_write below isn't atomic, this could cause a race with
69 rc = nal->cb_write(nal, private, (user_ptr)&eq_slot->sequence,
70 (void *)&ev->sequence,sizeof (ev->sequence));
71 LASSERT (rc == PTL_OK);
77 if (nal->cb_callback != NULL)
78 nal->cb_callback(nal, private, eq, ev);
79 else if (eq->event_callback != NULL)
80 eq->event_callback(ev);
/*
 * lib_finalize - finish processing message 'msg' with result 'status'.
 *
 * From the lines visible here:
 *   1. If 'status' is PTL_OK and msg->ack_wmd is not the "none" wire
 *      handle, an ACK header is built and handed to lib_send, addressed
 *      to the initiator recorded in msg->ev.
 *   2. Between state_lock() and state_unlock(): the MD's 'pending' count
 *      is checked, the unlink decision is made, 'status' and that decision
 *      are stored in msg->ev, the event is posted with
 *      lib_enq_event_locked(), the MD is passed to lib_md_unlink(), and
 *      'msg' is removed from its list and given to lib_msg_free().
 *
 *   nal     - NAL; nal->ni supplies the local nid/pid for the ACK and the
 *             msgs_alloc counter
 *   private - opaque pointer handed unchanged to lib_send and
 *             lib_enq_event_locked
 *   msg     - message being finalized; passed to lib_msg_free before the
 *             state lock is released
 *   status  - completion status; stored in msg->ev.ni_fail_type, and an ACK
 *             is only attempted when it is PTL_OK
 *
 * NOTE(review): this listing has gaps (the embedded line numbers skip).
 * Not visible here: the return type, braces, local declarations, the
 * early-exit tests near the top, the tail of the lib_send argument list,
 * the test guarding the CERROR, the assignment of 'md', the decrement of
 * md->pending, the values assigned to 'unlink' in the first three
 * branches, and whatever conditions guard the event post and the MD
 * unlink.  Confirm those against the full source before relying on the
 * description above.
 */
84 lib_finalize(nal_cb_t *nal, void *private, lib_msg_t *msg, ptl_err_t status)
92 /* ni went down while processing this message */
99 /* Only send an ACK if the PUT completed successfully */
100 if (status == PTL_OK &&
101 !ptl_is_wire_handle_none(&msg->ack_wmd)) {
/* An ACK handle is only expected on a completed PUT. */
103 LASSERT(msg->ev.type == PTL_EVENT_PUT_END);
/* Start from an all-zero header.  The type, nid, pid and mlength fields
 * go through the HTON__u32/HTON__u64 macros; dst_wmd and match_bits are
 * copied as they are. */
105 memset (&ack, 0, sizeof (ack));
106 ack.type = HTON__u32 (PTL_MSG_ACK);
/* Destination is the initiator of the message being finalized; source is
 * this NI. */
107 ack.dest_nid = HTON__u64 (msg->ev.initiator.nid);
108 ack.src_nid = HTON__u64 (nal->ni.nid);
109 ack.dest_pid = HTON__u32 (msg->ev.initiator.pid);
110 ack.src_pid = HTON__u32 (nal->ni.pid);
/* The ACK header is sent without a payload. */
111 ack.payload_length = 0;
113 ack.msg.ack.dst_wmd = msg->ack_wmd;
114 ack.msg.ack.match_bits = msg->ev.match_bits;
115 ack.msg.ack.mlength = HTON__u32 (msg->ev.mlength);
/* Hand the header to lib_send for the initiator's nid/pid. */
117 rc = lib_send (nal, private, NULL, &ack, PTL_MSG_ACK,
118 msg->ev.initiator.nid, msg->ev.initiator.pid,
/* In the visible lines, the only handling of a failed send is this log
 * message. */
121 /* send failed: there's nothing else to clean up. */
122 CERROR("Error %d sending ACK to "LPX64"\n",
123 rc, msg->ev.initiator.nid);
/* Everything from here to state_unlock() runs with the state lock held. */
129 state_lock(nal, &flags);
131 /* Now it's safe to drop my caller's ref */
/* The pending count must never go negative. */
133 LASSERT (md->pending >= 0);
135 /* Should I unlink this MD? */
/* Decision order: outstanding references first, then the ZOMBIE flag,
 * then the AUTO_UNLINK flag; only when AUTO_UNLINK is set (and the MD is
 * neither pending nor a zombie) is lib_md_exhausted() consulted. */
136 if (md->pending != 0) /* other refs */
138 else if ((md->md_flags & PTL_MD_FLAG_ZOMBIE) != 0)
140 else if ((md->md_flags & PTL_MD_FLAG_AUTO_UNLINK) == 0)
143 unlink = lib_md_exhausted(md);
/* Record the outcome and the unlink decision in the event before it is
 * posted. */
145 msg->ev.ni_fail_type = status;
146 msg->ev.unlinked = unlink;
/* Post the completion event to the MD's event queue. */
149 lib_enq_event_locked(nal, private, md->eq, &msg->ev);
152 lib_md_unlink(nal, md);
/* Release the message: take it off its list, adjust the allocation
 * counter and free it. */
154 list_del (&msg->msg_list);
155 nal->ni.counters.msgs_alloc--;
156 lib_msg_free(nal, msg);
158 state_unlock(nal, &flags);