Whamcloud - gitweb
- landing of b_hd_cleanup_merge to HEAD.
[fs/lustre-release.git] / lnet / lnet / lib-msg.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * lib/lib-msg.c
5  * Message decoding, parsing and finalizing routines
6  *
7  *  Copyright (c) 2001-2003 Cluster File Systems, Inc.
8  *  Copyright (c) 2001-2002 Sandia National Laboratories
9  *
10  *   This file is part of Lustre, http://www.sf.net/projects/lustre/
11  *
12  *   Lustre is free software; you can redistribute it and/or
13  *   modify it under the terms of version 2 of the GNU General Public
14  *   License as published by the Free Software Foundation.
15  *
16  *   Lustre is distributed in the hope that it will be useful,
17  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
18  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  *   GNU General Public License for more details.
20  *
21  *   You should have received a copy of the GNU General Public License
22  *   along with Lustre; if not, write to the Free Software
23  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24  */
25
26 #ifndef __KERNEL__
27 # include <stdio.h>
28 #else
29 # define DEBUG_SUBSYSTEM S_PORTALS
30 # include <linux/kp30.h>
31 #endif
32
33 #include <portals/lib-p30.h>
34
35 void
36 lib_enq_event_locked (lib_nal_t *nal, void *private, 
37                       lib_eq_t *eq, ptl_event_t *ev)
38 {
39         ptl_event_t  *eq_slot;
40
41         /* Allocate the next queue slot */
42         ev->link = ev->sequence = eq->eq_enq_seq++;
43         /* NB we don't support START events yet and we don't create a separate
44          * UNLINK event unless an explicit unlink succeeds, so the link
45          * sequence is pretty useless */
46
47         /* We don't support different uid/jids yet */
48         ev->uid = 0;
49         ev->jid = 0;
50         
51         /* size must be a power of 2 to handle sequence # overflow */
52         LASSERT (eq->eq_size != 0 &&
53                  eq->eq_size == LOWEST_BIT_SET (eq->eq_size));
54         eq_slot = eq->eq_events + (ev->sequence & (eq->eq_size - 1));
55
56         /* There is no race since both event consumers and event producers
57          * take the LIB_LOCK(), so we don't screw around with memory
58          * barriers, setting the sequence number last or wierd structure
59          * layout assertions. */
60         *eq_slot = *ev;
61
62         /* Call the callback handler (if any) */
63         if (eq->eq_callback != NULL)
64                 eq->eq_callback (eq_slot);
65
66         /* Wake anyone sleeping for an event (see lib-eq.c) */
67 #ifdef __KERNEL__
68         if (waitqueue_active(&nal->libnal_ni.ni_waitq))
69                 wake_up_all(&nal->libnal_ni.ni_waitq);
70 #else
71         pthread_cond_broadcast(&nal->libnal_ni.ni_cond);
72 #endif
73 }
74
75 void 
76 lib_finalize (lib_nal_t *nal, void *private, lib_msg_t *msg, ptl_err_t status)
77 {
78         lib_md_t     *md;
79         int           unlink;
80         unsigned long flags;
81         int           rc;
82         ptl_hdr_t     ack;
83
84         if (msg == NULL)
85                 return;
86
87         /* Only send an ACK if the PUT completed successfully */
88         if (status == PTL_OK &&
89             !ptl_is_wire_handle_none(&msg->ack_wmd)) {
90
91                 LASSERT(msg->ev.type == PTL_EVENT_PUT_END);
92
93                 memset (&ack, 0, sizeof (ack));
94                 ack.type     = cpu_to_le32(PTL_MSG_ACK);
95                 ack.dest_nid = cpu_to_le64(msg->ev.initiator.nid);
96                 ack.dest_pid = cpu_to_le32(msg->ev.initiator.pid);
97                 ack.src_nid  = cpu_to_le64(nal->libnal_ni.ni_pid.nid);
98                 ack.src_pid  = cpu_to_le32(nal->libnal_ni.ni_pid.pid);
99                 ack.payload_length = 0;
100
101                 ack.msg.ack.dst_wmd = msg->ack_wmd;
102                 ack.msg.ack.match_bits = msg->ev.match_bits;
103                 ack.msg.ack.mlength = cpu_to_le32(msg->ev.mlength);
104
105                 rc = lib_send (nal, private, NULL, &ack, PTL_MSG_ACK,
106                                msg->ev.initiator.nid, msg->ev.initiator.pid, 
107                                NULL, 0, 0);
108                 if (rc != PTL_OK) {
109                         /* send failed: there's nothing else to clean up. */
110                         CERROR("Error %d sending ACK to "LPX64"\n", 
111                                rc, msg->ev.initiator.nid);
112                 }
113         }
114
115         md = msg->md;
116
117         LIB_LOCK(nal, flags);
118
119         /* Now it's safe to drop my caller's ref */
120         md->pending--;
121         LASSERT (md->pending >= 0);
122
123         /* Should I unlink this MD? */
124         if (md->pending != 0)                   /* other refs */
125                 unlink = 0;
126         else if ((md->md_flags & PTL_MD_FLAG_ZOMBIE) != 0)
127                 unlink = 1;
128         else if ((md->md_flags & PTL_MD_FLAG_AUTO_UNLINK) == 0)
129                 unlink = 0;
130         else
131                 unlink = lib_md_exhausted(md);
132
133         msg->ev.ni_fail_type = status;
134         msg->ev.unlinked = unlink;
135
136         if (md->eq != NULL)
137                 lib_enq_event_locked(nal, private, md->eq, &msg->ev);
138
139         if (unlink)
140                 lib_md_unlink(nal, md);
141
142         list_del (&msg->msg_list);
143         nal->libnal_ni.ni_counters.msgs_alloc--;
144         lib_msg_free(nal, msg);
145
146         LIB_UNLOCK(nal, flags);
147 }