X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fost%2Fost_handler.c;h=f8c131741760ed0532bf5020a62f6cd7a17b0bb7;hp=6e87dd046e7b35b8839592960f090ca4809540a8;hb=00102f342959b094f035b618c0b7acf477de35b4;hpb=96ec9477e8e47300815282710bfbe4b175574a47 diff --git a/lustre/ost/ost_handler.c b/lustre/ost/ost_handler.c index 6e87dd0..f8c1317 100644 --- a/lustre/ost/ost_handler.c +++ b/lustre/ost/ost_handler.c @@ -1,23 +1,24 @@ /* - * linux/mds/handler.c + * ost/ost_handler.c + * Storage Target Handling functions * * Lustre Object Server Module (OST) * - * Copyright (C) 2001 Cluster File Systems, Inc. + * Copyright (C) 2001, 2002 Cluster File Systems, Inc. * * This code is issued under the GNU General Public License. * See the file COPYING in this distribution * * by Peter Braam * - * This server is single threaded at present (but can easily be multi threaded). - * For testing and management it is treated as an obd_device, although it does - * not export a full OBD method table (the requests are coming in over the wire, - * so object target modules do not have a full method table.) + * This server is single threaded at present (but can easily be multi + * threaded). For testing and management it is treated as an + * obd_device, although it does not export a full OBD method table + * (the requests are coming in over the wire, so object target + * modules do not have a full method table.) 
* */ - #define EXPORT_SYMTAB #include @@ -28,6 +29,9 @@ #include #include #include + +#define DEBUG_SUBSYSTEM S_OST + #include #include #include @@ -37,9 +41,9 @@ #include // for testing -static int ost_queue_req(struct obd_device *obddev, struct ost_request *req) +static int ost_queue_req(struct obd_device *obddev, struct ptlrpc_request *req) { - struct ost_request *srv_req; + struct ptlrpc_request *srv_req; struct ost_obd *ost = &obddev->u.ost; if (!ost) { @@ -47,19 +51,23 @@ static int ost_queue_req(struct obd_device *obddev, struct ost_request *req) return -1; } - srv_req = kmalloc(sizeof(*srv_req), GFP_KERNEL); + OBD_ALLOC(srv_req, sizeof(*srv_req)); if (!srv_req) { EXIT; return -ENOMEM; } - printk("---> OST at %d %p, incoming req %p, srv_req %p\n", + CDEBUG(0, "---> OST at %d %p, incoming req %p, srv_req %p\n", __LINE__, ost, req, srv_req); memset(srv_req, 0, sizeof(*req)); + + /* move the request buffer */ srv_req->rq_reqbuf = req->rq_reqbuf; srv_req->rq_reqlen = req->rq_reqlen; - srv_req->rq_obd = ost; + srv_req->rq_ost = ost; + + /* remember where it came from */ srv_req->rq_reply_handle = req; list_add(&srv_req->rq_list, &ost->ost_reqs); @@ -67,44 +75,45 @@ static int ost_queue_req(struct obd_device *obddev, struct ost_request *req) return 0; } - -/* XXX replace with networking code */ -int ost_reply(struct obd_device *obddev, struct ost_request *req) +int ost_reply(struct obd_device *obddev, struct ptlrpc_request *req) { - struct ost_request *clnt_req = req->rq_reply_handle; + struct ptlrpc_request *clnt_req = req->rq_reply_handle; ENTRY; - printk("ost_reply: req %p clnt_req at %p\n", req, clnt_req); - /* free the request buffer */ - kfree(req->rq_reqbuf); - req->rq_reqbuf = NULL; - - /* move the reply to the client */ - clnt_req->rq_replen = req->rq_replen; - clnt_req->rq_repbuf = req->rq_repbuf; + if (req->rq_ost->ost_service != NULL) { + /* This is a request that came from the network via portals. 
*/ - printk("---> client req %p repbuf %p len %d status %d\n", - clnt_req, clnt_req->rq_repbuf, clnt_req->rq_replen, - req->rq_rephdr->status); + /* FIXME: we need to increment the count of handled events */ + ptl_send_buf(req, &req->rq_peer, OST_REPLY_PORTAL, 0); + } else { + /* This is a local request that came from another thread. */ + + /* move the reply to the client */ + clnt_req->rq_replen = req->rq_replen; + clnt_req->rq_repbuf = req->rq_repbuf; + req->rq_repbuf = NULL; + req->rq_replen = 0; + + /* free the request buffer */ + OBD_FREE(req->rq_reqbuf, req->rq_reqlen); + req->rq_reqbuf = NULL; + + /* wake up the client */ + wake_up_interruptible(&clnt_req->rq_wait_for_rep); + } - req->rq_repbuf = NULL; - req->rq_replen = 0; - - /* free the server request */ - kfree(req); - /* wake up the client */ - wake_up_interruptible(&clnt_req->rq_wait_for_rep); EXIT; return 0; } -int ost_error(struct obd_device *obddev, struct ost_request *req) +int ost_error(struct obd_device *obddev, struct ptlrpc_request *req) { - struct ost_rep_hdr *hdr; + struct ptlrep_hdr *hdr; ENTRY; - hdr = kmalloc(sizeof(*hdr), GFP_KERNEL); + + OBD_ALLOC(hdr, sizeof(*hdr)); if (!hdr) { EXIT; return -ENOMEM; @@ -115,117 +124,119 @@ int ost_error(struct obd_device *obddev, struct ost_request *req) hdr->seqno = req->rq_reqhdr->seqno; hdr->status = req->rq_status; hdr->type = OST_TYPE_ERR; + req->rq_repbuf = (char *)hdr; + req->rq_replen = sizeof(*hdr); EXIT; return ost_reply(obddev, req); } -static int ost_destroy(struct ost_obd *ost, struct ost_request *req) +static int ost_destroy(struct ost_obd *ost, struct ptlrpc_request *req) { struct obd_conn conn; int rc; ENTRY; - conn.oc_id = req->rq_req->connid; + conn.oc_id = req->rq_req.ost->connid; conn.oc_dev = ost->ost_tgt; - rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep, + rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.ost, &req->rq_replen, &req->rq_repbuf); if (rc) { printk("ost_destroy: cannot pack reply\n"); 
return rc; } - req->rq_rep->result =ost->ost_tgt->obd_type->typ_ops->o_destroy - (&conn, &req->rq_req->oa); + req->rq_rep.ost->result =ost->ost_tgt->obd_type->typ_ops->o_destroy + (&conn, &req->rq_req.ost->oa); EXIT; return 0; } -static int ost_getattr(struct ost_obd *ost, struct ost_request *req) +static int ost_getattr(struct ost_obd *ost, struct ptlrpc_request *req) { struct obd_conn conn; int rc; ENTRY; - printk("ost getattr entered\n"); - conn.oc_id = req->rq_req->connid; + conn.oc_id = req->rq_req.ost->connid; conn.oc_dev = ost->ost_tgt; - rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep, + rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.ost, &req->rq_replen, &req->rq_repbuf); if (rc) { printk("ost_getattr: cannot pack reply\n"); return rc; } - req->rq_rep->oa.o_id = req->rq_req->oa.o_id; - req->rq_rep->oa.o_valid = req->rq_req->oa.o_valid; + req->rq_rep.ost->oa.o_id = req->rq_req.ost->oa.o_id; + req->rq_rep.ost->oa.o_valid = req->rq_req.ost->oa.o_valid; - req->rq_rep->result =ost->ost_tgt->obd_type->typ_ops->o_getattr - (&conn, &req->rq_rep->oa); + req->rq_rep.ost->result =ost->ost_tgt->obd_type->typ_ops->o_getattr + (&conn, &req->rq_rep.ost->oa); EXIT; return 0; } -static int ost_create(struct ost_obd *ost, struct ost_request *req) +static int ost_create(struct ost_obd *ost, struct ptlrpc_request *req) { struct obd_conn conn; int rc; ENTRY; - conn.oc_id = req->rq_req->connid; + conn.oc_id = req->rq_req.ost->connid; conn.oc_dev = ost->ost_tgt; - rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep, + rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.ost, &req->rq_replen, &req->rq_repbuf); if (rc) { printk("ost_create: cannot pack reply\n"); return rc; } - memcpy(&req->rq_rep->oa, &req->rq_req->oa, sizeof(req->rq_req->oa)); + memcpy(&req->rq_rep.ost->oa, &req->rq_req.ost->oa, sizeof(req->rq_req.ost->oa)); - req->rq_rep->result =ost->ost_tgt->obd_type->typ_ops->o_create - (&conn, 
&req->rq_rep->oa); + req->rq_rep.ost->result =ost->ost_tgt->obd_type->typ_ops->o_create + (&conn, &req->rq_rep.ost->oa); EXIT; return 0; } -static int ost_setattr(struct ost_obd *ost, struct ost_request *req) +static int ost_setattr(struct ost_obd *ost, struct ptlrpc_request *req) { struct obd_conn conn; int rc; ENTRY; - conn.oc_id = req->rq_req->connid; + conn.oc_id = req->rq_req.ost->connid; conn.oc_dev = ost->ost_tgt; - rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep, + rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.ost, &req->rq_replen, &req->rq_repbuf); if (rc) { printk("ost_setattr: cannot pack reply\n"); return rc; } - memcpy(&req->rq_rep->oa, &req->rq_req->oa, sizeof(req->rq_req->oa)); + memcpy(&req->rq_rep.ost->oa, &req->rq_req.ost->oa, + sizeof(req->rq_req.ost->oa)); - req->rq_rep->result =ost->ost_tgt->obd_type->typ_ops->o_setattr - (&conn, &req->rq_rep->oa); + req->rq_rep.ost->result =ost->ost_tgt->obd_type->typ_ops->o_setattr + (&conn, &req->rq_rep.ost->oa); EXIT; return 0; } -static int ost_connect(struct ost_obd *ost, struct ost_request *req) +static int ost_connect(struct ost_obd *ost, struct ptlrpc_request *req) { struct obd_conn conn; int rc; @@ -234,24 +245,23 @@ static int ost_connect(struct ost_obd *ost, struct ost_request *req) conn.oc_dev = ost->ost_tgt; - rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep, + rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.ost, &req->rq_replen, &req->rq_repbuf); if (rc) { printk("ost_setattr: cannot pack reply\n"); return rc; } - req->rq_rep->result =ost->ost_tgt->obd_type->typ_ops->o_connect(&conn); + req->rq_rep.ost->result =ost->ost_tgt->obd_type->typ_ops->o_connect(&conn); - printk("ost_connect: rep buffer %p, id %d\n", req->rq_repbuf, + CDEBUG(0, "ost_connect: rep buffer %p, id %d\n", req->rq_repbuf, conn.oc_id); - req->rq_rep->connid = conn.oc_id; + req->rq_rep.ost->connid = conn.oc_id; EXIT; return 0; } - -static int 
ost_disconnect(struct ost_obd *ost, struct ost_request *req) +static int ost_disconnect(struct ost_obd *ost, struct ptlrpc_request *req) { struct obd_conn conn; int rc; @@ -259,38 +269,39 @@ static int ost_disconnect(struct ost_obd *ost, struct ost_request *req) ENTRY; conn.oc_dev = ost->ost_tgt; - conn.oc_id = req->rq_req->connid; + conn.oc_id = req->rq_req.ost->connid; - rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep, + rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.ost, &req->rq_replen, &req->rq_repbuf); if (rc) { printk("ost_setattr: cannot pack reply\n"); return rc; } - req->rq_rep->result =ost->ost_tgt->obd_type->typ_ops->o_disconnect(&conn); + req->rq_rep.ost->result =ost->ost_tgt->obd_type->typ_ops->o_disconnect(&conn); EXIT; return 0; } -static int ost_get_info(struct ost_obd *ost, struct ost_request *req) +static int ost_get_info(struct ost_obd *ost, struct ptlrpc_request *req) { struct obd_conn conn; int rc; int vallen; void *val; + char *ptr; ENTRY; - conn.oc_id = req->rq_req->connid; + conn.oc_id = req->rq_req.ost->connid; conn.oc_dev = ost->ost_tgt; - req->rq_rep->result =ost->ost_tgt->obd_type->typ_ops->o_get_info - (&conn, req->rq_req->buflen1, req->rq_req->buf1, &vallen, &val); - + ptr = ost_req_buf1(req->rq_req.ost); + req->rq_rep.ost->result =ost->ost_tgt->obd_type->typ_ops->o_get_info + (&conn, req->rq_req.ost->buflen1, ptr, &vallen, &val); - rc = ost_pack_rep(val, vallen, NULL, 0, &req->rq_rephdr, &req->rq_rep, + rc = ost_pack_rep(val, vallen, NULL, 0, &req->rq_rephdr, &req->rq_rep.ost, &req->rq_replen, &req->rq_repbuf); if (rc) { printk("ost_setattr: cannot pack reply\n"); @@ -301,19 +312,96 @@ static int ost_get_info(struct ost_obd *ost, struct ost_request *req) return 0; } +int ost_brw(struct ost_obd *obddev, struct ptlrpc_request *req) +{ + struct obd_conn conn; + int rc; + int i, j; + int objcount, niocount; + char *tmp1, *tmp2, *end2; + char *res; + int cmd; + struct niobuf *nb, *src, *dst; + struct 
obd_ioobj *ioo; + struct ost_req *r = req->rq_req.ost; + ENTRY; + + tmp1 = ost_req_buf1(r); + tmp2 = ost_req_buf2(r); + end2 = tmp2 + req->rq_req.ost->buflen2; + objcount = r->buflen1 / sizeof(*ioo); + niocount = r->buflen2 / sizeof(*nb); + cmd = r->cmd; + + conn.oc_id = req->rq_req.ost->connid; + conn.oc_dev = req->rq_ost->ost_tgt; + + rc = ost_pack_rep(NULL, niocount, NULL, 0, + &req->rq_rephdr, &req->rq_rep.ost, + &req->rq_replen, &req->rq_repbuf); + if (rc) { + printk("ost_create: cannot pack reply\n"); + return rc; + } + res = ost_rep_buf1(req->rq_rep.ost); + + for (i=0; i < objcount; i++) { + ost_unpack_ioo((void *)&tmp1, &ioo); + if (tmp2 + ioo->ioo_bufcnt > end2) { + rc = -EFAULT; + break; + } + for (j = 0 ; j < ioo->ioo_bufcnt ; j++) { + ost_unpack_niobuf((void *)&tmp2, &nb); + } + } + + /* The unpackers move tmp1 and tmp2, so reset them before using */ + tmp1 = ost_req_buf1(r); + tmp2 = ost_req_buf2(r); + req->rq_rep.ost->result = + req->rq_ost->ost_tgt->obd_type->typ_ops->o_preprw + (cmd, &conn, objcount, (struct obd_ioobj *)tmp1, + niocount, (struct niobuf *)tmp2, (struct niobuf *)res); + + if (cmd == OBD_BRW_WRITE) { + for (i=0; iaddr, + (void *)(unsigned long)src->addr, + src->len); + } + } else { + for (i=0; iaddr, + (void *)(unsigned long)src->addr, + PAGE_SIZE); + } + } + + req->rq_rep.ost->result = + req->rq_ost->ost_tgt->obd_type->typ_ops->o_commitrw + (cmd, &conn, objcount, (struct obd_ioobj *)tmp1, + niocount, (struct niobuf *)res); + + EXIT; + return 0; +} -//int ost_handle(struct ost_conn *conn, int len, char *buf) -int ost_handle(struct obd_device *obddev, struct ost_request *req) +int ost_handle(struct obd_device *obddev, struct ptlrpc_request *req) { int rc; struct ost_obd *ost = &obddev->u.ost; - struct ost_req_hdr *hdr; + struct ptlreq_hdr *hdr; ENTRY; - printk("ost_handle: req at %p\n", req); + CDEBUG(0, "req at %p\n", req); - hdr = (struct ost_req_hdr *)req->rq_reqbuf; + hdr = (struct ptlreq_hdr *)req->rq_reqbuf; if 
(NTOH__u32(hdr->type) != OST_TYPE_REQ) { printk("lustre_ost: wrong packet type sent %d\n", NTOH__u32(hdr->type)); @@ -322,7 +410,7 @@ int ost_handle(struct obd_device *obddev, struct ost_request *req) } rc = ost_unpack_req(req->rq_reqbuf, req->rq_reqlen, - &req->rq_reqhdr, &req->rq_req); + &req->rq_reqhdr, &req->rq_req.ost); if (rc) { printk("lustre_ost: Invalid request\n"); EXIT; @@ -333,7 +421,6 @@ int ost_handle(struct obd_device *obddev, struct ost_request *req) case OST_CONNECT: CDEBUG(D_INODE, "connect\n"); - printk("----> connect \n"); rc = ost_connect(ost, req); break; case OST_DISCONNECT: @@ -360,13 +447,17 @@ int ost_handle(struct obd_device *obddev, struct ost_request *req) CDEBUG(D_INODE, "setattr\n"); rc = ost_setattr(ost, req); break; - + case OST_BRW: + CDEBUG(D_INODE, "brw\n"); + rc = ost_brw(ost, req); + break; default: + req->rq_status = -ENOTSUPP; return ost_error(obddev, req); } out: - req->rq_rephdr->status = rc; + req->rq_status = rc; if (rc) { printk("ost: processing error %d\n", rc); ost_error(obddev, req); @@ -383,65 +474,87 @@ int ost_main(void *arg) struct obd_device *obddev = (struct obd_device *) arg; struct ost_obd *ost = &obddev->u.ost; ENTRY; - printk("---> %d\n", __LINE__); - lock_kernel(); - printk("---> %d\n", __LINE__); daemonize(); - printk("---> %d\n", __LINE__); spin_lock_irq(&current->sigmask_lock); - printk("---> %d\n", __LINE__); sigfillset(&current->blocked); - printk("---> %d\n", __LINE__); recalc_sigpending(current); - printk("---> %d\n", __LINE__); spin_unlock_irq(&current->sigmask_lock); - printk("---> %d\n", __LINE__); - printk("---> %d\n", __LINE__); sprintf(current->comm, "lustre_ost"); - printk("---> %d\n", __LINE__); /* Record that the thread is running */ ost->ost_thread = current; - printk("---> %d\n", __LINE__); wake_up(&ost->ost_done_waitq); - printk("---> %d\n", __LINE__); /* XXX maintain a list of all managed devices: insert here */ /* And now, wait forever for commit wakeup events. 
*/ while (1) { - struct ost_request *request; int rc; if (ost->ost_flags & OST_EXIT) break; - wake_up(&ost->ost_done_waitq); interruptible_sleep_on(&ost->ost_waitq); CDEBUG(D_INODE, "lustre_ost wakes\n"); CDEBUG(D_INODE, "pick up req here and continue\n"); - if (list_empty(&ost->ost_reqs)) { - CDEBUG(D_INODE, "woke because of timer\n"); - } else { - printk("---> %d\n", __LINE__); - request = list_entry(ost->ost_reqs.next, - struct ost_request, rq_list); - printk("---> %d\n", __LINE__); - list_del(&request->rq_list); - rc = ost_handle(obddev, request); + + if (ost->ost_service != NULL) { + ptl_event_t ev; + + while (1) { + struct ptlrpc_request request; + struct ptlrpc_service *service; + + rc = PtlEQGet(ost->ost_service->srv_eq_h, &ev); + if (rc != PTL_OK && rc != PTL_EQ_DROPPED) + break; + + service = (struct ptlrpc_service *)ev.mem_desc.user_ptr; + + /* FIXME: If we move to an event-driven model, + * we should put the request on the stack of + * mds_handle instead. */ + memset(&request, 0, sizeof(request)); + request.rq_reqbuf = ev.mem_desc.start + + ev.offset; + request.rq_reqlen = ev.mem_desc.length; + request.rq_ost = ost; + request.rq_xid = ev.match_bits; + + request.rq_peer.peer_nid = ev.initiator.nid; + /* FIXME: this NI should be the incoming NI. + * We don't know how to find that from here. 
*/ + request.rq_peer.peer_ni = + ost->ost_service->srv_self.peer_ni; + rc = ost_handle(obddev, &request); + + /* Inform the rpc layer the event has been handled */ + ptl_received_rpc(service); + } + } else { + struct ptlrpc_request *request; + + if (list_empty(&ost->ost_reqs)) { + CDEBUG(D_INODE, "woke because of timer\n"); + } else { + request = list_entry(ost->ost_reqs.next, + struct ptlrpc_request, + rq_list); + list_del(&request->rq_list); + rc = ost_handle(obddev, request); + } } } /* XXX maintain a list of all managed devices: cleanup here */ - printk("---> %d\n", __LINE__); + ost->ost_thread = NULL; - printk("---> %d\n", __LINE__); wake_up(&ost->ost_done_waitq); printk("lustre_ost: exiting\n"); return 0; @@ -463,15 +576,11 @@ static void ost_start_srv_thread(struct obd_device *obd) ENTRY; init_waitqueue_head(&ost->ost_waitq); - printk("---> %d\n", __LINE__); init_waitqueue_head(&ost->ost_done_waitq); - printk("---> %d\n", __LINE__); kernel_thread(ost_main, (void *)obd, CLONE_VM | CLONE_FS | CLONE_FILES); - printk("---> %d\n", __LINE__); while (!ost->ost_thread) sleep_on(&ost->ost_done_waitq); - printk("---> %d\n", __LINE__); EXIT; } @@ -483,6 +592,7 @@ static int ost_setup(struct obd_device *obddev, obd_count len, struct obd_ioctl_data* data = buf; struct ost_obd *ost = &obddev->u.ost; struct obd_device *tgt; + struct lustre_peer peer; int err; ENTRY; @@ -513,7 +623,20 @@ static int ost_setup(struct obd_device *obddev, obd_count len, ost->ost_thread = NULL; ost->ost_flags = 0; - spin_lock_init(&obddev->u.ost.fo_lock); + spin_lock_init(&obddev->u.ost.ost_lock); + + err = kportal_uuid_to_peer("self", &peer); + if (err == 0) { + OBD_ALLOC(ost->ost_service, sizeof(*ost->ost_service)); + if (ost->ost_service == NULL) + return -ENOMEM; + ost->ost_service->srv_buf_size = 64 * 1024; + ost->ost_service->srv_portal = OST_REQUEST_PORTAL; + memcpy(&ost->ost_service->srv_self, &peer, sizeof(peer)); + ost->ost_service->srv_wait_queue = &ost->ost_waitq; + + 
rpc_register_service(ost->ost_service, "self"); + } ost_start_srv_thread(obddev); @@ -542,6 +665,8 @@ static int ost_cleanup(struct obd_device * obddev) } ost_stop_srv_thread(ost); + rpc_unregister_service(ost->ost_service); + OBD_FREE(ost->ost_service, sizeof(*ost->ost_service)); if (!list_empty(&ost->ost_reqs)) { // XXX reply with errors and clean up