Whamcloud - gitweb
4daee83f650ab11213c595ce037b046dbe4a3236
[fs/lustre-release.git] / lustre / ptlbd / rpc.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
5  *   Author: Zach Brown <zab@clusterfs.com>
6  *
7  *   This file is part of Lustre, http://www.lustre.org.
8  *
9  *   Lustre is free software; you can redistribute it and/or
10  *   modify it under the terms of version 2 of the GNU General Public
11  *   License as published by the Free Software Foundation.
12  *
13  *   Lustre is distributed in the hope that it will be useful,
14  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
15  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  *   GNU General Public License for more details.
17  *
18  *   You should have received a copy of the GNU General Public License
19  *   along with Lustre; if not, write to the Free Software
20  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21  */
22
23 #include <linux/version.h>
24 #include <linux/module.h>
25 #include <linux/fs.h>
26
27 #define DEBUG_SUBSYSTEM S_PTLBD
28
29 #include <linux/obd_support.h>
30 #include <linux/obd_class.h>
31 #include <linux/lustre_debug.h>
32 #include <linux/lprocfs_status.h>
33 #include <linux/obd_ptlbd.h>
34
35 int ptlbd_send_req(struct ptlbd_obd *ptlbd, ptlbd_cmd_t cmd, 
36                 struct buffer_head *first_bh)
37 {
38         struct obd_import *imp = &ptlbd->bd_import;
39         struct ptlbd_op *op;
40         struct ptlbd_niob *niob, *niobs;
41         struct ptlbd_rsp *rsp;
42         struct ptlrpc_request *req;
43         struct ptlrpc_bulk_desc *desc;
44         struct buffer_head *bh;
45         unsigned int page_count;
46         int rc, rep_size, size[2];
47         __u32 xid;
48         ENTRY;
49
50         LASSERT(cmd == PTLBD_READ || cmd == PTLBD_WRITE);
51
52         for ( page_count = 0, bh = first_bh ; bh ; bh = bh->b_next )
53                 page_count++;
54
55         size[0] = sizeof(struct ptlbd_op);
56         size[1] = page_count * sizeof(struct ptlbd_niob);
57
58         req = ptlrpc_prep_req(imp, cmd, 2, size, NULL);
59         if (!req)
60                 RETURN(-ENOMEM);
61
62         op = lustre_msg_buf(req->rq_reqmsg, 0);
63         niobs = lustre_msg_buf(req->rq_reqmsg, 1);
64
65         /* XXX pack */
66         op->op_cmd = cmd;
67         op->op_lun = 0;
68         op->op_niob_cnt = page_count;
69         op->op__padding = 0;
70         op->op_block_cnt = page_count;
71
72         desc = ptlrpc_prep_bulk(imp->imp_connection);
73         if ( desc == NULL )
74                 GOTO(out_req, rc = -ENOMEM);
75         desc->bd_portal = PTLBD_BULK_PORTAL;
76         desc->bd_ptl_ev_hdlr = NULL;
77
78         xid = ptlrpc_next_xid();
79
80         for ( niob = niobs, bh = first_bh ; bh ; bh = bh->b_next, niob++ ) {
81                 struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc);
82                 if (bulk == NULL)
83                         GOTO(out_req, rc = -ENOMEM);
84
85                 niob->n_xid = xid;
86                 niob->n_block_nr = bh->b_blocknr;
87                 niob->n_offset = bh_offset(bh);
88                 niob->n_length = bh->b_size;
89
90                 bulk->bp_xid = xid;
91                 bulk->bp_buf = bh->b_data;
92                 bulk->bp_page = bh->b_page;
93                 bulk->bp_buflen = bh->b_size;
94         }
95
96         if ( cmd == PTLBD_READ )
97                 rc = ptlrpc_register_bulk_put(desc);
98         else
99                 rc = ptlrpc_register_bulk_get(desc);
100
101         if (rc)
102                 GOTO(out_desc, rc);
103
104         rep_size = sizeof(struct ptlbd_rsp);
105         req->rq_replen = lustre_msg_size(1, &rep_size);
106
107         /* XXX find out how we're really supposed to manage levels */
108         req->rq_level = imp->imp_level;
109         rc = ptlrpc_queue_wait(req);
110
111         if ( rc == 0 ) {
112                 rsp = lustre_msg_buf(req->rq_repmsg, 0);
113                 /* XXX do stuff */
114         }
115
116 out_desc:
117         ptlrpc_bulk_decref(desc);
118 out_req:
119         ptlrpc_req_finished(req);
120         RETURN(rc);
121 }
122
123 static int ptlbd_bulk_timeout(void *data)
124 {
125 /*        struct ptlrpc_bulk_desc *desc = data;*/
126         ENTRY;
127
128         CERROR("ugh, timed out\n");
129
130         RETURN(1);
131 }
132
133 void ptlbd_do_filp(struct file *filp, int op, struct ptlbd_niob *niobs, 
134                 int page_count, struct list_head *page_list)
135 {
136         mm_segment_t old_fs;
137         struct list_head *pos;
138         ENTRY;
139
140         old_fs = get_fs();
141         set_fs(KERNEL_DS);
142
143         list_for_each(pos, page_list) {
144                 ssize_t ret;
145                 struct page *page = list_entry(pos, struct page, list);
146                 loff_t offset = (niobs->n_block_nr << PAGE_SHIFT) + 
147                         niobs->n_offset;
148
149                 if ( op == PTLBD_READ )
150                         ret = filp->f_op->read(filp, page_address(page), 
151                                         niobs->n_length, &offset);
152                 else
153                         ret = filp->f_op->write(filp, page_address(page), 
154                                         niobs->n_length, &offset);
155
156                 niobs++;
157         }
158
159         set_fs(old_fs);
160         EXIT;
161 }
162
163 int ptlbd_parse_req(struct ptlrpc_request *req)
164 {
165         struct ptlbd_op *op;
166         struct ptlbd_niob *niob, *niobs;
167         struct ptlbd_rsp *rsp;
168         struct ptlrpc_bulk_desc *desc;
169         struct file *filp = req->rq_obd->u.ptlbd.filp;
170         struct l_wait_info lwi;
171         int size[1], wait_flag, i, page_count, rc;
172         struct list_head *pos, *n;
173         LIST_HEAD(tmp_pages);
174         ENTRY;
175
176         rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen);
177         if ( rc )
178                 RETURN(rc);
179
180         op = lustre_msg_buf(req->rq_reqmsg, 0);
181         LASSERT(op->op_cmd == PTLBD_READ || op->op_cmd == PTLBD_WRITE);
182
183         niobs = lustre_msg_buf(req->rq_reqmsg, 1);
184         page_count = req->rq_reqmsg->buflens[1] / sizeof(struct ptlbd_niob);
185
186         desc = ptlrpc_prep_bulk(req->rq_connection);
187         if (desc == NULL)
188                 GOTO(out, rc = -ENOMEM);
189         desc->bd_ptl_ev_hdlr = NULL;
190         desc->bd_portal = PTLBD_BULK_PORTAL;
191
192         for ( i = 0, niob = niobs ; i < page_count; niob++, i++) {
193                 struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc);
194                 if (bulk == NULL)
195                         GOTO(out_bulk, rc = -ENOMEM);
196
197                 bulk->bp_page = alloc_page(GFP_KERNEL);
198                 if (bulk->bp_page == NULL)
199                         GOTO(out_bulk, rc = -ENOMEM);
200                 list_add(&bulk->bp_page->list, &tmp_pages);
201
202                 /* 
203                  * XXX what about the block number? 
204                  */
205                 bulk->bp_xid = niob->n_xid;
206                 bulk->bp_buf = page_address(bulk->bp_page);
207                 bulk->bp_buflen = niob->n_length;
208         }
209
210         if ( op->op_cmd == PTLBD_READ ) {
211                 ptlbd_do_filp(filp, PTLBD_READ, niobs, page_count, &tmp_pages);
212                 rc = ptlrpc_bulk_put(desc);
213                 wait_flag = PTL_BULK_FL_SENT;
214         } else {
215                 rc = ptlrpc_bulk_get(desc);
216                 wait_flag = PTL_BULK_FL_RCVD;
217         }
218
219         if ( rc )
220                 GOTO(out_bulk, rc);
221
222         /* this synchronization probably isn't good enough */
223         lwi = LWI_TIMEOUT(obd_timeout * HZ, ptlbd_bulk_timeout, desc);
224         rc = l_wait_event(desc->bd_waitq, desc->bd_flags & wait_flag, &lwi);
225
226         size[0] = sizeof(struct ptlbd_rsp);
227         rc = lustre_pack_msg(1, size, NULL, &req->rq_replen, &req->rq_repmsg);
228         if ( rc )
229                 GOTO(out, rc);
230
231         rsp = lustre_msg_buf(req->rq_repmsg, 0);
232         if ( rsp == NULL )
233                 GOTO(out, rc = -EINVAL);
234         
235         ptlbd_do_filp(filp, PTLBD_WRITE, niobs, page_count, &tmp_pages);
236
237         rsp->r_error_cnt = 42;
238         rsp->r_status = 69;
239
240         req->rq_status = 0; /* XXX */
241         ptlrpc_reply(req->rq_svc, req);
242
243 out_bulk:
244         list_for_each_safe(pos, n, &tmp_pages) {
245                 struct page *page = list_entry(pos, struct page, list);
246                 list_del(&page->list);
247                 __free_page(page);
248         }
249         ptlrpc_bulk_decref(desc);
250 out:
251         RETURN(rc);
252 }