Whamcloud - gitweb
* bug 11659 fix - finer-grained peerstamps for ptllnd connection
[fs/lustre-release.git] / lnet / klnds / ptllnd / ptllnd_rx_buf.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2005 Cluster File Systems, Inc. All rights reserved.
5  *   Author: PJ Kirner <pjkirner@clusterfs.com>
6  *
7  *   This file is part of the Lustre file system, http://www.lustre.org
8  *   Lustre is a trademark of Cluster File Systems, Inc.
9  *
10  *   This file is confidential source code owned by Cluster File Systems.
11  *   No viewing, modification, compilation, redistribution, or any other
12  *   form of use is permitted except through a signed license agreement.
13  *
14  *   If you have not signed such an agreement, then you have no rights to
15  *   this file.  Please destroy it immediately and contact CFS.
16  *
17  */
18
19  #include "ptllnd.h"
20
21 void
22 kptllnd_rx_buffer_pool_init(kptl_rx_buffer_pool_t *rxbp)
23 {
24         memset(rxbp, 0, sizeof(*rxbp));
25         spin_lock_init(&rxbp->rxbp_lock);
26         INIT_LIST_HEAD(&rxbp->rxbp_list);
27 }
28
29 void
30 kptllnd_rx_buffer_destroy(kptl_rx_buffer_t *rxb)
31 {
32         kptl_rx_buffer_pool_t *rxbp = rxb->rxb_pool;
33
34         LASSERT(rxb->rxb_refcount == 0);
35         LASSERT(PtlHandleIsEqual(rxb->rxb_mdh, PTL_INVALID_HANDLE));
36         LASSERT(!rxb->rxb_posted);
37         LASSERT(rxb->rxb_idle);
38
39         list_del(&rxb->rxb_list);
40         rxbp->rxbp_count--;
41
42         LIBCFS_FREE(rxb->rxb_buffer, kptllnd_rx_buffer_size());
43         LIBCFS_FREE(rxb, sizeof(*rxb));
44 }
45
/* Reserve 'count' message slots in the pool, growing it (allocating and
 * posting additional rx buffers) until capacity is sufficient.
 *
 * Returns 0 on success, -ESHUTDOWN if the pool is (or starts) shutting
 * down, -ENOMEM if a buffer allocation fails.  The reservation is only
 * committed (rxbp_reserved bumped) on success. */
int
kptllnd_rx_buffer_pool_reserve(kptl_rx_buffer_pool_t *rxbp, int count)
{
        int               bufsize;
        int               msgs_per_buffer;
        int               rc;
        kptl_rx_buffer_t *rxb;
        char             *buffer;
        unsigned long     flags;

        bufsize = kptllnd_rx_buffer_size();
        msgs_per_buffer = bufsize / (*kptllnd_tunables.kptl_max_msg_size);

        CDEBUG(D_NET, "kptllnd_rx_buffer_pool_reserve(%d)\n", count);

        spin_lock_irqsave(&rxbp->rxbp_lock, flags);

        /* Loop: each pass either succeeds, fails, or adds one buffer.
         * rc is always assigned before any 'break'. */
        for (;;) {
                if (rxbp->rxbp_shutdown) {
                        rc = -ESHUTDOWN;
                        break;
                }
                
                /* Enough capacity already?  Each of the rxbp_count buffers
                 * can hold msgs_per_buffer messages. */
                if (rxbp->rxbp_reserved + count <= 
                    rxbp->rxbp_count * msgs_per_buffer) {
                        rc = 0;
                        break;
                }
                
                /* Drop the spinlock across allocation (which may sleep). */
                spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
                
                LIBCFS_ALLOC(rxb, sizeof(*rxb));
                LIBCFS_ALLOC(buffer, bufsize);

                if (rxb == NULL || buffer == NULL) {
                        CERROR("Failed to allocate rx buffer\n");

                        if (rxb != NULL)
                                LIBCFS_FREE(rxb, sizeof(*rxb));
                        if (buffer != NULL)
                                LIBCFS_FREE(buffer, bufsize);
                        
                        spin_lock_irqsave(&rxbp->rxbp_lock, flags);
                        rc = -ENOMEM;
                        break;
                }

                memset(rxb, 0, sizeof(*rxb));

                rxb->rxb_eventarg.eva_type = PTLLND_EVENTARG_TYPE_BUF;
                rxb->rxb_refcount = 0;
                rxb->rxb_pool = rxbp;
                rxb->rxb_idle = 0;
                rxb->rxb_posted = 0;
                rxb->rxb_buffer = buffer;
                rxb->rxb_mdh = PTL_INVALID_HANDLE;

                spin_lock_irqsave(&rxbp->rxbp_lock, flags);
                
                /* Shutdown may have started while the lock was dropped;
                 * don't add a new buffer to a pool being torn down.  The
                 * unlock/free/relock keeps LIBCFS_FREE outside the lock. */
                if (rxbp->rxbp_shutdown) {
                        spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
                        
                        LIBCFS_FREE(rxb, sizeof(*rxb));
                        LIBCFS_FREE(buffer, bufsize);

                        spin_lock_irqsave(&rxbp->rxbp_lock, flags);
                        rc = -ESHUTDOWN;
                        break;
                }
                
                list_add_tail(&rxb->rxb_list, &rxbp->rxbp_list);
                rxbp->rxbp_count++;

                spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
                
                /* Post to Portals so it can start receiving (takes the
                 * pool lock itself, so must be called unlocked). */
                kptllnd_rx_buffer_post(rxb);

                spin_lock_irqsave(&rxbp->rxbp_lock, flags);
        }

        if (rc == 0)
                rxbp->rxbp_reserved += count;

        spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);

        return rc;
}
133
134 void
135 kptllnd_rx_buffer_pool_unreserve(kptl_rx_buffer_pool_t *rxbp,
136                                  int count)
137 {
138         unsigned long flags;
139
140         spin_lock_irqsave(&rxbp->rxbp_lock, flags);
141
142         CDEBUG(D_NET, "kptllnd_rx_buffer_pool_unreserve(%d)\n", count);
143         rxbp->rxbp_reserved -= count;
144
145         spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
146 }
147
/* Tear down the pool: set the shutdown flag, then repeatedly sweep the
 * buffer list, destroying idle buffers and unlinking the MDs of posted
 * ones, until the list drains.  Pauses a second between sweeps while
 * waiting for outstanding references/events. */
void
kptllnd_rx_buffer_pool_fini(kptl_rx_buffer_pool_t *rxbp)
{
        kptl_rx_buffer_t       *rxb;
        int                     rc;
        int                     i;
        unsigned long           flags;
        struct list_head       *tmp;
        struct list_head       *nxt;
        ptl_handle_md_t         mdh;

        /* CAVEAT EMPTOR: I'm racing with everything here!!!  
         *
         * Buffers can still be posted after I set rxbp_shutdown because I
         * can't hold rxbp_lock while I'm posting them.
         *
         * Calling PtlMDUnlink() here races with auto-unlinks; i.e. a buffer's
         * MD handle could become invalid under me.  I am vulnerable to portals
         * re-using handles (i.e. make the same handle valid again, but for a
         * different MD) from when the MD is actually unlinked, to when the
         * event callback tells me it has been unlinked. */

        spin_lock_irqsave(&rxbp->rxbp_lock, flags);

        rxbp->rxbp_shutdown = 1;

        /* NOTE(review): i starts at 9, so the power-of-2 test below first
         * promotes the "Waiting..." message to D_WARNING at i == 16, i.e.
         * after ~7 seconds of waiting — confirm the start value is
         * intentional rate-limiting and not a typo. */
        for (i = 9;; i++) {
                list_for_each_safe(tmp, nxt, &rxbp->rxbp_list) {
                        rxb = list_entry (tmp, kptl_rx_buffer_t, rxb_list);
                
                        /* Idle buffers can be destroyed now; destroy()
                         * must run unlocked since it frees memory. */
                        if (rxb->rxb_idle) {
                                spin_unlock_irqrestore(&rxbp->rxbp_lock, 
                                                       flags);
                                kptllnd_rx_buffer_destroy(rxb);
                                spin_lock_irqsave(&rxbp->rxbp_lock, 
                                                  flags);
                                continue;
                        }

                        /* No MD attached (yet / any more): nothing to
                         * unlink; wait for it to go idle. */
                        mdh = rxb->rxb_mdh;
                        if (PtlHandleIsEqual(mdh, PTL_INVALID_HANDLE))
                                continue;
                        
                        spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);

                        rc = PtlMDUnlink(mdh);

                        spin_lock_irqsave(&rxbp->rxbp_lock, flags);
                        
#ifdef LUSTRE_PORTALS_UNLINK_SEMANTICS
                        /* callback clears rxb_mdh and drops net's ref
                         * (which causes repost, but since I set
                         * shutdown, it will just set the buffer
                         * idle) */
#else
                        /* Unlink succeeded synchronously: clear state and
                         * drop the net's ref ourselves. */
                        if (rc == PTL_OK) {
                                rxb->rxb_posted = 0;
                                rxb->rxb_mdh = PTL_INVALID_HANDLE;
                                kptllnd_rx_buffer_decref_locked(rxb);
                        }
#endif
                }

                if (list_empty(&rxbp->rxbp_list))
                        break;

                spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);

                /* Wait a bit for references to be dropped */
                CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
                       "Waiting for %d Busy RX Buffers\n",
                       rxbp->rxbp_count);

                cfs_pause(cfs_time_seconds(1));

                spin_lock_irqsave(&rxbp->rxbp_lock, flags);
        }

        spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
}
228
/* Post an rx buffer to Portals: attach a wildcard ME (any nid/pid,
 * LNET_MSG_MATCHBITS) plus an MD covering the buffer so incoming PUTs
 * land in it.  On any failure the net's ref is dropped, which retries
 * the post via the decref path (see XXX below).  If the pool is shutting
 * down the buffer is simply marked idle instead. */
void
kptllnd_rx_buffer_post(kptl_rx_buffer_t *rxb)
{
        int                     rc;
        ptl_md_t                md;
        ptl_handle_me_t         meh;
        ptl_handle_md_t         mdh;
        ptl_process_id_t        any;
        kptl_rx_buffer_pool_t  *rxbp = rxb->rxb_pool;
        unsigned long           flags;

        LASSERT (!in_interrupt());
        LASSERT (rxb->rxb_refcount == 0);
        LASSERT (!rxb->rxb_idle);
        LASSERT (!rxb->rxb_posted);
        LASSERT (PtlHandleIsEqual(rxb->rxb_mdh, PTL_INVALID_HANDLE));

        /* Match PUTs from any peer. */
        any.nid = PTL_NID_ANY;
        any.pid = PTL_PID_ANY;

        spin_lock_irqsave(&rxbp->rxbp_lock, flags);

        /* Pool is being torn down: don't post, just mark idle so
         * kptllnd_rx_buffer_pool_fini() can destroy the buffer. */
        if (rxbp->rxbp_shutdown) {
                rxb->rxb_idle = 1;
                spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
                return;
        }

        rxb->rxb_refcount = 1;                  /* net's ref */
        rxb->rxb_posted = 1;                    /* I'm posting */
        
        spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);

        rc = PtlMEAttach(kptllnd_data.kptl_nih,
                         *kptllnd_tunables.kptl_portal,
                         any,
                         LNET_MSG_MATCHBITS,
                         0, /* all matchbits are valid - ignore none */
                         PTL_UNLINK,
                         PTL_INS_AFTER,
                         &meh);
        if (rc != PTL_OK) {
                CERROR("PtlMeAttach rxb failed %d\n", rc);
                goto failed;
        }

        /*
         * Setup MD: accept PUTs anywhere in the buffer, unlimited times,
         * up to kptl_max_msg_size per message, 8-byte aligned.
         */
        md.start = rxb->rxb_buffer;
        md.length = PAGE_SIZE * *kptllnd_tunables.kptl_rxb_npages;
        md.threshold = PTL_MD_THRESH_INF;
        md.options = PTL_MD_OP_PUT |
                     PTL_MD_LUSTRE_COMPLETION_SEMANTICS |
                     PTL_MD_EVENT_START_DISABLE |
                     PTL_MD_MAX_SIZE |
                     PTL_MD_LOCAL_ALIGN8;
        md.user_ptr = &rxb->rxb_eventarg;
        md.max_size = *kptllnd_tunables.kptl_max_msg_size;
        md.eq_handle = kptllnd_data.kptl_eqh;

        rc = PtlMDAttach(meh, md, PTL_UNLINK, &mdh);
        if (rc == PTL_OK) {
                /* Record the MD handle — unless the event callback already
                 * ran and auto-unlinked the MD (it clears rxb_posted). */
                spin_lock_irqsave(&rxbp->rxbp_lock, flags);
                if (rxb->rxb_posted)            /* Not auto-unlinked yet!!! */
                        rxb->rxb_mdh = mdh;
                spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
                return;
        }
        
        CERROR("PtlMDAttach rxb failed %d\n", rc);
        rc = PtlMEUnlink(meh);
        LASSERT(rc == PTL_OK);

 failed:
        spin_lock_irqsave(&rxbp->rxbp_lock, flags);
        rxb->rxb_posted = 0;
        /* XXX this will just try again immediately */
        kptllnd_rx_buffer_decref_locked(rxb);
        spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
}
310
311 kptl_rx_t *
312 kptllnd_rx_alloc(void)
313 {
314         kptl_rx_t* rx;
315
316         if (IS_SIMULATION_ENABLED(FAIL_RX_ALLOC)) {
317                 CERROR ("FAIL_RX_ALLOC SIMULATION triggered\n");
318                 return NULL;
319         }
320
321         rx = cfs_mem_cache_alloc(kptllnd_data.kptl_rx_cache, CFS_ALLOC_ATOMIC);
322         if (rx == NULL) {
323                 CERROR("Failed to allocate rx\n");
324                 return NULL;
325         }
326
327         memset(rx, 0, sizeof(*rx));
328         return rx;
329 }
330
/* Finish with a received message: release the rx buffer (so it can be
 * reposted), return a credit to the peer, kick the send path in case the
 * returned credit unblocks queued sends, and free the rx descriptor. */
void
kptllnd_rx_done(kptl_rx_t *rx)
{
        kptl_rx_buffer_t *rxb = rx->rx_rxb;
        kptl_peer_t      *peer = rx->rx_peer;
        unsigned long     flags;

        CDEBUG(D_NET, "rx=%p rxb %p peer %p\n", rx, rxb, peer);

        /* rx_rxb is NULL when the message was copied into rx_space (the
         * unaligned-message path in the event callback). */
        if (rxb != NULL)
                kptllnd_rx_buffer_decref(rxb);

        if (peer != NULL) {
                /* Update credits (after I've decref-ed the buffer) */
                spin_lock_irqsave(&peer->peer_lock, flags);

                peer->peer_active_rxs--;
                LASSERT (peer->peer_active_rxs >= 0);

                /* One more credit owed back to the peer. */
                peer->peer_outstanding_credits++;
                LASSERT (peer->peer_outstanding_credits <=
                         *kptllnd_tunables.kptl_peercredits);

                CDEBUG(D_NETTRACE, "%s[%d/%d]: rx %p done\n",
                       libcfs_id2str(peer->peer_id),
                       peer->peer_credits, peer->peer_outstanding_credits, rx);

                spin_unlock_irqrestore(&peer->peer_lock, flags);

                /* I might have to send back credits */
                kptllnd_peer_check_sends(peer);
                kptllnd_peer_decref(peer);
        }

        cfs_mem_cache_free(kptllnd_data.kptl_rx_cache, rx);
}
367
368 void
369 kptllnd_rx_buffer_callback (ptl_event_t *ev)
370 {
371         kptl_eventarg_t        *eva = ev->md.user_ptr;
372         kptl_rx_buffer_t       *rxb = kptllnd_eventarg2obj(eva);
373         kptl_rx_buffer_pool_t  *rxbp = rxb->rxb_pool;
374         kptl_rx_t              *rx;
375         int                     unlinked;
376         unsigned long           flags;
377
378 #ifdef LUSTRE_PORTALS_UNLINK_SEMANTICS
379         unlinked = ev->unlinked;
380 #else
381         unlinked = ev->type == PTL_EVENT_UNLINK;
382 #endif
383
384         CDEBUG(D_NET, "RXB Callback %s(%d) rxb=%p id=%s unlink=%d rc %d\n",
385                kptllnd_evtype2str(ev->type), ev->type, rxb, 
386                kptllnd_ptlid2str(ev->initiator), 
387                unlinked, ev->ni_fail_type);
388
389         LASSERT (!rxb->rxb_idle);
390         LASSERT (ev->md.start == rxb->rxb_buffer);
391         LASSERT (ev->offset + ev->mlength <= 
392                  PAGE_SIZE * *kptllnd_tunables.kptl_rxb_npages);
393         LASSERT (ev->type == PTL_EVENT_PUT_END || 
394                  ev->type == PTL_EVENT_UNLINK);
395         LASSERT (ev->type == PTL_EVENT_UNLINK ||
396                  ev->match_bits == LNET_MSG_MATCHBITS);
397
398         if (ev->ni_fail_type != PTL_NI_OK)
399                 CERROR("event type %d, status %d from %s\n",
400                        ev->type, ev->ni_fail_type,
401                        kptllnd_ptlid2str(ev->initiator));
402
403         if (ev->type == PTL_EVENT_PUT_END &&
404             ev->ni_fail_type == PTL_NI_OK &&
405             !rxbp->rxbp_shutdown) {
406
407                 /* rxbp_shutdown sampled without locking!  I only treat it as a
408                  * hint since shutdown can start while rx's are queued on
409                  * kptl_sched_rxq. */
410 #if (PTL_MD_LOCAL_ALIGN8 == 0)
411                 /* Portals can't force message alignment - someone sending an
412                  * odd-length message will misalign subsequent messages and
413                  * force the fixup below...  */
414                 if ((ev->mlength & 7) != 0)
415                         CWARN("Message from %s has odd length %d: "
416                               "probable version incompatibility\n",
417                               kptllnd_ptlid2str(ev->initiator),
418                               ev->mlength);
419 #endif
420                 rx = kptllnd_rx_alloc();
421                 if (rx == NULL) {
422                         CERROR("Message from %s dropped: ENOMEM",
423                                kptllnd_ptlid2str(ev->initiator));
424                 } else {
425                         if ((ev->offset & 7) == 0) {
426                                 kptllnd_rx_buffer_addref(rxb);
427                                 rx->rx_rxb = rxb;
428                                 rx->rx_nob = ev->mlength;
429                                 rx->rx_msg = (kptl_msg_t *)
430                                              (rxb->rxb_buffer + ev->offset);
431                         } else {
432 #if (PTL_MD_LOCAL_ALIGN8 == 0)
433                                 /* Portals can't force alignment - copy into
434                                  * rx_space (avoiding overflow) to fix */
435                                 int maxlen = *kptllnd_tunables.kptl_max_msg_size;
436                                 
437                                 rx->rx_rxb = NULL;
438                                 rx->rx_nob = MIN(maxlen, ev->mlength);
439                                 rx->rx_msg = (kptl_msg_t *)rx->rx_space;
440                                 memcpy(rx->rx_msg, rxb->rxb_buffer + ev->offset,
441                                        rx->rx_nob);
442 #else
443                                 /* Portals should have forced the alignment */
444                                 LBUG();
445 #endif
446                         }
447
448                         rx->rx_initiator = ev->initiator;
449 #ifdef CRAY_XT3
450                         rx->rx_uid = ev->uid;
451 #endif
452                         /* Queue for attention */
453                         spin_lock_irqsave(&kptllnd_data.kptl_sched_lock, 
454                                           flags);
455
456                         list_add_tail(&rx->rx_list, 
457                                       &kptllnd_data.kptl_sched_rxq);
458                         wake_up(&kptllnd_data.kptl_sched_waitq);
459
460                         spin_unlock_irqrestore(&kptllnd_data.kptl_sched_lock, 
461                                                flags);
462                 }
463         }
464
465         if (unlinked) {
466                 spin_lock_irqsave(&rxbp->rxbp_lock, flags);
467
468                 rxb->rxb_posted = 0;
469                 rxb->rxb_mdh = PTL_INVALID_HANDLE;
470                 kptllnd_rx_buffer_decref_locked(rxb);
471
472                 spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
473         }
474 }
475
/* Send a NAK (fire-and-forget) to the initiator of 'rx'.  No EQ is
 * attached (PTL_EQ_NONE) and no ACK is requested, so there is no
 * completion handling — failures are only logged. */
void
kptllnd_nak (kptl_rx_t *rx)
{
        /* Fire-and-forget a stub message that will let the peer know my
         * protocol magic/version and make her drop/refresh any peer state she
         * might have with me. */
        ptl_md_t md = {
                .start        = kptllnd_data.kptl_nak_msg,
                .length       = kptllnd_data.kptl_nak_msg->ptlm_nob,
                .threshold    = 1,
                .options      = 0,
                .user_ptr     = NULL,
                .eq_handle    = PTL_EQ_NONE};
        ptl_handle_md_t   mdh;
        int               rc;

        rc = PtlMDBind(kptllnd_data.kptl_nih, md, PTL_UNLINK, &mdh);
        if (rc != PTL_OK) {
                CWARN("Can't NAK %s: bind failed %d\n",
                      kptllnd_ptlid2str(rx->rx_initiator), rc);
                return;
        }

        /* threshold == 1 and PTL_UNLINK: the MD auto-unlinks after this
         * single PUT completes. */
        rc = PtlPut(mdh, PTL_NOACK_REQ, rx->rx_initiator,
                    *kptllnd_tunables.kptl_portal, 0,
                    LNET_MSG_MATCHBITS, 0, 0);

        if (rc != PTL_OK)
                CWARN("Can't NAK %s: put failed %d\n",
                      kptllnd_ptlid2str(rx->rx_initiator), rc);
}
507
/* Parse and dispatch a received ptllnd message: validate magic/version,
 * unpack, check source/destination ids and incarnation stamps, account
 * credits, then hand the payload to LNet (IMMEDIATE/PUT/GET) or finish
 * locally (HELLO/NOOP/NAK).  Consumes 'rx' unless lnet_parse() takes
 * ownership (rc >= 0 on the lnet_parse calls). */
void
kptllnd_rx_parse(kptl_rx_t *rx)
{
        kptl_msg_t             *msg = rx->rx_msg;
        kptl_peer_t            *peer;
        int                     rc;
        int                     credits;
        unsigned long           flags;
        lnet_process_id_t       srcid;

        LASSERT (rx->rx_peer == NULL);

        /* Recognize (possibly byte-swapped) LNET_PROTO_MAGIC, or a ptllnd
         * message whose version doesn't match ours: NAK either way. */
        if ((rx->rx_nob >= 4 &&
             (msg->ptlm_magic == LNET_PROTO_MAGIC ||
              msg->ptlm_magic == __swab32(LNET_PROTO_MAGIC))) ||
            (rx->rx_nob >= 6 &&
             ((msg->ptlm_magic == PTLLND_MSG_MAGIC &&
               msg->ptlm_version != PTLLND_MSG_VERSION) ||
              (msg->ptlm_magic == __swab32(PTLLND_MSG_MAGIC) &&
               msg->ptlm_version != __swab16(PTLLND_MSG_VERSION))))) {
                /* NAK incompatible versions
                 * See other LNDs for how to handle this if/when ptllnd begins
                 * to allow different versions to co-exist */
                CERROR("Bad version: got %04x expected %04x from %s\n",
                       (__u32)(msg->ptlm_magic == PTLLND_MSG_MAGIC ?
                               msg->ptlm_version : __swab16(msg->ptlm_version)),
                        PTLLND_MSG_VERSION, kptllnd_ptlid2str(rx->rx_initiator));
                kptllnd_nak(rx);
                goto rx_done;
        }
        
        /* Byte-swap (if needed) and sanity-check the message. */
        rc = kptllnd_msg_unpack(msg, rx->rx_nob);
        if (rc != 0) {
                CERROR ("Error %d unpacking rx from %s\n",
                        rc, kptllnd_ptlid2str(rx->rx_initiator));
                goto rx_done;
        }

        srcid.nid = msg->ptlm_srcnid;
        srcid.pid = msg->ptlm_srcpid;

        CDEBUG(D_NETTRACE, "%s: RX %s c %d %p\n", libcfs_id2str(srcid),
               kptllnd_msgtype2str(msg->ptlm_type), msg->ptlm_credits, rx);

        /* The claimed source NID must match the portals-level initiator. */
        if (srcid.nid != kptllnd_ptl2lnetnid(rx->rx_initiator.nid)) {
                CERROR("Bad source id %s from %s\n",
                       libcfs_id2str(srcid),
                       kptllnd_ptlid2str(rx->rx_initiator));
                goto rx_done;
        }

        if (msg->ptlm_type == PTLLND_MSG_TYPE_NAK) {
                peer = kptllnd_id2peer(srcid);
                if (peer == NULL)
                        goto rx_done;
                
                /* Peer NAKed us: drop the connection. */
                CWARN("NAK from %s (%s)\n",
                      libcfs_id2str(srcid),
                      kptllnd_ptlid2str(rx->rx_initiator));

                rc = -EPROTO;
                goto failed;
        }

        /* Message must be addressed to me. */
        if (msg->ptlm_dstnid != kptllnd_data.kptl_ni->ni_nid ||
            msg->ptlm_dstpid != the_lnet.ln_pid) {
                CERROR("Bad dstid %s (expected %s) from %s\n",
                       libcfs_id2str((lnet_process_id_t) {
                               .nid = msg->ptlm_dstnid,
                               .pid = msg->ptlm_dstpid}),
                       libcfs_id2str((lnet_process_id_t) {
                               .nid = kptllnd_data.kptl_ni->ni_nid,
                               .pid = the_lnet.ln_pid}),
                       kptllnd_ptlid2str(rx->rx_initiator));
                goto rx_done;
        }

        if (msg->ptlm_type == PTLLND_MSG_TYPE_HELLO) {
                /* HELLO establishes (or refreshes) the connection and
                 * returns a peer ref. */
                peer = kptllnd_peer_handle_hello(rx->rx_initiator, msg);
                if (peer == NULL)
                        goto rx_done;
        } else {
                peer = kptllnd_id2peer(srcid);
                if (peer == NULL) {
                        CWARN("NAK %s: no connection; peer must reconnect\n",
                              libcfs_id2str(srcid));
                        /* NAK to make the peer reconnect */
                        kptllnd_nak(rx);
                        goto rx_done;
                }

                /* Ignore anything apart from HELLO while I'm waiting for it and
                 * any messages for a previous incarnation of the connection */
                if (peer->peer_state == PEER_STATE_WAITING_HELLO ||
                    msg->ptlm_dststamp < peer->peer_myincarnation) {
                        kptllnd_peer_decref(peer);
                        goto rx_done;
                }

                /* Stamps must match the current connection incarnations
                 * in both directions; mismatch closes the connection. */
                if (msg->ptlm_srcstamp != peer->peer_incarnation) {
                        CERROR("%s: Unexpected srcstamp "LPX64" "
                               "("LPX64" expected)\n",
                               libcfs_id2str(peer->peer_id),
                               msg->ptlm_srcstamp,
                               peer->peer_incarnation);
                        rc = -EPROTO;
                        goto failed;
                }

                if (msg->ptlm_dststamp != peer->peer_myincarnation) {
                        CERROR("%s: Unexpected dststamp "LPX64" "
                               "("LPX64" expected)\n",
                               libcfs_id2str(peer->peer_id), msg->ptlm_dststamp,
                               peer->peer_myincarnation);
                        rc = -EPROTO;
                        goto failed;
                }
        }

        LASSERT (msg->ptlm_srcnid == peer->peer_id.nid &&
                 msg->ptlm_srcpid == peer->peer_id.pid);

        spin_lock_irqsave(&peer->peer_lock, flags);

        /* Peer may never have more messages in flight to me than it has
         * credits for. */
        if (peer->peer_active_rxs == *kptllnd_tunables.kptl_peercredits) {
                spin_unlock_irqrestore(&peer->peer_lock, flags);
                        
                CERROR("Message overflow from %s: handling %d already\n",
                       libcfs_id2str(peer->peer_id),
                       *kptllnd_tunables.kptl_peercredits);
                rc = -EPROTO;
                goto failed;
        }
        
        /* Returned credits may never exceed the configured maximum. */
        if (msg->ptlm_credits != 0 &&
            peer->peer_credits + msg->ptlm_credits >
            *kptllnd_tunables.kptl_peercredits) {
                credits = peer->peer_credits;
                spin_unlock_irqrestore(&peer->peer_lock, flags);

                CERROR("Credit overflow from %s: %d + %d > %d\n",
                       libcfs_id2str(peer->peer_id),
                       credits, msg->ptlm_credits,
                       *kptllnd_tunables.kptl_peercredits);
                rc = -EPROTO;
                goto failed;
        }

        /* ptllnd-level protocol correct: account credits */
        peer->peer_credits += msg->ptlm_credits;
        peer->peer_active_rxs++;

        spin_unlock_irqrestore(&peer->peer_lock, flags);

        /* See if something can go out now that credits have come in */
        if (msg->ptlm_credits != 0)
                kptllnd_peer_check_sends(peer);

        rx->rx_peer = peer;                /* do buffer accounting on rxdone */
        kptllnd_peer_alive(peer);

        switch (msg->ptlm_type) {
        default:
                /* already checked by kptllnd_msg_unpack() */
                LBUG();

        case PTLLND_MSG_TYPE_HELLO:
                CDEBUG(D_NET, "PTLLND_MSG_TYPE_HELLO\n");
                goto rx_done;

        case PTLLND_MSG_TYPE_NOOP:
                CDEBUG(D_NET, "PTLLND_MSG_TYPE_NOOP\n");
                goto rx_done;

        case PTLLND_MSG_TYPE_IMMEDIATE:
                CDEBUG(D_NET, "PTLLND_MSG_TYPE_IMMEDIATE\n");
                rc = lnet_parse(kptllnd_data.kptl_ni,
                                &msg->ptlm_u.immediate.kptlim_hdr,
                                msg->ptlm_srcnid,
                                rx, 0);
                if (rc >= 0)                    /* kptllnd_recv owns 'rx' now */
                        return;
                goto failed;
                
        case PTLLND_MSG_TYPE_PUT:
        case PTLLND_MSG_TYPE_GET:
                CDEBUG(D_NET, "PTLLND_MSG_TYPE_%s\n",
                        msg->ptlm_type == PTLLND_MSG_TYPE_PUT ?
                        "PUT" : "GET");

                /* checked in kptllnd_msg_unpack() */
                LASSERT (msg->ptlm_u.rdma.kptlrm_matchbits >= 
                         PTL_RESERVED_MATCHBITS);

                /* Update last match bits seen.  Note: rx->rx_peer == peer
                 * here, so lock and unlock use the same peer_lock despite
                 * the different spellings. */
                spin_lock_irqsave(&peer->peer_lock, flags);

                if (msg->ptlm_u.rdma.kptlrm_matchbits >
                    rx->rx_peer->peer_last_matchbits_seen)
                        rx->rx_peer->peer_last_matchbits_seen =
                                msg->ptlm_u.rdma.kptlrm_matchbits;

                spin_unlock_irqrestore(&rx->rx_peer->peer_lock, flags);

                rc = lnet_parse(kptllnd_data.kptl_ni,
                                &msg->ptlm_u.rdma.kptlrm_hdr,
                                msg->ptlm_srcnid,
                                rx, 1);
                if (rc >= 0)                    /* kptllnd_recv owns 'rx' now */
                        return;
                goto failed;
         }

 failed:
        kptllnd_peer_close(peer, rc);
        if (rx->rx_peer == NULL)                /* drop ref on peer */
                kptllnd_peer_decref(peer);      /* unless rx_done will */
 rx_done:
        kptllnd_rx_done(rx);
}