Whamcloud - gitweb
b=3031
[fs/lustre-release.git] / lustre / ptlbd / rpc.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
5  *   Author: Zach Brown <zab@clusterfs.com>
6  *
7  *   This file is part of Lustre, http://www.lustre.org.
8  *
9  *   Lustre is free software; you can redistribute it and/or
10  *   modify it under the terms of version 2 of the GNU General Public
11  *   License as published by the Free Software Foundation.
12  *
13  *   Lustre is distributed in the hope that it will be useful,
14  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
15  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  *   GNU General Public License for more details.
17  *
18  *   You should have received a copy of the GNU General Public License
19  *   along with Lustre; if not, write to the Free Software
20  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21  */
22
23 #include <linux/version.h>
24 #include <linux/module.h>
25 #include <linux/fs.h>
26
27 #define DEBUG_SUBSYSTEM S_PTLBD
28
29 #include <linux/obd_support.h>
30 #include <linux/obd_class.h>
31 #include <linux/lustre_debug.h>
32 #include <linux/lprocfs_status.h>
33 #include <linux/obd_ptlbd.h>
34
35 int ptlbd_send_rw_req(struct ptlbd_obd *ptlbd, ptlbd_cmd_t cmd, 
36                    struct buffer_head *first_bh)
37 {
38         struct obd_import *imp = ptlbd->bd_import;
39         struct ptlbd_op *op;
40         struct ptlbd_niob *niob, *niobs;
41         struct ptlbd_rsp *rsp;
42         struct ptlrpc_request *req;
43         struct ptlrpc_bulk_desc *desc;
44         struct buffer_head *bh;
45         unsigned int page_count;
46         int rc, rep_size, size[2];
47         ENTRY;
48
49         LASSERT(cmd == PTLBD_READ || cmd == PTLBD_WRITE);
50
51         for ( page_count = 0, bh = first_bh ; bh ; bh = bh->b_reqnext )
52                 page_count++;
53
54         size[0] = sizeof(struct ptlbd_op);
55         size[1] = page_count * sizeof(struct ptlbd_niob);
56
57         req = ptlrpc_prep_req(imp, LUSTRE_PBD_VERSION, cmd, 2, size, NULL);
58         if (!req)
59                 RETURN(rc = 1);                  /* need to return error cnt */
60
61         op = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*op));
62         niobs = lustre_msg_buf(req->rq_reqmsg, 1, size[1]);
63
64         /* XXX pack */
65         op->op_cmd = cmd;
66         op->op_lun = 0;
67         op->op_niob_cnt = page_count;
68         op->op__padding = 0;
69         op->op_block_cnt = page_count;
70
71         if (cmd == PTLBD_READ) 
72                 desc = ptlrpc_prep_bulk_imp (req, page_count,
73                                              BULK_PUT_SINK, PTLBD_BULK_PORTAL);
74         else
75                 desc = ptlrpc_prep_bulk_imp (req, page_count,
76                                              BULK_GET_SOURCE, PTLBD_BULK_PORTAL);
77         if ( desc == NULL )
78                 GOTO(out, rc = 1);              /* need to return error cnt */
79         /* NB req now owns desc, and frees it when she frees herself */
80         
81         for ( niob = niobs, bh = first_bh ; bh ; bh = bh->b_reqnext, niob++ ) {
82                 ptlrpc_prep_bulk_page(desc, bh->b_page,
83                                       bh_offset (bh) & (PAGE_SIZE - 1),
84                                       bh->b_size);
85
86                 niob->n_block_nr = bh->b_blocknr;
87                 niob->n_offset = bh_offset(bh);
88                 niob->n_length = bh->b_size;
89         }
90
91         rep_size = sizeof(struct ptlbd_rsp);
92         req->rq_replen = lustre_msg_size(1, &rep_size);
93
94         /* XXX find out how we're really supposed to manage levels */
95         req->rq_send_state = imp->imp_state;
96         rc = ptlrpc_queue_wait(req);
97
98         if ( rc != 0 )
99                 GOTO(out, rc = 1);              /* need to return error count */
100
101         rsp = lustre_swab_repbuf(req, 0, sizeof (*rsp),
102                                  lustre_swab_ptlbd_rsp);
103         if (rsp == NULL) {
104                 CERROR ("can't unpack response\n");
105                 GOTO (out, rc = 1);             /* need to return error count */
106         }
107         else if (rsp->r_status != 0) {
108                 rc = rsp->r_error_cnt;
109         }
110
111 out:
112         ptlrpc_req_finished(req);
113         RETURN(rc);
114 }
115
116
117 int ptlbd_send_flush_req(struct ptlbd_obd *ptlbd, ptlbd_cmd_t cmd)
118 {
119         struct obd_import *imp = ptlbd->bd_import;
120         struct ptlbd_op *op;
121         struct ptlbd_rsp *rsp;
122         struct ptlrpc_request *req;
123         int rc, rep_size, size[1];
124         ENTRY;
125
126         LASSERT(cmd == PTLBD_FLUSH);
127
128         size[0] = sizeof(struct ptlbd_op);
129
130         req = ptlrpc_prep_req(imp, LUSTRE_PBD_VERSION, cmd, 1, size, NULL);
131         if (!req)
132                 RETURN(-ENOMEM); 
133
134         op = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*op));
135
136         /* XXX pack */
137         op->op_cmd = cmd;
138         op->op_lun = 0;
139         op->op_niob_cnt = 0;
140         op->op__padding = 0;
141         op->op_block_cnt = 0;
142
143         rep_size = sizeof(struct ptlbd_rsp);
144         req->rq_replen = lustre_msg_size(1, &rep_size);
145
146         /* XXX find out how we're really supposed to manage levels */
147         req->rq_send_state = imp->imp_state;
148
149         rc = ptlrpc_queue_wait(req);
150         if ( rc != 0 )
151                 GOTO(out_req, rc = 1);
152         rsp = lustre_swab_repbuf(req, 0, sizeof (*rsp),
153                                  lustre_swab_ptlbd_rsp);
154         if (rsp->r_status != 0)
155                 rc = rsp->r_status;
156
157 out_req:
158         ptlrpc_req_finished(req);
159         RETURN(rc);
160 }
161
162
163 int ptlbd_do_filp(struct file *filp, int op, struct ptlbd_niob *niobs, 
164                 int page_count, struct list_head *page_list)
165 {
166         mm_segment_t old_fs;
167         struct list_head *pos;
168         int status = 0;
169         ENTRY;
170
171         old_fs = get_fs();
172         set_fs(KERNEL_DS);
173
174         list_for_each(pos, page_list) {
175                 ssize_t ret;
176                 struct page *page = list_entry(pos, struct page, list);
177                 loff_t offset = (niobs->n_block_nr << PAGE_SHIFT) + 
178                         niobs->n_offset;
179                 if ( op == PTLBD_READ )
180                         ret = filp->f_op->read(filp, page_address(page), 
181                              niobs->n_length, &offset);
182                 else 
183                         ret = filp->f_op->write(filp, page_address(page), 
184                              niobs->n_length, &offset);
185                 if (ret != niobs->n_length) {
186                         status = ret;
187                         break;
188                 }
189                 niobs++;
190         }
191         set_fs(old_fs);
192         RETURN(status);
193 }
194
195
196 int ptlbd_srv_rw_req(ptlbd_cmd_t cmd, __u16 index, 
197                      struct ptlrpc_request *req, int swab)
198 {
199         struct ptlbd_niob *niob, *niobs;
200         struct ptlbd_rsp *rsp;
201         struct ptlrpc_bulk_desc *desc = NULL;
202         struct file *filp = req->rq_export->exp_obd->u.ptlbd.filp;
203         struct l_wait_info lwi;
204         int size[1], i, page_count, rc = 0, error_cnt = 0;
205         struct list_head *pos, *n;
206         struct page *page;
207         LIST_HEAD(tmp_pages);
208         ENTRY;
209
210         niobs = lustre_swab_reqbuf (req, 1, sizeof (*niobs),
211                                     lustre_swab_ptlbd_niob);
212         if (niobs == NULL)
213                 GOTO (out, rc = -EFAULT);
214
215         size[0] = sizeof(struct ptlbd_rsp);
216         rc = lustre_pack_reply(req, 1, size, NULL);
217         if ( rc )
218                 GOTO(out, rc);
219
220         rsp = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*rsp));
221         if ( rsp == NULL )
222                 GOTO (out, rc = -EFAULT);
223
224         /* FIXME: assumes each niobuf fits in 1 page */
225         page_count = req->rq_reqmsg->buflens[1] / sizeof(struct ptlbd_niob);
226         if (swab) {                             /* swab remaining niobs */
227                 for (i = 1; i < page_count; i++)
228                         lustre_swab_ptlbd_niob(&niobs[i]);
229         }
230         if (req->rq_export == NULL) {
231                 error_cnt++;
232                 GOTO(out_reply, rc = -EFAULT);
233         }
234         
235         if (cmd == PTLBD_READ)
236                 desc = ptlrpc_prep_bulk_exp (req, page_count, 
237                                              BULK_PUT_SOURCE, PTLBD_BULK_PORTAL);
238         else
239                 desc = ptlrpc_prep_bulk_exp (req, page_count,
240                                              BULK_GET_SINK, PTLBD_BULK_PORTAL);
241         if (desc == NULL) {
242                 error_cnt++;
243                 GOTO(out_reply, rc = -ENOMEM);
244         }
245         desc->bd_portal = PTLBD_BULK_PORTAL;
246         LASSERT (page_count > 0);
247
248         for ( i = 0, niob = niobs ; i < page_count; niob++, i++) {
249                 page = alloc_page(GFP_KERNEL);
250                 if (page == NULL) {
251                         error_cnt++;
252                         GOTO(out_reply, rc = -ENOMEM);
253                 }
254                 list_add_tail(&page->list, &tmp_pages);
255
256                 ptlrpc_prep_bulk_page(desc, page,
257                                       niob->n_offset & (PAGE_SIZE - 1),
258                                       niob->n_length);
259         }
260
261         if ( cmd == PTLBD_READ ) {
262                 rc = ptlbd_do_filp(filp, PTLBD_READ, niobs, 
263                                    page_count, &tmp_pages);
264                 if (rc < 0) {
265                         error_cnt++;
266                         GOTO(out_reply, rc);
267                 }
268         }
269         rc = ptlrpc_start_bulk_transfer(desc);
270
271         if ( rc ) {
272                 error_cnt++;
273                 GOTO(out_reply, rc);
274         }
275
276         lwi = LWI_TIMEOUT(obd_timeout * HZ / 4, NULL, desc);
277         rc = l_wait_event(desc->bd_waitq, !ptlrpc_bulk_active(desc), &lwi);
278         if (rc != 0) {
279                 LASSERT(rc == -ETIMEDOUT);
280                 ptlrpc_abort_bulk(desc);
281                 error_cnt++;
282                 GOTO(out_reply, rc);
283         }
284
285         /* XXX do some error handling */
286         LASSERT(desc->bd_success && desc->bd_nob_transferred == desc->bd_nob);
287         
288         if ( cmd == PTLBD_WRITE ) {
289                 if ((rc = ptlbd_do_filp(filp, PTLBD_WRITE, niobs, 
290                                            page_count, &tmp_pages)) < 0) {
291                         error_cnt++;
292                 }
293         }
294
295 out_reply:
296         rsp->r_error_cnt = error_cnt;
297         rsp->r_status = rc;  
298         req->rq_status = rc; 
299
300         ptlrpc_reply(req);
301
302         list_for_each_safe(pos, n, &tmp_pages) {
303                 struct page *page = list_entry(pos, struct page, list);
304                 list_del(&page->list);
305                 __free_page(page);
306         }
307         if (desc)
308                 ptlrpc_free_bulk(desc);
309 out:
310         RETURN(rc);
311 }
312
313
314 int ptlbd_srv_flush_req(ptlbd_cmd_t cmd, __u16 index, 
315                         struct ptlrpc_request *req)
316 {
317         struct ptlbd_rsp *rsp;
318         struct file *filp = req->rq_export->exp_obd->u.ptlbd.filp;
319         int size[1], rc, status;
320         ENTRY;
321
322         size[0] = sizeof(struct ptlbd_rsp);
323         rc = lustre_pack_reply(req, 1, size, NULL);
324         if ( rc )
325                 RETURN(rc);
326
327         rsp = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*rsp));
328         if ( rsp == NULL )
329                 RETURN(-EINVAL);
330
331         if (! (filp) && (filp->f_op) && (filp->f_op->fsync) &&
332               (filp->f_dentry))
333                 GOTO(out_reply, status = -EINVAL);
334
335         status = filp->f_op->fsync(filp, filp->f_dentry, 1);
336
337 out_reply:
338         rsp->r_error_cnt = 0;
339         rsp->r_status = status;
340         req->rq_status = 0;
341
342         ptlrpc_reply(req);
343         RETURN(0);
344 }
345
346
347 int ptlbd_handle(struct ptlrpc_request *req)
348 {
349         struct ptlbd_op *op;
350         int swab;
351         int rc;
352         ENTRY;
353
354         swab = lustre_msg_swabbed (req->rq_reqmsg);
355
356         if (req->rq_reqmsg->opc == PTLBD_CONNECT) {
357                 rc = target_handle_connect(req);
358                 target_send_reply(req, rc, OBD_FAIL_PTLRPC);
359                 RETURN(0);
360         }
361         if (req->rq_reqmsg->opc == PTLBD_DISCONNECT) {
362                 rc = target_handle_disconnect(req);
363                 target_send_reply(req, rc, OBD_FAIL_PTLRPC);
364                 RETURN(0);
365         }
366         op = lustre_swab_reqbuf (req, 0, sizeof (*op),
367                                  lustre_swab_ptlbd_op);
368         if (op == NULL)
369                 RETURN(-EFAULT);
370
371         switch (op->op_cmd) {
372                 case PTLBD_READ:
373                 case PTLBD_WRITE:
374                         rc = ptlbd_srv_rw_req(op->op_cmd, op->op_lun, req, 
375                                               swab);
376                         break;
377
378                 case PTLBD_FLUSH:
379                         rc = ptlbd_srv_flush_req(op->op_cmd, op->op_lun, req);
380                         break;
381                 default:
382                         rc = -EINVAL;
383         }
384
385         RETURN(rc);
386 }