/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * lnet/klnds/ptllnd/ptllnd_rx_buf.c
 *
 * Author: PJ Kirner <pjkirner@clusterfs.com>
 */

#include "ptllnd.h"

void
kptllnd_rx_buffer_pool_init(kptl_rx_buffer_pool_t *rxbp)
{
        memset(rxbp, 0, sizeof(*rxbp));
        cfs_spin_lock_init(&rxbp->rxbp_lock);
        CFS_INIT_LIST_HEAD(&rxbp->rxbp_list);
}

void
kptllnd_rx_buffer_destroy(kptl_rx_buffer_t *rxb)
{
        kptl_rx_buffer_pool_t *rxbp = rxb->rxb_pool;

        LASSERT(rxb->rxb_refcount == 0);
        LASSERT(PtlHandleIsEqual(rxb->rxb_mdh, PTL_INVALID_HANDLE));
        LASSERT(!rxb->rxb_posted);
        LASSERT(rxb->rxb_idle);

        cfs_list_del(&rxb->rxb_list);
        rxbp->rxbp_count--;

        LIBCFS_FREE(rxb->rxb_buffer, kptllnd_rx_buffer_size());
        LIBCFS_FREE(rxb, sizeof(*rxb));
}

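/*
 * Ensure the pool can hold 'count' more messages.  Each buffer of
 * kptllnd_rx_buffer_size() bytes holds bufsize / max_msg_size messages,
 * so buffers are allocated and posted (with rxbp_lock dropped around the
 * allocation) until rxbp_count * msgs_per_buffer covers the existing
 * reservation plus 'count'.  Fails with -ESHUTDOWN once the pool is
 * shutting down, or -ENOMEM if a buffer can't be allocated.
 */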
int
kptllnd_rx_buffer_pool_reserve(kptl_rx_buffer_pool_t *rxbp, int count)
{
        int               bufsize;
        int               msgs_per_buffer;
        int               rc;
        kptl_rx_buffer_t *rxb;
        char             *buffer;
        unsigned long     flags;

        bufsize = kptllnd_rx_buffer_size();
        msgs_per_buffer = bufsize / (*kptllnd_tunables.kptl_max_msg_size);

        CDEBUG(D_NET, "kptllnd_rx_buffer_pool_reserve(%d)\n", count);

        cfs_spin_lock_irqsave(&rxbp->rxbp_lock, flags);

        for (;;) {
                if (rxbp->rxbp_shutdown) {
                        rc = -ESHUTDOWN;
                        break;
                }

                if (rxbp->rxbp_reserved + count <=
                    rxbp->rxbp_count * msgs_per_buffer) {
                        rc = 0;
                        break;
                }

                cfs_spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);

                LIBCFS_ALLOC(rxb, sizeof(*rxb));
                LIBCFS_ALLOC(buffer, bufsize);

                if (rxb == NULL || buffer == NULL) {
                        CERROR("Failed to allocate rx buffer\n");

                        if (rxb != NULL)
                                LIBCFS_FREE(rxb, sizeof(*rxb));
                        if (buffer != NULL)
                                LIBCFS_FREE(buffer, bufsize);

                        cfs_spin_lock_irqsave(&rxbp->rxbp_lock, flags);
                        rc = -ENOMEM;
                        break;
                }

                memset(rxb, 0, sizeof(*rxb));

                rxb->rxb_eventarg.eva_type = PTLLND_EVENTARG_TYPE_BUF;
                rxb->rxb_refcount = 0;
                rxb->rxb_pool = rxbp;
                rxb->rxb_idle = 0;
                rxb->rxb_posted = 0;
                rxb->rxb_buffer = buffer;
                rxb->rxb_mdh = PTL_INVALID_HANDLE;

                cfs_spin_lock_irqsave(&rxbp->rxbp_lock, flags);

                if (rxbp->rxbp_shutdown) {
                        cfs_spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);

                        LIBCFS_FREE(rxb, sizeof(*rxb));
                        LIBCFS_FREE(buffer, bufsize);

                        cfs_spin_lock_irqsave(&rxbp->rxbp_lock, flags);
                        rc = -ESHUTDOWN;
                        break;
                }

                cfs_list_add_tail(&rxb->rxb_list, &rxbp->rxbp_list);
                rxbp->rxbp_count++;

                cfs_spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);

                kptllnd_rx_buffer_post(rxb);

                cfs_spin_lock_irqsave(&rxbp->rxbp_lock, flags);
        }

        if (rc == 0)
                rxbp->rxbp_reserved += count;

        cfs_spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);

        return rc;
}

void
kptllnd_rx_buffer_pool_unreserve(kptl_rx_buffer_pool_t *rxbp,
                                 int count)
{
        unsigned long flags;

        cfs_spin_lock_irqsave(&rxbp->rxbp_lock, flags);

        CDEBUG(D_NET, "kptllnd_rx_buffer_pool_unreserve(%d)\n", count);
        rxbp->rxbp_reserved -= count;

        cfs_spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
}

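/*
 * Tear the pool down: mark it shutting down, unlink the MD of every
 * still-posted buffer, then keep polling (pausing a second per pass)
 * until the event callbacks have idled every buffer so it can be
 * destroyed.
 */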
void
kptllnd_rx_buffer_pool_fini(kptl_rx_buffer_pool_t *rxbp)
{
        kptl_rx_buffer_t       *rxb;
        int                     rc;
        int                     i;
        unsigned long           flags;
        cfs_list_t             *tmp;
        cfs_list_t             *nxt;
        ptl_handle_md_t         mdh;

        /* CAVEAT EMPTOR: I'm racing with everything here!!!
         *
         * Buffers can still be posted after I set rxbp_shutdown because I
         * can't hold rxbp_lock while I'm posting them.
         *
         * Calling PtlMDUnlink() here races with auto-unlinks; i.e. a buffer's
         * MD handle could become invalid under me.  I am vulnerable to portals
         * re-using handles (i.e. make the same handle valid again, but for a
         * different MD) from when the MD is actually unlinked, to when the
         * event callback tells me it has been unlinked. */

        cfs_spin_lock_irqsave(&rxbp->rxbp_lock, flags);

        rxbp->rxbp_shutdown = 1;

        for (i = 0;; i++) {
                cfs_list_for_each_safe(tmp, nxt, &rxbp->rxbp_list) {
                        rxb = cfs_list_entry (tmp, kptl_rx_buffer_t, rxb_list);

                        if (rxb->rxb_idle) {
                                cfs_spin_unlock_irqrestore(&rxbp->rxbp_lock,
                                                           flags);
                                kptllnd_rx_buffer_destroy(rxb);
                                cfs_spin_lock_irqsave(&rxbp->rxbp_lock,
                                                      flags);
                                continue;
                        }

                        mdh = rxb->rxb_mdh;
                        if (PtlHandleIsEqual(mdh, PTL_INVALID_HANDLE))
                                continue;

                        cfs_spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);

                        rc = PtlMDUnlink(mdh);

                        cfs_spin_lock_irqsave(&rxbp->rxbp_lock, flags);

#ifdef LUSTRE_PORTALS_UNLINK_SEMANTICS
                        /* callback clears rxb_mdh and drops net's ref
                         * (which causes repost, but since I set
                         * shutdown, it will just set the buffer
                         * idle) */
#else
                        if (rc == PTL_OK) {
                                rxb->rxb_posted = 0;
                                rxb->rxb_mdh = PTL_INVALID_HANDLE;
                                kptllnd_rx_buffer_decref_locked(rxb);
                        }
#endif
                }

                if (cfs_list_empty(&rxbp->rxbp_list))
                        break;

                cfs_spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);

                /* Wait a bit for references to be dropped */
                CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
                       "Waiting for %d Busy RX Buffers\n",
                       rxbp->rxbp_count);

                cfs_pause(cfs_time_seconds(1));

                cfs_spin_lock_irqsave(&rxbp->rxbp_lock, flags);
        }

        cfs_spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
}

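/*
 * Post a receive buffer: attach a match entry on the ptllnd portal that
 * accepts LNET_MSG_MATCHBITS from any peer, then attach an MD covering
 * the whole buffer with an infinite threshold.  PTL_MD_MAX_SIZE with
 * max_size set to the maximum message size lets many messages land at
 * increasing offsets in the same buffer (see the ev->offset handling in
 * kptllnd_rx_buffer_callback()); when the buffer is eventually
 * auto-unlinked, the callback's final decref reposts or idles it.
 */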
void
kptllnd_rx_buffer_post(kptl_rx_buffer_t *rxb)
{
        int                     rc;
        ptl_md_t                md;
        ptl_handle_me_t         meh;
        ptl_handle_md_t         mdh;
        ptl_process_id_t        any;
        kptl_rx_buffer_pool_t  *rxbp = rxb->rxb_pool;
        unsigned long           flags;

        LASSERT (!cfs_in_interrupt());
        LASSERT (rxb->rxb_refcount == 0);
        LASSERT (!rxb->rxb_idle);
        LASSERT (!rxb->rxb_posted);
        LASSERT (PtlHandleIsEqual(rxb->rxb_mdh, PTL_INVALID_HANDLE));

        any.nid = PTL_NID_ANY;
        any.pid = PTL_PID_ANY;

        cfs_spin_lock_irqsave(&rxbp->rxbp_lock, flags);

        if (rxbp->rxbp_shutdown) {
                rxb->rxb_idle = 1;
                cfs_spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
                return;
        }

        rxb->rxb_refcount = 1;                  /* net's ref */
        rxb->rxb_posted = 1;                    /* I'm posting */

        cfs_spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);

        rc = PtlMEAttach(kptllnd_data.kptl_nih,
                         *kptllnd_tunables.kptl_portal,
                         any,
                         LNET_MSG_MATCHBITS,
                         0, /* all matchbits are valid - ignore none */
                         PTL_UNLINK,
                         PTL_INS_AFTER,
                         &meh);
        if (rc != PTL_OK) {
                CERROR("PtlMeAttach rxb failed %s(%d)\n",
                       kptllnd_errtype2str(rc), rc);
                goto failed;
        }

        /*
         * Setup MD
         */
        md.start = rxb->rxb_buffer;
        md.length = kptllnd_rx_buffer_size();
        md.threshold = PTL_MD_THRESH_INF;
        md.options = PTL_MD_OP_PUT |
                     PTL_MD_LUSTRE_COMPLETION_SEMANTICS |
                     PTL_MD_EVENT_START_DISABLE |
                     PTL_MD_MAX_SIZE |
                     PTL_MD_LOCAL_ALIGN8;
        md.user_ptr = &rxb->rxb_eventarg;
        md.max_size = *kptllnd_tunables.kptl_max_msg_size;
        md.eq_handle = kptllnd_data.kptl_eqh;

        rc = PtlMDAttach(meh, md, PTL_UNLINK, &mdh);
        if (rc == PTL_OK) {
                cfs_spin_lock_irqsave(&rxbp->rxbp_lock, flags);
                if (rxb->rxb_posted)            /* Not auto-unlinked yet!!! */
                        rxb->rxb_mdh = mdh;
                cfs_spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
                return;
        }

        CERROR("PtlMDAttach rxb failed %s(%d)\n",
               kptllnd_errtype2str(rc), rc);
        rc = PtlMEUnlink(meh);
        LASSERT(rc == PTL_OK);

 failed:
        cfs_spin_lock_irqsave(&rxbp->rxbp_lock, flags);
        rxb->rxb_posted = 0;
        /* XXX this will just try again immediately */
        kptllnd_rx_buffer_decref_locked(rxb);
        cfs_spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
}

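/*
 * rx descriptors come from the kptl_rx_cache slab.  CFS_ALLOC_ATOMIC
 * keeps the allocation from sleeping, since this runs from the Portals
 * event callback path (kptllnd_rx_buffer_callback()).  FAIL_RX_ALLOC
 * simulation lets tests force the allocation-failure path.
 */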
kptl_rx_t *
kptllnd_rx_alloc(void)
{
        kptl_rx_t* rx;

        if (IS_SIMULATION_ENABLED(FAIL_RX_ALLOC)) {
                CERROR ("FAIL_RX_ALLOC SIMULATION triggered\n");
                return NULL;
        }

        rx = cfs_mem_cache_alloc(kptllnd_data.kptl_rx_cache, CFS_ALLOC_ATOMIC);
        if (rx == NULL) {
                CERROR("Failed to allocate rx\n");
                return NULL;
        }

        memset(rx, 0, sizeof(*rx));
        return rx;
}

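/*
 * Release an rx: drop the buffer ref, return the rx credit the message
 * consumed to the peer (unless post_credit says not to), and free the
 * descriptor back to the slab.  Credits are actually sent back to the
 * peer lazily via kptllnd_peer_check_sends().
 */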
void
kptllnd_rx_done(kptl_rx_t *rx, int post_credit)
{
        kptl_rx_buffer_t *rxb = rx->rx_rxb;
        kptl_peer_t      *peer = rx->rx_peer;
        unsigned long     flags;

        LASSERT (post_credit == PTLLND_POSTRX_NO_CREDIT ||
                 post_credit == PTLLND_POSTRX_PEER_CREDIT);

        CDEBUG(D_NET, "rx=%p rxb %p peer %p\n", rx, rxb, peer);

        if (rxb != NULL)
                kptllnd_rx_buffer_decref(rxb);

        if (peer != NULL) {
                /* Update credits (after I've decref-ed the buffer) */
                cfs_spin_lock_irqsave(&peer->peer_lock, flags);

                if (post_credit == PTLLND_POSTRX_PEER_CREDIT)
                        peer->peer_outstanding_credits++;

                LASSERT (peer->peer_outstanding_credits +
                         peer->peer_sent_credits <=
                         *kptllnd_tunables.kptl_peertxcredits);

                CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: rx %p done\n",
                       libcfs_id2str(peer->peer_id), peer->peer_credits,
                       peer->peer_outstanding_credits, peer->peer_sent_credits,
                       rx);

                cfs_spin_unlock_irqrestore(&peer->peer_lock, flags);

                /* I might have to send back credits */
                kptllnd_peer_check_sends(peer);
                kptllnd_peer_decref(peer);
        }

        cfs_mem_cache_free(kptllnd_data.kptl_rx_cache, rx);
}

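/*
 * Event callback for rx buffer MDs.  On PTL_EVENT_PUT_END an rx
 * descriptor is built for the incoming message (copying it into rx_space
 * when Portals could not 8-byte align it) and queued on kptl_sched_rxq
 * for the scheduler thread.  Once the MD has been unlinked, the net's
 * reference is dropped, which reposts the buffer or idles it during
 * shutdown.
 */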
void
kptllnd_rx_buffer_callback (ptl_event_t *ev)
{
        kptl_eventarg_t        *eva = ev->md.user_ptr;
        kptl_rx_buffer_t       *rxb = kptllnd_eventarg2obj(eva);
        kptl_rx_buffer_pool_t  *rxbp = rxb->rxb_pool;
        kptl_rx_t              *rx;
        int                     unlinked;
        unsigned long           flags;

#ifdef LUSTRE_PORTALS_UNLINK_SEMANTICS
        unlinked = ev->unlinked;
#else
        unlinked = ev->type == PTL_EVENT_UNLINK;
#endif

        CDEBUG(D_NET, "%s: %s(%d) rxb=%p fail=%s(%d) unlink=%d\n",
               kptllnd_ptlid2str(ev->initiator),
               kptllnd_evtype2str(ev->type), ev->type, rxb,
               kptllnd_errtype2str(ev->ni_fail_type), ev->ni_fail_type,
               unlinked);

        LASSERT (!rxb->rxb_idle);
        LASSERT (ev->md.start == rxb->rxb_buffer);
        LASSERT (ev->offset + ev->mlength <=
                 PAGE_SIZE * *kptllnd_tunables.kptl_rxb_npages);
        LASSERT (ev->type == PTL_EVENT_PUT_END ||
                 ev->type == PTL_EVENT_UNLINK);
        LASSERT (ev->type == PTL_EVENT_UNLINK ||
                 ev->match_bits == LNET_MSG_MATCHBITS);

        if (ev->ni_fail_type != PTL_NI_OK) {
                CERROR("Portals error from %s: %s(%d) rxb=%p fail=%s(%d) unlink=%d\n",
                       kptllnd_ptlid2str(ev->initiator),
                       kptllnd_evtype2str(ev->type), ev->type, rxb,
                       kptllnd_errtype2str(ev->ni_fail_type),
                       ev->ni_fail_type, unlinked);
                kptllnd_schedule_ptltrace_dump();
        } else if (ev->type == PTL_EVENT_PUT_END &&
                   !rxbp->rxbp_shutdown) {

                /* rxbp_shutdown sampled without locking!  I only treat it as a
                 * hint since shutdown can start while rx's are queued on
                 * kptl_sched_rxq. */
#if (PTL_MD_LOCAL_ALIGN8 == 0)
                /* Portals can't force message alignment - someone sending an
                 * odd-length message will misalign subsequent messages and
                 * force the fixup below...  */
                if ((ev->mlength & 7) != 0)
                        CWARN("Message from %s has odd length "LPU64": "
                              "probable version incompatibility\n",
                              kptllnd_ptlid2str(ev->initiator),
                              (__u64)ev->mlength);
#endif
                rx = kptllnd_rx_alloc();
                if (rx == NULL) {
                        CERROR("Message from %s dropped: ENOMEM\n",
                               kptllnd_ptlid2str(ev->initiator));
                } else {
                        if ((ev->offset & 7) == 0) {
                                kptllnd_rx_buffer_addref(rxb);
                                rx->rx_rxb = rxb;
                                rx->rx_nob = ev->mlength;
                                rx->rx_msg = (kptl_msg_t *)
                                             (rxb->rxb_buffer + ev->offset);
                        } else {
#if (PTL_MD_LOCAL_ALIGN8 == 0)
                                /* Portals can't force alignment - copy into
                                 * rx_space (avoiding overflow) to fix */
                                int maxlen = *kptllnd_tunables.kptl_max_msg_size;

                                rx->rx_rxb = NULL;
                                rx->rx_nob = MIN(maxlen, ev->mlength);
                                rx->rx_msg = (kptl_msg_t *)rx->rx_space;
                                memcpy(rx->rx_msg, rxb->rxb_buffer + ev->offset,
                                       rx->rx_nob);
#else
                                /* Portals should have forced the alignment */
                                LBUG();
#endif
                        }

                        rx->rx_initiator = ev->initiator;
                        rx->rx_treceived = jiffies;
#ifdef CRAY_XT3
                        rx->rx_uid = ev->uid;
#endif
                        /* Queue for attention */
                        cfs_spin_lock_irqsave(&kptllnd_data.kptl_sched_lock,
                                              flags);

                        cfs_list_add_tail(&rx->rx_list,
                                          &kptllnd_data.kptl_sched_rxq);
                        cfs_waitq_signal(&kptllnd_data.kptl_sched_waitq);

                        cfs_spin_unlock_irqrestore(&kptllnd_data.kptl_sched_lock,
                                                   flags);
                }
        }

        if (unlinked) {
                cfs_spin_lock_irqsave(&rxbp->rxbp_lock, flags);

                rxb->rxb_posted = 0;
                rxb->rxb_mdh = PTL_INVALID_HANDLE;
                kptllnd_rx_buffer_decref_locked(rxb);

                cfs_spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
        }
}

void
kptllnd_nak (ptl_process_id_t dest)
{
        /* Fire-and-forget a stub message that will let the peer know my
         * protocol magic/version and make her drop/refresh any peer state she
         * might have with me. */
        ptl_md_t md = {
                .start        = kptllnd_data.kptl_nak_msg,
                .length       = kptllnd_data.kptl_nak_msg->ptlm_nob,
                .threshold    = 1,
                .options      = 0,
                .user_ptr     = NULL,
                .eq_handle    = PTL_EQ_NONE};
        ptl_handle_md_t   mdh;
        int               rc;

        rc = PtlMDBind(kptllnd_data.kptl_nih, md, PTL_UNLINK, &mdh);
        if (rc != PTL_OK) {
                CWARN("Can't NAK %s: bind failed %s(%d)\n",
                      kptllnd_ptlid2str(dest), kptllnd_errtype2str(rc), rc);
                return;
        }

        rc = PtlPut(mdh, PTL_NOACK_REQ, dest,
                    *kptllnd_tunables.kptl_portal, 0,
                    LNET_MSG_MATCHBITS, 0, 0);
        if (rc != PTL_OK) {
                CWARN("Can't NAK %s: put failed %s(%d)\n",
                      kptllnd_ptlid2str(dest), kptllnd_errtype2str(rc), rc);
                kptllnd_schedule_ptltrace_dump();
        }
}

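/*
 * Look up the kptl_net_t serving 'nid' under kptl_net_rw_lock and return
 * it with a reference held; the caller must drop it with
 * kptllnd_net_decref().  Returns NULL if no local NI matches.
 */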
kptl_net_t *
kptllnd_find_net (lnet_nid_t nid)
{
        kptl_net_t *net;

        cfs_read_lock(&kptllnd_data.kptl_net_rw_lock);
        cfs_list_for_each_entry (net, &kptllnd_data.kptl_nets, net_list) {
                LASSERT (!net->net_shutdown);

                if (net->net_ni->ni_nid == nid) {
                        kptllnd_net_addref(net);
                        cfs_read_unlock(&kptllnd_data.kptl_net_rw_lock);
                        return net;
                }
        }
        cfs_read_unlock(&kptllnd_data.kptl_net_rw_lock);

        return NULL;
}

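/*
 * Parse a received message: NAK incompatible protocol versions, unpack
 * and sanity-check the header (source/destination ids and incarnation
 * stamps), update the peer's credit state, then hand IMMEDIATE/PUT/GET
 * payloads to lnet_parse().  On protocol errors the peer is closed; in
 * all other paths the rx is released via kptllnd_rx_done().
 */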
void
kptllnd_rx_parse(kptl_rx_t *rx)
{
        kptl_msg_t             *msg = rx->rx_msg;
        int                     rc = 0;
        int                     post_credit = PTLLND_POSTRX_PEER_CREDIT;
        kptl_net_t             *net = NULL;
        kptl_peer_t            *peer;
        cfs_list_t              txs;
        unsigned long           flags;
        lnet_process_id_t       srcid;

        LASSERT (!cfs_in_interrupt());
        LASSERT (rx->rx_peer == NULL);

        CFS_INIT_LIST_HEAD(&txs);

        if ((rx->rx_nob >= 4 &&
             (msg->ptlm_magic == LNET_PROTO_MAGIC ||
              msg->ptlm_magic == __swab32(LNET_PROTO_MAGIC))) ||
            (rx->rx_nob >= 6 &&
             ((msg->ptlm_magic == PTLLND_MSG_MAGIC &&
               msg->ptlm_version != PTLLND_MSG_VERSION) ||
              (msg->ptlm_magic == __swab32(PTLLND_MSG_MAGIC) &&
               msg->ptlm_version != __swab16(PTLLND_MSG_VERSION))))) {
                /* NAK incompatible versions
                 * See other LNDs for how to handle this if/when ptllnd begins
                 * to allow different versions to co-exist */
                CERROR("Bad version: got %04x expected %04x from %s\n",
                       (__u32)(msg->ptlm_magic == PTLLND_MSG_MAGIC ?
                               msg->ptlm_version : __swab16(msg->ptlm_version)),
                        PTLLND_MSG_VERSION, kptllnd_ptlid2str(rx->rx_initiator));
                /* NB backward compatibility */
                kptllnd_nak(rx->rx_initiator);
                goto rx_done;
        }

        rc = kptllnd_msg_unpack(msg, rx->rx_nob);
        if (rc != 0) {
                CERROR ("Error %d unpacking rx from %s\n",
                        rc, kptllnd_ptlid2str(rx->rx_initiator));
                goto rx_done;
        }

        srcid.nid = msg->ptlm_srcnid;
        srcid.pid = msg->ptlm_srcpid;

        CDEBUG(D_NETTRACE, "%s: RX %s c %d %p rxb %p queued %lu ticks (%ld s)\n",
               libcfs_id2str(srcid), kptllnd_msgtype2str(msg->ptlm_type),
               msg->ptlm_credits, rx, rx->rx_rxb,
               jiffies - rx->rx_treceived,
               cfs_duration_sec(jiffies - rx->rx_treceived));

        if (kptllnd_lnet2ptlnid(srcid.nid) != rx->rx_initiator.nid) {
                CERROR("Bad source nid %s from %s\n",
                       libcfs_id2str(srcid),
                       kptllnd_ptlid2str(rx->rx_initiator));
                goto rx_done;
        }

        if (msg->ptlm_type == PTLLND_MSG_TYPE_NAK) {
                peer = kptllnd_id2peer(srcid);
                if (peer == NULL)
                        goto rx_done;

                CWARN("NAK from %s (%d:%s)\n",
                      libcfs_id2str(srcid), peer->peer_state,
                      kptllnd_ptlid2str(rx->rx_initiator));

                /* NB can't nuke new peer - bug 17546 comment 31 */
                if (peer->peer_state == PEER_STATE_WAITING_HELLO) {
                        CDEBUG(D_NET, "Stale NAK from %s(%s): WAITING_HELLO\n",
                               libcfs_id2str(srcid),
                               kptllnd_ptlid2str(rx->rx_initiator));
                        kptllnd_peer_decref(peer);
                        goto rx_done;
                }

                rc = -EPROTO;
                goto failed;
        }

        net = kptllnd_find_net(msg->ptlm_dstnid);
        if (net == NULL || msg->ptlm_dstpid != the_lnet.ln_pid) {
                CERROR("Bad dstid %s from %s\n",
                       libcfs_id2str((lnet_process_id_t) {
                               .nid = msg->ptlm_dstnid,
                               .pid = msg->ptlm_dstpid}),
                       kptllnd_ptlid2str(rx->rx_initiator));
                goto rx_done;
        }

        if (LNET_NIDNET(srcid.nid) != LNET_NIDNET(net->net_ni->ni_nid)) {
                lnet_nid_t nid = LNET_MKNID(LNET_NIDNET(net->net_ni->ni_nid),
                                            LNET_NIDADDR(srcid.nid));
                CERROR("Bad source nid %s from %s, %s expected.\n",
                       libcfs_id2str(srcid),
                       kptllnd_ptlid2str(rx->rx_initiator),
                       libcfs_nid2str(nid));
                goto rx_done;
        }

        if (msg->ptlm_type == PTLLND_MSG_TYPE_HELLO) {
                peer = kptllnd_peer_handle_hello(net, rx->rx_initiator, msg);
                if (peer == NULL)
                        goto rx_done;
        } else {
                peer = kptllnd_id2peer(srcid);
                if (peer == NULL) {
                        CWARN("NAK %s: no connection, %s must reconnect\n",
                              kptllnd_msgtype2str(msg->ptlm_type),
                              libcfs_id2str(srcid));
                        /* NAK to make the peer reconnect */
                        kptllnd_nak(rx->rx_initiator);
                        goto rx_done;
                }

                /* Ignore any messages for a previous incarnation of me */
                if (msg->ptlm_dststamp < peer->peer_myincarnation) {
                        kptllnd_peer_decref(peer);
                        goto rx_done;
                }

                if (msg->ptlm_dststamp != peer->peer_myincarnation) {
                        CERROR("%s: Unexpected dststamp "LPX64" "
                               "("LPX64" expected)\n",
                               libcfs_id2str(peer->peer_id), msg->ptlm_dststamp,
                               peer->peer_myincarnation);
                        rc = -EPROTO;
                        goto failed;
                }

                if (peer->peer_state == PEER_STATE_WAITING_HELLO) {
                        /* recoverable error - restart txs */
                        cfs_spin_lock_irqsave(&peer->peer_lock, flags);
                        kptllnd_cancel_txlist(&peer->peer_sendq, &txs);
                        cfs_spin_unlock_irqrestore(&peer->peer_lock, flags);

                        CWARN("NAK %s: Unexpected %s message\n",
                              libcfs_id2str(srcid),
                              kptllnd_msgtype2str(msg->ptlm_type));
                        kptllnd_nak(rx->rx_initiator);
                        rc = -EPROTO;
                        goto failed;
                }

                if (msg->ptlm_srcstamp != peer->peer_incarnation) {
                        CERROR("%s: Unexpected srcstamp "LPX64" "
                               "("LPX64" expected)\n",
                               libcfs_id2str(srcid),
                               msg->ptlm_srcstamp,
                               peer->peer_incarnation);
                        rc = -EPROTO;
                        goto failed;
                }
        }

        LASSERTF (LNET_NIDADDR(msg->ptlm_srcnid) ==
                         LNET_NIDADDR(peer->peer_id.nid), "m %s p %s\n",
                  libcfs_nid2str(msg->ptlm_srcnid),
                  libcfs_nid2str(peer->peer_id.nid));
        LASSERTF (msg->ptlm_srcpid == peer->peer_id.pid, "m %u p %u\n",
                  msg->ptlm_srcpid, peer->peer_id.pid);

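        /* Credit bookkeeping below: peer_sent_credits counts rx slots I have
         * granted to this peer that she has not yet consumed (each incoming
         * message uses one); peer_credits counts credits she has granted me
         * for my sends; peer_outstanding_credits counts credits I have
         * accumulated and still owe back to her. */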
        cfs_spin_lock_irqsave(&peer->peer_lock, flags);

        /* Check peer only sends when I've sent her credits */
        if (peer->peer_sent_credits == 0) {
                int  c = peer->peer_credits;
                int oc = peer->peer_outstanding_credits;
                int sc = peer->peer_sent_credits;

                cfs_spin_unlock_irqrestore(&peer->peer_lock, flags);

                CERROR("%s: buffer overrun [%d/%d+%d]\n",
                       libcfs_id2str(peer->peer_id), c, sc, oc);
                rc = -EPROTO;
                goto failed;
        }
        peer->peer_sent_credits--;

        /* No check for credit overflow - the peer may post new
         * buffers after the startup handshake. */
        peer->peer_credits += msg->ptlm_credits;

        /* This ensures the credit taken by NOOP can be returned */
        if (msg->ptlm_type == PTLLND_MSG_TYPE_NOOP) {
                peer->peer_outstanding_credits++;
                post_credit = PTLLND_POSTRX_NO_CREDIT;
        }

        cfs_spin_unlock_irqrestore(&peer->peer_lock, flags);

        /* See if something can go out now that credits have come in */
        if (msg->ptlm_credits != 0)
                kptllnd_peer_check_sends(peer);

        /* ptllnd-level protocol correct - rx takes my ref on peer and increments
         * peer_outstanding_credits when it completes */
        rx->rx_peer = peer;
        kptllnd_peer_alive(peer);

        switch (msg->ptlm_type) {
        default:
                /* already checked by kptllnd_msg_unpack() */
                LBUG();

        case PTLLND_MSG_TYPE_HELLO:
                CDEBUG(D_NET, "PTLLND_MSG_TYPE_HELLO\n");
                goto rx_done;

        case PTLLND_MSG_TYPE_NOOP:
                CDEBUG(D_NET, "PTLLND_MSG_TYPE_NOOP\n");
                goto rx_done;

        case PTLLND_MSG_TYPE_IMMEDIATE:
                CDEBUG(D_NET, "PTLLND_MSG_TYPE_IMMEDIATE\n");
                rc = lnet_parse(net->net_ni,
                                &msg->ptlm_u.immediate.kptlim_hdr,
                                msg->ptlm_srcnid,
                                rx, 0);
                if (rc >= 0) {                  /* kptllnd_recv owns 'rx' now */
                        kptllnd_net_decref(net);
                        return;
                }
                goto failed;

        case PTLLND_MSG_TYPE_PUT:
        case PTLLND_MSG_TYPE_GET:
                CDEBUG(D_NET, "PTLLND_MSG_TYPE_%s\n",
                        msg->ptlm_type == PTLLND_MSG_TYPE_PUT ?
                        "PUT" : "GET");

                /* checked in kptllnd_msg_unpack() */
                LASSERT (msg->ptlm_u.rdma.kptlrm_matchbits >=
                         PTL_RESERVED_MATCHBITS);

                /* Update last match bits seen */
                cfs_spin_lock_irqsave(&peer->peer_lock, flags);

                if (msg->ptlm_u.rdma.kptlrm_matchbits >
                    rx->rx_peer->peer_last_matchbits_seen)
                        rx->rx_peer->peer_last_matchbits_seen =
                                msg->ptlm_u.rdma.kptlrm_matchbits;

                cfs_spin_unlock_irqrestore(&rx->rx_peer->peer_lock, flags);

                rc = lnet_parse(net->net_ni,
                                &msg->ptlm_u.rdma.kptlrm_hdr,
                                msg->ptlm_srcnid,
                                rx, 1);
                if (rc >= 0) {                  /* kptllnd_recv owns 'rx' now */
                        kptllnd_net_decref(net);
                        return;
                }
                goto failed;
        }

 failed:
        LASSERT (rc != 0);
        kptllnd_peer_close(peer, rc);
        if (rx->rx_peer == NULL)                /* drop ref on peer */
                kptllnd_peer_decref(peer);      /* unless rx_done will */
        if (!cfs_list_empty(&txs)) {
                LASSERT (net != NULL);
                kptllnd_restart_txs(net, srcid, &txs);
        }
 rx_done:
        if (net != NULL)
                kptllnd_net_decref(net);
        kptllnd_rx_done(rx, post_credit);
}