lustre/ptlrpc/events.c
/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
 *
 *   This file is part of Lustre, http://www.lustre.org.
 *
 *   Lustre is free software; you can redistribute it and/or
 *   modify it under the terms of version 2 of the GNU General Public
 *   License as published by the Free Software Foundation.
 *
 *   Lustre is distributed in the hope that it will be useful,
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *   GNU General Public License for more details.
 *
 *   You should have received a copy of the GNU General Public License
 *   along with Lustre; if not, write to the Free Software
 *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 *
 */

#define DEBUG_SUBSYSTEM S_RPC

#ifdef __KERNEL__
#include <linux/module.h>
#else
#include <liblustre.h>
#endif
#include <linux/obd_class.h>
#include <linux/lustre_net.h>

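/* One entry per portals network interface (NAL); filled in by
 * ptlrpc_init_portals(), which sets ptlrpc_ninterfaces to the number
 * of valid entries. */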
struct ptlrpc_ni  ptlrpc_interfaces[NAL_MAX_NR];
int               ptlrpc_ninterfaces;

/*
 *  Free the packet when it has gone out
 */
static int request_out_callback(ptl_event_t *ev)
{
        struct ptlrpc_request *req = ev->mem_desc.user_ptr;
        ENTRY;

        /* requests always contiguous */
        LASSERT((ev->mem_desc.options & (PTL_MD_IOV | PTL_MD_KIOV)) == 0);

        if (ev->type != PTL_EVENT_SENT) {
                // XXX make sure we understand all events, including ACKs
                CERROR("Unknown event %d\n", ev->type);
                LBUG();
        }

        /* this balances the atomic_inc in ptl_send_rpc() */
        ptlrpc_req_finished(req);
        RETURN(1);
}

/*
 *  Free the reply buffer when it has gone out; on ACK, wake the request
 */
static int reply_out_callback(ptl_event_t *ev)
{
        struct ptlrpc_request *req = ev->mem_desc.user_ptr;
        unsigned long          flags;
        ENTRY;

        /* replies always contiguous */
        LASSERT((ev->mem_desc.options & (PTL_MD_IOV | PTL_MD_KIOV)) == 0);

        if (ev->type == PTL_EVENT_SENT) {
                /* NB don't even know if this is the current reply! In fact
                 * we can't touch any state in the request, since the
                 * service handler zeros it on each incoming request. */
                OBD_FREE(ev->mem_desc.start, ev->mem_desc.length);
        } else if (ev->type == PTL_EVENT_ACK) {
                LASSERT(req->rq_want_ack);
                spin_lock_irqsave(&req->rq_lock, flags);
                req->rq_want_ack = 0;
                wake_up(&req->rq_reply_waitq);
                spin_unlock_irqrestore(&req->rq_lock, flags);
        } else {
                // XXX make sure we understand all events
                CERROR("Unknown event %d\n", ev->type);
                LBUG();
        }

        RETURN(1);
}

/*
 * Wake up the thread waiting for the reply once it comes in.
 */
int reply_in_callback(ptl_event_t *ev)
{
        struct ptlrpc_request *req = ev->mem_desc.user_ptr;
        unsigned long flags;
        ENTRY;

        /* replies always contiguous */
        LASSERT((ev->mem_desc.options & (PTL_MD_IOV | PTL_MD_KIOV)) == 0);

        if (req->rq_xid == 0x5a5a5a5a5a5a5a5aULL) {
                CERROR("Reply received for freed request!  Probably a missing "
                       "ptlrpc_abort()\n");
                LBUG();
        }

        if (req->rq_xid != ev->match_bits) {
                CERROR("Reply packet for wrong request\n");
                LBUG();
        }

        if (ev->type == PTL_EVENT_PUT) {
                /* Bug 1190: should handle non-zero offset as a protocol
                 * error  */
                LASSERT (ev->offset == 0);

                spin_lock_irqsave (&req->rq_lock, flags);
                LASSERT (req->rq_receiving_reply);
                req->rq_receiving_reply = 0;
                req->rq_replied = 1;
                if (req->rq_set != NULL)
                        wake_up(&req->rq_set->set_waitq);
                else
                        wake_up(&req->rq_reply_waitq);
                spin_unlock_irqrestore (&req->rq_lock, flags);
        } else {
                // XXX make sure we understand all events, including ACKs
                CERROR("Unknown event %d\n", ev->type);
                LBUG();
        }

        RETURN(1);
}

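/*
 * Handle an incoming request: note possible truncation, take over
 * portals' buffer reference when this event unlinked the request
 * buffer, and wake a service thread to process the message.
 */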
int request_in_callback(ptl_event_t *ev)
{
        struct ptlrpc_request_buffer_desc *rqbd = ev->mem_desc.user_ptr;
        struct ptlrpc_srv_ni  *srv_ni = rqbd->rqbd_srv_ni;
        struct ptlrpc_service *service = srv_ni->sni_service;

        /* requests always contiguous */
        LASSERT((ev->mem_desc.options & (PTL_MD_IOV | PTL_MD_KIOV)) == 0);
        /* we only enable puts */
        LASSERT(ev->type == PTL_EVENT_PUT);
        LASSERT(atomic_read(&srv_ni->sni_nrqbds_receiving) > 0);
        LASSERT(atomic_read(&rqbd->rqbd_refcount) > 0);

        if (ev->rlength != ev->mlength)
                CERROR("Warning: Possibly truncated rpc (%d/%d)\n",
                       ev->mlength, ev->rlength);

        if (!PtlHandleEqual (ev->unlinked_me, PTL_HANDLE_NONE)) {
                /* This is the last request to be received into this
                 * request buffer.  We don't bump the refcount, since the
                 * thread servicing this event is effectively taking over
                 * portals' reference.
                 */
                /* NB ev->unlinked_me.nal_idx is not set properly in a callback */
                LASSERT(ev->unlinked_me.cookie == rqbd->rqbd_me_h.cookie);

                /* we're off the air */
                /* we'll probably start dropping packets in portals soon */
                if (atomic_dec_and_test(&srv_ni->sni_nrqbds_receiving))
                        CERROR("All request buffers busy\n");
        } else {
                /* +1 ref for service thread */
                atomic_inc(&rqbd->rqbd_refcount);
        }

        wake_up(&service->srv_waitq);

        return 0;
}

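/*
 * Source-side completion for a bulk PUT: called once for SENT and once
 * for ACK; whichever arrives second marks the descriptor complete and
 * wakes any waiter.
 */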
static int bulk_put_source_callback(ptl_event_t *ev)
{
        unsigned long            flags;
        struct ptlrpc_bulk_desc *desc = ev->mem_desc.user_ptr;
        ENTRY;

        CDEBUG(D_NET, "got %s event %d\n",
               (ev->type == PTL_EVENT_SENT) ? "SENT" :
               (ev->type == PTL_EVENT_ACK)  ? "ACK"  : "UNEXPECTED", ev->type);

        LASSERT(ev->type == PTL_EVENT_SENT || ev->type == PTL_EVENT_ACK);

        /* 1 fragment for each page always */
        LASSERT(ev->mem_desc.niov == desc->bd_page_count);

        spin_lock_irqsave (&desc->bd_lock, flags);

        LASSERT(desc->bd_callback_count > 0 &&
                desc->bd_callback_count <= 2);

        if (--desc->bd_callback_count == 0) {
                desc->bd_network_rw = 0;
                desc->bd_complete = 1;
                wake_up(&desc->bd_waitq);
        }

        spin_unlock_irqrestore (&desc->bd_lock, flags);
        RETURN(0);
}

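/* Debugging snapshots of the last bad event/descriptor caught by the
 * sanity checks in bulk_put_sink_callback() below. */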
struct ptlrpc_bulk_desc ptlrpc_bad_desc;
ptl_event_t ptlrpc_bad_event;

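/*
 * Sink-side completion for a bulk PUT: sanity-check the event against
 * the descriptor, then mark the descriptor complete and wake whoever
 * is waiting on the request.
 */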
static int bulk_put_sink_callback(ptl_event_t *ev)
{
        struct ptlrpc_bulk_desc *desc = ev->mem_desc.user_ptr;
        unsigned long            flags;
        ENTRY;

        LASSERT(ev->type == PTL_EVENT_PUT);

        /* used iovs */
        LASSERT((ev->mem_desc.options & (PTL_MD_IOV | PTL_MD_KIOV)) ==
                PTL_MD_KIOV);
        /* Honestly, it's best to find out early. */
        if (desc->bd_page_count == 0x5a5a5a5a ||
            desc->bd_page_count != ev->mem_desc.niov ||
            ev->mem_desc.start != &desc->bd_iov) {
                /* not guaranteed (don't LASSERT) but good for this bug hunt */
                ptlrpc_bad_event = *ev;
                ptlrpc_bad_desc = *desc;
                CERROR ("XXX ev %p type %d portal %d match "LPX64", seq %ld\n",
                        ev, ev->type, ev->portal, ev->match_bits, ev->sequence);
                CERROR ("XXX desc %p, export %p import %p gen %d "
                        " portal %d\n",
                        desc, desc->bd_export,
                        desc->bd_import, desc->bd_import_generation,
                        desc->bd_portal);
                RETURN (0);
        }

        LASSERT(desc->bd_page_count != 0x5a5a5a5a);
        /* 1 fragment for each page always */
        LASSERT(ev->mem_desc.niov == desc->bd_page_count);
        LASSERT(ev->match_bits == desc->bd_req->rq_xid);

        /* peer must put with zero offset */
        if (ev->offset != 0) {
                /* Bug 1190: handle this as a protocol failure */
                CERROR ("Bad offset %d\n", ev->offset);
                LBUG ();
        }

        /* No check for total # bytes; this could be a short read */

        spin_lock_irqsave (&desc->bd_lock, flags);
        desc->bd_network_rw = 0;
        desc->bd_complete = 1;
        if (desc->bd_req->rq_set != NULL)
                wake_up (&desc->bd_req->rq_set->set_waitq);
        else
                wake_up (&desc->bd_req->rq_reply_waitq);
        spin_unlock_irqrestore (&desc->bd_lock, flags);

        RETURN(1);
}

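/*
 * Source-side completion for a bulk GET: verify the peer fetched the
 * whole buffer from offset zero, then mark the descriptor complete and
 * wake whoever is waiting on the request.
 */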
static int bulk_get_source_callback(ptl_event_t *ev)
{
        struct ptlrpc_bulk_desc *desc = ev->mem_desc.user_ptr;
        struct ptlrpc_bulk_page *bulk;
        struct list_head        *tmp;
        unsigned long            flags;
        ptl_size_t               total = 0;
        ENTRY;

        LASSERT(ev->type == PTL_EVENT_GET);

        /* used iovs */
        LASSERT((ev->mem_desc.options & (PTL_MD_IOV | PTL_MD_KIOV)) ==
                PTL_MD_KIOV);
        /* 1 fragment for each page always */
        LASSERT(ev->mem_desc.niov == desc->bd_page_count);
        LASSERT(ev->match_bits == desc->bd_req->rq_xid);

        /* peer must get with zero offset */
        if (ev->offset != 0) {
                /* Bug 1190: handle this as a protocol failure */
                CERROR ("Bad offset %d\n", ev->offset);
                LBUG ();
        }

        list_for_each (tmp, &desc->bd_page_list) {
                bulk = list_entry(tmp, struct ptlrpc_bulk_page, bp_link);

                total += bulk->bp_buflen;
        }

        /* peer must get everything */
        if (ev->mem_desc.length != total) {
                /* Bug 1190: handle this as a protocol failure */
                CERROR ("Bad length/total %d/%d\n", ev->mem_desc.length, total);
                LBUG ();
        }

        spin_lock_irqsave (&desc->bd_lock, flags);
        desc->bd_network_rw = 0;
        desc->bd_complete = 1;
        if (desc->bd_req->rq_set != NULL)
                wake_up (&desc->bd_req->rq_set->set_waitq);
        else
                wake_up (&desc->bd_req->rq_reply_waitq);
        spin_unlock_irqrestore (&desc->bd_lock, flags);

        RETURN(1);
}

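/*
 * Sink-side completion for a bulk GET: called for SENT and REPLY;
 * whichever arrives second marks the descriptor complete and wakes any
 * waiter.
 */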
static int bulk_get_sink_callback(ptl_event_t *ev)
{
        struct ptlrpc_bulk_desc *desc = ev->mem_desc.user_ptr;
        unsigned long            flags;
        ENTRY;

        CDEBUG(D_NET, "got %s event %d desc %p\n",
               (ev->type == PTL_EVENT_SENT) ? "SENT" :
               (ev->type == PTL_EVENT_REPLY)  ? "REPLY"  : "UNEXPECTED",
               ev->type, desc);

        LASSERT(ev->type == PTL_EVENT_SENT || ev->type == PTL_EVENT_REPLY);

        /* 1 fragment for each page always */
        LASSERT(ev->mem_desc.niov == desc->bd_page_count);

        spin_lock_irqsave (&desc->bd_lock, flags);
        LASSERT(desc->bd_callback_count > 0 &&
                desc->bd_callback_count <= 2);

        if (--desc->bd_callback_count == 0) {
                desc->bd_network_rw = 0;
                desc->bd_complete = 1;
                wake_up(&desc->bd_waitq);
        }
        spin_unlock_irqrestore (&desc->bd_lock, flags);

        RETURN(0);
}

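/*
 * Map a peer UUID onto a (NID, ptlrpc_ni) pair by matching the NI
 * handle returned by lustre_uuid_to_peer() against our initialised
 * interfaces.
 */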
int ptlrpc_uuid_to_peer (struct obd_uuid *uuid, struct ptlrpc_peer *peer)
{
        struct ptlrpc_ni   *pni;
        struct lustre_peer  lpeer;
        int                 i;
        int                 rc = lustre_uuid_to_peer (uuid->uuid, &lpeer);

        if (rc != 0)
                RETURN (rc);

        for (i = 0; i < ptlrpc_ninterfaces; i++) {
                pni = &ptlrpc_interfaces[i];

                if (!memcmp(&lpeer.peer_ni, &pni->pni_ni_h,
                            sizeof (lpeer.peer_ni))) {
                        peer->peer_nid = lpeer.peer_nid;
                        peer->peer_ni = pni;
                        return (0);
                }
        }

        CERROR("Can't find ptlrpc interface for "LPX64" ni handle %08lx."LPX64"\n",
               lpeer.peer_nid, lpeer.peer_ni.nal_idx, lpeer.peer_ni.cookie);
        return (-ENOENT);
}

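/* Free an interface's event queues and drop the portals NI reference. */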
void ptlrpc_ni_fini(struct ptlrpc_ni *pni)
{
        PtlEQFree(pni->pni_request_out_eq_h);
        PtlEQFree(pni->pni_reply_out_eq_h);
        PtlEQFree(pni->pni_reply_in_eq_h);
        PtlEQFree(pni->pni_bulk_put_source_eq_h);
        PtlEQFree(pni->pni_bulk_put_sink_eq_h);
        PtlEQFree(pni->pni_bulk_get_source_eq_h);
        PtlEQFree(pni->pni_bulk_get_sink_eq_h);

        kportal_put_ni (pni->pni_number);
}

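/*
 * Bind to a portals network interface and allocate an event queue for
 * each message type; on failure, tear down whatever was set up.
 */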
int ptlrpc_ni_init(int number, char *name, struct ptlrpc_ni *pni)
{
        int              rc;
        ptl_handle_ni_t *nip = kportal_get_ni (number);

        if (nip == NULL) {
                CDEBUG (D_NET, "Network interface %s not loaded\n", name);
                return (-ENOENT);
        }

        CDEBUG (D_NET, "init %d %s: nal_idx %ld\n", number, name, nip->nal_idx);

        pni->pni_name = name;
        pni->pni_number = number;
        pni->pni_ni_h = *nip;

        pni->pni_request_out_eq_h = PTL_HANDLE_NONE;
        pni->pni_reply_out_eq_h = PTL_HANDLE_NONE;
        pni->pni_reply_in_eq_h = PTL_HANDLE_NONE;
        pni->pni_bulk_put_source_eq_h = PTL_HANDLE_NONE;
        pni->pni_bulk_put_sink_eq_h = PTL_HANDLE_NONE;
        pni->pni_bulk_get_source_eq_h = PTL_HANDLE_NONE;
        pni->pni_bulk_get_sink_eq_h = PTL_HANDLE_NONE;

        /* NB We never actually PtlEQGet() out of these event queues since
         * we're only interested in the event callback, so we can just let
         * them wrap.  Their sizes aren't a big deal, apart from providing
         * a little history for debugging... */

        rc = PtlEQAlloc(pni->pni_ni_h, 1024, request_out_callback,
                        &pni->pni_request_out_eq_h);
        if (rc != PTL_OK)
                GOTO (fail, rc = -ENOMEM);

        rc = PtlEQAlloc(pni->pni_ni_h, 1024, reply_out_callback,
                        &pni->pni_reply_out_eq_h);
        if (rc != PTL_OK)
                GOTO (fail, rc = -ENOMEM);

        rc = PtlEQAlloc(pni->pni_ni_h, 1024, reply_in_callback,
                        &pni->pni_reply_in_eq_h);
        if (rc != PTL_OK)
                GOTO (fail, rc = -ENOMEM);

        rc = PtlEQAlloc(pni->pni_ni_h, 1024, bulk_put_source_callback,
                        &pni->pni_bulk_put_source_eq_h);
        if (rc != PTL_OK)
                GOTO (fail, rc = -ENOMEM);

        rc = PtlEQAlloc(pni->pni_ni_h, 1024, bulk_put_sink_callback,
                        &pni->pni_bulk_put_sink_eq_h);
        if (rc != PTL_OK)
                GOTO (fail, rc = -ENOMEM);

        rc = PtlEQAlloc(pni->pni_ni_h, 1024, bulk_get_source_callback,
                        &pni->pni_bulk_get_source_eq_h);
        if (rc != PTL_OK)
                GOTO (fail, rc = -ENOMEM);

        rc = PtlEQAlloc(pni->pni_ni_h, 1024, bulk_get_sink_callback,
                        &pni->pni_bulk_get_sink_eq_h);
        if (rc != PTL_OK)
                GOTO (fail, rc = -ENOMEM);

        return (0);
 fail:
        CERROR ("Failed to initialise network interface %s: %d\n",
                name, rc);

        /* OK to do complete teardown since we invalidated the handles above */
        ptlrpc_ni_fini (pni);
        return (rc);
}

#ifndef __KERNEL__
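/*
 * Poll (or block up to 'block' seconds) for a single event on the first
 * interface's event queue; returns 1 if an event arrived, 0 if the
 * queue was empty.
 */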
int
liblustre_check_events (int block)
{
        ptl_event_t ev;
        int         rc;
        ENTRY;

        if (block) {
                /* XXX to accelerate recovery tests XXX */
                if (block > 10)
                        block = 10;
                rc = PtlEQWait_timeout(ptlrpc_interfaces[0].pni_eq_h, &ev, block);
        } else {
                rc = PtlEQGet (ptlrpc_interfaces[0].pni_eq_h, &ev);
        }
        if (rc == PTL_EQ_EMPTY)
                RETURN(0);

        LASSERT (rc == PTL_EQ_DROPPED || rc == PTL_OK);

#if PORTALS_DOES_NOT_SUPPORT_CALLBACKS
        if (rc == PTL_EQ_DROPPED)
                CERROR ("Dropped an event!!!\n");

        ptlrpc_master_callback (&ev);
#endif
        RETURN(1);
}

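/*
 * Single-threaded liblustre event loop step: run the non-blocking
 * checks first, then block for at most lwi->lwi_timeout waiting for
 * further events.
 */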
int liblustre_wait_event(struct l_wait_info *lwi)
{
        ENTRY;

        /* non-blocking checks (actually we might block in a service for
         * bulk but we won't block in a blocked service)
         */
        if (liblustre_check_events(0) ||
            liblustre_check_services()) {
                /* the condition the caller is waiting for may now hold */
                RETURN(0);
        }

        /* block for an event */
        liblustre_check_events(lwi->lwi_timeout);

        /* check it's not for some service */
        liblustre_check_services ();

        /* XXX check this */
        RETURN(0);
}
#endif

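/*
 * Try to initialise every known NAL; succeed if at least one network
 * interface comes up.
 */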
int ptlrpc_init_portals(void)
{
        /* Add new portals network interfaces here.
         * Order is irrelevant! */
        static struct {
                int   number;
                char *name;
        } ptl_nis[] = {
                {QSWNAL,  "qswnal"},
                {SOCKNAL, "socknal"},
                {GMNAL,   "gmnal"},
                {IBNAL,   "ibnal"},
                {TOENAL,  "toenal"},
                {TCPNAL,  "tcpnal"},
                {SCIMACNAL, "scimacnal"}};
        int   rc;
        int   i;

        LASSERT(ptlrpc_ninterfaces == 0);

        for (i = 0; i < sizeof (ptl_nis) / sizeof (ptl_nis[0]); i++) {
                LASSERT(ptlrpc_ninterfaces < (sizeof(ptlrpc_interfaces) /
                                              sizeof(ptlrpc_interfaces[0])));

                rc = ptlrpc_ni_init(ptl_nis[i].number, ptl_nis[i].name,
                                    &ptlrpc_interfaces[ptlrpc_ninterfaces]);
                if (rc == 0)
                        ptlrpc_ninterfaces++;
        }

        if (ptlrpc_ninterfaces == 0) {
                CERROR("network initialisation failed: is a NAL module "
                       "loaded?\n");
                return -EIO;
        }
        return 0;
}

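/* Tear down every interface initialised by ptlrpc_init_portals(). */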
void ptlrpc_exit_portals(void)
{
        while (ptlrpc_ninterfaces > 0)
                ptlrpc_ni_fini (&ptlrpc_interfaces[--ptlrpc_ninterfaces]);
}