Whamcloud - gitweb
Merge b_md to HEAD for 0.5.19 release.
[fs/lustre-release.git] / lustre / ptlbd / rpc.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2002 Cluster File Systems, Inc.
5  *
6  *   This file is part of Lustre, http://www.lustre.org.
7  *
8  *   Lustre is free software; you can redistribute it and/or
9  *   modify it under the terms of version 2 of the GNU General Public
10  *   License as published by the Free Software Foundation.
11  *
12  *   Lustre is distributed in the hope that it will be useful,
13  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *   GNU General Public License for more details.
16  *
17  *   You should have received a copy of the GNU General Public License
18  *   along with Lustre; if not, write to the Free Software
19  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  */
21
22 #include <linux/version.h>
23 #include <linux/module.h>
24 #include <linux/fs.h>
25
26 #define DEBUG_SUBSYSTEM S_PTLBD
27
28 #include <linux/obd_support.h>
29 #include <linux/obd_class.h>
30 #include <linux/lustre_debug.h>
31 #include <linux/lprocfs_status.h>
32 #include <linux/obd_ptlbd.h>
33
34 static __u32 get_next_xid(struct obd_import *imp)
35 {
36         unsigned long flags;
37         __u32 xid;
38         spin_lock_irqsave(&imp->imp_lock, flags);
39         xid = ++imp->imp_last_xid;
40         spin_unlock_irqrestore(&imp->imp_lock, flags);
41         return xid;
42 }
43
44 static int ptlbd_brw_callback(struct obd_brw_set *set, int phase)
45 {
46         ENTRY;
47         RETURN(0);
48 }
49
50 static void decref_bulk_desc(void *data)
51 {
52         struct ptlrpc_bulk_desc *desc = data;
53         ENTRY;
54
55         ptlrpc_bulk_decref(desc);
56         EXIT;
57 }
58
59 /*  this is the callback function which is invoked by the Portals
60  *  event handler associated with the bulk_sink queue and bulk_source queue. 
61  */
62 static void ptlbd_ptl_ev_hdlr(struct ptlrpc_bulk_desc *desc)
63 {
64         ENTRY;
65
66         LASSERT(desc->bd_brw_set != NULL);
67         LASSERT(desc->bd_brw_set->brw_callback != NULL);
68
69         desc->bd_brw_set->brw_callback(desc->bd_brw_set, CB_PHASE_FINISH);
70
71         prepare_work(&desc->bd_queue, decref_bulk_desc, desc);
72         schedule_work(&desc->bd_queue);
73
74         EXIT;
75 }
76
77
78 int ptlbd_write_put_req(struct ptlbd_obd *ptlbd, ptlbd_cmd_t cmd, 
79                 struct buffer_head *first_bh, unsigned int page_count)
80 {
81         struct obd_import *imp = &ptlbd->bd_import;
82         struct ptlbd_op *op;
83         struct ptlbd_niob *niob, *niobs;
84         struct ptlbd_rsp *rsp;
85         struct ptlrpc_request *req;
86         struct ptlrpc_bulk_desc *desc;
87         struct buffer_head *bh;
88         int rc, size[2];
89         struct obd_brw_set *set;
90         ENTRY;
91
92         size[0] = sizeof(struct ptlbd_op);
93         size[1] = page_count * sizeof(struct ptlbd_niob);
94
95         req = ptlrpc_prep_req(imp, cmd, 2, size, NULL);
96         if (!req)
97                 GOTO(out, rc = -ENOMEM);
98         /* XXX might not need these */
99         req->rq_request_portal = PTLBD_REQUEST_PORTAL;
100         req->rq_reply_portal = PTLBD_REPLY_PORTAL;
101
102         op = lustre_msg_buf(req->rq_reqmsg, 0);
103         niobs = lustre_msg_buf(req->rq_reqmsg, 1);
104
105         /* XXX pack */
106         op->op_cmd = cmd;
107         op->op_lun = 0;
108         op->op_niob_cnt = page_count;
109         op->op__padding = 0;
110         op->op_block_cnt = page_count;
111
112         desc = ptlrpc_prep_bulk(imp->imp_connection);
113         if ( desc == NULL )
114                 GOTO(out_req, rc = -ENOMEM);
115         desc->bd_portal = PTLBD_BULK_PORTAL;
116         desc->bd_ptl_ev_hdlr = ptlbd_ptl_ev_hdlr;
117
118         /* XXX someone needs to free this */
119         set = obd_brw_set_new();
120         if (set == NULL)
121                 GOTO(out_desc, rc = -ENOMEM);
122
123         set->brw_callback = ptlbd_brw_callback;
124  
125 #if 0
126         xid = get_next_xid(imp);
127 #endif
128
129         for ( niob = niobs, bh = first_bh ; bh ; bh = bh->b_next, niob++ ) {
130 #if 0
131                 struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc);
132                 if (bulk == NULL)
133                         GOTO(out_set, rc = -ENOMEM);
134 #endif
135
136 #if 0
137                 niob->n_xid = xid;
138 #endif
139                 niob->n_block_nr = bh->b_blocknr;
140                 niob->n_offset = bh_offset(bh);
141                 niob->n_length = bh->b_size;
142
143
144 #if 0
145                 bulk->bp_xid = xid;
146                 bulk->bp_buf = bh->b_data;
147                 bulk->bp_page = bh->b_page;
148                 bulk->bp_buflen = bh->b_size;
149 #endif
150         }
151
152
153         size[0] = sizeof(struct ptlbd_rsp);
154         size[1] = sizeof(struct ptlbd_niob) * page_count;
155         req->rq_replen = lustre_msg_size(2, size);
156
157         /* XXX find out how we're really supposed to manage levels */
158         req->rq_level = imp->imp_level;
159         rc = ptlrpc_queue_wait(req);
160
161         rsp = lustre_msg_buf(req->rq_repmsg, 0);
162
163         niob = lustre_msg_buf(req->rq_repmsg, 1);
164         /* XXX check that op->num matches ours */
165         for ( bh = first_bh ; bh ; bh = bh->b_next, niob++ ) {
166                 struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc);
167                 if (bulk == NULL)
168                         GOTO(out_set, rc = -ENOMEM);
169
170                 bulk->bp_xid = niob->n_xid;
171                 bulk->bp_page = bh->b_page;
172                 bulk->bp_buf = bh->b_data;
173                 bulk->bp_buflen = bh->b_size;
174         }
175
176         obd_brw_set_add(set, desc);
177         rc = ptlrpc_send_bulk(desc);
178
179         /* if there's an error, no brw_finish called, just like
180          * osc_brw_read */
181
182         GOTO(out_req, rc);
183
184 out_set:
185         obd_brw_set_free(set);
186 out_desc:
187         ptlrpc_bulk_decref(desc);
188 out_req:
189         ptlrpc_req_finished(req);
190 out:
191         RETURN(rc);
192 }
193
194 int ptlbd_read_put_req(struct ptlbd_obd *ptlbd, ptlbd_cmd_t cmd, 
195                 struct buffer_head *first_bh, unsigned int page_count)
196 {
197         struct obd_import *imp = &ptlbd->bd_import;
198         struct ptlbd_op *op;
199         struct ptlbd_niob *niob, *niobs;
200         struct ptlbd_rsp *rsp;
201         struct ptlrpc_request *req;
202         struct ptlrpc_bulk_desc *desc;
203         struct buffer_head *bh;
204         int rc, rep_size, size[2];
205         struct obd_brw_set *set;
206         __u32 xid;
207         ENTRY;
208
209         size[0] = sizeof(struct ptlbd_op);
210         size[1] = page_count * sizeof(struct ptlbd_niob);
211
212         req = ptlrpc_prep_req(imp, cmd, 2, size, NULL);
213         if (!req)
214                 GOTO(out, rc = -ENOMEM);
215         /* XXX might not need these? */
216         req->rq_request_portal = PTLBD_REQUEST_PORTAL;
217         req->rq_reply_portal = PTLBD_REPLY_PORTAL;
218
219         op = lustre_msg_buf(req->rq_reqmsg, 0);
220         niobs = lustre_msg_buf(req->rq_reqmsg, 1);
221
222         /* XXX pack */
223         op->op_cmd = cmd;
224         op->op_lun = 0;
225         op->op_niob_cnt = page_count;
226         op->op__padding = 0;
227         op->op_block_cnt = page_count;
228
229         desc = ptlrpc_prep_bulk(imp->imp_connection);
230         if ( desc == NULL )
231                 GOTO(out_req, rc = -ENOMEM);
232         desc->bd_portal = PTLBD_BULK_PORTAL;
233         desc->bd_ptl_ev_hdlr = ptlbd_ptl_ev_hdlr;
234
235         /* XXX someone needs to free this */
236         set = obd_brw_set_new();
237         if (set == NULL)
238                 GOTO(out_desc, rc = -ENOMEM);
239
240         set->brw_callback = ptlbd_brw_callback;
241
242         xid = get_next_xid(imp);
243
244         for ( niob = niobs, bh = first_bh ; bh ; bh = bh->b_next, niob++ ) {
245                 struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc);
246                 if (bulk == NULL)
247                         GOTO(out_set, rc = -ENOMEM);
248
249                 niob->n_xid = xid;
250                 niob->n_block_nr = bh->b_blocknr;
251                 niob->n_offset = bh_offset(bh);
252                 niob->n_length = bh->b_size;
253
254                 bulk->bp_xid = xid;
255                 bulk->bp_buf = bh->b_data;
256                 bulk->bp_page = bh->b_page;
257                 bulk->bp_buflen = bh->b_size;
258         }
259
260         /* XXX put in OBD_FAIL_CHECK for ptlbd? */
261         rc = ptlrpc_register_bulk(desc);
262         if (rc)
263                 GOTO(out_set, rc);
264
265         obd_brw_set_add(set, desc);
266
267         rep_size = sizeof(struct ptlbd_rsp);
268         req->rq_replen = lustre_msg_size(1, &rep_size);
269
270         /* XXX find out how we're really supposed to manage levels */
271         req->rq_level = imp->imp_level;
272         rc = ptlrpc_queue_wait(req);
273
274         rsp = lustre_msg_buf(req->rq_repmsg, 0);
275
276         /* if there's an error, no brw_finish called, just like
277          * osc_brw_read */
278
279         GOTO(out_req, rc);
280
281 out_set:
282         obd_brw_set_free(set);
283 out_desc:
284         ptlrpc_bulk_decref(desc);
285 out_req:
286         ptlrpc_req_finished(req);
287 out:
288         RETURN(rc);
289 }
290
291 int ptlbd_send_req(struct ptlbd_obd *ptlbd, ptlbd_cmd_t cmd, 
292                 struct buffer_head *first_bh)
293 {
294         unsigned int page_count = 0;
295         struct buffer_head *bh;
296         int rc;
297         ENTRY;
298
299         for ( page_count = 0, bh = first_bh ; bh ; bh = bh->b_next )
300                 page_count++;
301
302         switch (cmd) {
303                 case PTLBD_READ:
304                         rc = ptlbd_read_put_req(ptlbd, cmd, 
305                                         first_bh, page_count);
306                         break;
307                 case PTLBD_WRITE:
308                         rc = ptlbd_write_put_req(ptlbd, cmd, 
309                                         first_bh, page_count);
310                         break;
311                 default:
312                         rc = -EINVAL;
313                         break;
314         };
315
316         RETURN(rc);
317 }
318
319 static int ptlbd_bulk_timeout(void *data)
320 {
321 /*        struct ptlrpc_bulk_desc *desc = data;*/
322         ENTRY;
323
324         CERROR("ugh, timed out\n");
325
326         RETURN(1);
327 }
328
329 #define SILLY_MAX 2048
330 static struct page *pages[SILLY_MAX] = {NULL,};
331
332 static struct page * fake_page(int block_nr)
333 {
334         if ( block_nr >= SILLY_MAX )
335                 return NULL;
336
337         if (pages[block_nr] == NULL) {
338                 void *vaddr = (void *)get_free_page(GFP_KERNEL);
339                 pages[block_nr] = virt_to_page(vaddr);
340         } 
341         return pages[block_nr];
342 }
343
344 static int ptlbd_put_write(struct ptlrpc_request *req)
345 {
346         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
347         struct ptlbd_op *op;
348         struct ptlbd_niob *reply_niob, *request_niob;
349         struct ptlbd_rsp *rsp;
350         struct ptlrpc_bulk_desc *desc;
351         struct ptlrpc_service *srv;
352         struct l_wait_info lwi;
353         int size[2];
354         int i, page_count, rc;
355         __u32 xid;
356
357         op = lustre_msg_buf(req->rq_reqmsg, 0);
358         request_niob = lustre_msg_buf(req->rq_reqmsg, 1);
359         page_count = req->rq_reqmsg->buflens[1] / sizeof(struct ptlbd_niob);
360
361         size[0] = sizeof(struct ptlbd_rsp);
362         size[1] = sizeof(struct ptlbd_niob) * page_count;
363         rc = lustre_pack_msg(2, size, NULL, &req->rq_replen, &req->rq_repmsg);
364         if (rc)
365                 GOTO(out, rc);
366         reply_niob = lustre_msg_buf(req->rq_repmsg, 1);
367
368         desc = ptlrpc_prep_bulk(req->rq_connection);
369         if (desc == NULL)
370                 GOTO(out, rc = -ENOMEM);
371         desc->bd_ptl_ev_hdlr = NULL;
372         desc->bd_portal = PTLBD_BULK_PORTAL;
373         memcpy(&(desc->bd_conn), &conn, sizeof(conn)); /* XXX what? */
374
375         srv = req->rq_obd->u.ptlbd.ptlbd_service;
376         spin_lock(&srv->srv_lock);
377         xid = srv->srv_xid++;                   /* single xid for all pages */
378         spin_unlock(&srv->srv_lock);
379
380         for ( i = 0; i < page_count; i++) {
381                 struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc);
382                 if (bulk == NULL)
383                         GOTO(out_desc, rc = -ENOMEM);
384                         
385                 reply_niob[i] = request_niob[i];
386                 reply_niob[i].n_xid = xid;
387
388                 bulk->bp_xid = xid;
389                 bulk->bp_page = fake_page(request_niob[i].n_block_nr);
390                 bulk->bp_buf = page_address(bulk->bp_page);
391                 bulk->bp_buflen = request_niob[i].n_length;
392         }
393
394         rc = ptlrpc_register_bulk(desc);
395         if ( rc )
396                 GOTO(out_desc, rc);
397
398         rsp = lustre_msg_buf(req->rq_reqmsg, 0);
399         rsp->r_status = 42;
400         rsp->r_error_cnt = 13;
401         ptlrpc_reply(req->rq_svc, req);
402
403         /* this synchronization probably isn't good enough */
404         lwi = LWI_TIMEOUT(obd_timeout * HZ, ptlbd_bulk_timeout, desc);
405         rc = l_wait_event(desc->bd_waitq, desc->bd_flags &PTL_BULK_FL_RCVD, 
406                         &lwi);
407
408 out_desc:
409         ptlrpc_free_bulk(desc);
410 out:
411         RETURN(rc);
412 }
413
414 static int ptlbd_put_read(struct ptlrpc_request *req)
415 {
416         struct ptlbd_op *op;
417         struct ptlbd_niob *niob, *niobs;
418         struct ptlbd_rsp *rsp;
419         struct ptlrpc_bulk_desc *desc;
420         struct l_wait_info lwi;
421         int size[1];
422         int i, page_count, rc;
423
424         op = lustre_msg_buf(req->rq_reqmsg, 0);
425         niobs = lustre_msg_buf(req->rq_reqmsg, 1);
426         page_count = req->rq_reqmsg->buflens[1] / sizeof(struct ptlbd_niob);
427
428         desc = ptlrpc_prep_bulk(req->rq_connection);
429         if (desc == NULL)
430                 GOTO(out, rc = -ENOMEM);
431         desc->bd_portal = PTLBD_BULK_PORTAL;
432
433         for ( i = 0, niob = niobs ; i < page_count; niob++, i++) {
434                 struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc);
435                 if (bulk == NULL)
436                         GOTO(out_bulk, rc = -ENOMEM);
437
438                 /* 
439                  * XXX what about the block number? 
440                  */
441                 bulk->bp_xid = niob->n_xid;
442                 bulk->bp_page = fake_page(niob->n_block_nr);
443                 bulk->bp_buf = page_address(bulk->bp_page);
444                 bulk->bp_buflen = niob->n_length;
445         }
446
447         rc = ptlrpc_send_bulk(desc);
448         if ( rc )
449                 GOTO(out_bulk, rc);
450
451         /* this synchronization probably isn't good enough */
452         lwi = LWI_TIMEOUT(obd_timeout * HZ, ptlbd_bulk_timeout, desc);
453         rc = l_wait_event(desc->bd_waitq, desc->bd_flags &PTL_BULK_FL_SENT, 
454                         &lwi);
455
456         size[0] = sizeof(struct ptlbd_rsp);
457         rc = lustre_pack_msg(1, size, NULL, &req->rq_replen, &req->rq_repmsg);
458         if ( rc )
459                 GOTO(out, rc);
460
461         rsp = lustre_msg_buf(req->rq_repmsg, 0);
462         if ( rsp == NULL )
463                 GOTO(out, rc = -EINVAL);
464
465         rsp->r_error_cnt = 42;
466         rsp->r_status = 69;
467
468         req->rq_status = 0; /* XXX */
469         ptlrpc_reply(req->rq_svc, req);
470
471 out_bulk:
472         ptlrpc_free_bulk(desc);
473 out:
474         RETURN(rc);
475 }
476
477
478 int ptlbd_parse_req(struct ptlrpc_request *req)
479 {
480         struct ptlbd_op *op;
481         int rc;
482         ENTRY;
483
484         rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen);
485         if ( rc )
486                 RETURN(rc);
487
488         op = lustre_msg_buf(req->rq_reqmsg, 0);
489
490         switch(op->op_cmd) {
491                 case PTLBD_READ:
492                         ptlbd_put_read(req);
493                         break;
494                 case PTLBD_WRITE:
495                         ptlbd_put_write(req);
496                         break;
497                 default:
498                         CERROR("fix this %d\n", op->op_cmd);
499                         break;
500         }
501
502         RETURN(0);
503 }
504
505
#if 0
/*
 * Disabled (compiled out by the surrounding #if 0) variant that issues the
 * I/O through obd_brw() instead of the PTLBD RPCs above.
 *
 * It walks the @first_bh chain through b_reqnext, builds one brw_page per
 * buffer head, submits the array with obd_brw() and then calls
 * ll_brw_sync_wait() with CB_PHASE_START.  The page array and the set are
 * freed on the way out.
 *
 * NOTE(review): as written this cannot be enabled unchanged: num_pages is
 * passed to obd_brw() but never declared here, and @st is unused.
 */
int ptlbd_bh_req(int cmd, struct ptlbd_state *st, struct buffer_head *first_bh)
{
        struct obd_brw_set *set = NULL;
        struct brw_page *pg = NULL;
        struct buffer_head *bh;
        int rc, i, pg_bytes = 0;
        ENTRY;

        /* size the brw_page array: one entry per buffer head */
        for ( bh = first_bh ; bh ; bh = bh->b_reqnext ) 
                pg_bytes += sizeof(struct brw_page);

        OBD_ALLOC(pg, pg_bytes);
        if ( pg == NULL )
                GOTO(out, rc = -ENOMEM);

        set = obd_brw_set_new();
        if (set == NULL)
                GOTO(out, rc = -ENOMEM);

        /* describe each buffer head as a brw_page */
        for ( i = 0, bh = first_bh ; bh ; bh = bh->b_reqnext, i++) {
                pg[i].pg = bh->b_page;
                pg[i].off = bh_offset(bh);
                pg[i].count = bh->b_size;
                pg[i].flag = 0;
        }

        set->brw_callback = ll_brw_sync_wait;
        rc = obd_brw(cmd, /* lsm */NULL, num_pages, pg, set);
        if ( rc )
                GOTO(out, rc);

        rc = ll_brw_sync_wait(set, CB_PHASE_START);
        if (rc)
                CERROR("error from callback: rc = %d\n", rc);

out:
        /* pg and set start out NULL, so only what was allocated is freed */
        if ( pg != NULL )
                OBD_FREE(pg, pg_bytes);
        if ( set != NULL )
                obd_brw_set_free(set);

        RETURN(rc); 
}
#endif