/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
* vim:expandtab:shiftwidth=8:tabstop=8:
*
- * Copyright (c) 2002 Cluster File Systems, Inc.
+ * Copyright (c) 2002, 2003 Cluster File Systems, Inc.
+ * Author: Zach Brown <zab@clusterfs.com>
*
* This file is part of Lustre, http://www.lustre.org.
*
#include <linux/lprocfs_status.h>
#include <linux/obd_ptlbd.h>
-static __u32 get_next_xid(struct obd_import *imp)
+int ptlbd_send_rw_req(struct ptlbd_obd *ptlbd, ptlbd_cmd_t cmd,
+ struct buffer_head *first_bh)
{
- unsigned long flags;
- __u32 xid;
- spin_lock_irqsave(&imp->imp_lock, flags);
- xid = ++imp->imp_last_xid;
- spin_unlock_irqrestore(&imp->imp_lock, flags);
- return xid;
-}
-
-static int ptlbd_brw_callback(struct obd_brw_set *set, int phase)
-{
- ENTRY;
- RETURN(0);
-}
-
-static void decref_bulk_desc(void *data)
-{
- struct ptlrpc_bulk_desc *desc = data;
- ENTRY;
-
- ptlrpc_bulk_decref(desc);
- EXIT;
-}
-
-/* this is the callback function which is invoked by the Portals
- * event handler associated with the bulk_sink queue and bulk_source queue.
- */
-static void ptlbd_ptl_ev_hdlr(struct ptlrpc_bulk_desc *desc)
-{
- ENTRY;
-
- LASSERT(desc->bd_brw_set != NULL);
- LASSERT(desc->bd_brw_set->brw_callback != NULL);
-
- desc->bd_brw_set->brw_callback(desc->bd_brw_set, CB_PHASE_FINISH);
-
- prepare_work(&desc->bd_queue, decref_bulk_desc, desc);
- schedule_work(&desc->bd_queue);
-
- EXIT;
-}
-
-
-int ptlbd_write_put_req(struct ptlbd_obd *ptlbd, ptlbd_cmd_t cmd,
- struct buffer_head *first_bh, unsigned int page_count)
-{
- struct obd_import *imp = &ptlbd->bd_import;
+ struct obd_import *imp = ptlbd->bd_import;
struct ptlbd_op *op;
struct ptlbd_niob *niob, *niobs;
struct ptlbd_rsp *rsp;
struct ptlrpc_request *req;
struct ptlrpc_bulk_desc *desc;
struct buffer_head *bh;
- int rc, size[2];
- struct obd_brw_set *set;
+ unsigned int page_count;
+ int rc, rep_size, size[2];
ENTRY;
+ LASSERT(cmd == PTLBD_READ || cmd == PTLBD_WRITE);
+
+ for ( page_count = 0, bh = first_bh ; bh ; bh = bh->b_reqnext )
+ page_count++;
+
size[0] = sizeof(struct ptlbd_op);
size[1] = page_count * sizeof(struct ptlbd_niob);
- req = ptlrpc_prep_req(imp, cmd, 2, size, NULL);
+ req = ptlrpc_prep_req(imp, LUSTRE_PBD_VERSION, cmd, 2, size, NULL);
if (!req)
- GOTO(out, rc = -ENOMEM);
- /* XXX might not need these */
- req->rq_request_portal = PTLBD_REQUEST_PORTAL;
- req->rq_reply_portal = PTLBD_REPLY_PORTAL;
+ RETURN(rc = 1); /* need to return error cnt */
- op = lustre_msg_buf(req->rq_reqmsg, 0);
- niobs = lustre_msg_buf(req->rq_reqmsg, 1);
+ op = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*op));
+ niobs = lustre_msg_buf(req->rq_reqmsg, 1, size[1]);
/* XXX pack */
op->op_cmd = cmd;
op->op__padding = 0;
op->op_block_cnt = page_count;
- desc = ptlrpc_prep_bulk(imp->imp_connection);
+ if (cmd == PTLBD_READ)
+ desc = ptlrpc_prep_bulk_imp (req, page_count,
+ BULK_PUT_SINK, PTLBD_BULK_PORTAL);
+ else
+ desc = ptlrpc_prep_bulk_imp (req, page_count,
+ BULK_GET_SOURCE, PTLBD_BULK_PORTAL);
if ( desc == NULL )
- GOTO(out_req, rc = -ENOMEM);
- desc->bd_portal = PTLBD_BULK_PORTAL;
- desc->bd_ptl_ev_hdlr = ptlbd_ptl_ev_hdlr;
-
- /* XXX someone needs to free this */
- set = obd_brw_set_new();
- if (set == NULL)
- GOTO(out_desc, rc = -ENOMEM);
-
- set->brw_callback = ptlbd_brw_callback;
-
-#if 0
- xid = get_next_xid(imp);
-#endif
-
- for ( niob = niobs, bh = first_bh ; bh ; bh = bh->b_next, niob++ ) {
-#if 0
- struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc);
- if (bulk == NULL)
- GOTO(out_set, rc = -ENOMEM);
-#endif
-
-#if 0
- niob->n_xid = xid;
-#endif
+ GOTO(out, rc = 1); /* need to return error cnt */
+ /* NB req now owns desc, and frees it when she frees herself */
+
+ for ( niob = niobs, bh = first_bh ; bh ; bh = bh->b_reqnext, niob++ ) {
+ ptlrpc_prep_bulk_page(desc, bh->b_page,
+ bh_offset (bh) & (PAGE_SIZE - 1),
+ bh->b_size);
+
niob->n_block_nr = bh->b_blocknr;
niob->n_offset = bh_offset(bh);
niob->n_length = bh->b_size;
-
-
-#if 0
- bulk->bp_xid = xid;
- bulk->bp_buf = bh->b_data;
- bulk->bp_page = bh->b_page;
- bulk->bp_buflen = bh->b_size;
-#endif
}
-
- size[0] = sizeof(struct ptlbd_rsp);
- size[1] = sizeof(struct ptlbd_niob) * page_count;
- req->rq_replen = lustre_msg_size(2, size);
+ rep_size = sizeof(struct ptlbd_rsp);
+ req->rq_replen = lustre_msg_size(1, &rep_size);
/* XXX find out how we're really supposed to manage levels */
- req->rq_level = imp->imp_level;
+ req->rq_send_state = imp->imp_state;
rc = ptlrpc_queue_wait(req);
- rsp = lustre_msg_buf(req->rq_repmsg, 0);
+ if ( rc != 0 )
+ GOTO(out, rc = 1); /* need to return error count */
- niob = lustre_msg_buf(req->rq_repmsg, 1);
- /* XXX check that op->num matches ours */
- for ( bh = first_bh ; bh ; bh = bh->b_next, niob++ ) {
- struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc);
- if (bulk == NULL)
- GOTO(out_set, rc = -ENOMEM);
-
- bulk->bp_xid = niob->n_xid;
- bulk->bp_page = bh->b_page;
- bulk->bp_buf = bh->b_data;
- bulk->bp_buflen = bh->b_size;
+ rsp = lustre_swab_repbuf(req, 0, sizeof (*rsp),
+ lustre_swab_ptlbd_rsp);
+ if (rsp == NULL) {
+ CERROR ("can't unpack response\n");
+ GOTO (out, rc = 1); /* need to return error count */
+ }
+ else if (rsp->r_status != 0) {
+ rc = rsp->r_error_cnt;
}
- obd_brw_set_add(set, desc);
- rc = ptlrpc_send_bulk(desc);
-
- /* if there's an error, no brw_finish called, just like
- * osc_brw_read */
-
- GOTO(out_req, rc);
-
-out_set:
- obd_brw_set_free(set);
-out_desc:
- ptlrpc_bulk_decref(desc);
-out_req:
- ptlrpc_req_finished(req);
out:
+ ptlrpc_req_finished(req);
RETURN(rc);
}
-int ptlbd_read_put_req(struct ptlbd_obd *ptlbd, ptlbd_cmd_t cmd,
- struct buffer_head *first_bh, unsigned int page_count)
+
+int ptlbd_send_flush_req(struct ptlbd_obd *ptlbd, ptlbd_cmd_t cmd)
{
- struct obd_import *imp = &ptlbd->bd_import;
+ struct obd_import *imp = ptlbd->bd_import;
struct ptlbd_op *op;
- struct ptlbd_niob *niob, *niobs;
struct ptlbd_rsp *rsp;
struct ptlrpc_request *req;
- struct ptlrpc_bulk_desc *desc;
- struct buffer_head *bh;
- int rc, rep_size, size[2];
- struct obd_brw_set *set;
- __u32 xid;
+ int rc, rep_size, size[1];
ENTRY;
+ LASSERT(cmd == PTLBD_FLUSH);
+
size[0] = sizeof(struct ptlbd_op);
- size[1] = page_count * sizeof(struct ptlbd_niob);
- req = ptlrpc_prep_req(imp, cmd, 2, size, NULL);
+ req = ptlrpc_prep_req(imp, LUSTRE_PBD_VERSION, cmd, 1, size, NULL);
if (!req)
- GOTO(out, rc = -ENOMEM);
- /* XXX might not need these? */
- req->rq_request_portal = PTLBD_REQUEST_PORTAL;
- req->rq_reply_portal = PTLBD_REPLY_PORTAL;
+ RETURN(-ENOMEM);
- op = lustre_msg_buf(req->rq_reqmsg, 0);
- niobs = lustre_msg_buf(req->rq_reqmsg, 1);
+ op = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*op));
/* XXX pack */
op->op_cmd = cmd;
op->op_lun = 0;
- op->op_niob_cnt = page_count;
+ op->op_niob_cnt = 0;
op->op__padding = 0;
- op->op_block_cnt = page_count;
-
- desc = ptlrpc_prep_bulk(imp->imp_connection);
- if ( desc == NULL )
- GOTO(out_req, rc = -ENOMEM);
- desc->bd_portal = PTLBD_BULK_PORTAL;
- desc->bd_ptl_ev_hdlr = ptlbd_ptl_ev_hdlr;
-
- /* XXX someone needs to free this */
- set = obd_brw_set_new();
- if (set == NULL)
- GOTO(out_desc, rc = -ENOMEM);
-
- set->brw_callback = ptlbd_brw_callback;
-
- xid = get_next_xid(imp);
-
- for ( niob = niobs, bh = first_bh ; bh ; bh = bh->b_next, niob++ ) {
- struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc);
- if (bulk == NULL)
- GOTO(out_set, rc = -ENOMEM);
-
- niob->n_xid = xid;
- niob->n_block_nr = bh->b_blocknr;
- niob->n_offset = bh_offset(bh);
- niob->n_length = bh->b_size;
-
- bulk->bp_xid = xid;
- bulk->bp_buf = bh->b_data;
- bulk->bp_page = bh->b_page;
- bulk->bp_buflen = bh->b_size;
- }
-
- /* XXX put in OBD_FAIL_CHECK for ptlbd? */
- rc = ptlrpc_register_bulk(desc);
- if (rc)
- GOTO(out_set, rc);
-
- obd_brw_set_add(set, desc);
+ op->op_block_cnt = 0;
rep_size = sizeof(struct ptlbd_rsp);
req->rq_replen = lustre_msg_size(1, &rep_size);
/* XXX find out how we're really supposed to manage levels */
- req->rq_level = imp->imp_level;
- rc = ptlrpc_queue_wait(req);
+ req->rq_send_state = imp->imp_state;
- rsp = lustre_msg_buf(req->rq_repmsg, 0);
-
- /* if there's an error, no brw_finish called, just like
- * osc_brw_read */
-
- GOTO(out_req, rc);
+ rc = ptlrpc_queue_wait(req);
+ if ( rc != 0 )
+ GOTO(out_req, rc = 1);
+ rsp = lustre_swab_repbuf(req, 0, sizeof (*rsp),
+ lustre_swab_ptlbd_rsp);
+ if (rsp->r_status != 0)
+ rc = rsp->r_status;
-out_set:
- obd_brw_set_free(set);
-out_desc:
- ptlrpc_bulk_decref(desc);
out_req:
ptlrpc_req_finished(req);
-out:
RETURN(rc);
}
-int ptlbd_send_req(struct ptlbd_obd *ptlbd, ptlbd_cmd_t cmd,
- struct buffer_head *first_bh)
-{
- unsigned int page_count = 0;
- struct buffer_head *bh;
- int rc;
- ENTRY;
-
- for ( page_count = 0, bh = first_bh ; bh ; bh = bh->b_next )
- page_count++;
- switch (cmd) {
- case PTLBD_READ:
- rc = ptlbd_read_put_req(ptlbd, cmd,
- first_bh, page_count);
- break;
- case PTLBD_WRITE:
- rc = ptlbd_write_put_req(ptlbd, cmd,
- first_bh, page_count);
- break;
- default:
- rc = -EINVAL;
- break;
- };
-
- RETURN(rc);
-}
-
-static int ptlbd_bulk_timeout(void *data)
+int ptlbd_do_filp(struct file *filp, int op, struct ptlbd_niob *niobs,
+ int page_count, struct list_head *page_list)
{
-/* struct ptlrpc_bulk_desc *desc = data;*/
+ mm_segment_t old_fs;
+ struct list_head *pos;
+ int status = 0;
ENTRY;
- CERROR("ugh, timed out\n");
-
- RETURN(1);
+ old_fs = get_fs();
+ set_fs(KERNEL_DS);
+
+ list_for_each(pos, page_list) {
+ ssize_t ret;
+ struct page *page = list_entry(pos, struct page, list);
+ loff_t offset = (niobs->n_block_nr << PAGE_SHIFT) +
+ niobs->n_offset;
+ if ( op == PTLBD_READ )
+ ret = filp->f_op->read(filp, page_address(page),
+ niobs->n_length, &offset);
+ else
+ ret = filp->f_op->write(filp, page_address(page),
+ niobs->n_length, &offset);
+ if (ret != niobs->n_length) {
+ status = ret;
+ break;
+ }
+ niobs++;
+ }
+ set_fs(old_fs);
+ RETURN(status);
}
-#define SILLY_MAX 2048
-static struct page *pages[SILLY_MAX] = {NULL,};
-static struct page * fake_page(int block_nr)
+int ptlbd_srv_rw_req(ptlbd_cmd_t cmd, __u16 index,
+ struct ptlrpc_request *req, int swab)
{
- if ( block_nr >= SILLY_MAX )
- return NULL;
-
- if (pages[block_nr] == NULL) {
- void *vaddr = (void *)get_free_page(GFP_KERNEL);
- pages[block_nr] = virt_to_page(vaddr);
- }
- return pages[block_nr];
-}
-
-static int ptlbd_put_write(struct ptlrpc_request *req)
-{
- struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
- struct ptlbd_op *op;
- struct ptlbd_niob *reply_niob, *request_niob;
+ struct ptlbd_niob *niob, *niobs;
struct ptlbd_rsp *rsp;
- struct ptlrpc_bulk_desc *desc;
- struct ptlrpc_service *srv;
+ struct ptlrpc_bulk_desc *desc = NULL;
+ struct file *filp = req->rq_export->exp_obd->u.ptlbd.filp;
struct l_wait_info lwi;
- int size[2];
- int i, page_count, rc;
- __u32 xid;
+ int size[1], i, page_count, rc = 0, error_cnt = 0;
+ struct list_head *pos, *n;
+ struct page *page;
+ LIST_HEAD(tmp_pages);
+ ENTRY;
- op = lustre_msg_buf(req->rq_reqmsg, 0);
- request_niob = lustre_msg_buf(req->rq_reqmsg, 1);
- page_count = req->rq_reqmsg->buflens[1] / sizeof(struct ptlbd_niob);
+ niobs = lustre_swab_reqbuf (req, 1, sizeof (*niobs),
+ lustre_swab_ptlbd_niob);
+ if (niobs == NULL)
+ GOTO (out, rc = -EFAULT);
size[0] = sizeof(struct ptlbd_rsp);
- size[1] = sizeof(struct ptlbd_niob) * page_count;
- rc = lustre_pack_msg(2, size, NULL, &req->rq_replen, &req->rq_repmsg);
- if (rc)
- GOTO(out, rc);
- reply_niob = lustre_msg_buf(req->rq_repmsg, 1);
-
- desc = ptlrpc_prep_bulk(req->rq_connection);
- if (desc == NULL)
- GOTO(out, rc = -ENOMEM);
- desc->bd_ptl_ev_hdlr = NULL;
- desc->bd_portal = PTLBD_BULK_PORTAL;
- memcpy(&(desc->bd_conn), &conn, sizeof(conn)); /* XXX what? */
-
- srv = req->rq_obd->u.ptlbd.ptlbd_service;
- spin_lock(&srv->srv_lock);
- xid = srv->srv_xid++; /* single xid for all pages */
- spin_unlock(&srv->srv_lock);
-
- for ( i = 0; i < page_count; i++) {
- struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc);
- if (bulk == NULL)
- GOTO(out_desc, rc = -ENOMEM);
-
- reply_niob[i] = request_niob[i];
- reply_niob[i].n_xid = xid;
-
- bulk->bp_xid = xid;
- bulk->bp_page = fake_page(request_niob[i].n_block_nr);
- bulk->bp_buf = page_address(bulk->bp_page);
- bulk->bp_buflen = request_niob[i].n_length;
- }
-
- rc = ptlrpc_register_bulk(desc);
+ rc = lustre_pack_reply(req, 1, size, NULL);
if ( rc )
- GOTO(out_desc, rc);
-
- rsp = lustre_msg_buf(req->rq_reqmsg, 0);
- rsp->r_status = 42;
- rsp->r_error_cnt = 13;
- ptlrpc_reply(req->rq_svc, req);
-
- /* this synchronization probably isn't good enough */
- lwi = LWI_TIMEOUT(obd_timeout * HZ, ptlbd_bulk_timeout, desc);
- rc = l_wait_event(desc->bd_waitq, desc->bd_flags &PTL_BULK_FL_RCVD,
- &lwi);
-
-out_desc:
- ptlrpc_free_bulk(desc);
-out:
- RETURN(rc);
-}
+ GOTO(out, rc);
-static int ptlbd_put_read(struct ptlrpc_request *req)
-{
- struct ptlbd_op *op;
- struct ptlbd_niob *niob, *niobs;
- struct ptlbd_rsp *rsp;
- struct ptlrpc_bulk_desc *desc;
- struct l_wait_info lwi;
- int size[1];
- int i, page_count, rc;
+ rsp = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*rsp));
+ if ( rsp == NULL )
+ GOTO (out, rc = -EFAULT);
- op = lustre_msg_buf(req->rq_reqmsg, 0);
- niobs = lustre_msg_buf(req->rq_reqmsg, 1);
+ /* FIXME: assumes each niobuf fits in 1 page */
page_count = req->rq_reqmsg->buflens[1] / sizeof(struct ptlbd_niob);
-
- desc = ptlrpc_prep_bulk(req->rq_connection);
- if (desc == NULL)
- GOTO(out, rc = -ENOMEM);
+ if (swab) { /* swab remaining niobs */
+ for (i = 1; i < page_count; i++)
+ lustre_swab_ptlbd_niob(&niobs[i]);
+ }
+ if (req->rq_export == NULL) {
+ error_cnt++;
+ GOTO(out_reply, rc = -EFAULT);
+ }
+
+ if (cmd == PTLBD_READ)
+ desc = ptlrpc_prep_bulk_exp (req, page_count,
+ BULK_PUT_SOURCE, PTLBD_BULK_PORTAL);
+ else
+ desc = ptlrpc_prep_bulk_exp (req, page_count,
+ BULK_GET_SINK, PTLBD_BULK_PORTAL);
+ if (desc == NULL) {
+ error_cnt++;
+ GOTO(out_reply, rc = -ENOMEM);
+ }
desc->bd_portal = PTLBD_BULK_PORTAL;
+ LASSERT (page_count > 0);
for ( i = 0, niob = niobs ; i < page_count; niob++, i++) {
- struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc);
- if (bulk == NULL)
- GOTO(out_bulk, rc = -ENOMEM);
-
- /*
- * XXX what about the block number?
- */
- bulk->bp_xid = niob->n_xid;
- bulk->bp_page = fake_page(niob->n_block_nr);
- bulk->bp_buf = page_address(bulk->bp_page);
- bulk->bp_buflen = niob->n_length;
+ page = alloc_page(GFP_KERNEL);
+ if (page == NULL) {
+ error_cnt++;
+ GOTO(out_reply, rc = -ENOMEM);
+ }
+ list_add_tail(&page->list, &tmp_pages);
+
+ ptlrpc_prep_bulk_page(desc, page,
+ niob->n_offset & (PAGE_SIZE - 1),
+ niob->n_length);
}
- rc = ptlrpc_send_bulk(desc);
- if ( rc )
- GOTO(out_bulk, rc);
+ if ( cmd == PTLBD_READ ) {
+ rc = ptlbd_do_filp(filp, PTLBD_READ, niobs,
+ page_count, &tmp_pages);
+ if (rc < 0) {
+ error_cnt++;
+ GOTO(out_reply, rc);
+ }
+ }
+ rc = ptlrpc_start_bulk_transfer(desc);
- /* this synchronization probably isn't good enough */
- lwi = LWI_TIMEOUT(obd_timeout * HZ, ptlbd_bulk_timeout, desc);
- rc = l_wait_event(desc->bd_waitq, desc->bd_flags &PTL_BULK_FL_SENT,
- &lwi);
+ if ( rc ) {
+ error_cnt++;
+ GOTO(out_reply, rc);
+ }
- size[0] = sizeof(struct ptlbd_rsp);
- rc = lustre_pack_msg(1, size, NULL, &req->rq_replen, &req->rq_repmsg);
- if ( rc )
- GOTO(out, rc);
+ lwi = LWI_TIMEOUT(obd_timeout * HZ / 4, NULL, desc);
+ rc = l_wait_event(desc->bd_waitq, !ptlrpc_bulk_active(desc), &lwi);
+ if (rc != 0) {
+ LASSERT(rc == -ETIMEDOUT);
+ ptlrpc_abort_bulk(desc);
+ error_cnt++;
+ GOTO(out_reply, rc);
+ }
- rsp = lustre_msg_buf(req->rq_repmsg, 0);
- if ( rsp == NULL )
- GOTO(out, rc = -EINVAL);
+ /* XXX do some error handling */
+ LASSERT(desc->bd_success && desc->bd_nob_transferred == desc->bd_nob);
+
+ if ( cmd == PTLBD_WRITE ) {
+ if ((rc = ptlbd_do_filp(filp, PTLBD_WRITE, niobs,
+ page_count, &tmp_pages)) < 0) {
+ error_cnt++;
+ }
+ }
- rsp->r_error_cnt = 42;
- rsp->r_status = 69;
+out_reply:
+ rsp->r_error_cnt = error_cnt;
+ rsp->r_status = rc;
+ req->rq_status = rc;
- req->rq_status = 0; /* XXX */
- ptlrpc_reply(req->rq_svc, req);
+ ptlrpc_reply(req);
-out_bulk:
- ptlrpc_free_bulk(desc);
+ list_for_each_safe(pos, n, &tmp_pages) {
+ struct page *page = list_entry(pos, struct page, list);
+ list_del(&page->list);
+ __free_page(page);
+ }
+ if (desc)
+ ptlrpc_free_bulk(desc);
out:
RETURN(rc);
}
-int ptlbd_parse_req(struct ptlrpc_request *req)
+int ptlbd_srv_flush_req(ptlbd_cmd_t cmd, __u16 index,
+ struct ptlrpc_request *req)
{
- struct ptlbd_op *op;
- int rc;
+ struct ptlbd_rsp *rsp;
+ struct file *filp = req->rq_export->exp_obd->u.ptlbd.filp;
+ int size[1], rc, status;
ENTRY;
- rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen);
+ size[0] = sizeof(struct ptlbd_rsp);
+ rc = lustre_pack_reply(req, 1, size, NULL);
if ( rc )
RETURN(rc);
- op = lustre_msg_buf(req->rq_reqmsg, 0);
+ rsp = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*rsp));
+ if ( rsp == NULL )
+ RETURN(-EINVAL);
- switch(op->op_cmd) {
- case PTLBD_READ:
- ptlbd_put_read(req);
- break;
- case PTLBD_WRITE:
- ptlbd_put_write(req);
- break;
- default:
- CERROR("fix this %d\n", op->op_cmd);
- break;
- }
+ if (! (filp) && (filp->f_op) && (filp->f_op->fsync) &&
+ (filp->f_dentry))
+ GOTO(out_reply, status = -EINVAL);
+ status = filp->f_op->fsync(filp, filp->f_dentry, 1);
+
+out_reply:
+ rsp->r_error_cnt = 0;
+ rsp->r_status = status;
+ req->rq_status = 0;
+
+ ptlrpc_reply(req);
RETURN(0);
}
-#if 0
-int ptlbd_bh_req(int cmd, struct ptlbd_state *st, struct buffer_head *first_bh)
+int ptlbd_handle(struct ptlrpc_request *req)
{
- struct obd_brw_set *set = NULL;
- struct brw_page *pg = NULL;
- struct buffer_head *bh;
- int rc, i, pg_bytes = 0;
+ struct ptlbd_op *op;
+ int swab;
+ int rc;
ENTRY;
- for ( bh = first_bh ; bh ; bh = bh->b_reqnext )
- pg_bytes += sizeof(struct brw_page);
+ swab = lustre_msg_swabbed (req->rq_reqmsg);
- OBD_ALLOC(pg, pg_bytes);
- if ( pg == NULL )
- GOTO(out, rc = -ENOMEM);
-
- set = obd_brw_set_new();
- if (set == NULL)
- GOTO(out, rc = -ENOMEM);
-
- for ( i = 0, bh = first_bh ; bh ; bh = bh->b_reqnext, i++) {
- pg[i].pg = bh->b_page;
- pg[i].off = bh_offset(bh);
- pg[i].count = bh->b_size;
- pg[i].flag = 0;
+ if (req->rq_reqmsg->opc == PTLBD_CONNECT) {
+ rc = target_handle_connect(req);
+ target_send_reply(req, rc, OBD_FAIL_PTLRPC);
+ RETURN(0);
}
+ if (req->rq_reqmsg->opc == PTLBD_DISCONNECT) {
+ rc = target_handle_disconnect(req);
+ target_send_reply(req, rc, OBD_FAIL_PTLRPC);
+ RETURN(0);
+ }
+ op = lustre_swab_reqbuf (req, 0, sizeof (*op),
+ lustre_swab_ptlbd_op);
+ if (op == NULL)
+ RETURN(-EFAULT);
- set->brw_callback = ll_brw_sync_wait;
- rc = obd_brw(cmd, /* lsm */NULL, num_pages, pg, set);
- if ( rc )
- GOTO(out, rc);
-
- rc = ll_brw_sync_wait(set, CB_PHASE_START);
- if (rc)
- CERROR("error from callback: rc = %d\n", rc);
+ switch (op->op_cmd) {
+ case PTLBD_READ:
+ case PTLBD_WRITE:
+ rc = ptlbd_srv_rw_req(op->op_cmd, op->op_lun, req,
+ swab);
+ break;
-out:
- if ( pg != NULL )
- OBD_FREE(pg, pg_bytes);
- if ( set != NULL )
- obd_brw_set_free(set);
+ case PTLBD_FLUSH:
+ rc = ptlbd_srv_flush_req(op->op_cmd, op->op_lun, req);
+ break;
+ default:
+ rc = -EINVAL;
+ }
- RETURN(rc);
+ RETURN(rc);
}
-#endif