Whamcloud - gitweb
fixed bulk source callback race
[fs/lustre-release.git] / lustre / ptlrpc / niobuf.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2002 Cluster File Systems, Inc.
5  *
6  *   This file is part of Lustre, http://www.lustre.org.
7  *
8  *   Lustre is free software; you can redistribute it and/or
9  *   modify it under the terms of version 2 of the GNU General Public
10  *   License as published by the Free Software Foundation.
11  *
12  *   Lustre is distributed in the hope that it will be useful,
13  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *   GNU General Public License for more details.
16  *
17  *   You should have received a copy of the GNU General Public License
18  *   along with Lustre; if not, write to the Free Software
19  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  *
21  */
22
23 #define DEBUG_SUBSYSTEM S_RPC
24
25 #include <linux/obd_support.h>
26 #include <linux/lustre_net.h>
27 #include <linux/lustre_lib.h>
28
29 extern ptl_handle_eq_t request_out_eq, reply_in_eq, reply_out_eq,
30         bulk_source_eq, bulk_sink_eq;
31
32 static int ptl_send_buf(struct ptlrpc_request *request,
33                         struct ptlrpc_connection *conn, int portal)
34 {
35         int rc;
36         ptl_process_id_t remote_id;
37         ptl_handle_md_t md_h;
38
39         request->rq_req_md.user_ptr = request;
40
41         switch (request->rq_type) {
42         case PTL_RPC_MSG_REQUEST:
43                 request->rq_reqmsg->type = HTON__u32(request->rq_type);
44                 request->rq_req_md.start = request->rq_reqmsg;
45                 request->rq_req_md.length = request->rq_reqlen;
46                 request->rq_req_md.eventq = request_out_eq;
47                 break;
48         case PTL_RPC_MSG_REPLY:
49                 request->rq_repmsg->type = HTON__u32(request->rq_type);
50                 request->rq_req_md.start = request->rq_repmsg;
51                 request->rq_req_md.length = request->rq_replen;
52                 request->rq_req_md.eventq = reply_out_eq;
53                 break;
54         default:
55                 LBUG();
56                 return -1; /* notreached */
57         }
58         request->rq_req_md.threshold = 1;
59         request->rq_req_md.options = PTL_MD_OP_PUT;
60         request->rq_req_md.user_ptr = request;
61
62         rc = PtlMDBind(conn->c_peer.peer_ni, request->rq_req_md, &md_h);
63         if (rc != 0) {
64                 CERROR("PtlMDBind failed: %d\n", rc);
65                 LBUG();
66                 return rc;
67         }
68
69         remote_id.nid = conn->c_peer.peer_nid;
70         remote_id.pid = 0;
71
72         CDEBUG(D_NET, "Sending %d bytes to portal %d, xid %Ld\n",
73                request->rq_req_md.length, portal, request->rq_xid);
74
75         if (!portal)
76                 LBUG();
77         rc = PtlPut(md_h, PTL_NOACK_REQ, remote_id, portal, 0, request->rq_xid,
78                     0, 0);
79         if (rc != PTL_OK) {
80                 CERROR("PtlPut(%Lu, %d, %Ld) failed: %d\n", remote_id.nid,
81                        portal, request->rq_xid, rc);
82                 PtlMDUnlink(md_h);
83         }
84
85         return rc;
86 }
87
88 static inline struct iovec *
89 ptlrpc_get_bulk_iov (struct ptlrpc_bulk_desc *desc)
90 {
91         struct iovec *iov;
92
93         if (desc->bd_page_count <= sizeof (desc->bd_iov)/sizeof (struct iovec))
94                 return (desc->bd_iov);
95
96         OBD_ALLOC (iov, desc->bd_page_count * sizeof (struct iovec));
97         if (iov == NULL)
98                 LBUG();
99
100         return (iov);
101 }
102
103 static inline void
104 ptlrpc_put_bulk_iov (struct ptlrpc_bulk_desc *desc, struct iovec *iov)
105 {
106         if (desc->bd_page_count <= sizeof (desc->bd_iov)/sizeof (struct iovec))
107                 return;
108
109         OBD_FREE (iov, desc->bd_page_count * sizeof (struct iovec));
110 }
111
/* Push all pages of 'desc' to the peer as one gather (PTL_MD_IOV) PUT.
 *
 * Every page in bd_page_list must carry the same xid (asserted below);
 * that xid is used as the match bits, pairing with the ME the peer set
 * up in ptlrpc_register_bulk().
 *
 * Returns 0 on success or the Portals error code; on a failed PtlPut
 * the MD is unlinked before returning. */
int ptlrpc_send_bulk(struct ptlrpc_bulk_desc *desc)
{
        int rc;
        struct list_head *tmp, *next;
        ptl_process_id_t remote_id;
        __u32 xid = 0;
        struct iovec *iov;
        ENTRY;

        iov = ptlrpc_get_bulk_iov (desc);
        if (iov == NULL)
                RETURN (-ENOMEM);

        /* Describe the whole transfer as a single MD over the page iovec. */
        desc->bd_md.start = iov;
        desc->bd_md.niov = 0;
        desc->bd_md.length = 0;
        desc->bd_md.eventq = bulk_source_eq;
        desc->bd_md.threshold = 2; /* SENT and ACK */
        desc->bd_md.options = PTL_MD_OP_PUT | PTL_MD_IOV;
        desc->bd_md.user_ptr = desc;

        /* Two source-side events (SENT, ACK) will be delivered for this
         * PUT.  Arm the countdown BEFORE any network activity so the
         * bulk_source_eq callback cannot race against this thread. */
        atomic_set (&desc->bd_source_callback_count, 2);
        
        list_for_each_safe(tmp, next, &desc->bd_page_list) {
                struct ptlrpc_bulk_page *bulk;
                bulk = list_entry(tmp, struct ptlrpc_bulk_page, bp_link);

                LASSERT (desc->bd_md.niov < desc->bd_page_count);

                if (desc->bd_md.niov == 0)
                        xid = bulk->bp_xid;
                LASSERT (xid == bulk->bp_xid);   /* should all be the same */

                iov[desc->bd_md.niov].iov_base = bulk->bp_buf;
                iov[desc->bd_md.niov].iov_len = bulk->bp_buflen;
                desc->bd_md.niov++;
                desc->bd_md.length += bulk->bp_buflen;
        }

        LASSERT (desc->bd_md.niov == desc->bd_page_count);
        LASSERT (desc->bd_md.niov != 0);

        rc = PtlMDBind(desc->bd_connection->c_peer.peer_ni, desc->bd_md,
                       &desc->bd_md_h);

        /* PtlMDBind has taken its own copy of the iovec, so it can be
         * released immediately, even before rc is inspected. */
        ptlrpc_put_bulk_iov (desc, iov);        /* move down to reduce latency to send */

        if (rc != PTL_OK) {
                CERROR("PtlMDBind failed: %d\n", rc);
                LBUG();
                RETURN(rc);
        }

        remote_id.nid = desc->bd_connection->c_peer.peer_nid;
        remote_id.pid = 0;

        CDEBUG(D_NET, "Sending %u pages %u bytes to portal %d nid %Lx pid %d xid %d\n",
               desc->bd_md.niov, desc->bd_md.length,
               desc->bd_portal, remote_id.nid, remote_id.pid, xid);

        /* ACK requested so the callback can tell the transfer completed. */
        rc = PtlPut(desc->bd_md_h, PTL_ACK_REQ, remote_id,
                    desc->bd_portal, 0, xid, 0, 0);
        if (rc != PTL_OK) {
                CERROR("PtlPut(%Lu, %d, %d) failed: %d\n",
                       remote_id.nid, desc->bd_portal, xid, rc);
                PtlMDUnlink(desc->bd_md_h);
                LBUG();
                RETURN(rc);
        }

        RETURN(0);
}
184
/* Post sink buffers for an incoming bulk transfer described by 'desc'.
 *
 * Builds a scatter (PTL_MD_IOV) MD over all pages, attaches an ME on
 * bd_portal matching the common page xid and the peer's nid (any pid),
 * then attaches the MD to it.  The counterpart sender is
 * ptlrpc_send_bulk() on the peer.
 *
 * Both ME and MD are attached PTL_UNLINK, so they are torn down
 * automatically once the single expected PUT (threshold 1) lands.
 *
 * Returns 0 on success; on failure the partially-created state is
 * released via ptlrpc_abort_bulk() and the Portals error is returned. */
int ptlrpc_register_bulk(struct ptlrpc_bulk_desc *desc)
{
        struct list_head *tmp, *next;
        int rc;
        __u32 xid = 0;
        struct iovec *iov;
        ptl_process_id_t source_id;
        ENTRY;

        iov = ptlrpc_get_bulk_iov (desc);
        if (iov == NULL)
                return (-ENOMEM);

        desc->bd_md.start = iov;
        desc->bd_md.niov = 0;
        desc->bd_md.length = 0;
        desc->bd_md.threshold = 1;
        desc->bd_md.options = PTL_MD_OP_PUT | PTL_MD_IOV;
        desc->bd_md.user_ptr = desc;
        desc->bd_md.eventq = bulk_sink_eq;

        /* Fill the iovec from the page list; all pages must share one xid. */
        list_for_each_safe(tmp, next, &desc->bd_page_list) {
                struct ptlrpc_bulk_page *bulk;
                bulk = list_entry(tmp, struct ptlrpc_bulk_page, bp_link);

                LASSERT (desc->bd_md.niov < desc->bd_page_count);

                if (desc->bd_md.niov == 0)
                        xid = bulk->bp_xid;
                LASSERT (xid == bulk->bp_xid);   /* should all be the same */

                iov[desc->bd_md.niov].iov_base = bulk->bp_buf;
                iov[desc->bd_md.niov].iov_len = bulk->bp_buflen;
                desc->bd_md.niov++;
                desc->bd_md.length += bulk->bp_buflen;
        }

        LASSERT (desc->bd_md.niov == desc->bd_page_count);
        LASSERT (desc->bd_md.niov != 0);

        /* Accept the PUT only from the expected peer nid (any pid). */
        source_id.nid = desc->bd_connection->c_peer.peer_nid;
        source_id.pid = PTL_PID_ANY;

        rc = PtlMEAttach(desc->bd_connection->c_peer.peer_ni,
                         desc->bd_portal, source_id, xid, 0,
                         PTL_UNLINK, PTL_INS_AFTER, &desc->bd_me_h);

        /* PtlMEAttach/PtlMDAttach copy what they need; the local iovec
         * can be released regardless of rc. */
        ptlrpc_put_bulk_iov (desc, iov);

        if (rc != PTL_OK) {
                CERROR("PtlMEAttach failed: %d\n", rc);
                LBUG();
                GOTO(cleanup, rc);
        }

        rc = PtlMDAttach(desc->bd_me_h, desc->bd_md, PTL_UNLINK,
                         &desc->bd_md_h);
        if (rc != PTL_OK) {
                CERROR("PtlMDAttach failed: %d\n", rc);
                LBUG();
                GOTO(cleanup, rc);
        }

        CDEBUG(D_NET, "Setup bulk sink buffers: %u pages %u bytes, xid %u, "
               "portal %u\n", desc->bd_md.niov, desc->bd_md.length,
               xid, desc->bd_portal);

        RETURN(0);

 cleanup:
        /* Safe even if an attach failed: the handles start out invalid
         * and unlinking an invalid handle is harmless here. */
        ptlrpc_abort_bulk(desc);

        return rc;
}
259
/* Tear down the sink-side ME/MD registered by ptlrpc_register_bulk().
 * Always returns 0; unlink failures are deliberately ignored. */
int ptlrpc_abort_bulk(struct ptlrpc_bulk_desc *desc)
{
        /* This should be safe: these handles are initialized to be
         * invalid in ptlrpc_prep_bulk() */
        PtlMDUnlink(desc->bd_md_h);
        PtlMEUnlink(desc->bd_me_h);

        return 0;
}
269
270 int ptlrpc_reply(struct ptlrpc_service *svc, struct ptlrpc_request *req)
271 {
272         if (req->rq_repmsg == NULL) {
273                 CERROR("bad: someone called ptlrpc_reply when they meant "
274                        "ptlrpc_error\n");
275                 return -EINVAL;
276         }
277
278         /* FIXME: we need to increment the count of handled events */
279         if (req->rq_type != PTL_RPC_MSG_ERR)
280                 req->rq_type = PTL_RPC_MSG_REPLY;
281         //req->rq_repmsg->conn = req->rq_connection->c_remote_conn;
282         //req->rq_repmsg->token = req->rq_connection->c_remote_token;
283         req->rq_repmsg->status = HTON__u32(req->rq_status);
284         return ptl_send_buf(req, req->rq_connection, svc->srv_rep_portal);
285 }
286
287 int ptlrpc_error(struct ptlrpc_service *svc, struct ptlrpc_request *req)
288 {
289         int rc;
290         ENTRY;
291
292         if (req->rq_repmsg) {
293                 CERROR("req already has repmsg\n");
294                 LBUG();
295         }
296
297         rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
298         if (rc)
299                 RETURN(rc);
300
301         req->rq_repmsg->type = HTON__u32(PTL_RPC_MSG_ERR);
302
303         rc = ptlrpc_reply(svc, req);
304         RETURN(rc);
305 }
306
307 int ptl_send_rpc(struct ptlrpc_request *request)
308 {
309         int rc;
310         char *repbuf;
311         ptl_process_id_t source_id;
312
313         ENTRY;
314
315         if (request->rq_type != PTL_RPC_MSG_REQUEST) {
316                 CERROR("wrong packet type sent %d\n",
317                        NTOH__u32(request->rq_reqmsg->type));
318                 LBUG();
319                 RETURN(EINVAL);
320         }
321         if (request->rq_replen == 0) {
322                 CERROR("request->rq_replen is 0!\n");
323                 RETURN(EINVAL);
324         }
325
326         /* request->rq_repmsg is set only when the reply comes in, in
327          * client_packet_callback() */
328         if (request->rq_reply_md.start)
329                 OBD_FREE(request->rq_reply_md.start, request->rq_replen);
330
331         OBD_ALLOC(repbuf, request->rq_replen);
332         if (!repbuf) {
333                 LBUG();
334                 RETURN(ENOMEM);
335         }
336
337         // down(&request->rq_client->cli_rpc_sem);
338
339         source_id.nid = request->rq_connection->c_peer.peer_nid;
340         source_id.pid = PTL_PID_ANY;
341
342         rc = PtlMEAttach(request->rq_connection->c_peer.peer_ni,
343                          request->rq_import->imp_client->cli_reply_portal,
344                          source_id, request->rq_xid, 0, PTL_UNLINK,
345                          PTL_INS_AFTER, &request->rq_reply_me_h);
346         if (rc != PTL_OK) {
347                 CERROR("PtlMEAttach failed: %d\n", rc);
348                 LBUG();
349                 GOTO(cleanup, rc);
350         }
351
352         request->rq_reply_md.start = repbuf;
353         request->rq_reply_md.length = request->rq_replen;
354         request->rq_reply_md.threshold = 1;
355         request->rq_reply_md.options = PTL_MD_OP_PUT;
356         request->rq_reply_md.user_ptr = request;
357         request->rq_reply_md.eventq = reply_in_eq;
358
359         rc = PtlMDAttach(request->rq_reply_me_h, request->rq_reply_md,
360                          PTL_UNLINK, &request->rq_reply_md_h);
361         if (rc != PTL_OK) {
362                 CERROR("PtlMDAttach failed: %d\n", rc);
363                 LBUG();
364                 GOTO(cleanup2, rc);
365         }
366
367         CDEBUG(D_NET, "Setup reply buffer: %u bytes, xid %Lu, portal %u\n",
368                request->rq_replen, request->rq_xid,
369                request->rq_import->imp_client->cli_reply_portal);
370
371         rc = ptl_send_buf(request, request->rq_connection,
372                           request->rq_import->imp_client->cli_request_portal);
373         RETURN(rc);
374
375  cleanup2:
376         PtlMEUnlink(request->rq_reply_me_h);
377  cleanup:
378         OBD_FREE(repbuf, request->rq_replen);
379         // up(&request->rq_client->cli_rpc_sem);
380
381         return rc;
382 }
383
/* Attach one request buffer 'rqbd' to the service's request portal.
 *
 * Posts an ME accepting PUTs from any peer (any nid/pid, all match
 * bits ignored via ignore-bits ~0) and attaches an MD covering the
 * buffer.  Both are PTL_UNLINK, so the pair is consumed by a single
 * incoming request (threshold 1).
 *
 * Failures are treated as fatal: LBUG() is invoked and no error is
 * returned to the caller. */
void ptlrpc_link_svc_me(struct ptlrpc_request_buffer_desc *rqbd)
{
        struct ptlrpc_service *service = rqbd->rqbd_service;
        static ptl_process_id_t match_id = {PTL_NID_ANY, PTL_PID_ANY};
        int rc;
        ptl_md_t dummy;
        ptl_handle_md_t md_h;

        /* Attach the leading ME on which we build the ring */
        rc = PtlMEAttach(service->srv_self.peer_ni, service->srv_req_portal,
                         match_id, 0, ~0, 
                         PTL_UNLINK, PTL_INS_AFTER, &rqbd->rqbd_me_h);
        if (rc != PTL_OK) {
                CERROR("PtlMEAttach failed: %d\n", rc);
                /* NOTE(review): execution falls through to PtlMDAttach if
                 * LBUG() ever returns — confirm LBUG always panics here. */
                LBUG();
        }

        /* NOTE(review): 'dummy' is only partially initialized; any other
         * ptl_md_t fields are passed to PtlMDAttach as stack garbage —
         * verify the Portals version in use ignores them for this call. */
        dummy.start         = rqbd->rqbd_buffer;
        dummy.length        = service->srv_buf_size;
        dummy.max_offset    = service->srv_buf_size;
        dummy.threshold     = 1;
        dummy.options       = PTL_MD_OP_PUT;
        dummy.user_ptr      = rqbd;
        dummy.eventq        = service->srv_eq_h;

        rc = PtlMDAttach(rqbd->rqbd_me_h, dummy, PTL_UNLINK, &md_h);
        if (rc != PTL_OK) {
                /* cleanup */
                CERROR("PtlMDAttach failed: %d\n", rc);
                LBUG();
        }
}
415 }