1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (c) 2002, 2003 Cluster File Systems, Inc.
5 * Author: Zach Brown <zab@clusterfs.com>
7 * This file is part of Lustre, http://www.lustre.org.
9 * Lustre is free software; you can redistribute it and/or
10 * modify it under the terms of version 2 of the GNU General Public
11 * License as published by the Free Software Foundation.
13 * Lustre is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
18 * You should have received a copy of the GNU General Public License
19 * along with Lustre; if not, write to the Free Software
20 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23 #include <linux/version.h>
24 #include <linux/module.h>
27 #define DEBUG_SUBSYSTEM S_PTLBD
29 #include <linux/obd_support.h>
30 #include <linux/obd_class.h>
31 #include <linux/lustre_debug.h>
32 #include <linux/lprocfs_status.h>
33 #include <linux/obd_ptlbd.h>
/*
 * ptlbd_send_req - send one block-layer request to the ptlbd server.
 *
 * @ptlbd:  device state; its bd_import is used to build the RPC.
 * @cmd:    must be PTLBD_READ or PTLBD_WRITE (asserted below).
 * @blkreq: block request whose buffer_head chain (blkreq->bh, linked via
 *          b_next) describes the data to transfer.
 *
 * Builds a two-buffer request (a struct ptlbd_op followed by an array of
 * struct ptlbd_niob), attaches one bulk page per buffer_head, registers the
 * bulk transfer, waits for the reply and adds the reply's r_error_cnt to
 * blkreq->errors when r_status is not RSP_OK.
 *
 * NOTE(review): the embedded line numbers in this listing skip, so some
 * original lines (braces, a few declarations, error checks, labels and the
 * return statement) are not shown here. Comments below describe only what is
 * visible.
 */
39 int ptlbd_send_req(struct ptlbd_obd *ptlbd, ptlbd_cmd_t cmd,
40 struct request *blkreq)
42 struct buffer_head *first_bh = blkreq->bh;
43 struct obd_import *imp = &ptlbd->bd_import;
45 struct ptlbd_niob *niob, *niobs;
46 struct ptlbd_rsp *rsp;
47 struct ptlrpc_request *req;
48 struct ptlrpc_bulk_desc *desc;
49 struct buffer_head *bh;
50 unsigned int page_count;
51 int rc, rep_size, size[2];
55 LASSERT(cmd == PTLBD_READ || cmd == PTLBD_WRITE);
/* Walk the buffer_head chain; the loop body is not shown, presumably it
 * counts entries into page_count - TODO confirm. */
57 for ( page_count = 0, bh = first_bh ; bh ; bh = bh->b_next )
/* Request layout: buffer 0 is the op header, buffer 1 holds one niob per
 * counted buffer_head. */
60 size[0] = sizeof(struct ptlbd_op);
61 size[1] = page_count * sizeof(struct ptlbd_niob);
63 req = ptlrpc_prep_req(imp, cmd, 2, size, NULL);
67 op = lustre_msg_buf(req->rq_reqmsg, 0);
68 niobs = lustre_msg_buf(req->rq_reqmsg, 1);
/* Both counts are set to the same value, page_count. */
73 op->op_niob_cnt = page_count;
75 op->op_block_cnt = page_count;
/* Bulk descriptor on the import's connection, using the ptlbd bulk portal
 * and no custom event handler. */
77 desc = ptlrpc_prep_bulk(imp->imp_connection);
79 GOTO(out_req, rc = -ENOMEM);
80 desc->bd_portal = PTLBD_BULK_PORTAL;
81 desc->bd_ptl_ev_hdlr = NULL;
83 xid = ptlrpc_next_xid();
/* One niob and one bulk page per buffer_head; niob advances in step with
 * the chain. */
85 for ( niob = niobs, bh = first_bh ; bh ; bh = bh->b_next, niob++ ) {
86 struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc);
/* NOTE(review): this failure path jumps to out_req although desc has already
 * been prepared; if out_req sits after the ptlrpc_bulk_decref() call near
 * the end, desc would be leaked - labels are not visible here, confirm. */
88 GOTO(out_req, rc = -ENOMEM);
/* Describe the block to the server ... */
91 niob->n_block_nr = bh->b_blocknr;
92 niob->n_offset = bh_offset(bh);
93 niob->n_length = bh->b_size;
/* ... and point the bulk page at the buffer_head's data. */
96 bulk->bp_buf = bh->b_data;
97 bulk->bp_page = bh->b_page;
98 bulk->bp_buflen = bh->b_size;
/* A read registers for a bulk put; the other branch (presumably the write
 * case, its else line is not shown) registers for a bulk get. */
101 if ( cmd == PTLBD_READ )
102 rc = ptlrpc_register_bulk_put(desc);
104 rc = ptlrpc_register_bulk_get(desc);
/* The reply is sized for a single struct ptlbd_rsp buffer. */
109 rep_size = sizeof(struct ptlbd_rsp);
110 req->rq_replen = lustre_msg_size(1, &rep_size);
112 /* XXX find out how we're really supposed to manage levels */
113 req->rq_level = imp->imp_level;
114 rc = ptlrpc_queue_wait(req);
/* Fold the server-reported error count into the block request when the
 * reply status is anything other than RSP_OK. */
120 rsp = lustre_msg_buf(req->rq_repmsg, 0);
121 if (rsp->r_status != RSP_OK) {
122 blkreq->errors += rsp->r_error_cnt;
/* Cleanup: drop the bulk descriptor reference, then finish the request. */
126 ptlrpc_bulk_decref(desc);
128 ptlrpc_req_finished(req);
/*
 * ptlbd_bulk_timeout - timeout callback; ptlbd_parse_req passes it to
 * LWI_TIMEOUT together with the bulk descriptor.
 *
 * @data: opaque callback argument. The cast to a bulk descriptor is
 *        commented out, so the visible code does not use it.
 *
 * The only visible action is logging an error. The return statement is not
 * shown in this listing.
 */
132 static int ptlbd_bulk_timeout(void *data)
134 /* struct ptlrpc_bulk_desc *desc = data;*/
137 CERROR("ugh, timed out\n");
/*
 * ptlbd_do_filp - perform file I/O for a list of pages through filp->f_op.
 *
 * @filp:       file to read from or write to.
 * @op:         PTLBD_READ selects f_op->read; the other branch calls
 *              f_op->write.
 * @niobs:      block descriptor supplying n_block_nr and n_length.
 * @page_count: number of pages; not referenced in the visible lines.
 * @page_list:  list of struct page linked through their list member.
 *
 * NOTE(review): the embedded line numbers skip, so parts of this function
 * (the tail of the offset expression, the handling of short transfers, any
 * advance of niobs, and the return) are not shown here.
 */
142 int ptlbd_do_filp(struct file *filp, int op, struct ptlbd_niob *niobs,
143 int page_count, struct list_head *page_list)
146 struct list_head *pos;
153 list_for_each(pos, page_list) {
155 struct page *page = list_entry(pos, struct page, list);
/* File offset starts from the block number scaled by the page size; the rest
 * of the expression is not shown.
 * NOTE(review): if n_block_nr is narrower than loff_t the shift may truncate
 * before the widening - confirm the field's type. */
156 loff_t offset = (niobs->n_block_nr << PAGE_SHIFT) +
/* Transfer n_length bytes to or from the page's kernel address; a return
 * value different from n_length takes a path that is not shown here. */
158 if ( op == PTLBD_READ ) {
159 if ((ret = filp->f_op->read(filp, page_address(page),
160 niobs->n_length, &offset)) != niobs->n_length)
164 if ((ret = filp->f_op->write(filp, page_address(page),
165 niobs->n_length, &offset)) != niobs->n_length)
/*
 * ptlbd_parse_req - server-side handling of one ptlbd read/write request.
 *
 * @req: incoming request; buffer 0 holds a struct ptlbd_op and buffer 1 an
 *       array of struct ptlbd_niob.
 *
 * Allocates one page per niob, moves data via a bulk put (read) or bulk get
 * (write), runs the file I/O through ptlbd_do_filp on the obd's backing
 * filp, and replies with a struct ptlbd_rsp carrying the I/O status and
 * error count.
 *
 * NOTE(review): the embedded line numbers skip, so some original lines
 * (error checks, else lines, labels, page freeing, the return) are not
 * shown here. Comments below describe only what is visible.
 */
177 int ptlbd_parse_req(struct ptlrpc_request *req)
180 struct ptlbd_niob *niob, *niobs;
181 struct ptlbd_rsp *rsp;
182 struct ptlrpc_bulk_desc *desc;
183 struct file *filp = req->rq_obd->u.ptlbd.filp;
184 struct l_wait_info lwi;
185 int size[1], wait_flag, i, page_count, rc, error_cnt = 0,
187 struct list_head *pos, *n;
188 LIST_HEAD(tmp_pages);
191 rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen);
/* NOTE(review): op_cmd comes from the received message; asserting on it
 * means a malformed request would trip the assertion rather than be
 * rejected - confirm this is intended. */
195 op = lustre_msg_buf(req->rq_reqmsg, 0);
196 LASSERT(op->op_cmd == PTLBD_READ || op->op_cmd == PTLBD_WRITE);
/* The page count is derived from the byte length of buffer 1, not from a
 * field in the op header. */
198 niobs = lustre_msg_buf(req->rq_reqmsg, 1);
199 page_count = req->rq_reqmsg->buflens[1] / sizeof(struct ptlbd_niob);
201 desc = ptlrpc_prep_bulk(req->rq_connection);
203 GOTO(out, rc = -ENOMEM);
204 desc->bd_ptl_ev_hdlr = NULL;
205 desc->bd_portal = PTLBD_BULK_PORTAL;
/* One freshly allocated page per niob, tracked on tmp_pages so it can be
 * handed to ptlbd_do_filp and released during cleanup. */
207 for ( i = 0, niob = niobs ; i < page_count; niob++, i++) {
208 struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc);
210 GOTO(out_bulk, rc = -ENOMEM);
212 bulk->bp_page = alloc_page(GFP_KERNEL);
213 if (bulk->bp_page == NULL)
214 GOTO(out_bulk, rc = -ENOMEM);
/* NOTE(review): list_add presumably inserts at the head, which would leave
 * tmp_pages in reverse niob order - confirm against how ptlbd_do_filp pairs
 * pages with niobs. */
215 list_add(&bulk->bp_page->list, &tmp_pages);
/* NOTE(review): n_length is taken from the request without a visible bound
 * check against the single page backing bp_buf - confirm. */
217 bulk->bp_xid = niob->n_xid;
218 bulk->bp_buf = page_address(bulk->bp_page);
219 bulk->bp_buflen = niob->n_length;
/* Read: fill the pages from the file first, then push them to the client and
 * wait for the SENT flag. The other branch (its else line is not shown)
 * pulls data from the client and waits for the RCVD flag. */
222 if ( op->op_cmd == PTLBD_READ ) {
223 if ((status = ptlbd_do_filp(filp, PTLBD_READ, niobs,
224 page_count, &tmp_pages)) < 0) {
227 rc = ptlrpc_bulk_put(desc);
228 wait_flag = PTL_BULK_FL_SENT;
230 rc = ptlrpc_bulk_get(desc);
231 wait_flag = PTL_BULK_FL_RCVD;
237 /* this synchronization probably isn't good enough */
238 lwi = LWI_TIMEOUT(obd_timeout * HZ, ptlbd_bulk_timeout, desc);
239 rc = l_wait_event(desc->bd_waitq, desc->bd_flags & wait_flag, &lwi);
/* NOTE(review): rc from the wait above is reassigned by lustre_pack_msg
 * below; unless a check sits on a line not shown here, a bulk timeout would
 * go unnoticed - confirm. */
241 size[0] = sizeof(struct ptlbd_rsp);
242 rc = lustre_pack_msg(1, size, NULL, &req->rq_replen, &req->rq_repmsg);
246 rsp = lustre_msg_buf(req->rq_repmsg, 0);
248 GOTO(out, rc = -EINVAL);
/* Write: the file I/O happens only after the bulk wait above. */
250 if ( op->op_cmd == PTLBD_WRITE ) {
251 if ((status = ptlbd_do_filp(filp, PTLBD_WRITE, niobs,
252 page_count, &tmp_pages)) < 0) {
/* The reply carries the I/O outcome separately from the RPC status, which is
 * set to RQ_OK unconditionally here. */
257 rsp->r_error_cnt = error_cnt;
258 rsp->r_status = status; /* I/O status */
259 req->rq_status = RQ_OK ; /* XXX */ /* ptlbd req status */
261 ptlrpc_reply(req->rq_svc, req);
/* Cleanup: unlink every temporary page from tmp_pages (the call that frees
 * the page itself is not shown), then drop the bulk descriptor reference. */
264 list_for_each_safe(pos, n, &tmp_pages) {
265 struct page *page = list_entry(pos, struct page, list);
266 list_del(&page->list);
269 ptlrpc_bulk_decref(desc);