1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
5 * Author: Peter J. Braam <braam@clusterfs.com>
6 * Author: Phil Schwan <phil@clusterfs.com>
8 * This file is part of Lustre, http://www.lustre.org.
10 * Lustre is free software; you can redistribute it and/or
11 * modify it under the terms of version 2 of the GNU General Public
12 * License as published by the Free Software Foundation.
14 * Lustre is distributed in the hope that it will be useful,
15 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 * GNU General Public License for more details.
19 * You should have received a copy of the GNU General Public License
20 * along with Lustre; if not, write to the Free Software
21 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23 * Storage Target Handling functions
24 * Lustre Object Server Module (OST)
26 * This server is single threaded at present (but can easily be multi
27 * threaded). For testing and management it is treated as an
28 * obd_device, although it does not export a full OBD method table
29 * (the requests are coming in over the wire, so object target
30 * modules do not have a full method table.)
 */
35 #include <linux/version.h>
36 #include <linux/module.h>
38 #include <linux/stat.h>
39 #include <linux/locks.h>
40 #include <linux/ext2_fs.h>
41 #include <linux/quotaops.h>
42 #include <asm/unistd.h>
44 #define DEBUG_SUBSYSTEM S_OST
46 #include <linux/obd_support.h>
47 #include <linux/obd.h>
48 #include <linux/obd_class.h>
49 #include <linux/lustre_lib.h>
50 #include <linux/lustre_idl.h>
51 #include <linux/lustre_mds.h>
52 #include <linux/obd_class.h>
/*
 * ost_queue_req - hand a locally issued request to the OST service thread.
 *
 * Allocates a server-side ptlrpc_request, takes over the caller's request
 * buffer and length, remembers the caller's request in rq_reply_handle,
 * links the new request onto ost->ost_reqs under ost_lock and wakes
 * ost_waitq.
 *
 * NOTE(review): the embedded listing numbers skip (58 -> 65, 65 -> 71,
 * 87 -> 91), so the braces, any allocation-failure check and the return
 * statement are not visible in this extract.
 */
55 static int ost_queue_req(struct obd_device *obddev, struct ptlrpc_request *req)
57 struct ptlrpc_request *srv_req;
58 struct ost_obd *ost = &obddev->u.ost;
65 OBD_ALLOC(srv_req, sizeof(*srv_req));
71 CDEBUG(0, "---> OST at %d %p, incoming req %p, srv_req %p\n",
72 __LINE__, ost, req, srv_req);
/* Size the clear by the object being cleared, not by the incoming request. */
74 memset(srv_req, 0, sizeof(*srv_req));
76 /* move the request buffer */
77 srv_req->rq_reqbuf = req->rq_reqbuf;
78 srv_req->rq_reqlen = req->rq_reqlen;
79 srv_req->rq_ost = ost;
81 /* remember where it came from */
82 srv_req->rq_reply_handle = req;
/*
 * NOTE(review): list_add() inserts at the head of ost_reqs while ost_main()
 * dequeues ost_reqs.next, so queued requests appear to be served
 * newest-first. Confirm that ordering is intended.
 */
84 spin_lock(&ost->ost_lock);
85 list_add(&srv_req->rq_list, &ost->ost_reqs);
86 spin_unlock(&ost->ost_lock);
87 wake_up(&ost->ost_waitq);
/*
 * ost_reply - deliver the reply for a handled request.
 *
 * When req->rq_ost->ost_service is non-NULL the reply is sent with
 * ptl_send_buf() on OST_REPLY_PORTAL. The remaining statements hand the reply
 * buffer and length to the originating request (rq_reply_handle), free the
 * request buffer and wake the waiter on rq_wait_for_rep.
 *
 * NOTE(review): listing line 102 (between the two branch comments) is not
 * visible, so exactly which statements belong to the local-request branch
 * cannot be confirmed from this extract.
 */
91 int ost_reply(struct obd_device *obddev, struct ptlrpc_request *req)
93 struct ptlrpc_request *clnt_req = req->rq_reply_handle;
97 if (req->rq_ost->ost_service != NULL) {
98 /* This is a request that came from the network via portals. */
100 /* FIXME: we need to increment the count of handled events */
101 ptl_send_buf(req, &req->rq_peer, OST_REPLY_PORTAL, 0);
103 /* This is a local request that came from another thread. */
105 /* move the reply to the client */
106 clnt_req->rq_replen = req->rq_replen;
107 clnt_req->rq_repbuf = req->rq_repbuf;
/* Ownership of the reply buffer has moved to clnt_req; drop our reference. */
108 req->rq_repbuf = NULL;
111 /* free the request buffer */
112 OBD_FREE(req->rq_reqbuf, req->rq_reqlen);
113 req->rq_reqbuf = NULL;
115 /* wake up the client */
116 wake_up_interruptible(&clnt_req->rq_wait_for_rep);
/*
 * ost_error - build a header-only error reply and send it.
 *
 * Allocates a ptlrep_hdr, zeroes it, copies the request's seqno and
 * rq_status into it, marks it OST_TYPE_ERR, installs it as the reply buffer
 * and returns the result of ost_reply().
 *
 * NOTE(review): listing lines 130-134 are not visible; any handling of a
 * failed allocation cannot be confirmed from this extract.
 */
123 int ost_error(struct obd_device *obddev, struct ptlrpc_request *req)
125 struct ptlrep_hdr *hdr;
129 OBD_ALLOC(hdr, sizeof(*hdr));
135 memset(hdr, 0, sizeof(*hdr));
137 hdr->seqno = req->rq_reqhdr->seqno;
138 hdr->status = req->rq_status;
139 hdr->type = OST_TYPE_ERR;
/* The reply consists of the header alone. */
141 req->rq_repbuf = (char *)hdr;
142 req->rq_replen = sizeof(*hdr);
145 return ost_reply(obddev, req);
/*
 * ost_destroy - handle a destroy request.
 *
 * Builds an obd_conn from the request's connid and the OST's target device,
 * packs an empty reply, then stores the target's o_destroy() result (called
 * on the request's object attributes) in the reply's result field.
 *
 * NOTE(review): the declaration of rc, the check guarding the CERROR and the
 * return statements fall on listing lines that are not visible here.
 */
148 static int ost_destroy(struct ost_obd *ost, struct ptlrpc_request *req)
150 struct obd_conn conn;
155 conn.oc_id = req->rq_req.ost->connid;
156 conn.oc_dev = ost->ost_tgt;
158 rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.ost,
159 &req->rq_replen, &req->rq_repbuf);
161 CERROR("cannot pack reply\n");
165 req->rq_rep.ost->result =ost->ost_tgt->obd_type->typ_ops->o_destroy
166 (&conn, &req->rq_req.ost->oa);
/*
 * ost_getattr - handle a getattr request.
 *
 * Packs an empty reply, copies o_id and o_valid from the request's object
 * attributes into the reply's, then calls the target's o_getattr() on the
 * reply attributes and stores its result in the reply.
 *
 * NOTE(review): the declaration of rc, the check guarding the CERROR and the
 * return statements fall on listing lines that are not visible here.
 */
172 static int ost_getattr(struct ost_obd *ost, struct ptlrpc_request *req)
174 struct obd_conn conn;
179 conn.oc_id = req->rq_req.ost->connid;
180 conn.oc_dev = ost->ost_tgt;
182 rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.ost,
183 &req->rq_replen, &req->rq_repbuf);
185 CERROR("cannot pack reply\n");
/* Only the object id and the valid mask are carried over from the request. */
188 req->rq_rep.ost->oa.o_id = req->rq_req.ost->oa.o_id;
189 req->rq_rep.ost->oa.o_valid = req->rq_req.ost->oa.o_valid;
191 req->rq_rep.ost->result =ost->ost_tgt->obd_type->typ_ops->o_getattr
192 (&conn, &req->rq_rep.ost->oa);
/*
 * ost_create - handle a create request.
 *
 * Packs an empty reply, copies the request's object attributes into the
 * reply, then calls the target's o_create() on the reply copy and stores its
 * result in the reply.
 *
 * NOTE(review): the declaration of rc, the check guarding the CERROR and the
 * return statements fall on listing lines that are not visible here.
 */
198 static int ost_create(struct ost_obd *ost, struct ptlrpc_request *req)
200 struct obd_conn conn;
205 conn.oc_id = req->rq_req.ost->connid;
206 conn.oc_dev = ost->ost_tgt;
208 rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.ost,
209 &req->rq_replen, &req->rq_repbuf);
211 CERROR("cannot pack reply\n");
/* The method operates on the reply's copy of the attributes. */
215 memcpy(&req->rq_rep.ost->oa, &req->rq_req.ost->oa, sizeof(req->rq_req.ost->oa));
217 req->rq_rep.ost->result =ost->ost_tgt->obd_type->typ_ops->o_create
218 (&conn, &req->rq_rep.ost->oa);
/*
 * ost_punch - handle a punch request.
 *
 * Packs an empty reply, copies the request's object attributes into the
 * reply, then calls the target's o_punch() with the reply attributes and
 * their o_size and o_blocks fields as the two trailing arguments. The result
 * is stored in the reply.
 *
 * NOTE(review): the declaration of rc, the check guarding the CERROR and the
 * return statements fall on listing lines that are not visible here.
 */
224 static int ost_punch(struct ost_obd *ost, struct ptlrpc_request *req)
226 struct obd_conn conn;
231 conn.oc_id = req->rq_req.ost->connid;
232 conn.oc_dev = ost->ost_tgt;
234 rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.ost,
235 &req->rq_replen, &req->rq_repbuf);
237 CERROR("cannot pack reply\n");
241 memcpy(&req->rq_rep.ost->oa, &req->rq_req.ost->oa, sizeof(req->rq_req.ost->oa));
243 req->rq_rep.ost->result =ost->ost_tgt->obd_type->typ_ops->o_punch
244 (&conn, &req->rq_rep.ost->oa,
245 req->rq_rep.ost->oa.o_size,
246 req->rq_rep.ost->oa.o_blocks);
/*
 * ost_setattr - handle a setattr request.
 *
 * Packs an empty reply, copies the request's object attributes into the
 * reply, then calls the target's o_setattr() on the reply copy and stores its
 * result in the reply.
 *
 * NOTE(review): the declaration of rc, the check guarding the CERROR and the
 * return statements fall on listing lines that are not visible here.
 */
253 static int ost_setattr(struct ost_obd *ost, struct ptlrpc_request *req)
255 struct obd_conn conn;
260 conn.oc_id = req->rq_req.ost->connid;
261 conn.oc_dev = ost->ost_tgt;
263 rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.ost,
264 &req->rq_replen, &req->rq_repbuf);
266 CERROR("cannot pack reply\n");
270 memcpy(&req->rq_rep.ost->oa, &req->rq_req.ost->oa,
271 sizeof(req->rq_req.ost->oa));
273 req->rq_rep.ost->result =ost->ost_tgt->obd_type->typ_ops->o_setattr
274 (&conn, &req->rq_rep.ost->oa);
/*
 * ost_connect - handle a connect request.
 *
 * Packs an empty reply, calls the target's o_connect() on a local obd_conn
 * bound to the OST's target device, stores the result in the reply and
 * returns the connection id (conn.oc_id) to the client in the reply's connid.
 *
 * NOTE(review): the declaration of rc, the check guarding the CERROR, the
 * final argument of the CDEBUG call and the return statements fall on listing
 * lines that are not visible here.
 */
280 static int ost_connect(struct ost_obd *ost, struct ptlrpc_request *req)
282 struct obd_conn conn;
287 conn.oc_dev = ost->ost_tgt;
289 rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.ost,
290 &req->rq_replen, &req->rq_repbuf);
292 CERROR("cannot pack reply\n");
296 req->rq_rep.ost->result =ost->ost_tgt->obd_type->typ_ops->o_connect(&conn);
298 CDEBUG(0, "rep buffer %p, id %d\n", req->rq_repbuf,
300 req->rq_rep.ost->connid = conn.oc_id;
/*
 * ost_disconnect - handle a disconnect request.
 *
 * Builds an obd_conn from the OST's target device and the request's connid,
 * packs an empty reply, then stores the target's o_disconnect() result in the
 * reply.
 *
 * NOTE(review): the declaration of rc, the check guarding the CERROR and the
 * return statements fall on listing lines that are not visible here.
 */
305 static int ost_disconnect(struct ost_obd *ost, struct ptlrpc_request *req)
307 struct obd_conn conn;
312 conn.oc_dev = ost->ost_tgt;
313 conn.oc_id = req->rq_req.ost->connid;
315 rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.ost,
316 &req->rq_replen, &req->rq_repbuf);
318 CERROR("cannot pack reply\n");
322 req->rq_rep.ost->result =ost->ost_tgt->obd_type->typ_ops->o_disconnect(&conn);
328 static int ost_get_info(struct ost_obd *ost, struct ptlrpc_request *req)
330 struct obd_conn conn;
338 conn.oc_id = req->rq_req.ost->connid;
339 conn.oc_dev = ost->ost_tgt;
341 ptr = ost_req_buf1(req->rq_req.ost);
342 req->rq_rep.ost->result =ost->ost_tgt->obd_type->typ_ops->o_get_info
343 (&conn, req->rq_req.ost->buflen1, ptr, &vallen, &val);
345 rc = ost_pack_rep(val, vallen, NULL, 0, &req->rq_rephdr,
346 &req->rq_rep.ost, &req->rq_replen, &req->rq_repbuf);
348 CERROR("cannot pack reply\n");
/*
 * ost_brw - handle a bulk read/write request.
 *
 * The request's first buffer holds objcount obd_ioobj records and its second
 * buffer holds niocount niobuf records. The function validates them by
 * unpacking, asks the target's o_preprw() to fill a matching array of
 * niobufs in the reply's first buffer, copies data between the addresses
 * named by the two niobuf arrays (direction depends on cmd) and finishes
 * with o_commitrw().
 *
 * The `obddev` parameter is not referenced on any visible line; the target
 * is reached through req->rq_ost instead.
 *
 * NOTE(review): the declarations of rc, cmd, res, i and j, the length
 * argument of both memcpy calls, the bodies of the error branches and the
 * return statements fall on listing lines that are not visible here.
 */
356 int ost_brw(struct ost_obd *obddev, struct ptlrpc_request *req)
358 struct obd_conn conn;
361 int objcount, niocount;
362 char *tmp1, *tmp2, *end2;
365 struct niobuf *nb, *src, *dst;
366 struct obd_ioobj *ioo;
367 struct ost_req *r = req->rq_req.ost;
371 tmp1 = ost_req_buf1(r);
372 tmp2 = ost_req_buf2(r);
373 end2 = tmp2 + req->rq_req.ost->buflen2;
374 objcount = r->buflen1 / sizeof(*ioo);
375 niocount = r->buflen2 / sizeof(*nb);
378 conn.oc_id = req->rq_req.ost->connid;
379 conn.oc_dev = req->rq_ost->ost_tgt;
/*
 * The reply's first buffer is used below as an array of niocount niobufs,
 * so it must be sized in bytes for that many records, not niocount bytes.
 */
381 rc = ost_pack_rep(NULL, niocount * sizeof(*nb), NULL, 0,
382 &req->rq_rephdr, &req->rq_rep.ost,
383 &req->rq_replen, &req->rq_repbuf);
385 CERROR("cannot pack reply\n");
388 res = ost_rep_buf1(req->rq_rep.ost);
390 for (i=0; i < objcount; i++) {
391 ost_unpack_ioo((void *)&tmp1, &ioo);
/*
 * tmp2/end2 are byte pointers, so compare the record count against the
 * number of whole niobufs left in the buffer. Dividing the remaining bytes
 * also avoids overflowing the pointer with a large ioo_bufcnt.
 */
392 if (ioo->ioo_bufcnt > (end2 - tmp2) / sizeof(*nb)) {
396 for (j = 0 ; j < ioo->ioo_bufcnt ; j++) {
397 ost_unpack_niobuf((void *)&tmp2, &nb);
401 /* The unpackers move tmp1 and tmp2, so reset them before using */
402 tmp1 = ost_req_buf1(r);
403 tmp2 = ost_req_buf2(r);
404 req->rq_rep.ost->result =
405 req->rq_ost->ost_tgt->obd_type->typ_ops->o_preprw
406 (cmd, &conn, objcount, (struct obd_ioobj *)tmp1,
407 niocount, (struct niobuf *)tmp2, (struct niobuf *)res);
409 if (req->rq_rep.ost->result) {
/* Write: copy from the request's niobufs into the prepared ones. */
414 if (cmd == OBD_BRW_WRITE) {
415 for (i = 0; i < niocount; i++) {
416 src = &((struct niobuf *)tmp2)[i];
417 dst = &((struct niobuf *)res)[i];
418 memcpy((void *)(unsigned long)dst->addr,
419 (void *)(unsigned long)src->addr,
/* Second loop: copy in the opposite direction. */
424 for (i = 0; i < niocount; i++) {
425 dst = &((struct niobuf *)tmp2)[i];
426 src = &((struct niobuf *)res)[i];
427 memcpy((void *)(unsigned long)dst->addr,
428 (void *)(unsigned long)src->addr,
434 req->rq_rep.ost->result =
435 req->rq_ost->ost_tgt->obd_type->typ_ops->o_commitrw
436 (cmd, &conn, objcount, (struct obd_ioobj *)tmp1,
437 niocount, (struct niobuf *)res);
/*
 * ost_handle - decode one incoming request and dispatch it by opcode.
 *
 * Rejects packets whose header type is not OST_TYPE_REQ, unpacks the request
 * with ost_unpack_req(), then switches on rq_reqhdr->opc to the matching
 * ost_* handler. Unknown opcodes set rq_status to -ENOTSUPP and return via
 * ost_error(). After dispatch, a processing error is reported with
 * ost_error(); the reply is sent with ost_reply().
 *
 * NOTE(review): the case labels, break statements, the conditions guarding
 * the error paths and the return statements fall on listing lines that are
 * not visible here, so the exact pairing of ost_error()/ost_reply() at the
 * end cannot be confirmed from this extract.
 */
444 int ost_handle(struct obd_device *obddev, struct ptlrpc_request *req)
447 struct ost_obd *ost = &obddev->u.ost;
448 struct ptlreq_hdr *hdr;
451 CDEBUG(0, "req at %p\n", req);
/* Check the raw header type before unpacking anything else. */
453 hdr = (struct ptlreq_hdr *)req->rq_reqbuf;
454 if (NTOH__u32(hdr->type) != OST_TYPE_REQ) {
455 CERROR("lustre_ost: wrong packet type sent %d\n",
456 NTOH__u32(hdr->type));
461 rc = ost_unpack_req(req->rq_reqbuf, req->rq_reqlen,
462 &req->rq_reqhdr, &req->rq_req.ost);
464 CERROR("lustre_ost: Invalid request\n");
469 switch (req->rq_reqhdr->opc) {
472 CDEBUG(D_INODE, "connect\n");
473 rc = ost_connect(ost, req);
476 CDEBUG(D_INODE, "disconnect\n");
477 rc = ost_disconnect(ost, req);
480 CDEBUG(D_INODE, "get_info\n");
481 rc = ost_get_info(ost, req);
484 CDEBUG(D_INODE, "create\n");
485 rc = ost_create(ost, req);
488 CDEBUG(D_INODE, "destroy\n");
489 rc = ost_destroy(ost, req);
492 CDEBUG(D_INODE, "getattr\n");
493 rc = ost_getattr(ost, req);
496 CDEBUG(D_INODE, "setattr\n");
497 rc = ost_setattr(ost, req);
500 CDEBUG(D_INODE, "brw\n");
501 rc = ost_brw(ost, req);
504 CDEBUG(D_INODE, "punch\n");
505 rc = ost_punch(ost, req);
/* Unsupported opcode: answer with an error reply straight away. */
508 req->rq_status = -ENOTSUPP;
509 return ost_error(obddev, req);
515 CERROR("ost: processing error %d\n", rc);
516 ost_error(obddev, req);
518 CDEBUG(D_INODE, "sending reply\n");
519 ost_reply(obddev, req);
525 /* FIXME: Serious refactoring needed */
/*
 * ost_main - body of the OST service thread.
 *
 * Blocks all signals, names the task "lustre_ost", publishes itself in
 * ost->ost_thread and wakes ost_done_waitq so the starter can proceed. It
 * then serves requests in one of two modes:
 *   - ost_service != NULL: polls the service's portals event queue with
 *     PtlEQGet(), builds an on-stack ptlrpc_request from the event and
 *     passes it to ost_handle(), then calls ptl_received_rpc().
 *   - otherwise: takes queued requests off ost->ost_reqs under ost_lock and
 *     passes each to ost_handle().
 * On exit it clears ost->ost_thread and wakes ost_done_waitq again.
 *
 * The four signal-mask lines had "&current" mis-encoded as a currency-sign
 * character followed by "t"; they are restored here to match the
 * "current->" uses elsewhere in the function.
 *
 * NOTE(review): the loop headers, break/continue statements, the signal
 * names tested by sigismember(), the schedule call and the return statement
 * fall on listing lines that are not visible here.
 */
526 int ost_main(void *arg)
529 struct obd_device *obddev = (struct obd_device *) arg;
530 struct ost_obd *ost = &obddev->u.ost;
531 DECLARE_WAITQUEUE(wait, current);
537 spin_lock_irq(&current->sigmask_lock);
538 sigfillset(&current->blocked);
539 recalc_sigpending(current);
540 spin_unlock_irq(&current->sigmask_lock);
542 sprintf(current->comm, "lustre_ost");
544 /* Record that the thread is running */
545 ost->ost_thread = current;
546 wake_up(&ost->ost_done_waitq);
548 /* XXX maintain a list of all managed devices: insert here */
550 /* And now, wait forever for commit wakeup events. */
554 if (ost->ost_service != NULL) {
556 struct ptlrpc_request request;
557 struct ptlrpc_service *service;
559 CDEBUG(D_IOCTL, "-- sleeping\n");
561 add_wait_queue(&ost->ost_waitq, &wait);
563 set_current_state(TASK_INTERRUPTIBLE);
564 rc = PtlEQGet(ost->ost_service->srv_eq_h, &ev);
565 if (rc == PTL_OK || rc == PTL_EQ_DROPPED)
567 if (ost->ost_flags & OST_EXIT)
571 /* if this process really wants to die,
573 if (sigismember(&(current->pending.signal),
575 sigismember(&(current->pending.signal),
583 remove_wait_queue(&ost->ost_waitq, &wait);
584 set_current_state(TASK_RUNNING);
585 CDEBUG(D_IOCTL, "-- done\n");
588 /* We broke out because of a signal */
592 if (ost->ost_flags & OST_EXIT) {
597 service = (struct ptlrpc_service *)ev.mem_desc.user_ptr;
599 /* FIXME: If we move to an event-driven model,
600 * we should put the request on the stack of
601 * mds_handle instead. */
602 memset(&request, 0, sizeof(request));
603 request.rq_reqbuf = ev.mem_desc.start + ev.offset;
604 request.rq_reqlen = ev.mem_desc.length;
605 request.rq_ost = ost;
606 request.rq_xid = ev.match_bits;
608 request.rq_peer.peer_nid = ev.initiator.nid;
609 /* FIXME: this NI should be the incoming NI.
610 * We don't know how to find that from here. */
611 request.rq_peer.peer_ni =
612 ost->ost_service->srv_self.peer_ni;
613 rc = ost_handle(obddev, &request);
615 /* Inform the rpc layer the event has been handled */
616 ptl_received_rpc(service);
618 struct ptlrpc_request *request;
620 CDEBUG(D_IOCTL, "-- sleeping\n");
621 add_wait_queue(&ost->ost_waitq, &wait);
623 spin_lock(&ost->ost_lock);
624 if (!list_empty(&ost->ost_reqs))
627 set_current_state(TASK_INTERRUPTIBLE);
629 /* if this process really wants to die,
631 if (sigismember(&(current->pending.signal),
633 sigismember(&(current->pending.signal),
637 spin_unlock(&ost->ost_lock);
641 remove_wait_queue(&ost->ost_waitq, &wait);
642 set_current_state(TASK_RUNNING);
643 CDEBUG(D_IOCTL, "-- done\n");
645 if (list_empty(&ost->ost_reqs)) {
646 CDEBUG(D_INODE, "woke because of signal\n");
647 spin_unlock(&ost->ost_lock);
/* Take the first queued request and release the lock before handling it. */
649 request = list_entry(ost->ost_reqs.next,
650 struct ptlrpc_request,
652 list_del(&request->rq_list);
653 spin_unlock(&ost->ost_lock);
654 rc = ost_handle(obddev, request);
659 /* XXX maintain a list of all managed devices: cleanup here */
661 ost->ost_thread = NULL;
662 wake_up(&ost->ost_done_waitq);
663 CERROR("lustre_ost: exiting\n");
/*
 * ost_stop_srv_thread - ask the service thread to exit and wait for it.
 *
 * Sets OST_EXIT in ost_flags, then repeatedly wakes ost_waitq and sleeps on
 * ost_done_waitq until ost->ost_thread has been cleared.
 *
 * NOTE(review): sleep_on() after an unlocked test of ost->ost_thread leaves
 * a window in which the thread can clear the pointer and issue its wake-up
 * before this caller is on the wait queue. Confirm whether that can happen
 * in practice here.
 */
667 static void ost_stop_srv_thread(struct ost_obd *ost)
669 ost->ost_flags |= OST_EXIT;
671 while (ost->ost_thread) {
672 wake_up(&ost->ost_waitq);
673 sleep_on(&ost->ost_done_waitq);
/*
 * ost_start_srv_thread - start the OST service thread and wait for it.
 *
 * Initialises both wait queues, starts ost_main() as a kernel thread with
 * the obd_device as its argument, then sleeps on ost_done_waitq until the
 * new thread has published itself in ost->ost_thread.
 *
 * If kernel_thread() reports failure, no thread will ever set
 * ost->ost_thread, so the function logs the failure and returns instead of
 * waiting forever.
 *
 * NOTE(review): the listing lines around the visible statements (braces and
 * any ENTRY/EXIT markers) are not visible here.
 */
677 static void ost_start_srv_thread(struct obd_device *obd)
679 struct ost_obd *ost = &obd->u.ost;
682 init_waitqueue_head(&ost->ost_waitq);
683 init_waitqueue_head(&ost->ost_done_waitq);
684 if (kernel_thread(ost_main, (void *)obd,
685 CLONE_VM | CLONE_FS | CLONE_FILES) < 0) {
CERROR("lustre_ost: cannot start service thread\n");
return;
}
686 while (!ost->ost_thread)
687 sleep_on(&ost->ost_done_waitq);
691 /* mount the file system (secretly) */
/*
 * ost_setup - attach the OST to its target device and start serving.
 *
 * `buf` is interpreted as an obd_ioctl_data whose ioc_dev selects the target
 * in obd_dev[]. The target must be both attached and set up. The function
 * connects to it through ost->ost_conn, initialises the request list and
 * lock, resolves the "self" peer, allocates and registers the request
 * service, and finally starts the service thread.
 *
 * The index check uses >= because ioc_dev is used directly as an index into
 * obd_dev[]; an index equal to MAX_OBD_DEVICES would be one past the end.
 * NOTE(review): this assumes obd_dev[] has MAX_OBD_DEVICES entries. Confirm
 * against its declaration.
 *
 * NOTE(review): the third parameter's declaration, the declaration of err,
 * the checks on err, the error return values and the final return fall on
 * listing lines that are not visible here.
 */
692 static int ost_setup(struct obd_device *obddev, obd_count len,
696 struct obd_ioctl_data* data = buf;
697 struct ost_obd *ost = &obddev->u.ost;
698 struct obd_device *tgt;
699 struct lustre_peer peer;
703 if (data->ioc_dev < 0 || data->ioc_dev >= MAX_OBD_DEVICES) {
708 tgt = &obd_dev[data->ioc_dev];
710 if ( ! (tgt->obd_flags & OBD_ATTACHED) ||
711 ! (tgt->obd_flags & OBD_SET_UP) ){
712 CERROR("device not attached or not set up (%d)\n",
718 ost->ost_conn.oc_dev = tgt;
719 err = tgt->obd_type->typ_ops->o_connect(&ost->ost_conn);
721 CERROR("lustre ost: fail to connect to device %d\n",
726 INIT_LIST_HEAD(&ost->ost_reqs);
727 ost->ost_thread = NULL;
730 spin_lock_init(&obddev->u.ost.ost_lock);
732 err = kportal_uuid_to_peer("self", &peer);
734 OBD_ALLOC(ost->ost_service, sizeof(*ost->ost_service));
735 if (ost->ost_service == NULL)
737 ost->ost_service->srv_buf_size = 64 * 1024;
738 ost->ost_service->srv_portal = OST_REQUEST_PORTAL;
739 memcpy(&ost->ost_service->srv_self, &peer, sizeof(peer));
/* The service wakes the same queue the service thread sleeps on. */
740 ost->ost_service->srv_wait_queue = &ost->ost_waitq;
742 rpc_register_service(ost->ost_service, "self");
745 ost_start_srv_thread(obddev);
/*
 * ost_cleanup - stop serving and detach from the target device.
 *
 * Does nothing further if the device is not set up, and refuses while
 * obd_gen_clients is non-empty. Otherwise it stops the service thread,
 * unregisters and frees the request service, reports any requests still
 * queued, and disconnects from the target through ost->ost_conn.
 *
 * The service teardown is guarded because other code in this file
 * (ost_reply, ost_main) tests ost_service against NULL, so a NULL service is
 * a state this module expects. The pointer is cleared after freeing, as is
 * done for freed buffers elsewhere in the file.
 *
 * NOTE(review): the declaration of err, the assignment of tgt, the check on
 * err and the return statements fall on listing lines that are not visible
 * here.
 */
752 static int ost_cleanup(struct obd_device * obddev)
754 struct ost_obd *ost = &obddev->u.ost;
755 struct obd_device *tgt;
760 if ( !(obddev->obd_flags & OBD_SET_UP) ) {
765 if ( !list_empty(&obddev->obd_gen_clients) ) {
766 CERROR("still has clients!\n");
771 ost_stop_srv_thread(ost);
if (ost->ost_service != NULL) {
772 rpc_unregister_service(ost->ost_service);
773 OBD_FREE(ost->ost_service, sizeof(*ost->ost_service));
ost->ost_service = NULL;
}
775 if (!list_empty(&ost->ost_reqs)) {
776 // XXX reply with errors and clean up
777 CDEBUG(D_INODE, "Request list not empty!\n");
781 err = tgt->obd_type->typ_ops->o_disconnect(&ost->ost_conn);
783 CERROR("lustre ost: fail to disconnect device\n");
793 /* use obd ops to offer management infrastructure */
/*
 * Method table registered for the OST device type. Only the cleanup entry is
 * visible in this extract.
 * NOTE(review): listing lines 795 and 797 are not shown; presumably a setup
 * entry and the closing brace. Confirm.
 */
794 static struct obd_ops ost_obd_ops = {
796 o_cleanup: ost_cleanup,
/*
 * ost_init - module entry point; registers the OST method table under
 * LUSTRE_OST_NAME.
 *
 * NOTE(review): the value returned by obd_register_type() is not used on the
 * visible line, and the function's return statement is not visible in this
 * extract. Confirm that a registration failure is reported to the module
 * loader.
 */
799 static int __init ost_init(void)
801 obd_register_type(&ost_obd_ops, LUSTRE_OST_NAME);
/* ost_exit - module exit point; unregisters the LUSTRE_OST_NAME type. */
805 static void __exit ost_exit(void)
807 obd_unregister_type(LUSTRE_OST_NAME);
/* Module metadata, exported symbols and load/unload hooks. */
810 MODULE_AUTHOR("Peter J. Braam <braam@clusterfs.com>");
811 MODULE_DESCRIPTION("Lustre Object Storage Target (OST) v0.01");
812 MODULE_LICENSE("GPL");
814 // for testing (maybe this stays)
/*
 * NOTE(review): ost_queue_req is declared static earlier in this file.
 * Confirm that exporting a static symbol is intended.
 */
815 EXPORT_SYMBOL(ost_queue_req);
817 module_init(ost_init);
818 module_exit(ost_exit);