/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2008 Sun Microsystems, Inc. All rights reserved
 * Use is subject to license terms.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * lnet/klnds/ptllnd/ptllnd_rx_buf.c
 *
 * Author: PJ Kirner <pjkirner@clusterfs.com>
 */

#include "ptllnd.h"

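/* Initialise an empty rx buffer pool: zero all state, then set up the pool
 * spinlock and the list that will hold its rx buffers. */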
void
kptllnd_rx_buffer_pool_init(kptl_rx_buffer_pool_t *rxbp)
{
        memset(rxbp, 0, sizeof(*rxbp));
        spin_lock_init(&rxbp->rxbp_lock);
        INIT_LIST_HEAD(&rxbp->rxbp_list);
}

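/* Free one rx buffer and drop it from its pool.  The buffer must already be
 * idle: unreferenced, unposted and with its MD unlinked. */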
void
kptllnd_rx_buffer_destroy(kptl_rx_buffer_t *rxb)
{
        kptl_rx_buffer_pool_t *rxbp = rxb->rxb_pool;

        LASSERT(rxb->rxb_refcount == 0);
        LASSERT(PtlHandleIsEqual(rxb->rxb_mdh, PTL_INVALID_HANDLE));
        LASSERT(!rxb->rxb_posted);
        LASSERT(rxb->rxb_idle);

        list_del(&rxb->rxb_list);
        rxbp->rxbp_count--;

        LIBCFS_FREE(rxb->rxb_buffer, kptllnd_rx_buffer_size());
        LIBCFS_FREE(rxb, sizeof(*rxb));
}

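/* Reserve 'count' message slots in the pool, allocating and posting extra rx
 * buffers until the total reservation fits in the buffers already allocated.
 * Returns 0 on success, -ESHUTDOWN if the pool is shutting down, or -ENOMEM
 * if a new buffer can't be allocated. */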
int
kptllnd_rx_buffer_pool_reserve(kptl_rx_buffer_pool_t *rxbp, int count)
{
        int               bufsize;
        int               msgs_per_buffer;
        int               rc;
        kptl_rx_buffer_t *rxb;
        char             *buffer;
        unsigned long     flags;

        bufsize = kptllnd_rx_buffer_size();
        msgs_per_buffer = bufsize / (*kptllnd_tunables.kptl_max_msg_size);

        CDEBUG(D_NET, "kptllnd_rx_buffer_pool_reserve(%d)\n", count);

        spin_lock_irqsave(&rxbp->rxbp_lock, flags);

        for (;;) {
                if (rxbp->rxbp_shutdown) {
                        rc = -ESHUTDOWN;
                        break;
                }

                if (rxbp->rxbp_reserved + count <=
                    rxbp->rxbp_count * msgs_per_buffer) {
                        rc = 0;
                        break;
                }

                spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);

                LIBCFS_ALLOC(rxb, sizeof(*rxb));
                LIBCFS_ALLOC(buffer, bufsize);

                if (rxb == NULL || buffer == NULL) {
                        CERROR("Failed to allocate rx buffer\n");

                        if (rxb != NULL)
                                LIBCFS_FREE(rxb, sizeof(*rxb));
                        if (buffer != NULL)
                                LIBCFS_FREE(buffer, bufsize);

                        spin_lock_irqsave(&rxbp->rxbp_lock, flags);
                        rc = -ENOMEM;
                        break;
                }

                memset(rxb, 0, sizeof(*rxb));

                rxb->rxb_eventarg.eva_type = PTLLND_EVENTARG_TYPE_BUF;
                rxb->rxb_refcount = 0;
                rxb->rxb_pool = rxbp;
                rxb->rxb_idle = 0;
                rxb->rxb_posted = 0;
                rxb->rxb_buffer = buffer;
                rxb->rxb_mdh = PTL_INVALID_HANDLE;

                spin_lock_irqsave(&rxbp->rxbp_lock, flags);

                if (rxbp->rxbp_shutdown) {
                        spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);

                        LIBCFS_FREE(rxb, sizeof(*rxb));
                        LIBCFS_FREE(buffer, bufsize);

                        spin_lock_irqsave(&rxbp->rxbp_lock, flags);
                        rc = -ESHUTDOWN;
                        break;
                }

                list_add_tail(&rxb->rxb_list, &rxbp->rxbp_list);
                rxbp->rxbp_count++;

                spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);

                kptllnd_rx_buffer_post(rxb);

                spin_lock_irqsave(&rxbp->rxbp_lock, flags);
        }

        if (rc == 0)
                rxbp->rxbp_reserved += count;

        spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);

        return rc;
}

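/* Return 'count' previously reserved message slots to the pool.  Buffers are
 * not freed here; they persist until kptllnd_rx_buffer_pool_fini(). */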
void
kptllnd_rx_buffer_pool_unreserve(kptl_rx_buffer_pool_t *rxbp,
                                 int count)
{
        unsigned long flags;

        spin_lock_irqsave(&rxbp->rxbp_lock, flags);

        CDEBUG(D_NET, "kptllnd_rx_buffer_pool_unreserve(%d)\n", count);
        rxbp->rxbp_reserved -= count;

        spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
}

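/* Tear down the pool: flag it as shutting down, unlink the MD of every posted
 * buffer and poll until all buffers have gone idle and been destroyed.  See
 * the CAVEAT EMPTOR comment below for the races this has to tolerate. */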
void
kptllnd_rx_buffer_pool_fini(kptl_rx_buffer_pool_t *rxbp)
{
        kptl_rx_buffer_t       *rxb;
        int                     rc;
        int                     i;
        unsigned long           flags;
        struct list_head       *tmp;
        struct list_head       *nxt;
        ptl_handle_md_t         mdh;

        /* CAVEAT EMPTOR: I'm racing with everything here!!!
         *
         * Buffers can still be posted after I set rxbp_shutdown because I
         * can't hold rxbp_lock while I'm posting them.
         *
         * Calling PtlMDUnlink() here races with auto-unlinks; i.e. a buffer's
         * MD handle could become invalid under me.  I am vulnerable to portals
         * re-using handles (i.e. make the same handle valid again, but for a
         * different MD) from when the MD is actually unlinked, to when the
         * event callback tells me it has been unlinked. */

        spin_lock_irqsave(&rxbp->rxbp_lock, flags);

        rxbp->rxbp_shutdown = 1;

        for (i = 0;; i++) {
                list_for_each_safe(tmp, nxt, &rxbp->rxbp_list) {
                        rxb = list_entry (tmp, kptl_rx_buffer_t, rxb_list);

                        if (rxb->rxb_idle) {
                                spin_unlock_irqrestore(&rxbp->rxbp_lock,
                                                       flags);
                                kptllnd_rx_buffer_destroy(rxb);
                                spin_lock_irqsave(&rxbp->rxbp_lock,
                                                  flags);
                                continue;
                        }

                        mdh = rxb->rxb_mdh;
                        if (PtlHandleIsEqual(mdh, PTL_INVALID_HANDLE))
                                continue;

                        spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);

                        rc = PtlMDUnlink(mdh);

                        spin_lock_irqsave(&rxbp->rxbp_lock, flags);

#ifdef LUSTRE_PORTALS_UNLINK_SEMANTICS
                        /* callback clears rxb_mdh and drops net's ref
                         * (which causes repost, but since I set
                         * shutdown, it will just set the buffer
                         * idle) */
#else
                        if (rc == PTL_OK) {
                                rxb->rxb_posted = 0;
                                rxb->rxb_mdh = PTL_INVALID_HANDLE;
                                kptllnd_rx_buffer_decref_locked(rxb);
                        }
#endif
                }

                if (list_empty(&rxbp->rxbp_list))
                        break;

                spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);

                /* Wait a bit for references to be dropped */
                CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
                       "Waiting for %d Busy RX Buffers\n",
                       rxbp->rxbp_count);

                cfs_pause(cfs_time_seconds(1));

                spin_lock_irqsave(&rxbp->rxbp_lock, flags);
        }

        spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
}

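/* Post an rx buffer: attach a match entry and memory descriptor so incoming
 * PUTs can land in it.  If the pool is already shutting down the buffer is
 * just marked idle; on Portals errors the net's reference is dropped again. */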
void
kptllnd_rx_buffer_post(kptl_rx_buffer_t *rxb)
{
        int                     rc;
        ptl_md_t                md;
        ptl_handle_me_t         meh;
        ptl_handle_md_t         mdh;
        ptl_process_id_t        any;
        kptl_rx_buffer_pool_t  *rxbp = rxb->rxb_pool;
        unsigned long           flags;

        LASSERT (!in_interrupt());
        LASSERT (rxb->rxb_refcount == 0);
        LASSERT (!rxb->rxb_idle);
        LASSERT (!rxb->rxb_posted);
        LASSERT (PtlHandleIsEqual(rxb->rxb_mdh, PTL_INVALID_HANDLE));

        any.nid = PTL_NID_ANY;
        any.pid = PTL_PID_ANY;

        spin_lock_irqsave(&rxbp->rxbp_lock, flags);

        if (rxbp->rxbp_shutdown) {
                rxb->rxb_idle = 1;
                spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
                return;
        }

        rxb->rxb_refcount = 1;                  /* net's ref */
        rxb->rxb_posted = 1;                    /* I'm posting */

        spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);

        rc = PtlMEAttach(kptllnd_data.kptl_nih,
                         *kptllnd_tunables.kptl_portal,
                         any,
                         LNET_MSG_MATCHBITS,
                         0, /* all matchbits are valid - ignore none */
                         PTL_UNLINK,
                         PTL_INS_AFTER,
                         &meh);
        if (rc != PTL_OK) {
                CERROR("PtlMEAttach rxb failed %s(%d)\n",
                       kptllnd_errtype2str(rc), rc);
                goto failed;
        }

        /*
         * Setup MD
         */
        md.start = rxb->rxb_buffer;
        md.length = PAGE_SIZE * *kptllnd_tunables.kptl_rxb_npages;
        md.threshold = PTL_MD_THRESH_INF;
        md.options = PTL_MD_OP_PUT |
                     PTL_MD_LUSTRE_COMPLETION_SEMANTICS |
                     PTL_MD_EVENT_START_DISABLE |
                     PTL_MD_MAX_SIZE |
                     PTL_MD_LOCAL_ALIGN8;
        md.user_ptr = &rxb->rxb_eventarg;
        md.max_size = *kptllnd_tunables.kptl_max_msg_size;
        md.eq_handle = kptllnd_data.kptl_eqh;

        rc = PtlMDAttach(meh, md, PTL_UNLINK, &mdh);
        if (rc == PTL_OK) {
                spin_lock_irqsave(&rxbp->rxbp_lock, flags);
                if (rxb->rxb_posted)            /* Not auto-unlinked yet!!! */
                        rxb->rxb_mdh = mdh;
                spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
                return;
        }

        CERROR("PtlMDAttach rxb failed %s(%d)\n",
               kptllnd_errtype2str(rc), rc);
        rc = PtlMEUnlink(meh);
        LASSERT(rc == PTL_OK);

 failed:
        spin_lock_irqsave(&rxbp->rxbp_lock, flags);
        rxb->rxb_posted = 0;
        /* XXX this will just try again immediately */
        kptllnd_rx_buffer_decref_locked(rxb);
        spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
}

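/* Allocate an rx descriptor from the rx slab cache.  Uses an atomic
 * allocation since this runs in the event callback path. */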
kptl_rx_t *
kptllnd_rx_alloc(void)
{
        kptl_rx_t* rx;

        if (IS_SIMULATION_ENABLED(FAIL_RX_ALLOC)) {
                CERROR ("FAIL_RX_ALLOC SIMULATION triggered\n");
                return NULL;
        }

        rx = cfs_mem_cache_alloc(kptllnd_data.kptl_rx_cache, CFS_ALLOC_ATOMIC);
        if (rx == NULL) {
                CERROR("Failed to allocate rx\n");
                return NULL;
        }

        memset(rx, 0, sizeof(*rx));
        return rx;
}

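/* Complete an rx: release its buffer, optionally give the peer back the
 * credit its message consumed (which may let queued sends go out), and free
 * the rx descriptor. */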
void
kptllnd_rx_done(kptl_rx_t *rx, int post_credit)
{
        kptl_rx_buffer_t *rxb = rx->rx_rxb;
        kptl_peer_t      *peer = rx->rx_peer;
        unsigned long     flags;

        LASSERT (post_credit == PTLLND_POSTRX_NO_CREDIT ||
                 post_credit == PTLLND_POSTRX_PEER_CREDIT);

        CDEBUG(D_NET, "rx=%p rxb %p peer %p\n", rx, rxb, peer);

        if (rxb != NULL)
                kptllnd_rx_buffer_decref(rxb);

        if (peer != NULL) {
                /* Update credits (after I've decref-ed the buffer) */
                spin_lock_irqsave(&peer->peer_lock, flags);

                if (post_credit == PTLLND_POSTRX_PEER_CREDIT)
                        peer->peer_outstanding_credits++;

                LASSERT (peer->peer_outstanding_credits +
                         peer->peer_sent_credits <=
                         *kptllnd_tunables.kptl_peertxcredits);

                CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: rx %p done\n",
                       libcfs_id2str(peer->peer_id), peer->peer_credits,
                       peer->peer_outstanding_credits, peer->peer_sent_credits,
                       rx);

                spin_unlock_irqrestore(&peer->peer_lock, flags);

                /* I might have to send back credits */
                kptllnd_peer_check_sends(peer);
                kptllnd_peer_decref(peer);
        }

        cfs_mem_cache_free(kptllnd_data.kptl_rx_cache, rx);
}

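/* Portals event callback for rx buffers.  A successful PUT_END wraps the
 * received message in an rx descriptor and queues it for the scheduler; an
 * unlink event drops the net's reference so the buffer can be reposted or
 * retired. */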
void
kptllnd_rx_buffer_callback (ptl_event_t *ev)
{
        kptl_eventarg_t        *eva = ev->md.user_ptr;
        kptl_rx_buffer_t       *rxb = kptllnd_eventarg2obj(eva);
        kptl_rx_buffer_pool_t  *rxbp = rxb->rxb_pool;
        kptl_rx_t              *rx;
        int                     unlinked;
        unsigned long           flags;

#ifdef LUSTRE_PORTALS_UNLINK_SEMANTICS
        unlinked = ev->unlinked;
#else
        unlinked = ev->type == PTL_EVENT_UNLINK;
#endif

        CDEBUG(D_NET, "%s: %s(%d) rxb=%p fail=%s(%d) unlink=%d\n",
               kptllnd_ptlid2str(ev->initiator),
               kptllnd_evtype2str(ev->type), ev->type, rxb,
               kptllnd_errtype2str(ev->ni_fail_type), ev->ni_fail_type,
               unlinked);

        LASSERT (!rxb->rxb_idle);
        LASSERT (ev->md.start == rxb->rxb_buffer);
        LASSERT (ev->offset + ev->mlength <=
                 PAGE_SIZE * *kptllnd_tunables.kptl_rxb_npages);
        LASSERT (ev->type == PTL_EVENT_PUT_END ||
                 ev->type == PTL_EVENT_UNLINK);
        LASSERT (ev->type == PTL_EVENT_UNLINK ||
                 ev->match_bits == LNET_MSG_MATCHBITS);

        if (ev->ni_fail_type != PTL_NI_OK) {
                CERROR("Portals error from %s: %s(%d) rxb=%p fail=%s(%d) unlink=%d\n",
                       kptllnd_ptlid2str(ev->initiator),
                       kptllnd_evtype2str(ev->type), ev->type, rxb,
                       kptllnd_errtype2str(ev->ni_fail_type),
                       ev->ni_fail_type, unlinked);
                kptllnd_schedule_ptltrace_dump();
        } else if (ev->type == PTL_EVENT_PUT_END &&
                   !rxbp->rxbp_shutdown) {

                /* rxbp_shutdown sampled without locking!  I only treat it as a
                 * hint since shutdown can start while rx's are queued on
                 * kptl_sched_rxq. */
#if (PTL_MD_LOCAL_ALIGN8 == 0)
                /* Portals can't force message alignment - someone sending an
                 * odd-length message will misalign subsequent messages and
                 * force the fixup below...  */
                if ((ev->mlength & 7) != 0)
                        CWARN("Message from %s has odd length "LPU64": "
                              "probable version incompatibility\n",
                              kptllnd_ptlid2str(ev->initiator),
                              (__u64)ev->mlength);
#endif
                rx = kptllnd_rx_alloc();
                if (rx == NULL) {
                        CERROR("Message from %s dropped: ENOMEM\n",
                               kptllnd_ptlid2str(ev->initiator));
                } else {
                        if ((ev->offset & 7) == 0) {
                                kptllnd_rx_buffer_addref(rxb);
                                rx->rx_rxb = rxb;
                                rx->rx_nob = ev->mlength;
                                rx->rx_msg = (kptl_msg_t *)
                                             (rxb->rxb_buffer + ev->offset);
                        } else {
#if (PTL_MD_LOCAL_ALIGN8 == 0)
                                /* Portals can't force alignment - copy into
                                 * rx_space (avoiding overflow) to fix */
                                int maxlen = *kptllnd_tunables.kptl_max_msg_size;

                                rx->rx_rxb = NULL;
                                rx->rx_nob = MIN(maxlen, ev->mlength);
                                rx->rx_msg = (kptl_msg_t *)rx->rx_space;
                                memcpy(rx->rx_msg, rxb->rxb_buffer + ev->offset,
                                       rx->rx_nob);
#else
                                /* Portals should have forced the alignment */
                                LBUG();
#endif
                        }

                        rx->rx_initiator = ev->initiator;
                        rx->rx_treceived = jiffies;
#ifdef CRAY_XT3
                        rx->rx_uid = ev->uid;
#endif
                        /* Queue for attention */
                        spin_lock_irqsave(&kptllnd_data.kptl_sched_lock,
                                          flags);

                        list_add_tail(&rx->rx_list,
                                      &kptllnd_data.kptl_sched_rxq);
                        wake_up(&kptllnd_data.kptl_sched_waitq);

                        spin_unlock_irqrestore(&kptllnd_data.kptl_sched_lock,
                                               flags);
                }
        }

        if (unlinked) {
                spin_lock_irqsave(&rxbp->rxbp_lock, flags);

                rxb->rxb_posted = 0;
                rxb->rxb_mdh = PTL_INVALID_HANDLE;
                kptllnd_rx_buffer_decref_locked(rxb);

                spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
        }
}

void
kptllnd_nak (kptl_rx_t *rx)
{
        /* Fire-and-forget a stub message that will let the peer know my
         * protocol magic/version and make her drop/refresh any peer state she
         * might have with me. */
        ptl_md_t md = {
                .start        = kptllnd_data.kptl_nak_msg,
                .length       = kptllnd_data.kptl_nak_msg->ptlm_nob,
                .threshold    = 1,
                .options      = 0,
                .user_ptr     = NULL,
                .eq_handle    = PTL_EQ_NONE};
        ptl_handle_md_t   mdh;
        int               rc;

        rc = PtlMDBind(kptllnd_data.kptl_nih, md, PTL_UNLINK, &mdh);
        if (rc != PTL_OK) {
                CWARN("Can't NAK %s: bind failed %s(%d)\n",
                      kptllnd_ptlid2str(rx->rx_initiator),
                      kptllnd_errtype2str(rc), rc);
                return;
        }

        rc = PtlPut(mdh, PTL_NOACK_REQ, rx->rx_initiator,
                    *kptllnd_tunables.kptl_portal, 0,
                    LNET_MSG_MATCHBITS, 0, 0);

        if (rc != PTL_OK) {
                CWARN("Can't NAK %s: put failed %s(%d)\n",
                      kptllnd_ptlid2str(rx->rx_initiator),
                      kptllnd_errtype2str(rc), rc);
                kptllnd_schedule_ptltrace_dump();
        }
}

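/* Parse a message the scheduler has dequeued: validate magic/version, unpack
 * it, check source/destination ids and the peer's credit state, then either
 * hand it to LNet or complete it here.  Protocol errors close the peer. */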
void
kptllnd_rx_parse(kptl_rx_t *rx)
{
        kptl_msg_t             *msg = rx->rx_msg;
        int                     post_credit = PTLLND_POSTRX_PEER_CREDIT;
        kptl_peer_t            *peer;
        int                     rc;
        unsigned long           flags;
        lnet_process_id_t       srcid;

        LASSERT (rx->rx_peer == NULL);

        if ((rx->rx_nob >= 4 &&
             (msg->ptlm_magic == LNET_PROTO_MAGIC ||
              msg->ptlm_magic == __swab32(LNET_PROTO_MAGIC))) ||
            (rx->rx_nob >= 6 &&
             ((msg->ptlm_magic == PTLLND_MSG_MAGIC &&
               msg->ptlm_version != PTLLND_MSG_VERSION) ||
              (msg->ptlm_magic == __swab32(PTLLND_MSG_MAGIC) &&
               msg->ptlm_version != __swab16(PTLLND_MSG_VERSION))))) {
                /* NAK incompatible versions
                 * See other LNDs for how to handle this if/when ptllnd begins
                 * to allow different versions to co-exist */
                CERROR("Bad version: got %04x expected %04x from %s\n",
                       (__u32)(msg->ptlm_magic == PTLLND_MSG_MAGIC ?
                               msg->ptlm_version : __swab16(msg->ptlm_version)),
                        PTLLND_MSG_VERSION, kptllnd_ptlid2str(rx->rx_initiator));
                kptllnd_nak(rx);
                goto rx_done;
        }

        rc = kptllnd_msg_unpack(msg, rx->rx_nob);
        if (rc != 0) {
                CERROR ("Error %d unpacking rx from %s\n",
                        rc, kptllnd_ptlid2str(rx->rx_initiator));
                goto rx_done;
        }

        srcid.nid = msg->ptlm_srcnid;
        srcid.pid = msg->ptlm_srcpid;

        CDEBUG(D_NETTRACE, "%s: RX %s c %d %p rxb %p queued %lu ticks (%ld s)\n",
               libcfs_id2str(srcid), kptllnd_msgtype2str(msg->ptlm_type),
               msg->ptlm_credits, rx, rx->rx_rxb,
               jiffies - rx->rx_treceived,
               cfs_duration_sec(jiffies - rx->rx_treceived));

        if (srcid.nid != kptllnd_ptl2lnetnid(rx->rx_initiator.nid)) {
                CERROR("Bad source id %s from %s\n",
                       libcfs_id2str(srcid),
                       kptllnd_ptlid2str(rx->rx_initiator));
                goto rx_done;
        }

        if (msg->ptlm_type == PTLLND_MSG_TYPE_NAK) {
                peer = kptllnd_id2peer(srcid);
                if (peer == NULL)
                        goto rx_done;

                CWARN("NAK from %s (%s)\n",
                      libcfs_id2str(srcid),
                      kptllnd_ptlid2str(rx->rx_initiator));

                rc = -EPROTO;
                goto failed;
        }

        if (msg->ptlm_dstnid != kptllnd_data.kptl_ni->ni_nid ||
            msg->ptlm_dstpid != the_lnet.ln_pid) {
                CERROR("Bad dstid %s (expected %s) from %s\n",
                       libcfs_id2str((lnet_process_id_t) {
                               .nid = msg->ptlm_dstnid,
                               .pid = msg->ptlm_dstpid}),
                       libcfs_id2str((lnet_process_id_t) {
                               .nid = kptllnd_data.kptl_ni->ni_nid,
                               .pid = the_lnet.ln_pid}),
                       kptllnd_ptlid2str(rx->rx_initiator));
                goto rx_done;
        }

        if (msg->ptlm_type == PTLLND_MSG_TYPE_HELLO) {
                peer = kptllnd_peer_handle_hello(rx->rx_initiator, msg);
                if (peer == NULL)
                        goto rx_done;
        } else {
                peer = kptllnd_id2peer(srcid);
                if (peer == NULL) {
                        CWARN("NAK %s: no connection; peer must reconnect\n",
                              libcfs_id2str(srcid));
                        /* NAK to make the peer reconnect */
                        kptllnd_nak(rx);
                        goto rx_done;
                }

                /* Ignore anything apart from HELLO while I'm waiting for it and
                 * any messages for a previous incarnation of the connection */
                if (peer->peer_state == PEER_STATE_WAITING_HELLO ||
                    msg->ptlm_dststamp < peer->peer_myincarnation) {
                        kptllnd_peer_decref(peer);
                        goto rx_done;
                }

                if (msg->ptlm_srcstamp != peer->peer_incarnation) {
                        CERROR("%s: Unexpected srcstamp "LPX64" "
                               "("LPX64" expected)\n",
                               libcfs_id2str(peer->peer_id),
                               msg->ptlm_srcstamp,
                               peer->peer_incarnation);
                        rc = -EPROTO;
                        goto failed;
                }

                if (msg->ptlm_dststamp != peer->peer_myincarnation) {
                        CERROR("%s: Unexpected dststamp "LPX64" "
                               "("LPX64" expected)\n",
                               libcfs_id2str(peer->peer_id), msg->ptlm_dststamp,
                               peer->peer_myincarnation);
                        rc = -EPROTO;
                        goto failed;
                }
        }

        LASSERT (msg->ptlm_srcnid == peer->peer_id.nid &&
                 msg->ptlm_srcpid == peer->peer_id.pid);

        spin_lock_irqsave(&peer->peer_lock, flags);

        /* Check peer only sends when I've sent her credits */
        if (peer->peer_sent_credits == 0) {
                int  c = peer->peer_credits;
                int oc = peer->peer_outstanding_credits;
                int sc = peer->peer_sent_credits;

                spin_unlock_irqrestore(&peer->peer_lock, flags);

                CERROR("%s: buffer overrun [%d/%d+%d]\n",
                       libcfs_id2str(peer->peer_id), c, oc, sc);
                rc = -EPROTO;
                goto failed;
        }
        peer->peer_sent_credits--;

        /* No check for credit overflow - the peer may post new
         * buffers after the startup handshake. */
        peer->peer_credits += msg->ptlm_credits;

        /* This ensures the credit taken by NOOP can be returned */
        if (msg->ptlm_type == PTLLND_MSG_TYPE_NOOP) {
                peer->peer_outstanding_credits++;
                post_credit = PTLLND_POSTRX_NO_CREDIT;
        }

        spin_unlock_irqrestore(&peer->peer_lock, flags);

        /* See if something can go out now that credits have come in */
        if (msg->ptlm_credits != 0)
                kptllnd_peer_check_sends(peer);

        /* ptllnd-level protocol correct - rx takes my ref on peer and increments
         * peer_outstanding_credits when it completes */
        rx->rx_peer = peer;
        kptllnd_peer_alive(peer);

        switch (msg->ptlm_type) {
        default:
                /* already checked by kptllnd_msg_unpack() */
                LBUG();

        case PTLLND_MSG_TYPE_HELLO:
                CDEBUG(D_NET, "PTLLND_MSG_TYPE_HELLO\n");
                goto rx_done;

        case PTLLND_MSG_TYPE_NOOP:
                CDEBUG(D_NET, "PTLLND_MSG_TYPE_NOOP\n");
                goto rx_done;

        case PTLLND_MSG_TYPE_IMMEDIATE:
                CDEBUG(D_NET, "PTLLND_MSG_TYPE_IMMEDIATE\n");
                rc = lnet_parse(kptllnd_data.kptl_ni,
                                &msg->ptlm_u.immediate.kptlim_hdr,
                                msg->ptlm_srcnid,
                                rx, 0);
                if (rc >= 0)                    /* kptllnd_recv owns 'rx' now */
                        return;
                goto failed;

        case PTLLND_MSG_TYPE_PUT:
        case PTLLND_MSG_TYPE_GET:
                CDEBUG(D_NET, "PTLLND_MSG_TYPE_%s\n",
                        msg->ptlm_type == PTLLND_MSG_TYPE_PUT ?
                        "PUT" : "GET");

                /* checked in kptllnd_msg_unpack() */
                LASSERT (msg->ptlm_u.rdma.kptlrm_matchbits >=
                         PTL_RESERVED_MATCHBITS);

                /* Update last match bits seen */
                spin_lock_irqsave(&peer->peer_lock, flags);

                if (msg->ptlm_u.rdma.kptlrm_matchbits >
                    rx->rx_peer->peer_last_matchbits_seen)
                        rx->rx_peer->peer_last_matchbits_seen =
                                msg->ptlm_u.rdma.kptlrm_matchbits;

                spin_unlock_irqrestore(&rx->rx_peer->peer_lock, flags);

                rc = lnet_parse(kptllnd_data.kptl_ni,
                                &msg->ptlm_u.rdma.kptlrm_hdr,
                                msg->ptlm_srcnid,
                                rx, 1);
                if (rc >= 0)                    /* kptllnd_recv owns 'rx' now */
                        return;
                goto failed;
        }

 failed:
        kptllnd_peer_close(peer, rc);
        if (rx->rx_peer == NULL)                /* drop ref on peer */
                kptllnd_peer_decref(peer);      /* unless rx_done will */
 rx_done:
        kptllnd_rx_done(rx, post_credit);
}