Whamcloud - gitweb
merge b_devel into HEAD. Includes:
[fs/lustre-release.git] / lustre / ptlbd / rpc.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
5  *   Author: Zach Brown <zab@clusterfs.com>
6  *
7  *   This file is part of Lustre, http://www.lustre.org.
8  *
9  *   Lustre is free software; you can redistribute it and/or
10  *   modify it under the terms of version 2 of the GNU General Public
11  *   License as published by the Free Software Foundation.
12  *
13  *   Lustre is distributed in the hope that it will be useful,
14  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
15  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  *   GNU General Public License for more details.
17  *
18  *   You should have received a copy of the GNU General Public License
19  *   along with Lustre; if not, write to the Free Software
20  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21  */
22
23 #include <linux/version.h>
24 #include <linux/module.h>
25 #include <linux/fs.h>
26
27 #define DEBUG_SUBSYSTEM S_PTLBD
28
29 #include <linux/obd_support.h>
30 #include <linux/obd_class.h>
31 #include <linux/lustre_debug.h>
32 #include <linux/lprocfs_status.h>
33 #include <linux/obd_ptlbd.h>
34
35 #define RSP_OK       0   /* ptlbd_rsp.r_status value meaning the I/O succeeded */
36 #define RSP_NOTOK   -1   /* presumably the failure counterpart of RSP_OK; not referenced in the code visible here */
37 #define RQ_OK        0   /* value stored in rq_status before the reply is sent */
38
39 int ptlbd_send_req(struct ptlbd_obd *ptlbd, ptlbd_cmd_t cmd, 
40                 struct request *blkreq)
41 {
42         struct buffer_head *first_bh = blkreq->bh;
43         struct obd_import *imp = &ptlbd->bd_import;
44         struct ptlbd_op *op;
45         struct ptlbd_niob *niob, *niobs;
46         struct ptlbd_rsp *rsp;
47         struct ptlrpc_request *req;
48         struct ptlrpc_bulk_desc *desc;
49         struct buffer_head *bh;
50         unsigned int page_count;
51         int rc, rep_size, size[2];
52         __u32 xid;
53         ENTRY;
54
55         LASSERT(cmd == PTLBD_READ || cmd == PTLBD_WRITE);
56
57         for ( page_count = 0, bh = first_bh ; bh ; bh = bh->b_next )
58                 page_count++;
59
60         size[0] = sizeof(struct ptlbd_op);
61         size[1] = page_count * sizeof(struct ptlbd_niob);
62
63         req = ptlrpc_prep_req(imp, cmd, 2, size, NULL);
64         if (!req)
65                 RETURN(-ENOMEM);
66
67         op = lustre_msg_buf(req->rq_reqmsg, 0);
68         niobs = lustre_msg_buf(req->rq_reqmsg, 1);
69
70         /* XXX pack */
71         op->op_cmd = cmd;
72         op->op_lun = 0;
73         op->op_niob_cnt = page_count;
74         op->op__padding = 0;
75         op->op_block_cnt = page_count;
76
77         desc = ptlrpc_prep_bulk(imp->imp_connection);
78         if ( desc == NULL )
79                 GOTO(out_req, rc = -ENOMEM);
80         desc->bd_portal = PTLBD_BULK_PORTAL;
81         desc->bd_ptl_ev_hdlr = NULL;
82
83         xid = ptlrpc_next_xid();
84
85         for ( niob = niobs, bh = first_bh ; bh ; bh = bh->b_next, niob++ ) {
86                 struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc);
87                 if (bulk == NULL)
88                         GOTO(out_req, rc = -ENOMEM);
89
90                 niob->n_xid = xid;
91                 niob->n_block_nr = bh->b_blocknr;
92                 niob->n_offset = bh_offset(bh);
93                 niob->n_length = bh->b_size;
94
95                 bulk->bp_xid = xid;
96                 bulk->bp_buf = bh->b_data;
97                 bulk->bp_page = bh->b_page;
98                 bulk->bp_buflen = bh->b_size;
99         }
100
101         if ( cmd == PTLBD_READ )
102                 rc = ptlrpc_register_bulk_put(desc);
103         else
104                 rc = ptlrpc_register_bulk_get(desc);
105
106         if (rc)
107                 GOTO(out_desc, rc);
108
109         rep_size = sizeof(struct ptlbd_rsp);
110         req->rq_replen = lustre_msg_size(1, &rep_size);
111
112         /* XXX find out how we're really supposed to manage levels */
113         req->rq_level = imp->imp_level;
114         rc = ptlrpc_queue_wait(req);
115
116         if ( rc != 0 ) {
117                 blkreq->errors++;
118                 GOTO(out_desc, rc);
119         }
120         rsp = lustre_msg_buf(req->rq_repmsg, 0);
121         if (rsp->r_status != RSP_OK) {
122                 blkreq->errors += rsp->r_error_cnt;
123         }
124
125 out_desc:
126         ptlrpc_bulk_decref(desc);
127 out_req:
128         ptlrpc_req_finished(req);
129         RETURN(rc);
130 }
131
132 static int ptlbd_bulk_timeout(void *data)
133 {
134 /*        struct ptlrpc_bulk_desc *desc = data;*/
135         ENTRY;
136
137         CERROR("ugh, timed out\n");
138
139         RETURN(1);
140 }
141
142 int ptlbd_do_filp(struct file *filp, int op, struct ptlbd_niob *niobs, 
143                 int page_count, struct list_head *page_list)
144 {
145         mm_segment_t old_fs;
146         struct list_head *pos;
147         int status = RSP_OK;
148         ENTRY;
149
150         old_fs = get_fs();
151         set_fs(KERNEL_DS);
152
153         list_for_each(pos, page_list) {
154                 ssize_t ret;
155                 struct page *page = list_entry(pos, struct page, list);
156                 loff_t offset = (niobs->n_block_nr << PAGE_SHIFT) + 
157                         niobs->n_offset;
158                 if ( op == PTLBD_READ ) {
159                         if ((ret = filp->f_op->read(filp, page_address(page), 
160                              niobs->n_length, &offset)) != niobs->n_length)
161                                 status = ret;
162                                 goto out;             
163                 } else {
164                         if ((ret = filp->f_op->write(filp, page_address(page), 
165                              niobs->n_length, &offset)) != niobs->n_length)
166                                 status = ret;
167                                 goto out;             
168                 }               
169
170                 niobs++;
171         }
172 out:
173         set_fs(old_fs);
174         RETURN(status);
175 }
176
177 int ptlbd_parse_req(struct ptlrpc_request *req)
178 {
179         struct ptlbd_op *op;
180         struct ptlbd_niob *niob, *niobs;
181         struct ptlbd_rsp *rsp;
182         struct ptlrpc_bulk_desc *desc;
183         struct file *filp = req->rq_obd->u.ptlbd.filp;
184         struct l_wait_info lwi;
185         int size[1], wait_flag, i, page_count, rc, error_cnt = 0, 
186             status = RSP_OK;
187         struct list_head *pos, *n;
188         LIST_HEAD(tmp_pages);
189         ENTRY;
190
191         rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen);
192         if ( rc )
193                 RETURN(rc);
194
195         op = lustre_msg_buf(req->rq_reqmsg, 0);
196         LASSERT(op->op_cmd == PTLBD_READ || op->op_cmd == PTLBD_WRITE);
197
198         niobs = lustre_msg_buf(req->rq_reqmsg, 1);
199         page_count = req->rq_reqmsg->buflens[1] / sizeof(struct ptlbd_niob);
200
201         desc = ptlrpc_prep_bulk(req->rq_connection);
202         if (desc == NULL)
203                 GOTO(out, rc = -ENOMEM);
204         desc->bd_ptl_ev_hdlr = NULL;
205         desc->bd_portal = PTLBD_BULK_PORTAL;
206
207         for ( i = 0, niob = niobs ; i < page_count; niob++, i++) {
208                 struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc);
209                 if (bulk == NULL)
210                         GOTO(out_bulk, rc = -ENOMEM);
211
212                 bulk->bp_page = alloc_page(GFP_KERNEL);
213                 if (bulk->bp_page == NULL)
214                         GOTO(out_bulk, rc = -ENOMEM);
215                 list_add(&bulk->bp_page->list, &tmp_pages);
216
217                 bulk->bp_xid = niob->n_xid;
218                 bulk->bp_buf = page_address(bulk->bp_page);
219                 bulk->bp_buflen = niob->n_length;
220         }
221
222         if ( op->op_cmd == PTLBD_READ ) {
223                 if ((status = ptlbd_do_filp(filp, PTLBD_READ, niobs, 
224                                           page_count, &tmp_pages)) < 0) {
225                         error_cnt++;
226                 }
227                 rc = ptlrpc_bulk_put(desc);
228                 wait_flag = PTL_BULK_FL_SENT;
229         } else {
230                 rc = ptlrpc_bulk_get(desc);
231                 wait_flag = PTL_BULK_FL_RCVD;
232         }
233
234         if ( rc )
235                 GOTO(out_bulk, rc);
236
237         /* this synchronization probably isn't good enough */
238         lwi = LWI_TIMEOUT(obd_timeout * HZ, ptlbd_bulk_timeout, desc);
239         rc = l_wait_event(desc->bd_waitq, desc->bd_flags & wait_flag, &lwi);
240
241         size[0] = sizeof(struct ptlbd_rsp);
242         rc = lustre_pack_msg(1, size, NULL, &req->rq_replen, &req->rq_repmsg);
243         if ( rc )
244                 GOTO(out, rc);
245
246         rsp = lustre_msg_buf(req->rq_repmsg, 0);
247         if ( rsp == NULL )
248                 GOTO(out, rc = -EINVAL);
249         
250         if ( op->op_cmd == PTLBD_WRITE ) {
251                 if ((status = ptlbd_do_filp(filp, PTLBD_WRITE, niobs, 
252                                            page_count, &tmp_pages)) < 0) {
253                         error_cnt++;
254                 }
255         }
256
257         rsp->r_error_cnt = error_cnt;
258         rsp->r_status = status;                         /* I/O status */
259         req->rq_status = RQ_OK ; /* XXX */              /* ptlbd req status */
260
261         ptlrpc_reply(req->rq_svc, req);
262
263 out_bulk:
264         list_for_each_safe(pos, n, &tmp_pages) {
265                 struct page *page = list_entry(pos, struct page, list);
266                 list_del(&page->list);
267                 __free_page(page);
268         }
269         ptlrpc_bulk_decref(desc);
270 out:
271         RETURN(rc);
272 }