Whamcloud - gitweb
b=12302
[fs/lustre-release.git] / lnet / selftest / rpc.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
5  *   Author: Isaac Huang <isaac@clusterfs.com>
6  *
7  */
8
9 #define DEBUG_SUBSYSTEM S_LNET
10
11 #include "selftest.h"
12
13
14 typedef enum {
15         SRPC_STATE_NONE,
16         SRPC_STATE_NI_INIT,
17         SRPC_STATE_EQ_INIT,
18         SRPC_STATE_WI_INIT,
19         SRPC_STATE_RUNNING,
20         SRPC_STATE_STOPPING,
21 } srpc_state_t;
22
23 #define SRPC_PEER_HASH_SIZE       101  /* # peer lists */
24 #define SRPC_PEER_CREDITS         16   /* >= most LND's default peer credit */
25
26 struct smoketest_rpc {
27         spinlock_t        rpc_glock;     /* global lock */
28         srpc_service_t   *rpc_services[SRPC_SERVICE_MAX_ID + 1];
29         struct list_head *rpc_peers;     /* hash table of known peers */
30         lnet_handle_eq_t  rpc_lnet_eq;   /* _the_ LNet event queue */
31         srpc_state_t      rpc_state;
32         srpc_counters_t   rpc_counters;
33         __u64             rpc_matchbits; /* matchbits counter */
34 } srpc_data;
35
36 /* forward ref's */
37 int srpc_handle_rpc (swi_workitem_t *wi);
38
39 void srpc_get_counters (srpc_counters_t *cnt)
40 {
41         spin_lock(&srpc_data.rpc_glock);
42         *cnt = srpc_data.rpc_counters;
43         spin_unlock(&srpc_data.rpc_glock);
44 }
45
46 void srpc_set_counters (const srpc_counters_t *cnt)
47 {
48         spin_lock(&srpc_data.rpc_glock);
49         srpc_data.rpc_counters = *cnt;
50         spin_unlock(&srpc_data.rpc_glock);
51 }
52
53 void
54 srpc_add_bulk_page (srpc_bulk_t *bk, cfs_page_t *pg, int i)
55 {
56         LASSERT (i >= 0 && i < bk->bk_niov);
57
58 #ifdef __KERNEL__
59         bk->bk_iovs[i].kiov_offset = 0;
60         bk->bk_iovs[i].kiov_page   = pg;
61         bk->bk_iovs[i].kiov_len    = CFS_PAGE_SIZE;
62 #else
63         LASSERT (bk->bk_pages != NULL);
64
65         bk->bk_pages[i] = pg;
66         bk->bk_iovs[i].iov_len  = CFS_PAGE_SIZE;
67         bk->bk_iovs[i].iov_base = cfs_page_address(pg);
68 #endif
69         return;
70 }
71
72 void
73 srpc_free_bulk (srpc_bulk_t *bk)
74 {
75         int         i;
76         cfs_page_t *pg;
77
78         LASSERT (bk != NULL);
79 #ifndef __KERNEL__
80         LASSERT (bk->bk_pages != NULL);
81 #endif
82
83         for (i = 0; i < bk->bk_niov; i++) {
84 #ifdef __KERNEL__
85                 pg = bk->bk_iovs[i].kiov_page;
86 #else
87                 pg = bk->bk_pages[i];
88 #endif
89                 if (pg == NULL) break;
90
91                 cfs_free_page(pg);
92         }
93
94 #ifndef __KERNEL__
95         LIBCFS_FREE(bk->bk_pages, sizeof(cfs_page_t *) * bk->bk_niov);
96 #endif
97         LIBCFS_FREE(bk, offsetof(srpc_bulk_t, bk_iovs[bk->bk_niov]));
98         return;
99 }
100
101 srpc_bulk_t *
102 srpc_alloc_bulk (int npages, int sink)
103 {
104         srpc_bulk_t  *bk;
105         cfs_page_t  **pages;
106         int           i;
107
108         LASSERT (npages > 0 && npages <= LNET_MAX_IOV);
109
110         LIBCFS_ALLOC(bk, offsetof(srpc_bulk_t, bk_iovs[npages]));
111         if (bk == NULL) {
112                 CERROR ("Can't allocate descriptor for %d pages\n", npages);
113                 return NULL;
114         }
115
116         memset(bk, 0, offsetof(srpc_bulk_t, bk_iovs[npages]));
117         bk->bk_sink = sink;
118         bk->bk_niov = npages;
119         bk->bk_len  = npages * CFS_PAGE_SIZE;
120 #ifndef __KERNEL__
121         LIBCFS_ALLOC(pages, sizeof(cfs_page_t *) * npages);
122         if (pages == NULL) {
123                 LIBCFS_FREE(bk, offsetof(srpc_bulk_t, bk_iovs[npages]));
124                 CERROR ("Can't allocate page array for %d pages\n", npages);
125                 return NULL;
126         }
127
128         memset(pages, 0, sizeof(cfs_page_t *) * npages);
129         bk->bk_pages = pages;
130 #else
131         UNUSED (pages);
132 #endif
133
134         for (i = 0; i < npages; i++) {
135                 cfs_page_t *pg = cfs_alloc_page(CFS_ALLOC_STD);
136
137                 if (pg == NULL) {
138                         CERROR ("Can't allocate page %d of %d\n", i, npages);
139                         srpc_free_bulk(bk);
140                         return NULL;
141                 }
142
143                 srpc_add_bulk_page(bk, pg, i);
144         }
145
146         return bk;
147 }
148
149
150 static inline struct list_head *
151 srpc_nid2peerlist (lnet_nid_t nid)
152 {
153         unsigned int hash = ((unsigned int)nid) % SRPC_PEER_HASH_SIZE;
154
155         return &srpc_data.rpc_peers[hash];
156 }
157
158 static inline srpc_peer_t *
159 srpc_create_peer (lnet_nid_t nid)
160 {
161         srpc_peer_t *peer;
162
163         LASSERT (nid != LNET_NID_ANY);
164
165         LIBCFS_ALLOC(peer, sizeof(srpc_peer_t));
166         if (peer == NULL) {
167                 CERROR ("Failed to allocate peer structure for %s\n",
168                         libcfs_nid2str(nid));
169                 return NULL;
170         }
171
172         memset(peer, 0, sizeof(srpc_peer_t));
173         peer->stp_nid     = nid;
174         peer->stp_credits = SRPC_PEER_CREDITS;
175
176         spin_lock_init(&peer->stp_lock);
177         CFS_INIT_LIST_HEAD(&peer->stp_rpcq);
178         CFS_INIT_LIST_HEAD(&peer->stp_ctl_rpcq);
179         return peer;
180 }
181
182 srpc_peer_t *
183 srpc_find_peer_locked (lnet_nid_t nid)
184 {
185         struct list_head *peer_list = srpc_nid2peerlist(nid);
186         srpc_peer_t      *peer;
187
188         LASSERT (nid != LNET_NID_ANY);
189
190         list_for_each_entry (peer, peer_list, stp_list) {
191                 if (peer->stp_nid == nid)
192                         return peer;
193         }
194
195         return NULL;
196 }
197
198 static srpc_peer_t *
199 srpc_nid2peer (lnet_nid_t nid)
200 {
201         srpc_peer_t *peer;
202         srpc_peer_t *new_peer;
203
204         spin_lock(&srpc_data.rpc_glock);
205         peer = srpc_find_peer_locked(nid);
206         spin_unlock(&srpc_data.rpc_glock);
207
208         if (peer != NULL)
209                 return peer;
210         
211         new_peer = srpc_create_peer(nid);
212
213         spin_lock(&srpc_data.rpc_glock);
214
215         peer = srpc_find_peer_locked(nid);
216         if (peer != NULL) {
217                 spin_unlock(&srpc_data.rpc_glock);
218                 if (new_peer != NULL)
219                         LIBCFS_FREE(new_peer, sizeof(srpc_peer_t));
220
221                 return peer;
222         }
223
224         if (new_peer == NULL) {
225                 spin_unlock(&srpc_data.rpc_glock);
226                 return NULL;
227         }
228                 
229         list_add_tail(&new_peer->stp_list, srpc_nid2peerlist(nid));
230         spin_unlock(&srpc_data.rpc_glock);
231         return new_peer;
232 }
233
234 static inline __u64
235 srpc_next_id (void)
236 {
237         __u64 id;
238
239         spin_lock(&srpc_data.rpc_glock);
240         id = srpc_data.rpc_matchbits++;
241         spin_unlock(&srpc_data.rpc_glock);
242         return id;
243 }
244
245 void
246 srpc_init_server_rpc (srpc_server_rpc_t *rpc,
247                       srpc_service_t *sv, srpc_buffer_t *buffer)
248 {
249         memset(rpc, 0, sizeof(*rpc));
250         swi_init_workitem(&rpc->srpc_wi, rpc, srpc_handle_rpc);
251
252         rpc->srpc_ev.ev_fired = 1; /* no event expected now */
253
254         rpc->srpc_service  = sv;
255         rpc->srpc_reqstbuf = buffer;
256         rpc->srpc_peer     = buffer->buf_peer;
257         rpc->srpc_self     = buffer->buf_self;
258         rpc->srpc_replymdh = LNET_INVALID_HANDLE;
259 }
260
261 int
262 srpc_add_service (srpc_service_t *sv)
263 {
264         int                id = sv->sv_id;
265         int                i;
266         srpc_server_rpc_t *rpc;
267
268         LASSERT (sv->sv_concur > 0);
269         LASSERT (0 <= id && id <= SRPC_SERVICE_MAX_ID);
270
271         spin_lock(&srpc_data.rpc_glock);
272
273         LASSERT (srpc_data.rpc_state == SRPC_STATE_RUNNING);
274
275         if (srpc_data.rpc_services[id] != NULL) {
276                 spin_unlock(&srpc_data.rpc_glock);
277                 return -EBUSY;
278         }
279
280         srpc_data.rpc_services[id] = sv;
281         spin_unlock(&srpc_data.rpc_glock);
282
283         sv->sv_nprune       = 0;
284         sv->sv_nposted_msg  = 0;
285         sv->sv_shuttingdown = 0;
286         spin_lock_init(&sv->sv_lock);
287         CFS_INIT_LIST_HEAD(&sv->sv_free_rpcq);
288         CFS_INIT_LIST_HEAD(&sv->sv_active_rpcq);
289         CFS_INIT_LIST_HEAD(&sv->sv_posted_msgq);
290         CFS_INIT_LIST_HEAD(&sv->sv_blocked_msgq);
291
292         sv->sv_ev.ev_data = sv;
293         sv->sv_ev.ev_type = SRPC_REQUEST_RCVD;
294
295         for (i = 0; i < sv->sv_concur; i++) {
296                 LIBCFS_ALLOC(rpc, sizeof(*rpc));
297                 if (rpc == NULL) goto enomem;
298
299                 list_add(&rpc->srpc_list, &sv->sv_free_rpcq);
300         }
301
302         CDEBUG (D_NET, "Adding service: id %d, name %s, concurrency %d\n",
303                 id, sv->sv_name, sv->sv_concur);
304         return 0;
305
306 enomem:
307         while (!list_empty(&sv->sv_free_rpcq)) {
308                 rpc = list_entry(sv->sv_free_rpcq.next,
309                                  srpc_server_rpc_t, srpc_list);
310                 list_del(&rpc->srpc_list);
311                 LIBCFS_FREE(rpc, sizeof(*rpc));
312         }
313
314         spin_lock(&srpc_data.rpc_glock);
315         srpc_data.rpc_services[id] = NULL;
316         spin_unlock(&srpc_data.rpc_glock);
317         return -ENOMEM;
318 }
319
320 int
321 srpc_remove_service (srpc_service_t *sv)
322 {
323         int id = sv->sv_id;
324
325         spin_lock(&srpc_data.rpc_glock);
326
327         if (srpc_data.rpc_services[id] != sv) {
328                 spin_unlock(&srpc_data.rpc_glock);
329                 return -ENOENT;
330         }
331
332         srpc_data.rpc_services[id] = NULL;
333         spin_unlock(&srpc_data.rpc_glock);
334         return 0;
335 }
336
337 int
338 srpc_post_passive_rdma(int portal, __u64 matchbits, void *buf,
339                        int len, int options, lnet_process_id_t peer,
340                        lnet_handle_md_t *mdh, srpc_event_t *ev)
341 {
342         int              rc;
343         lnet_md_t        md;
344         lnet_handle_me_t meh;
345
346         rc = LNetMEAttach(portal, peer, matchbits, 0,
347                           LNET_UNLINK, LNET_INS_AFTER, &meh);
348         if (rc != 0) {
349                 CERROR ("LNetMEAttach failed: %d\n", rc);
350                 LASSERT (rc == -ENOMEM);
351                 return -ENOMEM;
352         }
353
354         md.threshold = 1;
355         md.user_ptr  = ev;
356         md.start     = buf;
357         md.length    = len;
358         md.options   = options;
359         md.eq_handle = srpc_data.rpc_lnet_eq;
360
361         rc = LNetMDAttach(meh, md, LNET_UNLINK, mdh);
362         if (rc != 0) {
363                 CERROR ("LNetMDAttach failed: %d\n", rc);
364                 LASSERT (rc == -ENOMEM);
365
366                 rc = LNetMEUnlink(meh);
367                 LASSERT (rc == 0);
368                 return -ENOMEM;
369         }
370
371         CDEBUG (D_NET,
372                 "Posted passive RDMA: peer %s, portal %d, matchbits "LPX64"\n",
373                 libcfs_id2str(peer), portal, matchbits);
374         return 0;
375 }
376
377 int
378 srpc_post_active_rdma(int portal, __u64 matchbits, void *buf, int len, 
379                       int options, lnet_process_id_t peer, lnet_nid_t self,
380                       lnet_handle_md_t *mdh, srpc_event_t *ev)
381 {
382         int       rc;
383         lnet_md_t md;
384
385         md.user_ptr  = ev;
386         md.start     = buf;
387         md.length    = len;
388         md.eq_handle = srpc_data.rpc_lnet_eq;
389         md.threshold = ((options & LNET_MD_OP_GET) != 0) ? 2 : 1;
390         md.options   = options & ~(LNET_MD_OP_PUT | LNET_MD_OP_GET);
391
392         rc = LNetMDBind(md, LNET_UNLINK, mdh);
393         if (rc != 0) {
394                 CERROR ("LNetMDBind failed: %d\n", rc);
395                 LASSERT (rc == -ENOMEM);
396                 return -ENOMEM;
397         }
398
399         /* this is kind of an abuse of the LNET_MD_OP_{PUT,GET} options.
400          * they're only meaningful for MDs attached to an ME (i.e. passive
401          * buffers... */
402         if ((options & LNET_MD_OP_PUT) != 0) {
403                 rc = LNetPut(self, *mdh, LNET_NOACK_REQ, peer,
404                              portal, matchbits, 0, 0);
405         } else {
406                 LASSERT ((options & LNET_MD_OP_GET) != 0);
407
408                 rc = LNetGet(self, *mdh, peer, portal, matchbits, 0);
409         }
410
411         if (rc != 0) {
412                 CERROR ("LNet%s(%s, %d, "LPD64") failed: %d\n",
413                         ((options & LNET_MD_OP_PUT) != 0) ? "Put" : "Get",
414                         libcfs_id2str(peer), portal, matchbits, rc);
415
416                 /* The forthcoming unlink event will complete this operation
417                  * with failure, so fall through and return success here.
418                  */
419                 rc = LNetMDUnlink(*mdh);
420                 LASSERT (rc == 0);
421         } else {
422                 CDEBUG (D_NET,
423                         "Posted active RDMA: peer %s, portal %u, matchbits "LPX64"\n",
424                         libcfs_id2str(peer), portal, matchbits);
425         }
426         return 0;
427 }
428
429 int
430 srpc_post_active_rqtbuf(lnet_process_id_t peer, int service, void *buf,
431                         int len, lnet_handle_md_t *mdh, srpc_event_t *ev)
432 {
433         int rc;
434         int portal;
435
436         if (service > SRPC_FRAMEWORK_SERVICE_MAX_ID)
437                 portal = SRPC_REQUEST_PORTAL;
438         else
439                 portal = SRPC_FRAMEWORK_REQUEST_PORTAL;
440
441         rc = srpc_post_active_rdma(portal, service, buf, len, 
442                                    LNET_MD_OP_PUT, peer,
443                                    LNET_NID_ANY, mdh, ev);
444         return rc;
445 }
446
447 int
448 srpc_post_passive_rqtbuf(int service, void *buf, int len,
449                          lnet_handle_md_t *mdh, srpc_event_t *ev)
450 {
451         int               rc;
452         int               portal;
453         lnet_process_id_t any = {.nid = LNET_NID_ANY,
454                                  .pid = LNET_PID_ANY};
455
456         if (service > SRPC_FRAMEWORK_SERVICE_MAX_ID)
457                 portal = SRPC_REQUEST_PORTAL;
458         else
459                 portal = SRPC_FRAMEWORK_REQUEST_PORTAL;
460
461         rc = srpc_post_passive_rdma(portal, service, buf, len,
462                                     LNET_MD_OP_PUT, any, mdh, ev);
463         return rc;
464 }
465
466 int
467 srpc_service_post_buffer (srpc_service_t *sv, srpc_buffer_t *buf)
468 {
469         srpc_msg_t *msg = &buf->buf_msg;
470         int         rc;
471
472         LASSERT (!sv->sv_shuttingdown);
473
474         buf->buf_mdh = LNET_INVALID_HANDLE;
475         list_add(&buf->buf_list, &sv->sv_posted_msgq);
476         sv->sv_nposted_msg++;
477         spin_unlock(&sv->sv_lock);
478
479         rc = srpc_post_passive_rqtbuf(sv->sv_id, msg, sizeof(*msg),
480                                       &buf->buf_mdh, &sv->sv_ev);
481
482         /* At this point, a RPC (new or delayed) may have arrived in
483          * msg and its event handler has been called. So we must add
484          * buf to sv_posted_msgq _before_ dropping sv_lock */
485
486         spin_lock(&sv->sv_lock);
487
488         if (rc == 0) {
489                 if (sv->sv_shuttingdown) {
490                         spin_unlock(&sv->sv_lock);
491
492                         /* srpc_shutdown_service might have tried to unlink me
493                          * when my buf_mdh was still invalid */
494                         LNetMDUnlink(buf->buf_mdh);
495
496                         spin_lock(&sv->sv_lock);
497                 }
498                 return 0;
499         }
500
501         sv->sv_nposted_msg--;
502         if (sv->sv_shuttingdown) return rc;
503
504         list_del(&buf->buf_list);
505
506         spin_unlock(&sv->sv_lock);
507         LIBCFS_FREE(buf, sizeof(*buf));
508         spin_lock(&sv->sv_lock);
509         return rc; 
510 }
511
512 int
513 srpc_service_add_buffers (srpc_service_t *sv, int nbuffer)
514 {
515         int                rc;
516         int                posted;
517         srpc_buffer_t     *buf;
518
519         LASSERTF (nbuffer > 0,
520                   "nbuffer must be positive: %d\n", nbuffer);
521
522         for (posted = 0; posted < nbuffer; posted++) {
523                 LIBCFS_ALLOC(buf, sizeof(*buf));
524                 if (buf == NULL) break;
525
526                 spin_lock(&sv->sv_lock);
527                 rc = srpc_service_post_buffer(sv, buf);
528                 spin_unlock(&sv->sv_lock);
529
530                 if (rc != 0) break;
531         }
532
533         return posted;
534 }
535
536 void
537 srpc_service_remove_buffers (srpc_service_t *sv, int nbuffer)
538 {
539         LASSERTF (nbuffer > 0,
540                   "nbuffer must be positive: %d\n", nbuffer);
541
542         spin_lock(&sv->sv_lock);
543
544         LASSERT (sv->sv_nprune >= 0);
545         LASSERT (!sv->sv_shuttingdown);
546
547         sv->sv_nprune += nbuffer;
548
549         spin_unlock(&sv->sv_lock);
550         return;
551 }
552
553 /* returns 1 if sv has finished, otherwise 0 */
554 int
555 srpc_finish_service (srpc_service_t *sv)
556 {
557         srpc_server_rpc_t *rpc;
558         srpc_buffer_t     *buf;
559
560         spin_lock(&sv->sv_lock);
561
562         LASSERT (sv->sv_shuttingdown); /* srpc_shutdown_service called */
563
564         if (sv->sv_nposted_msg != 0 || !list_empty(&sv->sv_active_rpcq)) {
565                 CDEBUG (D_NET,
566                         "waiting for %d posted buffers to unlink and "
567                         "in-flight RPCs to die.\n",
568                         sv->sv_nposted_msg);
569
570                 if (!list_empty(&sv->sv_active_rpcq)) {
571                         rpc = list_entry(sv->sv_active_rpcq.next,
572                                          srpc_server_rpc_t, srpc_list);
573                         CDEBUG (D_NETERROR,
574                                 "Active RPC on shutdown: sv %s, peer %s, "
575                                 "wi %s scheduled %d running %d, "
576                                 "ev fired %d type %d status %d lnet %d\n",
577                                 sv->sv_name, libcfs_id2str(rpc->srpc_peer),
578                                 swi_state2str(rpc->srpc_wi.wi_state),
579                                 rpc->srpc_wi.wi_scheduled,
580                                 rpc->srpc_wi.wi_running,
581                                 rpc->srpc_ev.ev_fired,
582                                 rpc->srpc_ev.ev_type,
583                                 rpc->srpc_ev.ev_status,
584                                 rpc->srpc_ev.ev_lnet);
585                 }
586
587                 spin_unlock(&sv->sv_lock);
588                 return 0;
589         }
590
591         spin_unlock(&sv->sv_lock); /* no lock needed from now on */
592
593         for (;;) {
594                 struct list_head *q;
595
596                 if (!list_empty(&sv->sv_posted_msgq))
597                         q = &sv->sv_posted_msgq;
598                 else if (!list_empty(&sv->sv_blocked_msgq))
599                         q = &sv->sv_blocked_msgq;
600                 else
601                         break;
602
603                 buf = list_entry(q->next, srpc_buffer_t, buf_list);
604                 list_del(&buf->buf_list);
605
606                 LIBCFS_FREE(buf, sizeof(*buf));
607         }
608
609         while (!list_empty(&sv->sv_free_rpcq)) {
610                 rpc = list_entry(sv->sv_free_rpcq.next,
611                                  srpc_server_rpc_t, srpc_list);
612                 list_del(&rpc->srpc_list);
613                 LIBCFS_FREE(rpc, sizeof(*rpc));
614         }
615
616         return 1;
617 }
618
619 /* called with sv->sv_lock held */
620 void
621 srpc_service_recycle_buffer (srpc_service_t *sv, srpc_buffer_t *buf)
622 {
623         if (sv->sv_shuttingdown) goto free;
624
625         if (sv->sv_nprune == 0) {
626                 if (srpc_service_post_buffer(sv, buf) != 0)
627                         CWARN ("Failed to post %s buffer\n", sv->sv_name);
628                 return;
629         }
630
631         sv->sv_nprune--;
632 free:
633         spin_unlock(&sv->sv_lock);
634         LIBCFS_FREE(buf, sizeof(*buf));
635         spin_lock(&sv->sv_lock);
636 }
637
638 void
639 srpc_shutdown_service (srpc_service_t *sv)
640 {
641         srpc_server_rpc_t *rpc;
642         srpc_buffer_t     *buf;
643
644         spin_lock(&sv->sv_lock);
645
646         CDEBUG (D_NET, "Shutting down service: id %d, name %s\n",
647                 sv->sv_id, sv->sv_name);
648
649         sv->sv_shuttingdown = 1; /* i.e. no new active RPC */
650
651         /* schedule in-flight RPCs to notice the shutdown */
652         list_for_each_entry (rpc, &sv->sv_active_rpcq, srpc_list) {
653                 swi_schedule_workitem(&rpc->srpc_wi);
654         }
655
656         spin_unlock(&sv->sv_lock);
657
658         /* OK to traverse sv_posted_msgq without lock, since no one
659          * touches sv_posted_msgq now */
660         list_for_each_entry (buf, &sv->sv_posted_msgq, buf_list)
661                 LNetMDUnlink(buf->buf_mdh);
662
663         return;
664 }
665
666 int
667 srpc_send_request (srpc_client_rpc_t *rpc)
668 {
669         srpc_event_t *ev = &rpc->crpc_reqstev;
670         int           rc;
671
672         ev->ev_fired = 0;
673         ev->ev_data  = rpc;
674         ev->ev_type  = SRPC_REQUEST_SENT;
675
676         rc = srpc_post_active_rqtbuf(rpc->crpc_dest, rpc->crpc_service,
677                                      &rpc->crpc_reqstmsg, sizeof(srpc_msg_t),
678                                      &rpc->crpc_reqstmdh, ev);
679         if (rc != 0) {
680                 LASSERT (rc == -ENOMEM);
681                 ev->ev_fired = 1;  /* no more event expected */
682         }
683         return rc;
684 }
685
686 int
687 srpc_prepare_reply (srpc_client_rpc_t *rpc)
688 {
689         srpc_event_t *ev = &rpc->crpc_replyev;
690         __u64        *id = &rpc->crpc_reqstmsg.msg_body.reqst.rpyid;
691         int           rc;
692
693         ev->ev_fired = 0;
694         ev->ev_data  = rpc;
695         ev->ev_type  = SRPC_REPLY_RCVD;
696
697         *id = srpc_next_id();
698
699         rc = srpc_post_passive_rdma(SRPC_RDMA_PORTAL, *id,
700                                     &rpc->crpc_replymsg, sizeof(srpc_msg_t),
701                                     LNET_MD_OP_PUT, rpc->crpc_dest,
702                                     &rpc->crpc_replymdh, ev);
703         if (rc != 0) {
704                 LASSERT (rc == -ENOMEM);
705                 ev->ev_fired = 1;  /* no more event expected */
706         }
707         return rc;
708 }
709
710 int
711 srpc_prepare_bulk (srpc_client_rpc_t *rpc)
712 {
713         srpc_bulk_t  *bk = &rpc->crpc_bulk;
714         srpc_event_t *ev = &rpc->crpc_bulkev;
715         __u64        *id = &rpc->crpc_reqstmsg.msg_body.reqst.bulkid;
716         int           rc;
717         int           opt;
718
719         LASSERT (bk->bk_niov <= LNET_MAX_IOV);
720
721         if (bk->bk_niov == 0) return 0; /* nothing to do */
722
723         opt = bk->bk_sink ? LNET_MD_OP_PUT : LNET_MD_OP_GET;
724 #ifdef __KERNEL__
725         opt |= LNET_MD_KIOV;
726 #else
727         opt |= LNET_MD_IOVEC;
728 #endif
729
730         ev->ev_fired = 0;
731         ev->ev_data  = rpc;
732         ev->ev_type  = SRPC_BULK_REQ_RCVD;
733
734         *id = srpc_next_id();
735
736         rc = srpc_post_passive_rdma(SRPC_RDMA_PORTAL, *id,
737                                     &bk->bk_iovs[0], bk->bk_niov, opt,
738                                     rpc->crpc_dest, &bk->bk_mdh, ev);
739         if (rc != 0) {
740                 LASSERT (rc == -ENOMEM);
741                 ev->ev_fired = 1;  /* no more event expected */
742         }
743         return rc;
744 }
745
746 int
747 srpc_do_bulk (srpc_server_rpc_t *rpc)
748 {
749         srpc_event_t  *ev = &rpc->srpc_ev;
750         srpc_bulk_t   *bk = rpc->srpc_bulk;
751         __u64          id = rpc->srpc_reqstbuf->buf_msg.msg_body.reqst.bulkid;
752         int            rc;
753         int            opt;
754
755         LASSERT (bk != NULL);
756
757         opt = bk->bk_sink ? LNET_MD_OP_GET : LNET_MD_OP_PUT;
758 #ifdef __KERNEL__
759         opt |= LNET_MD_KIOV;
760 #else
761         opt |= LNET_MD_IOVEC;
762 #endif
763
764         ev->ev_fired = 0;
765         ev->ev_data  = rpc;
766         ev->ev_type  = bk->bk_sink ? SRPC_BULK_GET_RPLD : SRPC_BULK_PUT_SENT;
767
768         rc = srpc_post_active_rdma(SRPC_RDMA_PORTAL, id,
769                                    &bk->bk_iovs[0], bk->bk_niov, opt,
770                                    rpc->srpc_peer, rpc->srpc_self,
771                                    &bk->bk_mdh, ev);
772         if (rc != 0)
773                 ev->ev_fired = 1;  /* no more event expected */
774         return rc;
775 }
776
777 /* called with srpc_service_t::sv_lock held */
778 inline void
779 srpc_schedule_server_rpc (srpc_server_rpc_t *rpc)
780 {
781         srpc_service_t *sv = rpc->srpc_service;
782
783         if (sv->sv_id > SRPC_FRAMEWORK_SERVICE_MAX_ID)
784                 swi_schedule_workitem(&rpc->srpc_wi);
785         else    /* framework RPCs are handled one by one */
786                 swi_schedule_serial_workitem(&rpc->srpc_wi);
787
788         return;
789 }
790
791 /* only called from srpc_handle_rpc */
792 void
793 srpc_server_rpc_done (srpc_server_rpc_t *rpc, int status)
794 {
795         srpc_service_t *sv = rpc->srpc_service;
796         srpc_buffer_t  *buffer;
797
798         LASSERT (status != 0 || rpc->srpc_wi.wi_state == SWI_STATE_DONE);
799
800         rpc->srpc_status = status;
801
802         CDEBUG (status == 0 ? D_NET : D_NETERROR,
803                 "Server RPC done: service %s, peer %s, status %s:%d\n",
804                 sv->sv_name, libcfs_id2str(rpc->srpc_peer),
805                 swi_state2str(rpc->srpc_wi.wi_state), status);
806
807         if (status != 0) {
808                 spin_lock(&srpc_data.rpc_glock);
809                 srpc_data.rpc_counters.rpcs_dropped++;
810                 spin_unlock(&srpc_data.rpc_glock);
811         }
812
813         if (rpc->srpc_done != NULL)
814                 (*rpc->srpc_done) (rpc);
815         LASSERT (rpc->srpc_bulk == NULL);
816
817         spin_lock(&sv->sv_lock);
818
819         if (rpc->srpc_reqstbuf != NULL) {
820                 /* NB might drop sv_lock in srpc_service_recycle_buffer, but
821                  * sv won't go away for sv_active_rpcq must not be empty */
822                 srpc_service_recycle_buffer(sv, rpc->srpc_reqstbuf);
823                 rpc->srpc_reqstbuf = NULL;
824         }
825
826         list_del(&rpc->srpc_list); /* from sv->sv_active_rpcq */
827
828         /*
829          * No one can schedule me now since:
830          * - I'm not on sv_active_rpcq.
831          * - all LNet events have been fired.
832          * Cancel pending schedules and prevent future schedule attempts:
833          */
834         LASSERT (rpc->srpc_ev.ev_fired);
835         swi_kill_workitem(&rpc->srpc_wi);
836
837         if (!sv->sv_shuttingdown && !list_empty(&sv->sv_blocked_msgq)) {
838                 buffer = list_entry(sv->sv_blocked_msgq.next,
839                                     srpc_buffer_t, buf_list);
840                 list_del(&buffer->buf_list);
841
842                 srpc_init_server_rpc(rpc, sv, buffer);
843                 list_add_tail(&rpc->srpc_list, &sv->sv_active_rpcq);
844                 srpc_schedule_server_rpc(rpc);
845         } else {
846                 list_add(&rpc->srpc_list, &sv->sv_free_rpcq);
847         }
848
849         spin_unlock(&sv->sv_lock);
850         return;
851 }
852
853 /* handles an incoming RPC */
854 int
855 srpc_handle_rpc (swi_workitem_t *wi)
856 {
857         srpc_server_rpc_t *rpc = wi->wi_data;
858         srpc_service_t    *sv = rpc->srpc_service;
859         srpc_event_t      *ev = &rpc->srpc_ev;
860         int                rc = 0;
861
862         LASSERT (wi == &rpc->srpc_wi);
863
864         spin_lock(&sv->sv_lock);
865
866         if (sv->sv_shuttingdown) {
867                 spin_unlock(&sv->sv_lock);
868
869                 if (rpc->srpc_bulk != NULL)
870                         LNetMDUnlink(rpc->srpc_bulk->bk_mdh);
871                 LNetMDUnlink(rpc->srpc_replymdh);
872
873                 if (ev->ev_fired) { /* no more event, OK to finish */
874                         srpc_server_rpc_done(rpc, -ESHUTDOWN);
875                         return 1;
876                 }
877                 return 0;
878         }
879
880         spin_unlock(&sv->sv_lock);
881
882         switch (wi->wi_state) {
883         default:
884                 LBUG ();
885         case SWI_STATE_NEWBORN: {
886                 srpc_msg_t           *msg;
887                 srpc_generic_reply_t *reply;
888
889                 msg = &rpc->srpc_reqstbuf->buf_msg;
890                 reply = &rpc->srpc_replymsg.msg_body.reply;
891
892                 if (msg->msg_version != SRPC_MSG_VERSION &&
893                     msg->msg_version != __swab32(SRPC_MSG_VERSION)) {
894                         CWARN ("Version mismatch: %u, %u expected, from %s\n",
895                                msg->msg_version, SRPC_MSG_VERSION,
896                                libcfs_id2str(rpc->srpc_peer));
897                         reply->status = EPROTO;
898                 } else {
899                         reply->status = 0;
900                         rc = (*sv->sv_handler) (rpc);
901                         LASSERT (reply->status == 0 || !rpc->srpc_bulk);
902                 }
903
904                 if (rc != 0) {
905                         srpc_server_rpc_done(rpc, rc);
906                         return 1;
907                 }
908
909                 wi->wi_state = SWI_STATE_BULK_STARTED;
910
911                 if (rpc->srpc_bulk != NULL) {
912                         rc = srpc_do_bulk(rpc);
913                         if (rc == 0)
914                                 return 0; /* wait for bulk */
915
916                         LASSERT (ev->ev_fired);
917                         ev->ev_status = rc;
918                 }
919         }
920         case SWI_STATE_BULK_STARTED:
921                 LASSERT (rpc->srpc_bulk == NULL || ev->ev_fired);
922
923                 if (rpc->srpc_bulk != NULL) {
924                         rc = ev->ev_status;
925
926                         if (sv->sv_bulk_ready != NULL)
927                                 rc = (*sv->sv_bulk_ready) (rpc, rc);
928
929                         if (rc != 0) {
930                                 srpc_server_rpc_done(rpc, rc);
931                                 return 1;
932                         }
933                 }
934
935                 wi->wi_state = SWI_STATE_REPLY_SUBMITTED;
936                 rc = srpc_send_reply(rpc);
937                 if (rc == 0)
938                         return 0; /* wait for reply */
939                 srpc_server_rpc_done(rpc, rc);
940                 return 1;
941
942         case SWI_STATE_REPLY_SUBMITTED:
943                 LASSERT (ev->ev_fired);
944
945                 wi->wi_state = SWI_STATE_DONE;
946                 srpc_server_rpc_done(rpc, ev->ev_status);
947                 return 1;
948         }
949
950         return 0;
951 }
952
953 void
954 srpc_client_rpc_expired (void *data)
955 {
956         srpc_client_rpc_t *rpc = data;
957
958         CWARN ("Client RPC expired: service %d, peer %s, timeout %d.\n",
959                rpc->crpc_service, libcfs_id2str(rpc->crpc_dest),
960                rpc->crpc_timeout);
961
962         spin_lock(&rpc->crpc_lock);
963
964         rpc->crpc_timeout = 0;
965         srpc_abort_rpc(rpc, -ETIMEDOUT);
966
967         spin_unlock(&rpc->crpc_lock);
968
969         spin_lock(&srpc_data.rpc_glock);
970         srpc_data.rpc_counters.rpcs_expired++;
971         spin_unlock(&srpc_data.rpc_glock);
972         return;
973 }
974
975 inline void
976 srpc_add_client_rpc_timer (srpc_client_rpc_t *rpc)
977 {
978         stt_timer_t *timer = &rpc->crpc_timer;
979
980         if (rpc->crpc_timeout == 0) return;
981
982         CFS_INIT_LIST_HEAD(&timer->stt_list);
983         timer->stt_data    = rpc;
984         timer->stt_func    = srpc_client_rpc_expired;
985         timer->stt_expires = cfs_time_add(rpc->crpc_timeout, 
986                                           cfs_time_current_sec());
987         stt_add_timer(timer);
988         return;
989 }
990
991 /* 
992  * Called with rpc->crpc_lock held.
993  *
994  * Upon exit the RPC expiry timer is not queued and the handler is not
995  * running on any CPU. */
996 void
997 srpc_del_client_rpc_timer (srpc_client_rpc_t *rpc)
998 {     
999         /* timer not planted or already exploded */
1000         if (rpc->crpc_timeout == 0) return;
1001
1002         /* timer sucessfully defused */
1003         if (stt_del_timer(&rpc->crpc_timer)) return;
1004
1005 #ifdef __KERNEL__
1006         /* timer detonated, wait for it to explode */
1007         while (rpc->crpc_timeout != 0) {
1008                 spin_unlock(&rpc->crpc_lock);
1009
1010                 cfs_schedule(); 
1011
1012                 spin_lock(&rpc->crpc_lock);
1013         }
1014 #else
1015         LBUG(); /* impossible in single-threaded runtime */
1016 #endif
1017         return;
1018 }
1019
1020 void
1021 srpc_check_sends (srpc_peer_t *peer, int credits)
1022 {
1023         struct list_head  *q;
1024         srpc_client_rpc_t *rpc;
1025
1026         LASSERT (credits >= 0);
1027         LASSERT (srpc_data.rpc_state == SRPC_STATE_RUNNING);
1028
1029         spin_lock(&peer->stp_lock);
1030         peer->stp_credits += credits;
1031
1032         while (peer->stp_credits) {
1033                 if (!list_empty(&peer->stp_ctl_rpcq))
1034                         q = &peer->stp_ctl_rpcq;
1035                 else if (!list_empty(&peer->stp_rpcq))
1036                         q = &peer->stp_rpcq;
1037                 else
1038                         break;
1039
1040                 peer->stp_credits--;
1041
1042                 rpc = list_entry(q->next, srpc_client_rpc_t, crpc_privl);
1043                 list_del_init(&rpc->crpc_privl);
1044                 srpc_client_rpc_decref(rpc);  /* --ref for peer->*rpcq */
1045
1046                 swi_schedule_workitem(&rpc->crpc_wi);
1047         }
1048
1049         spin_unlock(&peer->stp_lock);
1050         return;
1051 }
1052
1053 void
1054 srpc_client_rpc_done (srpc_client_rpc_t *rpc, int status)
1055 {
1056         swi_workitem_t *wi = &rpc->crpc_wi;
1057         srpc_peer_t    *peer = rpc->crpc_peer;
1058
1059         LASSERT (status != 0 || wi->wi_state == SWI_STATE_DONE);
1060
1061         spin_lock(&rpc->crpc_lock);
1062
1063         rpc->crpc_closed = 1;
1064         if (rpc->crpc_status == 0)
1065                 rpc->crpc_status = status;
1066
1067         srpc_del_client_rpc_timer(rpc);
1068
1069         CDEBUG ((status == 0) ? D_NET : D_NETERROR,
1070                 "Client RPC done: service %d, peer %s, status %s:%d:%d\n",
1071                 rpc->crpc_service, libcfs_id2str(rpc->crpc_dest),
1072                 swi_state2str(wi->wi_state), rpc->crpc_aborted, status);
1073
1074         /*
1075          * No one can schedule me now since:
1076          * - RPC timer has been defused.
1077          * - all LNet events have been fired.
1078          * - crpc_closed has been set, preventing srpc_abort_rpc from 
1079          *   scheduling me.
1080          * Cancel pending schedules and prevent future schedule attempts:
1081          */
1082         LASSERT (!srpc_event_pending(rpc));
1083         swi_kill_workitem(wi);
1084
1085         spin_unlock(&rpc->crpc_lock);
1086
1087         (*rpc->crpc_done) (rpc);
1088
1089         if (peer != NULL)
1090                 srpc_check_sends(peer, 1);
1091         return;
1092 }
1093
1094 /* sends an outgoing RPC */
1095 int
1096 srpc_send_rpc (swi_workitem_t *wi)
1097 {
1098         int                rc = 0;
1099         srpc_client_rpc_t *rpc = wi->wi_data;
1100         srpc_msg_t        *reply = &rpc->crpc_replymsg;
1101         int                do_bulk = rpc->crpc_bulk.bk_niov > 0;
1102
1103         LASSERT (rpc != NULL);
1104         LASSERT (wi == &rpc->crpc_wi);
1105
1106         spin_lock(&rpc->crpc_lock);
1107
1108         if (rpc->crpc_aborted) {
1109                 spin_unlock(&rpc->crpc_lock);
1110                 goto abort;
1111         }
1112
1113         spin_unlock(&rpc->crpc_lock);
1114
1115         switch (wi->wi_state) {
1116         default:
1117                 LBUG ();
1118         case SWI_STATE_NEWBORN:
1119                 LASSERT (!srpc_event_pending(rpc));
1120
1121                 rc = srpc_prepare_reply(rpc);
1122                 if (rc != 0) {
1123                         srpc_client_rpc_done(rpc, rc);
1124                         return 1;
1125                 }
1126
1127                 rc = srpc_prepare_bulk(rpc);
1128                 if (rc != 0) break;
1129
1130                 wi->wi_state = SWI_STATE_REQUEST_SUBMITTED;
1131                 rc = srpc_send_request(rpc);
1132                 break;
1133
1134         case SWI_STATE_REQUEST_SUBMITTED:
1135                 /* CAVEAT EMPTOR: rqtev, rpyev, and bulkev may come in any
1136                  * order; however, they're processed in a strict order: 
1137                  * rqt, rpy, and bulk. */
1138                 if (!rpc->crpc_reqstev.ev_fired) break;
1139
1140                 rc = rpc->crpc_reqstev.ev_status;
1141                 if (rc != 0) break;
1142
1143                 wi->wi_state = SWI_STATE_REQUEST_SENT;
1144                 /* perhaps more events, fall thru */
1145         case SWI_STATE_REQUEST_SENT: {
1146                 srpc_msg_type_t type = srpc_service2reply(rpc->crpc_service);
1147
1148                 if (!rpc->crpc_replyev.ev_fired) break;
1149
1150                 rc = rpc->crpc_replyev.ev_status;
1151                 if (rc != 0) break;
1152
1153                 if ((reply->msg_type != type && 
1154                      reply->msg_type != __swab32(type)) ||
1155                     (reply->msg_magic != SRPC_MSG_MAGIC &&
1156                      reply->msg_magic != __swab32(SRPC_MSG_MAGIC))) {
1157                         CWARN ("Bad message from %s: type %u (%d expected),"
1158                                " magic %u (%d expected).\n",
1159                                libcfs_id2str(rpc->crpc_dest),
1160                                reply->msg_type, type,
1161                                reply->msg_magic, SRPC_MSG_MAGIC);
1162                         rc = -EBADMSG;
1163                         break;
1164                 }
1165
1166                 if (do_bulk && reply->msg_body.reply.status != 0) {
1167                         CWARN ("Remote error %d at %s, unlink bulk buffer in "
1168                                "case peer didn't initiate bulk transfer\n",
1169                                reply->msg_body.reply.status,
1170                                libcfs_id2str(rpc->crpc_dest));
1171                         LNetMDUnlink(rpc->crpc_bulk.bk_mdh);
1172                 }
1173
1174                 wi->wi_state = SWI_STATE_REPLY_RECEIVED;
1175         }
1176         case SWI_STATE_REPLY_RECEIVED:
1177                 if (do_bulk && !rpc->crpc_bulkev.ev_fired) break;
1178
1179                 rc = do_bulk ? rpc->crpc_bulkev.ev_status : 0;
1180
1181                 /* Bulk buffer was unlinked due to remote error. Clear error
1182                  * since reply buffer still contains valid data.
1183                  * NB rpc->crpc_done shouldn't look into bulk data in case of
1184                  * remote error. */
1185                 if (do_bulk && rpc->crpc_bulkev.ev_lnet == LNET_EVENT_UNLINK &&
1186                     rpc->crpc_status == 0 && reply->msg_body.reply.status != 0)
1187                         rc = 0;
1188
1189                 wi->wi_state = SWI_STATE_DONE;
1190                 srpc_client_rpc_done(rpc, rc);
1191                 return 1;
1192         }
1193
1194         if (rc != 0) {
1195                 spin_lock(&rpc->crpc_lock);
1196                 srpc_abort_rpc(rpc, rc);
1197                 spin_unlock(&rpc->crpc_lock);
1198         }
1199
1200 abort:
1201         if (rpc->crpc_aborted) {
1202                 LNetMDUnlink(rpc->crpc_reqstmdh);
1203                 LNetMDUnlink(rpc->crpc_replymdh);
1204                 LNetMDUnlink(rpc->crpc_bulk.bk_mdh);
1205
1206                 if (!srpc_event_pending(rpc)) {
1207                         srpc_client_rpc_done(rpc, -EINTR);
1208                         return 1;
1209                 }
1210         }
1211         return 0;
1212 }
1213
1214 srpc_client_rpc_t *
1215 srpc_create_client_rpc (lnet_process_id_t peer, int service,
1216                         int nbulkiov, int bulklen,
1217                         void (*rpc_done)(srpc_client_rpc_t *),
1218                         void (*rpc_fini)(srpc_client_rpc_t *), void *priv)
1219 {
1220         srpc_client_rpc_t *rpc;
1221
1222         LIBCFS_ALLOC(rpc, offsetof(srpc_client_rpc_t,
1223                                    crpc_bulk.bk_iovs[nbulkiov]));
1224         if (rpc == NULL)
1225                 return NULL;
1226
1227         srpc_init_client_rpc(rpc, peer, service, nbulkiov,
1228                              bulklen, rpc_done, rpc_fini, priv);
1229         return rpc;
1230 }
1231
1232 /* called with rpc->crpc_lock held */
1233 static inline void
1234 srpc_queue_rpc (srpc_peer_t *peer, srpc_client_rpc_t *rpc)
1235 {
1236         int service = rpc->crpc_service;
1237
1238         LASSERT (peer->stp_nid == rpc->crpc_dest.nid);
1239         LASSERT (srpc_data.rpc_state == SRPC_STATE_RUNNING);
1240
1241         rpc->crpc_peer = peer;
1242
1243         spin_lock(&peer->stp_lock);
1244
1245         /* Framework RPCs that alter session state shall take precedence
1246          * over test RPCs and framework query RPCs */
1247         if (service <= SRPC_FRAMEWORK_SERVICE_MAX_ID &&
1248             service != SRPC_SERVICE_DEBUG &&
1249             service != SRPC_SERVICE_QUERY_STAT)
1250                 list_add_tail(&rpc->crpc_privl, &peer->stp_ctl_rpcq);
1251         else
1252                 list_add_tail(&rpc->crpc_privl, &peer->stp_rpcq);
1253
1254         srpc_client_rpc_addref(rpc); /* ++ref for peer->*rpcq */
1255         spin_unlock(&peer->stp_lock);
1256         return;
1257 }
1258
1259 /* called with rpc->crpc_lock held */
1260 void
1261 srpc_abort_rpc (srpc_client_rpc_t *rpc, int why)
1262 {
1263         srpc_peer_t *peer = rpc->crpc_peer;
1264
1265         LASSERT (why != 0);
1266
1267         if (rpc->crpc_aborted || /* already aborted */
1268             rpc->crpc_closed)    /* callback imminent */
1269                 return;
1270
1271         CDEBUG (D_NET,
1272                 "Aborting RPC: service %d, peer %s, state %s, why %d\n",
1273                 rpc->crpc_service, libcfs_id2str(rpc->crpc_dest),
1274                 swi_state2str(rpc->crpc_wi.wi_state), why);
1275
1276         rpc->crpc_aborted = 1;
1277         rpc->crpc_status  = why;
1278
1279         if (peer != NULL) {
1280                 spin_lock(&peer->stp_lock);
1281
1282                 if (!list_empty(&rpc->crpc_privl)) { /* still queued */
1283                         list_del_init(&rpc->crpc_privl);
1284                         srpc_client_rpc_decref(rpc); /* --ref for peer->*rpcq */
1285                         rpc->crpc_peer = NULL;       /* no credit taken */
1286                 }
1287
1288                 spin_unlock(&peer->stp_lock);
1289         }
1290
1291         swi_schedule_workitem(&rpc->crpc_wi);
1292         return;
1293 }
1294
1295 /* called with rpc->crpc_lock held */
1296 void
1297 srpc_post_rpc (srpc_client_rpc_t *rpc)
1298 {
1299         srpc_peer_t *peer;
1300
1301         LASSERT (!rpc->crpc_aborted);
1302         LASSERT (rpc->crpc_peer == NULL);
1303         LASSERT (srpc_data.rpc_state == SRPC_STATE_RUNNING);
1304         LASSERT ((rpc->crpc_bulk.bk_len & ~CFS_PAGE_MASK) == 0);
1305
1306         CDEBUG (D_NET, "Posting RPC: peer %s, service %d, timeout %d\n",
1307                 libcfs_id2str(rpc->crpc_dest), rpc->crpc_service,
1308                 rpc->crpc_timeout);
1309
1310         srpc_add_client_rpc_timer(rpc);
1311
1312         peer = srpc_nid2peer(rpc->crpc_dest.nid);
1313         if (peer == NULL) {
1314                 srpc_abort_rpc(rpc, -ENOMEM);
1315                 return;
1316         }
1317
1318         srpc_queue_rpc(peer, rpc);
1319
1320         spin_unlock(&rpc->crpc_lock);
1321         srpc_check_sends(peer, 0);
1322         spin_lock(&rpc->crpc_lock);
1323         return;
1324 }
1325
1326
1327 int
1328 srpc_send_reply (srpc_server_rpc_t *rpc)
1329 {
1330         srpc_event_t   *ev = &rpc->srpc_ev;
1331         srpc_msg_t     *msg = &rpc->srpc_replymsg;
1332         srpc_buffer_t  *buffer = rpc->srpc_reqstbuf;
1333         srpc_service_t *sv = rpc->srpc_service;
1334         __u64           rpyid;
1335         int             rc;
1336
1337         LASSERT (buffer != NULL);
1338         rpyid = buffer->buf_msg.msg_body.reqst.rpyid;
1339
1340         spin_lock(&sv->sv_lock);
1341
1342         if (!sv->sv_shuttingdown &&
1343             sv->sv_id > SRPC_FRAMEWORK_SERVICE_MAX_ID) {
1344                 /* Repost buffer before replying since test client
1345                  * might send me another RPC once it gets the reply */
1346                 if (srpc_service_post_buffer(sv, buffer) != 0)
1347                         CWARN ("Failed to repost %s buffer\n", sv->sv_name);
1348                 rpc->srpc_reqstbuf = NULL;
1349         }
1350
1351         spin_unlock(&sv->sv_lock);
1352
1353         ev->ev_fired = 0;
1354         ev->ev_data  = rpc;
1355         ev->ev_type  = SRPC_REPLY_SENT;
1356
1357         msg->msg_magic   = SRPC_MSG_MAGIC;
1358         msg->msg_version = SRPC_MSG_VERSION;
1359         msg->msg_type    = srpc_service2reply(sv->sv_id);
1360
1361         rc = srpc_post_active_rdma(SRPC_RDMA_PORTAL, rpyid, msg,
1362                                    sizeof(*msg), LNET_MD_OP_PUT,
1363                                    rpc->srpc_peer, rpc->srpc_self,
1364                                    &rpc->srpc_replymdh, ev);
1365         if (rc != 0)
1366                 ev->ev_fired = 1;  /* no more event expected */
1367         return rc;
1368 }
1369
1370 /* when in kernel always called with LNET_LOCK() held, and in thread context */
1371 void 
1372 srpc_lnet_ev_handler (lnet_event_t *ev)
1373 {
1374         srpc_event_t      *rpcev = ev->md.user_ptr;
1375         srpc_client_rpc_t *crpc;
1376         srpc_server_rpc_t *srpc;
1377         srpc_buffer_t     *buffer;
1378         srpc_service_t    *sv;
1379         srpc_msg_t        *msg;
1380         srpc_msg_type_t    type;
1381
1382         LASSERT (!in_interrupt());
1383
1384         if (ev->status != 0) {
1385                 spin_lock(&srpc_data.rpc_glock);
1386                 srpc_data.rpc_counters.errors++;
1387                 spin_unlock(&srpc_data.rpc_glock);
1388         }
1389
1390         rpcev->ev_lnet = ev->type;
1391
1392         switch (rpcev->ev_type) {
1393         default:
1394                 LBUG ();
1395         case SRPC_REQUEST_SENT:
1396                 if (ev->status == 0 && ev->type != LNET_EVENT_UNLINK) {
1397                         spin_lock(&srpc_data.rpc_glock);
1398                         srpc_data.rpc_counters.rpcs_sent++;
1399                         spin_unlock(&srpc_data.rpc_glock);
1400                 }
1401         case SRPC_REPLY_RCVD:
1402         case SRPC_BULK_REQ_RCVD:
1403                 crpc = rpcev->ev_data;
1404
1405                 LASSERT (rpcev == &crpc->crpc_reqstev ||
1406                          rpcev == &crpc->crpc_replyev ||
1407                          rpcev == &crpc->crpc_bulkev);
1408
1409                 spin_lock(&crpc->crpc_lock);
1410
1411                 LASSERT (rpcev->ev_fired == 0);
1412                 rpcev->ev_fired  = 1;
1413                 rpcev->ev_status = (ev->type == LNET_EVENT_UNLINK) ? 
1414                                                 -EINTR : ev->status;
1415                 swi_schedule_workitem(&crpc->crpc_wi);
1416
1417                 spin_unlock(&crpc->crpc_lock);
1418                 break;
1419
1420         case SRPC_REQUEST_RCVD:
1421                 sv = rpcev->ev_data;
1422
1423                 LASSERT (rpcev == &sv->sv_ev);
1424
1425                 spin_lock(&sv->sv_lock);
1426
1427                 LASSERT (ev->unlinked);
1428                 LASSERT (ev->type == LNET_EVENT_PUT ||
1429                          ev->type == LNET_EVENT_UNLINK);
1430                 LASSERT (ev->type != LNET_EVENT_UNLINK ||
1431                          sv->sv_shuttingdown);
1432
1433                 buffer = container_of(ev->md.start, srpc_buffer_t, buf_msg);
1434                 buffer->buf_peer = ev->initiator;
1435                 buffer->buf_self = ev->target.nid;
1436
1437                 sv->sv_nposted_msg--;
1438                 LASSERT (sv->sv_nposted_msg >= 0);
1439
1440                 if (sv->sv_shuttingdown) {
1441                         /* Leave buffer on sv->sv_posted_msgq since 
1442                          * srpc_finish_service needs to traverse it. */
1443                         spin_unlock(&sv->sv_lock);
1444                         break;
1445                 }
1446
1447                 list_del(&buffer->buf_list); /* from sv->sv_posted_msgq */
1448                 msg = &buffer->buf_msg;
1449                 type = srpc_service2request(sv->sv_id);
1450
1451                 if (ev->status != 0 || ev->mlength != sizeof(*msg) ||
1452                     (msg->msg_type != type && 
1453                      msg->msg_type != __swab32(type)) ||
1454                     (msg->msg_magic != SRPC_MSG_MAGIC &&
1455                      msg->msg_magic != __swab32(SRPC_MSG_MAGIC))) {
1456                         CERROR ("Dropping RPC (%s) from %s: "
1457                                 "status %d mlength %d type %u magic %u.\n",
1458                                 sv->sv_name, libcfs_id2str(ev->initiator),
1459                                 ev->status, ev->mlength,
1460                                 msg->msg_type, msg->msg_magic);
1461
1462                         /* NB might drop sv_lock in srpc_service_recycle_buffer,
1463                          * sv_nposted_msg++ as an implicit reference to prevent
1464                          * sv from disappearing under me */
1465                         sv->sv_nposted_msg++;
1466                         srpc_service_recycle_buffer(sv, buffer);
1467                         sv->sv_nposted_msg--;
1468                         spin_unlock(&sv->sv_lock);
1469
1470                         if (ev->status == 0) { /* status!=0 counted already */
1471                                 spin_lock(&srpc_data.rpc_glock);
1472                                 srpc_data.rpc_counters.errors++;
1473                                 spin_unlock(&srpc_data.rpc_glock);
1474                         }
1475                         break;
1476                 }
1477
1478                 if (!list_empty(&sv->sv_free_rpcq)) {
1479                         srpc = list_entry(sv->sv_free_rpcq.next,
1480                                           srpc_server_rpc_t, srpc_list);
1481                         list_del(&srpc->srpc_list);
1482
1483                         srpc_init_server_rpc(srpc, sv, buffer);
1484                         list_add_tail(&srpc->srpc_list, &sv->sv_active_rpcq);
1485                         srpc_schedule_server_rpc(srpc);
1486                 } else {
1487                         list_add_tail(&buffer->buf_list, &sv->sv_blocked_msgq);
1488                 }
1489
1490                 spin_unlock(&sv->sv_lock);
1491
1492                 spin_lock(&srpc_data.rpc_glock);
1493                 srpc_data.rpc_counters.rpcs_rcvd++;
1494                 spin_unlock(&srpc_data.rpc_glock);
1495                 break;
1496
1497         case SRPC_BULK_GET_RPLD:
1498                 LASSERT (ev->type == LNET_EVENT_SEND ||
1499                          ev->type == LNET_EVENT_REPLY ||
1500                          ev->type == LNET_EVENT_UNLINK);
1501
1502                 if (ev->type == LNET_EVENT_SEND && 
1503                     ev->status == 0 && !ev->unlinked)
1504                         break; /* wait for the final LNET_EVENT_REPLY */
1505
1506         case SRPC_BULK_PUT_SENT:
1507                 if (ev->status == 0 && ev->type != LNET_EVENT_UNLINK) {
1508                         spin_lock(&srpc_data.rpc_glock);
1509
1510                         if (rpcev->ev_type == SRPC_BULK_GET_RPLD)
1511                                 srpc_data.rpc_counters.bulk_get += ev->mlength;
1512                         else
1513                                 srpc_data.rpc_counters.bulk_put += ev->mlength;
1514
1515                         spin_unlock(&srpc_data.rpc_glock);
1516                 }
1517         case SRPC_REPLY_SENT:
1518                 srpc = rpcev->ev_data;
1519                 sv = srpc->srpc_service;
1520
1521                 LASSERT (rpcev == &srpc->srpc_ev);
1522
1523                 spin_lock(&sv->sv_lock);
1524                 rpcev->ev_fired  = 1;
1525                 rpcev->ev_status = (ev->type == LNET_EVENT_UNLINK) ? 
1526                                                 -EINTR : ev->status;
1527                 srpc_schedule_server_rpc(srpc);
1528                 spin_unlock(&sv->sv_lock);
1529                 break;
1530         }
1531
1532         return;
1533 }
1534
1535 #ifndef __KERNEL__
1536
1537 int
1538 srpc_check_event (int timeout)
1539 {
1540         lnet_event_t ev;
1541         int          rc;
1542         int          i;
1543
1544         rc = LNetEQPoll(&srpc_data.rpc_lnet_eq, 1,
1545                         timeout * 1000, &ev, &i);
1546         if (rc == 0) return 0;
1547         
1548         LASSERT (rc == -EOVERFLOW || rc == 1);
1549         
1550         /* We can't affort to miss any events... */
1551         if (rc == -EOVERFLOW) {
1552                 CERROR ("Dropped an event!!!\n");
1553                 abort();
1554         }
1555                 
1556         srpc_lnet_ev_handler(&ev);
1557         return 1;
1558 }
1559
1560 #endif
1561
1562 int
1563 srpc_startup (void)
1564 {
1565         int i;
1566         int rc;
1567
1568         memset(&srpc_data, 0, sizeof(struct smoketest_rpc));
1569         spin_lock_init(&srpc_data.rpc_glock);
1570
1571         /* 1 second pause to avoid timestamp reuse */
1572         cfs_pause(cfs_time_seconds(1));
1573         srpc_data.rpc_matchbits = ((__u64) cfs_time_current_sec()) << 48;
1574
1575         srpc_data.rpc_state = SRPC_STATE_NONE;
1576
1577         LIBCFS_ALLOC(srpc_data.rpc_peers,
1578                      sizeof(struct list_head) * SRPC_PEER_HASH_SIZE);
1579         if (srpc_data.rpc_peers == NULL) {
1580                 CERROR ("Failed to alloc peer hash.\n");
1581                 return -ENOMEM;
1582         }
1583
1584         for (i = 0; i < SRPC_PEER_HASH_SIZE; i++)
1585                 CFS_INIT_LIST_HEAD(&srpc_data.rpc_peers[i]);
1586
1587 #ifdef __KERNEL__
1588         rc = LNetNIInit(LUSTRE_SRV_LNET_PID);
1589 #else
1590         rc = LNetNIInit(getpid() | LNET_PID_USERFLAG);
1591 #endif
1592         if (rc < 0) {
1593                 CERROR ("LNetNIInit() has failed: %d\n", rc);
1594                 LIBCFS_FREE(srpc_data.rpc_peers,
1595                             sizeof(struct list_head) * SRPC_PEER_HASH_SIZE);
1596                 return rc;
1597         }
1598
1599         srpc_data.rpc_state = SRPC_STATE_NI_INIT;
1600
1601         srpc_data.rpc_lnet_eq = LNET_EQ_NONE;
1602 #ifdef __KERNEL__
1603         rc = LNetEQAlloc(16, srpc_lnet_ev_handler, &srpc_data.rpc_lnet_eq);
1604 #else
1605         rc = LNetEQAlloc(10240, LNET_EQ_HANDLER_NONE, &srpc_data.rpc_lnet_eq);
1606 #endif
1607         if (rc != 0) {
1608                 CERROR("LNetEQAlloc() has failed: %d\n", rc);
1609                 goto bail;
1610         }
1611
1612         rc = LNetSetLazyPortal(SRPC_FRAMEWORK_REQUEST_PORTAL);
1613         LASSERT (rc == 0);
1614
1615         srpc_data.rpc_state = SRPC_STATE_EQ_INIT;
1616
1617         rc = swi_startup();
1618         if (rc != 0)
1619                 goto bail;
1620
1621         srpc_data.rpc_state = SRPC_STATE_WI_INIT;
1622
1623         rc = stt_startup();
1624
1625 bail:
1626         if (rc != 0)
1627                 srpc_shutdown();
1628         else
1629                 srpc_data.rpc_state = SRPC_STATE_RUNNING;
1630
1631         return rc;
1632 }
1633
1634 void
1635 srpc_shutdown (void)
1636 {
1637         int i;
1638         int rc;
1639         int state;
1640
1641         state = srpc_data.rpc_state;
1642         srpc_data.rpc_state = SRPC_STATE_STOPPING;
1643
1644         switch (state) {
1645         default:
1646                 LBUG ();
1647         case SRPC_STATE_RUNNING:
1648                 spin_lock(&srpc_data.rpc_glock);
1649
1650                 for (i = 0; i <= SRPC_SERVICE_MAX_ID; i++) {
1651                         srpc_service_t *sv = srpc_data.rpc_services[i];
1652
1653                         LASSERTF (sv == NULL,
1654                                   "service not empty: id %d, name %s\n",
1655                                   i, sv->sv_name);
1656                 }
1657
1658                 spin_unlock(&srpc_data.rpc_glock);
1659
1660                 stt_shutdown();
1661
1662         case SRPC_STATE_WI_INIT:
1663                 swi_shutdown();
1664
1665         case SRPC_STATE_EQ_INIT:
1666                 rc = LNetClearLazyPortal(SRPC_FRAMEWORK_REQUEST_PORTAL);
1667                 LASSERT (rc == 0);
1668                 rc = LNetEQFree(srpc_data.rpc_lnet_eq);
1669                 LASSERT (rc == 0); /* the EQ should have no user by now */
1670
1671         case SRPC_STATE_NI_INIT:
1672                 LNetNIFini();
1673                 break;
1674         }
1675
1676         /* srpc_peer_t's are kept in hash until shutdown */
1677         for (i = 0; i < SRPC_PEER_HASH_SIZE; i++) {
1678                 srpc_peer_t *peer;
1679
1680                 while (!list_empty(&srpc_data.rpc_peers[i])) {
1681                         peer = list_entry(srpc_data.rpc_peers[i].next,
1682                                           srpc_peer_t, stp_list);
1683                         list_del(&peer->stp_list);
1684
1685                         LASSERT (list_empty(&peer->stp_rpcq));
1686                         LASSERT (list_empty(&peer->stp_ctl_rpcq));
1687                         LASSERT (peer->stp_credits == SRPC_PEER_CREDITS);
1688
1689                         LIBCFS_FREE(peer, sizeof(srpc_peer_t));
1690                 }
1691         }
1692
1693         LIBCFS_FREE(srpc_data.rpc_peers,
1694                     sizeof(struct list_head) * SRPC_PEER_HASH_SIZE);
1695         return;
1696 }