Whamcloud - gitweb
- Fix for the 16kb page directory handling (thanks Andreas, it appears to
[fs/lustre-release.git] / lustre / ptlrpc / events.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2002 Cluster File Systems, Inc.
5  *
6  *   This file is part of Lustre, http://www.lustre.org.
7  *
8  *   Lustre is free software; you can redistribute it and/or
9  *   modify it under the terms of version 2 of the GNU General Public
10  *   License as published by the Free Software Foundation.
11  *
12  *   Lustre is distributed in the hope that it will be useful,
13  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *   GNU General Public License for more details.
16  *
17  *   You should have received a copy of the GNU General Public License
18  *   along with Lustre; if not, write to the Free Software
19  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  *
21  */
22
23 #define DEBUG_SUBSYSTEM S_RPC
24
25 #include <linux/module.h>
26 #include <linux/obd_support.h>
27 #include <linux/lustre_net.h>
28
29 ptl_handle_eq_t request_out_eq, reply_in_eq, reply_out_eq, bulk_source_eq,
30         bulk_sink_eq;
31 static const ptl_handle_ni_t *socknal_nip = NULL, *qswnal_nip = NULL;
32
33 /*
34  *  Free the packet when it has gone out
35  */
36 static int request_out_callback(ptl_event_t *ev)
37 {
38         ENTRY;
39
40         if (ev->type != PTL_EVENT_SENT) {
41                 // XXX make sure we understand all events, including ACK's
42                 CERROR("Unknown event %d\n", ev->type);
43                 LBUG();
44         }
45
46         RETURN(1);
47 }
48
49
50 /*
51  *  Free the packet when it has gone out
52  */
53 static int reply_out_callback(ptl_event_t *ev)
54 {
55         ENTRY;
56
57         if (ev->type == PTL_EVENT_SENT) {
58                 OBD_FREE(ev->mem_desc.start, ev->mem_desc.length);
59         } else {
60                 // XXX make sure we understand all events, including ACK's
61                 CERROR("Unknown event %d\n", ev->type);
62                 LBUG();
63         }
64
65         RETURN(1);
66 }
67
68 /*
69  * Wake up the thread waiting for the reply once it comes in.
70  */
71 static int reply_in_callback(ptl_event_t *ev)
72 {
73         struct ptlrpc_request *req = ev->mem_desc.user_ptr;
74         ENTRY;
75
76         if (req->rq_xid == 0x5a5a5a5a5a5a5a5a) {
77                 CERROR("Reply received for freed request!  Probably a missing "
78                        "ptlrpc_abort()\n");
79                 LBUG();
80         }
81
82         if (req->rq_xid != ev->match_bits) {
83                 CERROR("Reply packet for wrong request\n");
84                 LBUG(); 
85         }
86
87         if (ev->type == PTL_EVENT_PUT) {
88                 req->rq_repmsg = ev->mem_desc.start + ev->offset;
89                 barrier();
90                 wake_up(&req->rq_wait_for_rep);
91         } else {
92                 // XXX make sure we understand all events, including ACK's
93                 CERROR("Unknown event %d\n", ev->type);
94                 LBUG();
95         }
96
97         RETURN(1);
98 }
99
100 int request_in_callback(ptl_event_t *ev)
101 {
102         struct ptlrpc_service *service = ev->mem_desc.user_ptr;
103
104         if (ev->rlength != ev->mlength)
105                 CERROR("Warning: Possibly truncated rpc (%d/%d)\n",
106                        ev->mlength, ev->rlength);
107
108         if (ev->type == PTL_EVENT_PUT)
109                 wake_up(&service->srv_waitq);
110         else
111                 CERROR("Unexpected event type: %d\n", ev->type);
112
113         return 0;
114 }
115
116 static int bulk_source_callback(ptl_event_t *ev)
117 {
118         struct ptlrpc_bulk_page *bulk = ev->mem_desc.user_ptr;
119         struct ptlrpc_bulk_desc *desc = bulk->b_desc;
120         ENTRY;
121
122         if (ev->type == PTL_EVENT_SENT) {
123                 CDEBUG(D_NET, "got SENT event\n");
124         } else if (ev->type == PTL_EVENT_ACK) {
125                 CDEBUG(D_NET, "got ACK event\n");
126                 if (bulk->b_cb != NULL)
127                         bulk->b_cb(bulk);
128                 if (atomic_dec_and_test(&desc->b_pages_remaining)) {
129                         desc->b_flags |= PTL_BULK_FL_SENT;
130                         wake_up(&desc->b_waitq);
131                         if (desc->b_cb != NULL)
132                                 desc->b_cb(desc, desc->b_cb_data);
133                 }
134         } else {
135                 CERROR("Unexpected event type!\n");
136                 LBUG();
137         }
138
139         RETURN(1);
140 }
141
142 static int bulk_sink_callback(ptl_event_t *ev)
143 {
144         struct ptlrpc_bulk_page *bulk = ev->mem_desc.user_ptr;
145         struct ptlrpc_bulk_desc *desc = bulk->b_desc;
146         ENTRY;
147
148         if (ev->type == PTL_EVENT_PUT) {
149                 if (bulk->b_buf != ev->mem_desc.start + ev->offset)
150                         CERROR("bulkbuf != mem_desc -- why?\n");
151                 if (bulk->b_cb != NULL)
152                         bulk->b_cb(bulk);
153                 if (atomic_dec_and_test(&desc->b_pages_remaining)) {
154                         desc->b_flags |= PTL_BULK_FL_RCVD;
155                         wake_up(&desc->b_waitq);
156                         if (desc->b_cb != NULL)
157                                 desc->b_cb(desc, desc->b_cb_data);
158                 }
159         } else {
160                 CERROR("Unexpected event type!\n");
161                 LBUG();
162         }
163
164         RETURN(1);
165 }
166
167 int ptlrpc_init_portals(void)
168 {
169         int rc;
170         ptl_handle_ni_t ni;
171
172         socknal_nip = inter_module_get_request("ksocknal_ni", "ksocknal");
173         qswnal_nip = inter_module_get_request("kqswnal_ni", "kqswnal");
174         if (socknal_nip == NULL && qswnal_nip == NULL) {
175                 CERROR("get_ni failed: is a NAL module loaded?\n");
176                 return -EIO;
177         }
178
179         /* Use the qswnal if it's there */
180         if (qswnal_nip != NULL)
181                 ni = *qswnal_nip;
182         else
183                 ni = *socknal_nip;
184
185         rc = PtlEQAlloc(ni, 1024, request_out_callback, &request_out_eq);
186         if (rc != PTL_OK)
187                 CERROR("PtlEQAlloc failed: %d\n", rc);
188
189         rc = PtlEQAlloc(ni, 1024, reply_out_callback, &reply_out_eq);
190         if (rc != PTL_OK)
191                 CERROR("PtlEQAlloc failed: %d\n", rc);
192
193         rc = PtlEQAlloc(ni, 1024, reply_in_callback, &reply_in_eq);
194         if (rc != PTL_OK)
195                 CERROR("PtlEQAlloc failed: %d\n", rc);
196
197         rc = PtlEQAlloc(ni, 1024, bulk_source_callback, &bulk_source_eq);
198         if (rc != PTL_OK)
199                 CERROR("PtlEQAlloc failed: %d\n", rc);
200
201         rc = PtlEQAlloc(ni, 1024, bulk_sink_callback, &bulk_sink_eq);
202         if (rc != PTL_OK)
203                 CERROR("PtlEQAlloc failed: %d\n", rc);
204
205         return rc;
206 }
207
208 void ptlrpc_exit_portals(void)
209 {
210         PtlEQFree(request_out_eq);
211         PtlEQFree(reply_out_eq);
212         PtlEQFree(reply_in_eq);
213         PtlEQFree(bulk_source_eq);
214         PtlEQFree(bulk_sink_eq);
215
216         if (qswnal_nip != NULL)
217                 inter_module_put("kqswnal_ni");
218         if (socknal_nip != NULL)
219                 inter_module_put("ksocknal_ni");
220 }