Whamcloud - gitweb
merge b_multinet into HEAD
[fs/lustre-release.git] / lnet / lnet / lib-msg.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * lib/lib-msg.c
5  * Message decoding, parsing and finalizing routines
6  *
7  *  Copyright (c) 2001-2003 Cluster File Systems, Inc.
8  *  Copyright (c) 2001-2002 Sandia National Laboratories
9  *
10  *   This file is part of Lustre, http://www.sf.net/projects/lustre/
11  *
12  *   Lustre is free software; you can redistribute it and/or
13  *   modify it under the terms of version 2 of the GNU General Public
14  *   License as published by the Free Software Foundation.
15  *
16  *   Lustre is distributed in the hope that it will be useful,
17  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
18  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  *   GNU General Public License for more details.
20  *
21  *   You should have received a copy of the GNU General Public License
22  *   along with Lustre; if not, write to the Free Software
23  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24  */
25
26 #ifndef __KERNEL__
27 # include <stdio.h>
28 #else
29 # define DEBUG_SUBSYSTEM S_PORTALS
30 # include <linux/kp30.h>
31 #endif
32
33 #include <portals/lib-p30.h>
34
35 int lib_finalize(nal_cb_t * nal, void *private, lib_msg_t *msg)
36 {
37         lib_md_t     *md;
38         lib_eq_t     *eq;
39         int           rc;
40         unsigned long flags;
41
42         /* ni went down while processing this message */
43         if (nal->ni.up == 0) {
44                 return -1;
45         }
46
47         if (msg == NULL)
48                 return 0;
49
50         rc = 0;
51         if (msg->send_ack) {
52                 ptl_hdr_t ack;
53
54                 LASSERT (!ptl_is_wire_handle_none (&msg->ack_wmd));
55
56                 memset (&ack, 0, sizeof (ack));
57                 ack.type     = HTON__u32 (PTL_MSG_ACK);
58                 ack.dest_nid = HTON__u64 (msg->nid);
59                 ack.src_nid  = HTON__u64 (nal->ni.nid);
60                 ack.dest_pid = HTON__u32 (msg->pid);
61                 ack.src_pid  = HTON__u32 (nal->ni.pid);
62                 PTL_HDR_LENGTH(&ack) = 0;
63
64                 ack.msg.ack.dst_wmd = msg->ack_wmd;
65                 ack.msg.ack.match_bits = msg->ev.match_bits;
66                 ack.msg.ack.mlength = HTON__u32 (msg->ev.mlength);
67
68                 rc = lib_send (nal, private, NULL, &ack, PTL_MSG_ACK,
69                                msg->nid, msg->pid, NULL, 0, 0);
70                 /* If this send fails, there's nothing else to clean up */
71         }
72
73         md = msg->md;
74         LASSERT (md->pending > 0);  /* I've not dropped my ref yet */
75         eq = md->eq;
76
77         state_lock(nal, &flags);
78
79         if (eq != NULL) {
80                 ptl_event_t  *ev = &msg->ev;
81                 ptl_event_t  *eq_slot;
82
83                 /* I have to hold the lock while I bump the sequence number
84                  * and copy the event into the queue.  If not, and I was
85                  * interrupted after bumping the sequence number, other
86                  * events could fill the queue, including the slot I just
87                  * allocated to this event.  On resuming, I would overwrite
88                  * a more 'recent' event with old event state, and
89                  * processes taking events off the queue would not detect
90                  * overflow correctly.
91                  */
92
93                 ev->sequence = eq->sequence++;/* Allocate the next queue slot */
94
95                 /* size must be a power of 2 to handle a wrapped sequence # */
96                 LASSERT (eq->size != 0 &&
97                          eq->size == LOWEST_BIT_SET (eq->size));
98                 eq_slot = eq->base + (ev->sequence & (eq->size - 1));
99
100                 /* Invalidate unlinked_me unless this is the last
101                  * event for an auto-unlinked MD.  Note that if md was
102                  * auto-unlinked, md->pending can only decrease
103                  */
104                 if ((md->md_flags & PTL_MD_FLAG_AUTO_UNLINKED) == 0 || /* not auto-unlinked */
105                     md->pending != 1)                       /* not last ref */
106                         ev->unlinked_me = PTL_HANDLE_NONE;
107
108                 /* Copy the event into the allocated slot, ensuring all the
109                  * rest of the event's contents have been copied _before_
110                  * the sequence number gets updated.  A processes 'getting'
111                  * an event waits on the next queue slot's sequence to be
112                  * 'new'.  When it is, _all_ other event fields had better
113                  * be consistent.  I assert 'sequence' is the last member,
114                  * so I only need a 2 stage copy.
115                  */
116                 LASSERT(sizeof (ptl_event_t) ==
117                         offsetof(ptl_event_t, sequence) + sizeof(ev->sequence));
118
119                 rc = nal->cb_write (nal, private, (user_ptr)eq_slot, ev,
120                                     offsetof (ptl_event_t, sequence));
121                 LASSERT (rc == 0);
122
123 #ifdef __KERNEL__
124                 barrier();
125 #endif
126                 /* Updating the sequence number is what makes the event 'new' */
127
128                 /* cb_write is not necessarily atomic, so this could
129                    cause a race with PtlEQGet */
130                 rc = nal->cb_write(nal, private, (user_ptr)&eq_slot->sequence,
131                                    (void *)&ev->sequence,sizeof (ev->sequence));
132                 LASSERT (rc == 0);
133
134 #ifdef __KERNEL__
135                 barrier();
136 #endif
137
138                 /* I must also ensure that (a) callbacks are made in the
139                  * same order as the events land in the queue, and (b) the
140                  * callback occurs before the event can be removed from the
141                  * queue, so I can't drop the lock during the callback. */
142                 if (nal->cb_callback != NULL)
143                         nal->cb_callback(nal, private, eq, ev);
144                 else  if (eq->event_callback != NULL)
145                         (void)((eq->event_callback) (ev));
146         }
147
148         LASSERT ((md->md_flags & PTL_MD_FLAG_AUTO_UNLINKED) == 0 ||
149                  (md->md_flags & PTL_MD_FLAG_UNLINK) != 0);
150
151         md->pending--;
152         if (md->pending == 0 && /* no more outstanding operations on this md */
153             (md->threshold == 0 ||              /* done its business */
154              (md->md_flags & PTL_MD_FLAG_UNLINK) != 0)) /* marked for death */
155                 lib_md_unlink(nal, md);
156
157         list_del (&msg->msg_list);
158         nal->ni.counters.msgs_alloc--;
159         lib_msg_free(nal, msg);
160
161         state_unlock(nal, &flags);
162
163         return rc;
164 }