4 * Lustre Object Server Module (OST)
6 * Copyright (C) 2001 Cluster File Systems, Inc.
8 * This code is issued under the GNU General Public License.
9 * See the file COPYING in this distribution
11 * by Peter Braam <braam@clusterfs.com>
13 * This server is single threaded at present (but can easily be multi threaded).
14 * For testing and management it is treated as an obd_device, although it does
15 * not export a full OBD method table (the requests are coming in over the wire,
16 * so object target modules do not have a full method table.)
23 #include <linux/version.h>
24 #include <linux/module.h>
26 #include <linux/stat.h>
27 #include <linux/locks.h>
28 #include <linux/ext2_fs.h>
29 #include <linux/quotaops.h>
30 #include <asm/unistd.h>
31 #include <linux/obd_support.h>
32 #include <linux/obd.h>
33 #include <linux/obd_class.h>
34 #include <linux/lustre_lib.h>
35 #include <linux/lustre_idl.h>
36 #include <linux/lustre_mds.h>
37 #include <linux/obd_class.h>
/*
 * ost_queue_req - hand a client request to the OST service thread.
 *
 * obddev: the OST device; its u.ost member holds the request list and
 *         the wait queue used here.
 * req:    the client's request; its rq_reqbuf/rq_reqlen are shared with
 *         (not copied into) the server-side descriptor.
 *
 * A server-side ost_request is allocated and zeroed, pointed at the
 * client's request buffer, and given rq_reply_handle = req so the reply
 * can later be moved back to the client request.  It is then linked onto
 * ost->ost_reqs and ost->ost_waitq is woken.
 *
 * The function has external linkage because it is exported with
 * EXPORT_SYMBOL() at the end of this file.
 *
 * NOTE(review): no test of the kmalloc() result is visible in this
 * listing before srv_req is used - confirm one exists.
 */
40 int ost_queue_req(struct obd_device *obddev, struct ost_request *req)
42 struct ost_request *srv_req;
43 struct ost_obd *ost = &obddev->u.ost;
50 srv_req = kmalloc(sizeof(*srv_req), GFP_KERNEL);
56 printk("---> OST at %d %p, incoming req %p, srv_req %p\n",
57 __LINE__, ost, req, srv_req);
/* Size the clear from the object being cleared, matching the kmalloc(). */
59 memset(srv_req, 0, sizeof(*srv_req));
60 srv_req->rq_reqbuf = req->rq_reqbuf;
61 srv_req->rq_reqlen = req->rq_reqlen;
62 srv_req->rq_obd = ost;
63 srv_req->rq_reply_handle = req;
/*
 * list_add() links the new entry directly after the list head.
 * NOTE(review): ost_main picks ost_reqs.next, so with several requests
 * pending the most recently queued one is handled first - confirm that
 * this ordering is intended (list_add_tail() would give FIFO).
 */
65 list_add(&srv_req->rq_list, &ost->ost_reqs);
66 wake_up(&ost->ost_waitq);
71 /* XXX replace with networking code */
/*
 * ost_reply - move the reply from the server request to the client request.
 *
 * obddev: the OST device (not referenced by the visible statements).
 * req:    the server-side request; req->rq_reply_handle is the client
 *         request that is waiting for the reply.
 *
 * The request buffer is freed, rq_replen and rq_repbuf are copied to the
 * client request, the server request's rq_repbuf is cleared, and the
 * client is woken through its rq_wait_for_rep wait queue.
 */
72 int ost_reply(struct obd_device *obddev, struct ost_request *req)
74 struct ost_request *clnt_req = req->rq_reply_handle;
77 printk("ost_reply: req %p clnt_req at %p\n", req, clnt_req);
79 /* free the request buffer */
/*
 * NOTE(review): ost_queue_req copies this pointer from the client request,
 * so unless the client clears its own rq_reqbuf it is left holding a freed
 * pointer - confirm who owns the buffer.
 */
80 kfree(req->rq_reqbuf);
81 req->rq_reqbuf = NULL;
83 /* move the reply to the client */
84 clnt_req->rq_replen = req->rq_replen;
85 clnt_req->rq_repbuf = req->rq_repbuf;
/*
 * NOTE(review): rq_rephdr is dereferenced unconditionally below; callers
 * must have set it.  The visible lines of ost_error set only rq_repbuf -
 * confirm rq_rephdr is valid on that path.
 */
87 printk("---> client req %p repbuf %p len %d status %d\n",
88 clnt_req, clnt_req->rq_repbuf, clnt_req->rq_replen,
89 req->rq_rephdr->status);
/* The reply buffer now belongs to the client request; drop our reference. */
91 req->rq_repbuf = NULL;
94 /* free the server request */
96 /* wake up the client */
97 wake_up_interruptible(&clnt_req->rq_wait_for_rep);
/*
 * ost_error - send an error reply consisting of a bare reply header.
 *
 * obddev: the OST device, passed through to ost_reply().
 * req:    the server-side request; rq_reqhdr->seqno and rq_status are read.
 *
 * A zeroed ost_rep_hdr is allocated and filled with the request's sequence
 * number, the request status and type OST_TYPE_ERR.  It is installed as
 * req->rq_repbuf and the request is passed to ost_reply(), whose return
 * value is returned.
 *
 * NOTE(review): no visible line sets req->rq_replen or req->rq_rephdr,
 * both of which ost_reply() reads - confirm they are valid on this path.
 * NOTE(review): req->rq_reqhdr is dereferenced here; confirm it is set
 * when this is called after a failed unpack.
 */
102 int ost_error(struct obd_device *obddev, struct ost_request *req)
104 struct ost_rep_hdr *hdr;
107 hdr = kmalloc(sizeof(*hdr), GFP_KERNEL);
113 memset(hdr, 0, sizeof(*hdr));
115 hdr->seqno = req->rq_reqhdr->seqno;
116 hdr->status = req->rq_status;
117 hdr->type = OST_TYPE_ERR;
118 req->rq_repbuf = (char *)hdr;
121 return ost_reply(obddev, req);
/*
 * ost_destroy - handle a destroy request by calling the target's o_destroy.
 *
 * ost: OST state; ost->ost_tgt is the target device the call is made on.
 * req: the request; rq_req supplies the connection id and the object
 *      attributes, and the reply fields are filled in through ost_pack_rep().
 *
 * A reply with no payload buffers is packed first, then the value returned
 * by o_destroy() is stored in the reply's result field.  o_destroy() is
 * given the object attributes from the request, not from the reply.
 */
124 static int ost_destroy(struct ost_obd *ost, struct ost_request *req)
126 struct obd_conn conn;
/* Build a connection handle from the request's connid and the target. */
131 conn.oc_id = req->rq_req->connid;
132 conn.oc_dev = ost->ost_tgt;
134 rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
135 &req->rq_replen, &req->rq_repbuf);
137 printk("ost_destroy: cannot pack reply\n");
141 req->rq_rep->result =ost->ost_tgt->obd_type->typ_ops->o_destroy
142 (&conn, &req->rq_req->oa);
/*
 * ost_getattr - handle a getattr request by calling the target's o_getattr.
 *
 * ost: OST state; ost->ost_tgt is the target device the call is made on.
 * req: the request; rq_req supplies the connection id, object id and
 *      valid mask.
 *
 * A reply with no payload buffers is packed, the object id and valid mask
 * are copied from the request into the reply's attribute block, and
 * o_getattr() is called on that reply block.  Its return value is stored
 * in the reply's result field.
 */
148 static int ost_getattr(struct ost_obd *ost, struct ost_request *req)
150 struct obd_conn conn;
154 printk("ost getattr entered\n");
/* Build a connection handle from the request's connid and the target. */
156 conn.oc_id = req->rq_req->connid;
157 conn.oc_dev = ost->ost_tgt;
159 rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
160 &req->rq_replen, &req->rq_repbuf);
162 printk("ost_getattr: cannot pack reply\n");
/* Only o_id and o_valid are carried over; the target works on the reply's oa. */
165 req->rq_rep->oa.o_id = req->rq_req->oa.o_id;
166 req->rq_rep->oa.o_valid = req->rq_req->oa.o_valid;
168 req->rq_rep->result =ost->ost_tgt->obd_type->typ_ops->o_getattr
169 (&conn, &req->rq_rep->oa);
/*
 * ost_create - handle a create request by calling the target's o_create.
 *
 * ost: OST state; ost->ost_tgt is the target device the call is made on.
 * req: the request; rq_req supplies the connection id and the object
 *      attributes.
 *
 * A reply with no payload buffers is packed, the whole attribute block is
 * copied from the request into the reply, and o_create() is called on the
 * reply's copy.  Its return value is stored in the reply's result field.
 */
175 static int ost_create(struct ost_obd *ost, struct ost_request *req)
177 struct obd_conn conn;
/* Build a connection handle from the request's connid and the target. */
182 conn.oc_id = req->rq_req->connid;
183 conn.oc_dev = ost->ost_tgt;
185 rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
186 &req->rq_replen, &req->rq_repbuf);
188 printk("ost_create: cannot pack reply\n");
/* The target operates on the reply's copy of the attributes. */
192 memcpy(&req->rq_rep->oa, &req->rq_req->oa, sizeof(req->rq_req->oa));
194 req->rq_rep->result =ost->ost_tgt->obd_type->typ_ops->o_create
195 (&conn, &req->rq_rep->oa);
/*
 * ost_setattr - handle a setattr request by calling the target's o_setattr.
 *
 * ost: OST state; ost->ost_tgt is the target device the call is made on.
 * req: the request; rq_req supplies the connection id and the object
 *      attributes.
 *
 * A reply with no payload buffers is packed, the whole attribute block is
 * copied from the request into the reply, and o_setattr() is called on the
 * reply's copy.  Its return value is stored in the reply's result field.
 */
202 static int ost_setattr(struct ost_obd *ost, struct ost_request *req)
204 struct obd_conn conn;
/* Build a connection handle from the request's connid and the target. */
209 conn.oc_id = req->rq_req->connid;
210 conn.oc_dev = ost->ost_tgt;
212 rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
213 &req->rq_replen, &req->rq_repbuf);
215 printk("ost_setattr: cannot pack reply\n");
/* The target operates on the reply's copy of the attributes. */
219 memcpy(&req->rq_rep->oa, &req->rq_req->oa, sizeof(req->rq_req->oa));
221 req->rq_rep->result =ost->ost_tgt->obd_type->typ_ops->o_setattr
222 (&conn, &req->rq_rep->oa);
/*
 * ost_connect - handle a connect request by calling the target's o_connect.
 *
 * ost: OST state; ost->ost_tgt is the target device the call is made on.
 * req: the request; the reply fields are filled in through ost_pack_rep().
 *
 * A reply with no payload buffers is packed, o_connect() is called with a
 * connection handle that has only oc_dev set, its return value is stored
 * in the reply's result field, and conn.oc_id is copied into the reply's
 * connid.  No assignment to conn.oc_id is visible here before the call, so
 * the id is presumably produced by o_connect() - confirm.
 */
228 static int ost_connect(struct ost_obd *ost, struct ost_request *req)
230 struct obd_conn conn;
235 conn.oc_dev = ost->ost_tgt;
237 rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
238 &req->rq_replen, &req->rq_repbuf);
/* Message names this function (it previously said "ost_setattr"). */
240 printk("ost_connect: cannot pack reply\n");
244 req->rq_rep->result =ost->ost_tgt->obd_type->typ_ops->o_connect(&conn);
246 printk("ost_connect: rep buffer %p, id %d\n", req->rq_repbuf,
/* Return the connection id to the client in the reply. */
248 req->rq_rep->connid = conn.oc_id;
/*
 * ost_disconnect - handle a disconnect request by calling the target's
 * o_disconnect.
 *
 * ost: OST state; ost->ost_tgt is the target device the call is made on.
 * req: the request; rq_req supplies the connection id to disconnect.
 *
 * A reply with no payload buffers is packed and the value returned by
 * o_disconnect() is stored in the reply's result field.
 */
254 static int ost_disconnect(struct ost_obd *ost, struct ost_request *req)
256 struct obd_conn conn;
/* Build a connection handle from the target and the request's connid. */
261 conn.oc_dev = ost->ost_tgt;
262 conn.oc_id = req->rq_req->connid;
264 rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
265 &req->rq_replen, &req->rq_repbuf);
/* Message names this function (it previously said "ost_setattr"). */
267 printk("ost_disconnect: cannot pack reply\n");
271 req->rq_rep->result =ost->ost_tgt->obd_type->typ_ops->o_disconnect(&conn);
/*
 * ost_get_info - handle a get_info request by calling the target's
 * o_get_info.
 *
 * ost: OST state; ost->ost_tgt is the target device the call is made on.
 * req: the request; rq_req supplies the connection id and the key
 *      (buf1, buflen1).
 *
 * o_get_info() is called with the key and the addresses of vallen and val;
 * the reply is then packed with val/vallen as its first payload buffer.
 */
277 static int ost_get_info(struct ost_obd *ost, struct ost_request *req)
279 struct obd_conn conn;
/* Build a connection handle from the request's connid and the target. */
286 conn.oc_id = req->rq_req->connid;
287 conn.oc_dev = ost->ost_tgt;
/*
 * NOTE(review): this stores through req->rq_rep before the ost_pack_rep()
 * call below, and the sibling handlers only touch rq_rep after packing.
 * No earlier assignment to rq_rep is visible in this function, so the
 * store likely goes through an unset pointer and, if not, is lost when
 * the reply is packed.  Keep the o_get_info() return value in a local and
 * store it into rq_rep->result after a successful pack.
 */
289 req->rq_rep->result =ost->ost_tgt->obd_type->typ_ops->o_get_info
290 (&conn, req->rq_req->buflen1, req->rq_req->buf1, &vallen, &val);
293 rc = ost_pack_rep(val, vallen, NULL, 0, &req->rq_rephdr, &req->rq_rep,
294 &req->rq_replen, &req->rq_repbuf);
/* Message names this function (it previously said "ost_setattr"). */
296 printk("ost_get_info: cannot pack reply\n");
306 //int ost_handle(struct ost_conn *conn, int len, char *buf)
/*
 * ost_handle - decode one queued request and dispatch it to its handler.
 *
 * obddev: the OST device; its u.ost member is passed to the handlers.
 * req:    the server-side request taken from the request list.
 *
 * The packet type in the request header is checked against OST_TYPE_REQ,
 * the request is unpacked into rq_reqhdr/rq_req, and the opcode selects
 * one of ost_connect, ost_disconnect, ost_get_info, ost_create,
 * ost_destroy, ost_getattr or ost_setattr.  The handler's return code is
 * written to the reply header's status; an error reply is sent through
 * ost_error() and a normal reply through ost_reply().
 *
 * NOTE(review): the case labels and the tests that choose between the
 * error and reply paths are not visible in this listing.
 */
307 int ost_handle(struct obd_device *obddev, struct ost_request *req)
310 struct ost_obd *ost = &obddev->u.ost;
311 struct ost_req_hdr *hdr;
314 printk("ost_handle: req at %p\n", req);
/* Peek at the raw header to reject anything that is not a request packet. */
316 hdr = (struct ost_req_hdr *)req->rq_reqbuf;
317 if (NTOH__u32(hdr->type) != OST_TYPE_REQ) {
318 printk("lustre_ost: wrong packet type sent %d\n",
319 NTOH__u32(hdr->type));
324 rc = ost_unpack_req(req->rq_reqbuf, req->rq_reqlen,
325 &req->rq_reqhdr, &req->rq_req);
327 printk("lustre_ost: Invalid request\n");
/*
 * NOTE(review): opc is used without NTOH__u32() although type above is
 * converted - confirm whether ost_unpack_req() converts the header.
 */
332 switch (req->rq_reqhdr->opc) {
335 CDEBUG(D_INODE, "connect\n");
336 printk("----> connect \n");
337 rc = ost_connect(ost, req);
340 CDEBUG(D_INODE, "disconnect\n");
341 rc = ost_disconnect(ost, req);
344 CDEBUG(D_INODE, "get_info\n");
345 rc = ost_get_info(ost, req);
348 CDEBUG(D_INODE, "create\n");
349 rc = ost_create(ost, req);
352 CDEBUG(D_INODE, "destroy\n");
353 rc = ost_destroy(ost, req);
356 CDEBUG(D_INODE, "getattr\n");
357 rc = ost_getattr(ost, req);
360 CDEBUG(D_INODE, "setattr\n");
361 rc = ost_setattr(ost, req);
365 return ost_error(obddev, req);
/*
 * NOTE(review): rq_rephdr is written here; if a handler failed before its
 * reply was packed this pointer may not be set - confirm.
 */
369 req->rq_rephdr->status = rc;
371 printk("ost: processing error %d\n", rc);
372 ost_error(obddev, req);
374 CDEBUG(D_INODE, "sending reply\n");
375 ost_reply(obddev, req);
/*
 * ost_main - body of the OST service thread.
 *
 * arg: the struct obd_device of the OST, passed by kernel_thread().
 *
 * The thread blocks all signals, names itself "lustre_ost", records itself
 * in ost->ost_thread and wakes ost_done_waitq so the starter can proceed.
 * It then sleeps on ost_waitq; on each wakeup it takes the first entry of
 * ost->ost_reqs, unlinks it and passes it to ost_handle().  When OST_EXIT
 * is set in ost_flags the thread clears ost->ost_thread and wakes
 * ost_done_waitq again before exiting.
 *
 * The printk("---> %d\n", __LINE__) calls are progress traces.
 */
381 int ost_main(void *arg)
383 struct obd_device *obddev = (struct obd_device *) arg;
384 struct ost_obd *ost = &obddev->u.ost;
386 printk("---> %d\n", __LINE__);
390 printk("---> %d\n", __LINE__);
392 printk("---> %d\n", __LINE__);
/*
 * Block every signal for this thread.  The three lines below take the
 * address of members of "current"; the "&current" text had been garbled
 * into a currency-sign character and is restored here.
 */
393 spin_lock_irq(&current->sigmask_lock);
394 printk("---> %d\n", __LINE__);
395 sigfillset(&current->blocked);
396 printk("---> %d\n", __LINE__);
397 recalc_sigpending(current);
398 printk("---> %d\n", __LINE__);
399 spin_unlock_irq(&current->sigmask_lock);
400 printk("---> %d\n", __LINE__);
402 printk("---> %d\n", __LINE__);
403 sprintf(current->comm, "lustre_ost");
404 printk("---> %d\n", __LINE__);
406 /* Record that the thread is running */
407 ost->ost_thread = current;
408 printk("---> %d\n", __LINE__);
/* Lets ost_start_srv_thread(), which sleeps on ost_done_waitq, continue. */
409 wake_up(&ost->ost_done_waitq);
410 printk("---> %d\n", __LINE__);
412 /* XXX maintain a list of all managed devices: insert here */
414 /* And now, wait forever for commit wakeup events. */
416 struct ost_request *request;
/* ost_stop_srv_thread() sets OST_EXIT to ask this loop to finish. */
419 if (ost->ost_flags & OST_EXIT)
423 wake_up(&ost->ost_done_waitq);
/*
 * NOTE(review): the list is not checked before sleeping, so a request
 * queued between the previous pass and this call may wait for the next
 * wakeup - confirm.
 */
424 interruptible_sleep_on(&ost->ost_waitq);
426 CDEBUG(D_INODE, "lustre_ost wakes\n");
427 CDEBUG(D_INODE, "pick up req here and continue\n");
429 if (list_empty(&ost->ost_reqs)) {
430 CDEBUG(D_INODE, "woke because of timer\n");
432 printk("---> %d\n", __LINE__);
/* Take the entry directly after the list head. */
433 request = list_entry(ost->ost_reqs.next,
434 struct ost_request, rq_list);
435 printk("---> %d\n", __LINE__);
436 list_del(&request->rq_list);
437 rc = ost_handle(obddev, request);
441 /* XXX maintain a list of all managed devices: cleanup here */
442 printk("---> %d\n", __LINE__);
/* Clearing ost_thread is what ost_stop_srv_thread() polls for. */
443 ost->ost_thread = NULL;
444 printk("---> %d\n", __LINE__);
445 wake_up(&ost->ost_done_waitq);
446 printk("lustre_ost: exiting\n");
/*
 * ost_stop_srv_thread - ask the service thread to exit and wait for it.
 *
 * ost: OST state holding the flags, the thread pointer and both wait queues.
 *
 * OST_EXIT is set in ost_flags; then, for as long as ost->ost_thread is
 * non-NULL, the service thread is woken through ost_waitq and the caller
 * sleeps on ost_done_waitq.
 *
 * NOTE(review): the ost_thread test and sleep_on() are separate steps; a
 * wake_up() arriving in between would be missed - confirm this cannot
 * hang.
 */
450 static void ost_stop_srv_thread(struct ost_obd *ost)
452 ost->ost_flags |= OST_EXIT;
454 while (ost->ost_thread) {
455 wake_up(&ost->ost_waitq);
456 sleep_on(&ost->ost_done_waitq);
/*
 * ost_start_srv_thread - start the service thread and wait until it runs.
 *
 * obd: the OST device; it is passed to ost_main() as the thread argument.
 *
 * Both wait queues are initialised, ost_main() is started with
 * kernel_thread(), and the caller sleeps on ost_done_waitq until
 * ost->ost_thread becomes non-NULL.
 *
 * The printk("---> %d\n", __LINE__) calls are progress traces.
 */
460 static void ost_start_srv_thread(struct obd_device *obd)
462 struct ost_obd *ost = &obd->u.ost;
465 init_waitqueue_head(&ost->ost_waitq);
466 printk("---> %d\n", __LINE__);
467 init_waitqueue_head(&ost->ost_done_waitq);
468 printk("---> %d\n", __LINE__);
/*
 * NOTE(review): the kernel_thread() result is discarded; if the thread
 * cannot be created, ost_thread stays NULL and the loop below never ends.
 */
469 kernel_thread(ost_main, (void *)obd,
470 CLONE_VM | CLONE_FS | CLONE_FILES);
471 printk("---> %d\n", __LINE__);
/* Wait for ost_main() to record itself in ost_thread. */
472 while (!ost->ost_thread)
473 sleep_on(&ost->ost_done_waitq);
474 printk("---> %d\n", __LINE__);
478 /* mount the file system (secretly) */
/*
 * ost_setup - attach the OST to its target device and start the service
 * thread.
 *
 * obddev: the OST device being set up.
 * len:    length argument (not referenced by the visible statements).
 * buf:    treated as a struct obd_ioctl_data; ioc_dev selects the target
 *         entry in obd_dev[].
 *
 * The target must have both OBD_ATTACHED and OBD_SET_UP set.  The OST
 * connects to it through the target's o_connect, then initialises its
 * request list, thread pointer and lock, and starts the service thread.
 */
479 static int ost_setup(struct obd_device *obddev, obd_count len,
483 struct obd_ioctl_data* data = buf;
484 struct ost_obd *ost = &obddev->u.ost;
485 struct obd_device *tgt;
/*
 * ioc_dev indexes obd_dev[] below, so an index equal to MAX_OBD_DEVICES
 * must be rejected as well (the test used to be '>'; assumes the table
 * has MAX_OBD_DEVICES entries).
 */
489 if (data->ioc_dev < 0 || data->ioc_dev >= MAX_OBD_DEVICES) {
494 tgt = &obd_dev[data->ioc_dev];
496 if ( ! (tgt->obd_flags & OBD_ATTACHED) ||
497 ! (tgt->obd_flags & OBD_SET_UP) ){
498 printk("device not attached or not set up (%d)\n",
/* Connect to the target through its own method table. */
504 ost->ost_conn.oc_dev = tgt;
505 err = tgt->obd_type->typ_ops->o_connect(&ost->ost_conn);
507 printk("lustre ost: fail to connect to device %d\n",
/* The list and thread pointer must be ready before the thread starts. */
512 INIT_LIST_HEAD(&ost->ost_reqs);
513 ost->ost_thread = NULL;
516 spin_lock_init(&obddev->u.ost.fo_lock);
518 ost_start_srv_thread(obddev);
/*
 * ost_cleanup - stop the service thread and disconnect from the target.
 *
 * obddev: the OST device being torn down.
 *
 * The device must have OBD_SET_UP set and an empty obd_gen_clients list.
 * The service thread is stopped, a debug message is logged if requests are
 * still queued, and the target's o_disconnect is called on ost_conn.
 *
 * NOTE(review): the assignment that gives tgt its value is not visible in
 * this listing - confirm it is set before the o_disconnect call.
 */
525 static int ost_cleanup(struct obd_device * obddev)
527 struct ost_obd *ost = &obddev->u.ost;
528 struct obd_device *tgt;
533 if ( !(obddev->obd_flags & OBD_SET_UP) ) {
538 if ( !list_empty(&obddev->obd_gen_clients) ) {
539 printk(KERN_WARNING __FUNCTION__ ": still has clients!\n");
544 ost_stop_srv_thread(ost);
/* Requests left on the list at this point are only reported. */
546 if (!list_empty(&ost->ost_reqs)) {
547 // XXX reply with errors and clean up
548 CDEBUG(D_INODE, "Request list not empty!\n");
552 err = tgt->obd_type->typ_ops->o_disconnect(&ost->ost_conn);
554 printk("lustre ost: fail to disconnect device\n");
564 /* use obd ops to offer management infrastructure */
/*
 * Method table registered for the OST type in ost_init().  Only the
 * o_cleanup entry is visible in this listing.
 */
565 static struct obd_ops ost_obd_ops = {
567 o_cleanup: ost_cleanup,
/*
 * ost_init - module entry point; registers the OST method table under
 * LUSTRE_OST_NAME.
 *
 * NOTE(review): the result of obd_register_type() is not used by the
 * visible statement - confirm whether a registration failure should be
 * returned to the module loader.
 */
570 static int __init ost_init(void)
572 obd_register_type(&ost_obd_ops, LUSTRE_OST_NAME);
/*
 * ost_exit - module exit point; unregisters the type registered under
 * LUSTRE_OST_NAME.
 */
576 static void __exit ost_exit(void)
578 obd_unregister_type(LUSTRE_OST_NAME);
/* Module identification strings. */
581 MODULE_AUTHOR("Peter J. Braam <braam@clusterfs.com>");
582 MODULE_DESCRIPTION("Lustre Object Storage Target (OST) v0.01");
583 MODULE_LICENSE("GPL");
585 // for testing (maybe this stays)
/*
 * Makes ost_queue_req available to other modules.
 * NOTE(review): an exported symbol needs external linkage - check the
 * storage class on the definition of ost_queue_req.
 */
586 EXPORT_SYMBOL(ost_queue_req);
/* Register the module's load and unload entry points. */
588 module_init(ost_init);
589 module_exit(ost_exit);