1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (c) 2002, 2003 Cluster File Systems, Inc.
5 * Author: Zach Brown <zab@clusterfs.com>
7 * This file is part of Lustre, http://www.lustre.org.
9 * Lustre is free software; you can redistribute it and/or
10 * modify it under the terms of version 2 of the GNU General Public
11 * License as published by the Free Software Foundation.
13 * Lustre is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
18 * You should have received a copy of the GNU General Public License
19 * along with Lustre; if not, write to the Free Software
20 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23 #include <linux/version.h>
24 #include <linux/module.h>
27 #define DEBUG_SUBSYSTEM S_PTLBD
29 #include <linux/obd_support.h>
30 #include <linux/obd_class.h>
31 #include <linux/lustre_debug.h>
32 #include <linux/lprocfs_status.h>
33 #include <linux/obd_ptlbd.h>
35 int ptlbd_send_rw_req(struct ptlbd_obd *ptlbd, ptlbd_cmd_t cmd,
36 struct buffer_head *first_bh)
38 struct obd_import *imp = ptlbd->bd_import;
40 struct ptlbd_niob *niob, *niobs;
41 struct ptlbd_rsp *rsp;
42 struct ptlrpc_request *req;
43 struct ptlrpc_bulk_desc *desc;
44 struct buffer_head *bh;
45 unsigned int page_count;
46 int rc, rep_size, size[2];
49 LASSERT(cmd == PTLBD_READ || cmd == PTLBD_WRITE);
51 for ( page_count = 0, bh = first_bh ; bh ; bh = bh->b_reqnext )
54 size[0] = sizeof(struct ptlbd_op);
55 size[1] = page_count * sizeof(struct ptlbd_niob);
57 req = ptlrpc_prep_req(imp, LUSTRE_PBD_VERSION, cmd, 2, size, NULL);
59 RETURN(rc = 1); /* need to return error cnt */
61 op = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*op));
62 niobs = lustre_msg_buf(req->rq_reqmsg, 1, size[1]);
67 op->op_niob_cnt = page_count;
69 op->op_block_cnt = page_count;
71 if (cmd == PTLBD_READ)
72 desc = ptlrpc_prep_bulk_imp (req, page_count,
73 BULK_PUT_SINK, PTLBD_BULK_PORTAL);
75 desc = ptlrpc_prep_bulk_imp (req, page_count,
76 BULK_GET_SOURCE, PTLBD_BULK_PORTAL);
78 GOTO(out, rc = 1); /* need to return error cnt */
79 /* NB req now owns desc, and frees it when she frees herself */
81 for ( niob = niobs, bh = first_bh ; bh ; bh = bh->b_reqnext, niob++ ) {
82 ptlrpc_prep_bulk_page(desc, bh->b_page,
83 bh_offset (bh) & (PAGE_SIZE - 1),
86 niob->n_block_nr = bh->b_blocknr;
87 niob->n_offset = bh_offset(bh);
88 niob->n_length = bh->b_size;
91 rep_size = sizeof(struct ptlbd_rsp);
92 req->rq_replen = lustre_msg_size(1, &rep_size);
94 /* XXX find out how we're really supposed to manage levels */
95 req->rq_send_state = imp->imp_state;
96 rc = ptlrpc_queue_wait(req);
99 GOTO(out, rc = 1); /* need to return error count */
101 rsp = lustre_swab_repbuf(req, 0, sizeof (*rsp),
102 lustre_swab_ptlbd_rsp);
104 CERROR ("can't unpack response\n");
105 GOTO (out, rc = 1); /* need to return error count */
107 else if (rsp->r_status != 0) {
108 rc = rsp->r_error_cnt;
112 ptlrpc_req_finished(req);
117 int ptlbd_send_flush_req(struct ptlbd_obd *ptlbd, ptlbd_cmd_t cmd)
119 struct obd_import *imp = ptlbd->bd_import;
121 struct ptlbd_rsp *rsp;
122 struct ptlrpc_request *req;
123 int rc, rep_size, size[1];
126 LASSERT(cmd == PTLBD_FLUSH);
128 size[0] = sizeof(struct ptlbd_op);
130 req = ptlrpc_prep_req(imp, LUSTRE_PBD_VERSION, cmd, 1, size, NULL);
134 op = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*op));
141 op->op_block_cnt = 0;
143 rep_size = sizeof(struct ptlbd_rsp);
144 req->rq_replen = lustre_msg_size(1, &rep_size);
146 /* XXX find out how we're really supposed to manage levels */
147 req->rq_send_state = imp->imp_state;
149 rc = ptlrpc_queue_wait(req);
151 GOTO(out_req, rc = 1);
152 rsp = lustre_swab_repbuf(req, 0, sizeof (*rsp),
153 lustre_swab_ptlbd_rsp);
154 if (rsp->r_status != 0)
158 ptlrpc_req_finished(req);
163 int ptlbd_do_filp(struct file *filp, int op, struct ptlbd_niob *niobs,
164 int page_count, struct list_head *page_list)
167 struct list_head *pos;
174 list_for_each(pos, page_list) {
176 struct page *page = list_entry(pos, struct page, list);
177 loff_t offset = (niobs->n_block_nr << PAGE_SHIFT) +
179 if ( op == PTLBD_READ )
180 ret = filp->f_op->read(filp, page_address(page),
181 niobs->n_length, &offset);
183 ret = filp->f_op->write(filp, page_address(page),
184 niobs->n_length, &offset);
185 if (ret != niobs->n_length) {
196 int ptlbd_srv_rw_req(ptlbd_cmd_t cmd, __u16 index,
197 struct ptlrpc_request *req, int swab)
199 struct ptlbd_niob *niob, *niobs;
200 struct ptlbd_rsp *rsp;
201 struct ptlrpc_bulk_desc *desc = NULL;
202 struct file *filp = req->rq_export->exp_obd->u.ptlbd.filp;
203 struct l_wait_info lwi;
204 int size[1], i, page_count, rc = 0, error_cnt = 0;
205 struct list_head *pos, *n;
207 LIST_HEAD(tmp_pages);
210 niobs = lustre_swab_reqbuf (req, 1, sizeof (*niobs),
211 lustre_swab_ptlbd_niob);
213 GOTO (out, rc = -EFAULT);
215 size[0] = sizeof(struct ptlbd_rsp);
216 rc = lustre_pack_reply(req, 1, size, NULL);
220 rsp = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*rsp));
222 GOTO (out, rc = -EFAULT);
224 /* FIXME: assumes each niobuf fits in 1 page */
225 page_count = req->rq_reqmsg->buflens[1] / sizeof(struct ptlbd_niob);
226 if (swab) { /* swab remaining niobs */
227 for (i = 1; i < page_count; i++)
228 lustre_swab_ptlbd_niob(&niobs[i]);
230 if (req->rq_export == NULL) {
232 GOTO(out_reply, rc = -EFAULT);
235 if (cmd == PTLBD_READ)
236 desc = ptlrpc_prep_bulk_exp (req, page_count,
237 BULK_PUT_SOURCE, PTLBD_BULK_PORTAL);
239 desc = ptlrpc_prep_bulk_exp (req, page_count,
240 BULK_GET_SINK, PTLBD_BULK_PORTAL);
243 GOTO(out_reply, rc = -ENOMEM);
245 desc->bd_portal = PTLBD_BULK_PORTAL;
246 LASSERT (page_count > 0);
248 for ( i = 0, niob = niobs ; i < page_count; niob++, i++) {
249 page = alloc_page(GFP_KERNEL);
252 GOTO(out_reply, rc = -ENOMEM);
254 list_add_tail(&page->list, &tmp_pages);
256 ptlrpc_prep_bulk_page(desc, page,
257 niob->n_offset & (PAGE_SIZE - 1),
261 if ( cmd == PTLBD_READ ) {
262 rc = ptlbd_do_filp(filp, PTLBD_READ, niobs,
263 page_count, &tmp_pages);
269 rc = ptlrpc_start_bulk_transfer(desc);
276 lwi = LWI_TIMEOUT(obd_timeout * HZ / 4, NULL, desc);
277 rc = l_wait_event(desc->bd_waitq, !ptlrpc_bulk_active(desc), &lwi);
279 LASSERT(rc == -ETIMEDOUT);
280 ptlrpc_abort_bulk(desc);
285 /* XXX do some error handling */
286 LASSERT(desc->bd_success && desc->bd_nob_transferred == desc->bd_nob);
288 if ( cmd == PTLBD_WRITE ) {
289 if ((rc = ptlbd_do_filp(filp, PTLBD_WRITE, niobs,
290 page_count, &tmp_pages)) < 0) {
296 rsp->r_error_cnt = error_cnt;
302 list_for_each_safe(pos, n, &tmp_pages) {
303 struct page *page = list_entry(pos, struct page, list);
304 list_del(&page->list);
308 ptlrpc_free_bulk(desc);
314 int ptlbd_srv_flush_req(ptlbd_cmd_t cmd, __u16 index,
315 struct ptlrpc_request *req)
317 struct ptlbd_rsp *rsp;
318 struct file *filp = req->rq_export->exp_obd->u.ptlbd.filp;
319 int size[1], rc, status;
322 size[0] = sizeof(struct ptlbd_rsp);
323 rc = lustre_pack_reply(req, 1, size, NULL);
327 rsp = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*rsp));
331 if (! (filp) && (filp->f_op) && (filp->f_op->fsync) &&
333 GOTO(out_reply, status = -EINVAL);
335 status = filp->f_op->fsync(filp, filp->f_dentry, 1);
338 rsp->r_error_cnt = 0;
339 rsp->r_status = status;
347 int ptlbd_handle(struct ptlrpc_request *req)
354 swab = lustre_msg_swabbed (req->rq_reqmsg);
356 if (req->rq_reqmsg->opc == PTLBD_CONNECT) {
357 rc = target_handle_connect(req);
358 target_send_reply(req, rc, OBD_FAIL_PTLRPC);
361 if (req->rq_reqmsg->opc == PTLBD_DISCONNECT) {
362 rc = target_handle_disconnect(req);
363 target_send_reply(req, rc, OBD_FAIL_PTLRPC);
366 op = lustre_swab_reqbuf (req, 0, sizeof (*op),
367 lustre_swab_ptlbd_op);
371 switch (op->op_cmd) {
374 rc = ptlbd_srv_rw_req(op->op_cmd, op->op_lun, req,
379 rc = ptlbd_srv_flush_req(op->op_cmd, op->op_lun, req);