1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
5 * Author: Peter J. Braam <braam@clusterfs.com>
6 * Author: Phil Schwan <phil@clusterfs.com>
8 * This file is part of Lustre, http://www.lustre.org.
10 * Lustre is free software; you can redistribute it and/or
11 * modify it under the terms of version 2 of the GNU General Public
12 * License as published by the Free Software Foundation.
14 * Lustre is distributed in the hope that it will be useful,
15 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 * GNU General Public License for more details.
19 * You should have received a copy of the GNU General Public License
20 * along with Lustre; if not, write to the Free Software
21 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23 * Storage Target Handling functions
24 * Lustre Object Server Module (OST)
26 * This server is single threaded at present (but can easily be multi
27 * threaded). For testing and management it is treated as an
28 * obd_device, although it does not export a full OBD method table
29 * (the requests are coming in over the wire, so object target
30 * modules do not have a full method table.)
35 #include <linux/version.h>
36 #include <linux/module.h>
38 #include <linux/stat.h>
39 #include <linux/locks.h>
40 #include <linux/ext2_fs.h>
41 #include <linux/quotaops.h>
42 #include <asm/unistd.h>
44 #define DEBUG_SUBSYSTEM S_OST
46 #include <linux/obd_support.h>
47 #include <linux/obd.h>
48 #include <linux/obd_class.h>
49 #include <linux/lustre_lib.h>
50 #include <linux/lustre_idl.h>
51 #include <linux/lustre_mds.h>
52 #include <linux/obd_class.h>
/*
 * ost_queue_req - hand a locally issued request to the OST service thread.
 *
 * @obddev: OST device; its u.ost member holds the request list and lock.
 * @req:    the caller's request.  Its request buffer pointer and length are
 *          copied (not duplicated) into a newly allocated server-side
 *          request, and @req itself is remembered in rq_reply_handle.
 *
 * The new request is added to ost->ost_reqs under ost->ost_lock and the
 * service thread is woken through ost->ost_waitq.
 *
 * NOTE(review): this listing omits short lines (braces, return statements
 * and, presumably, the allocation-failure check) -- confirm against the
 * upstream file.
 */
55 static int ost_queue_req(struct obd_device *obddev, struct ptlrpc_request *req)
57 struct ptlrpc_request *srv_req;
58 struct ost_obd *ost = &obddev->u.ost;
65 OBD_ALLOC(srv_req, sizeof(*srv_req));
71 CDEBUG(0, "---> OST at %d %p, incoming req %p, srv_req %p\n",
72 __LINE__, ost, req, srv_req);
/* Size the clear from the object being cleared, not from the caller's. */
74 memset(srv_req, 0, sizeof(*srv_req));
76 /* move the request buffer */
77 srv_req->rq_reqbuf = req->rq_reqbuf;
78 srv_req->rq_reqlen = req->rq_reqlen;
79 srv_req->rq_ost = ost;
81 /* remember where it came from */
82 srv_req->rq_reply_handle = req;
84 spin_lock(&ost->ost_lock);
85 list_add(&srv_req->rq_list, &ost->ost_reqs);
86 spin_unlock(&ost->ost_lock);
87 wake_up(&ost->ost_waitq);
/*
 * ost_reply - deliver the reply held in @req to whoever issued the request.
 *
 * @obddev: OST device (not referenced by the visible statements).
 * @req:    server-side request carrying rq_repbuf / rq_replen.
 *
 * When req->rq_ost->ost_service is set, the request is marked PTLRPC_REPLY
 * and sent to req->rq_peer on OST_REPLY_PORTAL with ptl_send_buf().
 * The remaining statements serve a local request: the reply buffer and
 * length are handed to the originating request (rq_reply_handle), the
 * request buffer is freed, and the originator is woken on rq_wait_for_rep.
 *
 * NOTE(review): the line separating the two cases (presumably "} else {")
 * is missing from this listing -- confirm against the upstream file.
 */
91 int ost_reply(struct obd_device *obddev, struct ptlrpc_request *req)
93 struct ptlrpc_request *clnt_req = req->rq_reply_handle;
97 if (req->rq_ost->ost_service != NULL) {
98 /* This is a request that came from the network via portals. */
100 /* FIXME: we need to increment the count of handled events */
101 req->rq_type = PTLRPC_REPLY;
102 ptl_send_buf(req, &req->rq_peer, OST_REPLY_PORTAL);
104 /* This is a local request that came from another thread. */
106 /* move the reply to the client */
107 clnt_req->rq_replen = req->rq_replen;
108 clnt_req->rq_repbuf = req->rq_repbuf;
/* Ownership of the reply buffer now lies with the client request. */
109 req->rq_repbuf = NULL;
112 /* free the request buffer */
113 OBD_FREE(req->rq_reqbuf, req->rq_reqlen);
114 req->rq_reqbuf = NULL;
116 /* wake up the client */
117 wake_up_interruptible(&clnt_req->rq_wait_for_rep);
/*
 * ost_error - send a header-only error reply for @req.
 *
 * Allocates a zeroed struct ptlrep_hdr, copies the sequence number from the
 * request header, stores req->rq_status as the status, tags the header as
 * OST_TYPE_ERR, installs it as the reply buffer and returns the result of
 * ost_reply().
 *
 * NOTE(review): req->rq_reqhdr is dereferenced here; it is only filled in
 * by ost_unpack_req() in ost_handle(), so callers on earlier failure paths
 * should be checked.  Any allocation-failure check is not visible in this
 * listing.
 */
124 int ost_error(struct obd_device *obddev, struct ptlrpc_request *req)
126 struct ptlrep_hdr *hdr;
130 OBD_ALLOC(hdr, sizeof(*hdr));
136 memset(hdr, 0, sizeof(*hdr));
138 hdr->seqno = req->rq_reqhdr->seqno;
139 hdr->status = req->rq_status;
140 hdr->type = OST_TYPE_ERR;
142 req->rq_repbuf = (char *)hdr;
143 req->rq_replen = sizeof(*hdr);
146 return ost_reply(obddev, req);
/*
 * ost_destroy - handle an object-destroy request.
 *
 * Builds a connection descriptor from the request's connid and the OST's
 * target device, packs an empty reply with ost_pack_rep(), then stores the
 * result of the target's o_destroy method (called on the request's oa) in
 * the reply's result field.
 */
149 static int ost_destroy(struct ost_obd *ost, struct ptlrpc_request *req)
151 struct obd_conn conn;
156 conn.oc_id = req->rq_req.ost->connid;
157 conn.oc_dev = ost->ost_tgt;
159 rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.ost,
160 &req->rq_replen, &req->rq_repbuf);
162 CERROR("cannot pack reply\n");
166 req->rq_rep.ost->result =ost->ost_tgt->obd_type->typ_ops->o_destroy
167 (&conn, &req->rq_req.ost->oa);
/*
 * ost_getattr - handle a get-attributes request.
 *
 * Packs an empty reply, seeds the reply's oa with the object id and valid
 * mask from the request, then lets the target's o_getattr method fill in
 * the reply oa.  The method's return value is stored in the reply's result
 * field.
 */
173 static int ost_getattr(struct ost_obd *ost, struct ptlrpc_request *req)
175 struct obd_conn conn;
180 conn.oc_id = req->rq_req.ost->connid;
181 conn.oc_dev = ost->ost_tgt;
183 rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.ost,
184 &req->rq_replen, &req->rq_repbuf);
186 CERROR("cannot pack reply\n");
/* Only the id and valid mask are carried over from the request. */
189 req->rq_rep.ost->oa.o_id = req->rq_req.ost->oa.o_id;
190 req->rq_rep.ost->oa.o_valid = req->rq_req.ost->oa.o_valid;
192 req->rq_rep.ost->result =ost->ost_tgt->obd_type->typ_ops->o_getattr
193 (&conn, &req->rq_rep.ost->oa);
/*
 * ost_create - handle an object-create request.
 *
 * Packs an empty reply, copies the whole request oa into the reply, and
 * calls the target's o_create method on the reply copy so that whatever
 * the method writes into the oa travels back to the client.  The method's
 * return value is stored in the reply's result field.
 */
199 static int ost_create(struct ost_obd *ost, struct ptlrpc_request *req)
201 struct obd_conn conn;
206 conn.oc_id = req->rq_req.ost->connid;
207 conn.oc_dev = ost->ost_tgt;
209 rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.ost,
210 &req->rq_replen, &req->rq_repbuf);
212 CERROR("cannot pack reply\n");
216 memcpy(&req->rq_rep.ost->oa, &req->rq_req.ost->oa, sizeof(req->rq_req.ost->oa));
218 req->rq_rep.ost->result =ost->ost_tgt->obd_type->typ_ops->o_create
219 (&conn, &req->rq_rep.ost->oa);
/*
 * ost_punch - handle a punch request.
 *
 * Packs an empty reply, copies the request oa into the reply, and calls
 * the target's o_punch method with the reply oa plus that oa's o_size and
 * o_blocks fields as the two extra arguments (presumably the extent to
 * punch -- confirm against the o_punch prototype).  The method's return
 * value is stored in the reply's result field.
 */
225 static int ost_punch(struct ost_obd *ost, struct ptlrpc_request *req)
227 struct obd_conn conn;
232 conn.oc_id = req->rq_req.ost->connid;
233 conn.oc_dev = ost->ost_tgt;
235 rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.ost,
236 &req->rq_replen, &req->rq_repbuf);
238 CERROR("cannot pack reply\n");
242 memcpy(&req->rq_rep.ost->oa, &req->rq_req.ost->oa, sizeof(req->rq_req.ost->oa));
244 req->rq_rep.ost->result =ost->ost_tgt->obd_type->typ_ops->o_punch
245 (&conn, &req->rq_rep.ost->oa,
246 req->rq_rep.ost->oa.o_size,
247 req->rq_rep.ost->oa.o_blocks);
/*
 * ost_setattr - handle a set-attributes request.
 *
 * Packs an empty reply, copies the request oa into the reply, and calls
 * the target's o_setattr method on the reply copy.  The method's return
 * value is stored in the reply's result field.
 */
254 static int ost_setattr(struct ost_obd *ost, struct ptlrpc_request *req)
256 struct obd_conn conn;
261 conn.oc_id = req->rq_req.ost->connid;
262 conn.oc_dev = ost->ost_tgt;
264 rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.ost,
265 &req->rq_replen, &req->rq_repbuf);
267 CERROR("cannot pack reply\n");
271 memcpy(&req->rq_rep.ost->oa, &req->rq_req.ost->oa,
272 sizeof(req->rq_req.ost->oa));
274 req->rq_rep.ost->result =ost->ost_tgt->obd_type->typ_ops->o_setattr
275 (&conn, &req->rq_rep.ost->oa);
/*
 * ost_connect - handle a connect request.
 *
 * Packs an empty reply, calls the target's o_connect method on a local
 * connection descriptor bound to the OST's target device, stores the
 * method's return value in the reply's result field and returns the
 * connection id written into conn.oc_id to the client as connid.
 */
281 static int ost_connect(struct ost_obd *ost, struct ptlrpc_request *req)
283 struct obd_conn conn;
288 conn.oc_dev = ost->ost_tgt;
290 rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.ost,
291 &req->rq_replen, &req->rq_repbuf);
293 CERROR("cannot pack reply\n");
297 req->rq_rep.ost->result =ost->ost_tgt->obd_type->typ_ops->o_connect(&conn);
299 CDEBUG(0, "rep buffer %p, id %d\n", req->rq_repbuf,
301 req->rq_rep.ost->connid = conn.oc_id;
/*
 * ost_disconnect - handle a disconnect request.
 *
 * Rebuilds the connection descriptor from the request's connid and the
 * OST's target device, packs an empty reply, and stores the return value
 * of the target's o_disconnect method in the reply's result field.
 */
306 static int ost_disconnect(struct ost_obd *ost, struct ptlrpc_request *req)
308 struct obd_conn conn;
313 conn.oc_dev = ost->ost_tgt;
314 conn.oc_id = req->rq_req.ost->connid;
316 rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.ost,
317 &req->rq_replen, &req->rq_repbuf);
319 CERROR("cannot pack reply\n");
323 req->rq_rep.ost->result =ost->ost_tgt->obd_type->typ_ops->o_disconnect(&conn);
/*
 * ost_get_info - handle a get-info request.
 *
 * The key is the first request buffer (ost_req_buf1, length buflen1).  The
 * target's o_get_info method returns the value through val / vallen, and
 * that value is then packed as the first reply buffer by ost_pack_rep().
 *
 * NOTE(review): unlike the other handlers in this file, the result field
 * of req->rq_rep.ost is written BEFORE ost_pack_rep() is called with
 * &req->rq_rep.ost.  If ost_pack_rep() is what sets that pointer, this
 * store goes through a pointer that is not yet valid and the result is
 * lost -- confirm and reorder if so.
 */
329 static int ost_get_info(struct ost_obd *ost, struct ptlrpc_request *req)
331 struct obd_conn conn;
339 conn.oc_id = req->rq_req.ost->connid;
340 conn.oc_dev = ost->ost_tgt;
342 ptr = ost_req_buf1(req->rq_req.ost);
343 req->rq_rep.ost->result =ost->ost_tgt->obd_type->typ_ops->o_get_info
344 (&conn, req->rq_req.ost->buflen1, ptr, &vallen, &val);
346 rc = ost_pack_rep(val, vallen, NULL, 0, &req->rq_rephdr,
347 &req->rq_rep.ost, &req->rq_replen, &req->rq_repbuf);
349 CERROR("cannot pack reply\n");
/*
 * ost_brw - handle a bulk read/write request.
 *
 * Request buffer 1 holds an array of struct obd_ioobj and request buffer 2
 * an array of struct niobuf; the element counts are derived from the
 * buffer lengths.  The visible steps are:
 *   1. pack a reply whose first buffer (res) is sized with niocount;
 *   2. walk both request buffers with the unpack helpers;
 *   3. call the target's o_preprw, which receives the request niobufs and
 *      res;
 *   4. memcpy between the addresses named in the request niobufs and those
 *      in res -- request to res for OBD_BRW_WRITE, the reverse otherwise;
 *   5. call the target's o_commitrw with res.
 * The return values of o_preprw and o_commitrw are stored in the reply's
 * result field.
 *
 * @obddev is not used by the visible statements; the OST is reached through
 * req->rq_ost instead.
 *
 * NOTE(review): addresses taken from the request are dereferenced directly
 * by memcpy, which only makes sense for a requester in the same address
 * space -- confirm how network requests are meant to reach this path.
 */
357 int ost_brw(struct ost_obd *obddev, struct ptlrpc_request *req)
359 struct obd_conn conn;
362 int objcount, niocount;
363 char *tmp1, *tmp2, *end2;
366 struct niobuf *nb, *src, *dst;
367 struct obd_ioobj *ioo;
368 struct ost_req *r = req->rq_req.ost;
372 tmp1 = ost_req_buf1(r);
373 tmp2 = ost_req_buf2(r);
374 end2 = tmp2 + req->rq_req.ost->buflen2;
375 objcount = r->buflen1 / sizeof(*ioo);
376 niocount = r->buflen2 / sizeof(*nb);
379 conn.oc_id = req->rq_req.ost->connid;
380 conn.oc_dev = req->rq_ost->ost_tgt;
382 rc = ost_pack_rep(NULL, niocount, NULL, 0,
383 &req->rq_rephdr, &req->rq_rep.ost,
384 &req->rq_replen, &req->rq_repbuf);
386 CERROR("cannot pack reply\n");
389 res = ost_rep_buf1(req->rq_rep.ost);
391 for (i=0; i < objcount; i++) {
392 ost_unpack_ioo((void *)&tmp1, &ioo);
/*
 * NOTE(review): tmp2 and end2 are char pointers, so this adds a niobuf
 * COUNT to a BYTE address; the check looks like it should scale
 * ioo_bufcnt by the packed size of one niobuf -- confirm.
 */
393 if (tmp2 + ioo->ioo_bufcnt > end2) {
397 for (j = 0 ; j < ioo->ioo_bufcnt ; j++) {
398 ost_unpack_niobuf((void *)&tmp2, &nb);
402 /* The unpackers move tmp1 and tmp2, so reset them before using */
403 tmp1 = ost_req_buf1(r);
404 tmp2 = ost_req_buf2(r);
405 req->rq_rep.ost->result =
406 req->rq_ost->ost_tgt->obd_type->typ_ops->o_preprw
407 (cmd, &conn, objcount, (struct obd_ioobj *)tmp1,
408 niocount, (struct niobuf *)tmp2, (struct niobuf *)res);
410 if (req->rq_rep.ost->result) {
415 if (cmd == OBD_BRW_WRITE) {
/* Write: data flows from the requester's buffers into res. */
416 for (i = 0; i < niocount; i++) {
417 src = &((struct niobuf *)tmp2)[i];
418 dst = &((struct niobuf *)res)[i];
419 memcpy((void *)(unsigned long)dst->addr,
420 (void *)(unsigned long)src->addr,
/* Read: data flows from res into the requester's buffers. */
425 for (i = 0; i < niocount; i++) {
426 dst = &((struct niobuf *)tmp2)[i];
427 src = &((struct niobuf *)res)[i];
428 memcpy((void *)(unsigned long)dst->addr,
429 (void *)(unsigned long)src->addr,
435 req->rq_rep.ost->result =
436 req->rq_ost->ost_tgt->obd_type->typ_ops->o_commitrw
437 (cmd, &conn, objcount, (struct obd_ioobj *)tmp1,
438 niocount, (struct niobuf *)res);
/*
 * ost_handle - validate, unpack and dispatch one incoming OST request.
 *
 * @obddev: OST device the request is addressed to.
 * @req:    request whose rq_reqbuf / rq_reqlen describe the raw packet.
 *
 * The packet's header type must be OST_TYPE_REQ (compared after
 * NTOH__u32).  ost_unpack_req() then fills in rq_reqhdr and rq_req.ost,
 * and the opcode in rq_reqhdr->opc selects one of the handlers: connect,
 * disconnect, get_info, create, destroy, getattr, setattr, brw or punch.
 * An unrecognised opcode is answered through ost_error() with status
 * -ENOTSUPP.  After dispatch either ost_error() (processing error) or
 * ost_reply() is called.
 *
 * NOTE(review): the "case" labels and the conditions guarding the error
 * paths are missing from this listing; the opcode-to-handler mapping above
 * is read from the debug strings next to each call.
 */
445 int ost_handle(struct obd_device *obddev, struct ptlrpc_request *req)
448 struct ost_obd *ost = &obddev->u.ost;
449 struct ptlreq_hdr *hdr;
452 CDEBUG(0, "req at %p\n", req);
454 hdr = (struct ptlreq_hdr *)req->rq_reqbuf;
455 if (NTOH__u32(hdr->type) != OST_TYPE_REQ) {
456 CERROR("lustre_ost: wrong packet type sent %d\n",
457 NTOH__u32(hdr->type));
462 rc = ost_unpack_req(req->rq_reqbuf, req->rq_reqlen,
463 &req->rq_reqhdr, &req->rq_req.ost);
465 CERROR("lustre_ost: Invalid request\n");
470 switch (req->rq_reqhdr->opc) {
473 CDEBUG(D_INODE, "connect\n");
474 rc = ost_connect(ost, req);
477 CDEBUG(D_INODE, "disconnect\n");
478 rc = ost_disconnect(ost, req);
481 CDEBUG(D_INODE, "get_info\n");
482 rc = ost_get_info(ost, req);
485 CDEBUG(D_INODE, "create\n");
486 rc = ost_create(ost, req);
489 CDEBUG(D_INODE, "destroy\n");
490 rc = ost_destroy(ost, req);
493 CDEBUG(D_INODE, "getattr\n");
494 rc = ost_getattr(ost, req);
497 CDEBUG(D_INODE, "setattr\n");
498 rc = ost_setattr(ost, req);
501 CDEBUG(D_INODE, "brw\n");
502 rc = ost_brw(ost, req);
505 CDEBUG(D_INODE, "punch\n");
506 rc = ost_punch(ost, req);
509 req->rq_status = -ENOTSUPP;
510 return ost_error(obddev, req);
516 CERROR("ost: processing error %d\n", rc);
517 ost_error(obddev, req);
519 CDEBUG(D_INODE, "sending reply\n");
520 ost_reply(obddev, req);
/*
 * ost_main - body of the OST service kernel thread.
 *
 * @arg: the struct obd_device this thread serves.
 *
 * Start-up: blocks every signal, names the task "lustre_ost", publishes
 * itself in ost->ost_thread and wakes ost->ost_done_waitq so that whoever
 * started the thread can stop waiting.
 *
 * Service, depending on ost->ost_service:
 *   - set: sleep on ost->ost_waitq until PtlEQGet() yields an event (PTL_OK
 *     or PTL_EQ_DROPPED) or OST_EXIT is flagged; a request is then built on
 *     the stack from the event (buffer, length, match bits, initiator nid),
 *     passed to ost_handle(), and ptl_received_rpc() is told the event has
 *     been consumed;
 *   - NULL: sleep until ost->ost_reqs is non-empty, take the first entry
 *     off the list under ost->ost_lock and pass it to ost_handle().
 *
 * Shutdown: clears ost->ost_thread and wakes ost->ost_done_waitq again.
 *
 * NOTE(review): the loop construct, the signal names tested and all return
 * statements are missing from this listing -- confirm against the upstream
 * file.
 */
526 /* FIXME: Serious refactoring needed */
527 int ost_main(void *arg)
530 struct obd_device *obddev = (struct obd_device *) arg;
531 struct ost_obd *ost = &obddev->u.ost;
532 DECLARE_WAITQUEUE(wait, current);
/* Block all signals for this thread. */
538 spin_lock_irq(&current->sigmask_lock);
539 sigfillset(&current->blocked);
540 recalc_sigpending(current);
541 spin_unlock_irq(&current->sigmask_lock);
543 sprintf(current->comm, "lustre_ost");
545 /* Record that the thread is running */
546 ost->ost_thread = current;
547 wake_up(&ost->ost_done_waitq);
549 /* XXX maintain a list of all managed devices: insert here */
551 /* And now, wait forever for commit wakeup events. */
/* Requests arriving through the Portals service. */
555 if (ost->ost_service != NULL) {
557 struct ptlrpc_request request;
558 struct ptlrpc_service *service;
560 CDEBUG(D_IOCTL, "-- sleeping\n");
562 add_wait_queue(&ost->ost_waitq, &wait);
564 set_current_state(TASK_INTERRUPTIBLE);
565 rc = PtlEQGet(ost->ost_service->srv_eq_h, &ev);
566 if (rc == PTL_OK || rc == PTL_EQ_DROPPED)
568 if (ost->ost_flags & OST_EXIT)
572 /* if this process really wants to die,
574 if (sigismember(&(current->pending.signal),
576 sigismember(&(current->pending.signal),
584 remove_wait_queue(&ost->ost_waitq, &wait);
585 set_current_state(TASK_RUNNING);
586 CDEBUG(D_IOCTL, "-- done\n");
589 /* We broke out because of a signal */
593 if (ost->ost_flags & OST_EXIT) {
598 service = (struct ptlrpc_service *)ev.mem_desc.user_ptr;
600 /* FIXME: If we move to an event-driven model,
601 * we should put the request on the stack of
602 * mds_handle instead. */
603 memset(&request, 0, sizeof(request));
604 request.rq_reqbuf = ev.mem_desc.start + ev.offset;
605 request.rq_reqlen = ev.mem_desc.length;
606 request.rq_ost = ost;
607 request.rq_xid = ev.match_bits;
609 request.rq_peer.peer_nid = ev.initiator.nid;
610 /* FIXME: this NI should be the incoming NI.
611 * We don't know how to find that from here. */
612 request.rq_peer.peer_ni =
613 ost->ost_service->srv_self.peer_ni;
614 rc = ost_handle(obddev, &request);
616 /* Inform the rpc layer the event has been handled */
617 ptl_received_rpc(service);
/* No Portals service: requests are taken from the ost_reqs list. */
619 struct ptlrpc_request *request;
621 CDEBUG(D_IOCTL, "-- sleeping\n");
622 add_wait_queue(&ost->ost_waitq, &wait);
624 spin_lock(&ost->ost_lock);
625 if (!list_empty(&ost->ost_reqs))
628 set_current_state(TASK_INTERRUPTIBLE);
630 /* if this process really wants to die,
632 if (sigismember(&(current->pending.signal),
634 sigismember(&(current->pending.signal),
638 spin_unlock(&ost->ost_lock);
642 remove_wait_queue(&ost->ost_waitq, &wait);
643 set_current_state(TASK_RUNNING);
644 CDEBUG(D_IOCTL, "-- done\n");
646 if (list_empty(&ost->ost_reqs)) {
647 CDEBUG(D_INODE, "woke because of signal\n");
648 spin_unlock(&ost->ost_lock);
650 request = list_entry(ost->ost_reqs.next,
651 struct ptlrpc_request,
653 list_del(&request->rq_list);
654 spin_unlock(&ost->ost_lock);
655 rc = ost_handle(obddev, request);
660 /* XXX maintain a list of all managed devices: cleanup here */
662 ost->ost_thread = NULL;
663 wake_up(&ost->ost_done_waitq);
664 CERROR("lustre_ost: exiting\n");
/*
 * ost_stop_srv_thread - ask the service thread to exit and wait for it.
 *
 * Sets OST_EXIT in ost->ost_flags, then repeatedly wakes the thread through
 * ost->ost_waitq and sleeps on ost->ost_done_waitq until the thread has
 * cleared ost->ost_thread.
 *
 * NOTE(review): ost_thread is tested before sleep_on() with no lock held,
 * so a wake-up between the test and the sleep could be missed -- confirm
 * whether callers are otherwise serialised.
 */
668 static void ost_stop_srv_thread(struct ost_obd *ost)
670 ost->ost_flags |= OST_EXIT;
672 while (ost->ost_thread) {
673 wake_up(&ost->ost_waitq);
674 sleep_on(&ost->ost_done_waitq);
/*
 * ost_start_srv_thread - spawn the OST service thread and wait until it
 * is running.
 *
 * Initialises both wait queues, starts ost_main() with kernel_thread()
 * (sharing VM, filesystem info and files with the caller), and sleeps on
 * ost->ost_done_waitq until the new thread has stored itself in
 * ost->ost_thread.
 *
 * NOTE(review): the return value of kernel_thread() is not checked in the
 * visible lines; if thread creation fails this loop would never end.
 */
678 static void ost_start_srv_thread(struct obd_device *obd)
680 struct ost_obd *ost = &obd->u.ost;
683 init_waitqueue_head(&ost->ost_waitq);
684 init_waitqueue_head(&ost->ost_done_waitq);
685 kernel_thread(ost_main, (void *)obd,
686 CLONE_VM | CLONE_FS | CLONE_FILES);
687 while (!ost->ost_thread)
688 sleep_on(&ost->ost_done_waitq);
/*
 * ost_setup - OBD setup method for the OST device.
 *
 * @obddev: the OST device being set up.
 * @len:    length of the ioctl payload (not used by the visible lines).
 * buf (declared on a line missing from this listing) is read as a
 * struct obd_ioctl_data; its ioc_dev field selects the target device.
 *
 * Steps visible here: validate ioc_dev and look the target up in obd_dev[],
 * require the target to be both attached and set up, connect to it through
 * its o_connect method, initialise the request list and lock, resolve the
 * peer "self", allocate and register a ptlrpc service on OST_REQUEST_PORTAL
 * with a 64 KiB buffer size, and start the service thread.
 */
692 /* mount the file system (secretly) */
693 static int ost_setup(struct obd_device *obddev, obd_count len,
697 struct obd_ioctl_data* data = buf;
698 struct ost_obd *ost = &obddev->u.ost;
699 struct obd_device *tgt;
700 struct lustre_peer peer;
/*
 * NOTE(review): ioc_dev == MAX_OBD_DEVICES passes this check and is then
 * used as an index into obd_dev[] below; if that array has
 * MAX_OBD_DEVICES entries the comparison should be ">=" -- confirm.
 */
704 if (data->ioc_dev < 0 || data->ioc_dev > MAX_OBD_DEVICES) {
709 tgt = &obd_dev[data->ioc_dev];
711 if ( ! (tgt->obd_flags & OBD_ATTACHED) ||
712 ! (tgt->obd_flags & OBD_SET_UP) ){
713 CERROR("device not attached or not set up (%d)\n",
719 ost->ost_conn.oc_dev = tgt;
720 err = tgt->obd_type->typ_ops->o_connect(&ost->ost_conn);
722 CERROR("lustre ost: fail to connect to device %d\n",
727 INIT_LIST_HEAD(&ost->ost_reqs);
728 ost->ost_thread = NULL;
731 spin_lock_init(&obddev->u.ost.ost_lock);
/*
 * The service set-up below is presumably conditional on this lookup
 * succeeding (the guarding line is missing from the listing) -- confirm.
 */
733 err = kportal_uuid_to_peer("self", &peer);
735 OBD_ALLOC(ost->ost_service, sizeof(*ost->ost_service));
736 if (ost->ost_service == NULL)
738 ost->ost_service->srv_buf_size = 64 * 1024;
739 ost->ost_service->srv_portal = OST_REQUEST_PORTAL;
740 memcpy(&ost->ost_service->srv_self, &peer, sizeof(peer));
/* The service and the thread wait on the same queue. */
741 ost->ost_service->srv_wait_queue = &ost->ost_waitq;
743 rpc_register_service(ost->ost_service, "self");
746 ost_start_srv_thread(obddev);
/*
 * ost_cleanup - OBD cleanup method for the OST device.
 *
 * Checks that the device was set up and that obd_gen_clients is empty,
 * stops the service thread, unregisters and frees the ptlrpc service,
 * reports a non-empty request list, and finally disconnects from the
 * target device through its o_disconnect method.
 *
 * NOTE(review): ost->ost_service is unregistered and freed without a NULL
 * check in the visible lines.  No assignment to tgt is visible before its
 * use either (presumably on a line missing from this listing) -- confirm
 * both against the upstream file.
 */
753 static int ost_cleanup(struct obd_device * obddev)
755 struct ost_obd *ost = &obddev->u.ost;
756 struct obd_device *tgt;
761 if ( !(obddev->obd_flags & OBD_SET_UP) ) {
766 if ( !list_empty(&obddev->obd_gen_clients) ) {
767 CERROR("still has clients!\n");
/* The thread must be gone before the service it reads is freed. */
772 ost_stop_srv_thread(ost);
773 rpc_unregister_service(ost->ost_service);
774 OBD_FREE(ost->ost_service, sizeof(*ost->ost_service));
776 if (!list_empty(&ost->ost_reqs)) {
777 // XXX reply with errors and clean up
778 CDEBUG(D_INODE, "Request list not empty!\n");
782 err = tgt->obd_type->typ_ops->o_disconnect(&ost->ost_conn);
784 CERROR("lustre ost: fail to disconnect device\n");
/*
 * Method table handed to obd_register_type() by ost_init().  Only the
 * cleanup entry is visible in this listing; a setup entry pointing at
 * ost_setup is presumably on the missing line -- confirm.
 */
794 /* use obd ops to offer management infrastructure */
795 static struct obd_ops ost_obd_ops = {
797 o_cleanup: ost_cleanup,
/*
 * ost_init - module entry point: registers the OST method table with the
 * OBD class under the name LUSTRE_OST_NAME.
 *
 * NOTE(review): the return value of obd_register_type() is not used on the
 * visible line -- confirm what the function returns.
 */
800 static int __init ost_init(void)
802 obd_register_type(&ost_obd_ops, LUSTRE_OST_NAME);
/*
 * ost_exit - module exit point: removes the LUSTRE_OST_NAME type
 * registration made by ost_init().
 */
806 static void __exit ost_exit(void)
808 obd_unregister_type(LUSTRE_OST_NAME);
/* Module metadata, exported symbols and init/exit hooks. */
811 MODULE_AUTHOR("Peter J. Braam <braam@clusterfs.com>");
812 MODULE_DESCRIPTION("Lustre Object Storage Target (OST) v0.01");
813 MODULE_LICENSE("GPL");
/*
 * NOTE(review): ost_queue_req is defined "static" in this file yet is
 * exported here; an exported symbol needs external linkage -- confirm
 * which of the two is intended.
 */
815 // for testing (maybe this stays)
816 EXPORT_SYMBOL(ost_queue_req);
818 module_init(ost_init);
819 module_exit(ost_exit);