Whamcloud - gitweb
62a76c48314a0768385b204f4688feb50ba27038
[fs/lustre-release.git] / lustre / ptlrpc / niobuf.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
5  *
6  *   This file is part of Lustre, http://www.lustre.org.
7  *
8  *   Lustre is free software; you can redistribute it and/or
9  *   modify it under the terms of version 2 of the GNU General Public
10  *   License as published by the Free Software Foundation.
11  *
12  *   Lustre is distributed in the hope that it will be useful,
13  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *   GNU General Public License for more details.
16  *
17  *   You should have received a copy of the GNU General Public License
18  *   along with Lustre; if not, write to the Free Software
19  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  *
21  */
22
23 #define DEBUG_SUBSYSTEM S_RPC
24 #ifndef __KERNEL__
25 #include <liblustre.h>
26 #include <portals/lib-types.h>
27 #endif
28 #include <linux/obd_support.h>
29 #include <linux/lustre_net.h>
30 #include <linux/lustre_lib.h>
31 #include <linux/obd.h>
32
33 static int ptl_send_buf(struct ptlrpc_request *request,
34                         struct ptlrpc_connection *conn, int portal)
35 {
36         int rc;
37         ptl_process_id_t remote_id;
38         ptl_handle_md_t md_h;
39         ptl_ack_req_t ack_req;
40
41         LASSERT(conn);
42         CDEBUG (D_INFO, "conn=%p ni %s nid "LPX64" on %s\n", 
43                 conn, conn->c_peer.peer_ni->pni_name,
44                 conn->c_peer.peer_nid, conn->c_peer.peer_ni->pni_name);
45
46         request->rq_req_md.user_ptr = request;
47
48         switch (request->rq_type) {
49         case PTL_RPC_MSG_REQUEST:
50                 request->rq_reqmsg->type = HTON__u32(request->rq_type);
51                 request->rq_req_md.start = request->rq_reqmsg;
52                 request->rq_req_md.length = request->rq_reqlen;
53                 request->rq_req_md.eventq = conn->c_peer.peer_ni->pni_request_out_eq_h;
54                 break;
55         case PTL_RPC_MSG_ERR:
56         case PTL_RPC_MSG_REPLY:
57                 request->rq_repmsg->type = HTON__u32(request->rq_type);
58                 request->rq_req_md.start = request->rq_repmsg;
59                 request->rq_req_md.length = request->rq_replen;
60                 request->rq_req_md.eventq = conn->c_peer.peer_ni->pni_reply_out_eq_h;
61                 break;
62         default:
63                 LBUG();
64                 return -1; /* notreached */
65         }
66         if (request->rq_flags & PTL_RPC_FL_WANT_ACK) {
67                 request->rq_req_md.threshold = 2; /* SENT and ACK */
68                 ack_req = PTL_ACK_REQ;
69         } else {
70                 request->rq_req_md.threshold = 1;
71                 ack_req = PTL_NOACK_REQ;
72         }
73         request->rq_req_md.options = PTL_MD_OP_PUT;
74         request->rq_req_md.user_ptr = request;
75
76         if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_ACK | OBD_FAIL_ONCE)) {
77                 request->rq_req_md.options |= PTL_MD_ACK_DISABLE;
78                 obd_fail_loc |= OBD_FAIL_ONCE | OBD_FAILED;
79         }
80
81         rc = PtlMDBind(conn->c_peer.peer_ni->pni_ni_h, request->rq_req_md, &md_h);
82         if (rc != 0) {
83                 CERROR("PtlMDBind failed: %d\n", rc);
84                 LBUG();
85                 return rc;
86         }
87
88         remote_id.nid = conn->c_peer.peer_nid;
89         remote_id.pid = 0;
90
91         CDEBUG(D_NET, "Sending %d bytes to portal %d, xid "LPD64"\n",
92                request->rq_req_md.length, portal, request->rq_xid);
93
94         if (!portal)
95                 LBUG();
96         rc = PtlPut(md_h, ack_req, remote_id, portal, 0, request->rq_xid, 0, 0);
97         if (rc != PTL_OK) {
98                 CERROR("PtlPut("LPU64", %d, "LPD64") failed: %d\n",
99                        remote_id.nid, portal, request->rq_xid, rc);
100                 PtlMDUnlink(md_h);
101         }
102
103         return rc;
104 }
105
106 static inline struct iovec *
107 ptlrpc_get_bulk_iov (struct ptlrpc_bulk_desc *desc)
108 {
109         struct iovec *iov;
110
111         if (desc->bd_page_count <= sizeof (desc->bd_iov)/sizeof (struct iovec))
112                 return (desc->bd_iov);
113
114         OBD_ALLOC (iov, desc->bd_page_count * sizeof (struct iovec));
115         if (iov == NULL)
116                 LBUG();
117
118         return (iov);
119 }
120
121 static inline void
122 ptlrpc_put_bulk_iov (struct ptlrpc_bulk_desc *desc, struct iovec *iov)
123 {
124         if (desc->bd_page_count <= sizeof (desc->bd_iov)/sizeof (struct iovec))
125                 return;
126
127         OBD_FREE (iov, desc->bd_page_count * sizeof (struct iovec));
128 }
129
/* Actively PUT the pages described by DESC to the peer's registered
 * bulk sink.  Gathers the page list into one iovec MD, binds it, and
 * PtlPut()s it using the xid the pages were prepped with as match bits.
 *
 * Returns 0 on success, -ENOMEM if the iovec could not be allocated, or
 * the portals error code from bind/put. */
int ptlrpc_bulk_put(struct ptlrpc_bulk_desc *desc)
{
        int rc;
        struct ptlrpc_peer *peer;
        struct list_head *tmp, *next;
        ptl_process_id_t remote_id;
        __u32 xid = 0;
        struct iovec *iov;
        ENTRY;

        /* NB: ptlrpc_get_bulk_iov() LBUG()s rather than returning NULL on
         * allocation failure, so this check is belt-and-braces */
        iov = ptlrpc_get_bulk_iov (desc);
        if (iov == NULL)
                RETURN (-ENOMEM);

        peer = &desc->bd_connection->c_peer;

        desc->bd_md.start = iov;
        desc->bd_md.niov = 0;
        desc->bd_md.length = 0;
        desc->bd_md.eventq = peer->peer_ni->pni_bulk_put_source_eq_h;
        desc->bd_md.threshold = 2; /* SENT and ACK */
        desc->bd_md.options = PTL_MD_OP_PUT | PTL_MD_IOV;
        desc->bd_md.user_ptr = desc;

        /* two callbacks (SENT + ACK) must fire before the transfer is done */
        atomic_set(&desc->bd_source_callback_count, 2);

        /* fill the iovec from the page list; every page must carry the same
         * xid, which becomes the match bits of the PtlPut() below */
        list_for_each_safe(tmp, next, &desc->bd_page_list) {
                struct ptlrpc_bulk_page *bulk;
                bulk = list_entry(tmp, struct ptlrpc_bulk_page, bp_link);

                LASSERT(desc->bd_md.niov < desc->bd_page_count);

                if (desc->bd_md.niov == 0)
                        xid = bulk->bp_xid;
                LASSERT(xid == bulk->bp_xid);   /* should all be the same */

                iov[desc->bd_md.niov].iov_base = bulk->bp_buf;
                iov[desc->bd_md.niov].iov_len = bulk->bp_buflen;
                if (iov[desc->bd_md.niov].iov_len <= 0) {
                        CERROR("bad bp_buflen[%d] @ %p: %d\n", desc->bd_md.niov,
                               bulk->bp_buf, bulk->bp_buflen);
                        CERROR("desc: xid %u, pages %d, ptl %d, ref %d\n",
                               xid, desc->bd_page_count, desc->bd_portal,
                               atomic_read(&desc->bd_refcount));
                        LBUG();
                }
                desc->bd_md.niov++;
                desc->bd_md.length += bulk->bp_buflen;
        }

        LASSERT(desc->bd_md.niov == desc->bd_page_count);
        LASSERT(desc->bd_md.niov != 0);

        rc = PtlMDBind(peer->peer_ni->pni_ni_h, desc->bd_md,
                       &desc->bd_md_h);

        /* the iovec was copied by PtlMDBind, so it can be released now —
         * done before the error check to keep the send path short */
        ptlrpc_put_bulk_iov (desc, iov); /*move down to reduce latency to send*/

        if (rc != PTL_OK) {
                CERROR("PtlMDBind failed: %d\n", rc);
                LBUG();
                RETURN(rc);
        }

        remote_id.nid = peer->peer_nid;
        remote_id.pid = 0;

        CDEBUG(D_NET, "Sending %u pages %u bytes to portal %d on %s "
               "nid "LPX64" pid %d xid %d\n", 
               desc->bd_md.niov, desc->bd_md.length,
               desc->bd_portal, peer->peer_ni->pni_name,
               remote_id.nid, remote_id.pid, xid);

        rc = PtlPut(desc->bd_md_h, PTL_ACK_REQ, remote_id,
                    desc->bd_portal, 0, xid, 0, 0);
        if (rc != PTL_OK) {
                CERROR("PtlPut("LPU64", %d, %d) failed: %d\n",
                       remote_id.nid, desc->bd_portal, xid, rc);
                /* drop the MD so no events arrive for a failed send */
                PtlMDUnlink(desc->bd_md_h);
                LBUG();
                RETURN(rc);
        }

        RETURN(0);
}
215
/* Actively GET bulk data from the peer into the pages described by
 * DESC.  Mirror image of ptlrpc_bulk_put(): builds one iovec MD over
 * the page list, binds it, and issues a PtlGet() matched on the
 * pages' common xid.
 *
 * Returns 0 on success, -ENOMEM if the iovec could not be allocated, or
 * the portals error code from bind/get. */
int ptlrpc_bulk_get(struct ptlrpc_bulk_desc *desc)
{
        int rc;
        struct ptlrpc_peer *peer;
        struct list_head *tmp, *next;
        ptl_process_id_t remote_id;
        __u32 xid = 0;
        struct iovec *iov;
        ENTRY;

        /* NB: ptlrpc_get_bulk_iov() LBUG()s rather than returning NULL on
         * allocation failure, so this check is belt-and-braces */
        iov = ptlrpc_get_bulk_iov (desc);
        if (iov == NULL)
                RETURN (-ENOMEM);

        peer = &desc->bd_connection->c_peer;

        desc->bd_md.start = iov;
        desc->bd_md.niov = 0;
        desc->bd_md.length = 0;
        desc->bd_md.eventq = peer->peer_ni->pni_bulk_get_sink_eq_h;
        desc->bd_md.threshold = 2; /* SENT and REPLY */
        desc->bd_md.options = PTL_MD_OP_GET | PTL_MD_IOV;
        desc->bd_md.user_ptr = desc;

        /* two callbacks (SENT + REPLY) must fire before the transfer is done */
        atomic_set(&desc->bd_source_callback_count, 2);

        /* fill the iovec from the page list; every page must carry the same
         * xid, which becomes the match bits of the PtlGet() below */
        list_for_each_safe(tmp, next, &desc->bd_page_list) {
                struct ptlrpc_bulk_page *bulk;
                bulk = list_entry(tmp, struct ptlrpc_bulk_page, bp_link);

                LASSERT(desc->bd_md.niov < desc->bd_page_count);

                if (desc->bd_md.niov == 0)
                        xid = bulk->bp_xid;
                LASSERT(xid == bulk->bp_xid);   /* should all be the same */

                iov[desc->bd_md.niov].iov_base = bulk->bp_buf;
                iov[desc->bd_md.niov].iov_len = bulk->bp_buflen;
                if (iov[desc->bd_md.niov].iov_len <= 0) {
                        CERROR("bad bulk %p bp_buflen[%d] @ %p: %d\n", bulk,
                               desc->bd_md.niov, bulk->bp_buf, bulk->bp_buflen);
                        CERROR("desc %p: xid %u, pages %d, ptl %d, ref %d\n",
                               desc, xid, desc->bd_page_count, desc->bd_portal,
                               atomic_read(&desc->bd_refcount));
                        LBUG();
                }
                desc->bd_md.niov++;
                desc->bd_md.length += bulk->bp_buflen;
        }

        LASSERT(desc->bd_md.niov == desc->bd_page_count);
        LASSERT(desc->bd_md.niov != 0);

        rc = PtlMDBind(peer->peer_ni->pni_ni_h, desc->bd_md,
                       &desc->bd_md_h);

        /* the iovec was copied by PtlMDBind, so it can be released now —
         * done before the error check to keep the send path short */
        ptlrpc_put_bulk_iov (desc, iov); /*move down to reduce latency to send*/

        if (rc != PTL_OK) {
                CERROR("PtlMDBind failed: %d\n", rc);
                LBUG();
                RETURN(rc);
        }

        remote_id.nid = desc->bd_connection->c_peer.peer_nid;
        remote_id.pid = 0;

        CDEBUG(D_NET, "Sending %u pages %u bytes to portal %d on %s "
               "nid "LPX64" pid %d xid %d\n", 
               desc->bd_md.niov, desc->bd_md.length,
               desc->bd_portal, peer->peer_ni->pni_name,
               remote_id.nid, remote_id.pid, xid);

        rc = PtlGet(desc->bd_md_h, remote_id, desc->bd_portal, 0, xid, 0);
        if (rc != PTL_OK) {
                CERROR("PtlGet("LPU64", %d, %d) failed: %d\n",
                       remote_id.nid, desc->bd_portal, xid, rc);
                /* drop the MD so no events arrive for a failed get */
                PtlMDUnlink(desc->bd_md_h);
                LBUG();
                RETURN(rc);
        }

        RETURN(0);
}
300
/* Common half of passive bulk registration: attach an ME (matched on
 * the pages' xid, restricted to the peer's nid) and an MD covering
 * DESC's pages, so the peer can later GET from or PUT into them.
 *
 * The caller (ptlrpc_register_bulk_get/put) must have already set
 * bd_md.options and bd_md.eventq for the direction it wants.
 *
 * Returns 0 on success, -EINVAL if there are too many pages, -ENOMEM on
 * iovec allocation failure, or the portals error code; on attach
 * failure any partially attached state is torn down via
 * ptlrpc_abort_bulk(). */
static int ptlrpc_register_bulk_shared(struct ptlrpc_bulk_desc *desc)
{
        struct ptlrpc_peer *peer;
        struct list_head *tmp, *next;
        int rc;
        __u32 xid = 0;
        struct iovec *iov;
        ptl_process_id_t source_id;
        ENTRY;

        if (desc->bd_page_count > PTL_MD_MAX_IOV) {
                CERROR("iov longer than %d pages not supported (count=%d)\n",
                       PTL_MD_MAX_IOV, desc->bd_page_count);
                RETURN(-EINVAL);
        }

        iov = ptlrpc_get_bulk_iov (desc);
        if (iov == NULL)
                return (-ENOMEM);

        peer = &desc->bd_connection->c_peer;
        
        desc->bd_md.start = iov;
        desc->bd_md.niov = 0;
        desc->bd_md.length = 0;
        desc->bd_md.threshold = 1;
        desc->bd_md.user_ptr = desc;

        /* fill the iovec from the page list; every page must carry the same
         * xid, which is used as the match bits of the ME below */
        list_for_each_safe(tmp, next, &desc->bd_page_list) {
                struct ptlrpc_bulk_page *bulk;
                bulk = list_entry(tmp, struct ptlrpc_bulk_page, bp_link);

                LASSERT(desc->bd_md.niov < desc->bd_page_count);

                if (desc->bd_md.niov == 0)
                        xid = bulk->bp_xid;
                LASSERT(xid == bulk->bp_xid);   /* should all be the same */

                iov[desc->bd_md.niov].iov_base = bulk->bp_buf;
                iov[desc->bd_md.niov].iov_len = bulk->bp_buflen;
                desc->bd_md.niov++;
                desc->bd_md.length += bulk->bp_buflen;
        }

        LASSERT(desc->bd_md.niov == desc->bd_page_count);
        LASSERT(desc->bd_md.niov != 0);

        /* only accept the matching xid from this peer's nid (any pid) */
        source_id.nid = desc->bd_connection->c_peer.peer_nid;
        source_id.pid = PTL_PID_ANY;

        rc = PtlMEAttach(peer->peer_ni->pni_ni_h,
                         desc->bd_portal, source_id, xid, 0,
                         PTL_UNLINK, PTL_INS_AFTER, &desc->bd_me_h);

        if (rc != PTL_OK) {
                CERROR("PtlMEAttach failed: %d\n", rc);
                LBUG();
                GOTO(cleanup, rc);
        }

        rc = PtlMDAttach(desc->bd_me_h, desc->bd_md, PTL_UNLINK,
                         &desc->bd_md_h);
        if (rc != PTL_OK) {
                CERROR("PtlMDAttach failed: %d\n", rc);
                LBUG();
                GOTO(cleanup, rc);
        }

        /* the iovec was copied by PtlMDAttach; safe to release */
        ptlrpc_put_bulk_iov (desc, iov);

        CDEBUG(D_NET, "Setup bulk sink buffers: %u pages %u bytes, xid %u, "
               "portal %u on %s\n", desc->bd_md.niov, desc->bd_md.length,
               xid, desc->bd_portal, peer->peer_ni->pni_name);

        RETURN(0);

 cleanup:
        ptlrpc_put_bulk_iov (desc, iov);
        /* unlinks bd_me_h/bd_md_h; safe on never-attached handles */
        ptlrpc_abort_bulk(desc);

        return rc;
}
383
384 int ptlrpc_register_bulk_get(struct ptlrpc_bulk_desc *desc)
385 {
386         desc->bd_md.options = PTL_MD_OP_GET | PTL_MD_IOV;
387         desc->bd_md.eventq = 
388                 desc->bd_connection->c_peer.peer_ni->pni_bulk_get_source_eq_h;
389
390         return ptlrpc_register_bulk_shared(desc);
391 }
392
393 int ptlrpc_register_bulk_put(struct ptlrpc_bulk_desc *desc)
394 {
395         desc->bd_md.options = PTL_MD_OP_PUT | PTL_MD_IOV;
396         desc->bd_md.eventq = 
397                 desc->bd_connection->c_peer.peer_ni->pni_bulk_put_sink_eq_h;
398
399         return ptlrpc_register_bulk_shared(desc);
400 }
401
/* Tear down a bulk descriptor's portals state by unlinking its MD and
 * ME.  Return values are deliberately ignored: unlinking handles that
 * were never attached is expected and harmless.  Always returns 0. */
int ptlrpc_abort_bulk(struct ptlrpc_bulk_desc *desc)
{
        /* This should be safe: these handles are initialized to be
         * invalid in ptlrpc_prep_bulk() */
        PtlMDUnlink(desc->bd_md_h);
        PtlMEUnlink(desc->bd_me_h);

        return 0;
}
411
412 void obd_brw_set_add(struct obd_brw_set *set, struct ptlrpc_bulk_desc *desc)
413 {
414         LASSERT(list_empty(&desc->bd_set_chain));
415
416         ptlrpc_bulk_addref(desc);
417         atomic_inc(&set->brw_refcount);
418         desc->bd_brw_set = set;
419         list_add(&desc->bd_set_chain, &set->brw_desc_head);
420 }
421
422 void obd_brw_set_del(struct ptlrpc_bulk_desc *desc)
423 {
424         atomic_dec(&desc->bd_brw_set->brw_refcount);
425         list_del_init(&desc->bd_set_chain);
426         ptlrpc_bulk_decref(desc);
427 }
428
429 struct obd_brw_set *obd_brw_set_new(void)
430 {
431         struct obd_brw_set *set;
432
433         OBD_ALLOC(set, sizeof(*set));
434
435         if (set != NULL) {
436                 init_waitqueue_head(&set->brw_waitq);
437                 INIT_LIST_HEAD(&set->brw_desc_head);
438                 atomic_set(&set->brw_refcount, 0);
439         }
440
441         return set;
442 }
443
444 void obd_brw_set_free(struct obd_brw_set *set)
445 {
446         struct list_head *tmp, *next;
447         ENTRY;
448
449         list_for_each_safe(tmp, next, &set->brw_desc_head) {
450                 struct ptlrpc_bulk_desc *desc =
451                         list_entry(tmp, struct ptlrpc_bulk_desc, bd_set_chain);
452
453                 CERROR("Unfinished bulk descriptor: %p\n", desc);
454
455                 ptlrpc_abort_bulk(desc);
456         }
457         OBD_FREE(set, sizeof(*set));
458         EXIT;
459         return;
460 }
461
462 int ptlrpc_reply(struct ptlrpc_service *svc, struct ptlrpc_request *req)
463 {
464         if (req->rq_repmsg == NULL) {
465                 CERROR("bad: someone called ptlrpc_reply when they meant "
466                        "ptlrpc_error\n");
467                 return -EINVAL;
468         }
469
470         /* FIXME: we need to increment the count of handled events */
471         if (req->rq_type != PTL_RPC_MSG_ERR)
472                 req->rq_type = PTL_RPC_MSG_REPLY;
473         //req->rq_repmsg->conn = req->rq_connection->c_remote_conn;
474         //req->rq_repmsg->token = req->rq_connection->c_remote_token;
475         req->rq_repmsg->status = HTON__u32(req->rq_status);
476         return ptl_send_buf(req, req->rq_connection, svc->srv_rep_portal);
477 }
478
479 int ptlrpc_error(struct ptlrpc_service *svc, struct ptlrpc_request *req)
480 {
481         int rc;
482         ENTRY;
483
484         if (!req->rq_repmsg) {
485                 rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen,
486                                      &req->rq_repmsg);
487                 if (rc)
488                         RETURN(rc);
489         }
490
491
492         req->rq_type = PTL_RPC_MSG_ERR;
493
494         rc = ptlrpc_reply(svc, req);
495         RETURN(rc);
496 }
497
/* Send REQUEST to its connection's request portal.  If a reply is
 * expected (rq_replen != 0), first post a reply buffer: an ME matched
 * on the request xid restricted to the peer's nid, with an MD over a
 * reply buffer that is either reused from a previous send or freshly
 * allocated.  Returns 0 or an error code.
 *
 * NOTE(review): error codes here are returned positive (EINVAL/ENOMEM),
 * unlike the negative convention elsewhere in this file — confirm
 * callers expect that.  Also, the reference taken by
 * ptlrpc_request_addref() below does not appear to be dropped on the
 * cleanup paths; verify against request_out_callback. */
int ptl_send_rpc(struct ptlrpc_request *request)
{
        int rc;
        char *repbuf;
        ptl_process_id_t source_id;

        ENTRY;

        if (request->rq_type != PTL_RPC_MSG_REQUEST) {
                CERROR("wrong packet type sent %d\n",
                       NTOH__u32(request->rq_reqmsg->type));
                LBUG();
                RETURN(EINVAL);
        }

        /* only accept the reply from this peer's nid (any pid) */
        source_id.nid = request->rq_connection->c_peer.peer_nid;
        source_id.pid = PTL_PID_ANY;

        /* add a ref, which will be balanced in request_out_callback */
        ptlrpc_request_addref(request);
        if (request->rq_replen != 0) {
                if (request->rq_reply_md.start != NULL) {
                        /* a reply buffer from a previous send exists:
                         * unlink its old ME and reuse the buffer */
                        rc = PtlMEUnlink(request->rq_reply_me_h);
                        if (rc != PTL_OK && rc != PTL_INV_ME) {
                                CERROR("rc %d\n", rc);
                                LBUG();
                        }
                        repbuf = (char *)request->rq_reply_md.start;
                        request->rq_repmsg = NULL;
                } else {
                        OBD_ALLOC(repbuf, request->rq_replen);
                        if (!repbuf) {
                                LBUG();
                                RETURN(ENOMEM);
                        }
                }

                rc = PtlMEAttach(request->rq_connection->c_peer.peer_ni->pni_ni_h,
                             request->rq_reply_portal,/* XXX FIXME bug 625069 */
                                 source_id, request->rq_xid, 0, PTL_UNLINK,
                                 PTL_INS_AFTER, &request->rq_reply_me_h);
                if (rc != PTL_OK) {
                        CERROR("PtlMEAttach failed: %d\n", rc);
                        LBUG();
                        GOTO(cleanup, rc);
                }

                request->rq_reply_md.start = repbuf;
                request->rq_reply_md.length = request->rq_replen;
                request->rq_reply_md.threshold = 1;
                request->rq_reply_md.options = PTL_MD_OP_PUT;
                request->rq_reply_md.user_ptr = request;
                request->rq_reply_md.eventq =
                        request->rq_connection->c_peer.peer_ni->pni_reply_in_eq_h;

                rc = PtlMDAttach(request->rq_reply_me_h, request->rq_reply_md,
                                 PTL_UNLINK, NULL);
                if (rc != PTL_OK) {
                        CERROR("PtlMDAttach failed: %d\n", rc);
                        LBUG();
                        GOTO(cleanup2, rc);
                }

                CDEBUG(D_NET, "Setup reply buffer: %u bytes, xid "LPU64
                       ", portal %u on %s\n",
                       request->rq_replen, request->rq_xid,
                       request->rq_reply_portal,
                       request->rq_connection->c_peer.peer_ni->pni_name);
        }

        /* Clear any flags that may be present from previous sends,
         * except for REPLAY, NO_RESEND and WANT_ACK. */
        request->rq_flags &= (PTL_RPC_FL_REPLAY | PTL_RPC_FL_NO_RESEND |
                              PTL_RPC_FL_WANT_ACK);
        rc = ptl_send_buf(request, request->rq_connection,
                          request->rq_request_portal);
        RETURN(rc);

 cleanup2:
        PtlMEUnlink(request->rq_reply_me_h);
 cleanup:
        OBD_FREE(repbuf, request->rq_replen);
        // up(&request->rq_client->cli_rpc_sem);

        return rc;
}
584
/* (Re)post a request buffer descriptor on its service's request portal:
 * attach a wildcard ME (any nid/pid, ignore all match bits) and an
 * auto-unlinking MD over the buffer so incoming requests land in it. */
void ptlrpc_link_svc_me(struct ptlrpc_request_buffer_desc *rqbd)
{
        struct ptlrpc_srv_ni *srv_ni = rqbd->rqbd_srv_ni;
        struct ptlrpc_service *service = srv_ni->sni_service;
        /* wildcard: accept requests from any sender */
        static ptl_process_id_t match_id = {PTL_NID_ANY, PTL_PID_ANY};
        int rc;
        ptl_md_t dummy;
        ptl_handle_md_t md_h;

        /* the descriptor must not currently be posted */
        LASSERT(atomic_read(&rqbd->rqbd_refcount) == 0);

        CDEBUG(D_NET, "PtlMEAttach: portal %d on %s h %lx.%lx\n",
               service->srv_req_portal, srv_ni->sni_ni->pni_name,
               srv_ni->sni_ni->pni_ni_h.nal_idx,
               srv_ni->sni_ni->pni_ni_h.handle_idx);

        /* Attach the leading ME on which we build the ring */
        rc = PtlMEAttach(srv_ni->sni_ni->pni_ni_h, service->srv_req_portal,
                         match_id, 0, ~0,
                         PTL_UNLINK, PTL_INS_AFTER, &rqbd->rqbd_me_h);
        if (rc != PTL_OK) {
                CERROR("PtlMEAttach failed: %d\n", rc);
                LBUG();
        }

        /* NB: only the fields below are initialized; assumes PtlMDAttach
         * ignores the rest of ptl_md_t for these options */
        dummy.start      = rqbd->rqbd_buffer;
        dummy.length     = service->srv_buf_size;
        dummy.max_size   = service->srv_max_req_size;
        dummy.threshold  = PTL_MD_THRESH_INF;
        dummy.options    = PTL_MD_OP_PUT | PTL_MD_MAX_SIZE | PTL_MD_AUTO_UNLINK;
        dummy.user_ptr   = rqbd;
        dummy.eventq     = srv_ni->sni_eq_h;

        atomic_inc(&srv_ni->sni_nrqbds_receiving);
        atomic_set(&rqbd->rqbd_refcount, 1);   /* 1 ref for portals */

        rc = PtlMDAttach(rqbd->rqbd_me_h, dummy, PTL_UNLINK, &md_h);
        if (rc != PTL_OK) {
                CERROR("PtlMDAttach failed: %d\n", rc);
                LBUG();
#warning proper cleanup required
                /* undo the ME attach and the accounting done above */
                PtlMEUnlink (rqbd->rqbd_me_h);
                atomic_set(&rqbd->rqbd_refcount, 0);
                atomic_dec(&srv_ni->sni_nrqbds_receiving);
        }
}