Whamcloud - gitweb
Merge b_md into HEAD
[fs/lustre-release.git] / lustre / ptlbd / rpc.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
5  *   Author: Zach Brown <zab@clusterfs.com>
6  *
7  *   This file is part of Lustre, http://www.lustre.org.
8  *
9  *   Lustre is free software; you can redistribute it and/or
10  *   modify it under the terms of version 2 of the GNU General Public
11  *   License as published by the Free Software Foundation.
12  *
13  *   Lustre is distributed in the hope that it will be useful,
14  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
15  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  *   GNU General Public License for more details.
17  *
18  *   You should have received a copy of the GNU General Public License
19  *   along with Lustre; if not, write to the Free Software
20  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21  */
22
23 #include <linux/version.h>
24 #include <linux/module.h>
25 #include <linux/fs.h>
26
27 #define DEBUG_SUBSYSTEM S_PTLBD
28
29 #include <linux/obd_support.h>
30 #include <linux/obd_class.h>
31 #include <linux/lustre_debug.h>
32 #include <linux/lprocfs_status.h>
33 #include <linux/obd_ptlbd.h>
34
35 int ptlbd_send_req(struct ptlbd_obd *ptlbd, ptlbd_cmd_t cmd, 
36                 struct buffer_head *first_bh)
37 {
38         struct obd_import *imp = &ptlbd->bd_import;
39         struct ptlbd_op *op;
40         struct ptlbd_niob *niob, *niobs;
41         struct ptlbd_rsp *rsp;
42         struct ptlrpc_request *req;
43         struct ptlrpc_bulk_desc *desc;
44         struct buffer_head *bh;
45         unsigned long flags;
46         unsigned int page_count;
47         int rc, rep_size, size[2];
48         __u32 xid;
49         ENTRY;
50
51         LASSERT(cmd == PTLBD_READ || cmd == PTLBD_WRITE);
52
53         for ( page_count = 0, bh = first_bh ; bh ; bh = bh->b_next )
54                 page_count++;
55
56         size[0] = sizeof(struct ptlbd_op);
57         size[1] = page_count * sizeof(struct ptlbd_niob);
58
59         req = ptlrpc_prep_req(imp, cmd, 2, size, NULL);
60         if (!req)
61                 RETURN(-ENOMEM);
62
63         op = lustre_msg_buf(req->rq_reqmsg, 0);
64         niobs = lustre_msg_buf(req->rq_reqmsg, 1);
65
66         /* XXX pack */
67         op->op_cmd = cmd;
68         op->op_lun = 0;
69         op->op_niob_cnt = page_count;
70         op->op__padding = 0;
71         op->op_block_cnt = page_count;
72
73         desc = ptlrpc_prep_bulk(imp->imp_connection);
74         if ( desc == NULL )
75                 GOTO(out_req, rc = -ENOMEM);
76         desc->bd_portal = PTLBD_BULK_PORTAL;
77         desc->bd_ptl_ev_hdlr = NULL;
78
79         spin_lock_irqsave(&imp->imp_lock, flags);
80         xid = ++imp->imp_last_xid;
81         spin_unlock_irqrestore(&imp->imp_lock, flags);
82
83         for ( niob = niobs, bh = first_bh ; bh ; bh = bh->b_next, niob++ ) {
84                 struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc);
85                 if (bulk == NULL)
86                         GOTO(out_req, rc = -ENOMEM);
87
88                 niob->n_xid = xid;
89                 niob->n_block_nr = bh->b_blocknr;
90                 niob->n_offset = bh_offset(bh);
91                 niob->n_length = bh->b_size;
92
93                 bulk->bp_xid = xid;
94                 bulk->bp_buf = bh->b_data;
95                 bulk->bp_page = bh->b_page;
96                 bulk->bp_buflen = bh->b_size;
97         }
98
99         if ( cmd == PTLBD_READ )
100                 rc = ptlrpc_register_bulk_put(desc);
101         else
102                 rc = ptlrpc_register_bulk_get(desc);
103
104         if (rc)
105                 GOTO(out_desc, rc);
106
107         rep_size = sizeof(struct ptlbd_rsp);
108         req->rq_replen = lustre_msg_size(1, &rep_size);
109
110         /* XXX find out how we're really supposed to manage levels */
111         req->rq_level = imp->imp_level;
112         rc = ptlrpc_queue_wait(req);
113
114         if ( rc == 0 ) {
115                 rsp = lustre_msg_buf(req->rq_repmsg, 0);
116                 /* XXX do stuff */
117         }
118
119 out_desc:
120         ptlrpc_bulk_decref(desc);
121 out_req:
122         ptlrpc_req_finished(req);
123         RETURN(rc);
124 }
125
126 static int ptlbd_bulk_timeout(void *data)
127 {
128 /*        struct ptlrpc_bulk_desc *desc = data;*/
129         ENTRY;
130
131         CERROR("ugh, timed out\n");
132
133         RETURN(1);
134 }
135
136 void ptlbd_do_filp(struct file *filp, int op, struct ptlbd_niob *niobs, 
137                 int page_count, struct list_head *page_list)
138 {
139         mm_segment_t old_fs;
140         struct list_head *pos;
141         ENTRY;
142
143         old_fs = get_fs();
144         set_fs(KERNEL_DS);
145
146         list_for_each(pos, page_list) {
147                 ssize_t ret;
148                 struct page *page = list_entry(pos, struct page, list);
149                 loff_t offset = (niobs->n_block_nr << PAGE_SHIFT) + 
150                         niobs->n_offset;
151
152                 if ( op == PTLBD_READ )
153                         ret = filp->f_op->read(filp, page_address(page), 
154                                         niobs->n_length, &offset);
155                 else
156                         ret = filp->f_op->write(filp, page_address(page), 
157                                         niobs->n_length, &offset);
158
159                 niobs++;
160         }
161
162         set_fs(old_fs);
163         EXIT;
164 }
165
166 int ptlbd_parse_req(struct ptlrpc_request *req)
167 {
168         struct ptlbd_op *op;
169         struct ptlbd_niob *niob, *niobs;
170         struct ptlbd_rsp *rsp;
171         struct ptlrpc_bulk_desc *desc;
172         struct file *filp = req->rq_obd->u.ptlbd.filp;
173         struct l_wait_info lwi;
174         int size[1], wait_flag, i, page_count, rc;
175         struct list_head *pos, *n;
176         LIST_HEAD(tmp_pages);
177         ENTRY;
178
179         rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen);
180         if ( rc )
181                 RETURN(rc);
182
183         op = lustre_msg_buf(req->rq_reqmsg, 0);
184         LASSERT(op->op_cmd == PTLBD_READ || op->op_cmd == PTLBD_WRITE);
185
186         niobs = lustre_msg_buf(req->rq_reqmsg, 1);
187         page_count = req->rq_reqmsg->buflens[1] / sizeof(struct ptlbd_niob);
188
189         desc = ptlrpc_prep_bulk(req->rq_connection);
190         if (desc == NULL)
191                 GOTO(out, rc = -ENOMEM);
192         desc->bd_ptl_ev_hdlr = NULL;
193         desc->bd_portal = PTLBD_BULK_PORTAL;
194
195         for ( i = 0, niob = niobs ; i < page_count; niob++, i++) {
196                 struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc);
197                 if (bulk == NULL)
198                         GOTO(out_bulk, rc = -ENOMEM);
199
200                 bulk->bp_page = alloc_page(GFP_KERNEL);
201                 if (bulk->bp_page == NULL)
202                         GOTO(out_bulk, rc = -ENOMEM);
203                 list_add(&bulk->bp_page->list, &tmp_pages);
204
205                 /* 
206                  * XXX what about the block number? 
207                  */
208                 bulk->bp_xid = niob->n_xid;
209                 bulk->bp_buf = page_address(bulk->bp_page);
210                 bulk->bp_buflen = niob->n_length;
211         }
212
213         if ( op->op_cmd == PTLBD_READ ) {
214                 ptlbd_do_filp(filp, PTLBD_READ, niobs, page_count, &tmp_pages);
215                 rc = ptlrpc_bulk_put(desc);
216                 wait_flag = PTL_BULK_FL_SENT;
217         } else {
218                 rc = ptlrpc_bulk_get(desc);
219                 wait_flag = PTL_BULK_FL_RCVD;
220         }
221
222         if ( rc )
223                 GOTO(out_bulk, rc);
224
225         /* this synchronization probably isn't good enough */
226         lwi = LWI_TIMEOUT(obd_timeout * HZ, ptlbd_bulk_timeout, desc);
227         rc = l_wait_event(desc->bd_waitq, desc->bd_flags & wait_flag, &lwi);
228
229         size[0] = sizeof(struct ptlbd_rsp);
230         rc = lustre_pack_msg(1, size, NULL, &req->rq_replen, &req->rq_repmsg);
231         if ( rc )
232                 GOTO(out, rc);
233
234         rsp = lustre_msg_buf(req->rq_repmsg, 0);
235         if ( rsp == NULL )
236                 GOTO(out, rc = -EINVAL);
237         
238         ptlbd_do_filp(filp, PTLBD_WRITE, niobs, page_count, &tmp_pages);
239
240         rsp->r_error_cnt = 42;
241         rsp->r_status = 69;
242
243         req->rq_status = 0; /* XXX */
244         ptlrpc_reply(req->rq_svc, req);
245
246 out_bulk:
247         list_for_each_safe(pos, n, &tmp_pages) {
248                 struct page *page = list_entry(pos, struct page, list);
249                 list_del(&page->list);
250                 __free_page(page);
251         }
252         ptlrpc_bulk_decref(desc);
253 out:
254         RETURN(rc);
255 }