Whamcloud - gitweb
b=17167 libcfs: ensure all libcfs exported symbols to have cfs_ prefix
[fs/lustre-release.git] / lnet / klnds / ptllnd / ptllnd_rx_buf.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lnet/klnds/ptllnd/ptllnd_rx_buf.c
37  *
38  * Author: PJ Kirner <pjkirner@clusterfs.com>
39  */
40
41  #include "ptllnd.h"
42
43 void
44 kptllnd_rx_buffer_pool_init(kptl_rx_buffer_pool_t *rxbp)
45 {
46         memset(rxbp, 0, sizeof(*rxbp));
47         cfs_spin_lock_init(&rxbp->rxbp_lock);
48         CFS_INIT_LIST_HEAD(&rxbp->rxbp_list);
49 }
50
51 void
52 kptllnd_rx_buffer_destroy(kptl_rx_buffer_t *rxb)
53 {
54         kptl_rx_buffer_pool_t *rxbp = rxb->rxb_pool;
55
56         LASSERT(rxb->rxb_refcount == 0);
57         LASSERT(PtlHandleIsEqual(rxb->rxb_mdh, PTL_INVALID_HANDLE));
58         LASSERT(!rxb->rxb_posted);
59         LASSERT(rxb->rxb_idle);
60
61         cfs_list_del(&rxb->rxb_list);
62         rxbp->rxbp_count--;
63
64         LIBCFS_FREE(rxb->rxb_buffer, kptllnd_rx_buffer_size());
65         LIBCFS_FREE(rxb, sizeof(*rxb));
66 }
67
68 int
69 kptllnd_rx_buffer_pool_reserve(kptl_rx_buffer_pool_t *rxbp, int count)
70 {
71         int               bufsize;
72         int               msgs_per_buffer;
73         int               rc;
74         kptl_rx_buffer_t *rxb;
75         char             *buffer;
76         unsigned long     flags;
77
78         bufsize = kptllnd_rx_buffer_size();
79         msgs_per_buffer = bufsize / (*kptllnd_tunables.kptl_max_msg_size);
80
81         CDEBUG(D_NET, "kptllnd_rx_buffer_pool_reserve(%d)\n", count);
82
83         cfs_spin_lock_irqsave(&rxbp->rxbp_lock, flags);
84
85         for (;;) {
86                 if (rxbp->rxbp_shutdown) {
87                         rc = -ESHUTDOWN;
88                         break;
89                 }
90                 
91                 if (rxbp->rxbp_reserved + count <= 
92                     rxbp->rxbp_count * msgs_per_buffer) {
93                         rc = 0;
94                         break;
95                 }
96                 
97                 cfs_spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
98                 
99                 LIBCFS_ALLOC(rxb, sizeof(*rxb));
100                 LIBCFS_ALLOC(buffer, bufsize);
101
102                 if (rxb == NULL || buffer == NULL) {
103                         CERROR("Failed to allocate rx buffer\n");
104
105                         if (rxb != NULL)
106                                 LIBCFS_FREE(rxb, sizeof(*rxb));
107                         if (buffer != NULL)
108                                 LIBCFS_FREE(buffer, bufsize);
109                         
110                         cfs_spin_lock_irqsave(&rxbp->rxbp_lock, flags);
111                         rc = -ENOMEM;
112                         break;
113                 }
114
115                 memset(rxb, 0, sizeof(*rxb));
116
117                 rxb->rxb_eventarg.eva_type = PTLLND_EVENTARG_TYPE_BUF;
118                 rxb->rxb_refcount = 0;
119                 rxb->rxb_pool = rxbp;
120                 rxb->rxb_idle = 0;
121                 rxb->rxb_posted = 0;
122                 rxb->rxb_buffer = buffer;
123                 rxb->rxb_mdh = PTL_INVALID_HANDLE;
124
125                 cfs_spin_lock_irqsave(&rxbp->rxbp_lock, flags);
126                 
127                 if (rxbp->rxbp_shutdown) {
128                         cfs_spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
129                         
130                         LIBCFS_FREE(rxb, sizeof(*rxb));
131                         LIBCFS_FREE(buffer, bufsize);
132
133                         cfs_spin_lock_irqsave(&rxbp->rxbp_lock, flags);
134                         rc = -ESHUTDOWN;
135                         break;
136                 }
137                 
138                 cfs_list_add_tail(&rxb->rxb_list, &rxbp->rxbp_list);
139                 rxbp->rxbp_count++;
140
141                 cfs_spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
142                 
143                 kptllnd_rx_buffer_post(rxb);
144
145                 cfs_spin_lock_irqsave(&rxbp->rxbp_lock, flags);
146         }
147
148         if (rc == 0)
149                 rxbp->rxbp_reserved += count;
150
151         cfs_spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
152
153         return rc;
154 }
155
156 void
157 kptllnd_rx_buffer_pool_unreserve(kptl_rx_buffer_pool_t *rxbp,
158                                  int count)
159 {
160         unsigned long flags;
161
162         cfs_spin_lock_irqsave(&rxbp->rxbp_lock, flags);
163
164         CDEBUG(D_NET, "kptllnd_rx_buffer_pool_unreserve(%d)\n", count);
165         rxbp->rxbp_reserved -= count;
166
167         cfs_spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
168 }
169
170 void
171 kptllnd_rx_buffer_pool_fini(kptl_rx_buffer_pool_t *rxbp)
172 {
173         kptl_rx_buffer_t       *rxb;
174         int                     rc;
175         int                     i;
176         unsigned long           flags;
177         cfs_list_t             *tmp;
178         cfs_list_t             *nxt;
179         ptl_handle_md_t         mdh;
180
181         /* CAVEAT EMPTOR: I'm racing with everything here!!!
182          *
183          * Buffers can still be posted after I set rxbp_shutdown because I
184          * can't hold rxbp_lock while I'm posting them.
185          *
186          * Calling PtlMDUnlink() here races with auto-unlinks; i.e. a buffer's
187          * MD handle could become invalid under me.  I am vulnerable to portals
188          * re-using handles (i.e. make the same handle valid again, but for a
189          * different MD) from when the MD is actually unlinked, to when the
190          * event callback tells me it has been unlinked. */
191
192         cfs_spin_lock_irqsave(&rxbp->rxbp_lock, flags);
193
194         rxbp->rxbp_shutdown = 1;
195
196         for (i = 9;; i++) {
197                 cfs_list_for_each_safe(tmp, nxt, &rxbp->rxbp_list) {
198                         rxb = cfs_list_entry (tmp, kptl_rx_buffer_t, rxb_list);
199
200                         if (rxb->rxb_idle) {
201                                 cfs_spin_unlock_irqrestore(&rxbp->rxbp_lock,
202                                                            flags);
203                                 kptllnd_rx_buffer_destroy(rxb);
204                                 cfs_spin_lock_irqsave(&rxbp->rxbp_lock,
205                                                       flags);
206                                 continue;
207                         }
208
209                         mdh = rxb->rxb_mdh;
210                         if (PtlHandleIsEqual(mdh, PTL_INVALID_HANDLE))
211                                 continue;
212                         
213                         cfs_spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
214
215                         rc = PtlMDUnlink(mdh);
216
217                         cfs_spin_lock_irqsave(&rxbp->rxbp_lock, flags);
218                         
219 #ifdef LUSTRE_PORTALS_UNLINK_SEMANTICS
220                         /* callback clears rxb_mdh and drops net's ref
221                          * (which causes repost, but since I set
222                          * shutdown, it will just set the buffer
223                          * idle) */
224 #else
225                         if (rc == PTL_OK) {
226                                 rxb->rxb_posted = 0;
227                                 rxb->rxb_mdh = PTL_INVALID_HANDLE;
228                                 kptllnd_rx_buffer_decref_locked(rxb);
229                         }
230 #endif
231                 }
232
233                 if (cfs_list_empty(&rxbp->rxbp_list))
234                         break;
235
236                 cfs_spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
237
238                 /* Wait a bit for references to be dropped */
239                 CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
240                        "Waiting for %d Busy RX Buffers\n",
241                        rxbp->rxbp_count);
242
243                 cfs_pause(cfs_time_seconds(1));
244
245                 cfs_spin_lock_irqsave(&rxbp->rxbp_lock, flags);
246         }
247
248         cfs_spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
249 }
250
251 void
252 kptllnd_rx_buffer_post(kptl_rx_buffer_t *rxb)
253 {
254         int                     rc;
255         ptl_md_t                md;
256         ptl_handle_me_t         meh;
257         ptl_handle_md_t         mdh;
258         ptl_process_id_t        any;
259         kptl_rx_buffer_pool_t  *rxbp = rxb->rxb_pool;
260         unsigned long           flags;
261
262         LASSERT (!cfs_in_interrupt());
263         LASSERT (rxb->rxb_refcount == 0);
264         LASSERT (!rxb->rxb_idle);
265         LASSERT (!rxb->rxb_posted);
266         LASSERT (PtlHandleIsEqual(rxb->rxb_mdh, PTL_INVALID_HANDLE));
267
268         any.nid = PTL_NID_ANY;
269         any.pid = PTL_PID_ANY;
270
271         cfs_spin_lock_irqsave(&rxbp->rxbp_lock, flags);
272
273         if (rxbp->rxbp_shutdown) {
274                 rxb->rxb_idle = 1;
275                 cfs_spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
276                 return;
277         }
278
279         rxb->rxb_refcount = 1;                  /* net's ref */
280         rxb->rxb_posted = 1;                    /* I'm posting */
281         
282         cfs_spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
283
284         rc = PtlMEAttach(kptllnd_data.kptl_nih,
285                          *kptllnd_tunables.kptl_portal,
286                          any,
287                          LNET_MSG_MATCHBITS,
288                          0, /* all matchbits are valid - ignore none */
289                          PTL_UNLINK,
290                          PTL_INS_AFTER,
291                          &meh);
292         if (rc != PTL_OK) {
293                 CERROR("PtlMeAttach rxb failed %s(%d)\n",
294                        kptllnd_errtype2str(rc), rc);
295                 goto failed;
296         }
297
298         /*
299          * Setup MD
300          */
301         md.start = rxb->rxb_buffer;
302         md.length = kptllnd_rx_buffer_size();
303         md.threshold = PTL_MD_THRESH_INF;
304         md.options = PTL_MD_OP_PUT |
305                      PTL_MD_LUSTRE_COMPLETION_SEMANTICS |
306                      PTL_MD_EVENT_START_DISABLE |
307                      PTL_MD_MAX_SIZE |
308                      PTL_MD_LOCAL_ALIGN8;
309         md.user_ptr = &rxb->rxb_eventarg;
310         md.max_size = *kptllnd_tunables.kptl_max_msg_size;
311         md.eq_handle = kptllnd_data.kptl_eqh;
312
313         rc = PtlMDAttach(meh, md, PTL_UNLINK, &mdh);
314         if (rc == PTL_OK) {
315                 cfs_spin_lock_irqsave(&rxbp->rxbp_lock, flags);
316                 if (rxb->rxb_posted)            /* Not auto-unlinked yet!!! */
317                         rxb->rxb_mdh = mdh;
318                 cfs_spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
319                 return;
320         }
321         
322         CERROR("PtlMDAttach rxb failed %s(%d)\n",
323                kptllnd_errtype2str(rc), rc);
324         rc = PtlMEUnlink(meh);
325         LASSERT(rc == PTL_OK);
326
327  failed:
328         cfs_spin_lock_irqsave(&rxbp->rxbp_lock, flags);
329         rxb->rxb_posted = 0;
330         /* XXX this will just try again immediately */
331         kptllnd_rx_buffer_decref_locked(rxb);
332         cfs_spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
333 }
334
335 kptl_rx_t *
336 kptllnd_rx_alloc(void)
337 {
338         kptl_rx_t* rx;
339
340         if (IS_SIMULATION_ENABLED(FAIL_RX_ALLOC)) {
341                 CERROR ("FAIL_RX_ALLOC SIMULATION triggered\n");
342                 return NULL;
343         }
344
345         rx = cfs_mem_cache_alloc(kptllnd_data.kptl_rx_cache, CFS_ALLOC_ATOMIC);
346         if (rx == NULL) {
347                 CERROR("Failed to allocate rx\n");
348                 return NULL;
349         }
350
351         memset(rx, 0, sizeof(*rx));
352         return rx;
353 }
354
355 void
356 kptllnd_rx_done(kptl_rx_t *rx, int post_credit)
357 {
358         kptl_rx_buffer_t *rxb = rx->rx_rxb;
359         kptl_peer_t      *peer = rx->rx_peer;
360         unsigned long     flags;
361
362         LASSERT (post_credit == PTLLND_POSTRX_NO_CREDIT ||
363                  post_credit == PTLLND_POSTRX_PEER_CREDIT);
364
365         CDEBUG(D_NET, "rx=%p rxb %p peer %p\n", rx, rxb, peer);
366
367         if (rxb != NULL)
368                 kptllnd_rx_buffer_decref(rxb);
369
370         if (peer != NULL) {
371                 /* Update credits (after I've decref-ed the buffer) */
372                 cfs_spin_lock_irqsave(&peer->peer_lock, flags);
373
374                 if (post_credit == PTLLND_POSTRX_PEER_CREDIT)
375                         peer->peer_outstanding_credits++;
376
377                 LASSERT (peer->peer_outstanding_credits +
378                          peer->peer_sent_credits <=
379                          *kptllnd_tunables.kptl_peertxcredits);
380
381                 CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: rx %p done\n",
382                        libcfs_id2str(peer->peer_id), peer->peer_credits,
383                        peer->peer_outstanding_credits, peer->peer_sent_credits,
384                        rx);
385
386                 cfs_spin_unlock_irqrestore(&peer->peer_lock, flags);
387
388                 /* I might have to send back credits */
389                 kptllnd_peer_check_sends(peer);
390                 kptllnd_peer_decref(peer);
391         }
392
393         cfs_mem_cache_free(kptllnd_data.kptl_rx_cache, rx);
394 }
395
396 void
397 kptllnd_rx_buffer_callback (ptl_event_t *ev)
398 {
399         kptl_eventarg_t        *eva = ev->md.user_ptr;
400         kptl_rx_buffer_t       *rxb = kptllnd_eventarg2obj(eva);
401         kptl_rx_buffer_pool_t  *rxbp = rxb->rxb_pool;
402         kptl_rx_t              *rx;
403         int                     unlinked;
404         unsigned long           flags;
405
406 #ifdef LUSTRE_PORTALS_UNLINK_SEMANTICS
407         unlinked = ev->unlinked;
408 #else
409         unlinked = ev->type == PTL_EVENT_UNLINK;
410 #endif
411
412         CDEBUG(D_NET, "%s: %s(%d) rxb=%p fail=%s(%d) unlink=%d\n",
413                kptllnd_ptlid2str(ev->initiator),
414                kptllnd_evtype2str(ev->type), ev->type, rxb,
415                kptllnd_errtype2str(ev->ni_fail_type), ev->ni_fail_type,
416                unlinked);
417
418         LASSERT (!rxb->rxb_idle);
419         LASSERT (ev->md.start == rxb->rxb_buffer);
420         LASSERT (ev->offset + ev->mlength <=
421                  PAGE_SIZE * *kptllnd_tunables.kptl_rxb_npages);
422         LASSERT (ev->type == PTL_EVENT_PUT_END ||
423                  ev->type == PTL_EVENT_UNLINK);
424         LASSERT (ev->type == PTL_EVENT_UNLINK ||
425                  ev->match_bits == LNET_MSG_MATCHBITS);
426
427         if (ev->ni_fail_type != PTL_NI_OK) {
428                 CERROR("Portals error from %s: %s(%d) rxb=%p fail=%s(%d) unlink=%dn",
429                        kptllnd_ptlid2str(ev->initiator),
430                        kptllnd_evtype2str(ev->type), ev->type, rxb,
431                        kptllnd_errtype2str(ev->ni_fail_type),
432                        ev->ni_fail_type, unlinked);
433                 kptllnd_schedule_ptltrace_dump();
434         } else if (ev->type == PTL_EVENT_PUT_END &&
435                    !rxbp->rxbp_shutdown) {
436
437                 /* rxbp_shutdown sampled without locking!  I only treat it as a
438                  * hint since shutdown can start while rx's are queued on
439                  * kptl_sched_rxq. */
440 #if (PTL_MD_LOCAL_ALIGN8 == 0)
441                 /* Portals can't force message alignment - someone sending an
442                  * odd-length message will misalign subsequent messages and
443                  * force the fixup below...  */
444                 if ((ev->mlength & 7) != 0)
445                         CWARN("Message from %s has odd length "LPU64": "
446                               "probable version incompatibility\n",
447                               kptllnd_ptlid2str(ev->initiator),
448                               (__u64)ev->mlength);
449 #endif
450                 rx = kptllnd_rx_alloc();
451                 if (rx == NULL) {
452                         CERROR("Message from %s dropped: ENOMEM",
453                                kptllnd_ptlid2str(ev->initiator));
454                 } else {
455                         if ((ev->offset & 7) == 0) {
456                                 kptllnd_rx_buffer_addref(rxb);
457                                 rx->rx_rxb = rxb;
458                                 rx->rx_nob = ev->mlength;
459                                 rx->rx_msg = (kptl_msg_t *)
460                                              (rxb->rxb_buffer + ev->offset);
461                         } else {
462 #if (PTL_MD_LOCAL_ALIGN8 == 0)
463                                 /* Portals can't force alignment - copy into
464                                  * rx_space (avoiding overflow) to fix */
465                                 int maxlen = *kptllnd_tunables.kptl_max_msg_size;
466
467                                 rx->rx_rxb = NULL;
468                                 rx->rx_nob = MIN(maxlen, ev->mlength);
469                                 rx->rx_msg = (kptl_msg_t *)rx->rx_space;
470                                 memcpy(rx->rx_msg, rxb->rxb_buffer + ev->offset,
471                                        rx->rx_nob);
472 #else
473                                 /* Portals should have forced the alignment */
474                                 LBUG();
475 #endif
476                         }
477
478                         rx->rx_initiator = ev->initiator;
479                         rx->rx_treceived = jiffies;
480 #ifdef CRAY_XT3
481                         rx->rx_uid = ev->uid;
482 #endif
483                         /* Queue for attention */
484                         cfs_spin_lock_irqsave(&kptllnd_data.kptl_sched_lock,
485                                               flags);
486
487                         cfs_list_add_tail(&rx->rx_list,
488                                           &kptllnd_data.kptl_sched_rxq);
489                         cfs_waitq_signal(&kptllnd_data.kptl_sched_waitq);
490
491                         cfs_spin_unlock_irqrestore(&kptllnd_data. \
492                                                    kptl_sched_lock, flags);
493                 }
494         }
495
496         if (unlinked) {
497                 cfs_spin_lock_irqsave(&rxbp->rxbp_lock, flags);
498
499                 rxb->rxb_posted = 0;
500                 rxb->rxb_mdh = PTL_INVALID_HANDLE;
501                 kptllnd_rx_buffer_decref_locked(rxb);
502
503                 cfs_spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
504         }
505 }
506
507 void
508 kptllnd_nak (ptl_process_id_t dest)
509 {
510         /* Fire-and-forget a stub message that will let the peer know my
511          * protocol magic/version and make her drop/refresh any peer state she
512          * might have with me. */
513         ptl_md_t md = {
514                 .start        = kptllnd_data.kptl_nak_msg,
515                 .length       = kptllnd_data.kptl_nak_msg->ptlm_nob,
516                 .threshold    = 1,
517                 .options      = 0,
518                 .user_ptr     = NULL,
519                 .eq_handle    = PTL_EQ_NONE};
520         ptl_handle_md_t   mdh;
521         int               rc;
522
523         rc = PtlMDBind(kptllnd_data.kptl_nih, md, PTL_UNLINK, &mdh);
524         if (rc != PTL_OK) {
525                 CWARN("Can't NAK %s: bind failed %s(%d)\n",
526                       kptllnd_ptlid2str(dest), kptllnd_errtype2str(rc), rc);
527                 return;
528         }
529
530         rc = PtlPut(mdh, PTL_NOACK_REQ, dest,
531                     *kptllnd_tunables.kptl_portal, 0,
532                     LNET_MSG_MATCHBITS, 0, 0);
533         if (rc != PTL_OK) {
534                 CWARN("Can't NAK %s: put failed %s(%d)\n",
535                       kptllnd_ptlid2str(dest), kptllnd_errtype2str(rc), rc);
536                 kptllnd_schedule_ptltrace_dump();
537         }
538 }
539
540 kptl_net_t *
541 kptllnd_find_net (lnet_nid_t nid)
542 {
543         kptl_net_t *net;
544
545         cfs_read_lock(&kptllnd_data.kptl_net_rw_lock);
546         cfs_list_for_each_entry (net, &kptllnd_data.kptl_nets, net_list) {
547                 LASSERT (!net->net_shutdown);
548
549                 if (net->net_ni->ni_nid == nid) {
550                         kptllnd_net_addref(net);
551                         cfs_read_unlock(&kptllnd_data.kptl_net_rw_lock);
552                         return net;
553                 }
554         }
555         cfs_read_unlock(&kptllnd_data.kptl_net_rw_lock);
556
557         return NULL;
558 }
559
560 void
561 kptllnd_rx_parse(kptl_rx_t *rx)
562 {
563         kptl_msg_t             *msg = rx->rx_msg;
564         int                     rc = 0;
565         int                     post_credit = PTLLND_POSTRX_PEER_CREDIT;
566         kptl_net_t             *net = NULL;
567         kptl_peer_t            *peer;
568         cfs_list_t              txs;
569         unsigned long           flags;
570         lnet_process_id_t       srcid;
571
572         LASSERT (!cfs_in_interrupt());
573         LASSERT (rx->rx_peer == NULL);
574
575         CFS_INIT_LIST_HEAD(&txs);
576
577         if ((rx->rx_nob >= 4 &&
578              (msg->ptlm_magic == LNET_PROTO_MAGIC ||
579               msg->ptlm_magic == __swab32(LNET_PROTO_MAGIC))) ||
580             (rx->rx_nob >= 6 &&
581              ((msg->ptlm_magic == PTLLND_MSG_MAGIC &&
582                msg->ptlm_version != PTLLND_MSG_VERSION) ||
583               (msg->ptlm_magic == __swab32(PTLLND_MSG_MAGIC) &&
584                msg->ptlm_version != __swab16(PTLLND_MSG_VERSION))))) {
585                 /* NAK incompatible versions
586                  * See other LNDs for how to handle this if/when ptllnd begins
587                  * to allow different versions to co-exist */
588                 CERROR("Bad version: got %04x expected %04x from %s\n",
589                        (__u32)(msg->ptlm_magic == PTLLND_MSG_MAGIC ?
590                                msg->ptlm_version : __swab16(msg->ptlm_version)),
591                         PTLLND_MSG_VERSION, kptllnd_ptlid2str(rx->rx_initiator));
592                 /* NB backward compatibility */
593                 kptllnd_nak(rx->rx_initiator);
594                 goto rx_done;
595         }
596         
597         rc = kptllnd_msg_unpack(msg, rx->rx_nob);
598         if (rc != 0) {
599                 CERROR ("Error %d unpacking rx from %s\n",
600                         rc, kptllnd_ptlid2str(rx->rx_initiator));
601                 goto rx_done;
602         }
603
604         srcid.nid = msg->ptlm_srcnid;
605         srcid.pid = msg->ptlm_srcpid;
606
607         CDEBUG(D_NETTRACE, "%s: RX %s c %d %p rxb %p queued %lu ticks (%ld s)\n",
608                libcfs_id2str(srcid), kptllnd_msgtype2str(msg->ptlm_type),
609                msg->ptlm_credits, rx, rx->rx_rxb, 
610                jiffies - rx->rx_treceived,
611                cfs_duration_sec(jiffies - rx->rx_treceived));
612
613         if (kptllnd_lnet2ptlnid(srcid.nid) != rx->rx_initiator.nid) {
614                 CERROR("Bad source nid %s from %s\n",
615                        libcfs_id2str(srcid),
616                        kptllnd_ptlid2str(rx->rx_initiator));
617                 goto rx_done;
618         }
619
620         if (msg->ptlm_type == PTLLND_MSG_TYPE_NAK) {
621                 peer = kptllnd_id2peer(srcid);
622                 if (peer == NULL)
623                         goto rx_done;
624                 
625                 CWARN("NAK from %s (%d:%s)\n",
626                       libcfs_id2str(srcid), peer->peer_state,
627                       kptllnd_ptlid2str(rx->rx_initiator));
628
629                 /* NB can't nuke new peer - bug 17546 comment 31 */
630                 if (peer->peer_state == PEER_STATE_WAITING_HELLO) {
631                         CDEBUG(D_NET, "Stale NAK from %s(%s): WAITING_HELLO\n",
632                                libcfs_id2str(srcid),
633                                kptllnd_ptlid2str(rx->rx_initiator));
634                         kptllnd_peer_decref(peer);
635                         goto rx_done;
636                 }
637
638                 rc = -EPROTO;
639                 goto failed;
640         }
641
642         net = kptllnd_find_net(msg->ptlm_dstnid);
643         if (net == NULL || msg->ptlm_dstpid != the_lnet.ln_pid) {
644                 CERROR("Bad dstid %s from %s\n",
645                        libcfs_id2str((lnet_process_id_t) {
646                                .nid = msg->ptlm_dstnid,
647                                .pid = msg->ptlm_dstpid}),
648                        kptllnd_ptlid2str(rx->rx_initiator));
649                 goto rx_done;
650         }
651
652         if (LNET_NIDNET(srcid.nid) != LNET_NIDNET(net->net_ni->ni_nid)) {
653                 lnet_nid_t nid = LNET_MKNID(LNET_NIDNET(net->net_ni->ni_nid),
654                                             LNET_NIDADDR(srcid.nid));
655                 CERROR("Bad source nid %s from %s, %s expected.\n",
656                        libcfs_id2str(srcid),
657                        kptllnd_ptlid2str(rx->rx_initiator),
658                        libcfs_nid2str(nid));
659                 goto rx_done;
660         }
661
662         if (msg->ptlm_type == PTLLND_MSG_TYPE_HELLO) {
663                 peer = kptllnd_peer_handle_hello(net, rx->rx_initiator, msg);
664                 if (peer == NULL)
665                         goto rx_done;
666         } else {
667                 peer = kptllnd_id2peer(srcid);
668                 if (peer == NULL) {
669                         CWARN("NAK %s: no connection, %s must reconnect\n",
670                               kptllnd_msgtype2str(msg->ptlm_type),
671                               libcfs_id2str(srcid));
672                         /* NAK to make the peer reconnect */
673                         kptllnd_nak(rx->rx_initiator);
674                         goto rx_done;
675                 }
676
677                 /* Ignore any messages for a previous incarnation of me */
678                 if (msg->ptlm_dststamp < peer->peer_myincarnation) {
679                         kptllnd_peer_decref(peer);
680                         goto rx_done;
681                 }
682
683                 if (msg->ptlm_dststamp != peer->peer_myincarnation) {
684                         CERROR("%s: Unexpected dststamp "LPX64" "
685                                "("LPX64" expected)\n",
686                                libcfs_id2str(peer->peer_id), msg->ptlm_dststamp,
687                                peer->peer_myincarnation);
688                         rc = -EPROTO;
689                         goto failed;
690                 }
691
692                 if (peer->peer_state == PEER_STATE_WAITING_HELLO) {
693                         /* recoverable error - restart txs */
694                         cfs_spin_lock_irqsave(&peer->peer_lock, flags);
695                         kptllnd_cancel_txlist(&peer->peer_sendq, &txs);
696                         cfs_spin_unlock_irqrestore(&peer->peer_lock, flags);
697
698                         CWARN("NAK %s: Unexpected %s message\n",
699                               libcfs_id2str(srcid),
700                               kptllnd_msgtype2str(msg->ptlm_type));
701                         kptllnd_nak(rx->rx_initiator);
702                         rc = -EPROTO;
703                         goto failed;
704                 }
705
706                 if (msg->ptlm_srcstamp != peer->peer_incarnation) {
707                         CERROR("%s: Unexpected srcstamp "LPX64" "
708                                "("LPX64" expected)\n",
709                                libcfs_id2str(srcid),
710                                msg->ptlm_srcstamp,
711                                peer->peer_incarnation);
712                         rc = -EPROTO;
713                         goto failed;
714                 }
715         }
716
717         LASSERTF (LNET_NIDADDR(msg->ptlm_srcnid) ==
718                          LNET_NIDADDR(peer->peer_id.nid), "m %s p %s\n",
719                   libcfs_nid2str(msg->ptlm_srcnid),
720                   libcfs_nid2str(peer->peer_id.nid));
721         LASSERTF (msg->ptlm_srcpid == peer->peer_id.pid, "m %u p %u\n",
722                   msg->ptlm_srcpid, peer->peer_id.pid);
723
724         cfs_spin_lock_irqsave(&peer->peer_lock, flags);
725
726         /* Check peer only sends when I've sent her credits */
727         if (peer->peer_sent_credits == 0) {
728                 int  c = peer->peer_credits;
729                 int oc = peer->peer_outstanding_credits;
730                 int sc = peer->peer_sent_credits;
731
732                 cfs_spin_unlock_irqrestore(&peer->peer_lock, flags);
733
734                 CERROR("%s: buffer overrun [%d/%d+%d]\n",
735                        libcfs_id2str(peer->peer_id), c, sc, oc);
736                 rc = -EPROTO;
737                 goto failed;
738         }
739         peer->peer_sent_credits--;
740
741         /* No check for credit overflow - the peer may post new
742          * buffers after the startup handshake. */
743         peer->peer_credits += msg->ptlm_credits;
744
745         /* This ensures the credit taken by NOOP can be returned */
746         if (msg->ptlm_type == PTLLND_MSG_TYPE_NOOP) {
747                 peer->peer_outstanding_credits++;
748                 post_credit = PTLLND_POSTRX_NO_CREDIT;
749         }
750
751         cfs_spin_unlock_irqrestore(&peer->peer_lock, flags);
752
753         /* See if something can go out now that credits have come in */
754         if (msg->ptlm_credits != 0)
755                 kptllnd_peer_check_sends(peer);
756
757         /* ptllnd-level protocol correct - rx takes my ref on peer and increments
758          * peer_outstanding_credits when it completes */
759         rx->rx_peer = peer;
760         kptllnd_peer_alive(peer);
761
762         switch (msg->ptlm_type) {
763         default:
764                 /* already checked by kptllnd_msg_unpack() */
765                 LBUG();
766
767         case PTLLND_MSG_TYPE_HELLO:
768                 CDEBUG(D_NET, "PTLLND_MSG_TYPE_HELLO\n");
769                 goto rx_done;
770
771         case PTLLND_MSG_TYPE_NOOP:
772                 CDEBUG(D_NET, "PTLLND_MSG_TYPE_NOOP\n");
773                 goto rx_done;
774
775         case PTLLND_MSG_TYPE_IMMEDIATE:
776                 CDEBUG(D_NET, "PTLLND_MSG_TYPE_IMMEDIATE\n");
777                 rc = lnet_parse(net->net_ni,
778                                 &msg->ptlm_u.immediate.kptlim_hdr,
779                                 msg->ptlm_srcnid,
780                                 rx, 0);
781                 if (rc >= 0) {                  /* kptllnd_recv owns 'rx' now */
782                         kptllnd_net_decref(net);
783                         return;
784                 }
785                 goto failed;
786                 
787         case PTLLND_MSG_TYPE_PUT:
788         case PTLLND_MSG_TYPE_GET:
789                 CDEBUG(D_NET, "PTLLND_MSG_TYPE_%s\n",
790                         msg->ptlm_type == PTLLND_MSG_TYPE_PUT ?
791                         "PUT" : "GET");
792
793                 /* checked in kptllnd_msg_unpack() */
794                 LASSERT (msg->ptlm_u.rdma.kptlrm_matchbits >= 
795                          PTL_RESERVED_MATCHBITS);
796
797                 /* Update last match bits seen */
798                 cfs_spin_lock_irqsave(&peer->peer_lock, flags);
799
800                 if (msg->ptlm_u.rdma.kptlrm_matchbits >
801                     rx->rx_peer->peer_last_matchbits_seen)
802                         rx->rx_peer->peer_last_matchbits_seen =
803                                 msg->ptlm_u.rdma.kptlrm_matchbits;
804
805                 cfs_spin_unlock_irqrestore(&rx->rx_peer->peer_lock, flags);
806
807                 rc = lnet_parse(net->net_ni,
808                                 &msg->ptlm_u.rdma.kptlrm_hdr,
809                                 msg->ptlm_srcnid,
810                                 rx, 1);
811                 if (rc >= 0) {                  /* kptllnd_recv owns 'rx' now */
812                         kptllnd_net_decref(net);
813                         return;
814                 }
815                 goto failed;
816         }
817
818  failed:
819         LASSERT (rc != 0);
820         kptllnd_peer_close(peer, rc);
821         if (rx->rx_peer == NULL)                /* drop ref on peer */
822                 kptllnd_peer_decref(peer);      /* unless rx_done will */
823         if (!cfs_list_empty(&txs)) {
824                 LASSERT (net != NULL);
825                 kptllnd_restart_txs(net, srcid, &txs);
826         }
827  rx_done:
828         if (net != NULL)
829                 kptllnd_net_decref(net);
830         kptllnd_rx_done(rx, post_credit);
831 }