3 * Storage Target Handling functions
5 * Lustre Object Server Module (OST)
7 * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
9 * This code is issued under the GNU General Public License.
10 * See the file COPYING in this distribution
12 * by Peter Braam <braam@clusterfs.com>
14 * This server is single threaded at present (but can easily be multi
15 * threaded). For testing and management it is treated as an
16 * obd_device, although it does not export a full OBD method table
17 * (the requests are coming in over the wire, so object target
18 * modules do not have a full method table.)
24 #include <linux/version.h>
25 #include <linux/module.h>
27 #include <linux/stat.h>
28 #include <linux/locks.h>
29 #include <linux/ext2_fs.h>
30 #include <linux/quotaops.h>
31 #include <asm/unistd.h>
33 #define DEBUG_SUBSYSTEM S_OST
35 #include <linux/obd_support.h>
36 #include <linux/obd.h>
37 #include <linux/obd_class.h>
38 #include <linux/lustre_lib.h>
39 #include <linux/lustre_idl.h>
40 #include <linux/lustre_mds.h>
41 #include <linux/obd_class.h>
/*
 * ost_queue_req - queue a locally generated request for the OST thread.
 *
 * @obddev: OST device; its u.ost member holds the request list and wait queue.
 * @req:    originating request; its request buffer pointer and length are
 *          copied into a newly allocated server-side descriptor.
 *
 * The new descriptor records @req in rq_reply_handle so the reply can be
 * routed back, is linked onto ost->ost_reqs, and the sleeper on
 * ost->ost_waitq is woken.
 *
 * The function has external linkage because it is passed to EXPORT_SYMBOL()
 * at the end of this file.
 *
 * NOTE(review): the embedded numbering skips lines between the allocation and
 * the debug print; confirm the allocation result is checked there.
 */
44 int ost_queue_req(struct obd_device *obddev, struct ptlrpc_request *req)
46 struct ptlrpc_request *srv_req;
47 struct ost_obd *ost = &obddev->u.ost;
54 OBD_ALLOC(srv_req, sizeof(*srv_req));
60 CDEBUG(0, "---> OST at %d %p, incoming req %p, srv_req %p\n",
61 __LINE__, ost, req, srv_req);
/* Size the clear from the object being cleared, not from the caller's copy. */
63 memset(srv_req, 0, sizeof(*srv_req));
65 /* move the request buffer */
66 srv_req->rq_reqbuf = req->rq_reqbuf;
67 srv_req->rq_reqlen = req->rq_reqlen;
68 srv_req->rq_ost = ost;
70 /* remember where it came from */
71 srv_req->rq_reply_handle = req;
/* Publish the request and wake the service thread sleeping on ost_waitq. */
73 list_add(&srv_req->rq_list, &ost->ost_reqs);
74 wake_up(&ost->ost_waitq);
/*
 * ost_reply - deliver the reply that has been prepared in @req.
 *
 * When the OST has a service registered (ost_service != NULL) the reply is
 * sent with ptl_send_buf() to OST_REPLY_PORTAL.  The remaining statements,
 * which the in-line comments describe as the local-request case, hand the
 * reply buffer over to the originating request stored in rq_reply_handle,
 * free the request buffer and wake the waiter on rq_wait_for_rep.
 *
 * NOTE(review): the line that separates the two cases is not visible in this
 * listing; the split described above is taken from the in-line comments.
 */
78 int ost_reply(struct obd_device *obddev, struct ptlrpc_request *req)
80 struct ptlrpc_request *clnt_req = req->rq_reply_handle;
84 if (req->rq_ost->ost_service != NULL) {
85 /* This is a request that came from the network via portals. */
87 /* FIXME: we need to increment the count of handled events */
88 ptl_send_buf(req, &req->rq_peer, OST_REPLY_PORTAL, 0);
90 /* This is a local request that came from another thread. */
92 /* move the reply to the client */
93 clnt_req->rq_replen = req->rq_replen;
94 clnt_req->rq_repbuf = req->rq_repbuf;
/* The reply buffer now belongs to the client request; drop our reference. */
95 req->rq_repbuf = NULL;
98 /* free the request buffer */
99 OBD_FREE(req->rq_reqbuf, req->rq_reqlen);
100 req->rq_reqbuf = NULL;
102 /* wake up the client */
103 wake_up_interruptible(&clnt_req->rq_wait_for_rep);
/*
 * ost_error - answer @req with a bare error header.
 *
 * Allocates a reply header, zeroes it, copies the sequence number from the
 * request header, stores req->rq_status as the status and marks the packet
 * as OST_TYPE_ERR.  The header becomes the reply buffer of @req and is sent
 * through ost_reply(), whose return value is passed back to the caller.
 *
 * NOTE(review): the embedded numbering skips lines after the allocation;
 * confirm the allocation result is checked there.
 */
110 int ost_error(struct obd_device *obddev, struct ptlrpc_request *req)
112 struct ptlrep_hdr *hdr;
116 OBD_ALLOC(hdr, sizeof(*hdr));
122 memset(hdr, 0, sizeof(*hdr));
124 hdr->seqno = req->rq_reqhdr->seqno;
125 hdr->status = req->rq_status;
126 hdr->type = OST_TYPE_ERR;
/* The reply consists of the header only. */
128 req->rq_repbuf = (char *)hdr;
129 req->rq_replen = sizeof(*hdr);
132 return ost_reply(obddev, req);
/*
 * ost_destroy - handle a destroy request.
 *
 * Builds a connection descriptor from the connid in the request and the
 * target device of @ost, packs a reply without payload buffers, and stores
 * the return value of the target o_destroy method, called on the object
 * attributes of the request, in the result field of the reply.
 */
135 static int ost_destroy(struct ost_obd *ost, struct ptlrpc_request *req)
137 struct obd_conn conn;
142 conn.oc_id = req->rq_req.ost->connid;
143 conn.oc_dev = ost->ost_tgt;
145 rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.ost,
146 &req->rq_replen, &req->rq_repbuf);
148 CERROR("cannot pack reply\n");
152 req->rq_rep.ost->result =ost->ost_tgt->obd_type->typ_ops->o_destroy
153 (&conn, &req->rq_req.ost->oa);
/*
 * ost_getattr - handle a getattr request.
 *
 * Packs a reply without payload buffers, copies the object id and the valid
 * mask from the request attributes into the reply attributes, then calls the
 * target o_getattr method on the reply attributes and stores its return
 * value in the result field of the reply.
 */
159 static int ost_getattr(struct ost_obd *ost, struct ptlrpc_request *req)
161 struct obd_conn conn;
166 conn.oc_id = req->rq_req.ost->connid;
167 conn.oc_dev = ost->ost_tgt;
169 rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.ost,
170 &req->rq_replen, &req->rq_repbuf);
172 CERROR("cannot pack reply\n");
/* Only o_id and o_valid are carried over; the target fills in the rest. */
175 req->rq_rep.ost->oa.o_id = req->rq_req.ost->oa.o_id;
176 req->rq_rep.ost->oa.o_valid = req->rq_req.ost->oa.o_valid;
178 req->rq_rep.ost->result =ost->ost_tgt->obd_type->typ_ops->o_getattr
179 (&conn, &req->rq_rep.ost->oa);
/*
 * ost_create - handle a create request.
 *
 * Packs a reply without payload buffers, copies the whole attribute block of
 * the request into the reply, then calls the target o_create method on the
 * reply copy and stores its return value in the result field of the reply.
 */
185 static int ost_create(struct ost_obd *ost, struct ptlrpc_request *req)
187 struct obd_conn conn;
192 conn.oc_id = req->rq_req.ost->connid;
193 conn.oc_dev = ost->ost_tgt;
195 rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.ost,
196 &req->rq_replen, &req->rq_repbuf);
198 CERROR("cannot pack reply\n");
/* The target operates on the reply copy of the attributes. */
202 memcpy(&req->rq_rep.ost->oa, &req->rq_req.ost->oa, sizeof(req->rq_req.ost->oa));
204 req->rq_rep.ost->result =ost->ost_tgt->obd_type->typ_ops->o_create
205 (&conn, &req->rq_rep.ost->oa);
/*
 * ost_punch - handle a punch request.
 *
 * Packs a reply without payload buffers, copies the attribute block of the
 * request into the reply, then calls the target o_punch method with the
 * reply attributes plus their o_size and o_blocks fields as two further
 * arguments.  The return value is stored in the result field of the reply.
 *
 * NOTE(review): o_size and o_blocks presumably carry the range to punch;
 * confirm against the o_punch contract.
 */
211 static int ost_punch(struct ost_obd *ost, struct ptlrpc_request *req)
213 struct obd_conn conn;
218 conn.oc_id = req->rq_req.ost->connid;
219 conn.oc_dev = ost->ost_tgt;
221 rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.ost,
222 &req->rq_replen, &req->rq_repbuf);
224 CERROR("cannot pack reply\n");
228 memcpy(&req->rq_rep.ost->oa, &req->rq_req.ost->oa, sizeof(req->rq_req.ost->oa));
230 req->rq_rep.ost->result =ost->ost_tgt->obd_type->typ_ops->o_punch
231 (&conn, &req->rq_rep.ost->oa,
232 req->rq_rep.ost->oa.o_size,
233 req->rq_rep.ost->oa.o_blocks);
/*
 * ost_setattr - handle a setattr request.
 *
 * Packs a reply without payload buffers, copies the attribute block of the
 * request into the reply, then calls the target o_setattr method on the
 * reply copy and stores its return value in the result field of the reply.
 */
240 static int ost_setattr(struct ost_obd *ost, struct ptlrpc_request *req)
242 struct obd_conn conn;
247 conn.oc_id = req->rq_req.ost->connid;
248 conn.oc_dev = ost->ost_tgt;
250 rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.ost,
251 &req->rq_replen, &req->rq_repbuf);
253 CERROR("cannot pack reply\n");
257 memcpy(&req->rq_rep.ost->oa, &req->rq_req.ost->oa,
258 sizeof(req->rq_req.ost->oa));
260 req->rq_rep.ost->result =ost->ost_tgt->obd_type->typ_ops->o_setattr
261 (&conn, &req->rq_rep.ost->oa);
/*
 * ost_connect - handle a connect request.
 *
 * Unlike the other handlers, the connection id is not taken from the
 * request: only oc_dev is filled in before the target o_connect method is
 * called.  The return value of o_connect goes into the result field of the
 * reply, and conn.oc_id as it stands after the call is copied into the
 * connid field of the reply.
 *
 * NOTE(review): the continuation line of the CDEBUG call below is not visible
 * in this listing.
 */
267 static int ost_connect(struct ost_obd *ost, struct ptlrpc_request *req)
269 struct obd_conn conn;
274 conn.oc_dev = ost->ost_tgt;
276 rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.ost,
277 &req->rq_replen, &req->rq_repbuf);
279 CERROR("cannot pack reply\n");
283 req->rq_rep.ost->result =ost->ost_tgt->obd_type->typ_ops->o_connect(&conn);
285 CDEBUG(0, "rep buffer %p, id %d\n", req->rq_repbuf,
287 req->rq_rep.ost->connid = conn.oc_id;
/*
 * ost_disconnect - handle a disconnect request.
 *
 * Builds a connection descriptor from the target device of @ost and the
 * connid in the request, packs a reply without payload buffers, and stores
 * the return value of the target o_disconnect method in the result field of
 * the reply.
 */
292 static int ost_disconnect(struct ost_obd *ost, struct ptlrpc_request *req)
294 struct obd_conn conn;
299 conn.oc_dev = ost->ost_tgt;
300 conn.oc_id = req->rq_req.ost->connid;
302 rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.ost,
303 &req->rq_replen, &req->rq_repbuf);
305 CERROR("cannot pack reply\n");
309 req->rq_rep.ost->result =ost->ost_tgt->obd_type->typ_ops->o_disconnect(&conn);
/*
 * ost_get_info - handle a get_info request.
 *
 * The first request buffer (buflen1 bytes, obtained with ost_req_buf1) is
 * passed to the target o_get_info method, which reports a value through val
 * and vallen.  That value is then packed as the first buffer of the reply.
 *
 * NOTE(review): the result of o_get_info is written through
 * req->rq_rep.ost before ost_pack_rep is called with &req->rq_rep.ost.  Every
 * other handler in this file packs the reply first and writes the result
 * afterwards.  If ost_pack_rep is what sets rq_rep.ost, the store below goes
 * through a pointer that is not valid yet - confirm and reorder if so.
 */
315 static int ost_get_info(struct ost_obd *ost, struct ptlrpc_request *req)
317 struct obd_conn conn;
325 conn.oc_id = req->rq_req.ost->connid;
326 conn.oc_dev = ost->ost_tgt;
328 ptr = ost_req_buf1(req->rq_req.ost);
329 req->rq_rep.ost->result =ost->ost_tgt->obd_type->typ_ops->o_get_info
330 (&conn, req->rq_req.ost->buflen1, ptr, &vallen, &val);
332 rc = ost_pack_rep(val, vallen, NULL, 0, &req->rq_rephdr,
333 &req->rq_rep.ost, &req->rq_replen, &req->rq_repbuf);
335 CERROR("cannot pack reply\n");
/*
 * ost_brw - handle a bulk read/write request.
 *
 * @obddev: OST state (the handler reads the target through req->rq_ost).
 * @req:    request whose first buffer holds an array of struct obd_ioobj and
 *          whose second buffer holds an array of struct niobuf.
 *
 * Steps visible in this listing:
 *   1. derive the object and niobuf counts from the two buffer lengths;
 *   2. pack a reply whose first buffer receives one struct niobuf per
 *      request niobuf;
 *   3. walk the request buffers with the unpackers, checking that the
 *      niobufs announced by each ioobj fit in the second buffer;
 *   4. call o_preprw so the target fills the reply niobufs;
 *   5. copy data between the request and reply niobuf addresses, the
 *      direction depending on cmd;
 *   6. call o_commitrw and store its return value in the reply.
 *
 * Two sizes were previously expressed in the wrong unit and are corrected:
 * the reply buffer length handed to ost_pack_rep, and the per-object bounds
 * check.
 *
 * NOTE(review): this assumes the second argument of ost_pack_rep is a length
 * in bytes (ost_get_info passes vallen there) and that ost_unpack_niobuf
 * advances by sizeof(struct niobuf) - confirm both.
 * NOTE(review): cmd is assigned on a line that is not visible in this
 * listing.
 */
343 int ost_brw(struct ost_obd *obddev, struct ptlrpc_request *req)
345 struct obd_conn conn;
348 int objcount, niocount;
349 char *tmp1, *tmp2, *end2;
352 struct niobuf *nb, *src, *dst;
353 struct obd_ioobj *ioo;
354 struct ost_req *r = req->rq_req.ost;
358 tmp1 = ost_req_buf1(r);
359 tmp2 = ost_req_buf2(r);
360 end2 = tmp2 + req->rq_req.ost->buflen2;
361 objcount = r->buflen1 / sizeof(*ioo);
362 niocount = r->buflen2 / sizeof(*nb);
365 conn.oc_id = req->rq_req.ost->connid;
366 conn.oc_dev = req->rq_ost->ost_tgt;
/* The reply buffer is indexed as niocount niobufs below, so size it in bytes. */
368 rc = ost_pack_rep(NULL, niocount * sizeof(*nb), NULL, 0,
369 &req->rq_rephdr, &req->rq_rep.ost,
370 &req->rq_replen, &req->rq_repbuf);
372 CERROR("cannot pack reply\n");
375 res = ost_rep_buf1(req->rq_rep.ost);
377 for (i=0; i < objcount; i++) {
378 ost_unpack_ioo((void *)&tmp1, &ioo);
/* tmp2 is a byte pointer: compare element counts, which also avoids overflow. */
379 if (ioo->ioo_bufcnt > (end2 - tmp2) / sizeof(*nb)) {
383 for (j = 0 ; j < ioo->ioo_bufcnt ; j++) {
384 ost_unpack_niobuf((void *)&tmp2, &nb);
388 /* The unpackers move tmp1 and tmp2, so reset them before using */
389 tmp1 = ost_req_buf1(r);
390 tmp2 = ost_req_buf2(r);
391 req->rq_rep.ost->result =
392 req->rq_ost->ost_tgt->obd_type->typ_ops->o_preprw
393 (cmd, &conn, objcount, (struct obd_ioobj *)tmp1,
394 niocount, (struct niobuf *)tmp2, (struct niobuf *)res);
/* Write: data flows from the request niobufs into the prepared ones. */
396 if (cmd == OBD_BRW_WRITE) {
397 for (i=0; i<niocount; i++) {
398 src = &((struct niobuf *)tmp2)[i];
399 dst = &((struct niobuf *)res)[i];
400 memcpy((void *)(unsigned long)dst->addr,
401 (void *)(unsigned long)src->addr,
/* Second copy loop: same pairing with source and destination swapped. */
405 for (i=0; i<niocount; i++) {
406 dst = &((struct niobuf *)tmp2)[i];
407 src = &((struct niobuf *)res)[i];
408 memcpy((void *)(unsigned long)dst->addr,
409 (void *)(unsigned long)src->addr,
414 req->rq_rep.ost->result =
415 req->rq_ost->ost_tgt->obd_type->typ_ops->o_commitrw
416 (cmd, &conn, objcount, (struct obd_ioobj *)tmp1,
417 niocount, (struct niobuf *)res);
/*
 * ost_handle - decode one request and dispatch it to its handler.
 *
 * The raw request buffer must start with a header whose type, converted with
 * NTOH__u32, equals OST_TYPE_REQ.  The request is then unpacked with
 * ost_unpack_req and the opcode in the unpacked header selects one of the
 * handlers: connect, disconnect, get_info, create, destroy, getattr,
 * setattr, brw or punch.  An opcode with no handler sets rq_status to
 * -ENOTSUPP and answers through ost_error.
 *
 * After dispatch the visible code reports a processing error through
 * ost_error, or sends the prepared reply through ost_reply.
 *
 * NOTE(review): the case labels and the condition that chooses between the
 * error and reply paths are on lines not visible in this listing.
 */
423 int ost_handle(struct obd_device *obddev, struct ptlrpc_request *req)
426 struct ost_obd *ost = &obddev->u.ost;
427 struct ptlreq_hdr *hdr;
430 CDEBUG(0, "req at %p\n", req);
/* Reject anything that is not marked as an OST request before unpacking. */
432 hdr = (struct ptlreq_hdr *)req->rq_reqbuf;
433 if (NTOH__u32(hdr->type) != OST_TYPE_REQ) {
434 CERROR("lustre_ost: wrong packet type sent %d\n",
435 NTOH__u32(hdr->type));
440 rc = ost_unpack_req(req->rq_reqbuf, req->rq_reqlen,
441 &req->rq_reqhdr, &req->rq_req.ost);
443 CERROR("lustre_ost: Invalid request\n");
448 switch (req->rq_reqhdr->opc) {
451 CDEBUG(D_INODE, "connect\n");
452 rc = ost_connect(ost, req);
455 CDEBUG(D_INODE, "disconnect\n");
456 rc = ost_disconnect(ost, req);
459 CDEBUG(D_INODE, "get_info\n");
460 rc = ost_get_info(ost, req);
463 CDEBUG(D_INODE, "create\n");
464 rc = ost_create(ost, req);
467 CDEBUG(D_INODE, "destroy\n");
468 rc = ost_destroy(ost, req);
471 CDEBUG(D_INODE, "getattr\n");
472 rc = ost_getattr(ost, req);
475 CDEBUG(D_INODE, "setattr\n");
476 rc = ost_setattr(ost, req);
479 CDEBUG(D_INODE, "brw\n");
480 rc = ost_brw(ost, req);
483 CDEBUG(D_INODE, "punch\n");
484 rc = ost_punch(ost, req);
/* Unknown opcode: report -ENOTSUPP to the requester. */
487 req->rq_status = -ENOTSUPP;
488 return ost_error(obddev, req);
494 CERROR("ost: processing error %d\n", rc);
495 ost_error(obddev, req);
497 CDEBUG(D_INODE, "sending reply\n");
498 ost_reply(obddev, req);
/*
 * ost_main - body of the OST service thread.
 *
 * @arg: the struct obd_device of the OST, passed through kernel_thread().
 *
 * The thread blocks all signals, names itself lustre_ost, publishes itself in
 * ost->ost_thread and wakes ost_done_waitq so the starter can continue.  It
 * then sleeps on ost_waitq and, on each wake-up, either
 *   - pulls an event from the service event queue with PtlEQGet, builds a
 *     request descriptor on the stack from the event and passes it to
 *     ost_handle (when ost_service is set), or
 *   - takes the first entry of ost->ost_reqs and passes it to ost_handle.
 * When OST_EXIT is set in ost_flags the thread clears ost_thread, wakes
 * ost_done_waitq and exits.
 *
 * The three signal-mask statements previously contained a mis-encoded
 * character sequence in place of the address-of current expression; they are
 * restored here.
 *
 * NOTE(review): loop and branch delimiters are on lines not visible in this
 * listing.
 */
504 int ost_main(void *arg)
506 struct obd_device *obddev = (struct obd_device *) arg;
507 struct ost_obd *ost = &obddev->u.ost;
/* Block every signal for this thread. */
512 spin_lock_irq(&current->sigmask_lock);
513 sigfillset(&current->blocked);
514 recalc_sigpending(current);
515 spin_unlock_irq(&current->sigmask_lock);
517 sprintf(current->comm, "lustre_ost");
519 /* Record that the thread is running */
520 ost->ost_thread = current;
521 wake_up(&ost->ost_done_waitq);
523 /* XXX maintain a list of all managed devices: insert here */
525 /* And now, wait forever for commit wakeup events. */
529 if (ost->ost_flags & OST_EXIT)
532 wake_up(&ost->ost_done_waitq);
533 interruptible_sleep_on(&ost->ost_waitq);
535 CDEBUG(D_INODE, "lustre_ost wakes\n");
536 CDEBUG(D_INODE, "pick up req here and continue\n");
/* Network path: requests arrive as events on the service event queue. */
539 if (ost->ost_service != NULL) {
542 CDEBUG(D_INODE, "\n");
544 struct ptlrpc_request request;
545 struct ptlrpc_service *service;
546 CDEBUG(D_INODE, "\n");
547 rc = PtlEQGet(ost->ost_service->srv_eq_h, &ev);
548 if (rc != PTL_OK && rc != PTL_EQ_DROPPED)
550 CDEBUG(D_INODE, "\n");
552 service = (struct ptlrpc_service *)ev.mem_desc.user_ptr;
554 /* FIXME: If we move to an event-driven model,
555 * we should put the request on the stack of
556 * ost_handle instead. */
557 memset(&request, 0, sizeof(request));
558 request.rq_reqbuf = ev.mem_desc.start +
560 request.rq_reqlen = ev.mem_desc.length;
561 request.rq_ost = ost;
562 request.rq_xid = ev.match_bits;
564 request.rq_peer.peer_nid = ev.initiator.nid;
565 /* FIXME: this NI should be the incoming NI.
566 * We don't know how to find that from here. */
567 request.rq_peer.peer_ni =
568 ost->ost_service->srv_self.peer_ni;
569 rc = ost_handle(obddev, &request);
571 /* Inform the rpc layer the event has been handled */
572 ptl_received_rpc(service);
/* Local path: requests were queued on ost_reqs by ost_queue_req. */
575 struct ptlrpc_request *request;
577 if (list_empty(&ost->ost_reqs)) {
578 CDEBUG(D_INODE, "woke because of timer\n");
580 request = list_entry(ost->ost_reqs.next,
581 struct ptlrpc_request,
583 list_del(&request->rq_list);
584 rc = ost_handle(obddev, request);
589 /* XXX maintain a list of all managed devices: cleanup here */
/* Tell ost_stop_srv_thread that the thread is gone. */
591 ost->ost_thread = NULL;
592 wake_up(&ost->ost_done_waitq);
593 CERROR("lustre_ost: exiting\n");
/*
 * ost_stop_srv_thread - ask the service thread to exit and wait for it.
 *
 * Sets OST_EXIT in ost_flags, then keeps waking ost_waitq and sleeping on
 * ost_done_waitq until ost_thread has been cleared.
 *
 * NOTE(review): the test of ost_thread and the sleep_on call are separate
 * steps here; confirm a wake-up arriving between them cannot be missed.
 */
597 static void ost_stop_srv_thread(struct ost_obd *ost)
599 ost->ost_flags |= OST_EXIT;
601 while (ost->ost_thread) {
602 wake_up(&ost->ost_waitq);
603 sleep_on(&ost->ost_done_waitq);
/*
 * ost_start_srv_thread - start the service thread and wait until it runs.
 *
 * Initialises both wait queues, starts ost_main as a kernel thread sharing
 * VM, filesystem context and open files with the caller, and sleeps on
 * ost_done_waitq until ost_thread has been set.
 *
 * NOTE(review): the return value of kernel_thread is not examined in the
 * visible lines; if the thread cannot be created the wait loop has nothing
 * to end it - confirm.
 */
607 static void ost_start_srv_thread(struct obd_device *obd)
609 struct ost_obd *ost = &obd->u.ost;
612 init_waitqueue_head(&ost->ost_waitq);
613 init_waitqueue_head(&ost->ost_done_waitq);
614 kernel_thread(ost_main, (void *)obd,
615 CLONE_VM | CLONE_FS | CLONE_FILES);
616 while (!ost->ost_thread)
617 sleep_on(&ost->ost_done_waitq);
/*
 * ost_setup - attach the OST to its target device and start serving.
 *
 * @obddev: the OST device being set up.
 * @len:    length of the ioctl data.
 *
 * The ioctl data names the target by index into obd_dev[].  The target must
 * be both attached and set up.  The OST connects to it through o_connect,
 * initialises its request list and lock, resolves the peer named self,
 * allocates and fills a ptlrpc service (64 KiB buffers, OST_REQUEST_PORTAL,
 * wake-ups delivered to ost_waitq), registers it and starts the service
 * thread.
 *
 * The index check now rejects ioc_dev == MAX_OBD_DEVICES as well, since that
 * value would index one element past the table.
 *
 * NOTE(review): this assumes obd_dev[] has MAX_OBD_DEVICES entries - confirm
 * against its declaration.
 * NOTE(review): the remaining parameters and the error returns are on lines
 * not visible in this listing.
 */
621 /* connect to the target OBD device and start the OST service */
622 static int ost_setup(struct obd_device *obddev, obd_count len,
626 struct obd_ioctl_data* data = buf;
627 struct ost_obd *ost = &obddev->u.ost;
628 struct obd_device *tgt;
629 struct lustre_peer peer;
633 if (data->ioc_dev < 0 || data->ioc_dev >= MAX_OBD_DEVICES) {
638 tgt = &obd_dev[data->ioc_dev];
640 if ( ! (tgt->obd_flags & OBD_ATTACHED) ||
641 ! (tgt->obd_flags & OBD_SET_UP) ){
642 CERROR("device not attached or not set up (%d)\n",
648 ost->ost_conn.oc_dev = tgt;
649 err = tgt->obd_type->typ_ops->o_connect(&ost->ost_conn);
651 CERROR("lustre ost: fail to connect to device %d\n",
656 INIT_LIST_HEAD(&ost->ost_reqs);
657 ost->ost_thread = NULL;
660 spin_lock_init(&obddev->u.ost.ost_lock);
/* Network service: resolve our own peer and register the request portal. */
662 err = kportal_uuid_to_peer("self", &peer);
664 OBD_ALLOC(ost->ost_service, sizeof(*ost->ost_service));
665 if (ost->ost_service == NULL)
667 ost->ost_service->srv_buf_size = 64 * 1024;
668 ost->ost_service->srv_portal = OST_REQUEST_PORTAL;
669 memcpy(&ost->ost_service->srv_self, &peer, sizeof(peer));
670 ost->ost_service->srv_wait_queue = &ost->ost_waitq;
672 rpc_register_service(ost->ost_service, "self");
675 ost_start_srv_thread(obddev);
/*
 * ost_cleanup - undo ost_setup.
 *
 * Checks that the device is set up and has no remaining clients, stops the
 * service thread, unregisters and frees the ptlrpc service, reports a
 * non-empty request list through a debug message, and disconnects from the
 * target device.
 *
 * NOTE(review): ost_service is unregistered and freed without a NULL test in
 * the visible lines; confirm it is always allocated by the time this runs.
 * NOTE(review): tgt is assigned on a line not visible in this listing.
 */
682 static int ost_cleanup(struct obd_device * obddev)
684 struct ost_obd *ost = &obddev->u.ost;
685 struct obd_device *tgt;
690 if ( !(obddev->obd_flags & OBD_SET_UP) ) {
695 if ( !list_empty(&obddev->obd_gen_clients) ) {
696 CERROR("still has clients!\n");
/* Stop the thread first so nothing uses the service while it is torn down. */
701 ost_stop_srv_thread(ost);
702 rpc_unregister_service(ost->ost_service);
703 OBD_FREE(ost->ost_service, sizeof(*ost->ost_service));
705 if (!list_empty(&ost->ost_reqs)) {
706 // XXX reply with errors and clean up
707 CDEBUG(D_INODE, "Request list not empty!\n");
711 err = tgt->obd_type->typ_ops->o_disconnect(&ost->ost_conn);
713 CERROR("lustre ost: fail to disconnect device\n");
/*
 * Method table registered for the OST device type.  Fields are set with the
 * GNU label-style initialiser syntax (field: value).
 *
 * NOTE(review): only the o_cleanup entry is visible in this listing.
 */
723 /* use obd ops to offer management infrastructure */
724 static struct obd_ops ost_obd_ops = {
726 o_cleanup: ost_cleanup,
/* Module entry point: registers ost_obd_ops under the name LUSTRE_OST_NAME. */
729 static int __init ost_init(void)
731 obd_register_type(&ost_obd_ops, LUSTRE_OST_NAME);
/* Module exit point: removes the type registered under LUSTRE_OST_NAME. */
735 static void __exit ost_exit(void)
737 obd_unregister_type(LUSTRE_OST_NAME);
/* Module metadata reported to the kernel module tools. */
740 MODULE_AUTHOR("Peter J. Braam <braam@clusterfs.com>");
741 MODULE_DESCRIPTION("Lustre Object Storage Target (OST) v0.01");
742 MODULE_LICENSE("GPL");
744 // for testing (maybe this stays)
745 EXPORT_SYMBOL(ost_queue_req);
/* Bind the load and unload hooks of the module. */
747 module_init(ost_init);
748 module_exit(ost_exit);