1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
5 * Message decoding, parsing and finalizing routines
7 * Copyright (c) 2001-2003 Cluster File Systems, Inc.
8 * Copyright (c) 2001-2002 Sandia National Laboratories
10 * This file is part of Lustre, http://www.sf.net/projects/lustre/
12 * Lustre is free software; you can redistribute it and/or
13 * modify it under the terms of version 2 of the GNU General Public
14 * License as published by the Free Software Foundation.
16 * Lustre is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 * GNU General Public License for more details.
21 * You should have received a copy of the GNU General Public License
22 * along with Lustre; if not, write to the Free Software
23 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
29 # define DEBUG_SUBSYSTEM S_PORTALS
30 # include <linux/kp30.h>
33 #include <portals/lib-p30.h>
35 int lib_finalize(nal_cb_t * nal, void *private, lib_msg_t *msg)
42 /* ni went down while processing this message */
43 if (nal->ni.up == 0) {
54 LASSERT (!ptl_is_wire_handle_none (&msg->ack_wmd));
56 memset (&ack, 0, sizeof (ack));
57 ack.type = HTON__u32 (PTL_MSG_ACK);
58 ack.dest_nid = HTON__u64 (msg->nid);
59 ack.src_nid = HTON__u64 (nal->ni.nid);
60 ack.dest_pid = HTON__u32 (msg->pid);
61 ack.src_pid = HTON__u32 (nal->ni.pid);
62 ack.payload_length = 0;
64 ack.msg.ack.dst_wmd = msg->ack_wmd;
65 ack.msg.ack.match_bits = msg->ev.match_bits;
66 ack.msg.ack.mlength = HTON__u32 (msg->ev.mlength);
68 rc = lib_send (nal, private, NULL, &ack, PTL_MSG_ACK,
69 msg->nid, msg->pid, NULL, 0, 0);
70 /* If this send fails, there's nothing else to clean up */
74 LASSERT (md->pending > 0); /* I've not dropped my ref yet */
77 state_lock(nal, &flags);
80 ptl_event_t *ev = &msg->ev;
83 /* I have to hold the lock while I bump the sequence number
84 * and copy the event into the queue. If not, and I was
85 * interrupted after bumping the sequence number, other
86 * events could fill the queue, including the slot I just
87 * allocated to this event. On resuming, I would overwrite
88 * a more 'recent' event with old event state, and
89 * processes taking events off the queue would not detect
93 ev->sequence = eq->sequence++;/* Allocate the next queue slot */
95 /* size must be a power of 2 to handle a wrapped sequence # */
96 LASSERT (eq->size != 0 &&
97 eq->size == LOWEST_BIT_SET (eq->size));
98 eq_slot = eq->base + (ev->sequence & (eq->size - 1));
100 /* Invalidate unlinked_me unless this is the last
101 * event for an auto-unlinked MD. Note that if md was
102 * auto-unlinked, md->pending can only decrease
104 if ((md->md_flags & PTL_MD_FLAG_AUTO_UNLINKED) == 0 || /* not auto-unlinked */
105 md->pending != 1) /* not last ref */
106 ev->unlinked_me = PTL_HANDLE_NONE;
108 /* Copy the event into the allocated slot, ensuring all the
109 * rest of the event's contents have been copied _before_
110 * the sequence number gets updated. A process 'getting'
111 * an event waits on the next queue slot's sequence to be
112 * 'new'. When it is, _all_ other event fields had better
113 * be consistent. I assert 'sequence' is the last member,
114 * so I only need a 2 stage copy.
116 LASSERT(sizeof (ptl_event_t) ==
117 offsetof(ptl_event_t, sequence) + sizeof(ev->sequence));
119 rc = nal->cb_write (nal, private, (user_ptr)eq_slot, ev,
120 offsetof (ptl_event_t, sequence));
126 /* Updating the sequence number is what makes the event 'new' */
128 /* cb_write is not necessarily atomic, so this could
129 cause a race with PtlEQGet */
130 rc = nal->cb_write(nal, private, (user_ptr)&eq_slot->sequence,
131 (void *)&ev->sequence,sizeof (ev->sequence));
138 /* I must also ensure that (a) callbacks are made in the
139 * same order as the events land in the queue, and (b) the
140 * callback occurs before the event can be removed from the
141 * queue, so I can't drop the lock during the callback. */
142 if (nal->cb_callback != NULL)
143 nal->cb_callback(nal, private, eq, ev);
144 else if (eq->event_callback != NULL)
145 (void)((eq->event_callback) (ev));
148 LASSERT ((md->md_flags & PTL_MD_FLAG_AUTO_UNLINKED) == 0 ||
149 (md->md_flags & PTL_MD_FLAG_UNLINK) != 0);
152 if (md->pending == 0 && /* no more outstanding operations on this md */
153 (md->threshold == 0 || /* done its business */
154 (md->md_flags & PTL_MD_FLAG_UNLINK) != 0)) /* marked for death */
155 lib_md_unlink(nal, md);
157 list_del (&msg->msg_list);
158 nal->ni.counters.msgs_alloc--;
159 lib_msg_free(nal, msg);
161 state_unlock(nal, &flags);