Whamcloud - gitweb
- merge 0.7rc1 from b_devel to HEAD (20030612 merge point)
[fs/lustre-release.git] / lustre / ptlbd / rpc.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
5  *   Author: Zach Brown <zab@clusterfs.com>
6  *
7  *   This file is part of Lustre, http://www.lustre.org.
8  *
9  *   Lustre is free software; you can redistribute it and/or
10  *   modify it under the terms of version 2 of the GNU General Public
11  *   License as published by the Free Software Foundation.
12  *
13  *   Lustre is distributed in the hope that it will be useful,
14  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
15  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  *   GNU General Public License for more details.
17  *
18  *   You should have received a copy of the GNU General Public License
19  *   along with Lustre; if not, write to the Free Software
20  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21  */
22
23 #include <linux/version.h>
24 #include <linux/module.h>
25 #include <linux/fs.h>
26
27 #define DEBUG_SUBSYSTEM S_PTLBD
28
29 #include <linux/obd_support.h>
30 #include <linux/obd_class.h>
31 #include <linux/lustre_debug.h>
32 #include <linux/lprocfs_status.h>
33 #include <linux/obd_ptlbd.h>
34
35 int ptlbd_send_rw_req(struct ptlbd_obd *ptlbd, ptlbd_cmd_t cmd, 
36                    struct buffer_head *first_bh)
37 {
38         struct obd_import *imp = ptlbd->bd_import;
39         struct ptlbd_op *op;
40         struct ptlbd_niob *niob, *niobs;
41         struct ptlbd_rsp *rsp;
42         struct ptlrpc_request *req;
43         struct ptlrpc_bulk_desc *desc;
44         struct buffer_head *bh;
45         unsigned int page_count;
46         int rc, rep_size, size[2];
47         ENTRY;
48
49         LASSERT(cmd == PTLBD_READ || cmd == PTLBD_WRITE);
50
51         for ( page_count = 0, bh = first_bh ; bh ; bh = bh->b_reqnext )
52                 page_count++;
53
54         size[0] = sizeof(struct ptlbd_op);
55         size[1] = page_count * sizeof(struct ptlbd_niob);
56
57         req = ptlrpc_prep_req(imp, cmd, 2, size, NULL);
58         if (!req)
59                 RETURN(rc = 1);                  /* need to return error cnt */
60
61         op = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*op));
62         niobs = lustre_msg_buf(req->rq_reqmsg, 1, size[1]);
63
64         /* XXX pack */
65         op->op_cmd = cmd;
66         op->op_lun = 0;
67         op->op_niob_cnt = page_count;
68         op->op__padding = 0;
69         op->op_block_cnt = page_count;
70
71         if (cmd == PTLBD_READ) 
72                 desc = ptlrpc_prep_bulk_imp (req, BULK_PUT_SINK, PTLBD_BULK_PORTAL);
73         else
74                 desc = ptlrpc_prep_bulk_imp (req, BULK_GET_SOURCE, PTLBD_BULK_PORTAL);
75         if ( desc == NULL )
76                 GOTO(out, rc = 1);              /* need to return error cnt */
77         /* NB req now owns desc, and frees it when she frees herself */
78         
79         for ( niob = niobs, bh = first_bh ; bh ; bh = bh->b_reqnext, niob++ ) {
80                 rc = ptlrpc_prep_bulk_page(desc, bh->b_page,
81                                            bh_offset (bh) & (PAGE_SIZE - 1),
82                                            bh->b_size);
83                 if (rc != 0)
84                         GOTO(out, rc = 1);      /* need to return error cnt */
85
86                 niob->n_block_nr = bh->b_blocknr;
87                 niob->n_offset = bh_offset(bh);
88                 niob->n_length = bh->b_size;
89         }
90
91         rep_size = sizeof(struct ptlbd_rsp);
92         req->rq_replen = lustre_msg_size(1, &rep_size);
93
94         /* XXX find out how we're really supposed to manage levels */
95         req->rq_level = imp->imp_level;
96         rc = ptlrpc_queue_wait(req);
97
98         if ( rc != 0 )
99                 GOTO(out, rc = 1);              /* need to return error count */
100
101         rsp = lustre_swab_repbuf(req, 0, sizeof (*rsp),
102                                  lustre_swab_ptlbd_rsp);
103         if (rsp == NULL) {
104                 CERROR ("can't unpack response\n");
105                 GOTO (out, rc = 1);             /* need to return error count */
106         }
107         else if (rsp->r_status != 0) {
108                 rc = rsp->r_error_cnt;
109         }
110
111 out:
112         ptlrpc_req_finished(req);
113         RETURN(rc);
114 }
115
116
117 int ptlbd_send_flush_req(struct ptlbd_obd *ptlbd, ptlbd_cmd_t cmd)
118 {
119         struct obd_import *imp = ptlbd->bd_import;
120         struct ptlbd_op *op;
121         struct ptlbd_rsp *rsp;
122         struct ptlrpc_request *req;
123         int rc, rep_size, size[1];
124         ENTRY;
125
126         LASSERT(cmd == PTLBD_FLUSH);
127
128         size[0] = sizeof(struct ptlbd_op);
129
130         req = ptlrpc_prep_req(imp, cmd, 1, size, NULL);
131         if (!req)
132                 RETURN(-ENOMEM); 
133
134         op = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*op));
135
136         /* XXX pack */
137         op->op_cmd = cmd;
138         op->op_lun = 0;
139         op->op_niob_cnt = 0;
140         op->op__padding = 0;
141         op->op_block_cnt = 0;
142
143         rep_size = sizeof(struct ptlbd_rsp);
144         req->rq_replen = lustre_msg_size(1, &rep_size);
145
146         /* XXX find out how we're really supposed to manage levels */
147         req->rq_level = imp->imp_level;
148
149         rc = ptlrpc_queue_wait(req);
150         if ( rc != 0 )
151                 GOTO(out_req, rc = 1);
152         rsp = lustre_swab_repbuf(req, 0, sizeof (*rsp),
153                                  lustre_swab_ptlbd_rsp);
154         if (rsp->r_status != 0)
155                 rc = rsp->r_status;
156
157 out_req:
158         ptlrpc_req_finished(req);
159         RETURN(rc);
160 }
161
162
163 int ptlbd_do_filp(struct file *filp, int op, struct ptlbd_niob *niobs, 
164                 int page_count, struct list_head *page_list)
165 {
166         mm_segment_t old_fs;
167         struct list_head *pos;
168         int status = 0;
169         ENTRY;
170
171         old_fs = get_fs();
172         set_fs(KERNEL_DS);
173
174         list_for_each(pos, page_list) {
175                 ssize_t ret;
176                 struct page *page = list_entry(pos, struct page, list);
177                 loff_t offset = (niobs->n_block_nr << PAGE_SHIFT) + 
178                         niobs->n_offset;
179                 if ( op == PTLBD_READ )
180                         ret = filp->f_op->read(filp, page_address(page), 
181                              niobs->n_length, &offset);
182                 else 
183                         ret = filp->f_op->write(filp, page_address(page), 
184                              niobs->n_length, &offset);
185                 if (ret != niobs->n_length) {
186                         status = ret;
187                         break;
188                 }
189                 niobs++;
190         }
191         set_fs(old_fs);
192         RETURN(status);
193 }
194
195
196 int ptlbd_srv_rw_req(ptlbd_cmd_t cmd, __u16 index, 
197                      struct ptlrpc_request *req, int swab)
198 {
199         struct ptlbd_niob *niob, *niobs;
200         struct ptlbd_rsp *rsp;
201         struct ptlrpc_bulk_desc *desc = NULL;
202         struct file *filp = req->rq_obd->u.ptlbd.filp;
203         struct l_wait_info lwi;
204         int size[1], i, page_count, rc = 0, error_cnt = 0;
205         struct list_head *pos, *n;
206         struct page *page;
207         LIST_HEAD(tmp_pages);
208         ENTRY;
209
210         niobs = lustre_swab_reqbuf (req, 1, sizeof (*niobs),
211                                     lustre_swab_ptlbd_niob);
212         if (niobs == NULL)
213                 GOTO (out, rc = -EFAULT);
214
215         size[0] = sizeof(struct ptlbd_rsp);
216         rc = lustre_pack_msg(1, size, NULL, &req->rq_replen, &req->rq_repmsg);
217         if ( rc )
218                 GOTO(out, rc);
219
220         rsp = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*rsp));
221         if ( rsp == NULL )
222                 GOTO (out, rc = -EFAULT);
223
224         page_count = req->rq_reqmsg->buflens[1] / sizeof(struct ptlbd_niob);
225         if (swab) {                             /* swab remaining niobs */
226                 for (i = 1; i < page_count; i++)
227                         lustre_swab_ptlbd_niob(&niobs[i]);
228         }
229         if (req->rq_export == NULL) {
230                 error_cnt++;
231                 GOTO(out_reply, rc = -EFAULT);
232         }
233         
234         if (cmd == PTLBD_READ)
235                 desc = ptlrpc_prep_bulk_exp (req, BULK_PUT_SOURCE, PTLBD_BULK_PORTAL);
236         else
237                 desc = ptlrpc_prep_bulk_exp (req, BULK_GET_SINK, PTLBD_BULK_PORTAL);
238         if (desc == NULL) {
239                 error_cnt++;
240                 GOTO(out_reply, rc = -ENOMEM);
241         }
242         desc->bd_portal = PTLBD_BULK_PORTAL;
243         LASSERT (page_count > 0);
244
245         for ( i = 0, niob = niobs ; i < page_count; niob++, i++) {
246                 page = alloc_page(GFP_KERNEL);
247                 if (page == NULL) {
248                         error_cnt++;
249                         GOTO(out_reply, rc = -ENOMEM);
250                 }
251                 list_add_tail(&page->list, &tmp_pages);
252
253                 rc = ptlrpc_prep_bulk_page(desc, page,
254                                            niob->n_offset & (PAGE_SIZE - 1),
255                                            niob->n_length);
256                 if (rc != 0) {
257                         error_cnt++;
258                         GOTO(out_reply, rc);
259                 }
260         }
261
262         if ( cmd == PTLBD_READ ) {
263                 if ((rc = ptlbd_do_filp(filp, PTLBD_READ, niobs, 
264                                         page_count, &tmp_pages)) < 0) {
265                         error_cnt++;
266                         GOTO(out_reply, rc);
267                 }
268                 rc = ptlrpc_bulk_put(desc);
269         } else {
270                 rc = ptlrpc_bulk_get(desc);
271         }
272
273         if ( rc ) {
274                 error_cnt++;
275                 GOTO(out_reply, rc);
276         }
277
278         lwi = LWI_TIMEOUT(obd_timeout * HZ, NULL, desc);
279         rc = l_wait_event(desc->bd_waitq, ptlrpc_bulk_complete(desc), &lwi);
280         if (rc != 0) {
281                 LASSERT(rc == -ETIMEDOUT);
282                 ptlrpc_abort_bulk(desc);
283                 error_cnt++;
284                 GOTO(out_reply, rc);
285         }
286         
287         if ( cmd == PTLBD_WRITE ) {
288                 if ((rc = ptlbd_do_filp(filp, PTLBD_WRITE, niobs, 
289                                            page_count, &tmp_pages)) < 0) {
290                         error_cnt++;
291                 }
292         }
293
294 out_reply:
295         rsp->r_error_cnt = error_cnt;
296         rsp->r_status = rc;  
297         req->rq_status = rc; 
298
299         ptlrpc_reply(req);
300
301         list_for_each_safe(pos, n, &tmp_pages) {
302                 struct page *page = list_entry(pos, struct page, list);
303                 list_del(&page->list);
304                 __free_page(page);
305         }
306         if (desc)
307                 ptlrpc_free_bulk(desc);
308 out:
309         RETURN(rc);
310 }
311
312
313 int ptlbd_srv_flush_req(ptlbd_cmd_t cmd, __u16 index, 
314                         struct ptlrpc_request *req)
315 {
316         struct ptlbd_rsp *rsp;
317         struct file *filp = req->rq_obd->u.ptlbd.filp;
318         int size[1], rc, status;
319         ENTRY;
320
321         size[0] = sizeof(struct ptlbd_rsp);
322         rc = lustre_pack_msg(1, size, NULL, &req->rq_replen, &req->rq_repmsg);
323         if ( rc )
324                 RETURN(rc);
325
326         rsp = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*rsp));
327         if ( rsp == NULL )
328                 RETURN(-EINVAL);
329
330         if (! (filp) && (filp->f_op) && (filp->f_op->fsync) &&
331               (filp->f_dentry))
332                 GOTO(out_reply, status = -EINVAL);
333
334         status = filp->f_op->fsync(filp, filp->f_dentry, 1);
335
336 out_reply:
337         rsp->r_error_cnt = 0;
338         rsp->r_status = status;
339         req->rq_status = 0;
340
341         ptlrpc_reply(req);
342         RETURN(0);
343 }
344
345
346 int ptlbd_handle(struct ptlrpc_request *req)
347 {
348         struct ptlbd_op *op;
349         int swab;
350         int rc;
351         ENTRY;
352
353         swab = lustre_msg_swabbed (req->rq_reqmsg);
354
355         if (req->rq_reqmsg->opc == PTLBD_CONNECT) {
356                 rc = target_handle_connect(req, ptlbd_handle);
357                 target_send_reply(req, rc, OBD_FAIL_PTLRPC);
358                 RETURN(0);
359         }
360         if (req->rq_reqmsg->opc == PTLBD_DISCONNECT) {
361                 rc = target_handle_disconnect(req);
362                 target_send_reply(req, rc, OBD_FAIL_PTLRPC);
363                 RETURN(0);
364         }
365         op = lustre_swab_reqbuf (req, 0, sizeof (*op),
366                                  lustre_swab_ptlbd_op);
367         if (op == NULL)
368                 RETURN(-EFAULT);
369
370         switch (op->op_cmd) {
371                 case PTLBD_READ:
372                 case PTLBD_WRITE:
373                         rc = ptlbd_srv_rw_req(op->op_cmd, op->op_lun, req, 
374                                               swab);
375                         break;
376
377                 case PTLBD_FLUSH:
378                         rc = ptlbd_srv_flush_req(op->op_cmd, op->op_lun, req);
379                         break;
380                 default:
381                         rc = -EINVAL;
382         }
383
384         RETURN(rc);
385 }