Whamcloud - gitweb
- merge 0.7rc1 from b_devel to HEAD (20030612 merge point)
[fs/lustre-release.git] / lnet / lnet / lib-msg.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * lib/lib-msg.c
5  * Message decoding, parsing and finalizing routines
6  *
7  *  Copyright (c) 2001-2003 Cluster File Systems, Inc.
8  *  Copyright (c) 2001-2002 Sandia National Laboratories
9  *
10  *   This file is part of Lustre, http://www.sf.net/projects/lustre/
11  *
12  *   Lustre is free software; you can redistribute it and/or
13  *   modify it under the terms of version 2 of the GNU General Public
14  *   License as published by the Free Software Foundation.
15  *
16  *   Lustre is distributed in the hope that it will be useful,
17  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
18  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  *   GNU General Public License for more details.
20  *
21  *   You should have received a copy of the GNU General Public License
22  *   along with Lustre; if not, write to the Free Software
23  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24  */
25
26 #ifndef __KERNEL__
27 # include <stdio.h>
28 #else
29 # define DEBUG_SUBSYSTEM S_PORTALS
30 # include <linux/kp30.h>
31 #endif
32
33 #include <portals/lib-p30.h>
34
35 int lib_finalize(nal_cb_t * nal, void *private, lib_msg_t *msg)
36 {
37         lib_md_t     *md;
38         lib_eq_t     *eq;
39         int           rc;
40         unsigned long flags;
41
42         /* ni went down while processing this message */
43         if (nal->ni.up == 0) {
44                 return -1;
45         }
46
47         if (msg == NULL)
48                 return 0;
49
50         rc = 0;
51         if (msg->send_ack) {
52                 ptl_hdr_t ack;
53
54                 LASSERT (!ptl_is_wire_handle_none (&msg->ack_wmd));
55
56                 memset (&ack, 0, sizeof (ack));
57                 ack.type     = HTON__u32 (PTL_MSG_ACK);
58                 ack.dest_nid = HTON__u64 (msg->nid);
59                 ack.src_nid  = HTON__u64 (nal->ni.nid);
60                 ack.dest_pid = HTON__u32 (msg->pid);
61                 ack.src_pid  = HTON__u32 (nal->ni.pid);
62                 PTL_HDR_LENGTH(&ack) = 0;
63
64                 ack.msg.ack.dst_wmd = msg->ack_wmd;
65                 ack.msg.ack.match_bits = msg->ev.match_bits;
66                 ack.msg.ack.mlength = HTON__u32 (msg->ev.mlength);
67
68                 rc = lib_send (nal, private, NULL, &ack, PTL_MSG_ACK,
69                                msg->nid, msg->pid, NULL, 0, 0);
70         }
71
72         md = msg->md;
73         LASSERT (md->pending > 0);  /* I've not dropped my ref yet */
74         eq = md->eq;
75
76         state_lock(nal, &flags);
77
78         if (eq != NULL) {
79                 ptl_event_t  *ev = &msg->ev;
80                 ptl_event_t  *eq_slot;
81
82                 /* I have to hold the lock while I bump the sequence number
83                  * and copy the event into the queue.  If not, and I was
84                  * interrupted after bumping the sequence number, other
85                  * events could fill the queue, including the slot I just
86                  * allocated to this event.  On resuming, I would overwrite
87                  * a more 'recent' event with old event state, and
88                  * processes taking events off the queue would not detect
89                  * overflow correctly.
90                  */
91
92                 ev->sequence = eq->sequence++;/* Allocate the next queue slot */
93
94                 /* size must be a power of 2 to handle a wrapped sequence # */
95                 LASSERT (eq->size != 0 &&
96                          eq->size == LOWEST_BIT_SET (eq->size));
97                 eq_slot = eq->base + (ev->sequence & (eq->size - 1));
98
99                 /* Invalidate unlinked_me unless this is the last
100                  * event for an auto-unlinked MD.  Note that if md was
101                  * auto-unlinked, md->pending can only decrease
102                  */
103                 if ((md->md_flags & PTL_MD_FLAG_AUTO_UNLINKED) == 0 || /* not auto-unlinked */
104                     md->pending != 1)                       /* not last ref */
105                         ev->unlinked_me = PTL_HANDLE_NONE;
106
107                 /* Copy the event into the allocated slot, ensuring all the
108                  * rest of the event's contents have been copied _before_
109                  * the sequence number gets updated.  A processes 'getting'
110                  * an event waits on the next queue slot's sequence to be
111                  * 'new'.  When it is, _all_ other event fields had better
112                  * be consistent.  I assert 'sequence' is the last member,
113                  * so I only need a 2 stage copy.
114                  */
115                 LASSERT(sizeof (ptl_event_t) ==
116                         offsetof(ptl_event_t, sequence) + sizeof(ev->sequence));
117
118                 rc = nal->cb_write (nal, private, (user_ptr)eq_slot, ev,
119                                     offsetof (ptl_event_t, sequence));
120                 LASSERT (rc == 0);
121
122 #ifdef __KERNEL__
123                 barrier();
124 #endif
125                 /* Updating the sequence number is what makes the event 'new' */
126
127                 /* cb_write is not necessarily atomic, so this could
128                    cause a race with PtlEQGet */
129                 rc = nal->cb_write(nal, private, (user_ptr)&eq_slot->sequence,
130                                    (void *)&ev->sequence,sizeof (ev->sequence));
131                 LASSERT (rc == 0);
132
133 #ifdef __KERNEL__
134                 barrier();
135 #endif
136
137                 /* I must also ensure that (a) callbacks are made in the
138                  * same order as the events land in the queue, and (b) the
139                  * callback occurs before the event can be removed from the
140                  * queue, so I can't drop the lock during the callback. */
141                 if (nal->cb_callback != NULL)
142                         nal->cb_callback(nal, private, eq, ev);
143                 else  if (eq->event_callback != NULL)
144                         (void)((eq->event_callback) (ev));
145         }
146
147         LASSERT ((md->md_flags & PTL_MD_FLAG_AUTO_UNLINKED) == 0 ||
148                  (md->md_flags & PTL_MD_FLAG_UNLINK) != 0);
149
150         md->pending--;
151         if (md->pending == 0 && /* no more outstanding operations on this md */
152             (md->threshold == 0 ||              /* done its business */
153              (md->md_flags & PTL_MD_FLAG_UNLINK) != 0)) /* marked for death */
154                 lib_md_unlink(nal, md);
155
156         list_del (&msg->msg_list);
157         nal->ni.counters.msgs_alloc--;
158         lib_msg_free(nal, msg);
159
160         state_unlock(nal, &flags);
161
162         return rc;
163 }