1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (c) 2002, 2003 Cluster File Systems, Inc.
5 * Author: Zach Brown <zab@clusterfs.com>
7 * This file is part of Lustre, http://www.lustre.org.
9 * Lustre is free software; you can redistribute it and/or
10 * modify it under the terms of version 2 of the GNU General Public
11 * License as published by the Free Software Foundation.
13 * Lustre is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
18 * You should have received a copy of the GNU General Public License
19 * along with Lustre; if not, write to the Free Software
20 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23 #include <linux/version.h>
24 #include <linux/module.h>
27 #define DEBUG_SUBSYSTEM S_PTLBD
29 #include <linux/obd_support.h>
30 #include <linux/obd_class.h>
31 #include <linux/lustre_debug.h>
32 #include <linux/lprocfs_status.h>
33 #include <linux/obd_ptlbd.h>
/*
 * Build a ptlbd request for the buffer_head chain starting at first_bh
 * (linked through b_next), attach one bulk page per buffer head, register
 * the bulk descriptor and call ptlrpc_queue_wait() on the request.
 *
 * ptlbd:    its bd_import is used to prepare the request and supplies
 *           imp_connection and imp_level.
 * cmd:      asserted to be PTLBD_READ or PTLBD_WRITE; passed to
 *           ptlrpc_prep_req() as the request opcode.
 * first_bh: head of the buffer_head chain; walked until b_next is NULL.
 *
 * NOTE(review): several lines of this function are not visible in this
 * listing (braces, some declarations such as op and xid, the conditions
 * guarding the GOTOs, the cleanup labels and the return statement). The
 * comments below describe only the statements that are visible.
 */
35 int ptlbd_send_req(struct ptlbd_obd *ptlbd, ptlbd_cmd_t cmd,
36 struct buffer_head *first_bh)
38 struct obd_import *imp = &ptlbd->bd_import;
40 struct ptlbd_niob *niob, *niobs;
41 struct ptlbd_rsp *rsp;
42 struct ptlrpc_request *req;
43 struct ptlrpc_bulk_desc *desc;
44 struct buffer_head *bh;
45 unsigned int page_count;
46 int rc, rep_size, size[2];
50 LASSERT(cmd == PTLBD_READ || cmd == PTLBD_WRITE);
/*
 * Walk the chain once with page_count starting at 0. The loop body is not
 * visible here; presumably it increments page_count -- TODO confirm.
 */
52 for ( page_count = 0, bh = first_bh ; bh ; bh = bh->b_next )
/*
 * Two request buffers: buffer 0 is sized for one struct ptlbd_op, buffer 1
 * for page_count struct ptlbd_niob entries.
 */
55 size[0] = sizeof(struct ptlbd_op);
56 size[1] = page_count * sizeof(struct ptlbd_niob);
58 req = ptlrpc_prep_req(imp, cmd, 2, size, NULL);
/* op and niobs point into request buffers 0 and 1 respectively. */
62 op = lustre_msg_buf(req->rq_reqmsg, 0);
63 niobs = lustre_msg_buf(req->rq_reqmsg, 1);
/* Both counts in the op header are set to the same page_count. */
68 op->op_niob_cnt = page_count;
70 op->op_block_cnt = page_count;
72 desc = ptlrpc_prep_bulk(imp->imp_connection);
/*
 * -ENOMEM exit through out_req. The guarding condition is not visible;
 * presumably it is a NULL check on desc -- TODO confirm.
 */
74 GOTO(out_req, rc = -ENOMEM);
75 desc->bd_portal = PTLBD_BULK_PORTAL;
76 desc->bd_ptl_ev_hdlr = NULL;
78 xid = ptlrpc_next_xid();
/* Second walk of the chain: one niob and one bulk page per buffer head. */
80 for ( niob = niobs, bh = first_bh ; bh ; bh = bh->b_next, niob++ ) {
81 struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc);
/*
 * NOTE(review): this exit also targets out_req although desc has already
 * been prepared at this point. The labels are not visible in this listing;
 * confirm that this path still reaches ptlrpc_bulk_decref(desc).
 */
83 GOTO(out_req, rc = -ENOMEM);
/* The niob records b_blocknr, bh_offset(bh) and b_size of the buffer. */
86 niob->n_block_nr = bh->b_blocknr;
87 niob->n_offset = bh_offset(bh);
88 niob->n_length = bh->b_size;
/* The bulk page refers directly to the buffer head data, page and size. */
91 bulk->bp_buf = bh->b_data;
92 bulk->bp_page = bh->b_page;
93 bulk->bp_buflen = bh->b_size;
/*
 * PTLBD_READ registers a bulk put; the bulk get below is presumably the
 * else branch (the else line is not visible) -- TODO confirm.
 */
96 if ( cmd == PTLBD_READ )
97 rc = ptlrpc_register_bulk_put(desc);
99 rc = ptlrpc_register_bulk_get(desc);
/* The expected reply has a single buffer sized for struct ptlbd_rsp. */
104 rep_size = sizeof(struct ptlbd_rsp);
105 req->rq_replen = lustre_msg_size(1, &rep_size);
107 /* XXX find out how we're really supposed to manage levels */
108 req->rq_level = imp->imp_level;
109 rc = ptlrpc_queue_wait(req);
/* rsp points at reply buffer 0; its use is not visible in this listing. */
112 rsp = lustre_msg_buf(req->rq_repmsg, 0);
/* Cleanup: drop the bulk descriptor reference, then finish the request. */
117 ptlrpc_bulk_decref(desc);
119 ptlrpc_req_finished(req);
/*
 * Timeout callback handed to LWI_TIMEOUT() in ptlbd_parse_req(). In the
 * visible lines it only logs an error through CERROR(); the data argument
 * is not used (the cast to a bulk descriptor is commented out).
 *
 * NOTE(review): the return statement is not visible in this listing, so
 * the value reported back to the waiter cannot be stated from here.
 */
123 static int ptlbd_bulk_timeout(void *data)
125 /* struct ptlrpc_bulk_desc *desc = data;*/
128 CERROR("ugh, timed out\n");
/*
 * For every page on page_list, transfer niobs->n_length bytes between the
 * page (addressed with page_address()) and filp, using f_op->read when op
 * is PTLBD_READ and f_op->write in the other visible branch.
 *
 * filp:       file whose f_op->read / f_op->write methods are called.
 * op:         compared against PTLBD_READ to choose the direction.
 * niobs:      supplies n_block_nr and n_length for the transfer.
 * page_count: not referenced in the visible lines.
 * page_list:  list of struct page linked through their list member.
 *
 * NOTE(review): several lines are not visible in this listing (braces,
 * declarations such as ret, the tail of the offset expression, the else
 * line and anything following the read/write calls). In particular no
 * visible statement advances niobs or checks ret -- confirm against the
 * full source.
 */
133 void ptlbd_do_filp(struct file *filp, int op, struct ptlbd_niob *niobs,
134 int page_count, struct list_head *page_list)
137 struct list_head *pos;
143 list_for_each(pos, page_list) {
145 struct page *page = list_entry(pos, struct page, list);
/*
 * File offset starts from the block number shifted by PAGE_SHIFT; the
 * second operand of the addition is on a line that is not visible
 * (presumably niobs->n_offset -- TODO confirm).
 */
146 loff_t offset = (niobs->n_block_nr << PAGE_SHIFT) +
149 if ( op == PTLBD_READ )
150 ret = filp->f_op->read(filp, page_address(page),
151 niobs->n_length, &offset);
/* Presumably the else branch of the test above -- TODO confirm. */
153 ret = filp->f_op->write(filp, page_address(page),
154 niobs->n_length, &offset);
/*
 * Handle an incoming ptlbd request: unpack the message, allocate one page
 * and one bulk page per niob found in request buffer 1, run the bulk
 * transfer (put for PTLBD_READ, get in the other visible branch), wait on
 * the descriptor wait queue, pack a one-buffer reply and send it with
 * ptlrpc_reply().
 *
 * req: the request; its rq_obd supplies the backing file (u.ptlbd.filp),
 *      rq_connection is used for the bulk descriptor and rq_svc for the
 *      reply.
 *
 * NOTE(review): several lines of this function are not visible in this
 * listing (braces, the declaration of op, the conditions guarding the
 * GOTOs, else lines, the cleanup labels, part of the cleanup loop body and
 * the return statement). The comments below describe only the statements
 * that are visible.
 */
163 int ptlbd_parse_req(struct ptlrpc_request *req)
166 struct ptlbd_niob *niob, *niobs;
167 struct ptlbd_rsp *rsp;
168 struct ptlrpc_bulk_desc *desc;
169 struct file *filp = req->rq_obd->u.ptlbd.filp;
170 struct l_wait_info lwi;
171 int size[1], wait_flag, i, page_count, rc;
172 struct list_head *pos, *n;
/* Pages allocated below are collected on this local list. */
173 LIST_HEAD(tmp_pages);
176 rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen);
/* Buffer 0 carries the op header; only READ and WRITE are accepted. */
180 op = lustre_msg_buf(req->rq_reqmsg, 0);
181 LASSERT(op->op_cmd == PTLBD_READ || op->op_cmd == PTLBD_WRITE);
/*
 * Buffer 1 carries the niob array; the entry count is derived from the
 * length of that buffer rather than from a field of the op header.
 * NOTE(review): this length comes from the request message and no bound
 * check is visible in this listing -- confirm.
 */
183 niobs = lustre_msg_buf(req->rq_reqmsg, 1);
184 page_count = req->rq_reqmsg->buflens[1] / sizeof(struct ptlbd_niob);
186 desc = ptlrpc_prep_bulk(req->rq_connection);
/*
 * -ENOMEM exit through out. The guarding condition is not visible;
 * presumably it is a NULL check on desc -- TODO confirm.
 */
188 GOTO(out, rc = -ENOMEM);
189 desc->bd_ptl_ev_hdlr = NULL;
190 desc->bd_portal = PTLBD_BULK_PORTAL;
/* One freshly allocated page and one bulk page per niob. */
192 for ( i = 0, niob = niobs ; i < page_count; niob++, i++) {
193 struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc);
195 GOTO(out_bulk, rc = -ENOMEM);
197 bulk->bp_page = alloc_page(GFP_KERNEL);
198 if (bulk->bp_page == NULL)
199 GOTO(out_bulk, rc = -ENOMEM);
/*
 * NOTE(review): if list_add() inserts at the head of the list (as the
 * Linux list API does), tmp_pages ends up in reverse niob order, while
 * ptlbd_do_filp() is handed niobs from the start -- confirm the pairing
 * of pages and niobs.
 */
200 list_add(&bulk->bp_page->list, &tmp_pages);
203 * XXX what about the block number?
/* The bulk page takes its xid and length from the niob. */
205 bulk->bp_xid = niob->n_xid;
206 bulk->bp_buf = page_address(bulk->bp_page);
207 bulk->bp_buflen = niob->n_length;
/*
 * READ: fill the pages from the file first, then push them with a bulk
 * put and wait for PTL_BULK_FL_SENT. The bulk get below is presumably the
 * else branch (the else line is not visible) and waits for
 * PTL_BULK_FL_RCVD -- TODO confirm.
 */
210 if ( op->op_cmd == PTLBD_READ ) {
211 ptlbd_do_filp(filp, PTLBD_READ, niobs, page_count, &tmp_pages);
212 rc = ptlrpc_bulk_put(desc);
213 wait_flag = PTL_BULK_FL_SENT;
215 rc = ptlrpc_bulk_get(desc);
216 wait_flag = PTL_BULK_FL_RCVD;
222 /* this synchronization probably isn't good enough */
/*
 * Wait on the descriptor wait queue until the chosen flag is set in
 * bd_flags, with a timeout of obd_timeout * HZ and ptlbd_bulk_timeout as
 * the timeout callback.
 * NOTE(review): in the visible lines the rc returned by l_wait_event() is
 * overwritten by lustre_pack_msg() without being examined -- confirm.
 */
223 lwi = LWI_TIMEOUT(obd_timeout * HZ, ptlbd_bulk_timeout, desc);
224 rc = l_wait_event(desc->bd_waitq, desc->bd_flags & wait_flag, &lwi);
/* The reply has a single buffer sized for struct ptlbd_rsp. */
226 size[0] = sizeof(struct ptlbd_rsp);
227 rc = lustre_pack_msg(1, size, NULL, &req->rq_replen, &req->rq_repmsg);
231 rsp = lustre_msg_buf(req->rq_repmsg, 0);
/* -EINVAL exit; the guarding condition is not visible in this listing. */
233 GOTO(out, rc = -EINVAL);
/*
 * Write the received pages to the file. The condition selecting this call
 * is not visible; presumably it applies to PTLBD_WRITE only -- TODO
 * confirm.
 */
235 ptlbd_do_filp(filp, PTLBD_WRITE, niobs, page_count, &tmp_pages);
/*
 * NOTE(review): r_error_cnt is set to a hard-coded 42 and rq_status to 0
 * (marked XXX by the author); neither reflects the outcome of the I/O in
 * the visible lines.
 */
237 rsp->r_error_cnt = 42;
240 req->rq_status = 0; /* XXX */
241 ptlrpc_reply(req->rq_svc, req);
/*
 * Cleanup: unlink every page from tmp_pages. Releasing the page itself is
 * not visible in this listing -- TODO confirm that it happens.
 */
244 list_for_each_safe(pos, n, &tmp_pages) {
245 struct page *page = list_entry(pos, struct page, list);
246 list_del(&page->list);
249 ptlrpc_bulk_decref(desc);