3 * Storage Target Handling functions
5 * Lustre Object Server Module (OST)
7 * Copyright (C) 2001 Cluster File Systems, Inc.
9 * This code is issued under the GNU General Public License.
10 * See the file COPYING in this distribution
12 * by Peter Braam <braam@clusterfs.com>
14 * This server is single-threaded at present (but can easily be made multi-threaded).
15 * For testing and management it is treated as an obd_device, although it does
16 * not export a full OBD method table (the requests are coming in over the wire,
17 * so object target modules do not have a full method table.)
24 #include <linux/version.h>
25 #include <linux/module.h>
27 #include <linux/stat.h>
28 #include <linux/locks.h>
29 #include <linux/ext2_fs.h>
30 #include <linux/quotaops.h>
31 #include <asm/unistd.h>
32 #include <linux/obd_support.h>
33 #include <linux/obd.h>
34 #include <linux/obd_class.h>
35 #include <linux/lustre_lib.h>
36 #include <linux/lustre_idl.h>
37 #include <linux/lustre_mds.h>
38 #include <linux/obd_class.h>
/*
 * ost_queue_req - hand a client request over to the OST service thread.
 *
 * A server-side ptlrpc_request is allocated and zeroed. It takes over the
 * client's request buffer pointer and length, records the owning OST and
 * remembers the originating client request in rq_reply_handle. The new
 * request is then linked onto ost->ost_reqs and ost->ost_waitq is woken.
 *
 * Not static: the symbol is exported with EXPORT_SYMBOL() at the end of this
 * file, so it needs external linkage.
 *
 * NOTE(review): list_add() inserts at the head of ost_reqs and ost_main()
 * dequeues ost_reqs.next, so several pending requests would be served
 * newest-first -- confirm whether list_add_tail() is wanted. No locking
 * around the list is visible here either.
 */
41 int ost_queue_req(struct obd_device *obddev, struct ptlrpc_request *req)
43 struct ptlrpc_request *srv_req;
44 struct ost_obd *ost = &obddev->u.ost;
51 srv_req = kmalloc(sizeof(*srv_req), GFP_KERNEL);
57 printk("---> OST at %d %p, incoming req %p, srv_req %p\n",
58 __LINE__, ost, req, srv_req);
/* size the clear by the object that was just allocated */
60 memset(srv_req, 0, sizeof(*srv_req));
62 /* move the request buffer */
63 srv_req->rq_reqbuf = req->rq_reqbuf;
64 srv_req->rq_reqlen = req->rq_reqlen;
65 srv_req->rq_ost = ost;
67 /* remember where it came from */
68 srv_req->rq_reply_handle = req;
70 list_add(&srv_req->rq_list, &ost->ost_reqs);
71 wake_up(&ost->ost_waitq);
76 /* XXX replace with networking code */
/*
 * ost_reply - pass a server request's reply back to the client request it
 * originated from.
 *
 * The client request is taken from req->rq_reply_handle. The server-side
 * request buffer is freed, the reply buffer pointer and length are copied to
 * the client request, the server request drops its own reply pointer, and
 * the client is woken on rq_wait_for_rep.
 *
 * NOTE(review): the second printk reads req->rq_rephdr->status, but
 * ost_error() does not visibly assign rq_rephdr before calling here --
 * confirm it cannot be NULL on that path.
 */
77 int ost_reply(struct obd_device *obddev, struct ptlrpc_request *req)
79 struct ptlrpc_request *clnt_req = req->rq_reply_handle;
82 printk("ost_reply: req %p clnt_req at %p\n", req, clnt_req);
84 /* free the request buffer */
85 kfree(req->rq_reqbuf);
86 req->rq_reqbuf = NULL;
88 /* move the reply to the client */
89 clnt_req->rq_replen = req->rq_replen;
90 clnt_req->rq_repbuf = req->rq_repbuf;
92 printk("---> client req %p repbuf %p len %d status %d\n",
93 clnt_req, clnt_req->rq_repbuf, clnt_req->rq_replen,
94 req->rq_rephdr->status);
/* the reply buffer now belongs to the client request */
96 req->rq_repbuf = NULL;
99 /* free the server request */
101 /* wake up the client */
102 wake_up_interruptible(&clnt_req->rq_wait_for_rep);
/*
 * ost_error - build a header-only error reply and send it with ost_reply().
 *
 * A ptlrep_hdr is allocated and zeroed, then filled with the request's
 * sequence number, req->rq_status as the status and OST_TYPE_ERR as the
 * type. The header alone becomes the reply buffer (rq_repbuf / rq_replen).
 * The visible return path hands back whatever ost_reply() returns.
 *
 * NOTE(review): dereferences req->rq_reqhdr, which ost_handle() only fills in
 * through ost_unpack_req() -- confirm no caller gets here before that.
 */
107 int ost_error(struct obd_device *obddev, struct ptlrpc_request *req)
109 struct ptlrep_hdr *hdr;
112 hdr = kmalloc(sizeof(*hdr), GFP_KERNEL);
118 memset(hdr, 0, sizeof(*hdr));
120 hdr->seqno = req->rq_reqhdr->seqno;
121 hdr->status = req->rq_status;
122 hdr->type = OST_TYPE_ERR;
/* the reply consists of the header only */
124 req->rq_repbuf = (char *)hdr;
125 req->rq_replen = sizeof(*hdr);
128 return ost_reply(obddev, req);
/*
 * ost_destroy - service a destroy request.
 *
 * Builds an obd_conn from the request's connection id and the OST's target
 * device, calls ost_pack_rep() with no payload buffers to set up the reply,
 * then invokes the target's o_destroy method on the object attributes carried
 * in the request. The method's return value is stored in the reply's result
 * field.
 */
131 static int ost_destroy(struct ost_obd *ost, struct ptlrpc_request *req)
133 struct obd_conn conn;
138 conn.oc_id = req->rq_req.ost->connid;
139 conn.oc_dev = ost->ost_tgt;
141 rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.ost,
142 &req->rq_replen, &req->rq_repbuf);
144 printk("ost_destroy: cannot pack reply\n");
/* the target's verdict travels back in the reply, not in the return code */
148 req->rq_rep.ost->result =ost->ost_tgt->obd_type->typ_ops->o_destroy
149 (&conn, &req->rq_req.ost->oa);
/*
 * ost_getattr - service a getattr request.
 *
 * Builds an obd_conn from the request's connection id and the OST's target
 * device and sets up the reply with ost_pack_rep(). The object id and the
 * valid mask are copied from the request into the reply's attribute block,
 * then the target's o_getattr method is called on the reply's attributes.
 * Its return value is stored in the reply's result field.
 */
155 static int ost_getattr(struct ost_obd *ost, struct ptlrpc_request *req)
157 struct obd_conn conn;
161 printk("ost getattr entered\n");
163 conn.oc_id = req->rq_req.ost->connid;
164 conn.oc_dev = ost->ost_tgt;
166 rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.ost,
167 &req->rq_replen, &req->rq_repbuf);
169 printk("ost_getattr: cannot pack reply\n");
/* seed the reply with which object and which fields are being asked for */
172 req->rq_rep.ost->oa.o_id = req->rq_req.ost->oa.o_id;
173 req->rq_rep.ost->oa.o_valid = req->rq_req.ost->oa.o_valid;
175 req->rq_rep.ost->result =ost->ost_tgt->obd_type->typ_ops->o_getattr
176 (&conn, &req->rq_rep.ost->oa);
/*
 * ost_create - service a create request.
 *
 * Builds an obd_conn from the request's connection id and the OST's target
 * device and sets up the reply with ost_pack_rep(). The request's object
 * attributes are copied into the reply, then the target's o_create method is
 * called on the reply's copy. Its return value is stored in the reply's
 * result field.
 */
182 static int ost_create(struct ost_obd *ost, struct ptlrpc_request *req)
184 struct obd_conn conn;
189 conn.oc_id = req->rq_req.ost->connid;
190 conn.oc_dev = ost->ost_tgt;
192 rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.ost,
193 &req->rq_replen, &req->rq_repbuf);
195 printk("ost_create: cannot pack reply\n");
/* the method operates on the reply's copy of the attributes */
199 memcpy(&req->rq_rep.ost->oa, &req->rq_req.ost->oa, sizeof(req->rq_req.ost->oa));
201 req->rq_rep.ost->result =ost->ost_tgt->obd_type->typ_ops->o_create
202 (&conn, &req->rq_rep.ost->oa);
/*
 * ost_setattr - service a setattr request.
 *
 * Builds an obd_conn from the request's connection id and the OST's target
 * device and sets up the reply with ost_pack_rep(). The request's object
 * attributes are copied into the reply, then the target's o_setattr method is
 * called on the reply's copy. Its return value is stored in the reply's
 * result field.
 */
209 static int ost_setattr(struct ost_obd *ost, struct ptlrpc_request *req)
211 struct obd_conn conn;
216 conn.oc_id = req->rq_req.ost->connid;
217 conn.oc_dev = ost->ost_tgt;
219 rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.ost,
220 &req->rq_replen, &req->rq_repbuf);
222 printk("ost_setattr: cannot pack reply\n");
/* the method operates on the reply's copy of the attributes */
226 memcpy(&req->rq_rep.ost->oa, &req->rq_req.ost->oa, sizeof(req->rq_req.ost->oa));
228 req->rq_rep.ost->result =ost->ost_tgt->obd_type->typ_ops->o_setattr
229 (&conn, &req->rq_rep.ost->oa);
/*
 * ost_connect - service a connect request.
 *
 * Points an obd_conn at the OST's target device, sets up the reply with
 * ost_pack_rep() and calls the target's o_connect method. The method's return
 * value is stored in the reply's result field, and conn.oc_id (presumably
 * filled in by o_connect -- confirm) is returned to the client as the
 * reply's connid.
 */
235 static int ost_connect(struct ost_obd *ost, struct ptlrpc_request *req)
237 struct obd_conn conn;
242 conn.oc_dev = ost->ost_tgt;
244 rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.ost,
245 &req->rq_replen, &req->rq_repbuf);
/* message names this function (it used to say ost_setattr) */
247 printk("ost_connect: cannot pack reply\n");
251 req->rq_rep.ost->result =ost->ost_tgt->obd_type->typ_ops->o_connect(&conn);
253 printk("ost_connect: rep buffer %p, id %d\n", req->rq_repbuf,
/* hand the new connection id back to the client */
255 req->rq_rep.ost->connid = conn.oc_id;
/*
 * ost_disconnect - service a disconnect request.
 *
 * Builds an obd_conn from the OST's target device and the connection id sent
 * by the client, sets up the reply with ost_pack_rep() and calls the target's
 * o_disconnect method. The method's return value is stored in the reply's
 * result field.
 */
261 static int ost_disconnect(struct ost_obd *ost, struct ptlrpc_request *req)
263 struct obd_conn conn;
268 conn.oc_dev = ost->ost_tgt;
269 conn.oc_id = req->rq_req.ost->connid;
271 rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.ost,
272 &req->rq_replen, &req->rq_repbuf);
/* message names this function (it used to say ost_setattr) */
274 printk("ost_disconnect: cannot pack reply\n");
278 req->rq_rep.ost->result =ost->ost_tgt->obd_type->typ_ops->o_disconnect(&conn);
/*
 * ost_get_info - service a get_info request.
 *
 * Builds an obd_conn from the request's connection id and the OST's target
 * device. The key is the first request buffer (ost_req_buf1(), with length
 * buflen1). The target's o_get_info method is called with it and yields a
 * value pointer and length, which are then passed to ost_pack_rep() as the
 * first reply buffer.
 *
 * NOTE(review): the o_get_info result is written through req->rq_rep.ost
 * before ost_pack_rep() is called with &req->rq_rep.ost. Every other handler
 * in this file calls ost_pack_rep() first. If ost_pack_rep() is what sets up
 * rq_rep.ost, this store goes through an unset pointer and the result is lost
 * -- confirm, and keep the result in a local until after packing.
 */
284 static int ost_get_info(struct ost_obd *ost, struct ptlrpc_request *req)
286 struct obd_conn conn;
294 conn.oc_id = req->rq_req.ost->connid;
295 conn.oc_dev = ost->ost_tgt;
297 ptr = ost_req_buf1(req->rq_req.ost);
298 req->rq_rep.ost->result =ost->ost_tgt->obd_type->typ_ops->o_get_info
299 (&conn, req->rq_req.ost->buflen1, ptr, &vallen, &val);
301 rc = ost_pack_rep(val, vallen, NULL, 0, &req->rq_rephdr, &req->rq_rep.ost,
302 &req->rq_replen, &req->rq_repbuf);
/* message names this function (it used to say ost_setattr) */
304 printk("ost_get_info: cannot pack reply\n");
/*
 * ext2_get_page - read page number n of a directory inode through the page
 * cache.
 *
 * The page is fetched with read_cache_page(), using the mapping's own
 * readpage operation as the filler. The visible checks test Page_Uptodate()
 * and run ext2_check_page() on pages not yet marked checked. The visible
 * failure path returns ERR_PTR(-EIO).
 *
 * NOTE(review): looks like a local copy of the ext2 directory helper of the
 * same name -- confirm whether it is still needed in this file.
 */
313 static struct page * ext2_get_page(struct inode *dir, unsigned long n)
315 struct address_space *mapping = dir->i_mapping;
316 struct page *page = read_cache_page(mapping, n,
317 (filler_t*)mapping->a_ops->readpage, NULL);
321 if (!Page_Uptodate(page))
323 if (!PageChecked(page))
324 ext2_check_page(page);
332 return ERR_PTR(-EIO);
/*
 * ext2_put_page - drop a page obtained for directory access; the visible
 * statement releases the page cache reference.
 */
337 static inline void ext2_put_page(struct page *page)
340 page_cache_release(page);
343 /* Releases the page */
/*
 * ext2_set_link - repoint an existing directory entry at another inode.
 *
 * from/to are the byte offsets of the entry within its page: from is the
 * entry's offset from the page address, to adds the entry's little-endian
 * rec_len. The range is prepared with the mapping's prepare_write operation.
 * The entry's inode number is then stored little-endian, its type is set with
 * ext2_set_de_type(), the directory's mtime and ctime are updated, and the
 * change is committed with ext2_commit_chunk().
 *
 * NOTE(review): ext2_commit_chunk() is a static function defined after this
 * one; no earlier declaration is visible here.
 */
344 void ext2_set_link(struct inode *dir, struct ext2_dir_entry_2 *de,
345 struct page *page, struct inode *inode)
347 unsigned from = (char *) de - (char *) page_address(page);
348 unsigned to = from + le16_to_cpu(de->rec_len);
352 err = page->mapping->a_ops->prepare_write(NULL, page, from, to);
355 de->inode = cpu_to_le32(inode->i_ino);
356 ext2_set_de_type (de, inode);
357 dir->i_mtime = dir->i_ctime = CURRENT_TIME;
358 err = ext2_commit_chunk(page, from, to);
/*
 * ext2_commit_chunk - finish a modification to a directory page.
 *
 * Bumps the directory's i_version from the global event counter and marks
 * the page up to date and clean. The commit_write and waitfor_one_page calls
 * are commented out, so from/to are unused in the visible statements.
 *
 * NOTE(review): marking the page clean without calling commit_write means
 * nothing here schedules the page for write-back -- confirm this is intended.
 */
363 static int ext2_commit_chunk(struct page *page, unsigned from, unsigned to)
365 struct inode *dir = page->mapping->host;
367 dir->i_version = ++event;
368 SetPageUptodate(page);
369 set_page_clean(page);
371 //page->mapping->a_ops->commit_write(NULL, page, from, to);
373 // err = waitfor_one_page(page);
/*
 * ost_prepw - handler for the "prepw" opcode (dispatched from ost_handle()).
 *
 * NOTE(review): this function looks unfinished and does not match its own
 * signature:
 *   - the parameter is named obddev, but the body uses `ost`;
 *   - ost_req_buf1()/ost_req_buf2() are passed `req`, whereas ost_get_info()
 *     passes req->rq_req.ost;
 *   - `req->buflen1` is read directly, whereas ost_get_info() reads
 *     req->rq_req.ost->buflen1;
 *   - ost_pack_rep() is given &req->rq_rep rather than &req->rq_rep.ost;
 *   - the body after the loop header is the ost_create() logic (o_create
 *     call and "ost_create" message), apparently copied as a placeholder.
 * Confirm the intended behaviour before relying on it.
 */
379 int ost_prepw(struct ost_obd *obddev, struct ptlrpc_request *req)
382 struct obd_conn conn;
388 struct obd_ioo **ioo;
392 tmp1 = ost_req_buf1(req);
393 tmp2 = ost_req_buf2(req);
/* number of obd_ioo records in the first buffer */
394 objcount = req->buflen1 / sizeof(**ioo);
397 for (i=0 ; i<objcount ; i++) {
400 conn.oc_id = req->rq_req.ost->connid;
401 conn.oc_dev = ost->ost_tgt;
403 rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
404 &req->rq_replen, &req->rq_repbuf);
406 printk("ost_create: cannot pack reply\n");
410 memcpy(&req->rq_rep.ost->oa, &req->rq_req.ost->oa, sizeof(req->rq_req.ost->oa));
412 req->rq_rep.ost->result =ost->ost_tgt->obd_type->typ_ops->o_create
413 (&conn, &req->rq_rep.ost->oa);
/*
 * ost_handle - validate one queued request, dispatch it and answer it.
 *
 * Steps visible here:
 *   1. The raw request buffer is viewed as a ptlreq_hdr and its type
 *      (converted with NTOH__u32) is compared against OST_TYPE_REQ.
 *   2. ost_unpack_req() unpacks the buffer into req->rq_reqhdr and
 *      req->rq_req.ost.
 *   3. The opcode in the unpacked header selects one of the ost_* handlers.
 *      An unsupported opcode sets rq_status to -ENOTSUPP and is answered
 *      with ost_error().
 *   4. Afterwards either ost_error() (after "processing error") or
 *      ost_reply() (after "sending reply") is called.
 */
423 int ost_handle(struct obd_device *obddev, struct ptlrpc_request *req)
426 struct ost_obd *ost = &obddev->u.ost;
427 struct ptlreq_hdr *hdr;
430 printk("ost_handle: req at %p\n", req);
/* check the packet type straight from the wire buffer */
432 hdr = (struct ptlreq_hdr *)req->rq_reqbuf;
433 if (NTOH__u32(hdr->type) != OST_TYPE_REQ) {
434 printk("lustre_ost: wrong packet type sent %d\n",
435 NTOH__u32(hdr->type));
440 rc = ost_unpack_req(req->rq_reqbuf, req->rq_reqlen,
441 &req->rq_reqhdr, &req->rq_req.ost);
443 printk("lustre_ost: Invalid request\n");
/* dispatch on the opcode; each handler fills in the reply */
448 switch (req->rq_reqhdr->opc) {
451 CDEBUG(D_INODE, "connect\n");
452 printk("----> connect \n");
453 rc = ost_connect(ost, req);
456 CDEBUG(D_INODE, "disconnect\n");
457 rc = ost_disconnect(ost, req);
460 CDEBUG(D_INODE, "get_info\n");
461 rc = ost_get_info(ost, req);
464 CDEBUG(D_INODE, "create\n");
465 rc = ost_create(ost, req);
468 CDEBUG(D_INODE, "destroy\n");
469 rc = ost_destroy(ost, req);
472 CDEBUG(D_INODE, "getattr\n");
473 rc = ost_getattr(ost, req);
476 CDEBUG(D_INODE, "setattr\n");
477 rc = ost_setattr(ost, req);
480 CDEBUG(D_INODE, "prepw\n");
481 rc = ost_prepw(ost, req);
/* unsupported opcode: answer with an error reply right away */
484 req->rq_status = -ENOTSUPP;
485 return ost_error(obddev, req);
491 printk("ost: processing error %d\n", rc);
492 ost_error(obddev, req);
494 CDEBUG(D_INODE, "sending reply\n");
495 ost_reply(obddev, req);
/*
 * ost_main - body of the OST service thread started by
 * ost_start_srv_thread().
 *
 * arg is the obd_device that owns the OST. The thread blocks all signals
 * (sigfillset() on current->blocked under sigmask_lock, then
 * recalc_sigpending()), names itself "lustre_ost", records itself in
 * ost->ost_thread and wakes ost_done_waitq so the starter can proceed.
 *
 * In its service loop it checks the OST_EXIT flag, wakes ost_done_waitq,
 * sleeps on ost_waitq, and on wakeup either notes an empty request list or
 * unlinks the first entry of ost->ost_reqs and passes it to ost_handle().
 * On the way out it clears ost->ost_thread and wakes ost_done_waitq again,
 * which is what ost_stop_srv_thread() waits for.
 *
 * The three &current references had been mangled to "¤t" and are restored
 * here.
 *
 * NOTE(review): no locking around ost_reqs is visible here; confirm against
 * ost_queue_req().
 */
501 int ost_main(void *arg)
503 struct obd_device *obddev = (struct obd_device *) arg;
504 struct ost_obd *ost = &obddev->u.ost;
506 printk("---> %d\n", __LINE__);
510 printk("---> %d\n", __LINE__);
512 printk("---> %d\n", __LINE__);
/* block every signal for this kernel thread */
513 spin_lock_irq(&current->sigmask_lock);
514 printk("---> %d\n", __LINE__);
515 sigfillset(&current->blocked);
516 printk("---> %d\n", __LINE__);
517 recalc_sigpending(current);
518 printk("---> %d\n", __LINE__);
519 spin_unlock_irq(&current->sigmask_lock);
520 printk("---> %d\n", __LINE__);
522 printk("---> %d\n", __LINE__);
523 sprintf(current->comm, "lustre_ost");
524 printk("---> %d\n", __LINE__);
526 /* Record that the thread is running */
527 ost->ost_thread = current;
528 printk("---> %d\n", __LINE__);
529 wake_up(&ost->ost_done_waitq);
530 printk("---> %d\n", __LINE__);
532 /* XXX maintain a list of all managed devices: insert here */
534 /* And now, wait forever for commit wakeup events. */
536 struct ptlrpc_request *request;
539 if (ost->ost_flags & OST_EXIT)
543 wake_up(&ost->ost_done_waitq);
544 interruptible_sleep_on(&ost->ost_waitq);
546 CDEBUG(D_INODE, "lustre_ost wakes\n");
547 CDEBUG(D_INODE, "pick up req here and continue\n");
549 if (list_empty(&ost->ost_reqs)) {
550 CDEBUG(D_INODE, "woke because of timer\n");
552 printk("---> %d\n", __LINE__);
/* take the request at the head of the list */
553 request = list_entry(ost->ost_reqs.next,
554 struct ptlrpc_request, rq_list);
555 printk("---> %d\n", __LINE__);
556 list_del(&request->rq_list);
557 rc = ost_handle(obddev, request);
561 /* XXX maintain a list of all managed devices: cleanup here */
562 printk("---> %d\n", __LINE__);
563 ost->ost_thread = NULL;
564 printk("---> %d\n", __LINE__);
565 wake_up(&ost->ost_done_waitq);
566 printk("lustre_ost: exiting\n");
/*
 * ost_stop_srv_thread - ask the service thread to exit and wait for it.
 *
 * Sets OST_EXIT in ost_flags, then, for as long as ost->ost_thread is still
 * set, wakes the thread on ost_waitq and sleeps on ost_done_waitq. ost_main()
 * clears ost_thread and wakes ost_done_waitq when it leaves.
 *
 * NOTE(review): sleep_on() after an unlocked check can miss a wakeup that
 * arrives between the check and the sleep -- confirm this cannot hang.
 */
570 static void ost_stop_srv_thread(struct ost_obd *ost)
572 ost->ost_flags |= OST_EXIT;
574 while (ost->ost_thread) {
575 wake_up(&ost->ost_waitq);
576 sleep_on(&ost->ost_done_waitq);
/*
 * ost_start_srv_thread - start the OST service thread and wait until it runs.
 *
 * Initialises both wait queues, spawns ost_main() with kernel_thread()
 * (sharing VM, filesystem info and files with the caller), then sleeps on
 * ost_done_waitq until ost_main() has stored itself in ost->ost_thread.
 *
 * NOTE(review): the return value of kernel_thread() is not checked in the
 * visible statements; if the thread could not be created, the wait below
 * would never end -- confirm.
 */
580 static void ost_start_srv_thread(struct obd_device *obd)
582 struct ost_obd *ost = &obd->u.ost;
585 init_waitqueue_head(&ost->ost_waitq);
586 printk("---> %d\n", __LINE__);
587 init_waitqueue_head(&ost->ost_done_waitq);
588 printk("---> %d\n", __LINE__);
589 kernel_thread(ost_main, (void *)obd,
590 CLONE_VM | CLONE_FS | CLONE_FILES);
591 printk("---> %d\n", __LINE__);
/* wait for ost_main() to announce itself */
592 while (!ost->ost_thread)
593 sleep_on(&ost->ost_done_waitq);
594 printk("---> %d\n", __LINE__);
598 /* mount the file system (secretly) */
/*
 * ost_setup - attach the OST to its target OBD device and start serving.
 *
 * buf carries an obd_ioctl_data whose ioc_dev selects the target in obd_dev[].
 * The target must be both attached and set up. The OST then connects to it
 * through the target's o_connect method, initialises its request list and
 * lock, and starts the service thread.
 *
 * The index check uses >= so that ioc_dev == MAX_OBD_DEVICES is rejected
 * instead of indexing one past the end (assuming obd_dev[] has
 * MAX_OBD_DEVICES entries -- confirm against its definition).
 *
 * NOTE(review): the comment above mentions mounting a file system, but the
 * visible statements only connect to an existing OBD device.
 */
599 static int ost_setup(struct obd_device *obddev, obd_count len,
603 struct obd_ioctl_data* data = buf;
604 struct ost_obd *ost = &obddev->u.ost;
605 struct obd_device *tgt;
609 if (data->ioc_dev < 0 || data->ioc_dev >= MAX_OBD_DEVICES) {
614 tgt = &obd_dev[data->ioc_dev];
/* the target must be fully configured before it can be served */
616 if ( ! (tgt->obd_flags & OBD_ATTACHED) ||
617 ! (tgt->obd_flags & OBD_SET_UP) ){
618 printk("device not attached or not set up (%d)\n",
624 ost->ost_conn.oc_dev = tgt;
625 err = tgt->obd_type->typ_ops->o_connect(&ost->ost_conn);
627 printk("lustre ost: fail to connect to device %d\n",
632 INIT_LIST_HEAD(&ost->ost_reqs);
633 ost->ost_thread = NULL;
636 spin_lock_init(&obddev->u.ost.ost_lock);
638 ost_start_srv_thread(obddev);
/*
 * ost_cleanup - tear down an OST device.
 *
 * Checks that the device is set up and that its generic client list is
 * empty (warning otherwise), stops the service thread, notes any requests
 * still queued, and disconnects from the target through its o_disconnect
 * method.
 *
 * NOTE(review): requests left on ost_reqs are only logged, not answered or
 * freed (see the XXX below). The assignment of tgt is not visible here --
 * confirm it is set before the o_disconnect call.
 */
645 static int ost_cleanup(struct obd_device * obddev)
647 struct ost_obd *ost = &obddev->u.ost;
648 struct obd_device *tgt;
653 if ( !(obddev->obd_flags & OBD_SET_UP) ) {
658 if ( !list_empty(&obddev->obd_gen_clients) ) {
659 printk(KERN_WARNING __FUNCTION__ ": still has clients!\n");
/* stop the service thread before looking at the request list */
664 ost_stop_srv_thread(ost);
666 if (!list_empty(&ost->ost_reqs)) {
667 // XXX reply with errors and clean up
668 CDEBUG(D_INODE, "Request list not empty!\n");
672 err = tgt->obd_type->typ_ops->o_disconnect(&ost->ost_conn);
674 printk("lustre ost: fail to disconnect device\n");
684 /* use obd ops to offer management infrastructure */
/*
 * Method table registered for the OST type in ost_init(). The visible entry
 * routes o_cleanup to ost_cleanup().
 */
685 static struct obd_ops ost_obd_ops = {
687 o_cleanup: ost_cleanup,
/* Module entry point: registers the OST method table under LUSTRE_OST_NAME. */
690 static int __init ost_init(void)
692 obd_register_type(&ost_obd_ops, LUSTRE_OST_NAME);
/* Module exit point: unregisters the type registered by ost_init(). */
696 static void __exit ost_exit(void)
698 obd_unregister_type(LUSTRE_OST_NAME);
/* Module metadata. */
701 MODULE_AUTHOR("Peter J. Braam <braam@clusterfs.com>");
702 MODULE_DESCRIPTION("Lustre Object Storage Target (OST) v0.01");
703 MODULE_LICENSE("GPL");
705 // for testing (maybe this stays)
/* Lets code outside this module queue requests on the OST directly. */
706 EXPORT_SYMBOL(ost_queue_req);
/* Hook ost_init()/ost_exit() up as the module's load and unload handlers. */
708 module_init(ost_init);
709 module_exit(ost_exit);