4 * Lustre Object Server Module (OST)
6 * Copyright (C) 2001 Cluster File Systems, Inc.
8 * This code is issued under the GNU General Public License.
9 * See the file COPYING in this distribution
11 * by Peter Braam <braam@clusterfs.com>
13 * This server is single-threaded at present (but could easily be made
14 * multi-threaded).  For testing and management it is treated as an
15 * obd_device, although it does not export a full OBD method table: the
16 * requests come in over the wire, so object target modules do not have one.
23 #include <linux/version.h>
24 #include <linux/module.h>
26 #include <linux/stat.h>
27 #include <linux/locks.h>
28 #include <linux/ext2_fs.h>
29 #include <linux/quotaops.h>
30 #include <asm/unistd.h>
31 #include <linux/obd_support.h>
32 #include <linux/obd.h>
33 #include <linux/obd_class.h>
34 #include <linux/lustre_lib.h>
35 #include <linux/lustre_idl.h>
36 #include <linux/lustre_mds.h>
37 #include <linux/obd_class.h>
/*
 * ost_queue_req - hand an incoming request to the OST service thread.
 *
 * @obddev: OST device; its u.ost member holds the request list and wait queue
 * @req:    the caller's (client-side) request
 *
 * A server-side ost_request is allocated and zeroed, then given the caller's
 * request buffer pointer and length (the buffer itself is not copied).  The
 * caller's request is remembered in rq_reply_handle so a reply can be routed
 * back to it.  The server request is linked onto ost->ost_reqs and whoever
 * sleeps on ost->ost_waitq is woken.
 *
 * Not static: this symbol is passed to EXPORT_SYMBOL() at the end of the file.
 *
 * NOTE(review): this listing skips source lines (the embedded numbers jump,
 * e.g. 50 -> 56), so the allocation-failure handling and the return statement
 * are not visible here.
 * NOTE(review): list_add() links the new request at the head of ost_reqs and
 * ost_main() takes ost_reqs.next, so the newest request is picked first --
 * confirm that ordering is intended.
 */
40 int ost_queue_req(struct obd_device *obddev, struct ost_request *req)
42 struct ost_request *srv_req;
43 struct ost_obd *ost = &obddev->u.ost;
50 srv_req = kmalloc(sizeof(*srv_req), GFP_KERNEL);
56 printk("---> OST at %d %p, incoming req %p, srv_req %p\n",
57 __LINE__, ost, req, srv_req);
/* size the clear by the object being cleared */
59 memset(srv_req, 0, sizeof(*srv_req));
61 /* move the request buffer */
62 srv_req->rq_reqbuf = req->rq_reqbuf;
63 srv_req->rq_reqlen = req->rq_reqlen;
64 srv_req->rq_obd = ost;
66 /* remember where it came from */
67 srv_req->rq_reply_handle = req;
/* queue the request and kick the service thread */
69 list_add(&srv_req->rq_list, &ost->ost_reqs);
70 wake_up(&ost->ost_waitq);
75 /* XXX replace with networking code */
/*
 * ost_reply - pass a server request's reply back to the originating request.
 *
 * @obddev: OST device (not referenced on the lines visible here)
 * @req:    server-side request whose reply is ready
 *
 * The client request is the one stored in req->rq_reply_handle.  The request
 * buffer is freed, the reply buffer pointer and length are transferred to the
 * client request, and the waiter on the client's rq_wait_for_rep queue is
 * woken.
 *
 * NOTE(review): source lines are missing from this listing (embedded numbers
 * skip, e.g. 98 -> 100), so whatever statement belongs under "free the server
 * request" and the return value are not visible here.
 */
76 int ost_reply(struct obd_device *obddev, struct ost_request *req)
78 struct ost_request *clnt_req = req->rq_reply_handle;
81 printk("ost_reply: req %p clnt_req at %p\n", req, clnt_req);
83 /* free the request buffer */
84 kfree(req->rq_reqbuf);
85 req->rq_reqbuf = NULL;
87 /* move the reply to the client */
88 clnt_req->rq_replen = req->rq_replen;
89 clnt_req->rq_repbuf = req->rq_repbuf;
/* the status printed below is read through req->rq_rephdr */
91 printk("---> client req %p repbuf %p len %d status %d\n",
92 clnt_req, clnt_req->rq_repbuf, clnt_req->rq_replen,
93 req->rq_rephdr->status);
/* the client request now holds the only reference to the reply buffer */
95 req->rq_repbuf = NULL;
98 /* free the server request */
100 /* wake up the client */
101 wake_up_interruptible(&clnt_req->rq_wait_for_rep);
/*
 * ost_error - build a bare error reply header and send it via ost_reply().
 *
 * @obddev: OST device, forwarded to ost_reply()
 * @req:    server-side request that failed
 *
 * A zeroed ost_rep_hdr is allocated and filled with the request's sequence
 * number, req->rq_status and type OST_TYPE_ERR; it becomes the request's
 * reply buffer.  Returns whatever ost_reply() returns.
 *
 * NOTE(review): lines are elided in this listing (111 -> 117), so the
 * allocation-failure handling is not visible.
 * NOTE(review): ost_reply() prints req->rq_rephdr->status, but rq_rephdr is
 * not assigned on any line visible in this function -- confirm it is valid on
 * this path.
 */
106 int ost_error(struct obd_device *obddev, struct ost_request *req)
108 struct ost_rep_hdr *hdr;
111 hdr = kmalloc(sizeof(*hdr), GFP_KERNEL);
117 memset(hdr, 0, sizeof(*hdr));
/* echo the sequence number of the request being answered */
119 hdr->seqno = req->rq_reqhdr->seqno;
120 hdr->status = req->rq_status;
121 hdr->type = OST_TYPE_ERR;
/* the header alone is the whole reply */
123 req->rq_repbuf = (char *)hdr;
124 req->rq_replen = sizeof(*hdr);
127 return ost_reply(obddev, req);
/*
 * ost_destroy - handle a destroy request.
 *
 * @ost: OST state; ost_tgt is the target device the call is forwarded to
 * @req: request carrying the connection id and object attributes
 *
 * Builds a connection from the request's connid and the target device, packs
 * an empty reply, then stores the result of the target's o_destroy method in
 * the reply.
 *
 * NOTE(review): lines are elided in this listing (e.g. 141 -> 143), so the
 * check guarding the "cannot pack reply" message and the return statements
 * are not visible.
 */
130 static int ost_destroy(struct ost_obd *ost, struct ost_request *req)
132 struct obd_conn conn;
137 conn.oc_id = req->rq_req->connid;
138 conn.oc_dev = ost->ost_tgt;
/* no payload buffers: the reply carries only the header and body */
140 rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
141 &req->rq_replen, &req->rq_repbuf);
143 printk("ost_destroy: cannot pack reply\n");
/* the target operates on the attributes sent in the request */
147 req->rq_rep->result =ost->ost_tgt->obd_type->typ_ops->o_destroy
148 (&conn, &req->rq_req->oa);
/*
 * ost_getattr - handle a getattr request.
 *
 * @ost: OST state; ost_tgt is the target device the call is forwarded to
 * @req: request carrying the connection id and object attributes
 *
 * Packs an empty reply, copies the object id and valid mask from the request
 * into the reply's attribute block, and lets the target's o_getattr method
 * fill that block in; the method's result is stored in the reply.
 *
 * NOTE(review): lines are elided in this listing (e.g. 166 -> 168), so the
 * check guarding the "cannot pack reply" message and the return statements
 * are not visible.
 */
154 static int ost_getattr(struct ost_obd *ost, struct ost_request *req)
156 struct obd_conn conn;
160 printk("ost getattr entered\n");
162 conn.oc_id = req->rq_req->connid;
163 conn.oc_dev = ost->ost_tgt;
165 rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
166 &req->rq_replen, &req->rq_repbuf);
168 printk("ost_getattr: cannot pack reply\n");
/* seed the reply attributes with which object and which fields are wanted */
171 req->rq_rep->oa.o_id = req->rq_req->oa.o_id;
172 req->rq_rep->oa.o_valid = req->rq_req->oa.o_valid;
/* the target works on the reply's copy of the attributes */
174 req->rq_rep->result =ost->ost_tgt->obd_type->typ_ops->o_getattr
175 (&conn, &req->rq_rep->oa);
/*
 * ost_create - handle a create request.
 *
 * @ost: OST state; ost_tgt is the target device the call is forwarded to
 * @req: request carrying the connection id and object attributes
 *
 * Packs an empty reply, copies the request's attribute block into the reply,
 * and calls the target's o_create method on the reply's copy; the method's
 * result is stored in the reply.
 *
 * NOTE(review): lines are elided in this listing (e.g. 192 -> 194), so the
 * check guarding the "cannot pack reply" message and the return statements
 * are not visible.
 */
181 static int ost_create(struct ost_obd *ost, struct ost_request *req)
183 struct obd_conn conn;
188 conn.oc_id = req->rq_req->connid;
189 conn.oc_dev = ost->ost_tgt;
191 rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
192 &req->rq_replen, &req->rq_repbuf);
194 printk("ost_create: cannot pack reply\n");
/* the target operates on the reply's copy of the attributes */
198 memcpy(&req->rq_rep->oa, &req->rq_req->oa, sizeof(req->rq_req->oa));
200 req->rq_rep->result =ost->ost_tgt->obd_type->typ_ops->o_create
201 (&conn, &req->rq_rep->oa);
/*
 * ost_setattr - handle a setattr request.
 *
 * @ost: OST state; ost_tgt is the target device the call is forwarded to
 * @req: request carrying the connection id and object attributes
 *
 * Packs an empty reply, copies the request's attribute block into the reply,
 * and calls the target's o_setattr method on the reply's copy; the method's
 * result is stored in the reply.
 *
 * NOTE(review): lines are elided in this listing (e.g. 219 -> 221), so the
 * check guarding the "cannot pack reply" message and the return statements
 * are not visible.
 */
208 static int ost_setattr(struct ost_obd *ost, struct ost_request *req)
210 struct obd_conn conn;
215 conn.oc_id = req->rq_req->connid;
216 conn.oc_dev = ost->ost_tgt;
218 rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
219 &req->rq_replen, &req->rq_repbuf);
221 printk("ost_setattr: cannot pack reply\n");
/* the target operates on the reply's copy of the attributes */
225 memcpy(&req->rq_rep->oa, &req->rq_req->oa, sizeof(req->rq_req->oa));
227 req->rq_rep->result =ost->ost_tgt->obd_type->typ_ops->o_setattr
228 (&conn, &req->rq_rep->oa);
/*
 * ost_connect - handle a connect request.
 *
 * @ost: OST state; ost_tgt is the target device the call is forwarded to
 * @req: request being served
 *
 * Packs an empty reply, calls the target's o_connect method on a local
 * connection that points at the target device, stores the method's result in
 * the reply, and returns the connection id to the client in the reply's
 * connid field.
 *
 * The pack-failure message now names this function (it previously said
 * "ost_setattr").
 *
 * NOTE(review): lines are elided in this listing (e.g. 244 -> 246 and
 * 252 -> 254), so the check guarding the pack-failure message, the tail of
 * the second printk call and the return statements are not visible.
 */
234 static int ost_connect(struct ost_obd *ost, struct ost_request *req)
236 struct obd_conn conn;
241 conn.oc_dev = ost->ost_tgt;
243 rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
244 &req->rq_replen, &req->rq_repbuf);
246 printk("ost_connect: cannot pack reply\n");
250 req->rq_rep->result =ost->ost_tgt->obd_type->typ_ops->o_connect(&conn);
252 printk("ost_connect: rep buffer %p, id %d\n", req->rq_repbuf,
/* hand the connection id back to the client */
254 req->rq_rep->connid = conn.oc_id;
/*
 * ost_disconnect - handle a disconnect request.
 *
 * @ost: OST state; ost_tgt is the target device the call is forwarded to
 * @req: request carrying the connection id to drop
 *
 * Builds a connection from the target device and the request's connid, packs
 * an empty reply, and stores the result of the target's o_disconnect method
 * in the reply.
 *
 * The pack-failure message now names this function (it previously said
 * "ost_setattr").
 *
 * NOTE(review): lines are elided in this listing (e.g. 271 -> 273), so the
 * check guarding the pack-failure message and the return statements are not
 * visible.
 */
260 static int ost_disconnect(struct ost_obd *ost, struct ost_request *req)
262 struct obd_conn conn;
267 conn.oc_dev = ost->ost_tgt;
268 conn.oc_id = req->rq_req->connid;
270 rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
271 &req->rq_replen, &req->rq_repbuf);
273 printk("ost_disconnect: cannot pack reply\n");
277 req->rq_rep->result =ost->ost_tgt->obd_type->typ_ops->o_disconnect(&conn);
/*
 * ost_get_info - handle a get_info request.
 *
 * @ost: OST state; ost_tgt is the target device the call is forwarded to
 * @req: request carrying the connection id and the key in buf1/buflen1
 *
 * Calls the target's o_get_info method with the request's first buffer as
 * the key, then packs the returned value (val, vallen) into the reply.
 *
 * The pack-failure message now names this function (it previously said
 * "ost_setattr").
 *
 * NOTE(review): unlike the sibling handlers, the method result is stored
 * through req->rq_rep *before* ost_pack_rep() is called with &req->rq_rep.
 * If ost_pack_rep() is what sets rq_rep up, this writes through an unset
 * pointer and the stored result does not end up in the packed reply --
 * confirm and reorder.
 * NOTE(review): lines are elided in this listing (e.g. 285 -> 292 and
 * 300 -> 302), so the declarations of val, vallen and rc, the check guarding
 * the pack-failure message and the return statements are not visible.
 */
283 static int ost_get_info(struct ost_obd *ost, struct ost_request *req)
285 struct obd_conn conn;
292 conn.oc_id = req->rq_req->connid;
293 conn.oc_dev = ost->ost_tgt;
295 req->rq_rep->result =ost->ost_tgt->obd_type->typ_ops->o_get_info
296 (&conn, req->rq_req->buflen1, req->rq_req->buf1, &vallen, &val);
/* the looked-up value travels as the first reply buffer */
299 rc = ost_pack_rep(val, vallen, NULL, 0, &req->rq_rephdr, &req->rq_rep,
300 &req->rq_replen, &req->rq_repbuf);
302 printk("ost_get_info: cannot pack reply\n");
312 //int ost_handle(struct ost_conn *conn, int len, char *buf)
/*
 * ost_handle - decode one queued request and dispatch it to its handler.
 *
 * @obddev: OST device; its u.ost member is passed to the handlers
 * @req:    server-side request taken off the queue
 *
 * The raw request buffer is first checked for packet type OST_TYPE_REQ, then
 * unpacked with ost_unpack_req().  The opcode in the unpacked header selects
 * one of the handlers (connect, disconnect, get_info, create, destroy,
 * getattr, setattr).  The handler's return code is written to the reply
 * header's status; the visible tail calls both ost_error() (after printing a
 * processing error) and ost_reply().
 *
 * NOTE(review): this listing elides lines (e.g. 338 -> 341, 367 -> 371,
 * 375 -> 377), so the case labels, the conditions selecting the error and
 * reply paths, and the final return are not visible here.
 */
313 int ost_handle(struct obd_device *obddev, struct ost_request *req)
316 struct ost_obd *ost = &obddev->u.ost;
317 struct ost_req_hdr *hdr;
320 printk("ost_handle: req at %p\n", req);
/* peek at the raw header before unpacking; the type field is byte-swapped */
322 hdr = (struct ost_req_hdr *)req->rq_reqbuf;
323 if (NTOH__u32(hdr->type) != OST_TYPE_REQ) {
324 printk("lustre_ost: wrong packet type sent %d\n",
325 NTOH__u32(hdr->type));
/* unpacking sets req->rq_reqhdr and req->rq_req, which the handlers read */
330 rc = ost_unpack_req(req->rq_reqbuf, req->rq_reqlen,
331 &req->rq_reqhdr, &req->rq_req);
333 printk("lustre_ost: Invalid request\n");
/* dispatch on the opcode */
338 switch (req->rq_reqhdr->opc) {
341 CDEBUG(D_INODE, "connect\n");
342 printk("----> connect \n");
343 rc = ost_connect(ost, req);
346 CDEBUG(D_INODE, "disconnect\n");
347 rc = ost_disconnect(ost, req);
350 CDEBUG(D_INODE, "get_info\n");
351 rc = ost_get_info(ost, req);
354 CDEBUG(D_INODE, "create\n");
355 rc = ost_create(ost, req);
358 CDEBUG(D_INODE, "destroy\n");
359 rc = ost_destroy(ost, req);
362 CDEBUG(D_INODE, "getattr\n");
363 rc = ost_getattr(ost, req);
366 CDEBUG(D_INODE, "setattr\n");
367 rc = ost_setattr(ost, req);
371 return ost_error(obddev, req);
/* record the handler's return code in the reply header */
375 req->rq_rephdr->status = rc;
377 printk("ost: processing error %d\n", rc);
378 ost_error(obddev, req);
380 CDEBUG(D_INODE, "sending reply\n");
381 ost_reply(obddev, req);
/*
 * ost_main - body of the OST service thread.
 *
 * @arg: the struct obd_device this thread serves
 *
 * Start-up: blocks all signals for the current task, names it "lustre_ost",
 * records the task in ost->ost_thread and wakes ost_done_waitq so the starter
 * can proceed.
 *
 * Service: checks the OST_EXIT flag, wakes ost_done_waitq, sleeps on
 * ost_waitq, and on wake-up takes the first entry of ost->ost_reqs (if any),
 * unlinks it and passes it to ost_handle().
 *
 * Shutdown: clears ost->ost_thread and wakes ost_done_waitq once more.
 *
 * The three uses of "&current" had been mis-decoded to a currency sign in
 * this listing and are restored here.
 *
 * NOTE(review): this listing elides lines (e.g. 422 -> 425 -> 429,
 * 436 -> 438, 443 -> 447), so the loop construct, the branch taken on
 * OST_EXIT / empty list, the declaration of rc and the return value are not
 * visible here.
 */
387 int ost_main(void *arg)
389 struct obd_device *obddev = (struct obd_device *) arg;
390 struct ost_obd *ost = &obddev->u.ost;
392 printk("---> %d\n", __LINE__);
396 printk("---> %d\n", __LINE__);
398 printk("---> %d\n", __LINE__);
/* block every signal for this thread */
399 spin_lock_irq(&current->sigmask_lock);
400 printk("---> %d\n", __LINE__);
401 sigfillset(&current->blocked);
402 printk("---> %d\n", __LINE__);
403 recalc_sigpending(current);
404 printk("---> %d\n", __LINE__);
405 spin_unlock_irq(&current->sigmask_lock);
406 printk("---> %d\n", __LINE__);
408 printk("---> %d\n", __LINE__);
409 sprintf(current->comm, "lustre_ost");
410 printk("---> %d\n", __LINE__);
412 /* Record that the thread is running */
413 ost->ost_thread = current;
414 printk("---> %d\n", __LINE__);
/* ost_start_srv_thread() sleeps on this queue until ost_thread is set */
415 wake_up(&ost->ost_done_waitq);
416 printk("---> %d\n", __LINE__);
418 /* XXX maintain a list of all managed devices: insert here */
420 /* And now, wait for queued requests until told to exit. */
422 struct ost_request *request;
425 if (ost->ost_flags & OST_EXIT)
429 wake_up(&ost->ost_done_waitq);
430 interruptible_sleep_on(&ost->ost_waitq);
432 CDEBUG(D_INODE, "lustre_ost wakes\n");
433 CDEBUG(D_INODE, "pick up req here and continue\n");
435 if (list_empty(&ost->ost_reqs)) {
436 CDEBUG(D_INODE, "woke because of timer\n");
438 printk("---> %d\n", __LINE__);
/* take the entry at the head of the request list */
439 request = list_entry(ost->ost_reqs.next,
440 struct ost_request, rq_list);
441 printk("---> %d\n", __LINE__);
442 list_del(&request->rq_list);
443 rc = ost_handle(obddev, request);
447 /* XXX maintain a list of all managed devices: cleanup here */
448 printk("---> %d\n", __LINE__);
/* ost_stop_srv_thread() loops until this becomes NULL */
449 ost->ost_thread = NULL;
450 printk("---> %d\n", __LINE__);
451 wake_up(&ost->ost_done_waitq);
452 printk("lustre_ost: exiting\n");
/*
 * ost_stop_srv_thread - ask the service thread to exit and wait for it.
 *
 * @ost: OST state holding the flags, thread pointer and wait queues
 *
 * Sets OST_EXIT, then repeatedly wakes the thread (ost_waitq) and sleeps on
 * ost_done_waitq until ost->ost_thread has been cleared, which ost_main()
 * does on its way out.
 *
 * NOTE(review): the flag and ost_thread are tested without any locking
 * visible here, and sleep_on() is entered after the test -- confirm a wake-up
 * cannot be missed between the two.
 */
456 static void ost_stop_srv_thread(struct ost_obd *ost)
458 ost->ost_flags |= OST_EXIT;
460 while (ost->ost_thread) {
461 wake_up(&ost->ost_waitq);
462 sleep_on(&ost->ost_done_waitq);
/*
 * ost_start_srv_thread - start the service thread and wait until it runs.
 *
 * @obd: OST device; passed to ost_main() as its argument
 *
 * Initialises both wait queues, spawns ost_main() with kernel_thread(), and
 * sleeps on ost_done_waitq until ost->ost_thread is non-NULL (ost_main()
 * sets it and wakes that queue).
 *
 * NOTE(review): the value returned by kernel_thread() is not used; if thread
 * creation fails nothing visible here would set ost_thread, so the wait below
 * would not end -- confirm.
 */
466 static void ost_start_srv_thread(struct obd_device *obd)
468 struct ost_obd *ost = &obd->u.ost;
/* both queues must be ready before the thread can touch them */
471 init_waitqueue_head(&ost->ost_waitq);
472 printk("---> %d\n", __LINE__);
473 init_waitqueue_head(&ost->ost_done_waitq);
474 printk("---> %d\n", __LINE__);
475 kernel_thread(ost_main, (void *)obd,
476 CLONE_VM | CLONE_FS | CLONE_FILES);
477 printk("---> %d\n", __LINE__);
/* wait for ost_main() to record itself in ost_thread */
478 while (!ost->ost_thread)
479 sleep_on(&ost->ost_done_waitq);
480 printk("---> %d\n", __LINE__);
484 /* connect the OST to its target OBD device and start the service thread */
/*
 * ost_setup - bind the OST to a target OBD device and start serving.
 *
 * @obddev: the OST device being set up
 * @len:    length argument of the setup call (not referenced on the lines
 *          visible here)
 *
 * The ioctl data names the target device by index (ioc_dev).  The index is
 * range-checked, the target must be both attached and set up, and the OST
 * then connects to it through the target's o_connect method.  Finally the
 * request list, thread pointer and lock are initialised and the service
 * thread is started.
 *
 * The range check rejects ioc_dev == MAX_OBD_DEVICES as well: the index is
 * used directly as a subscript of obd_dev[] on the next visible line, and the
 * previous test (">") let that value through.
 * NOTE(review): assumes obd_dev[] holds MAX_OBD_DEVICES entries -- its
 * declaration is not in this file; confirm.
 *
 * NOTE(review): this listing elides lines (e.g. 485 -> 489, 495 -> 500,
 * 504 -> 510), so the rest of the parameter list, the error returns, the
 * tails of the printk calls and any assignment of ost->ost_tgt are not
 * visible here.
 */
485 static int ost_setup(struct obd_device *obddev, obd_count len,
489 struct obd_ioctl_data* data = buf;
490 struct ost_obd *ost = &obddev->u.ost;
491 struct obd_device *tgt;
495 if (data->ioc_dev < 0 || data->ioc_dev >= MAX_OBD_DEVICES) {
500 tgt = &obd_dev[data->ioc_dev];
/* the target must be fully configured before the OST can sit on top of it */
502 if ( ! (tgt->obd_flags & OBD_ATTACHED) ||
503 ! (tgt->obd_flags & OBD_SET_UP) ){
504 printk("device not attached or not set up (%d)\n",
510 ost->ost_conn.oc_dev = tgt;
511 err = tgt->obd_type->typ_ops->o_connect(&ost->ost_conn);
513 printk("lustre ost: fail to connect to device %d\n",
/* request queue and thread state must exist before the thread starts */
518 INIT_LIST_HEAD(&ost->ost_reqs);
519 ost->ost_thread = NULL;
522 spin_lock_init(&obddev->u.ost.fo_lock);
524 ost_start_srv_thread(obddev);
/*
 * ost_cleanup - shut the OST down and disconnect from its target device.
 *
 * @obddev: the OST device being cleaned up
 *
 * Checks that the device is set up and has no remaining generic clients,
 * stops the service thread, logs if requests are still queued, and calls the
 * target's o_disconnect method on the OST's connection.
 *
 * NOTE(review): this listing elides lines (e.g. 539 -> 544, 554 -> 558), so
 * the return values of the early exits and the statement that assigns tgt
 * before it is dereferenced are not visible here.
 */
531 static int ost_cleanup(struct obd_device * obddev)
533 struct ost_obd *ost = &obddev->u.ost;
534 struct obd_device *tgt;
539 if ( !(obddev->obd_flags & OBD_SET_UP) ) {
544 if ( !list_empty(&obddev->obd_gen_clients) ) {
545 printk(KERN_WARNING __FUNCTION__ ": still has clients!\n");
/* waits until the service thread has cleared ost_thread */
550 ost_stop_srv_thread(ost);
/* queued requests are only logged here; they are not answered or freed */
552 if (!list_empty(&ost->ost_reqs)) {
553 // XXX reply with errors and clean up
554 CDEBUG(D_INODE, "Request list not empty!\n");
558 err = tgt->obd_type->typ_ops->o_disconnect(&ost->ost_conn);
560 printk("lustre ost: fail to disconnect device\n");
570 /* use obd ops to offer management infrastructure */
/*
 * Method table handed to obd_register_type() by ost_init().
 *
 * NOTE(review): only the o_cleanup entry is visible in this listing (the
 * embedded numbers jump 571 -> 573); other entries and the closing brace are
 * elided.
 */
571 static struct obd_ops ost_obd_ops = {
573 o_cleanup: ost_cleanup,
/*
 * ost_init - module load hook: register the OST type with the OBD class
 * under the name LUSTRE_OST_NAME.
 *
 * NOTE(review): the result of obd_register_type() is not used on the visible
 * line, and the return statement is elided from this listing.
 */
576 static int __init ost_init(void)
578 obd_register_type(&ost_obd_ops, LUSTRE_OST_NAME);
/*
 * ost_exit - module unload hook: unregister the type that ost_init()
 * registered under LUSTRE_OST_NAME.
 */
582 static void __exit ost_exit(void)
584 obd_unregister_type(LUSTRE_OST_NAME);
/* Module metadata. */
587 MODULE_AUTHOR("Peter J. Braam <braam@clusterfs.com>");
588 MODULE_DESCRIPTION("Lustre Object Storage Target (OST) v0.01");
589 MODULE_LICENSE("GPL");
/* Export the request-queueing entry point for use outside this module. */
591 // for testing (maybe this stays)
592 EXPORT_SYMBOL(ost_queue_req);
/* Hooks run when the module is loaded and unloaded. */
594 module_init(ost_init);
595 module_exit(ost_exit);