/* lustre/ptlrpc/niobuf.c — from fs/lustre-release.git (Whamcloud gitweb;
 * merge of b_devel into HEAD). */
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
5  *
6  *   This file is part of Lustre, http://www.lustre.org.
7  *
8  *   Lustre is free software; you can redistribute it and/or
9  *   modify it under the terms of version 2 of the GNU General Public
10  *   License as published by the Free Software Foundation.
11  *
12  *   Lustre is distributed in the hope that it will be useful,
13  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *   GNU General Public License for more details.
16  *
17  *   You should have received a copy of the GNU General Public License
18  *   along with Lustre; if not, write to the Free Software
19  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  *
21  */
22
23 #define DEBUG_SUBSYSTEM S_RPC
24 #ifndef __KERNEL__
25 #include <liblustre.h>
26 #include <portals/lib-types.h>
27 #endif
28 #include <linux/obd_support.h>
29 #include <linux/lustre_net.h>
30 #include <linux/lustre_lib.h>
31 #include <linux/obd.h>
32
33 static int ptl_send_buf(struct ptlrpc_request *request,
34                         struct ptlrpc_connection *conn, int portal)
35 {
36         int rc;
37         ptl_process_id_t remote_id;
38         ptl_handle_md_t md_h;
39         ptl_ack_req_t ack_req;
40
41         LASSERT(conn);
42         CDEBUG (D_INFO, "conn=%p ni %s nid "LPX64" on %s\n", 
43                 conn, conn->c_peer.peer_ni->pni_name,
44                 conn->c_peer.peer_nid, conn->c_peer.peer_ni->pni_name);
45
46         request->rq_req_md.user_ptr = request;
47
48         switch (request->rq_type) {
49         case PTL_RPC_MSG_REQUEST:
50                 request->rq_reqmsg->type = HTON__u32(request->rq_type);
51                 request->rq_req_md.start = request->rq_reqmsg;
52                 request->rq_req_md.length = request->rq_reqlen;
53                 request->rq_req_md.eventq = conn->c_peer.peer_ni->pni_request_out_eq_h;
54                 break;
55         case PTL_RPC_MSG_ERR:
56         case PTL_RPC_MSG_REPLY:
57                 request->rq_repmsg->type = HTON__u32(request->rq_type);
58                 request->rq_req_md.start = request->rq_repmsg;
59                 request->rq_req_md.length = request->rq_replen;
60                 request->rq_req_md.eventq = conn->c_peer.peer_ni->pni_reply_out_eq_h;
61                 break;
62         default:
63                 LBUG();
64                 return -1; /* notreached */
65         }
66         if (request->rq_flags & PTL_RPC_FL_WANT_ACK) {
67                 request->rq_req_md.threshold = 2; /* SENT and ACK */
68                 ack_req = PTL_ACK_REQ;
69         } else {
70                 request->rq_req_md.threshold = 1;
71                 ack_req = PTL_NOACK_REQ;
72         }
73         request->rq_req_md.options = PTL_MD_OP_PUT;
74         request->rq_req_md.user_ptr = request;
75
76         if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_ACK | OBD_FAIL_ONCE)) {
77                 request->rq_req_md.options |= PTL_MD_ACK_DISABLE;
78                 obd_fail_loc |= OBD_FAIL_ONCE | OBD_FAILED;
79         }
80
81         rc = PtlMDBind(conn->c_peer.peer_ni->pni_ni_h, request->rq_req_md, &md_h);
82         if (rc != 0) {
83                 CERROR("PtlMDBind failed: %d\n", rc);
84                 LBUG();
85                 return rc;
86         }
87
88         remote_id.nid = conn->c_peer.peer_nid;
89         remote_id.pid = 0;
90
91         CDEBUG(D_NET, "Sending %d bytes to portal %d, xid "LPD64"\n",
92                request->rq_req_md.length, portal, request->rq_xid);
93
94         if (!portal)
95                 LBUG();
96         rc = PtlPut(md_h, ack_req, remote_id, portal, 0, request->rq_xid, 0, 0);
97         if (rc != PTL_OK) {
98                 CERROR("PtlPut("LPU64", %d, "LPD64") failed: %d\n",
99                        remote_id.nid, portal, request->rq_xid, rc);
100                 PtlMDUnlink(md_h);
101         }
102
103         return rc;
104 }
105
106 static inline struct iovec *
107 ptlrpc_get_bulk_iov (struct ptlrpc_bulk_desc *desc)
108 {
109         struct iovec *iov;
110
111         if (desc->bd_page_count <= sizeof (desc->bd_iov)/sizeof (struct iovec))
112                 return (desc->bd_iov);
113
114         OBD_ALLOC (iov, desc->bd_page_count * sizeof (struct iovec));
115         if (iov == NULL)
116                 LBUG();
117
118         return (iov);
119 }
120
121 static inline void
122 ptlrpc_put_bulk_iov (struct ptlrpc_bulk_desc *desc, struct iovec *iov)
123 {
124         if (desc->bd_page_count <= sizeof (desc->bd_iov)/sizeof (struct iovec))
125                 return;
126
127         OBD_FREE (iov, desc->bd_page_count * sizeof (struct iovec));
128 }
129
/* Actively push the pages of 'desc' to the peer with a bulk PtlPut.
 *
 * Gathers every queued bulk page into one iovec-backed MD, binds it on
 * the peer's network interface, and PUTs it to the peer's bulk portal
 * using the (shared) transfer xid as match bits.
 * Returns 0 on success or a Portals error code (RETURNed positive). */
int ptlrpc_bulk_put(struct ptlrpc_bulk_desc *desc)
{
        int rc;
        struct ptlrpc_peer *peer;
        struct list_head *tmp, *next;
        ptl_process_id_t remote_id;
        __u32 xid = 0;
        struct iovec *iov;
        ENTRY;

        iov = ptlrpc_get_bulk_iov (desc);
        if (iov == NULL)
                RETURN (-ENOMEM);

        peer = &desc->bd_connection->c_peer;

        desc->bd_md.start = iov;
        desc->bd_md.niov = 0;
        desc->bd_md.length = 0;
        desc->bd_md.eventq = peer->peer_ni->pni_bulk_put_source_eq_h;
        desc->bd_md.threshold = 2; /* SENT and ACK */
        desc->bd_md.options = PTL_MD_OP_PUT | PTL_MD_IOV;
        desc->bd_md.user_ptr = desc;

        /* Two source-side events (SENT + ACK) complete the transfer. */
        atomic_set(&desc->bd_source_callback_count, 2);

        /* One iovec entry per queued page; every page of a single
         * transfer must carry the same xid. */
        list_for_each_safe(tmp, next, &desc->bd_page_list) {
                struct ptlrpc_bulk_page *bulk;
                bulk = list_entry(tmp, struct ptlrpc_bulk_page, bp_link);

                LASSERT(desc->bd_md.niov < desc->bd_page_count);

                if (desc->bd_md.niov == 0)
                        xid = bulk->bp_xid;
                LASSERT(xid == bulk->bp_xid);   /* should all be the same */

                iov[desc->bd_md.niov].iov_base = bulk->bp_buf;
                iov[desc->bd_md.niov].iov_len = bulk->bp_buflen;
                if (iov[desc->bd_md.niov].iov_len <= 0) {
                        CERROR("bad bp_buflen[%d] @ %p: %d\n", desc->bd_md.niov,
                               bulk->bp_buf, bulk->bp_buflen);
                        CERROR("desc: xid %u, pages %d, ptl %d, ref %d\n",
                               xid, desc->bd_page_count, desc->bd_portal,
                               atomic_read(&desc->bd_refcount));
                        LBUG();
                }
                desc->bd_md.niov++;
                desc->bd_md.length += bulk->bp_buflen;
        }

        LASSERT(desc->bd_md.niov == desc->bd_page_count);
        LASSERT(desc->bd_md.niov != 0);

        rc = PtlMDBind(peer->peer_ni->pni_ni_h, desc->bd_md,
                       &desc->bd_md_h);

        /* The scratch iovec is released right after the bind
         * (presumably PtlMDBind copies it - TODO confirm); freeing
         * here rather than after the error check trims send latency. */
        ptlrpc_put_bulk_iov (desc, iov); /*move down to reduce latency to send*/

        if (rc != PTL_OK) {
                CERROR("PtlMDBind failed: %d\n", rc);
                LBUG();
                RETURN(rc);
        }

        remote_id.nid = peer->peer_nid;
        remote_id.pid = 0;

        CDEBUG(D_NET, "Sending %u pages %u bytes to portal %d on %s "
               "nid "LPX64" pid %d xid %d\n", 
               desc->bd_md.niov, desc->bd_md.length,
               desc->bd_portal, peer->peer_ni->pni_name,
               remote_id.nid, remote_id.pid, xid);

        rc = PtlPut(desc->bd_md_h, PTL_ACK_REQ, remote_id,
                    desc->bd_portal, 0, xid, 0, 0);
        if (rc != PTL_OK) {
                CERROR("PtlPut("LPU64", %d, %d) failed: %d\n",
                       remote_id.nid, desc->bd_portal, xid, rc);
                PtlMDUnlink(desc->bd_md_h);
                LBUG();
                RETURN(rc);
        }

        RETURN(0);
}
215
/* Actively fetch bulk data from the peer with a PtlGet into the pages
 * of 'desc'.
 *
 * Builds one iovec-backed sink MD over all queued pages, binds it, and
 * issues a GET on the bulk portal matched by the transfer xid.
 * Returns 0 on success or a Portals error code (RETURNed positive).
 * Structure mirrors ptlrpc_bulk_put() with GET-side eventq/options. */
int ptlrpc_bulk_get(struct ptlrpc_bulk_desc *desc)
{
        int rc;
        struct ptlrpc_peer *peer;
        struct list_head *tmp, *next;
        ptl_process_id_t remote_id;
        __u32 xid = 0;
        struct iovec *iov;
        ENTRY;

        iov = ptlrpc_get_bulk_iov (desc);
        if (iov == NULL)
                RETURN (-ENOMEM);

        peer = &desc->bd_connection->c_peer;

        desc->bd_md.start = iov;
        desc->bd_md.niov = 0;
        desc->bd_md.length = 0;
        desc->bd_md.eventq = peer->peer_ni->pni_bulk_get_sink_eq_h;
        desc->bd_md.threshold = 2; /* SENT and REPLY */
        desc->bd_md.options = PTL_MD_OP_GET | PTL_MD_IOV;
        desc->bd_md.user_ptr = desc;

        /* Two events (SENT + REPLY) complete the transfer. */
        atomic_set(&desc->bd_source_callback_count, 2);

        /* One iovec entry per queued page; every page of a single
         * transfer must carry the same xid. */
        list_for_each_safe(tmp, next, &desc->bd_page_list) {
                struct ptlrpc_bulk_page *bulk;
                bulk = list_entry(tmp, struct ptlrpc_bulk_page, bp_link);

                LASSERT(desc->bd_md.niov < desc->bd_page_count);

                if (desc->bd_md.niov == 0)
                        xid = bulk->bp_xid;
                LASSERT(xid == bulk->bp_xid);   /* should all be the same */

                iov[desc->bd_md.niov].iov_base = bulk->bp_buf;
                iov[desc->bd_md.niov].iov_len = bulk->bp_buflen;
                if (iov[desc->bd_md.niov].iov_len <= 0) {
                        CERROR("bad bulk %p bp_buflen[%d] @ %p: %d\n", bulk,
                               desc->bd_md.niov, bulk->bp_buf, bulk->bp_buflen);
                        CERROR("desc %p: xid %u, pages %d, ptl %d, ref %d\n",
                               desc, xid, desc->bd_page_count, desc->bd_portal,
                               atomic_read(&desc->bd_refcount));
                        LBUG();
                }
                desc->bd_md.niov++;
                desc->bd_md.length += bulk->bp_buflen;
        }

        LASSERT(desc->bd_md.niov == desc->bd_page_count);
        LASSERT(desc->bd_md.niov != 0);

        rc = PtlMDBind(peer->peer_ni->pni_ni_h, desc->bd_md,
                       &desc->bd_md_h);

        /* Scratch iovec released right after the bind (presumably
         * PtlMDBind copies it - TODO confirm); done before the error
         * check to trim latency. */
        ptlrpc_put_bulk_iov (desc, iov); /*move down to reduce latency to send*/

        if (rc != PTL_OK) {
                CERROR("PtlMDBind failed: %d\n", rc);
                LBUG();
                RETURN(rc);
        }

        remote_id.nid = desc->bd_connection->c_peer.peer_nid;
        remote_id.pid = 0;

        /* NOTE(review): the message text says "Sending" although this
         * path issues a GET (data flows inbound). */
        CDEBUG(D_NET, "Sending %u pages %u bytes to portal %d on %s "
               "nid "LPX64" pid %d xid %d\n", 
               desc->bd_md.niov, desc->bd_md.length,
               desc->bd_portal, peer->peer_ni->pni_name,
               remote_id.nid, remote_id.pid, xid);

        rc = PtlGet(desc->bd_md_h, remote_id, desc->bd_portal, 0, xid, 0);
        if (rc != PTL_OK) {
                CERROR("PtlGet("LPU64", %d, %d) failed: %d\n",
                       remote_id.nid, desc->bd_portal, xid, rc);
                PtlMDUnlink(desc->bd_md_h);
                LBUG();
                RETURN(rc);
        }

        RETURN(0);
}
300
/* Common passive-side setup for a bulk transfer: build an iovec MD over
 * the descriptor's pages and attach it, via an ME matching the peer's
 * nid and the transfer xid, on the bulk portal where the peer's
 * PtlPut/PtlGet will find it.  The caller must have set bd_md.options
 * and bd_md.eventq first (ptlrpc_register_bulk_get/put do this).
 * Returns 0 on success, -EINVAL for oversized iovecs, -ENOMEM, or a
 * Portals error code. */
static int ptlrpc_register_bulk_shared(struct ptlrpc_bulk_desc *desc)
{
        struct ptlrpc_peer *peer;
        struct list_head *tmp, *next;
        int rc;
        __u32 xid = 0;
        struct iovec *iov;
        ptl_process_id_t source_id;
        ENTRY;

        /* Portals caps the number of iovec fragments per MD. */
        if (desc->bd_page_count > PTL_MD_MAX_IOV) {
                CERROR("iov longer than %d pages not supported (count=%d)\n",
                       PTL_MD_MAX_IOV, desc->bd_page_count);
                RETURN(-EINVAL);
        }

        iov = ptlrpc_get_bulk_iov (desc);
        if (iov == NULL)
                return (-ENOMEM);

        peer = &desc->bd_connection->c_peer;
        
        desc->bd_md.start = iov;
        desc->bd_md.niov = 0;
        desc->bd_md.length = 0;
        desc->bd_md.threshold = 1;
        desc->bd_md.user_ptr = desc;

        /* One iovec entry per queued page; all pages of a transfer
         * share the same xid. */
        list_for_each_safe(tmp, next, &desc->bd_page_list) {
                struct ptlrpc_bulk_page *bulk;
                bulk = list_entry(tmp, struct ptlrpc_bulk_page, bp_link);

                LASSERT(desc->bd_md.niov < desc->bd_page_count);

                if (desc->bd_md.niov == 0)
                        xid = bulk->bp_xid;
                LASSERT(xid == bulk->bp_xid);   /* should all be the same */

                iov[desc->bd_md.niov].iov_base = bulk->bp_buf;
                iov[desc->bd_md.niov].iov_len = bulk->bp_buflen;
                desc->bd_md.niov++;
                desc->bd_md.length += bulk->bp_buflen;
        }

        LASSERT(desc->bd_md.niov == desc->bd_page_count);
        LASSERT(desc->bd_md.niov != 0);

        /* Accept the transfer only from the peer's nid (any pid). */
        source_id.nid = desc->bd_connection->c_peer.peer_nid;
        source_id.pid = PTL_PID_ANY;

        rc = PtlMEAttach(peer->peer_ni->pni_ni_h,
                         desc->bd_portal, source_id, xid, 0,
                         PTL_UNLINK, PTL_INS_AFTER, &desc->bd_me_h);

        if (rc != PTL_OK) {
                CERROR("PtlMEAttach failed: %d\n", rc);
                LBUG();
                GOTO(cleanup, rc);
        }

        rc = PtlMDAttach(desc->bd_me_h, desc->bd_md, PTL_UNLINK,
                         &desc->bd_md_h);
        if (rc != PTL_OK) {
                CERROR("PtlMDAttach failed: %d\n", rc);
                LBUG();
                GOTO(cleanup, rc);
        }

        ptlrpc_put_bulk_iov (desc, iov);

        CDEBUG(D_NET, "Setup bulk sink buffers: %u pages %u bytes, xid %u, "
               "portal %u on %s\n", desc->bd_md.niov, desc->bd_md.length,
               xid, desc->bd_portal, peer->peer_ni->pni_name);

        RETURN(0);

 cleanup:
        /* Abort is safe even if only the ME (or neither) was attached:
         * the handles start out invalid (see ptlrpc_abort_bulk). */
        ptlrpc_put_bulk_iov (desc, iov);
        ptlrpc_abort_bulk(desc);

        return rc;
}
383
384 int ptlrpc_register_bulk_get(struct ptlrpc_bulk_desc *desc)
385 {
386         desc->bd_md.options = PTL_MD_OP_GET | PTL_MD_IOV;
387         desc->bd_md.eventq = 
388                 desc->bd_connection->c_peer.peer_ni->pni_bulk_get_source_eq_h;
389
390         return ptlrpc_register_bulk_shared(desc);
391 }
392
393 int ptlrpc_register_bulk_put(struct ptlrpc_bulk_desc *desc)
394 {
395         desc->bd_md.options = PTL_MD_OP_PUT | PTL_MD_IOV;
396         desc->bd_md.eventq = 
397                 desc->bd_connection->c_peer.peer_ni->pni_bulk_put_sink_eq_h;
398
399         return ptlrpc_register_bulk_shared(desc);
400 }
401
402 int ptlrpc_abort_bulk(struct ptlrpc_bulk_desc *desc)
403 {
404         int rc1, rc2;
405         /* This should be safe: these handles are initialized to be
406          * invalid in ptlrpc_prep_bulk() */
407         rc1 = PtlMDUnlink(desc->bd_md_h);
408         if (rc1 != PTL_OK)
409                 CERROR("PtlMDUnlink: %d\n", rc1);
410         rc2 = PtlMEUnlink(desc->bd_me_h);
411         if (rc2 != PTL_OK)
412                 CERROR("PtlMEUnlink: %d\n", rc2);
413
414         return rc1 ? rc1 : rc2;
415 }
416
/* Take an extra reference on 'set'; release it with
 * obd_brw_set_decref(). */
void obd_brw_set_addref(struct obd_brw_set *set)
{
        atomic_inc(&set->brw_refcount);
}
421
/* Chain bulk descriptor 'desc' onto 'set'.  The descriptor must not
 * already belong to a set; a reference is taken on it for the chain
 * (dropped in obd_brw_set_del()). */
void obd_brw_set_add(struct obd_brw_set *set, struct ptlrpc_bulk_desc *desc)
{
        LASSERT(list_empty(&desc->bd_set_chain));

        ptlrpc_bulk_addref(desc);
        atomic_inc(&set->brw_desc_count);
        desc->bd_brw_set = set;
        list_add(&desc->bd_set_chain, &set->brw_desc_head);
}
431
/* Unchain 'desc' from its set and drop the reference taken by
 * obd_brw_set_add().  list_del_init leaves bd_set_chain empty so the
 * descriptor can be re-added later. */
void obd_brw_set_del(struct ptlrpc_bulk_desc *desc)
{
        atomic_dec(&desc->bd_brw_set->brw_desc_count);
        list_del_init(&desc->bd_set_chain);
        ptlrpc_bulk_decref(desc);
}
438
439 struct obd_brw_set *obd_brw_set_new(void)
440 {
441         struct obd_brw_set *set;
442
443         OBD_ALLOC(set, sizeof(*set));
444
445         if (set != NULL) {
446                 init_waitqueue_head(&set->brw_waitq);
447                 INIT_LIST_HEAD(&set->brw_desc_head);
448                 atomic_set(&set->brw_refcount, 1);
449                 atomic_set(&set->brw_desc_count, 0);
450         }
451
452         return set;
453 }
454
/* Free 'set' once its last reference is gone (called only from
 * obd_brw_set_decref()).  Any descriptors still chained here never
 * completed: complain and abort their bulk registrations first.
 * NOTE(review): the stale descriptors are aborted but not unchained or
 * decref'd here - confirm the event callbacks are expected to finish
 * that cleanup. */
static void obd_brw_set_free(struct obd_brw_set *set)
{
        struct list_head *tmp, *next;
        ENTRY;

        list_for_each_safe(tmp, next, &set->brw_desc_head) {
                struct ptlrpc_bulk_desc *desc =
                        list_entry(tmp, struct ptlrpc_bulk_desc, bd_set_chain);

                CERROR("Unfinished bulk descriptor: %p\n", desc);

                ptlrpc_abort_bulk(desc);
        }
        OBD_FREE(set, sizeof(*set));
        EXIT;
        return;
}
472
/* Drop one reference on 'set'; the set is freed (and any unfinished
 * descriptors aborted) when the count reaches zero. */
void obd_brw_set_decref(struct obd_brw_set *set)
{
        ENTRY;
        if (atomic_dec_and_test(&set->brw_refcount))
                obd_brw_set_free(set);
        EXIT;
}
480
481 int ptlrpc_reply(struct ptlrpc_service *svc, struct ptlrpc_request *req)
482 {
483         if (req->rq_repmsg == NULL) {
484                 CERROR("bad: someone called ptlrpc_reply when they meant "
485                        "ptlrpc_error\n");
486                 return -EINVAL;
487         }
488
489         /* FIXME: we need to increment the count of handled events */
490         if (req->rq_type != PTL_RPC_MSG_ERR)
491                 req->rq_type = PTL_RPC_MSG_REPLY;
492         //req->rq_repmsg->conn = req->rq_connection->c_remote_conn;
493         //req->rq_repmsg->token = req->rq_connection->c_remote_token;
494         req->rq_repmsg->status = HTON__u32(req->rq_status);
495         return ptl_send_buf(req, req->rq_connection, svc->srv_rep_portal);
496 }
497
498 int ptlrpc_error(struct ptlrpc_service *svc, struct ptlrpc_request *req)
499 {
500         int rc;
501         ENTRY;
502
503         if (!req->rq_repmsg) {
504                 rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen,
505                                      &req->rq_repmsg);
506                 if (rc)
507                         RETURN(rc);
508         }
509
510
511         req->rq_type = PTL_RPC_MSG_ERR;
512
513         rc = ptlrpc_reply(svc, req);
514         RETURN(rc);
515 }
516
/* Send 'request' to its peer.  When a reply is expected
 * (rq_replen != 0) a reply buffer is set up first: an ME matched on
 * the peer's nid and our xid, plus an auto-unlinking MD over the
 * buffer.  Then the request itself goes out through ptl_send_buf().
 *
 * NOTE(review): the bad-type and allocation-failure paths RETURN
 * positive EINVAL/ENOMEM, unlike the negative-errno convention used
 * elsewhere in this file - confirm callers expect this. */
int ptl_send_rpc(struct ptlrpc_request *request)
{
        int rc;
        char *repbuf;
        ptl_process_id_t source_id;

        ENTRY;

        if (request->rq_type != PTL_RPC_MSG_REQUEST) {
                CERROR("wrong packet type sent %d\n",
                       NTOH__u32(request->rq_reqmsg->type));
                LBUG();
                RETURN(EINVAL);
        }

        /* Only accept the reply from the nid we are sending to. */
        source_id.nid = request->rq_connection->c_peer.peer_nid;
        source_id.pid = PTL_PID_ANY;

        /* add a ref, which will be balanced in request_out_callback */
        ptlrpc_request_addref(request);
        if (request->rq_replen != 0) {
                /* A reply buffer may survive from a previous send of
                 * this request (resend): unlink its old ME and reuse
                 * the buffer instead of reallocating. */
                if (request->rq_reply_md.start != NULL) {
                        rc = PtlMEUnlink(request->rq_reply_me_h);
                        if (rc != PTL_OK && rc != PTL_INV_ME) {
                                CERROR("rc %d\n", rc);
                                LBUG();
                        }
                        repbuf = (char *)request->rq_reply_md.start;
                        request->rq_repmsg = NULL;
                } else {
                        OBD_ALLOC(repbuf, request->rq_replen);
                        if (!repbuf) {
                                LBUG();
                                RETURN(ENOMEM);
                        }
                }

                /* Match bits are our xid, so only the reply to this
                 * request lands in the buffer. */
                rc = PtlMEAttach(request->rq_connection->c_peer.peer_ni->pni_ni_h,
                             request->rq_reply_portal,/* XXX FIXME bug 625069 */
                                 source_id, request->rq_xid, 0, PTL_UNLINK,
                                 PTL_INS_AFTER, &request->rq_reply_me_h);
                if (rc != PTL_OK) {
                        CERROR("PtlMEAttach failed: %d\n", rc);
                        LBUG();
                        GOTO(cleanup, rc);
                }

                request->rq_reply_md.start = repbuf;
                request->rq_reply_md.length = request->rq_replen;
                request->rq_reply_md.threshold = 1;
                request->rq_reply_md.options = PTL_MD_OP_PUT;
                request->rq_reply_md.user_ptr = request;
                request->rq_reply_md.eventq =
                        request->rq_connection->c_peer.peer_ni->pni_reply_in_eq_h;

                rc = PtlMDAttach(request->rq_reply_me_h, request->rq_reply_md,
                                 PTL_UNLINK, NULL);
                if (rc != PTL_OK) {
                        CERROR("PtlMDAttach failed: %d\n", rc);
                        LBUG();
                        GOTO(cleanup2, rc);
                }

                CDEBUG(D_NET, "Setup reply buffer: %u bytes, xid "LPU64
                       ", portal %u on %s\n",
                       request->rq_replen, request->rq_xid,
                       request->rq_reply_portal,
                       request->rq_connection->c_peer.peer_ni->pni_name);
        }

        /* Clear any flags that may be present from previous sends,
         * except for REPLAY, NO_RESEND and WANT_ACK. */
        request->rq_flags &= (PTL_RPC_FL_REPLAY | PTL_RPC_FL_NO_RESEND |
                              PTL_RPC_FL_WANT_ACK);
        rc = ptl_send_buf(request, request->rq_connection,
                          request->rq_request_portal);
        RETURN(rc);

 cleanup2:
        PtlMEUnlink(request->rq_reply_me_h);
 cleanup:
        /* NOTE(review): on the reuse path repbuf is the old
         * rq_reply_md.start, which is left dangling after this free;
         * the reference taken above is also not dropped here - verify
         * against request_out_callback's expectations. */
        OBD_FREE(repbuf, request->rq_replen);
        // up(&request->rq_client->cli_rpc_sem);

        return rc;
}
603
/* Post request buffer descriptor 'rqbd' on its service's request
 * portal: attach an ME matching any sender (wildcard nid/pid, ignore
 * bits ~0) and an MD over the request buffer so incoming RPC requests
 * land there.  Hard-fails (LBUG) if the ME cannot be attached. */
void ptlrpc_link_svc_me(struct ptlrpc_request_buffer_desc *rqbd)
{
        struct ptlrpc_srv_ni *srv_ni = rqbd->rqbd_srv_ni;
        struct ptlrpc_service *service = srv_ni->sni_service;
        static ptl_process_id_t match_id = {PTL_NID_ANY, PTL_PID_ANY};
        int rc;
        ptl_md_t dummy;
        ptl_handle_md_t md_h;

        /* The descriptor must not currently be posted. */
        LASSERT(atomic_read(&rqbd->rqbd_refcount) == 0);

        CDEBUG(D_NET, "PtlMEAttach: portal %d on %s h %lx.%lx\n",
               service->srv_req_portal, srv_ni->sni_ni->pni_name,
               srv_ni->sni_ni->pni_ni_h.nal_idx,
               srv_ni->sni_ni->pni_ni_h.handle_idx);

        /* Attach the leading ME on which we build the ring */
        rc = PtlMEAttach(srv_ni->sni_ni->pni_ni_h, service->srv_req_portal,
                         match_id, 0, ~0,
                         PTL_UNLINK, PTL_INS_AFTER, &rqbd->rqbd_me_h);
        if (rc != PTL_OK) {
                CERROR("PtlMEAttach failed: %d\n", rc);
                LBUG();
        }

        /* MD accepts PUTs of up to srv_max_req_size each; with
         * PTL_MD_AUTO_UNLINK it presumably retires itself when the
         * buffer can no longer hold a max-sized request - confirm
         * against the Portals auto-unlink semantics. */
        dummy.start      = rqbd->rqbd_buffer;
        dummy.length     = service->srv_buf_size;
        dummy.max_size   = service->srv_max_req_size;
        dummy.threshold  = PTL_MD_THRESH_INF;
        dummy.options    = PTL_MD_OP_PUT | PTL_MD_MAX_SIZE | PTL_MD_AUTO_UNLINK;
        dummy.user_ptr   = rqbd;
        dummy.eventq     = srv_ni->sni_eq_h;

        /* Count this buffer as receiving; portals holds one reference
         * until the MD unlinks. */
        atomic_inc(&srv_ni->sni_nrqbds_receiving);
        atomic_set(&rqbd->rqbd_refcount, 1);   /* 1 ref for portals */

        rc = PtlMDAttach(rqbd->rqbd_me_h, dummy, PTL_UNLINK, &md_h);
        if (rc != PTL_OK) {
                CERROR("PtlMDAttach failed: %d\n", rc);
                LBUG();
#warning proper cleanup required
                PtlMEUnlink (rqbd->rqbd_me_h);
                atomic_set(&rqbd->rqbd_refcount, 0);
                atomic_dec(&srv_ni->sni_nrqbds_receiving);
        }
}