1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (c) 2002 Cluster File Systems, Inc.
6 * This file is part of Lustre, http://www.lustre.org.
8 * Lustre is free software; you can redistribute it and/or
9 * modify it under the terms of version 2 of the GNU General Public
10 * License as published by the Free Software Foundation.
12 * Lustre is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
17 * You should have received a copy of the GNU General Public License
18 * along with Lustre; if not, write to the Free Software
19 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22 #include <linux/version.h>
23 #include <linux/module.h>
26 #define DEBUG_SUBSYSTEM S_PTLBD
28 #include <linux/obd_support.h>
29 #include <linux/obd_class.h>
30 #include <linux/lustre_debug.h>
31 #include <linux/lprocfs_status.h>
32 #include <linux/obd_ptlbd.h>
/*
 * Produce the next xid for an import.
 *
 * imp->imp_last_xid is pre-incremented while imp->imp_lock is held (irqsave
 * variant of the spinlock) and the new value is stored in xid.  Callers in
 * this file assign the function's result to their own xid variable; the
 * return statement and the declarations of flags/xid are not visible in this
 * listing.
 */
34 static __u32 get_next_xid(struct obd_import *imp)
38 spin_lock_irqsave(&imp->imp_lock, flags);
/* pre-increment: the value taken is the counter after the bump */
39 xid = ++imp->imp_last_xid;
40 spin_unlock_irqrestore(&imp->imp_lock, flags);
/*
 * brw-set callback used by the request paths in this file: it is installed
 * as set->brw_callback in ptlbd_write_put_req() and ptlbd_read_put_req(), and
 * ptlbd_ptl_ev_hdlr() invokes it with phase == CB_PHASE_FINISH.
 *
 * NOTE(review): the body is not visible in this listing, so what it does per
 * phase and what it returns cannot be documented from here.
 */
44 static int ptlbd_brw_callback(struct obd_brw_set *set, int phase)
/*
 * Deferred-work helper: interprets the opaque data pointer as a
 * struct ptlrpc_bulk_desc and drops one reference on it with
 * ptlrpc_bulk_decref().  ptlbd_ptl_ev_hdlr() queues it through
 * prepare_work()/schedule_work() with the descriptor as its argument.
 */
50 static void decref_bulk_desc(void *data)
/* data is the descriptor handed to prepare_work() */
52 struct ptlrpc_bulk_desc *desc = data;
55 ptlrpc_bulk_decref(desc);
59 /* This is the callback function invoked by the Portals event handler
60 * associated with the bulk_sink queue and the bulk_source queue. */
/*
 * Bulk event handler; ptlbd_write_put_req() and ptlbd_read_put_req() install
 * it as desc->bd_ptl_ev_hdlr.
 *
 * It asserts that the descriptor is attached to a brw set that has a
 * callback, runs that callback with CB_PHASE_FINISH, and then queues
 * decref_bulk_desc() so the descriptor reference is dropped from scheduled
 * work rather than inline.
 */
62 static void ptlbd_ptl_ev_hdlr(struct ptlrpc_bulk_desc *desc)
/* both the set and its callback must have been set up by the sender */
66 LASSERT(desc->bd_brw_set != NULL);
67 LASSERT(desc->bd_brw_set->brw_callback != NULL);
/* tell the brw set that this transfer has finished */
69 desc->bd_brw_set->brw_callback(desc->bd_brw_set, CB_PHASE_FINISH);
/*
 * NOTE(review): the decref is deferred to a work item, presumably because it
 * is not safe to do directly in the event-handler context -- TODO confirm.
 */
71 prepare_work(&desc->bd_queue, decref_bulk_desc, desc);
72 schedule_work(&desc->bd_queue);
/*
 * Build and send a ptlbd write request for a chain of buffer heads, then push
 * the data with a bulk transfer.
 *
 * ptlbd:      device whose embedded import (bd_import) is used for the RPC
 * cmd:        opcode passed straight to ptlrpc_prep_req()
 * first_bh:   head of the buffer_head chain, walked through b_next
 * page_count: number of niobs reserved in the request and expected in the
 *             reply; also stored in op_niob_cnt and op_block_cnt
 *
 * Visible flow: pack a request with two buffers (a ptlbd_op plus page_count
 * niobs), fill one niob per buffer head, wait for the reply with
 * ptlrpc_queue_wait(), copy the per-niob xid from the reply into the bulk
 * pages, add the descriptor to the brw set and call ptlrpc_send_bulk().  The
 * calls at the end release the brw set, the bulk descriptor and the request;
 * the goto labels (out_set/out_desc/out_req/out) that separate them are not
 * visible in this listing.
 *
 * NOTE(review): both loops stop when the b_next chain ends, not at
 * page_count, so the niob arrays are only large enough when the caller
 * counted the same chain (ptlbd_send_req() does) -- confirm for any other
 * caller.
 */
78 int ptlbd_write_put_req(struct ptlbd_obd *ptlbd, ptlbd_cmd_t cmd,
79 struct buffer_head *first_bh, unsigned int page_count)
81 struct obd_import *imp = &ptlbd->bd_import;
83 struct ptlbd_niob *niob, *niobs;
84 struct ptlbd_rsp *rsp;
85 struct ptlrpc_request *req;
86 struct ptlrpc_bulk_desc *desc;
87 struct buffer_head *bh;
89 struct obd_brw_set *set;
/* request layout: buffer 0 = one ptlbd_op, buffer 1 = page_count niobs */
92 size[0] = sizeof(struct ptlbd_op);
93 size[1] = page_count * sizeof(struct ptlbd_niob);
95 req = ptlrpc_prep_req(imp, cmd, 2, size, NULL);
97 GOTO(out, rc = -ENOMEM);
98 /* XXX might not need these */
99 req->rq_request_portal = PTLBD_REQUEST_PORTAL;
100 req->rq_reply_portal = PTLBD_REPLY_PORTAL;
102 op = lustre_msg_buf(req->rq_reqmsg, 0);
103 niobs = lustre_msg_buf(req->rq_reqmsg, 1);
108 op->op_niob_cnt = page_count;
110 op->op_block_cnt = page_count;
112 desc = ptlrpc_prep_bulk(imp->imp_connection);
114 GOTO(out_req, rc = -ENOMEM);
115 desc->bd_portal = PTLBD_BULK_PORTAL;
116 desc->bd_ptl_ev_hdlr = ptlbd_ptl_ev_hdlr;
118 /* XXX someone needs to free this */
119 set = obd_brw_set_new();
121 GOTO(out_desc, rc = -ENOMEM);
123 set->brw_callback = ptlbd_brw_callback;
126 xid = get_next_xid(imp);
/* describe every buffer head to the peer: block number, offset, length */
129 for ( niob = niobs, bh = first_bh ; bh ; bh = bh->b_next, niob++ ) {
/*
 * NOTE(review): a bulk page is prepared on desc for every buffer head here
 * and again in the reply loop further down, so each buffer appears to be
 * attached to the descriptor twice -- confirm this is intended.
 */
131 struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc);
133 GOTO(out_set, rc = -ENOMEM);
139 niob->n_block_nr = bh->b_blocknr;
140 niob->n_offset = bh_offset(bh);
141 niob->n_length = bh->b_size;
146 bulk->bp_buf = bh->b_data;
147 bulk->bp_page = bh->b_page;
148 bulk->bp_buflen = bh->b_size;
/* expected reply layout: buffer 0 = ptlbd_rsp, buffer 1 = page_count niobs */
153 size[0] = sizeof(struct ptlbd_rsp);
154 size[1] = sizeof(struct ptlbd_niob) * page_count;
155 req->rq_replen = lustre_msg_size(2, size);
157 /* XXX find out how we're really supposed to manage levels */
158 req->rq_level = imp->imp_level;
159 rc = ptlrpc_queue_wait(req);
/*
 * NOTE(review): no check of rc is visible between the wait above and the
 * reads of the reply buffers below -- confirm one exists in the full file.
 */
161 rsp = lustre_msg_buf(req->rq_repmsg, 0);
163 niob = lustre_msg_buf(req->rq_repmsg, 1);
164 /* XXX check that op->num matches ours */
/* the xid for each bulk page comes from the matching niob in the reply */
165 for ( bh = first_bh ; bh ; bh = bh->b_next, niob++ ) {
166 struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc);
168 GOTO(out_set, rc = -ENOMEM);
170 bulk->bp_xid = niob->n_xid;
171 bulk->bp_page = bh->b_page;
172 bulk->bp_buf = bh->b_data;
173 bulk->bp_buflen = bh->b_size;
/* attach the descriptor to the set so the event handler can call back */
176 obd_brw_set_add(set, desc);
177 rc = ptlrpc_send_bulk(desc);
179 /* if there's an error, no brw_finish called, just like
185 obd_brw_set_free(set);
187 ptlrpc_bulk_decref(desc);
189 ptlrpc_req_finished(req);
/*
 * Build and send a ptlbd read request for a chain of buffer heads, with the
 * bulk descriptor registered before the request goes out.
 *
 * ptlbd:      device whose embedded import (bd_import) is used for the RPC
 * cmd:        opcode passed straight to ptlrpc_prep_req()
 * first_bh:   head of the buffer_head chain, walked through b_next
 * page_count: number of niobs reserved in the request; also stored in
 *             op_niob_cnt and op_block_cnt
 *
 * Visible flow: pack a request with two buffers (a ptlbd_op plus page_count
 * niobs), prepare one bulk page and fill one niob per buffer head, call
 * ptlrpc_register_bulk(), add the descriptor to the brw set, then send the
 * request and wait with ptlrpc_queue_wait().  The reply is sized for a single
 * ptlbd_rsp.  The calls at the end release the brw set, the bulk descriptor
 * and the request; the goto labels that separate them are not visible in
 * this listing.
 *
 * NOTE(review): the loop stops when the b_next chain ends, not at
 * page_count, so the niob array is only large enough when the caller counted
 * the same chain (ptlbd_send_req() does) -- confirm for any other caller.
 */
194 int ptlbd_read_put_req(struct ptlbd_obd *ptlbd, ptlbd_cmd_t cmd,
195 struct buffer_head *first_bh, unsigned int page_count)
197 struct obd_import *imp = &ptlbd->bd_import;
199 struct ptlbd_niob *niob, *niobs;
200 struct ptlbd_rsp *rsp;
201 struct ptlrpc_request *req;
202 struct ptlrpc_bulk_desc *desc;
203 struct buffer_head *bh;
204 int rc, rep_size, size[2];
205 struct obd_brw_set *set;
/* request layout: buffer 0 = one ptlbd_op, buffer 1 = page_count niobs */
209 size[0] = sizeof(struct ptlbd_op);
210 size[1] = page_count * sizeof(struct ptlbd_niob);
212 req = ptlrpc_prep_req(imp, cmd, 2, size, NULL);
214 GOTO(out, rc = -ENOMEM);
215 /* XXX might not need these? */
216 req->rq_request_portal = PTLBD_REQUEST_PORTAL;
217 req->rq_reply_portal = PTLBD_REPLY_PORTAL;
219 op = lustre_msg_buf(req->rq_reqmsg, 0);
220 niobs = lustre_msg_buf(req->rq_reqmsg, 1);
225 op->op_niob_cnt = page_count;
227 op->op_block_cnt = page_count;
229 desc = ptlrpc_prep_bulk(imp->imp_connection);
231 GOTO(out_req, rc = -ENOMEM);
232 desc->bd_portal = PTLBD_BULK_PORTAL;
233 desc->bd_ptl_ev_hdlr = ptlbd_ptl_ev_hdlr;
235 /* XXX someone needs to free this */
236 set = obd_brw_set_new();
238 GOTO(out_desc, rc = -ENOMEM);
240 set->brw_callback = ptlbd_brw_callback;
242 xid = get_next_xid(imp);
/* one niob and one bulk page per buffer head in the chain */
244 for ( niob = niobs, bh = first_bh ; bh ; bh = bh->b_next, niob++ ) {
245 struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc);
247 GOTO(out_set, rc = -ENOMEM);
250 niob->n_block_nr = bh->b_blocknr;
251 niob->n_offset = bh_offset(bh);
252 niob->n_length = bh->b_size;
255 bulk->bp_buf = bh->b_data;
256 bulk->bp_page = bh->b_page;
257 bulk->bp_buflen = bh->b_size;
260 /* XXX put in OBD_FAIL_CHECK for ptlbd? */
/*
 * NOTE(review): no check of this rc is visible before the descriptor is
 * added to the set -- confirm one exists in the full file.
 */
261 rc = ptlrpc_register_bulk(desc);
265 obd_brw_set_add(set, desc);
/* the reply carries a single buffer holding one ptlbd_rsp */
267 rep_size = sizeof(struct ptlbd_rsp);
268 req->rq_replen = lustre_msg_size(1, &rep_size);
270 /* XXX find out how we're really supposed to manage levels */
271 req->rq_level = imp->imp_level;
272 rc = ptlrpc_queue_wait(req);
274 rsp = lustre_msg_buf(req->rq_repmsg, 0);
276 /* if there's an error, no brw_finish called, just like
282 obd_brw_set_free(set);
284 ptlrpc_bulk_decref(desc);
286 ptlrpc_req_finished(req);
/*
 * Entry point that sends one ptlbd request for a chain of buffer heads.
 *
 * Walks the chain through b_next (the loop body is not visible in this
 * listing; presumably it increments page_count) and then hands the chain and
 * the count to either ptlbd_read_put_req() or ptlbd_write_put_req().  The
 * condition that selects between the two is not visible here.
 *
 * ptlbd:    device the request is sent for
 * cmd:      opcode forwarded unchanged to the read/write helper
 * first_bh: head of the buffer_head chain
 */
291 int ptlbd_send_req(struct ptlbd_obd *ptlbd, ptlbd_cmd_t cmd,
292 struct buffer_head *first_bh)
294 unsigned int page_count = 0;
295 struct buffer_head *bh;
/* page_count is reset here as well as at its declaration */
299 for ( page_count = 0, bh = first_bh ; bh ; bh = bh->b_next )
304 rc = ptlbd_read_put_req(ptlbd, cmd,
305 first_bh, page_count);
308 rc = ptlbd_write_put_req(ptlbd, cmd,
309 first_bh, page_count);
/*
 * Timeout callback handed to LWI_TIMEOUT() by ptlbd_put_write() and
 * ptlbd_put_read(), which pass the bulk descriptor as its data argument.
 * The only visible action is logging an error; the descriptor is not used
 * (its declaration is commented out).  The return value is not visible in
 * this listing.
 */
319 static int ptlbd_bulk_timeout(void *data)
321 /* struct ptlrpc_bulk_desc *desc = data;*/
324 CERROR("ugh, timed out\n");
/*
 * Stand-in backing store: a static table with one page pointer per block
 * number, for block numbers below SILLY_MAX.  Entries start out NULL and are
 * filled in on first use by fake_page().  No code that frees these pages is
 * visible in this listing.
 */
329 #define SILLY_MAX 2048
330 static struct page *pages[SILLY_MAX] = {NULL,};
/*
 * Return the page that backs block_nr, allocating it with
 * get_free_page(GFP_KERNEL) the first time the block is seen and caching it
 * in pages[].
 *
 * NOTE(review): only the upper bound is checked; block_nr is a signed int, so
 * a negative value would index before the start of pages[].  What the
 * out-of-range branch returns is not visible here (callers feed the result
 * straight to page_address()).
 */
332 static struct page * fake_page(int block_nr)
334 if ( block_nr >= SILLY_MAX )
337 if (pages[block_nr] == NULL) {
/*
 * NOTE(review): the allocation result is not checked before it is passed to
 * virt_to_page(); a failed allocation would cache a bogus page pointer.
 */
338 void *vaddr = (void *)get_free_page(GFP_KERNEL);
339 pages[block_nr] = virt_to_page(vaddr);
341 return pages[block_nr];
/*
 * Handle an incoming write request (called from ptlbd_parse_req()).
 *
 * Visible flow: read the ptlbd_op and the niob array from the request,
 * derive page_count from the length of request buffer 1, pack a reply with a
 * ptlbd_rsp plus page_count niobs, prepare a bulk descriptor with one page
 * per niob (pages come from fake_page()), register the bulk, send the reply,
 * wait until PTL_BULK_FL_RCVD is set on the descriptor or the timeout fires,
 * and finally free the descriptor.  Every reply niob is a copy of the request
 * niob with n_xid set to a single xid taken from the service's srv_xid
 * counter under srv_lock.
 *
 * NOTE(review): page_count and each n_block_nr / n_length come from the
 * request message; no validation of them is visible in this listing.
 */
344 static int ptlbd_put_write(struct ptlrpc_request *req)
/* the request message itself is reinterpreted as a lustre_handle */
346 struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
348 struct ptlbd_niob *reply_niob, *request_niob;
349 struct ptlbd_rsp *rsp;
350 struct ptlrpc_bulk_desc *desc;
351 struct ptlrpc_service *srv;
352 struct l_wait_info lwi;
354 int i, page_count, rc;
357 op = lustre_msg_buf(req->rq_reqmsg, 0);
358 request_niob = lustre_msg_buf(req->rq_reqmsg, 1);
/* number of niobs is inferred from the byte length of request buffer 1 */
359 page_count = req->rq_reqmsg->buflens[1] / sizeof(struct ptlbd_niob);
361 size[0] = sizeof(struct ptlbd_rsp);
362 size[1] = sizeof(struct ptlbd_niob) * page_count;
363 rc = lustre_pack_msg(2, size, NULL, &req->rq_replen, &req->rq_repmsg);
366 reply_niob = lustre_msg_buf(req->rq_repmsg, 1);
368 desc = ptlrpc_prep_bulk(req->rq_connection);
370 GOTO(out, rc = -ENOMEM);
371 desc->bd_ptl_ev_hdlr = NULL;
372 desc->bd_portal = PTLBD_BULK_PORTAL;
/*
 * NOTE(review): conn is a pointer, so &conn / sizeof(conn) copy the bytes of
 * the pointer variable itself into bd_conn, not the handle it points at.
 * Probably meant memcpy(&desc->bd_conn, conn, sizeof(*conn)) -- confirm.
 */
373 memcpy(&(desc->bd_conn), &conn, sizeof(conn)); /* XXX what? */
375 srv = req->rq_obd->u.ptlbd.ptlbd_service;
376 spin_lock(&srv->srv_lock);
377 xid = srv->srv_xid++; /* single xid for all pages */
378 spin_unlock(&srv->srv_lock);
380 for ( i = 0; i < page_count; i++) {
381 struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc);
383 GOTO(out_desc, rc = -ENOMEM);
/* echo the request niob back, stamped with the xid chosen above */
385 reply_niob[i] = request_niob[i];
386 reply_niob[i].n_xid = xid;
389 bulk->bp_page = fake_page(request_niob[i].n_block_nr);
390 bulk->bp_buf = page_address(bulk->bp_page);
391 bulk->bp_buflen = request_niob[i].n_length;
394 rc = ptlrpc_register_bulk(desc);
/*
 * NOTE(review): rsp is taken from the REQUEST message (rq_reqmsg), so the
 * r_error_cnt store below lands in the request buffer rather than the reply;
 * ptlbd_put_read() uses rq_repmsg for the same step.  Looks like a typo --
 * confirm.
 */
398 rsp = lustre_msg_buf(req->rq_reqmsg, 0);
/* 13 is an unexplained placeholder value */
400 rsp->r_error_cnt = 13;
401 ptlrpc_reply(req->rq_svc, req);
403 /* this synchronization probably isn't good enough */
404 lwi = LWI_TIMEOUT(obd_timeout * HZ, ptlbd_bulk_timeout, desc);
405 rc = l_wait_event(desc->bd_waitq, desc->bd_flags &PTL_BULK_FL_RCVD,
409 ptlrpc_free_bulk(desc);
/*
 * Handle an incoming read request.
 *
 * Visible flow: read the ptlbd_op and the niob array from the request,
 * derive page_count from the length of request buffer 1, prepare a bulk
 * descriptor with one page per niob (pages come from fake_page(), xids come
 * from the request niobs), send the bulk, wait until PTL_BULK_FL_SENT is set
 * on the descriptor or the timeout fires, then pack and send a reply that
 * holds a single ptlbd_rsp, and free the descriptor.
 *
 * NOTE(review): page_count and each n_block_nr / n_length / n_xid come from
 * the request message; no validation of them is visible in this listing.
 */
414 static int ptlbd_put_read(struct ptlrpc_request *req)
417 struct ptlbd_niob *niob, *niobs;
418 struct ptlbd_rsp *rsp;
419 struct ptlrpc_bulk_desc *desc;
420 struct l_wait_info lwi;
422 int i, page_count, rc;
424 op = lustre_msg_buf(req->rq_reqmsg, 0);
425 niobs = lustre_msg_buf(req->rq_reqmsg, 1);
/* number of niobs is inferred from the byte length of request buffer 1 */
426 page_count = req->rq_reqmsg->buflens[1] / sizeof(struct ptlbd_niob);
428 desc = ptlrpc_prep_bulk(req->rq_connection);
430 GOTO(out, rc = -ENOMEM);
431 desc->bd_portal = PTLBD_BULK_PORTAL;
433 for ( i = 0, niob = niobs ; i < page_count; niob++, i++) {
434 struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc);
436 GOTO(out_bulk, rc = -ENOMEM);
439 * XXX what about the block number?
/* the xid was chosen by the requester and arrives in the niob */
441 bulk->bp_xid = niob->n_xid;
442 bulk->bp_page = fake_page(niob->n_block_nr);
443 bulk->bp_buf = page_address(bulk->bp_page);
444 bulk->bp_buflen = niob->n_length;
447 rc = ptlrpc_send_bulk(desc);
451 /* this synchronization probably isn't good enough */
452 lwi = LWI_TIMEOUT(obd_timeout * HZ, ptlbd_bulk_timeout, desc);
453 rc = l_wait_event(desc->bd_waitq, desc->bd_flags &PTL_BULK_FL_SENT,
/* the reply has one buffer: a ptlbd_rsp */
456 size[0] = sizeof(struct ptlbd_rsp);
457 rc = lustre_pack_msg(1, size, NULL, &req->rq_replen, &req->rq_repmsg);
461 rsp = lustre_msg_buf(req->rq_repmsg, 0);
463 GOTO(out, rc = -EINVAL);
/* 42 is an unexplained placeholder value */
465 rsp->r_error_cnt = 42;
468 req->rq_status = 0; /* XXX */
469 ptlrpc_reply(req->rq_svc, req);
472 ptlrpc_free_bulk(desc);
/*
 * Unpack an incoming ptlbd request and dispatch it.
 *
 * The request message is unpacked with lustre_unpack_msg(), the ptlbd_op is
 * read from buffer 0, and the request is handed to a handler.  Only the call
 * to ptlbd_put_write() and an error log that prints op->op_cmd are visible
 * in this listing; the dispatch construct around them is not.
 */
478 int ptlbd_parse_req(struct ptlrpc_request *req)
484 rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen);
488 op = lustre_msg_buf(req->rq_reqmsg, 0);
/* NOTE(review): the handler's return value is discarded here */
495 ptlbd_put_write(req);
498 CERROR("fix this %d\n", op->op_cmd);
/*
 * Issue a synchronous obd_brw() for a chain of buffer heads.
 *
 * Visible flow: walk the chain through b_reqnext to size an array of
 * brw_page, allocate the array and a brw set, fill one brw_page per buffer
 * head (page, offset within the page, byte count), install
 * ll_brw_sync_wait as the set callback, call obd_brw(), then call
 * ll_brw_sync_wait(set, CB_PHASE_START) and log any error it reports.  The
 * page array and the set are released at the end.
 *
 * cmd:      passed straight to obd_brw()
 * st:       not referenced in any line visible in this listing
 * first_bh: head of the buffer_head chain (linked through b_reqnext, unlike
 *           the b_next walk used by ptlbd_send_req())
 *
 * NOTE(review): num_pages is passed to obd_brw() but neither its declaration
 * nor its assignment is visible here; the visible declarations only cover
 * rc, i and pg_bytes -- confirm it is counted alongside pg_bytes.
 */
507 int ptlbd_bh_req(int cmd, struct ptlbd_state *st, struct buffer_head *first_bh)
509 struct obd_brw_set *set = NULL;
510 struct brw_page *pg = NULL;
511 struct buffer_head *bh;
512 int rc, i, pg_bytes = 0;
/* first pass: total byte size of the brw_page array */
515 for ( bh = first_bh ; bh ; bh = bh->b_reqnext )
516 pg_bytes += sizeof(struct brw_page);
518 OBD_ALLOC(pg, pg_bytes);
520 GOTO(out, rc = -ENOMEM);
522 set = obd_brw_set_new();
524 GOTO(out, rc = -ENOMEM);
/* second pass: describe each buffer head as a brw_page */
526 for ( i = 0, bh = first_bh ; bh ; bh = bh->b_reqnext, i++) {
527 pg[i].pg = bh->b_page;
528 pg[i].off = bh_offset(bh);
529 pg[i].count = bh->b_size;
533 set->brw_callback = ll_brw_sync_wait;
534 rc = obd_brw(cmd, /* lsm */NULL, num_pages, pg, set);
/* the same function installed as the set callback is called directly here */
538 rc = ll_brw_sync_wait(set, CB_PHASE_START);
540 CERROR("error from callback: rc = %d\n", rc);
/* the size passed to OBD_FREE matches the one given to OBD_ALLOC */
544 OBD_FREE(pg, pg_bytes);
546 obd_brw_set_free(set);