Whamcloud - gitweb
smash the HEAD with the contents of b_cmd. HEAD_PRE_CMD_SMASH and
[fs/lustre-release.git] / lnet / lnet / lib-msg.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * lib/lib-msg.c
5  * Message decoding, parsing and finalizing routines
6  *
7  *  Copyright (c) 2001-2003 Cluster File Systems, Inc.
8  *  Copyright (c) 2001-2002 Sandia National Laboratories
9  *
10  *   This file is part of Lustre, http://www.sf.net/projects/lustre/
11  *
12  *   Lustre is free software; you can redistribute it and/or
13  *   modify it under the terms of version 2 of the GNU General Public
14  *   License as published by the Free Software Foundation.
15  *
16  *   Lustre is distributed in the hope that it will be useful,
17  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
18  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  *   GNU General Public License for more details.
20  *
21  *   You should have received a copy of the GNU General Public License
22  *   along with Lustre; if not, write to the Free Software
23  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24  */
25
26 #ifndef __KERNEL__
27 # include <stdio.h>
28 #else
29 # define DEBUG_SUBSYSTEM S_PORTALS
30 # include <linux/kp30.h>
31 #endif
32
33 #include <portals/lib-p30.h>
34
35 void
36 lib_enq_event_locked (nal_cb_t *nal, void *private, 
37                       lib_eq_t *eq, ptl_event_t *ev)
38 {
39         ptl_event_t  *eq_slot;
40         int           rc;
41         
42         ev->sequence = eq->sequence++; /* Allocate the next queue slot */
43
44         /* size must be a power of 2 to handle a wrapped sequence # */
45         LASSERT (eq->size != 0 &&
46                  eq->size == LOWEST_BIT_SET (eq->size));
47         eq_slot = eq->base + (ev->sequence & (eq->size - 1));
48
49         /* Copy the event into the allocated slot, ensuring all the rest of
50          * the event's contents have been copied _before_ the sequence
51          * number gets updated.  A processes 'getting' an event waits on
52          * the next queue slot's sequence to be 'new'.  When it is, _all_
53          * other event fields had better be consistent.  I assert
54          * 'sequence' is the last member, so I only need a 2 stage copy. */
55
56         LASSERT(sizeof (ptl_event_t) ==
57                 offsetof(ptl_event_t, sequence) + sizeof(ev->sequence));
58
59         rc = nal->cb_write (nal, private, (user_ptr)eq_slot, ev,
60                             offsetof (ptl_event_t, sequence));
61         LASSERT (rc == PTL_OK);
62
63 #ifdef __KERNEL__
64         barrier();
65 #endif
66         /* Updating the sequence number is what makes the event 'new' NB if
67          * the cb_write below isn't atomic, this could cause a race with
68          * PtlEQGet */
69         rc = nal->cb_write(nal, private, (user_ptr)&eq_slot->sequence,
70                            (void *)&ev->sequence,sizeof (ev->sequence));
71         LASSERT (rc == PTL_OK);
72
73 #ifdef __KERNEL__
74         barrier();
75 #endif
76
77         if (nal->cb_callback != NULL)
78                 nal->cb_callback(nal, private, eq, ev);
79         else if (eq->event_callback != NULL)
80                 eq->event_callback(ev);
81 }
82
83 void 
84 lib_finalize(nal_cb_t *nal, void *private, lib_msg_t *msg, ptl_err_t status)
85 {
86         lib_md_t     *md;
87         int           unlink;
88         unsigned long flags;
89         int           rc;
90         ptl_hdr_t     ack;
91
92         /* ni went down while processing this message */
93         if (nal->ni.up == 0)
94                 return;
95
96         if (msg == NULL)
97                 return;
98
99         /* Only send an ACK if the PUT completed successfully */
100         if (status == PTL_OK &&
101             !ptl_is_wire_handle_none(&msg->ack_wmd)) {
102
103                 LASSERT(msg->ev.type == PTL_EVENT_PUT_END);
104
105                 memset (&ack, 0, sizeof (ack));
106                 ack.type     = HTON__u32 (PTL_MSG_ACK);
107                 ack.dest_nid = HTON__u64 (msg->ev.initiator.nid);
108                 ack.src_nid  = HTON__u64 (nal->ni.nid);
109                 ack.dest_pid = HTON__u32 (msg->ev.initiator.pid);
110                 ack.src_pid  = HTON__u32 (nal->ni.pid);
111                 ack.payload_length = 0;
112
113                 ack.msg.ack.dst_wmd = msg->ack_wmd;
114                 ack.msg.ack.match_bits = msg->ev.match_bits;
115                 ack.msg.ack.mlength = HTON__u32 (msg->ev.mlength);
116
117                 rc = lib_send (nal, private, NULL, &ack, PTL_MSG_ACK,
118                                msg->ev.initiator.nid, msg->ev.initiator.pid, 
119                                NULL, 0, 0);
120                 if (rc != PTL_OK) {
121                         /* send failed: there's nothing else to clean up. */
122                         CERROR("Error %d sending ACK to "LPX64"\n", 
123                                rc, msg->ev.initiator.nid);
124                 }
125         }
126
127         md = msg->md;
128
129         state_lock(nal, &flags);
130
131         /* Now it's safe to drop my caller's ref */
132         md->pending--;
133         LASSERT (md->pending >= 0);
134
135         /* Should I unlink this MD? */
136         if (md->pending != 0)                   /* other refs */
137                 unlink = 0;
138         else if ((md->md_flags & PTL_MD_FLAG_ZOMBIE) != 0)
139                 unlink = 1;
140         else if ((md->md_flags & PTL_MD_FLAG_AUTO_UNLINK) == 0)
141                 unlink = 0;
142         else
143                 unlink = lib_md_exhausted(md);
144
145         msg->ev.ni_fail_type = status;
146         msg->ev.unlinked = unlink;
147
148         if (md->eq != NULL)
149                 lib_enq_event_locked(nal, private, md->eq, &msg->ev);
150
151         if (unlink)
152                 lib_md_unlink(nal, md);
153
154         list_del (&msg->msg_list);
155         nal->ni.counters.msgs_alloc--;
156         lib_msg_free(nal, msg);
157
158         state_unlock(nal, &flags);
159 }