X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fost%2Fost_handler.c;h=f8c131741760ed0532bf5020a62f6cd7a17b0bb7;hp=b36beca4ac5c734b2b251913cfe711457c9f2167;hb=00102f342959b094f035b618c0b7acf477de35b4;hpb=14814eb7966c2198ee3dc4c9f6fa45d4ef0e7c5c diff --git a/lustre/ost/ost_handler.c b/lustre/ost/ost_handler.c index b36beca..f8c1317 100644 --- a/lustre/ost/ost_handler.c +++ b/lustre/ost/ost_handler.c @@ -1,20 +1,24 @@ /* - * linux/mds/handler.c + * ost/ost_handler.c + * Storage Target Handling functions * * Lustre Object Server Module (OST) * - * Copyright (C) 2001 Cluster File Systems, Inc. + * Copyright (C) 2001, 2002 Cluster File Systems, Inc. * * This code is issued under the GNU General Public License. * See the file COPYING in this distribution * * by Peter Braam * - * This server is single threaded at present (but can easily be multi threaded). + * This server is single threaded at present (but can easily be multi + * threaded). For testing and management it is treated as an + * obd_device, although it does not export a full OBD method table + * (the requests are coming in over the wire, so object target + * modules do not have a full method table.) * */ - #define EXPORT_SYMTAB #include @@ -25,6 +29,9 @@ #include #include #include + +#define DEBUG_SUBSYSTEM S_OST + #include #include #include @@ -33,45 +40,80 @@ #include #include - // for testing -static struct ost_obd *OST; - -// for testing -static int ost_queue_req(struct ost_request *req) +static int ost_queue_req(struct obd_device *obddev, struct ptlrpc_request *req) { + struct ptlrpc_request *srv_req; + struct ost_obd *ost = &obddev->u.ost; - if (!OST) { + if (!ost) { EXIT; return -1; } - list_add(&req->rq_list, &OST->ost_reqs); - init_waitqueue_head(&req->rq_wait_for_ost_rep); - req->rq_obd = OST; - wake_up(&OST->ost_waitq); - printk("-- sleeping\n"); - interruptible_sleep_on(&req->rq_wait_for_ost_rep); - printk("-- done\n"); + OBD_ALLOC(srv_req, sizeof(*srv_req)); + if (!srv_req) { + EXIT; + return -ENOMEM; + } + + CDEBUG(0, "---> OST at %d %p, incoming req %p, srv_req %p\n", + __LINE__, ost, req, srv_req); + + memset(srv_req, 0, sizeof(*req)); + + /* move the request buffer */ + srv_req->rq_reqbuf = req->rq_reqbuf; + srv_req->rq_reqlen = req->rq_reqlen; + srv_req->rq_ost = ost; + + /* remember where it came from */ + srv_req->rq_reply_handle = req; + + list_add(&srv_req->rq_list, &ost->ost_reqs); + wake_up(&ost->ost_waitq); return 0; } -int ost_reply(struct ost_request *req) +int ost_reply(struct obd_device *obddev, struct ptlrpc_request *req) { + struct ptlrpc_request *clnt_req = req->rq_reply_handle; + ENTRY; - kfree(req->rq_reqbuf); - req->rq_reqbuf = NULL; - wake_up_interruptible(&req->rq_wait_for_ost_rep); + + if (req->rq_ost->ost_service != NULL) { + /* This is a request that came from the network via portals. */ + + /* FIXME: we need to increment the count of handled events */ + ptl_send_buf(req, &req->rq_peer, OST_REPLY_PORTAL, 0); + } else { + /* This is a local request that came from another thread. */ + + /* move the reply to the client */ + clnt_req->rq_replen = req->rq_replen; + clnt_req->rq_repbuf = req->rq_repbuf; + req->rq_repbuf = NULL; + req->rq_replen = 0; + + /* free the request buffer */ + OBD_FREE(req->rq_reqbuf, req->rq_reqlen); + req->rq_reqbuf = NULL; + + /* wake up the client */ + wake_up_interruptible(&clnt_req->rq_wait_for_rep); + } + EXIT; return 0; } -int ost_error(struct ost_request *req) +int ost_error(struct obd_device *obddev, struct ptlrpc_request *req) { - struct ost_rep_hdr *hdr; + struct ptlrep_hdr *hdr; ENTRY; - hdr = kmalloc(sizeof(*hdr), GFP_KERNEL); + + OBD_ALLOC(hdr, sizeof(*hdr)); if (!hdr) { EXIT; return -ENOMEM; @@ -82,125 +124,284 @@ int ost_error(struct ost_request *req) hdr->seqno = req->rq_reqhdr->seqno; hdr->status = req->rq_status; hdr->type = OST_TYPE_ERR; + req->rq_repbuf = (char *)hdr; + req->rq_replen = sizeof(*hdr); EXIT; - return ost_reply(req); + return ost_reply(obddev, req); } -static int ost_destroy(struct ost_obd *ost, struct ost_request *req) +static int ost_destroy(struct ost_obd *ost, struct ptlrpc_request *req) { struct obd_conn conn; int rc; ENTRY; - conn.oc_id = req->rq_req->connid; + conn.oc_id = req->rq_req.ost->connid; conn.oc_dev = ost->ost_tgt; - rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep, - &req->rq_reqlen, &req->rq_reqbuf); + rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.ost, + &req->rq_replen, &req->rq_repbuf); if (rc) { printk("ost_destroy: cannot pack reply\n"); return rc; } - req->rq_rep->result =ost->ost_tgt->obd_type->typ_ops->o_destroy - (&conn, &req->rq_req->oa); + req->rq_rep.ost->result =ost->ost_tgt->obd_type->typ_ops->o_destroy + (&conn, &req->rq_req.ost->oa); EXIT; return 0; } -static int ost_getattr(struct ost_obd *ost, struct ost_request *req) +static int ost_getattr(struct ost_obd *ost, struct ptlrpc_request *req) { struct obd_conn conn; int rc; ENTRY; - conn.oc_id = req->rq_req->connid; + conn.oc_id = req->rq_req.ost->connid; conn.oc_dev = ost->ost_tgt; - rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep, - &req->rq_reqlen, &req->rq_reqbuf); + rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.ost, + &req->rq_replen, &req->rq_repbuf); if (rc) { printk("ost_getattr: cannot pack reply\n"); return rc; } - req->rq_rep->oa.o_id = req->rq_req->oa.o_id; + req->rq_rep.ost->oa.o_id = req->rq_req.ost->oa.o_id; + req->rq_rep.ost->oa.o_valid = req->rq_req.ost->oa.o_valid; - req->rq_rep->result =ost->ost_tgt->obd_type->typ_ops->o_getattr - (&conn, &req->rq_rep->oa); + req->rq_rep.ost->result =ost->ost_tgt->obd_type->typ_ops->o_getattr + (&conn, &req->rq_rep.ost->oa); EXIT; return 0; } -static int ost_create(struct ost_obd *ost, struct ost_request *req) +static int ost_create(struct ost_obd *ost, struct ptlrpc_request *req) { struct obd_conn conn; int rc; ENTRY; - conn.oc_id = req->rq_req->connid; + conn.oc_id = req->rq_req.ost->connid; conn.oc_dev = ost->ost_tgt; - rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep, - &req->rq_reqlen, &req->rq_reqbuf); + rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.ost, + &req->rq_replen, &req->rq_repbuf); if (rc) { printk("ost_create: cannot pack reply\n"); return rc; } - memcpy(&req->rq_rep->oa, &req->rq_req->oa, sizeof(req->rq_req->oa)); + memcpy(&req->rq_rep.ost->oa, &req->rq_req.ost->oa, sizeof(req->rq_req.ost->oa)); - req->rq_rep->result =ost->ost_tgt->obd_type->typ_ops->o_create - (&conn, &req->rq_rep->oa); + req->rq_rep.ost->result =ost->ost_tgt->obd_type->typ_ops->o_create + (&conn, &req->rq_rep.ost->oa); EXIT; return 0; } -static int ost_setattr(struct ost_obd *ost, struct ost_request *req) +static int ost_setattr(struct ost_obd *ost, struct ptlrpc_request *req) +{ + struct obd_conn conn; + int rc; + + ENTRY; + + conn.oc_id = req->rq_req.ost->connid; + conn.oc_dev = ost->ost_tgt; + + rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.ost, + &req->rq_replen, &req->rq_repbuf); + if (rc) { + printk("ost_setattr: cannot pack reply\n"); + return rc; + } + + memcpy(&req->rq_rep.ost->oa, &req->rq_req.ost->oa, + sizeof(req->rq_req.ost->oa)); + + req->rq_rep.ost->result =ost->ost_tgt->obd_type->typ_ops->o_setattr + (&conn, &req->rq_rep.ost->oa); + + EXIT; + return 0; +} + +static int ost_connect(struct ost_obd *ost, struct ptlrpc_request *req) { struct obd_conn conn; int rc; ENTRY; - conn.oc_id = req->rq_req->connid; conn.oc_dev = ost->ost_tgt; - rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep, - &req->rq_reqlen, &req->rq_reqbuf); + rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.ost, + &req->rq_replen, &req->rq_repbuf); if (rc) { printk("ost_setattr: cannot pack reply\n"); return rc; } - memcpy(&req->rq_rep->oa, &req->rq_req->oa, sizeof(req->rq_req->oa)); + req->rq_rep.ost->result =ost->ost_tgt->obd_type->typ_ops->o_connect(&conn); + + CDEBUG(0, "ost_connect: rep buffer %p, id %d\n", req->rq_repbuf, + conn.oc_id); + req->rq_rep.ost->connid = conn.oc_id; + EXIT; + return 0; +} + +static int ost_disconnect(struct ost_obd *ost, struct ptlrpc_request *req) +{ + struct obd_conn conn; + int rc; + + ENTRY; + + conn.oc_dev = ost->ost_tgt; + conn.oc_id = req->rq_req.ost->connid; + + rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.ost, + &req->rq_replen, &req->rq_repbuf); + if (rc) { + printk("ost_setattr: cannot pack reply\n"); + return rc; + } - req->rq_rep->result =ost->ost_tgt->obd_type->typ_ops->o_setattr - (&conn, &req->rq_rep->oa); + req->rq_rep.ost->result =ost->ost_tgt->obd_type->typ_ops->o_disconnect(&conn); EXIT; return 0; } +static int ost_get_info(struct ost_obd *ost, struct ptlrpc_request *req) +{ + struct obd_conn conn; + int rc; + int vallen; + void *val; + char *ptr; + + ENTRY; + + conn.oc_id = req->rq_req.ost->connid; + conn.oc_dev = ost->ost_tgt; + + ptr = ost_req_buf1(req->rq_req.ost); + req->rq_rep.ost->result =ost->ost_tgt->obd_type->typ_ops->o_get_info + (&conn, req->rq_req.ost->buflen1, ptr, &vallen, &val); + + rc = ost_pack_rep(val, vallen, NULL, 0, &req->rq_rephdr, &req->rq_rep.ost, + &req->rq_replen, &req->rq_repbuf); + if (rc) { + printk("ost_setattr: cannot pack reply\n"); + return rc; + } + + EXIT; + return 0; +} -//int ost_handle(struct ost_conn *conn, int len, char *buf) -int ost_handle(struct ost_obd *ost, struct ost_request *req) +int ost_brw(struct ost_obd *obddev, struct ptlrpc_request *req) { + struct obd_conn conn; int rc; - struct ost_req_hdr *hdr; + int i, j; + int objcount, niocount; + char *tmp1, *tmp2, *end2; + char *res; + int cmd; + struct niobuf *nb, *src, *dst; + struct obd_ioobj *ioo; + struct ost_req *r = req->rq_req.ost; ENTRY; + + tmp1 = ost_req_buf1(r); + tmp2 = ost_req_buf2(r); + end2 = tmp2 + req->rq_req.ost->buflen2; + objcount = r->buflen1 / sizeof(*ioo); + niocount = r->buflen2 / sizeof(*nb); + cmd = r->cmd; + + conn.oc_id = req->rq_req.ost->connid; + conn.oc_dev = req->rq_ost->ost_tgt; + + rc = ost_pack_rep(NULL, niocount, NULL, 0, + &req->rq_rephdr, &req->rq_rep.ost, + &req->rq_replen, &req->rq_repbuf); + if (rc) { + printk("ost_create: cannot pack reply\n"); + return rc; + } + res = ost_rep_buf1(req->rq_rep.ost); + + for (i=0; i < objcount; i++) { + ost_unpack_ioo((void *)&tmp1, &ioo); + if (tmp2 + ioo->ioo_bufcnt > end2) { + rc = -EFAULT; + break; + } + for (j = 0 ; j < ioo->ioo_bufcnt ; j++) { + ost_unpack_niobuf((void *)&tmp2, &nb); + } + } + + /* The unpackers move tmp1 and tmp2, so reset them before using */ + tmp1 = ost_req_buf1(r); + tmp2 = ost_req_buf2(r); + req->rq_rep.ost->result = + req->rq_ost->ost_tgt->obd_type->typ_ops->o_preprw + (cmd, &conn, objcount, (struct obd_ioobj *)tmp1, + niocount, (struct niobuf *)tmp2, (struct niobuf *)res); + + if (cmd == OBD_BRW_WRITE) { + for (i=0; iaddr, + (void *)(unsigned long)src->addr, + src->len); + } + } else { + for (i=0; iaddr, + (void *)(unsigned long)src->addr, + PAGE_SIZE); + } + } + + req->rq_rep.ost->result = + req->rq_ost->ost_tgt->obd_type->typ_ops->o_commitrw + (cmd, &conn, objcount, (struct obd_ioobj *)tmp1, + niocount, (struct niobuf *)res); - hdr = (struct ost_req_hdr *)req->rq_reqbuf; + EXIT; + return 0; +} +int ost_handle(struct obd_device *obddev, struct ptlrpc_request *req) +{ + int rc; + struct ost_obd *ost = &obddev->u.ost; + struct ptlreq_hdr *hdr; + + ENTRY; + CDEBUG(0, "req at %p\n", req); + + hdr = (struct ptlreq_hdr *)req->rq_reqbuf; if (NTOH__u32(hdr->type) != OST_TYPE_REQ) { printk("lustre_ost: wrong packet type sent %d\n", NTOH__u32(hdr->type)); @@ -209,7 +410,7 @@ int ost_handle(struct ost_obd *ost, struct ost_request *req) } rc = ost_unpack_req(req->rq_reqbuf, req->rq_reqlen, - &req->rq_reqhdr, &req->rq_req); + &req->rq_reqhdr, &req->rq_req.ost); if (rc) { printk("lustre_ost: Invalid request\n"); EXIT; @@ -218,6 +419,18 @@ int ost_handle(struct ost_obd *ost, struct ost_request *req) switch (req->rq_reqhdr->opc) { + case OST_CONNECT: + CDEBUG(D_INODE, "connect\n"); + rc = ost_connect(ost, req); + break; + case OST_DISCONNECT: + CDEBUG(D_INODE, "disconnect\n"); + rc = ost_disconnect(ost, req); + break; + case OST_GET_INFO: + CDEBUG(D_INODE, "get_info\n"); + rc = ost_get_info(ost, req); + break; case OST_CREATE: CDEBUG(D_INODE, "create\n"); rc = ost_create(ost, req); @@ -234,18 +447,23 @@ int ost_handle(struct ost_obd *ost, struct ost_request *req) CDEBUG(D_INODE, "setattr\n"); rc = ost_setattr(ost, req); break; - + case OST_BRW: + CDEBUG(D_INODE, "brw\n"); + rc = ost_brw(ost, req); + break; default: - return ost_error(req); + req->rq_status = -ENOTSUPP; + return ost_error(obddev, req); } out: + req->rq_status = rc; if (rc) { printk("ost: processing error %d\n", rc); - ost_error(req); + ost_error(obddev, req); } else { CDEBUG(D_INODE, "sending reply\n"); - ost_reply(req); + ost_reply(obddev, req); } return 0; @@ -253,7 +471,9 @@ out: int ost_main(void *arg) { - struct ost_obd *ost = (struct ost_obd *) arg; + struct obd_device *obddev = (struct obd_device *) arg; + struct ost_obd *ost = &obddev->u.ost; + ENTRY; lock_kernel(); daemonize(); @@ -272,26 +492,63 @@ int ost_main(void *arg) /* And now, wait forever for commit wakeup events. */ while (1) { - struct ost_request *request; int rc; if (ost->ost_flags & OST_EXIT) break; - wake_up(&ost->ost_done_waitq); interruptible_sleep_on(&ost->ost_waitq); CDEBUG(D_INODE, "lustre_ost wakes\n"); CDEBUG(D_INODE, "pick up req here and continue\n"); - if (list_empty(&ost->ost_reqs)) { - CDEBUG(D_INODE, "woke because of timer\n"); - } else { - request = list_entry(ost->ost_reqs.next, - struct ost_request, rq_list); - list_del(&request->rq_list); - rc = ost_handle(ost, request); + + if (ost->ost_service != NULL) { + ptl_event_t ev; + + while (1) { + struct ptlrpc_request request; + struct ptlrpc_service *service; + + rc = PtlEQGet(ost->ost_service->srv_eq_h, &ev); + if (rc != PTL_OK && rc != PTL_EQ_DROPPED) + break; + + service = (struct ptlrpc_service *)ev.mem_desc.user_ptr; + + /* FIXME: If we move to an event-driven model, + * we should put the request on the stack of + * mds_handle instead. */ + memset(&request, 0, sizeof(request)); + request.rq_reqbuf = ev.mem_desc.start + + ev.offset; + request.rq_reqlen = ev.mem_desc.length; + request.rq_ost = ost; + request.rq_xid = ev.match_bits; + + request.rq_peer.peer_nid = ev.initiator.nid; + /* FIXME: this NI should be the incoming NI. + * We don't know how to find that from here. */ + request.rq_peer.peer_ni = + ost->ost_service->srv_self.peer_ni; + rc = ost_handle(obddev, &request); + + /* Inform the rpc layer the event has been handled */ + ptl_received_rpc(service); + } + } else { + struct ptlrpc_request *request; + + if (list_empty(&ost->ost_reqs)) { + CDEBUG(D_INODE, "woke because of timer\n"); + } else { + request = list_entry(ost->ost_reqs.next, + struct ptlrpc_request, + rq_list); + list_del(&request->rq_list); + rc = ost_handle(obddev, request); + } } } @@ -313,14 +570,18 @@ static void ost_stop_srv_thread(struct ost_obd *ost) } } -static void ost_start_srv_thread(struct ost_obd *ost) +static void ost_start_srv_thread(struct obd_device *obd) { + struct ost_obd *ost = &obd->u.ost; + ENTRY; + init_waitqueue_head(&ost->ost_waitq); init_waitqueue_head(&ost->ost_done_waitq); - kernel_thread(ost_main, (void *)ost, + kernel_thread(ost_main, (void *)obd, CLONE_VM | CLONE_FS | CLONE_FILES); while (!ost->ost_thread) sleep_on(&ost->ost_done_waitq); + EXIT; } /* mount the file system (secretly) */ @@ -331,6 +592,7 @@ static int ost_setup(struct obd_device *obddev, obd_count len, struct obd_ioctl_data* data = buf; struct ost_obd *ost = &obddev->u.ost; struct obd_device *tgt; + struct lustre_peer peer; int err; ENTRY; @@ -340,7 +602,7 @@ static int ost_setup(struct obd_device *obddev, obd_count len, } tgt = &obd_dev[data->ioc_dev]; - + ost->ost_tgt = tgt; if ( ! (tgt->obd_flags & OBD_ATTACHED) || ! (tgt->obd_flags & OBD_SET_UP) ){ printk("device not attached or not set up (%d)\n", @@ -349,6 +611,7 @@ static int ost_setup(struct obd_device *obddev, obd_count len, return -EINVAL; } + ost->ost_conn.oc_dev = tgt; err = tgt->obd_type->typ_ops->o_connect(&ost->ost_conn); if (err) { printk("lustre ost: fail to connect to device %d\n", @@ -359,11 +622,23 @@ static int ost_setup(struct obd_device *obddev, obd_count len, INIT_LIST_HEAD(&ost->ost_reqs); ost->ost_thread = NULL; ost->ost_flags = 0; - OST = ost; - spin_lock_init(&obddev->u.ost.fo_lock); + spin_lock_init(&obddev->u.ost.ost_lock); - ost_start_srv_thread(ost); + err = kportal_uuid_to_peer("self", &peer); + if (err == 0) { + OBD_ALLOC(ost->ost_service, sizeof(*ost->ost_service)); + if (ost->ost_service == NULL) + return -ENOMEM; + ost->ost_service->srv_buf_size = 64 * 1024; + ost->ost_service->srv_portal = OST_REQUEST_PORTAL; + memcpy(&ost->ost_service->srv_self, &peer, sizeof(peer)); + ost->ost_service->srv_wait_queue = &ost->ost_waitq; + + rpc_register_service(ost->ost_service, "self"); + } + + ost_start_srv_thread(obddev); MOD_INC_USE_COUNT; EXIT; @@ -389,8 +664,9 @@ static int ost_cleanup(struct obd_device * obddev) return -EBUSY; } - OST = NULL; ost_stop_srv_thread(ost); + rpc_unregister_service(ost->ost_service); + OBD_FREE(ost->ost_service, sizeof(*ost->ost_service)); if (!list_empty(&ost->ost_reqs)) { // XXX reply with errors and clean up @@ -431,7 +707,6 @@ MODULE_AUTHOR("Peter J. Braam "); MODULE_DESCRIPTION("Lustre Object Storage Target (OST) v0.01"); MODULE_LICENSE("GPL"); - // for testing (maybe this stays) EXPORT_SYMBOL(ost_queue_req);