1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (c) 2002, 2003 Cluster File Systems, Inc.
5 * Author: Zach Brown <zab@clusterfs.com>
7 * This file is part of Lustre, http://www.lustre.org.
9 * Lustre is free software; you can redistribute it and/or
10 * modify it under the terms of version 2 of the GNU General Public
11 * License as published by the Free Software Foundation.
13 * Lustre is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
18 * You should have received a copy of the GNU General Public License
19 * along with Lustre; if not, write to the Free Software
20 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23 #include <linux/version.h>
24 #include <linux/module.h>
27 #define DEBUG_SUBSYSTEM S_PTLBD
29 #include <linux/obd_support.h>
30 #include <linux/obd_class.h>
31 #include <linux/lustre_debug.h>
32 #include <linux/lprocfs_status.h>
33 #include <linux/obd_ptlbd.h>
/*
 * ptlbd_send_req - build and queue one block-device request over the
 * import embedded in the given ptlbd device.
 *
 * ptlbd:    device whose bd_import is used to prepare the request
 * cmd:      must be PTLBD_READ or PTLBD_WRITE (enforced by LASSERT)
 * first_bh: head of a buffer_head chain, followed through b_next
 *
 * The request message is prepared with two buffers: a struct ptlbd_op and
 * an array of page_count struct ptlbd_niob entries.  A bulk descriptor is
 * prepared on the import connection, one bulk page is prepared for every
 * buffer head in the chain, the descriptor is registered, and the request
 * is then handed to ptlrpc_queue_wait().
 *
 * NOTE(review): several lines of this function are not visible in this
 * view (declarations of op, xid and flags, the body of the counting loop,
 * the tests guarding the GOTO statements, the labels and the return).
 * The comments below describe only the visible statements.
 */
35 int ptlbd_send_req(struct ptlbd_obd *ptlbd, ptlbd_cmd_t cmd,
36 struct buffer_head *first_bh)
38 struct obd_import *imp = &ptlbd->bd_import;
40 struct ptlbd_niob *niob, *niobs;
41 struct ptlbd_rsp *rsp;
42 struct ptlrpc_request *req;
43 struct ptlrpc_bulk_desc *desc;
44 struct buffer_head *bh;
46 unsigned int page_count;
47 int rc, rep_size, size[2];
51 LASSERT(cmd == PTLBD_READ || cmd == PTLBD_WRITE);
/*
 * Walk the chain once to size the request.  The loop body is not visible
 * here; presumably it increments page_count per buffer head - TODO confirm.
 */
53 for ( page_count = 0, bh = first_bh ; bh ; bh = bh->b_next )
/* Buffer 0 holds the op header, buffer 1 the niob array. */
56 size[0] = sizeof(struct ptlbd_op);
57 size[1] = page_count * sizeof(struct ptlbd_niob);
59 req = ptlrpc_prep_req(imp, cmd, 2, size, NULL);
63 op = lustre_msg_buf(req->rq_reqmsg, 0);
64 niobs = lustre_msg_buf(req->rq_reqmsg, 1);
/* The niob count and the block count are both set to page_count. */
69 op->op_niob_cnt = page_count;
71 op->op_block_cnt = page_count;
73 desc = ptlrpc_prep_bulk(imp->imp_connection);
/* Presumably taken when desc is NULL; the test itself is not visible. */
75 GOTO(out_req, rc = -ENOMEM);
76 desc->bd_portal = PTLBD_BULK_PORTAL;
77 desc->bd_ptl_ev_hdlr = NULL;
/* Take the next xid from the import while holding imp_lock. */
79 spin_lock_irqsave(&imp->imp_lock, flags);
80 xid = ++imp->imp_last_xid;
81 spin_unlock_irqrestore(&imp->imp_lock, flags);
/*
 * Second walk of the chain: each buffer head gets one niob in the request
 * and one bulk page in the descriptor.
 */
83 for ( niob = niobs, bh = first_bh ; bh ; bh = bh->b_next, niob++ ) {
84 struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc);
/* Presumably taken when bulk is NULL; the test itself is not visible. */
86 GOTO(out_req, rc = -ENOMEM);
/* Describe this buffer head in the request: block number, offset, length. */
89 niob->n_block_nr = bh->b_blocknr;
90 niob->n_offset = bh_offset(bh);
91 niob->n_length = bh->b_size;
/* Point the bulk page at the data, page and size of the buffer head. */
94 bulk->bp_buf = bh->b_data;
95 bulk->bp_page = bh->b_page;
96 bulk->bp_buflen = bh->b_size;
/*
 * Registration depends on direction: bulk put for PTLBD_READ.  The bulk
 * get call presumably sits under an else that is not visible in this view.
 */
99 if ( cmd == PTLBD_READ )
100 rc = ptlrpc_register_bulk_put(desc);
102 rc = ptlrpc_register_bulk_get(desc);
/* Reply length is computed for a single buffer of sizeof(struct ptlbd_rsp). */
107 rep_size = sizeof(struct ptlbd_rsp);
108 req->rq_replen = lustre_msg_size(1, &rep_size);
110 /* XXX find out how we're really supposed to manage levels */
111 req->rq_level = imp->imp_level;
112 rc = ptlrpc_queue_wait(req);
/* Reply buffer 0 is read as the struct ptlbd_rsp. */
115 rsp = lustre_msg_buf(req->rq_repmsg, 0);
/* Cleanup calls on the bulk descriptor and on the request. */
120 ptlrpc_bulk_decref(desc);
122 ptlrpc_req_finished(req);
/*
 * ptlbd_bulk_timeout - callback passed to LWI_TIMEOUT() by ptlbd_parse_req().
 *
 * data: that caller passes its bulk descriptor here; the only use of it in
 *       the visible lines is commented out.
 *
 * The visible body only logs an error through CERROR.  The return statement
 * is not visible in this view.
 */
126 static int ptlbd_bulk_timeout(void *data)
128 /* struct ptlrpc_bulk_desc *desc = data;*/
131 CERROR("ugh, timed out\n");
/*
 * ptlbd_do_filp - run the read or write method of a file once for every
 * page on a list.
 *
 * filp:       file whose f_op->read or f_op->write is called
 * op:         PTLBD_READ selects f_op->read; the write call presumably sits
 *             under an else that is not visible in this view
 * niobs:      supplies n_block_nr for the file offset and n_length for the
 *             transfer size
 * page_count: not referenced in the visible lines
 * page_list:  list of struct page linked through their list member
 *
 * Each transfer uses page_address(page) as the buffer.  The file offset
 * starts from n_block_nr shifted left by PAGE_SHIFT; the second operand of
 * that sum is on a line not visible here.
 *
 * NOTE(review): the declaration of ret, any handling of it, and whether
 * niobs advances between pages are not visible - confirm in the full file.
 */
136 void ptlbd_do_filp(struct file *filp, int op, struct ptlbd_niob *niobs,
137 int page_count, struct list_head *page_list)
140 struct list_head *pos;
146 list_for_each(pos, page_list) {
148 struct page *page = list_entry(pos, struct page, list);
149 loff_t offset = (niobs->n_block_nr << PAGE_SHIFT) +
152 if ( op == PTLBD_READ )
153 ret = filp->f_op->read(filp, page_address(page),
154 niobs->n_length, &offset);
156 ret = filp->f_op->write(filp, page_address(page),
157 niobs->n_length, &offset);
/*
 * ptlbd_parse_req - service one incoming PTLBD_READ or PTLBD_WRITE request.
 *
 * req: incoming request; the backing file is taken from
 *      req->rq_obd->u.ptlbd.filp
 *
 * Visible steps: unpack the request message, read the op header (buffer 0)
 * and the niob array (buffer 1), prepare a bulk descriptor with one freshly
 * allocated page per niob, move the data (file read then bulk put for
 * PTLBD_READ, bulk get otherwise), wait on the descriptor with a timeout,
 * pack a reply holding one struct ptlbd_rsp, send it with ptlrpc_reply(),
 * then unlink the temporary pages and call ptlrpc_bulk_decref().
 *
 * NOTE(review): the declaration of op, the tests guarding the GOTO
 * statements, the labels, the page release and the end of the function are
 * not visible in this view.  The comments describe only the visible lines.
 */
166 int ptlbd_parse_req(struct ptlrpc_request *req)
169 struct ptlbd_niob *niob, *niobs;
170 struct ptlbd_rsp *rsp;
171 struct ptlrpc_bulk_desc *desc;
172 struct file *filp = req->rq_obd->u.ptlbd.filp;
173 struct l_wait_info lwi;
174 int size[1], wait_flag, i, page_count, rc;
175 struct list_head *pos, *n;
176 LIST_HEAD(tmp_pages);
179 rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen);
183 op = lustre_msg_buf(req->rq_reqmsg, 0);
184 LASSERT(op->op_cmd == PTLBD_READ || op->op_cmd == PTLBD_WRITE);
/* The niob count is derived from the length of request buffer 1. */
186 niobs = lustre_msg_buf(req->rq_reqmsg, 1);
187 page_count = req->rq_reqmsg->buflens[1] / sizeof(struct ptlbd_niob);
189 desc = ptlrpc_prep_bulk(req->rq_connection);
/* Presumably taken when desc is NULL; the test itself is not visible. */
191 GOTO(out, rc = -ENOMEM);
192 desc->bd_ptl_ev_hdlr = NULL;
193 desc->bd_portal = PTLBD_BULK_PORTAL;
/* One bulk page, backed by a newly allocated page, per niob. */
195 for ( i = 0, niob = niobs ; i < page_count; niob++, i++) {
196 struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc);
198 GOTO(out_bulk, rc = -ENOMEM);
200 bulk->bp_page = alloc_page(GFP_KERNEL);
201 if (bulk->bp_page == NULL)
202 GOTO(out_bulk, rc = -ENOMEM);
/*
 * NOTE(review): list_add() inserts at the head of the list, so tmp_pages
 * ends up in reverse niob order - confirm this is what ptlbd_do_filp()
 * expects when it walks the list.
 */
203 list_add(&bulk->bp_page->list, &tmp_pages);
206 * XXX what about the block number?
/* The bulk page takes its xid and length from the niob. */
208 bulk->bp_xid = niob->n_xid;
209 bulk->bp_buf = page_address(bulk->bp_page);
210 bulk->bp_buflen = niob->n_length;
/*
 * PTLBD_READ: fill the pages from the file, then start the bulk put and
 * wait for PTL_BULK_FL_SENT.  Otherwise start the bulk get and wait for
 * PTL_BULK_FL_RCVD (the else line is not visible in this view).
 */
213 if ( op->op_cmd == PTLBD_READ ) {
214 ptlbd_do_filp(filp, PTLBD_READ, niobs, page_count, &tmp_pages);
215 rc = ptlrpc_bulk_put(desc);
216 wait_flag = PTL_BULK_FL_SENT;
218 rc = ptlrpc_bulk_get(desc);
219 wait_flag = PTL_BULK_FL_RCVD;
225 /* this synchronization probably isn't good enough */
226 lwi = LWI_TIMEOUT(obd_timeout * HZ, ptlbd_bulk_timeout, desc);
227 rc = l_wait_event(desc->bd_waitq, desc->bd_flags & wait_flag, &lwi);
/*
 * NOTE(review): rc from l_wait_event() appears to be overwritten by the
 * lustre_pack_msg() call below unless a check sits on a line not visible
 * here - confirm.
 */
229 size[0] = sizeof(struct ptlbd_rsp);
230 rc = lustre_pack_msg(1, size, NULL, &req->rq_replen, &req->rq_repmsg);
234 rsp = lustre_msg_buf(req->rq_repmsg, 0);
236 GOTO(out, rc = -EINVAL);
/*
 * Write the pages to the file.  Presumably guarded by a PTLBD_WRITE test
 * on a line not visible here - TODO confirm.
 */
238 ptlbd_do_filp(filp, PTLBD_WRITE, niobs, page_count, &tmp_pages);
/* Hard-coded value; looks like a placeholder - confirm. */
240 rsp->r_error_cnt = 42;
243 req->rq_status = 0; /* XXX */
244 ptlrpc_reply(req->rq_svc, req);
/*
 * Unlink every temporary page from tmp_pages.  The call that releases each
 * page is not visible in this view.
 */
247 list_for_each_safe(pos, n, &tmp_pages) {
248 struct page *page = list_entry(pos, struct page, list);
249 list_del(&page->list);
252 ptlrpc_bulk_decref(desc);