/* Source: fs/lustre-release.git, lustre/ptlrpc/niobuf.c (Whamcloud gitweb).
 * From commit: "Clean up compiler warnings for 64-bit systems where __u64
 * is a long." */
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2002 Cluster File Systems, Inc.
5  *
6  *   This file is part of Lustre, http://www.lustre.org.
7  *
8  *   Lustre is free software; you can redistribute it and/or
9  *   modify it under the terms of version 2 of the GNU General Public
10  *   License as published by the Free Software Foundation.
11  *
12  *   Lustre is distributed in the hope that it will be useful,
13  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *   GNU General Public License for more details.
16  *
17  *   You should have received a copy of the GNU General Public License
18  *   along with Lustre; if not, write to the Free Software
19  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  *
21  */
22
23 #define DEBUG_SUBSYSTEM S_RPC
24
25 #include <linux/obd_support.h>
26 #include <linux/lustre_net.h>
27 #include <linux/lustre_lib.h>
28 #include <linux/obd.h>
29
30 extern ptl_handle_eq_t request_out_eq, reply_in_eq, reply_out_eq,
31         bulk_source_eq, bulk_sink_eq;
32
33 static int ptl_send_buf(struct ptlrpc_request *request,
34                         struct ptlrpc_connection *conn, int portal)
35 {
36         int rc;
37         ptl_process_id_t remote_id;
38         ptl_handle_md_t md_h;
39
40         request->rq_req_md.user_ptr = request;
41
42         switch (request->rq_type) {
43         case PTL_RPC_MSG_REQUEST:
44                 request->rq_reqmsg->type = HTON__u32(request->rq_type);
45                 request->rq_req_md.start = request->rq_reqmsg;
46                 request->rq_req_md.length = request->rq_reqlen;
47                 request->rq_req_md.eventq = request_out_eq;
48                 break;
49         case PTL_RPC_MSG_REPLY:
50                 request->rq_repmsg->type = HTON__u32(request->rq_type);
51                 request->rq_req_md.start = request->rq_repmsg;
52                 request->rq_req_md.length = request->rq_replen;
53                 request->rq_req_md.eventq = reply_out_eq;
54                 break;
55         default:
56                 LBUG();
57                 return -1; /* notreached */
58         }
59         request->rq_req_md.threshold = 1;
60         request->rq_req_md.options = PTL_MD_OP_PUT;
61         request->rq_req_md.user_ptr = request;
62
63         rc = PtlMDBind(conn->c_peer.peer_ni, request->rq_req_md, &md_h);
64         if (rc != 0) {
65                 CERROR("PtlMDBind failed: %d\n", rc);
66                 LBUG();
67                 return rc;
68         }
69
70         remote_id.nid = conn->c_peer.peer_nid;
71         remote_id.pid = 0;
72
73         CDEBUG(D_NET, "Sending %d bytes to portal %d, xid "LPD64"\n",
74                request->rq_req_md.length, portal, request->rq_xid);
75
76         if (!portal)
77                 LBUG();
78         rc = PtlPut(md_h, PTL_NOACK_REQ, remote_id, portal, 0, request->rq_xid,
79                     0, 0);
80         if (rc != PTL_OK) {
81                 CERROR("PtlPut("LPU64", %d, "LPD64") failed: %d\n", remote_id.nid,
82                        portal, request->rq_xid, rc);
83                 PtlMDUnlink(md_h);
84         }
85
86         return rc;
87 }
88
89 static inline struct iovec *
90 ptlrpc_get_bulk_iov (struct ptlrpc_bulk_desc *desc)
91 {
92         struct iovec *iov;
93
94         if (desc->bd_page_count <= sizeof (desc->bd_iov)/sizeof (struct iovec))
95                 return (desc->bd_iov);
96
97         OBD_ALLOC (iov, desc->bd_page_count * sizeof (struct iovec));
98         if (iov == NULL)
99                 LBUG();
100
101         return (iov);
102 }
103
104 static inline void
105 ptlrpc_put_bulk_iov (struct ptlrpc_bulk_desc *desc, struct iovec *iov)
106 {
107         if (desc->bd_page_count <= sizeof (desc->bd_iov)/sizeof (struct iovec))
108                 return;
109
110         OBD_FREE (iov, desc->bd_page_count * sizeof (struct iovec));
111 }
112
113 int ptlrpc_send_bulk(struct ptlrpc_bulk_desc *desc)
114 {
115         int rc;
116         struct list_head *tmp, *next;
117         ptl_process_id_t remote_id;
118         __u32 xid = 0;
119         struct iovec *iov;
120         ENTRY;
121
122         iov = ptlrpc_get_bulk_iov (desc);
123         if (iov == NULL)
124                 RETURN (-ENOMEM);
125
126         desc->bd_md.start = iov;
127         desc->bd_md.niov = 0;
128         desc->bd_md.length = 0;
129         desc->bd_md.eventq = bulk_source_eq;
130         desc->bd_md.threshold = 2; /* SENT and ACK */
131         desc->bd_md.options = PTL_MD_OP_PUT | PTL_MD_IOV;
132         desc->bd_md.user_ptr = desc;
133
134         atomic_set (&desc->bd_source_callback_count, 2);
135         
136         list_for_each_safe(tmp, next, &desc->bd_page_list) {
137                 struct ptlrpc_bulk_page *bulk;
138                 bulk = list_entry(tmp, struct ptlrpc_bulk_page, bp_link);
139
140                 LASSERT (desc->bd_md.niov < desc->bd_page_count);
141
142                 if (desc->bd_md.niov == 0)
143                         xid = bulk->bp_xid;
144                 LASSERT (xid == bulk->bp_xid);   /* should all be the same */
145
146                 iov[desc->bd_md.niov].iov_base = bulk->bp_buf;
147                 iov[desc->bd_md.niov].iov_len = bulk->bp_buflen;
148                 desc->bd_md.niov++;
149                 desc->bd_md.length += bulk->bp_buflen;
150         }
151
152         LASSERT (desc->bd_md.niov == desc->bd_page_count);
153         LASSERT (desc->bd_md.niov != 0);
154
155         rc = PtlMDBind(desc->bd_connection->c_peer.peer_ni, desc->bd_md,
156                        &desc->bd_md_h);
157
158         ptlrpc_put_bulk_iov (desc, iov);        /* move down to reduce latency to send */
159
160         if (rc != PTL_OK) {
161                 CERROR("PtlMDBind failed: %d\n", rc);
162                 LBUG();
163                 RETURN(rc);
164         }
165
166         remote_id.nid = desc->bd_connection->c_peer.peer_nid;
167         remote_id.pid = 0;
168
169         CDEBUG(D_NET, "Sending %u pages %u bytes to portal %d nid "LPX64" pid %d xid %d\n",
170                desc->bd_md.niov, desc->bd_md.length,
171                desc->bd_portal, remote_id.nid, remote_id.pid, xid);
172
173         rc = PtlPut(desc->bd_md_h, PTL_ACK_REQ, remote_id,
174                     desc->bd_portal, 0, xid, 0, 0);
175         if (rc != PTL_OK) {
176                 CERROR("PtlPut("LPU64", %d, %d) failed: %d\n",
177                        remote_id.nid, desc->bd_portal, xid, rc);
178                 PtlMDUnlink(desc->bd_md_h);
179                 LBUG();
180                 RETURN(rc);
181         }
182
183         RETURN(0);
184 }
185
186 int ptlrpc_register_bulk(struct ptlrpc_bulk_desc *desc)
187 {
188         struct list_head *tmp, *next;
189         int rc;
190         __u32 xid = 0;
191         struct iovec *iov;
192         ptl_process_id_t source_id;
193         ENTRY;
194
195         iov = ptlrpc_get_bulk_iov (desc);
196         if (iov == NULL)
197                 return (-ENOMEM);
198
199         desc->bd_md.start = iov;
200         desc->bd_md.niov = 0;
201         desc->bd_md.length = 0;
202         desc->bd_md.threshold = 1;
203         desc->bd_md.options = PTL_MD_OP_PUT | PTL_MD_IOV;
204         desc->bd_md.user_ptr = desc;
205         desc->bd_md.eventq = bulk_sink_eq;
206
207         list_for_each_safe(tmp, next, &desc->bd_page_list) {
208                 struct ptlrpc_bulk_page *bulk;
209                 bulk = list_entry(tmp, struct ptlrpc_bulk_page, bp_link);
210
211                 LASSERT (desc->bd_md.niov < desc->bd_page_count);
212
213                 if (desc->bd_md.niov == 0)
214                         xid = bulk->bp_xid;
215                 LASSERT (xid == bulk->bp_xid);   /* should all be the same */
216
217                 iov[desc->bd_md.niov].iov_base = bulk->bp_buf;
218                 iov[desc->bd_md.niov].iov_len = bulk->bp_buflen;
219                 desc->bd_md.niov++;
220                 desc->bd_md.length += bulk->bp_buflen;
221         }
222
223         LASSERT (desc->bd_md.niov == desc->bd_page_count);
224         LASSERT (desc->bd_md.niov != 0);
225
226         source_id.nid = desc->bd_connection->c_peer.peer_nid;
227         source_id.pid = PTL_PID_ANY;
228
229         rc = PtlMEAttach(desc->bd_connection->c_peer.peer_ni,
230                          desc->bd_portal, source_id, xid, 0,
231                          PTL_UNLINK, PTL_INS_AFTER, &desc->bd_me_h);
232
233         ptlrpc_put_bulk_iov (desc, iov);
234
235         if (rc != PTL_OK) {
236                 CERROR("PtlMEAttach failed: %d\n", rc);
237                 LBUG();
238                 GOTO(cleanup, rc);
239         }
240
241         rc = PtlMDAttach(desc->bd_me_h, desc->bd_md, PTL_UNLINK,
242                          &desc->bd_md_h);
243         if (rc != PTL_OK) {
244                 CERROR("PtlMDAttach failed: %d\n", rc);
245                 LBUG();
246                 GOTO(cleanup, rc);
247         }
248
249         CDEBUG(D_NET, "Setup bulk sink buffers: %u pages %u bytes, xid %u, "
250                "portal %u\n", desc->bd_md.niov, desc->bd_md.length,
251                xid, desc->bd_portal);
252
253         RETURN(0);
254
255  cleanup:
256         ptlrpc_abort_bulk(desc);
257
258         return rc;
259 }
260
/* Tear down the bulk ME/MD posted by ptlrpc_register_bulk().
 * Unlink results are ignored: the handles may be invalid or already
 * unlinked, and either way there is nothing further to do. */
int ptlrpc_abort_bulk(struct ptlrpc_bulk_desc *desc)
{
        /* This should be safe: these handles are initialized to be
         * invalid in ptlrpc_prep_bulk() */
        PtlMDUnlink(desc->bd_md_h);
        PtlMEUnlink(desc->bd_me_h);

        return 0;
}
270
271 int ptlrpc_reply(struct ptlrpc_service *svc, struct ptlrpc_request *req)
272 {
273         if (req->rq_repmsg == NULL) {
274                 CERROR("bad: someone called ptlrpc_reply when they meant "
275                        "ptlrpc_error\n");
276                 return -EINVAL;
277         }
278
279         /* FIXME: we need to increment the count of handled events */
280         if (req->rq_type != PTL_RPC_MSG_ERR)
281                 req->rq_type = PTL_RPC_MSG_REPLY;
282         //req->rq_repmsg->conn = req->rq_connection->c_remote_conn;
283         //req->rq_repmsg->token = req->rq_connection->c_remote_token;
284         req->rq_repmsg->status = HTON__u32(req->rq_status);
285         return ptl_send_buf(req, req->rq_connection, svc->srv_rep_portal);
286 }
287
/* Pack and send an error "reply" for req; used when the server cannot
 * produce a normal reply.  The request must not already have a reply
 * buffer packed. */
int ptlrpc_error(struct ptlrpc_service *svc, struct ptlrpc_request *req)
{
        int rc;
        ENTRY;

        if (req->rq_repmsg) {
                CERROR("req already has repmsg\n");
                LBUG();
        }

        /* Pack an empty message; only the status/type fields matter. */
        rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
        if (rc)
                RETURN(rc);

        /* NOTE(review): only the wire message type is set here, not
         * req->rq_type.  ptlrpc_reply() will then set rq_type to
         * PTL_RPC_MSG_REPLY, and ptl_send_buf() re-stamps
         * rq_repmsg->type from rq_type — apparently overwriting the
         * PTL_RPC_MSG_ERR written below.  Confirm whether the error
         * type actually reaches the wire. */
        req->rq_repmsg->type = HTON__u32(PTL_RPC_MSG_ERR);

        rc = ptlrpc_reply(svc, req);
        RETURN(rc);
}
307
308 int ptl_send_rpc(struct ptlrpc_request *request)
309 {
310         int rc;
311         char *repbuf;
312         ptl_process_id_t source_id;
313
314         ENTRY;
315
316         if (request->rq_type != PTL_RPC_MSG_REQUEST) {
317                 CERROR("wrong packet type sent %d\n",
318                        NTOH__u32(request->rq_reqmsg->type));
319                 LBUG();
320                 RETURN(EINVAL);
321         }
322         if (request->rq_replen == 0) {
323                 CERROR("request->rq_replen is 0!\n");
324                 RETURN(EINVAL);
325         }
326
327         /* request->rq_repmsg is set only when the reply comes in, in
328          * client_packet_callback() */
329         if (request->rq_reply_md.start)
330                 OBD_FREE(request->rq_reply_md.start, request->rq_replen);
331
332         OBD_ALLOC(repbuf, request->rq_replen);
333         if (!repbuf) {
334                 LBUG();
335                 RETURN(ENOMEM);
336         }
337
338         // down(&request->rq_client->cli_rpc_sem);
339
340         source_id.nid = request->rq_connection->c_peer.peer_nid;
341         source_id.pid = PTL_PID_ANY;
342
343         rc = PtlMEAttach(request->rq_connection->c_peer.peer_ni,
344                          request->rq_import->imp_client->cli_reply_portal,
345                          source_id, request->rq_xid, 0, PTL_UNLINK,
346                          PTL_INS_AFTER, &request->rq_reply_me_h);
347         if (rc != PTL_OK) {
348                 CERROR("PtlMEAttach failed: %d\n", rc);
349                 LBUG();
350                 GOTO(cleanup, rc);
351         }
352
353         request->rq_reply_md.start = repbuf;
354         request->rq_reply_md.length = request->rq_replen;
355         request->rq_reply_md.threshold = 1;
356         request->rq_reply_md.options = PTL_MD_OP_PUT;
357         request->rq_reply_md.user_ptr = request;
358         request->rq_reply_md.eventq = reply_in_eq;
359
360         rc = PtlMDAttach(request->rq_reply_me_h, request->rq_reply_md,
361                          PTL_UNLINK, &request->rq_reply_md_h);
362         if (rc != PTL_OK) {
363                 CERROR("PtlMDAttach failed: %d\n", rc);
364                 LBUG();
365                 GOTO(cleanup2, rc);
366         }
367
368         CDEBUG(D_NET, "Setup reply buffer: %u bytes, xid "LPU64", portal %u\n",
369                request->rq_replen, request->rq_xid,
370                request->rq_import->imp_client->cli_reply_portal);
371
372         rc = ptl_send_buf(request, request->rq_connection,
373                           request->rq_import->imp_client->cli_request_portal);
374         RETURN(rc);
375
376  cleanup2:
377         PtlMEUnlink(request->rq_reply_me_h);
378  cleanup:
379         OBD_FREE(repbuf, request->rq_replen);
380         // up(&request->rq_client->cli_rpc_sem);
381
382         return rc;
383 }
384
385 void ptlrpc_link_svc_me(struct ptlrpc_request_buffer_desc *rqbd)
386 {
387         struct ptlrpc_service *service = rqbd->rqbd_service;
388         static ptl_process_id_t match_id = {PTL_NID_ANY, PTL_PID_ANY};
389         int rc;
390         ptl_md_t dummy;
391         ptl_handle_md_t md_h;
392
393         /* Attach the leading ME on which we build the ring */
394         rc = PtlMEAttach(service->srv_self.peer_ni, service->srv_req_portal,
395                          match_id, 0, ~0, 
396                          PTL_UNLINK, PTL_INS_AFTER, &rqbd->rqbd_me_h);
397         if (rc != PTL_OK) {
398                 CERROR("PtlMEAttach failed: %d\n", rc);
399                 LBUG();
400         }
401
402         dummy.start         = rqbd->rqbd_buffer;
403         dummy.length        = service->srv_buf_size;
404         dummy.max_offset    = service->srv_buf_size;
405         dummy.threshold     = 1;
406         dummy.options       = PTL_MD_OP_PUT;
407         dummy.user_ptr      = rqbd;
408         dummy.eventq        = service->srv_eq_h;
409
410         rc = PtlMDAttach(rqbd->rqbd_me_h, dummy, PTL_UNLINK, &md_h);
411         if (rc != PTL_OK) {
412                 /* cleanup */
413                 CERROR("PtlMDAttach failed: %d\n", rc);
414                 LBUG();
415         }
416 }