Whamcloud - gitweb
356660c0a2c79a6dd61f4b04ce530f39bd6d4e51
[fs/lustre-release.git] / lnet / klnds / ptllnd / ptllnd_rx_buf.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2005 Cluster File Systems, Inc. All rights reserved.
5  *   Author: PJ Kirner <pjkirner@clusterfs.com>
6  *
7  *   This file is part of the Lustre file system, http://www.lustre.org
8  *   Lustre is a trademark of Cluster File Systems, Inc.
9  *
10  *   This file is confidential source code owned by Cluster File Systems.
11  *   No viewing, modification, compilation, redistribution, or any other
12  *   form of use is permitted except through a signed license agreement.
13  *
14  *   If you have not signed such an agreement, then you have no rights to
15  *   this file.  Please destroy it immediately and contact CFS.
16  *
17  */
18
19  #include "ptllnd.h"
20
21 void
22 kptllnd_rx_buffer_pool_init(kptl_rx_buffer_pool_t *rxbp)
23 {
24         memset(rxbp, 0, sizeof(*rxbp));
25         spin_lock_init(&rxbp->rxbp_lock);
26         INIT_LIST_HEAD(&rxbp->rxbp_list);
27 }
28
29 void
30 kptllnd_rx_buffer_destroy(kptl_rx_buffer_t *rxb)
31 {
32         kptl_rx_buffer_pool_t *rxbp = rxb->rxb_pool;
33
34         LASSERT(rxb->rxb_refcount == 0);
35         LASSERT(PtlHandleIsEqual(rxb->rxb_mdh, PTL_INVALID_HANDLE));
36         LASSERT(!rxb->rxb_posted);
37         LASSERT(rxb->rxb_idle);
38
39         list_del(&rxb->rxb_list);
40         rxbp->rxbp_count--;
41
42         LIBCFS_FREE(rxb->rxb_buffer, kptllnd_rx_buffer_size());
43         LIBCFS_FREE(rxb, sizeof(*rxb));
44 }
45
/* Reserve capacity for 'count' more incoming messages, growing the pool
 * by allocating and posting new rx buffers until it can satisfy all
 * outstanding reservations.
 *
 * Returns 0 on success, -ESHUTDOWN if the pool is shutting down, or
 * -ENOMEM if a buffer allocation fails.  Called with rxbp_lock NOT held;
 * the lock is dropped around every allocation and every post. */
int
kptllnd_rx_buffer_pool_reserve(kptl_rx_buffer_pool_t *rxbp, int count)
{
        int               bufsize;
        int               msgs_per_buffer;
        int               rc;
        kptl_rx_buffer_t *rxb;
        char             *buffer;
        unsigned long     flags;

        /* Each buffer holds bufsize/max_msg_size messages; that ratio
         * converts the message reservation into a buffer requirement. */
        bufsize = kptllnd_rx_buffer_size();
        msgs_per_buffer = bufsize / (*kptllnd_tunables.kptl_max_msg_size);

        CDEBUG(D_NET, "kptllnd_rx_buffer_pool_reserve(%d)\n", count);

        spin_lock_irqsave(&rxbp->rxbp_lock, flags);

        /* Loop invariant: rxbp_lock is held at the top of each iteration
         * and when the loop breaks out with 'rc' set. */
        for (;;) {
                if (rxbp->rxbp_shutdown) {
                        rc = -ESHUTDOWN;
                        break;
                }
                
                /* Enough buffers already to cover existing reservations
                 * plus this one?  Then nothing to allocate. */
                if (rxbp->rxbp_reserved + count <= 
                    rxbp->rxbp_count * msgs_per_buffer) {
                        rc = 0;
                        break;
                }
                
                /* Can't allocate while holding a spinlock. */
                spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
                
                LIBCFS_ALLOC(rxb, sizeof(*rxb));
                LIBCFS_ALLOC(buffer, bufsize);

                if (rxb == NULL || buffer == NULL) {
                        CERROR("Failed to allocate rx buffer\n");

                        /* Free whichever half of the pair succeeded. */
                        if (rxb != NULL)
                                LIBCFS_FREE(rxb, sizeof(*rxb));
                        if (buffer != NULL)
                                LIBCFS_FREE(buffer, bufsize);
                        
                        /* Re-take the lock so the exit path below can
                         * unlock unconditionally. */
                        spin_lock_irqsave(&rxbp->rxbp_lock, flags);
                        rc = -ENOMEM;
                        break;
                }

                memset(rxb, 0, sizeof(*rxb));

                rxb->rxb_eventarg.eva_type = PTLLND_EVENTARG_TYPE_BUF;
                rxb->rxb_refcount = 0;
                rxb->rxb_pool = rxbp;
                rxb->rxb_idle = 0;
                rxb->rxb_posted = 0;
                rxb->rxb_buffer = buffer;
                rxb->rxb_mdh = PTL_INVALID_HANDLE;

                spin_lock_irqsave(&rxbp->rxbp_lock, flags);
                
                /* Shutdown may have started while the lock was dropped;
                 * if so, discard the new buffer instead of posting it. */
                if (rxbp->rxbp_shutdown) {
                        spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
                        
                        LIBCFS_FREE(rxb, sizeof(*rxb));
                        LIBCFS_FREE(buffer, bufsize);

                        spin_lock_irqsave(&rxbp->rxbp_lock, flags);
                        rc = -ESHUTDOWN;
                        break;
                }
                
                /* Make the buffer visible to the pool before posting. */
                list_add_tail(&rxb->rxb_list, &rxbp->rxbp_list);
                rxbp->rxbp_count++;

                spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
                
                /* Post with the lock dropped; posting takes its own
                 * locks and may call into Portals. */
                kptllnd_rx_buffer_post(rxb);

                spin_lock_irqsave(&rxbp->rxbp_lock, flags);
        }

        if (rc == 0)
                rxbp->rxbp_reserved += count;

        spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);

        return rc;
}
133
134 void
135 kptllnd_rx_buffer_pool_unreserve(kptl_rx_buffer_pool_t *rxbp,
136                                  int count)
137 {
138         unsigned long flags;
139
140         spin_lock_irqsave(&rxbp->rxbp_lock, flags);
141
142         CDEBUG(D_NET, "kptllnd_rx_buffer_pool_unreserve(%d)\n", count);
143         rxbp->rxbp_reserved -= count;
144
145         spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
146 }
147
148 void
149 kptllnd_rx_buffer_pool_fini(kptl_rx_buffer_pool_t *rxbp)
150 {
151         kptl_rx_buffer_t       *rxb;
152         int                     rc;
153         int                     i;
154         unsigned long           flags;
155         struct list_head       *tmp;
156         struct list_head       *nxt;
157         ptl_handle_md_t         mdh;
158
159         /* CAVEAT EMPTOR: I'm racing with everything here!!!  
160          *
161          * Buffers can still be posted after I set rxbp_shutdown because I
162          * can't hold rxbp_lock while I'm posting them.
163          *
164          * Calling PtlMDUnlink() here races with auto-unlinks; i.e. a buffer's
165          * MD handle could become invalid under me.  I am vulnerable to portals
166          * re-using handles (i.e. make the same handle valid again, but for a
167          * different MD) from when the MD is actually unlinked, to when the
168          * event callback tells me it has been unlinked. */
169
170         spin_lock_irqsave(&rxbp->rxbp_lock, flags);
171
172         rxbp->rxbp_shutdown = 1;
173
174         for (i = 9;; i++) {
175                 list_for_each_safe(tmp, nxt, &rxbp->rxbp_list) {
176                         rxb = list_entry (tmp, kptl_rx_buffer_t, rxb_list);
177                 
178                         if (rxb->rxb_idle) {
179                                 spin_unlock_irqrestore(&rxbp->rxbp_lock, 
180                                                        flags);
181                                 kptllnd_rx_buffer_destroy(rxb);
182                                 spin_lock_irqsave(&rxbp->rxbp_lock, 
183                                                   flags);
184                                 continue;
185                         }
186
187                         mdh = rxb->rxb_mdh;
188                         if (PtlHandleIsEqual(mdh, PTL_INVALID_HANDLE))
189                                 continue;
190                         
191                         spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
192
193                         rc = PtlMDUnlink(mdh);
194
195                         spin_lock_irqsave(&rxbp->rxbp_lock, flags);
196                         
197 #ifdef LUSTRE_PORTALS_UNLINK_SEMANTICS
198                         /* callback clears rxb_mdh and drops net's ref
199                          * (which causes repost, but since I set
200                          * shutdown, it will just set the buffer
201                          * idle) */
202 #else
203                         if (rc == PTL_OK) {
204                                 rxb->rxb_posted = 0;
205                                 rxb->rxb_mdh = PTL_INVALID_HANDLE;
206                                 kptllnd_rx_buffer_decref_locked(rxb);
207                         }
208 #endif
209                 }
210
211                 if (list_empty(&rxbp->rxbp_list))
212                         break;
213
214                 spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
215
216                 /* Wait a bit for references to be dropped */
217                 CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
218                        "Waiting for %d Busy RX Buffers\n",
219                        rxbp->rxbp_count);
220
221                 cfs_pause(cfs_time_seconds(1));
222
223                 spin_lock_irqsave(&rxbp->rxbp_lock, flags);
224         }
225
226         spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
227 }
228
/* Post an rx buffer to Portals: attach a wildcard ME on the LND portal
 * and an MD spanning the whole buffer so incoming PUTs land in it.
 * On shutdown the buffer is simply marked idle; on Portals failure the
 * net's reference is dropped, which recycles the buffer.
 * Must be called in process context with rxbp_lock NOT held. */
void
kptllnd_rx_buffer_post(kptl_rx_buffer_t *rxb)
{
        int                     rc;
        ptl_md_t                md;
        ptl_handle_me_t         meh;
        ptl_handle_md_t         mdh;
        ptl_process_id_t        any;
        kptl_rx_buffer_pool_t  *rxbp = rxb->rxb_pool;
        unsigned long           flags;

        LASSERT (!in_interrupt());
        LASSERT (rxb->rxb_refcount == 0);
        LASSERT (!rxb->rxb_idle);
        LASSERT (!rxb->rxb_posted);
        LASSERT (PtlHandleIsEqual(rxb->rxb_mdh, PTL_INVALID_HANDLE));

        /* Accept messages from any NID/PID. */
        any.nid = PTL_NID_ANY;
        any.pid = PTL_PID_ANY;

        spin_lock_irqsave(&rxbp->rxbp_lock, flags);

        /* Pool is shutting down: don't post, just park the buffer as
         * idle so pool_fini can destroy it. */
        if (rxbp->rxbp_shutdown) {
                rxb->rxb_idle = 1;
                spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
                return;
        }

        rxb->rxb_refcount = 1;                  /* net's ref */
        rxb->rxb_posted = 1;                    /* I'm posting */
        
        spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);

        rc = PtlMEAttach(kptllnd_data.kptl_nih,
                         *kptllnd_tunables.kptl_portal,
                         any,
                         LNET_MSG_MATCHBITS,
                         0, /* all matchbits are valid - ignore none */
                         PTL_UNLINK,
                         PTL_INS_AFTER,
                         &meh);
        if (rc != PTL_OK) {
                CERROR("PtlMeAttach rxb failed %s(%d)\n",
                       kptllnd_errtype2str(rc), rc);
                goto failed;
        }

        /*
         * Setup MD
         */
        md.start = rxb->rxb_buffer;
        md.length = PAGE_SIZE * *kptllnd_tunables.kptl_rxb_npages;
        md.threshold = PTL_MD_THRESH_INF;
        md.options = PTL_MD_OP_PUT |
                     PTL_MD_LUSTRE_COMPLETION_SEMANTICS |
                     PTL_MD_EVENT_START_DISABLE |
                     PTL_MD_MAX_SIZE |
                     PTL_MD_LOCAL_ALIGN8;
        md.user_ptr = &rxb->rxb_eventarg;
        md.max_size = *kptllnd_tunables.kptl_max_msg_size;
        md.eq_handle = kptllnd_data.kptl_eqh;

        rc = PtlMDAttach(meh, md, PTL_UNLINK, &mdh);
        if (rc == PTL_OK) {
                /* The MD may already have auto-unlinked (callback cleared
                 * rxb_posted); only record the handle if still posted. */
                spin_lock_irqsave(&rxbp->rxbp_lock, flags);
                if (rxb->rxb_posted)            /* Not auto-unlinked yet!!! */
                        rxb->rxb_mdh = mdh;
                spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
                return;
        }
        
        CERROR("PtlMDAttach rxb failed %s(%d)\n",
               kptllnd_errtype2str(rc), rc);
        /* Undo the ME attach; this must succeed since no MD was attached. */
        rc = PtlMEUnlink(meh);
        LASSERT(rc == PTL_OK);

 failed:
        spin_lock_irqsave(&rxbp->rxbp_lock, flags);
        rxb->rxb_posted = 0;
        /* XXX this will just try again immediately */
        kptllnd_rx_buffer_decref_locked(rxb);
        spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
}
312
313 kptl_rx_t *
314 kptllnd_rx_alloc(void)
315 {
316         kptl_rx_t* rx;
317
318         if (IS_SIMULATION_ENABLED(FAIL_RX_ALLOC)) {
319                 CERROR ("FAIL_RX_ALLOC SIMULATION triggered\n");
320                 return NULL;
321         }
322
323         rx = cfs_mem_cache_alloc(kptllnd_data.kptl_rx_cache, CFS_ALLOC_ATOMIC);
324         if (rx == NULL) {
325                 CERROR("Failed to allocate rx\n");
326                 return NULL;
327         }
328
329         memset(rx, 0, sizeof(*rx));
330         return rx;
331 }
332
/* Finish with an rx: release its buffer reference, optionally return a
 * credit to the peer (post_credit), poke the peer's send path in case
 * the returned credit lets queued messages go out, and free the rx. */
void
kptllnd_rx_done(kptl_rx_t *rx, int post_credit)
{
        kptl_rx_buffer_t *rxb = rx->rx_rxb;
        kptl_peer_t      *peer = rx->rx_peer;
        unsigned long     flags;

        LASSERT (post_credit == PTLLND_POSTRX_NO_CREDIT ||
                 post_credit == PTLLND_POSTRX_PEER_CREDIT);

        CDEBUG(D_NET, "rx=%p rxb %p peer %p\n", rx, rxb, peer);

        /* rxb is NULL when the message was copied into rx_space (unaligned
         * receive) - nothing to release then. */
        if (rxb != NULL)
                kptllnd_rx_buffer_decref(rxb);

        if (peer != NULL) {
                /* Update credits (after I've decref-ed the buffer) */
                spin_lock_irqsave(&peer->peer_lock, flags);

                if (post_credit == PTLLND_POSTRX_PEER_CREDIT)
                        peer->peer_outstanding_credits++;

                LASSERT (peer->peer_outstanding_credits +
                         peer->peer_sent_credits <=
                         *kptllnd_tunables.kptl_peercredits);

                CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: rx %p done\n",
                       libcfs_id2str(peer->peer_id), peer->peer_credits,
                       peer->peer_outstanding_credits, peer->peer_sent_credits,
                       rx);

                spin_unlock_irqrestore(&peer->peer_lock, flags);

                /* I might have to send back credits */
                kptllnd_peer_check_sends(peer);
                /* Drop the ref the rx held on its peer. */
                kptllnd_peer_decref(peer);
        }

        cfs_mem_cache_free(kptllnd_data.kptl_rx_cache, rx);
}
373
374 void
375 kptllnd_rx_buffer_callback (ptl_event_t *ev)
376 {
377         kptl_eventarg_t        *eva = ev->md.user_ptr;
378         kptl_rx_buffer_t       *rxb = kptllnd_eventarg2obj(eva);
379         kptl_rx_buffer_pool_t  *rxbp = rxb->rxb_pool;
380         kptl_rx_t              *rx;
381         int                     unlinked;
382         unsigned long           flags;
383
384 #ifdef LUSTRE_PORTALS_UNLINK_SEMANTICS
385         unlinked = ev->unlinked;
386 #else
387         unlinked = ev->type == PTL_EVENT_UNLINK;
388 #endif
389
390         CDEBUG(D_NET, "%s: %s(%d) rxb=%p fail=%s(%d) unlink=%d\n",
391                kptllnd_ptlid2str(ev->initiator), 
392                kptllnd_evtype2str(ev->type), ev->type, rxb, 
393                kptllnd_errtype2str(ev->ni_fail_type), ev->ni_fail_type,
394                unlinked);
395
396         LASSERT (!rxb->rxb_idle);
397         LASSERT (ev->md.start == rxb->rxb_buffer);
398         LASSERT (ev->offset + ev->mlength <= 
399                  PAGE_SIZE * *kptllnd_tunables.kptl_rxb_npages);
400         LASSERT (ev->type == PTL_EVENT_PUT_END || 
401                  ev->type == PTL_EVENT_UNLINK);
402         LASSERT (ev->type == PTL_EVENT_UNLINK ||
403                  ev->match_bits == LNET_MSG_MATCHBITS);
404
405         if (ev->ni_fail_type != PTL_NI_OK) {
406                 CERROR("Portals error from %s: %s(%d) rxb=%p fail=%s(%d) unlink=%dn",
407                        kptllnd_ptlid2str(ev->initiator),
408                        kptllnd_evtype2str(ev->type), ev->type, rxb,
409                        kptllnd_errtype2str(ev->ni_fail_type),
410                        ev->ni_fail_type, unlinked);
411
412         } else if (ev->type == PTL_EVENT_PUT_END &&
413                    !rxbp->rxbp_shutdown) {
414
415                 /* rxbp_shutdown sampled without locking!  I only treat it as a
416                  * hint since shutdown can start while rx's are queued on
417                  * kptl_sched_rxq. */
418 #if (PTL_MD_LOCAL_ALIGN8 == 0)
419                 /* Portals can't force message alignment - someone sending an
420                  * odd-length message will misalign subsequent messages and
421                  * force the fixup below...  */
422                 if ((ev->mlength & 7) != 0)
423                         CWARN("Message from %s has odd length "LPU64": "
424                               "probable version incompatibility\n",
425                               kptllnd_ptlid2str(ev->initiator),
426                               (__u64)ev->mlength);
427 #endif
428                 rx = kptllnd_rx_alloc();
429                 if (rx == NULL) {
430                         CERROR("Message from %s dropped: ENOMEM",
431                                kptllnd_ptlid2str(ev->initiator));
432                 } else {
433                         if ((ev->offset & 7) == 0) {
434                                 kptllnd_rx_buffer_addref(rxb);
435                                 rx->rx_rxb = rxb;
436                                 rx->rx_nob = ev->mlength;
437                                 rx->rx_msg = (kptl_msg_t *)
438                                              (rxb->rxb_buffer + ev->offset);
439                         } else {
440 #if (PTL_MD_LOCAL_ALIGN8 == 0)
441                                 /* Portals can't force alignment - copy into
442                                  * rx_space (avoiding overflow) to fix */
443                                 int maxlen = *kptllnd_tunables.kptl_max_msg_size;
444                                 
445                                 rx->rx_rxb = NULL;
446                                 rx->rx_nob = MIN(maxlen, ev->mlength);
447                                 rx->rx_msg = (kptl_msg_t *)rx->rx_space;
448                                 memcpy(rx->rx_msg, rxb->rxb_buffer + ev->offset,
449                                        rx->rx_nob);
450 #else
451                                 /* Portals should have forced the alignment */
452                                 LBUG();
453 #endif
454                         }
455
456                         rx->rx_initiator = ev->initiator;
457                         rx->rx_treceived = jiffies;
458 #ifdef CRAY_XT3
459                         rx->rx_uid = ev->uid;
460 #endif
461                         /* Queue for attention */
462                         spin_lock_irqsave(&kptllnd_data.kptl_sched_lock, 
463                                           flags);
464
465                         list_add_tail(&rx->rx_list, 
466                                       &kptllnd_data.kptl_sched_rxq);
467                         wake_up(&kptllnd_data.kptl_sched_waitq);
468
469                         spin_unlock_irqrestore(&kptllnd_data.kptl_sched_lock, 
470                                                flags);
471                 }
472         }
473
474         if (unlinked) {
475                 spin_lock_irqsave(&rxbp->rxbp_lock, flags);
476
477                 rxb->rxb_posted = 0;
478                 rxb->rxb_mdh = PTL_INVALID_HANDLE;
479                 kptllnd_rx_buffer_decref_locked(rxb);
480
481                 spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
482         }
483 }
484
485 void
486 kptllnd_nak (kptl_rx_t *rx)
487 {
488         /* Fire-and-forget a stub message that will let the peer know my
489          * protocol magic/version and make her drop/refresh any peer state she
490          * might have with me. */
491         ptl_md_t md = {
492                 .start        = kptllnd_data.kptl_nak_msg,
493                 .length       = kptllnd_data.kptl_nak_msg->ptlm_nob,
494                 .threshold    = 1,
495                 .options      = 0,
496                 .user_ptr     = NULL,
497                 .eq_handle    = PTL_EQ_NONE};
498         ptl_handle_md_t   mdh;
499         int               rc;
500
501         rc = PtlMDBind(kptllnd_data.kptl_nih, md, PTL_UNLINK, &mdh);
502         if (rc != PTL_OK) {
503                 CWARN("Can't NAK %s: bind failed %s(%d)\n",
504                       kptllnd_ptlid2str(rx->rx_initiator),
505                       kptllnd_errtype2str(rc), rc);
506                 return;
507         }
508
509         rc = PtlPut(mdh, PTL_NOACK_REQ, rx->rx_initiator,
510                     *kptllnd_tunables.kptl_portal, 0,
511                     LNET_MSG_MATCHBITS, 0, 0);
512
513         if (rc != PTL_OK)
514                 CWARN("Can't NAK %s: put failed %s(%d)\n",
515                       kptllnd_ptlid2str(rx->rx_initiator),
516                       kptllnd_errtype2str(rc), rc);
517 }
518
/* Parse a received message: validate magic/version, unpack, verify
 * source/destination ids and incarnation stamps, update peer credit
 * accounting, then dispatch by message type (HELLO/NOOP finish here;
 * IMMEDIATE/PUT/GET hand the rx to lnet_parse, which takes ownership).
 * Always consumes 'rx' unless lnet_parse succeeds. */
void
kptllnd_rx_parse(kptl_rx_t *rx)
{
        kptl_msg_t             *msg = rx->rx_msg;
        int                     post_credit = PTLLND_POSTRX_PEER_CREDIT;
        kptl_peer_t            *peer;
        int                     rc;
        unsigned long           flags;
        lnet_process_id_t       srcid;

        LASSERT (rx->rx_peer == NULL);

        /* Recognize (possibly byte-swapped) LNET or mismatched-version
         * ptllnd magics before attempting a full unpack. */
        if ((rx->rx_nob >= 4 &&
             (msg->ptlm_magic == LNET_PROTO_MAGIC ||
              msg->ptlm_magic == __swab32(LNET_PROTO_MAGIC))) ||
            (rx->rx_nob >= 6 &&
             ((msg->ptlm_magic == PTLLND_MSG_MAGIC &&
               msg->ptlm_version != PTLLND_MSG_VERSION) ||
              (msg->ptlm_magic == __swab32(PTLLND_MSG_MAGIC) &&
               msg->ptlm_version != __swab16(PTLLND_MSG_VERSION))))) {
                /* NAK incompatible versions
                 * See other LNDs for how to handle this if/when ptllnd begins
                 * to allow different versions to co-exist */
                CERROR("Bad version: got %04x expected %04x from %s\n",
                       (__u32)(msg->ptlm_magic == PTLLND_MSG_MAGIC ?
                               msg->ptlm_version : __swab16(msg->ptlm_version)),
                        PTLLND_MSG_VERSION, kptllnd_ptlid2str(rx->rx_initiator));
                kptllnd_nak(rx);
                goto rx_done;
        }
        
        /* Validates type/size/checksum and byte-swaps if needed. */
        rc = kptllnd_msg_unpack(msg, rx->rx_nob);
        if (rc != 0) {
                CERROR ("Error %d unpacking rx from %s\n",
                        rc, kptllnd_ptlid2str(rx->rx_initiator));
                goto rx_done;
        }

        srcid.nid = msg->ptlm_srcnid;
        srcid.pid = msg->ptlm_srcpid;

        CDEBUG(D_NETTRACE, "%s: RX %s c %d %p rxb %p queued %lu ticks (%ld s)\n",
               libcfs_id2str(srcid), kptllnd_msgtype2str(msg->ptlm_type),
               msg->ptlm_credits, rx, rx->rx_rxb, 
               jiffies - rx->rx_treceived,
               cfs_duration_sec(jiffies - rx->rx_treceived));

        /* The claimed source NID must match the Portals initiator. */
        if (srcid.nid != kptllnd_ptl2lnetnid(rx->rx_initiator.nid)) {
                CERROR("Bad source id %s from %s\n",
                       libcfs_id2str(srcid),
                       kptllnd_ptlid2str(rx->rx_initiator));
                goto rx_done;
        }

        if (msg->ptlm_type == PTLLND_MSG_TYPE_NAK) {
                peer = kptllnd_id2peer(srcid);
                if (peer == NULL)
                        goto rx_done;
                
                CWARN("NAK from %s (%s)\n",
                      libcfs_id2str(srcid),
                      kptllnd_ptlid2str(rx->rx_initiator));

                /* A NAK means the peer rejected my state: close it. */
                rc = -EPROTO;
                goto failed;
        }

        /* Message must actually be addressed to me. */
        if (msg->ptlm_dstnid != kptllnd_data.kptl_ni->ni_nid ||
            msg->ptlm_dstpid != the_lnet.ln_pid) {
                CERROR("Bad dstid %s (expected %s) from %s\n",
                       libcfs_id2str((lnet_process_id_t) {
                               .nid = msg->ptlm_dstnid,
                               .pid = msg->ptlm_dstpid}),
                       libcfs_id2str((lnet_process_id_t) {
                               .nid = kptllnd_data.kptl_ni->ni_nid,
                               .pid = the_lnet.ln_pid}),
                       kptllnd_ptlid2str(rx->rx_initiator));
                goto rx_done;
        }

        if (msg->ptlm_type == PTLLND_MSG_TYPE_HELLO) {
                /* HELLO creates or refreshes the peer (returns a ref). */
                peer = kptllnd_peer_handle_hello(rx->rx_initiator, msg);
                if (peer == NULL)
                        goto rx_done;
        } else {
                peer = kptllnd_id2peer(srcid);
                if (peer == NULL) {
                        CWARN("NAK %s: no connection; peer must reconnect\n",
                              libcfs_id2str(srcid));
                        /* NAK to make the peer reconnect */
                        kptllnd_nak(rx);
                        goto rx_done;
                }

                /* Ignore anything apart from HELLO while I'm waiting for it and
                 * any messages for a previous incarnation of the connection */
                if (peer->peer_state == PEER_STATE_WAITING_HELLO ||
                    msg->ptlm_dststamp < peer->peer_myincarnation) {
                        kptllnd_peer_decref(peer);
                        goto rx_done;
                }

                if (msg->ptlm_srcstamp != peer->peer_incarnation) {
                        CERROR("%s: Unexpected srcstamp "LPX64" "
                               "("LPX64" expected)\n",
                               libcfs_id2str(peer->peer_id),
                               msg->ptlm_srcstamp,
                               peer->peer_incarnation);
                        rc = -EPROTO;
                        goto failed;
                }

                if (msg->ptlm_dststamp != peer->peer_myincarnation) {
                        CERROR("%s: Unexpected dststamp "LPX64" "
                               "("LPX64" expected)\n",
                               libcfs_id2str(peer->peer_id), msg->ptlm_dststamp,
                               peer->peer_myincarnation);
                        rc = -EPROTO;
                        goto failed;
                }
        }

        LASSERT (msg->ptlm_srcnid == peer->peer_id.nid &&
                 msg->ptlm_srcpid == peer->peer_id.pid);

        spin_lock_irqsave(&peer->peer_lock, flags);

        /* Check peer only sends when I've sent her credits */
        if (peer->peer_sent_credits == 0) {
                int  c = peer->peer_credits;
                int oc = peer->peer_outstanding_credits;
                int sc = peer->peer_sent_credits;

                spin_unlock_irqrestore(&peer->peer_lock, flags);

                /* NOTE(review): rc is 0 here (set by kptllnd_msg_unpack
                 * above), so 'failed:' closes the peer with reason 0 on
                 * this path - confirm that is intended. */
                CERROR("%s: buffer overrun [%d/%d+%d]\n",
                       libcfs_id2str(peer->peer_id), c, sc, oc);
                goto failed;
        }
        peer->peer_sent_credits--;

        /* No check for credit overflow - the peer may post new
         * buffers after the startup handshake. */
        peer->peer_credits += msg->ptlm_credits;

        /* This ensures the credit taken by NOOP can be returned */
        if (msg->ptlm_type == PTLLND_MSG_TYPE_NOOP) {
                peer->peer_outstanding_credits++;
                post_credit = PTLLND_POSTRX_NO_CREDIT;
        }

        spin_unlock_irqrestore(&peer->peer_lock, flags);

        /* See if something can go out now that credits have come in */
        if (msg->ptlm_credits != 0)
                kptllnd_peer_check_sends(peer);

        /* ptllnd-level protocol correct - rx takes my ref on peer and increments
         * peer_outstanding_credits when it completes */
        rx->rx_peer = peer;
        kptllnd_peer_alive(peer);

        switch (msg->ptlm_type) {
        default:
                /* already checked by kptllnd_msg_unpack() */
                LBUG();

        case PTLLND_MSG_TYPE_HELLO:
                CDEBUG(D_NET, "PTLLND_MSG_TYPE_HELLO\n");
                goto rx_done;

        case PTLLND_MSG_TYPE_NOOP:
                CDEBUG(D_NET, "PTLLND_MSG_TYPE_NOOP\n");
                goto rx_done;

        case PTLLND_MSG_TYPE_IMMEDIATE:
                CDEBUG(D_NET, "PTLLND_MSG_TYPE_IMMEDIATE\n");
                rc = lnet_parse(kptllnd_data.kptl_ni,
                                &msg->ptlm_u.immediate.kptlim_hdr,
                                msg->ptlm_srcnid,
                                rx, 0);
                if (rc >= 0)                    /* kptllnd_recv owns 'rx' now */
                        return;
                goto failed;
                
        case PTLLND_MSG_TYPE_PUT:
        case PTLLND_MSG_TYPE_GET:
                CDEBUG(D_NET, "PTLLND_MSG_TYPE_%s\n",
                        msg->ptlm_type == PTLLND_MSG_TYPE_PUT ?
                        "PUT" : "GET");

                /* checked in kptllnd_msg_unpack() */
                LASSERT (msg->ptlm_u.rdma.kptlrm_matchbits >= 
                         PTL_RESERVED_MATCHBITS);

                /* Update last match bits seen */
                spin_lock_irqsave(&peer->peer_lock, flags);

                if (msg->ptlm_u.rdma.kptlrm_matchbits >
                    rx->rx_peer->peer_last_matchbits_seen)
                        rx->rx_peer->peer_last_matchbits_seen =
                                msg->ptlm_u.rdma.kptlrm_matchbits;

                /* rx->rx_peer == peer here, so this unlocks the same
                 * lock taken above. */
                spin_unlock_irqrestore(&rx->rx_peer->peer_lock, flags);

                rc = lnet_parse(kptllnd_data.kptl_ni,
                                &msg->ptlm_u.rdma.kptlrm_hdr,
                                msg->ptlm_srcnid,
                                rx, 1);
                if (rc >= 0)                    /* kptllnd_recv owns 'rx' now */
                        return;
                goto failed;
         }

 failed:
        kptllnd_peer_close(peer, rc);
        if (rx->rx_peer == NULL)                /* drop ref on peer */
                kptllnd_peer_decref(peer);      /* unless rx_done will */
 rx_done:
        kptllnd_rx_done(rx, post_credit);
}