Whamcloud - gitweb
Land b1_2 onto HEAD (20040304_171022)
[fs/lustre-release.git] / lustre / ptlrpc / niobuf.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
5  *
6  *   This file is part of Lustre, http://www.lustre.org.
7  *
8  *   Lustre is free software; you can redistribute it and/or
9  *   modify it under the terms of version 2 of the GNU General Public
10  *   License as published by the Free Software Foundation.
11  *
12  *   Lustre is distributed in the hope that it will be useful,
13  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *   GNU General Public License for more details.
16  *
17  *   You should have received a copy of the GNU General Public License
18  *   along with Lustre; if not, write to the Free Software
19  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  *
21  */
22
23 #define DEBUG_SUBSYSTEM S_RPC
24 #ifndef __KERNEL__
25 #include <liblustre.h>
26 #include <portals/lib-types.h>
27 #endif
28 #include <linux/obd_support.h>
29 #include <linux/lustre_net.h>
30 #include <linux/lustre_lib.h>
31 #include <linux/obd.h>
32 #include "ptlrpc_internal.h"
33
34 static int ptl_send_buf (ptl_handle_md_t *mdh, void *base, int len, 
35                          ptl_ack_req_t ack, struct ptlrpc_cb_id *cbid,
36                          struct ptlrpc_connection *conn, int portal, __u64 xid)
37 {
38         ptl_process_id_t remote_id;
39         int              rc;
40         int              rc2;
41         ptl_md_t         md;
42         char str[PTL_NALFMT_SIZE];
43         ENTRY;
44
45         LASSERT (portal != 0);
46         LASSERT (conn != NULL);
47         CDEBUG (D_INFO, "conn=%p ni %s nid "LPX64" (%s) on %s\n",
48                 conn, conn->c_peer.peer_ni->pni_name,
49                 conn->c_peer.peer_nid,
50                 portals_nid2str(conn->c_peer.peer_ni->pni_number,
51                                 conn->c_peer.peer_nid, str),
52                 conn->c_peer.peer_ni->pni_name);
53
54         remote_id.nid = conn->c_peer.peer_nid,
55         remote_id.pid = 0;
56
57         md.start     = base;
58         md.length    = len;
59         md.threshold = (ack == PTL_ACK_REQ) ? 2 : 1;
60         md.options   = 0;
61         md.user_ptr  = cbid;
62         md.eventq    = conn->c_peer.peer_ni->pni_eq_h;
63
64         if (ack == PTL_ACK_REQ &&
65             OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_ACK | OBD_FAIL_ONCE)) {
66                 /* don't ask for the ack to simulate failing client */
67                 ack = PTL_NOACK_REQ;
68                 obd_fail_loc |= OBD_FAIL_ONCE | OBD_FAILED;
69         }
70
71         rc = PtlMDBind (conn->c_peer.peer_ni->pni_ni_h, md, mdh);
72         if (rc != PTL_OK) {
73                 CERROR ("PtlMDBind failed: %d\n", rc);
74                 LASSERT (rc == PTL_NOSPACE);
75                 RETURN (-ENOMEM);
76         }
77
78         CDEBUG(D_NET, "Sending %d bytes to portal %d, xid "LPD64"\n",
79                len, portal, xid);
80
81         rc2 = PtlPut (*mdh, ack, remote_id, portal, 0, xid, 0, 0);
82         if (rc != PTL_OK) {
83                 /* We're going to get an UNLINK event when I unlink below,
84                  * which will complete just like any other failed send, so
85                  * I fall through and return success here! */
86                 CERROR("PtlPut("LPU64", %d, "LPD64") failed: %d\n",
87                        remote_id.nid, portal, xid, rc);
88                 rc2 = PtlMDUnlink(*mdh);
89                 LASSERT (rc2 == PTL_OK);
90         }
91
92         RETURN (0);
93 }
94
95 int ptlrpc_start_bulk_transfer (struct ptlrpc_bulk_desc *desc)
96 {
97         int                 rc;
98         int                 rc2;
99         struct ptlrpc_peer *peer;
100         ptl_process_id_t    remote_id;
101         ptl_md_t            md;
102         __u64               xid;
103         ENTRY;
104
105         /* NB no locking required until desc is on the network */
106         LASSERT (!desc->bd_network_rw);
107         LASSERT (desc->bd_type == BULK_PUT_SOURCE ||
108                  desc->bd_type == BULK_GET_SINK);
109         desc->bd_success = 0;
110         peer = &desc->bd_export->exp_connection->c_peer;
111
112         md.start = &desc->bd_iov[0];
113         md.niov = desc->bd_page_count;
114         md.length = desc->bd_nob;
115         md.eventq = peer->peer_ni->pni_eq_h;
116         md.threshold = 2; /* SENT and ACK/REPLY */
117 #ifdef __KERNEL__
118         md.options = PTL_MD_KIOV;
119 #else
120         md.options = PTL_MD_IOV;
121 #endif
122         md.user_ptr = &desc->bd_cbid;
123         LASSERT (desc->bd_cbid.cbid_fn == server_bulk_callback);
124         LASSERT (desc->bd_cbid.cbid_arg == desc);
125
126         /* NB total length may be 0 for a read past EOF, so we send a 0
127          * length bulk, since the client expects a bulk event. */
128
129         rc = PtlMDBind(peer->peer_ni->pni_ni_h, md, &desc->bd_md_h);
130         if (rc != PTL_OK) {
131                 CERROR("PtlMDBind failed: %d\n", rc);
132                 LASSERT (rc == PTL_NOSPACE);
133                 RETURN(-ENOMEM);
134         }
135
136         /* Client's bulk and reply matchbits are the same */
137         xid = desc->bd_req->rq_xid;
138         remote_id.nid = peer->peer_nid;
139         remote_id.pid = 0;
140
141         CDEBUG(D_NET, "Transferring %u pages %u bytes via portal %d on %s "
142                "nid "LPX64" pid %d xid "LPX64"\n", 
143                md.niov, md.length, desc->bd_portal, peer->peer_ni->pni_name,
144                remote_id.nid, remote_id.pid, xid);
145
146         /* Network is about to get at the memory */
147         desc->bd_network_rw = 1;
148
149         if (desc->bd_type == BULK_PUT_SOURCE)
150                 rc = PtlPut (desc->bd_md_h, PTL_ACK_REQ, remote_id,
151                              desc->bd_portal, 0, xid, 0, 0);
152         else
153                 rc = PtlGet (desc->bd_md_h, remote_id,
154                              desc->bd_portal, 0, xid, 0);
155         
156         if (rc != PTL_OK) {
157                 /* Can't send, so we unlink the MD bound above.  The UNLINK
158                  * event this creates will signal completion with failure,
159                  * so we return SUCCESS here! */
160                 CERROR("Transfer("LPU64", %d, "LPX64") failed: %d\n",
161                        remote_id.nid, desc->bd_portal, xid, rc);
162                 rc2 = PtlMDUnlink(desc->bd_md_h);
163                 LASSERT (rc2 == PTL_OK);
164         }
165
166         RETURN(0);
167 }
168
169 void ptlrpc_abort_bulk (struct ptlrpc_bulk_desc *desc)
170 {
171         /* Server side bulk abort. Idempotent. Not thread-safe (i.e. only
172          * serialises with completion callback) */
173         struct l_wait_info lwi;
174         int                rc;
175
176         LASSERT (!in_interrupt ());             /* might sleep */
177
178         if (!ptlrpc_bulk_active(desc))          /* completed or */
179                 return;                         /* never started */
180         
181         /* The unlink ensures the callback happens ASAP and is the last
182          * one.  If it fails, it must be because completion just
183          * happened. */
184
185         rc = PtlMDUnlink (desc->bd_md_h);
186         if (rc == PTL_INV_MD) {
187                 LASSERT(!ptlrpc_bulk_active(desc));
188                 return;
189         }
190         
191         LASSERT (rc == PTL_OK);
192
193         for (;;) {
194                 /* Network access will complete in finite time but the HUGE
195                  * timeout lets us CWARN for visibility of sluggish NALs */
196                 lwi = LWI_TIMEOUT (300 * HZ, NULL, NULL);
197                 rc = l_wait_event(desc->bd_waitq, 
198                                   !ptlrpc_bulk_active(desc), &lwi);
199                 if (rc == 0)
200                         return;
201
202                 LASSERT(rc == -ETIMEDOUT);
203                 CWARN("Unexpectedly long timeout: desc %p\n", desc);
204         }
205 }
206
207 int ptlrpc_register_bulk (struct ptlrpc_request *req)
208 {
209         struct ptlrpc_bulk_desc *desc = req->rq_bulk;
210         struct ptlrpc_peer *peer;
211         int rc;
212         int rc2;
213         ptl_process_id_t source_id;
214         ptl_handle_me_t  me_h;
215         ptl_md_t         md;
216         ENTRY;
217
218         /* NB no locking required until desc is on the network */
219         LASSERT (desc->bd_nob > 0);
220         LASSERT (!desc->bd_network_rw);
221         LASSERT (desc->bd_page_count <= PTL_MD_MAX_PAGES);
222         LASSERT (desc->bd_req != NULL);
223         LASSERT (desc->bd_type == BULK_PUT_SINK ||
224                  desc->bd_type == BULK_GET_SOURCE);
225
226         desc->bd_success = 0;
227
228         peer = &desc->bd_import->imp_connection->c_peer;
229
230         md.start = &desc->bd_iov[0];
231         md.niov = desc->bd_page_count;
232         md.length = desc->bd_nob;
233         md.eventq = peer->peer_ni->pni_eq_h;
234         md.threshold = 1;                       /* PUT or GET */
235         md.options = (desc->bd_type == BULK_GET_SOURCE) ? 
236                      PTL_MD_OP_GET : PTL_MD_OP_PUT;
237 #ifdef __KERNEL__
238         md.options |= PTL_MD_KIOV;
239 #else
240         md.options |= PTL_MD_IOV;
241 #endif
242         md.user_ptr = &desc->bd_cbid;
243         LASSERT (desc->bd_cbid.cbid_fn == client_bulk_callback);
244         LASSERT (desc->bd_cbid.cbid_arg == desc);
245
246         /* XXX Registering the same xid on retried bulk makes my head
247          * explode trying to understand how the original request's bulk
248          * might interfere with the retried request -eeb */
249         LASSERT (!desc->bd_registered || req->rq_xid != desc->bd_last_xid);
250         desc->bd_registered = 1;
251         desc->bd_last_xid = req->rq_xid;
252
253         source_id.nid = desc->bd_import->imp_connection->c_peer.peer_nid;
254         source_id.pid = PTL_PID_ANY;
255
256         rc = PtlMEAttach(peer->peer_ni->pni_ni_h,
257                          desc->bd_portal, source_id, req->rq_xid, 0,
258                          PTL_UNLINK, PTL_INS_AFTER, &me_h);
259         if (rc != PTL_OK) {
260                 CERROR("PtlMEAttach failed: %d\n", rc);
261                 LASSERT (rc == PTL_NOSPACE);
262                 RETURN (-ENOMEM);
263         }
264
265         /* About to let the network at it... */
266         desc->bd_network_rw = 1;
267         rc = PtlMDAttach(me_h, md, PTL_UNLINK, &desc->bd_md_h);
268         if (rc != PTL_OK) {
269                 CERROR("PtlMDAttach failed: %d\n", rc);
270                 LASSERT (rc == PTL_NOSPACE);
271                 desc->bd_network_rw = 0;
272                 rc2 = PtlMEUnlink (me_h);
273                 LASSERT (rc2 == PTL_OK);
274                 RETURN (-ENOMEM);
275         }
276
277         CDEBUG(D_NET, "Setup bulk %s buffers: %u pages %u bytes, xid "LPX64", "
278                "portal %u on %s\n",
279                desc->bd_type == BULK_GET_SOURCE ? "get-source" : "put-sink",
280                md.niov, md.length,
281                req->rq_xid, desc->bd_portal, peer->peer_ni->pni_name);
282         RETURN(0);
283 }
284
285 void ptlrpc_unregister_bulk (struct ptlrpc_request *req)
286 {
287         /* Disconnect a bulk desc from the network. Idempotent. Not
288          * thread-safe (i.e. only interlocks with completion callback). */
289         struct ptlrpc_bulk_desc *desc = req->rq_bulk;
290         wait_queue_head_t       *wq;
291         struct l_wait_info       lwi;
292         int                      rc;
293
294         LASSERT (!in_interrupt ());             /* might sleep */
295
296         if (!ptlrpc_bulk_active(desc))          /* completed or */
297                 return;                         /* never registered */
298         
299         LASSERT (desc->bd_req == req);          /* bd_req NULL until registered */
300
301         /* the unlink ensures the callback happens ASAP and is the last
302          * one.  If it fails, it must be because completion just
303          * happened. */
304
305         rc = PtlMDUnlink (desc->bd_md_h);
306         if (rc == PTL_INV_MD) {
307                 LASSERT(!ptlrpc_bulk_active(desc));
308                 return;
309         }
310         
311         LASSERT (rc == PTL_OK);
312         
313         if (desc->bd_req->rq_set != NULL)
314                 wq = &req->rq_set->set_waitq;
315         else
316                 wq = &req->rq_reply_waitq;
317
318         for (;;) {
319                 /* Network access will complete in finite time but the HUGE
320                  * timeout lets us CWARN for visibility of sluggish NALs */
321                 lwi = LWI_TIMEOUT (300 * HZ, NULL, NULL);
322                 rc = l_wait_event(*wq, !ptlrpc_bulk_active(desc), &lwi);
323                 if (rc == 0)
324                         return;
325                 
326                 LASSERT (rc == -ETIMEDOUT);
327                 CWARN("Unexpectedly long timeout: desc %p\n", desc);
328         }
329 }
330
331 int ptlrpc_send_reply (struct ptlrpc_request *req, int may_be_difficult)
332 {
333         struct ptlrpc_service     *svc = req->rq_rqbd->rqbd_srv_ni->sni_service;
334         struct ptlrpc_reply_state *rs = req->rq_reply_state;
335         struct ptlrpc_connection  *conn;
336         int                        rc;
337
338         /* We must already have a reply buffer (only ptlrpc_error() may be
339          * called without one).  We must also have a request buffer which
340          * is either the actual (swabbed) incoming request, or a saved copy
341          * if this is a req saved in target_queue_final_reply(). */
342         LASSERT (req->rq_reqmsg != NULL);
343         LASSERT (rs != NULL);
344         LASSERT (req->rq_repmsg != NULL);
345         LASSERT (may_be_difficult || !rs->rs_difficult);
346         LASSERT (req->rq_repmsg == &rs->rs_msg);
347         LASSERT (rs->rs_cb_id.cbid_fn == reply_out_callback);
348         LASSERT (rs->rs_cb_id.cbid_arg == rs);
349
350         LASSERT (req->rq_repmsg != NULL);
351         if (req->rq_type != PTL_RPC_MSG_ERR)
352                 req->rq_type = PTL_RPC_MSG_REPLY;
353
354         req->rq_repmsg->type   = req->rq_type;
355         req->rq_repmsg->status = req->rq_status;
356         req->rq_repmsg->opc    = req->rq_reqmsg->opc;
357
358         if (req->rq_export == NULL) 
359                 conn = ptlrpc_get_connection(&req->rq_peer, NULL);
360         else
361                 conn = ptlrpc_connection_addref(req->rq_export->exp_connection);
362
363         atomic_inc (&svc->srv_outstanding_replies);
364
365         rc = ptl_send_buf (&rs->rs_md_h, req->rq_repmsg, req->rq_replen,
366                            rs->rs_difficult ? PTL_ACK_REQ : PTL_NOACK_REQ,
367                            &rs->rs_cb_id, conn,
368                            svc->srv_rep_portal, req->rq_xid);
369         if (rc != 0) {
370                 atomic_dec (&svc->srv_outstanding_replies);
371
372                 if (!rs->rs_difficult) {
373                         /* Callers other than target_send_reply() expect me
374                          * to clean up on a comms error */
375                         lustre_free_reply_state (rs);
376                         req->rq_reply_state = NULL;
377                         req->rq_repmsg = NULL;
378                 }
379         }
380         ptlrpc_put_connection(conn);
381         return rc;
382 }
383
/* Send the reply for 'req' via ptlrpc_send_reply() with may_be_difficult
 * clear, and hand back its result. */
int ptlrpc_reply (struct ptlrpc_request *req)
{
        int rc = ptlrpc_send_reply (req, 0);

        return rc;
}
388
389 int ptlrpc_error(struct ptlrpc_request *req)
390 {
391         int rc;
392         ENTRY;
393
394         if (!req->rq_repmsg) {
395                 rc = lustre_pack_reply(req, 0, NULL, NULL);
396                 if (rc)
397                         RETURN(rc);
398         }
399
400         req->rq_type = PTL_RPC_MSG_ERR;
401
402         rc = ptlrpc_send_reply (req, 0);
403         RETURN(rc);
404 }
405
406 int ptl_send_rpc(struct ptlrpc_request *request)
407 {
408         int rc;
409         int rc2;
410         struct ptlrpc_connection *connection;
411         unsigned long flags;
412         ptl_process_id_t source_id;
413         ptl_handle_me_t  reply_me_h;
414         ptl_md_t         reply_md;
415         ENTRY;
416
417         LASSERT (request->rq_type == PTL_RPC_MSG_REQUEST);
418
419         /* If this is a re-transmit, we're required to have disengaged
420          * cleanly from the previous attempt */
421         LASSERT (!request->rq_receiving_reply);
422
423         connection = request->rq_import->imp_connection;
424
425         if (request->rq_bulk != NULL) {
426                 rc = ptlrpc_register_bulk (request);
427                 if (rc != 0)
428                         RETURN(rc);
429         }
430
431         request->rq_reqmsg->handle = request->rq_import->imp_remote_handle;
432         request->rq_reqmsg->type = PTL_RPC_MSG_REQUEST;
433         request->rq_reqmsg->conn_cnt = request->rq_import->imp_conn_cnt;
434
435         source_id.nid = connection->c_peer.peer_nid;
436         source_id.pid = PTL_PID_ANY;
437
438         LASSERT (request->rq_replen != 0);
439         if (request->rq_repmsg == NULL)
440                 OBD_ALLOC(request->rq_repmsg, request->rq_replen);
441         if (request->rq_repmsg == NULL)
442                 GOTO(cleanup_bulk, rc = -ENOMEM);
443
444         rc = PtlMEAttach(connection->c_peer.peer_ni->pni_ni_h,
445                          request->rq_reply_portal, /* XXX FIXME bug 249 */
446                          source_id, request->rq_xid, 0, PTL_UNLINK,
447                          PTL_INS_AFTER, &reply_me_h);
448         if (rc != PTL_OK) {
449                 CERROR("PtlMEAttach failed: %d\n", rc);
450                 LASSERT (rc == PTL_NOSPACE);
451                 GOTO(cleanup_repmsg, rc = -ENOMEM);
452         }
453
454         spin_lock_irqsave (&request->rq_lock, flags);
455         /* If the MD attach succeeds, there _will_ be a reply_in callback */
456         request->rq_receiving_reply = 1;
457         /* Clear any flags that may be present from previous sends. */
458         request->rq_replied = 0;
459         request->rq_err = 0;
460         request->rq_timedout = 0;
461         request->rq_resend = 0;
462         request->rq_restart = 0;
463         spin_unlock_irqrestore (&request->rq_lock, flags);
464
465         reply_md.start     = request->rq_repmsg;
466         reply_md.length    = request->rq_replen;
467         reply_md.threshold = 1;
468         reply_md.options   = PTL_MD_OP_PUT;
469         reply_md.user_ptr  = &request->rq_reply_cbid;
470         reply_md.eventq    = connection->c_peer.peer_ni->pni_eq_h;
471
472         rc = PtlMDAttach(reply_me_h, reply_md, PTL_UNLINK, 
473                          &request->rq_reply_md_h);
474         if (rc != PTL_OK) {
475                 CERROR("PtlMDAttach failed: %d\n", rc);
476                 LASSERT (rc == PTL_NOSPACE);
477                 GOTO(cleanup_me, rc -ENOMEM);
478         }
479
480         CDEBUG(D_NET, "Setup reply buffer: %u bytes, xid "LPU64
481                ", portal %u on %s\n",
482                request->rq_replen, request->rq_xid,
483                request->rq_reply_portal,
484                connection->c_peer.peer_ni->pni_name);
485
486         ptlrpc_request_addref(request);        /* +1 ref for the SENT callback */
487
488         request->rq_sent = LTIME_S(CURRENT_TIME);
489         ptlrpc_pinger_sending_on_import(request->rq_import);
490         rc = ptl_send_buf(&request->rq_req_md_h, 
491                           request->rq_reqmsg, request->rq_reqlen,
492                           PTL_NOACK_REQ, &request->rq_req_cbid, 
493                           connection,
494                           request->rq_request_portal,
495                           request->rq_xid);
496         if (rc == 0) {
497                 ptlrpc_lprocfs_rpc_sent(request);
498                 RETURN(rc);
499         }
500
501         ptlrpc_req_finished (request);          /* drop callback ref */
502
503  cleanup_me:
504         /* MEUnlink is safe; the PUT didn't even get off the ground, and
505          * nobody apart from the PUT's target has the right nid+XID to
506          * access the reply buffer. */
507         rc2 = PtlMEUnlink(reply_me_h);
508         LASSERT (rc2 == PTL_OK);
509         /* UNLINKED callback called synchronously */
510         LASSERT (!request->rq_receiving_reply);
511
512  cleanup_repmsg:
513         OBD_FREE(request->rq_repmsg, request->rq_replen);
514         request->rq_repmsg = NULL;
515
516  cleanup_bulk:
517         if (request->rq_bulk != NULL)
518                 ptlrpc_unregister_bulk(request);
519
520         return rc;
521 }
522
523 int ptlrpc_register_rqbd (struct ptlrpc_request_buffer_desc *rqbd)
524 {
525         struct ptlrpc_srv_ni    *srv_ni = rqbd->rqbd_srv_ni;
526         struct ptlrpc_service   *service = srv_ni->sni_service;
527         static ptl_process_id_t  match_id = {PTL_NID_ANY, PTL_PID_ANY};
528         int                      rc;
529         ptl_md_t                 md;
530         ptl_handle_me_t          me_h;
531
532         CDEBUG(D_NET, "PtlMEAttach: portal %d on %s h %lx."LPX64"\n",
533                service->srv_req_portal, srv_ni->sni_ni->pni_name,
534                srv_ni->sni_ni->pni_ni_h.nal_idx,
535                srv_ni->sni_ni->pni_ni_h.cookie);
536
537         if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_PTLRPC_RQBD))
538                 return (-ENOMEM);
539
540         rc = PtlMEAttach(srv_ni->sni_ni->pni_ni_h, service->srv_req_portal,
541                          match_id, 0, ~0, PTL_UNLINK, PTL_INS_AFTER, &me_h);
542         if (rc != PTL_OK) {
543                 CERROR("PtlMEAttach failed: %d\n", rc);
544                 return (-ENOMEM);
545         }
546
547         LASSERT(rqbd->rqbd_refcount == 0);
548         rqbd->rqbd_refcount = 1;
549
550         md.start      = rqbd->rqbd_buffer;
551         md.length     = service->srv_buf_size;
552         md.max_size   = service->srv_max_req_size;
553         md.threshold  = PTL_MD_THRESH_INF;
554         md.options    = PTL_MD_OP_PUT | PTL_MD_MAX_SIZE | PTL_MD_AUTO_UNLINK;
555         md.user_ptr   = &rqbd->rqbd_cbid;
556         md.eventq     = srv_ni->sni_ni->pni_eq_h;
557         
558         rc = PtlMDAttach(me_h, md, PTL_UNLINK, &rqbd->rqbd_md_h);
559         if (rc == PTL_OK)
560                 return (0);
561
562         CERROR("PtlMDAttach failed: %d; \n", rc);
563         LASSERT (rc == PTL_NOSPACE);
564         rc = PtlMEUnlink (me_h);
565         LASSERT (rc == PTL_OK);
566         rqbd->rqbd_refcount = 0;
567         
568         return (-ENOMEM);
569 }