/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * Copyright (C) 2005 Cluster File Systems, Inc. All rights reserved.
 *   Author: PJ Kirner <pjkirner@clusterfs.com>
 *
 *   This file is part of the Lustre file system, http://www.lustre.org
 *   Lustre is a trademark of Cluster File Systems, Inc.
 *
 *   This file is confidential source code owned by Cluster File Systems.
 *   No viewing, modification, compilation, redistribution, or any other
 *   form of use is permitted except through a signed license agreement.
 *
 *   If you have not signed such an agreement, then you have no rights to
 *   this file.  Please destroy it immediately and contact CFS.
 *
 */

#include "ptllnd.h"

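/* Initialise an empty rx buffer pool: zero the counters and set up the
 * pool lock and the (initially empty) buffer list. */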
void
kptllnd_rx_buffer_pool_init(kptl_rx_buffer_pool_t *rxbp)
{
        memset(rxbp, 0, sizeof(*rxbp));
        spin_lock_init(&rxbp->rxbp_lock);
        INIT_LIST_HEAD(&rxbp->rxbp_list);
}

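/* Free one rx buffer and its backing memory.  The caller must already
 * have marked the buffer idle and unlinked its MD; the assertions below
 * check that no references, postings or MD handles remain. */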
void
kptllnd_rx_buffer_destroy(kptl_rx_buffer_t *rxb)
{
        kptl_rx_buffer_pool_t *rxbp = rxb->rxb_pool;

        LASSERT(rxb->rxb_refcount == 0);
        LASSERT(PtlHandleIsEqual(rxb->rxb_mdh, PTL_INVALID_HANDLE));
        LASSERT(!rxb->rxb_posted);
        LASSERT(rxb->rxb_idle);

        list_del(&rxb->rxb_list);
        rxbp->rxbp_count--;

        LIBCFS_FREE(rxb->rxb_buffer, kptllnd_rx_buffer_size());
        LIBCFS_FREE(rxb, sizeof(*rxb));
}

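/* Reserve room in the pool for 'count' more incoming messages, growing
 * the pool with freshly posted buffers until rxbp_count * msgs_per_buffer
 * covers rxbp_reserved + count.  Each buffer holds
 * kptllnd_rx_buffer_size() / max_msg_size messages; for example (numbers
 * purely illustrative, the real values come from the tunables) a 64K
 * buffer with a 512-byte max message would cover 128 reservations.
 * Returns 0 on success, -ESHUTDOWN if the pool is shutting down, or
 * -ENOMEM if a buffer allocation fails. */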
int
kptllnd_rx_buffer_pool_reserve(kptl_rx_buffer_pool_t *rxbp, int count)
{
        int               bufsize;
        int               msgs_per_buffer;
        int               rc;
        kptl_rx_buffer_t *rxb;
        char             *buffer;
        unsigned long     flags;

        bufsize = kptllnd_rx_buffer_size();
        msgs_per_buffer = bufsize / (*kptllnd_tunables.kptl_max_msg_size);

        CDEBUG(D_NET, "kptllnd_rx_buffer_pool_reserve(%d)\n", count);

        spin_lock_irqsave(&rxbp->rxbp_lock, flags);

        for (;;) {
                if (rxbp->rxbp_shutdown) {
                        rc = -ESHUTDOWN;
                        break;
                }

                if (rxbp->rxbp_reserved + count <=
                    rxbp->rxbp_count * msgs_per_buffer) {
                        rc = 0;
                        break;
                }

                spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);

                LIBCFS_ALLOC(rxb, sizeof(*rxb));
                LIBCFS_ALLOC(buffer, bufsize);

                if (rxb == NULL || buffer == NULL) {
                        CERROR("Failed to allocate rx buffer\n");

                        if (rxb != NULL)
                                LIBCFS_FREE(rxb, sizeof(*rxb));
                        if (buffer != NULL)
                                LIBCFS_FREE(buffer, bufsize);

                        spin_lock_irqsave(&rxbp->rxbp_lock, flags);
                        rc = -ENOMEM;
                        break;
                }

                memset(rxb, 0, sizeof(*rxb));

                rxb->rxb_eventarg.eva_type = PTLLND_EVENTARG_TYPE_BUF;
                rxb->rxb_refcount = 0;
                rxb->rxb_pool = rxbp;
                rxb->rxb_idle = 0;
                rxb->rxb_posted = 0;
                rxb->rxb_buffer = buffer;
                rxb->rxb_mdh = PTL_INVALID_HANDLE;

                spin_lock_irqsave(&rxbp->rxbp_lock, flags);

                if (rxbp->rxbp_shutdown) {
                        spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);

                        LIBCFS_FREE(rxb, sizeof(*rxb));
                        LIBCFS_FREE(buffer, bufsize);

                        spin_lock_irqsave(&rxbp->rxbp_lock, flags);
                        rc = -ESHUTDOWN;
                        break;
                }

                list_add_tail(&rxb->rxb_list, &rxbp->rxbp_list);
                rxbp->rxbp_count++;

                spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);

                kptllnd_rx_buffer_post(rxb);

                spin_lock_irqsave(&rxbp->rxbp_lock, flags);
        }

        if (rc == 0)
                rxbp->rxbp_reserved += count;

        spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);

        return rc;
}

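/* Give back 'count' message reservations previously taken with
 * kptllnd_rx_buffer_pool_reserve().  This only adjusts the reservation
 * count; no buffers are freed here. */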
void
kptllnd_rx_buffer_pool_unreserve(kptl_rx_buffer_pool_t *rxbp,
                                 int count)
{
        unsigned long flags;

        spin_lock_irqsave(&rxbp->rxbp_lock, flags);

        CDEBUG(D_NET, "kptllnd_rx_buffer_pool_unreserve(%d)\n", count);
        rxbp->rxbp_reserved -= count;

        spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
}

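/* Tear the pool down: mark it shutting down, unlink the MD of every
 * still-posted buffer, destroy buffers as they become idle, and poll
 * (warning with exponential backoff) until the buffer list is empty.
 * See the CAVEAT EMPTOR comment below for the races this must tolerate. */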
void
kptllnd_rx_buffer_pool_fini(kptl_rx_buffer_pool_t *rxbp)
{
        kptl_rx_buffer_t       *rxb;
        int                     rc;
        int                     i;
        unsigned long           flags;
        struct list_head       *tmp;
        struct list_head       *nxt;
        ptl_handle_md_t         mdh;

        /* CAVEAT EMPTOR: I'm racing with everything here!!!
         *
         * Buffers can still be posted after I set rxbp_shutdown because I
         * can't hold rxbp_lock while I'm posting them.
         *
         * Calling PtlMDUnlink() here races with auto-unlinks; i.e. a buffer's
         * MD handle could become invalid under me.  I am vulnerable to portals
         * re-using handles (i.e. make the same handle valid again, but for a
         * different MD) from when the MD is actually unlinked, to when the
         * event callback tells me it has been unlinked. */

        spin_lock_irqsave(&rxbp->rxbp_lock, flags);

        rxbp->rxbp_shutdown = 1;

        for (i = 2;; i++) {
                list_for_each_safe(tmp, nxt, &rxbp->rxbp_list) {
                        rxb = list_entry (tmp, kptl_rx_buffer_t, rxb_list);

                        if (rxb->rxb_idle) {
                                spin_unlock_irqrestore(&rxbp->rxbp_lock,
                                                       flags);
                                kptllnd_rx_buffer_destroy(rxb);
                                spin_lock_irqsave(&rxbp->rxbp_lock,
                                                  flags);
                                continue;
                        }

                        mdh = rxb->rxb_mdh;
                        if (PtlHandleIsEqual(mdh, PTL_INVALID_HANDLE))
                                continue;

                        spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);

                        rc = PtlMDUnlink(mdh);

                        spin_lock_irqsave(&rxbp->rxbp_lock, flags);

#ifdef LUSTRE_PORTALS_UNLINK_SEMANTICS
                        /* callback clears rxb_mdh and drops net's ref
                         * (which causes repost, but since I set
                         * shutdown, it will just set the buffer
                         * idle) */
#else
                        if (rc == PTL_OK) {
                                rxb->rxb_posted = 0;
                                rxb->rxb_mdh = PTL_INVALID_HANDLE;
                                kptllnd_rx_buffer_decref_locked(rxb);
                        }
#endif
                }

                if (list_empty(&rxbp->rxbp_list))
                        break;

                spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);

                /* Wait a bit for references to be dropped */
                CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
                       "Waiting for %d Busy RX Buffers\n",
                       rxbp->rxbp_count);

                cfs_pause(cfs_time_seconds(1));

                spin_lock_irqsave(&rxbp->rxbp_lock, flags);
        }

        spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
}

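/* Post one rx buffer to Portals: attach a match entry that accepts
 * LNET_MSG_MATCHBITS from any peer, then attach an MD covering the whole
 * buffer so incoming PUTs land in it back to back, each limited to
 * max_msg_size.  If the pool is shutting down the buffer is just marked
 * idle; on Portals errors the net's reference is dropped again. */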
void
kptllnd_rx_buffer_post(kptl_rx_buffer_t *rxb)
{
        int                     rc;
        ptl_md_t                md;
        ptl_handle_me_t         meh;
        ptl_handle_md_t         mdh;
        ptl_process_id_t        any;
        kptl_rx_buffer_pool_t  *rxbp = rxb->rxb_pool;
        unsigned long           flags;

        LASSERT (!in_interrupt());
        LASSERT (rxb->rxb_refcount == 0);
        LASSERT (!rxb->rxb_idle);
        LASSERT (!rxb->rxb_posted);
        LASSERT (PtlHandleIsEqual(rxb->rxb_mdh, PTL_INVALID_HANDLE));

        any.nid = PTL_NID_ANY;
        any.pid = PTL_PID_ANY;

        spin_lock_irqsave(&rxbp->rxbp_lock, flags);

        if (rxbp->rxbp_shutdown) {
                rxb->rxb_idle = 1;
                spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
                return;
        }

        rxb->rxb_refcount = 1;                  /* net's ref */
        rxb->rxb_posted = 1;                    /* I'm posting */

        spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);

        rc = PtlMEAttach(kptllnd_data.kptl_nih,
                         *kptllnd_tunables.kptl_portal,
                         any,
                         LNET_MSG_MATCHBITS,
                         0, /* all matchbits are valid - ignore none */
                         PTL_UNLINK,
                         PTL_INS_AFTER,
                         &meh);
        if (rc != PTL_OK) {
                CERROR("PtlMEAttach rxb failed %s(%d)\n",
                       kptllnd_errtype2str(rc), rc);
                goto failed;
        }

        /*
         * Setup MD
         */
        md.start = rxb->rxb_buffer;
        md.length = PAGE_SIZE * *kptllnd_tunables.kptl_rxb_npages;
        md.threshold = PTL_MD_THRESH_INF;
        md.options = PTL_MD_OP_PUT |
                     PTL_MD_LUSTRE_COMPLETION_SEMANTICS |
                     PTL_MD_EVENT_START_DISABLE |
                     PTL_MD_MAX_SIZE |
                     PTL_MD_LOCAL_ALIGN8;
        md.user_ptr = &rxb->rxb_eventarg;
        md.max_size = *kptllnd_tunables.kptl_max_msg_size;
        md.eq_handle = kptllnd_data.kptl_eqh;

        rc = PtlMDAttach(meh, md, PTL_UNLINK, &mdh);
        if (rc == PTL_OK) {
                spin_lock_irqsave(&rxbp->rxbp_lock, flags);
                if (rxb->rxb_posted)            /* Not auto-unlinked yet!!! */
                        rxb->rxb_mdh = mdh;
                spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
                return;
        }

        CERROR("PtlMDAttach rxb failed %s(%d)\n",
               kptllnd_errtype2str(rc), rc);
        rc = PtlMEUnlink(meh);
        LASSERT(rc == PTL_OK);

 failed:
        spin_lock_irqsave(&rxbp->rxbp_lock, flags);
        rxb->rxb_posted = 0;
        /* XXX this will just try again immediately */
        kptllnd_rx_buffer_decref_locked(rxb);
        spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
}

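/* Allocate and zero an rx descriptor from the rx slab cache.  This may
 * run in the event callback, hence the atomic allocation. */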
kptl_rx_t *
kptllnd_rx_alloc(void)
{
        kptl_rx_t* rx;

        if (IS_SIMULATION_ENABLED(FAIL_RX_ALLOC)) {
                CERROR ("FAIL_RX_ALLOC SIMULATION triggered\n");
                return NULL;
        }

        rx = cfs_mem_cache_alloc(kptllnd_data.kptl_rx_cache, CFS_ALLOC_ATOMIC);
        if (rx == NULL) {
                CERROR("Failed to allocate rx\n");
                return NULL;
        }

        memset(rx, 0, sizeof(*rx));
        return rx;
}

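/* Finish with a received message: release the rx buffer reference (if
 * any), credit the peer with one more outstanding buffer so the credit
 * can be returned, kick the send path, and free the rx descriptor. */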
void
kptllnd_rx_done(kptl_rx_t *rx)
{
        kptl_rx_buffer_t *rxb = rx->rx_rxb;
        kptl_peer_t      *peer = rx->rx_peer;
        unsigned long     flags;

        CDEBUG(D_NET, "rx=%p rxb %p peer %p\n", rx, rxb, peer);

        if (rxb != NULL)
                kptllnd_rx_buffer_decref(rxb);

        if (peer != NULL) {
                /* Update credits (after I've decref-ed the buffer) */
                spin_lock_irqsave(&peer->peer_lock, flags);

                peer->peer_outstanding_credits++;
                LASSERT (peer->peer_outstanding_credits +
                         peer->peer_sent_credits <=
                         *kptllnd_tunables.kptl_peercredits);

                CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: rx %p done\n",
                       libcfs_id2str(peer->peer_id), peer->peer_credits,
                       peer->peer_outstanding_credits, peer->peer_sent_credits,
                       rx);

                spin_unlock_irqrestore(&peer->peer_lock, flags);

                /* I might have to send back credits */
                kptllnd_peer_check_sends(peer);
                kptllnd_peer_decref(peer);
        }

        cfs_mem_cache_free(kptllnd_data.kptl_rx_cache, rx);
}

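/* Portals event callback for rx buffers.  For each successful PUT_END
 * (unless the pool is shutting down) it wraps the incoming message in a
 * new rx descriptor and queues it for the scheduler; once the MD has
 * been unlinked it drops the net's reference so the buffer can be
 * reposted or destroyed. */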
void
kptllnd_rx_buffer_callback (ptl_event_t *ev)
{
        kptl_eventarg_t        *eva = ev->md.user_ptr;
        kptl_rx_buffer_t       *rxb = kptllnd_eventarg2obj(eva);
        kptl_rx_buffer_pool_t  *rxbp = rxb->rxb_pool;
        kptl_rx_t              *rx;
        int                     unlinked;
        unsigned long           flags;

#ifdef LUSTRE_PORTALS_UNLINK_SEMANTICS
        unlinked = ev->unlinked;
#else
        unlinked = ev->type == PTL_EVENT_UNLINK;
#endif

        CDEBUG(D_NET, "%s: %s(%d) rxb=%p fail=%s(%d) unlink=%d\n",
               kptllnd_ptlid2str(ev->initiator),
               kptllnd_evtype2str(ev->type), ev->type, rxb,
               kptllnd_errtype2str(ev->ni_fail_type), ev->ni_fail_type,
               unlinked);

        LASSERT (!rxb->rxb_idle);
        LASSERT (ev->md.start == rxb->rxb_buffer);
        LASSERT (ev->offset + ev->mlength <=
                 PAGE_SIZE * *kptllnd_tunables.kptl_rxb_npages);
        LASSERT (ev->type == PTL_EVENT_PUT_END ||
                 ev->type == PTL_EVENT_UNLINK);
        LASSERT (ev->type == PTL_EVENT_UNLINK ||
                 ev->match_bits == LNET_MSG_MATCHBITS);

        if (ev->ni_fail_type != PTL_NI_OK) {
                CERROR("Portals error from %s: %s(%d) rxb=%p fail=%s(%d) unlink=%d\n",
                       kptllnd_ptlid2str(ev->initiator),
                       kptllnd_evtype2str(ev->type), ev->type, rxb,
                       kptllnd_errtype2str(ev->ni_fail_type),
                       ev->ni_fail_type, unlinked);

        } else if (ev->type == PTL_EVENT_PUT_END &&
                   !rxbp->rxbp_shutdown) {

                /* rxbp_shutdown sampled without locking!  I only treat it as a
                 * hint since shutdown can start while rx's are queued on
                 * kptl_sched_rxq. */
#if (PTL_MD_LOCAL_ALIGN8 == 0)
                /* Portals can't force message alignment - someone sending an
                 * odd-length message will misalign subsequent messages and
                 * force the fixup below...  */
                if ((ev->mlength & 7) != 0)
                        CWARN("Message from %s has odd length "LPU64": "
                              "probable version incompatibility\n",
                              kptllnd_ptlid2str(ev->initiator),
                              (__u64)ev->mlength);
#endif
                rx = kptllnd_rx_alloc();
                if (rx == NULL) {
                        CERROR("Message from %s dropped: ENOMEM\n",
                               kptllnd_ptlid2str(ev->initiator));
                } else {
                        if ((ev->offset & 7) == 0) {
                                kptllnd_rx_buffer_addref(rxb);
                                rx->rx_rxb = rxb;
                                rx->rx_nob = ev->mlength;
                                rx->rx_msg = (kptl_msg_t *)
                                             (rxb->rxb_buffer + ev->offset);
                        } else {
#if (PTL_MD_LOCAL_ALIGN8 == 0)
                                /* Portals can't force alignment - copy into
                                 * rx_space (avoiding overflow) to fix */
                                int maxlen = *kptllnd_tunables.kptl_max_msg_size;

                                rx->rx_rxb = NULL;
                                rx->rx_nob = MIN(maxlen, ev->mlength);
                                rx->rx_msg = (kptl_msg_t *)rx->rx_space;
                                memcpy(rx->rx_msg, rxb->rxb_buffer + ev->offset,
                                       rx->rx_nob);
#else
                                /* Portals should have forced the alignment */
                                LBUG();
#endif
                        }

                        rx->rx_initiator = ev->initiator;
                        rx->rx_treceived = jiffies;
#ifdef CRAY_XT3
                        rx->rx_uid = ev->uid;
#endif
                        /* Queue for attention */
                        spin_lock_irqsave(&kptllnd_data.kptl_sched_lock,
                                          flags);

                        list_add_tail(&rx->rx_list,
                                      &kptllnd_data.kptl_sched_rxq);
                        wake_up(&kptllnd_data.kptl_sched_waitq);

                        spin_unlock_irqrestore(&kptllnd_data.kptl_sched_lock,
                                               flags);
                }
        }

        if (unlinked) {
                spin_lock_irqsave(&rxbp->rxbp_lock, flags);

                rxb->rxb_posted = 0;
                rxb->rxb_mdh = PTL_INVALID_HANDLE;
                kptllnd_rx_buffer_decref_locked(rxb);

                spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
        }
}

void
kptllnd_nak (kptl_rx_t *rx)
{
        /* Fire-and-forget a stub message that will let the peer know my
         * protocol magic/version and make her drop/refresh any peer state she
         * might have with me. */
        ptl_md_t md = {
                .start        = kptllnd_data.kptl_nak_msg,
                .length       = kptllnd_data.kptl_nak_msg->ptlm_nob,
                .threshold    = 1,
                .options      = 0,
                .user_ptr     = NULL,
                .eq_handle    = PTL_EQ_NONE};
        ptl_handle_md_t   mdh;
        int               rc;

        rc = PtlMDBind(kptllnd_data.kptl_nih, md, PTL_UNLINK, &mdh);
        if (rc != PTL_OK) {
                CWARN("Can't NAK %s: bind failed %s(%d)\n",
                      kptllnd_ptlid2str(rx->rx_initiator),
                      kptllnd_errtype2str(rc), rc);
                return;
        }

        rc = PtlPut(mdh, PTL_NOACK_REQ, rx->rx_initiator,
                    *kptllnd_tunables.kptl_portal, 0,
                    LNET_MSG_MATCHBITS, 0, 0);

        if (rc != PTL_OK)
                CWARN("Can't NAK %s: put failed %s(%d)\n",
                      kptllnd_ptlid2str(rx->rx_initiator),
                      kptllnd_errtype2str(rc), rc);
}

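/* Parse and dispatch one received message: check protocol magic/version
 * (NAKing incompatible senders), unpack and validate the header, match
 * or create the peer, enforce the credit protocol, then hand the payload
 * to LNet or complete the rx directly.  On protocol errors the peer is
 * closed. */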
void
kptllnd_rx_parse(kptl_rx_t *rx)
{
        kptl_msg_t             *msg = rx->rx_msg;
        kptl_peer_t            *peer;
        int                     rc;
        unsigned long           flags;
        lnet_process_id_t       srcid;

        LASSERT (rx->rx_peer == NULL);

        if ((rx->rx_nob >= 4 &&
             (msg->ptlm_magic == LNET_PROTO_MAGIC ||
              msg->ptlm_magic == __swab32(LNET_PROTO_MAGIC))) ||
            (rx->rx_nob >= 6 &&
             ((msg->ptlm_magic == PTLLND_MSG_MAGIC &&
               msg->ptlm_version != PTLLND_MSG_VERSION) ||
              (msg->ptlm_magic == __swab32(PTLLND_MSG_MAGIC) &&
               msg->ptlm_version != __swab16(PTLLND_MSG_VERSION))))) {
                /* NAK incompatible versions
                 * See other LNDs for how to handle this if/when ptllnd begins
                 * to allow different versions to co-exist */
                CERROR("Bad version: got %04x expected %04x from %s\n",
                       (__u32)(msg->ptlm_magic == PTLLND_MSG_MAGIC ?
                               msg->ptlm_version : __swab16(msg->ptlm_version)),
                        PTLLND_MSG_VERSION, kptllnd_ptlid2str(rx->rx_initiator));
                kptllnd_nak(rx);
                goto rx_done;
        }

        rc = kptllnd_msg_unpack(msg, rx->rx_nob);
        if (rc != 0) {
                CERROR ("Error %d unpacking rx from %s\n",
                        rc, kptllnd_ptlid2str(rx->rx_initiator));
                goto rx_done;
        }

        srcid.nid = msg->ptlm_srcnid;
        srcid.pid = msg->ptlm_srcpid;

        CDEBUG(D_NETTRACE, "%s: RX %s c %d %p rxb %p queued %lu ticks (%ld s)\n",
               libcfs_id2str(srcid), kptllnd_msgtype2str(msg->ptlm_type),
               msg->ptlm_credits, rx, rx->rx_rxb,
               jiffies - rx->rx_treceived,
               cfs_duration_sec(jiffies - rx->rx_treceived));

        if (srcid.nid != kptllnd_ptl2lnetnid(rx->rx_initiator.nid)) {
                CERROR("Bad source id %s from %s\n",
                       libcfs_id2str(srcid),
                       kptllnd_ptlid2str(rx->rx_initiator));
                goto rx_done;
        }

        if (msg->ptlm_type == PTLLND_MSG_TYPE_NAK) {
                peer = kptllnd_id2peer(srcid);
                if (peer == NULL)
                        goto rx_done;

                CWARN("NAK from %s (%s)\n",
                      libcfs_id2str(srcid),
                      kptllnd_ptlid2str(rx->rx_initiator));

                rc = -EPROTO;
                goto failed;
        }

        if (msg->ptlm_dstnid != kptllnd_data.kptl_ni->ni_nid ||
            msg->ptlm_dstpid != the_lnet.ln_pid) {
                CERROR("Bad dstid %s (expected %s) from %s\n",
                       libcfs_id2str((lnet_process_id_t) {
                               .nid = msg->ptlm_dstnid,
                               .pid = msg->ptlm_dstpid}),
                       libcfs_id2str((lnet_process_id_t) {
                               .nid = kptllnd_data.kptl_ni->ni_nid,
                               .pid = the_lnet.ln_pid}),
                       kptllnd_ptlid2str(rx->rx_initiator));
                goto rx_done;
        }

        if (msg->ptlm_type == PTLLND_MSG_TYPE_HELLO) {
                peer = kptllnd_peer_handle_hello(rx->rx_initiator, msg);
                if (peer == NULL)
                        goto rx_done;
        } else {
                peer = kptllnd_id2peer(srcid);
                if (peer == NULL) {
                        CWARN("NAK %s: no connection; peer must reconnect\n",
                              libcfs_id2str(srcid));
                        /* NAK to make the peer reconnect */
                        kptllnd_nak(rx);
                        goto rx_done;
                }

                /* Ignore anything apart from HELLO while I'm waiting for it and
                 * any messages for a previous incarnation of the connection */
                if (peer->peer_state == PEER_STATE_WAITING_HELLO ||
                    msg->ptlm_dststamp < peer->peer_myincarnation) {
                        kptllnd_peer_decref(peer);
                        goto rx_done;
                }

                if (msg->ptlm_srcstamp != peer->peer_incarnation) {
                        CERROR("%s: Unexpected srcstamp "LPX64" "
                               "("LPX64" expected)\n",
                               libcfs_id2str(peer->peer_id),
                               msg->ptlm_srcstamp,
                               peer->peer_incarnation);
                        rc = -EPROTO;
                        goto failed;
                }

                if (msg->ptlm_dststamp != peer->peer_myincarnation) {
                        CERROR("%s: Unexpected dststamp "LPX64" "
                               "("LPX64" expected)\n",
                               libcfs_id2str(peer->peer_id), msg->ptlm_dststamp,
                               peer->peer_myincarnation);
                        rc = -EPROTO;
                        goto failed;
                }
        }

        LASSERT (msg->ptlm_srcnid == peer->peer_id.nid &&
                 msg->ptlm_srcpid == peer->peer_id.pid);

        spin_lock_irqsave(&peer->peer_lock, flags);

        /* Check peer only sends when I've sent her credits */
        if (peer->peer_sent_credits == 0) {
                int  c = peer->peer_credits;
                int oc = peer->peer_outstanding_credits;
                int sc = peer->peer_sent_credits;

                spin_unlock_irqrestore(&peer->peer_lock, flags);

                CERROR("%s: buffer overrun [%d/%d+%d]\n",
                       libcfs_id2str(peer->peer_id), c, oc, sc);
                goto failed;
        }
        peer->peer_sent_credits--;

        /* No check for credit overflow - the peer may post new
         * buffers after the startup handshake. */
        peer->peer_credits += msg->ptlm_credits;

        spin_unlock_irqrestore(&peer->peer_lock, flags);

        /* See if something can go out now that credits have come in */
        if (msg->ptlm_credits != 0)
                kptllnd_peer_check_sends(peer);

        /* ptllnd-level protocol correct - rx takes my ref on peer and increments
         * peer_outstanding_credits when it completes */
        rx->rx_peer = peer;
        kptllnd_peer_alive(peer);

        switch (msg->ptlm_type) {
        default:
                /* already checked by kptllnd_msg_unpack() */
                LBUG();

        case PTLLND_MSG_TYPE_HELLO:
                CDEBUG(D_NET, "PTLLND_MSG_TYPE_HELLO\n");
                goto rx_done;

        case PTLLND_MSG_TYPE_NOOP:
                CDEBUG(D_NET, "PTLLND_MSG_TYPE_NOOP\n");
                goto rx_done;

        case PTLLND_MSG_TYPE_IMMEDIATE:
                CDEBUG(D_NET, "PTLLND_MSG_TYPE_IMMEDIATE\n");
                rc = lnet_parse(kptllnd_data.kptl_ni,
                                &msg->ptlm_u.immediate.kptlim_hdr,
                                msg->ptlm_srcnid,
                                rx, 0);
                if (rc >= 0)                    /* kptllnd_recv owns 'rx' now */
                        return;
                goto failed;

        case PTLLND_MSG_TYPE_PUT:
        case PTLLND_MSG_TYPE_GET:
                CDEBUG(D_NET, "PTLLND_MSG_TYPE_%s\n",
                        msg->ptlm_type == PTLLND_MSG_TYPE_PUT ?
                        "PUT" : "GET");

                /* checked in kptllnd_msg_unpack() */
                LASSERT (msg->ptlm_u.rdma.kptlrm_matchbits >=
                         PTL_RESERVED_MATCHBITS);

                /* Update last match bits seen */
                spin_lock_irqsave(&peer->peer_lock, flags);

                if (msg->ptlm_u.rdma.kptlrm_matchbits >
                    rx->rx_peer->peer_last_matchbits_seen)
                        rx->rx_peer->peer_last_matchbits_seen =
                                msg->ptlm_u.rdma.kptlrm_matchbits;

                spin_unlock_irqrestore(&rx->rx_peer->peer_lock, flags);

                rc = lnet_parse(kptllnd_data.kptl_ni,
                                &msg->ptlm_u.rdma.kptlrm_hdr,
                                msg->ptlm_srcnid,
                                rx, 1);
                if (rc >= 0)                    /* kptllnd_recv owns 'rx' now */
                        return;
                goto failed;
        }

 failed:
        kptllnd_peer_close(peer, rc);
        if (rx->rx_peer == NULL)                /* drop ref on peer */
                kptllnd_peer_decref(peer);      /* unless rx_done will */
 rx_done:
        kptllnd_rx_done(rx);
}