1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
5 * Author: Peter J. Braam <braam@clusterfs.com>
6 * Author: Phil Schwan <phil@clusterfs.com>
8 * This file is part of Lustre, http://www.lustre.org.
10 * Lustre is free software; you can redistribute it and/or
11 * modify it under the terms of version 2 of the GNU General Public
12 * License as published by the Free Software Foundation.
14 * Lustre is distributed in the hope that it will be useful,
15 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 * GNU General Public License for more details.
19 * You should have received a copy of the GNU General Public License
20 * along with Lustre; if not, write to the Free Software
21 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23 * Storage Target Handling functions
24 * Lustre Object Server Module (OST)
26 * This server is single threaded at present (but can easily be multi
27 * threaded). For testing and management it is treated as an
28 * obd_device, although it does not export a full OBD method table
29 * (the requests are coming in over the wire, so object target
30 * modules do not have a full method table.)
34 #define DEBUG_SUBSYSTEM S_OST
36 #include <linux/module.h>
37 #include <linux/obd_ost.h>
38 #include <linux/lustre_net.h>
39 #include <linux/lustre_dlm.h>
/*
 * ost_destroy - service a destroy request.
 *
 * ost: OST state; ost->ost_tgt becomes the target device of the connection.
 * req: request being served.  req->rq_status receives the obd_destroy()
 *      result; the reply message is packed with one buffer of
 *      sizeof(struct ost_body).
 *
 * NOTE(review): the declaration of `conn`, the test of `rc` after
 * lustre_pack_msg() and the return statement are not visible in this
 * listing; confirm them against the complete source.
 */
41 static int ost_destroy(struct ost_obd *ost, struct ptlrpc_request *req)
44 struct ost_body *body;
45 int rc, size = sizeof(*body);
/* Request buffer 0 is the ost_body: connection id plus object attributes. */
48 body = lustre_msg_buf(req->rq_reqmsg, 0);
49 conn.oc_id = body->connid;
50 conn.oc_dev = ost->ost_tgt;
/* Pack the reply before performing the operation. */
52 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
/* The outcome of the operation is stored in rq_status, not in rc. */
56 req->rq_status = obd_destroy(&conn, &body->oa);
/*
 * ost_getattr - service a getattr request.
 *
 * ost: OST state; ost->ost_tgt becomes the target device of the connection.
 * req: request being served.  The request's object attributes are copied
 *      into the reply body and obd_getattr() is then called on that copy;
 *      its return value is stored in req->rq_status.
 *
 * NOTE(review): the declaration of `conn`, the test of `rc` after
 * lustre_pack_msg() and the return statement are not visible in this
 * listing; confirm them against the complete source.
 */
60 static int ost_getattr(struct ost_obd *ost, struct ptlrpc_request *req)
63 struct ost_body *body, *repbody;
64 int rc, size = sizeof(*body);
/* Request buffer 0 is the ost_body sent by the client. */
67 body = lustre_msg_buf(req->rq_reqmsg, 0);
68 conn.oc_id = body->connid;
69 conn.oc_dev = ost->ost_tgt;
/* The reply has a single buffer, the same size as the request body. */
71 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
75 repbody = lustre_msg_buf(req->rq_repmsg, 0);
/* Seed the reply with the requested object's attributes ... */
76 memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
/* ... and operate on the reply copy (presumably updated in place). */
77 req->rq_status = obd_getattr(&conn, &repbody->oa);
/*
 * ost_open - service an open request.
 *
 * ost: OST state; ost->ost_tgt becomes the target device of the connection.
 * req: request being served.  The request's object attributes are copied
 *      into the reply body, obd_open() is called on that copy and its
 *      return value is stored in req->rq_status.
 *
 * NOTE(review): the declaration of `conn`, the test of `rc` after
 * lustre_pack_msg() and the return statement are not visible in this
 * listing; confirm them against the complete source.
 */
81 static int ost_open(struct ost_obd *ost, struct ptlrpc_request *req)
84 struct ost_body *body, *repbody;
85 int rc, size = sizeof(*body);
/* Request buffer 0 is the ost_body sent by the client. */
88 body = lustre_msg_buf(req->rq_reqmsg, 0);
89 conn.oc_id = body->connid;
90 conn.oc_dev = ost->ost_tgt;
/* One reply buffer, sized for an ost_body. */
92 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
96 repbody = lustre_msg_buf(req->rq_repmsg, 0);
/* The reply starts out as a copy of the request's object attributes. */
97 memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
98 req->rq_status = obd_open(&conn, &repbody->oa);
/*
 * ost_close - service a close request.
 *
 * ost: OST state; ost->ost_tgt becomes the target device of the connection.
 * req: request being served.  The request's object attributes are copied
 *      into the reply body, obd_close() is called on that copy and its
 *      return value is stored in req->rq_status.
 *
 * NOTE(review): the test of `rc` after lustre_pack_msg() and the return
 * statement are not visible in this listing; confirm them against the
 * complete source.
 */
102 static int ost_close(struct ost_obd *ost, struct ptlrpc_request *req)
104 struct obd_conn conn;
105 struct ost_body *body, *repbody;
106 int rc, size = sizeof(*body);
/* Build the connection handle from the id the client sent in buffer 0. */
109 body = lustre_msg_buf(req->rq_reqmsg, 0);
110 conn.oc_id = body->connid;
111 conn.oc_dev = ost->ost_tgt;
/* One reply buffer, sized for an ost_body. */
113 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
117 repbody = lustre_msg_buf(req->rq_repmsg, 0);
/* The reply starts out as a copy of the request's object attributes. */
118 memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
119 req->rq_status = obd_close(&conn, &repbody->oa);
/*
 * ost_create - service a create request.
 *
 * ost: OST state; ost->ost_tgt becomes the target device of the connection.
 * req: request being served.  The request's object attributes are copied
 *      into the reply body, obd_create() is called on that copy and its
 *      return value is stored in req->rq_status.
 *
 * NOTE(review): the test of `rc` after lustre_pack_msg() and the return
 * statement are not visible in this listing; confirm them against the
 * complete source.
 */
123 static int ost_create(struct ost_obd *ost, struct ptlrpc_request *req)
125 struct obd_conn conn;
126 struct ost_body *body, *repbody;
127 int rc, size = sizeof(*body);
/* Build the connection handle from the id the client sent in buffer 0. */
130 body = lustre_msg_buf(req->rq_reqmsg, 0);
131 conn.oc_id = body->connid;
132 conn.oc_dev = ost->ost_tgt;
/* One reply buffer, sized for an ost_body. */
134 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
138 repbody = lustre_msg_buf(req->rq_repmsg, 0);
/*
 * obd_create() is handed the reply copy of the attributes, so whatever it
 * stores there (presumably the new object's id -- confirm) reaches the
 * client.
 */
139 memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
140 req->rq_status = obd_create(&conn, &repbody->oa);
/*
 * ost_punch - service a punch request.
 *
 * ost: OST state; ost->ost_tgt becomes the target device of the connection.
 * req: request being served.  The request's object attributes are copied
 *      into the reply body and obd_punch() is called on that copy; its
 *      return value is stored in req->rq_status.
 *
 * NOTE(review): the test of `rc` after lustre_pack_msg() and the return
 * statement are not visible in this listing; confirm them against the
 * complete source.
 */
144 static int ost_punch(struct ost_obd *ost, struct ptlrpc_request *req)
146 struct obd_conn conn;
147 struct ost_body *body, *repbody;
148 int rc, size = sizeof(*body);
/* Build the connection handle from the id the client sent in buffer 0. */
151 body = lustre_msg_buf(req->rq_reqmsg, 0);
152 conn.oc_id = body->connid;
153 conn.oc_dev = ost->ost_tgt;
/* One reply buffer, sized for an ost_body. */
155 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
159 repbody = lustre_msg_buf(req->rq_repmsg, 0);
160 memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
/*
 * NOTE(review): o_size and o_blocks are passed as the third and fourth
 * obd_punch() arguments; they appear to be reused to carry the range to
 * punch rather than a size and block count -- confirm with the client side.
 */
161 req->rq_status = obd_punch(&conn, &repbody->oa,
162 repbody->oa.o_size, repbody->oa.o_blocks);
/*
 * ost_setattr - service a setattr request.
 *
 * ost: OST state; ost->ost_tgt becomes the target device of the connection.
 * req: request being served.  The request's object attributes are copied
 *      into the reply body, obd_setattr() is called on that copy and its
 *      return value is stored in req->rq_status.
 *
 * NOTE(review): the test of `rc` after lustre_pack_msg() and the return
 * statement are not visible in this listing; confirm them against the
 * complete source.
 */
166 static int ost_setattr(struct ost_obd *ost, struct ptlrpc_request *req)
168 struct obd_conn conn;
169 struct ost_body *body, *repbody;
170 int rc, size = sizeof(*body);
/* Build the connection handle from the id the client sent in buffer 0. */
173 body = lustre_msg_buf(req->rq_reqmsg, 0);
174 conn.oc_id = body->connid;
175 conn.oc_dev = ost->ost_tgt;
/* One reply buffer, sized for an ost_body. */
177 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
181 repbody = lustre_msg_buf(req->rq_repmsg, 0);
/* The attributes to apply are taken from the reply copy of the request. */
182 memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
183 req->rq_status = obd_setattr(&conn, &repbody->oa);
/*
 * ost_connect - service a connect request.
 *
 * Unlike the other handlers this one is not given an ost_obd: the device is
 * looked up from the name carried in request buffer 0.
 *
 * req: request being served.  On the paths visible here req->rq_status is
 *      set to -EINVAL for an over-long name buffer, to -ENODEV after the
 *      device lookup (condition not visible), or to the obd_connect()
 *      result.  The reply carries the new connection id in its ost_body and
 *      the device index in target_id.
 *
 * NOTE(review): the declarations of `ost` and `uuid`, the conditions
 * guarding the error assignments and all return statements are not visible
 * in this listing; confirm them against the complete source.
 */
187 static int ost_connect(struct ptlrpc_request *req)
189 struct obd_conn conn;
190 struct ost_body *body;
193 int rc, size = sizeof(*body), i;
196 uuid = lustre_msg_buf(req->rq_reqmsg, 0);
/*
 * NOTE(review): 37 is an unexplained constant (presumably a 36-character
 * UUID string plus its terminator).  Only the buffer length is bounded
 * here; nothing visible checks that the buffer is NUL-terminated before it
 * is passed on as a name -- confirm.
 */
197 if (req->rq_reqmsg->buflens[0] > 37) {
199 req->rq_status = -EINVAL;
/* Translate the name into an index into the global obd_dev[] table. */
203 i = obd_class_name2dev(uuid);
205 req->rq_status = -ENODEV;
209 ost = &(obd_dev[i].u.ost);
210 conn.oc_dev = ost->ost_tgt;
212 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
/* Report the device index so later requests can name their target. */
216 req->rq_repmsg->target_id = i;
217 req->rq_status = obd_connect(&conn);
219 CDEBUG(D_IOCTL, "rep buffer %p, id %d\n", req->rq_repmsg, conn.oc_id);
/* Hand the connection id from conn.oc_id back to the client. */
220 body = lustre_msg_buf(req->rq_repmsg, 0);
221 body->connid = conn.oc_id;
/*
 * ost_disconnect - service a disconnect request.
 *
 * ost: OST state; ost->ost_tgt becomes the target device of the connection.
 * req: request being served.  The reply is packed with zero buffers and
 *      req->rq_status receives the obd_disconnect() result.
 *
 * NOTE(review): the declaration of `rc`, the test of `rc` after
 * lustre_pack_msg() and the return statement are not visible in this
 * listing; confirm them against the complete source.
 */
225 static int ost_disconnect(struct ost_obd *ost, struct ptlrpc_request *req)
227 struct obd_conn conn;
228 struct ost_body *body;
/* The connection to drop is identified by the id in request buffer 0. */
232 body = lustre_msg_buf(req->rq_reqmsg, 0);
233 conn.oc_id = body->connid;
234 conn.oc_dev = ost->ost_tgt;
/* No payload is returned: the reply has no buffers. */
236 rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
240 CDEBUG(D_IOCTL, "Disconnecting %d\n", conn.oc_id);
241 req->rq_status = obd_disconnect(&conn);
/*
 * ost_get_info - service a get_info request.
 *
 * ost: OST state; ost->ost_tgt becomes the target device of the connection.
 * req: request being served.  Request buffer 0 is the ost_body and buffer 1
 *      is passed to obd_get_info() together with its length.  The reply is
 *      packed with two buffers described by size[] and bufs[].
 *
 * NOTE(review): the condition guarding the "cannot pack reply" message and
 * the return statement are not visible in this listing; confirm them
 * against the complete source.
 */
245 static int ost_get_info(struct ost_obd *ost, struct ptlrpc_request *req)
247 struct obd_conn conn;
248 struct ost_body *body;
/* Only size[0] is given; size[1] is zero-initialised by the initialiser. */
249 int rc, size[2] = {sizeof(*body)};
250 char *bufs[2] = {NULL, NULL}, *ptr;
253 body = lustre_msg_buf(req->rq_reqmsg, 0);
254 conn.oc_id = body->connid;
255 conn.oc_dev = ost->ost_tgt;
/* Request buffer 1 is handed to obd_get_info() (presumably the key). */
257 ptr = lustre_msg_buf(req->rq_reqmsg, 1);
/*
 * size[1] and bufs[1] are passed by address, so obd_get_info() can fill in
 * the length and location of the second reply buffer.
 */
261 req->rq_status = obd_get_info(&conn, req->rq_reqmsg->buflens[1], ptr,
262 &(size[1]), (void **)&(bufs[1]));
/* Here the reply is packed after the operation, since its size depends on it. */
264 rc = lustre_pack_msg(2, size, bufs, &req->rq_replen, &req->rq_repmsg);
266 CERROR("cannot pack reply\n");
/*
 * ost_brw_read - service the read half of a bulk read/write request.
 *
 * obddev: OST state; its ost_service is used to send the reply or error.
 * req:    request being served.  Buffer 0 is the ost_body, buffer 1 an array
 *         of obd_ioobj and buffer 2 an array of niobuf_remote.
 *
 * Visible sequence: validate the request buffers, pack a one-buffer reply,
 * allocate one niobuf_local per remote niobuf, call obd_preprw(), describe
 * the pages in a bulk descriptor, send it and wait for completion, call
 * obd_commitrw(), then release the descriptor and the local array and send
 * a reply or an error.
 *
 * NOTE(review): the assignment of `cmd`, the tests of `rc` and `desc`, the
 * goto labels and the return statement are not visible in this listing;
 * confirm them against the complete source.
 */
271 static int ost_brw_read(struct ost_obd *obddev, struct ptlrpc_request *req)
273 struct ptlrpc_bulk_desc *desc;
274 struct obd_conn conn;
275 void *tmp1, *tmp2, *end2;
276 struct niobuf_remote *remote_nb;
277 struct niobuf_local *local_nb = NULL;
278 struct obd_ioobj *ioo;
279 struct ost_body *body;
280 int rc, cmd, i, j, objcount, niocount, size = sizeof(*body);
283 body = lustre_msg_buf(req->rq_reqmsg, 0);
284 tmp1 = lustre_msg_buf(req->rq_reqmsg, 1);
285 tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
/* end2 is one byte past the end of the niobuf buffer. */
286 end2 = (char *)tmp2 + req->rq_reqmsg->buflens[2];
/* Element counts are derived from the buffer lengths. */
287 objcount = req->rq_reqmsg->buflens[1] / sizeof(*ioo);
288 niocount = req->rq_reqmsg->buflens[2] / sizeof(*remote_nb);
/* The target device comes from req->rq_obd rather than from obddev. */
291 conn.oc_id = body->connid;
292 conn.oc_dev = req->rq_obd->u.ost.ost_tgt;
/*
 * Walk every ioobj and the niobufs it claims, to check that the claimed
 * buffers fit inside request buffer 2.
 */
294 for (i = 0; i < objcount; i++) {
295 ost_unpack_ioo(&tmp1, &ioo);
/*
 * NOTE(review): tmp2 is a void pointer, so this adds ioo_bufcnt *bytes*
 * (a GNU extension), not ioo_bufcnt niobuf_remote entries.  The check looks
 * weaker than intended; ioo_bufcnt * sizeof(*remote_nb) seems to be what is
 * meant -- confirm.
 */
296 if (tmp2 + ioo->ioo_bufcnt > end2) {
298 GOTO(out, rc = -EFAULT);
300 for (j = 0; j < ioo->ioo_bufcnt; j++)
301 ost_unpack_niobuf(&tmp2, &remote_nb);
304 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
/* One local niobuf per remote niobuf. */
307 OBD_ALLOC(local_nb, sizeof(*local_nb) * niocount);
308 if (local_nb == NULL)
311 /* The unpackers move tmp1 and tmp2, so reset them before using */
312 tmp1 = lustre_msg_buf(req->rq_reqmsg, 1);
313 tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
/* The final argument is NULL: no private descriptor data is requested. */
314 req->rq_status = obd_preprw(cmd, &conn, objcount,
315 tmp1, niocount, tmp2, local_nb, NULL);
320 desc = ptlrpc_prep_bulk(req->rq_connection);
322 GOTO(out_local, rc = -ENOMEM);
323 desc->b_portal = OST_BULK_PORTAL;
/* Describe one bulk page per niobuf. */
325 for (i = 0; i < niocount; i++) {
326 struct ptlrpc_bulk_page *bulk;
327 bulk = ptlrpc_prep_bulk_page(desc);
329 GOTO(out_bulk, rc = -ENOMEM);
/* tmp2 was reset above, so it indexes the niobuf array from its start. */
330 remote_nb = &(((struct niobuf_remote *)tmp2)[i]);
/* For reads the transfer id is the one supplied by the client. */
331 bulk->b_xid = remote_nb->xid;
332 bulk->b_buf = (void *)(unsigned long)local_nb[i].addr;
333 bulk->b_buflen = PAGE_SIZE;
336 rc = ptlrpc_send_bulk(desc);
/*
 * Sleep until the bulk transfer is reported as sent.  The return value of
 * the wait is not used; interruption is detected through the descriptor
 * flag tested on the next line.
 */
340 wait_event_interruptible(desc->b_waitq, ptlrpc_check_bulk_sent(desc));
341 if (desc->b_flags & PTL_RPC_FL_INTR)
344 /* The unpackers move tmp1 and tmp2, so reset them before using */
345 tmp1 = lustre_msg_buf(req->rq_reqmsg, 1);
346 tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
347 req->rq_status = obd_commitrw(cmd, &conn, objcount,
348 tmp1, niocount, local_nb, NULL);
/* Cleanup; the labels that separate these steps are not visible here. */
351 ptlrpc_free_bulk(desc);
353 OBD_FREE(local_nb, sizeof(*local_nb) * niocount);
/* This handler sends its own reply or error through the service. */
356 ptlrpc_error(obddev->ost_service, req);
358 ptlrpc_reply(obddev->ost_service, req);
/*
 * ost_brw_write - service the write half of a bulk read/write request.
 *
 * obddev: OST state; its ost_service is used to send the reply or error.
 * req:    request being served.  Buffer 0 is the ost_body, buffer 1 an array
 *         of obd_ioobj and buffer 2 an array of niobuf_remote.
 *
 * Visible sequence: validate the request buffers, pack a two-buffer reply
 * whose second buffer holds one niobuf_remote per page, call obd_preprw(),
 * register a bulk descriptor with a freshly allocated transfer id per page,
 * send the reply, wait for the data to be received, call obd_commitrw() and
 * release the descriptor and the local array.
 *
 * NOTE(review): the assignment of `cmd`, the tests of `rc`, `desc` and
 * `bulk`, the goto labels, the tail of the ost_pack_niobuf() call and the
 * return statement are not visible in this listing; confirm them against
 * the complete source.
 */
362 static int ost_brw_write(struct ost_obd *obddev, struct ptlrpc_request *req)
364 struct ptlrpc_bulk_desc *desc;
365 struct obd_conn conn;
366 struct niobuf_remote *remote_nb;
367 struct niobuf_local *local_nb, *lnb;
368 struct obd_ioobj *ioo;
369 struct ost_body *body;
/* Only size[0] is given; size[1] is filled in once niocount is known. */
370 int cmd, rc, i, j, objcount, niocount, size[2] = {sizeof(*body)};
371 void *tmp1, *tmp2, *end2;
372 void *desc_priv = NULL;
376 body = lustre_msg_buf(req->rq_reqmsg, 0);
377 tmp1 = lustre_msg_buf(req->rq_reqmsg, 1);
378 tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
/* end2 is one byte past the end of the niobuf buffer. */
379 end2 = (char *)tmp2 + req->rq_reqmsg->buflens[2];
/* Element counts are derived from the buffer lengths. */
380 objcount = req->rq_reqmsg->buflens[1] / sizeof(*ioo);
381 niocount = req->rq_reqmsg->buflens[2] / sizeof(*remote_nb);
/* The target device comes from req->rq_obd rather than from obddev. */
384 conn.oc_id = body->connid;
385 conn.oc_dev = req->rq_obd->u.ost.ost_tgt;
/* Walk every ioobj and the niobufs it claims, as a bounds check. */
387 for (i = 0; i < objcount; i++) {
388 ost_unpack_ioo((void *)&tmp1, &ioo);
/*
 * NOTE(review): tmp2 is a void pointer, so this adds ioo_bufcnt *bytes*
 * (a GNU extension), not ioo_bufcnt niobuf_remote entries.  The check looks
 * weaker than intended -- confirm.
 */
389 if (tmp2 + ioo->ioo_bufcnt > end2) {
393 for (j = 0; j < ioo->ioo_bufcnt; j++)
394 ost_unpack_niobuf((void *)&tmp2, &remote_nb);
/* Reply buffer 1 carries one niobuf_remote per page back to the client. */
397 size[1] = niocount * sizeof(*remote_nb);
398 rc = lustre_pack_msg(2, size, NULL, &req->rq_replen, &req->rq_repmsg);
/* From here on remote_nb points into the reply, not the request. */
401 remote_nb = lustre_msg_buf(req->rq_repmsg, 1);
403 OBD_ALLOC(local_nb, niocount * sizeof(*local_nb));
404 if (local_nb == NULL)
405 GOTO(out, rc = -ENOMEM);
407 /* The unpackers move tmp1 and tmp2, so reset them before using */
408 tmp1 = lustre_msg_buf(req->rq_reqmsg, 1);
409 tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
/* desc_priv is passed by address and later stored in the descriptor. */
410 req->rq_status = obd_preprw(cmd, &conn, objcount,
411 tmp1, niocount, tmp2, local_nb, &desc_priv);
413 GOTO(out_free, rc = 0); /* XXX is this correct? */
415 desc = ptlrpc_prep_bulk(req->rq_connection);
417 GOTO(fail_preprw, rc = -ENOMEM);
419 desc->b_portal = OSC_BULK_PORTAL;
420 desc->b_desc_private = desc_priv;
/* The descriptor keeps its own copy of the connection handle. */
421 memcpy(&(desc->b_conn), &conn, sizeof(conn));
/* Describe one bulk page per local niobuf. */
423 for (i = 0, lnb = local_nb; i < niocount; i++, lnb++) {
424 struct ptlrpc_service *srv = req->rq_obd->u.ost.ost_service;
425 struct ptlrpc_bulk_page *bulk;
427 bulk = ptlrpc_prep_bulk_page(desc);
429 GOTO(fail_bulk, rc = -ENOMEM);
/* Transfer ids are allocated from the service counter under its lock. */
431 spin_lock(&srv->srv_lock);
432 bulk->b_xid = srv->srv_xid++;
433 spin_unlock(&srv->srv_lock);
435 bulk->b_buf = lnb->addr;
436 bulk->b_page = lnb->page;
437 bulk->b_flags = lnb->flags;
438 bulk->b_dentry = lnb->dentry;
439 bulk->b_buflen = PAGE_SIZE;
442 /* this advances remote_nb */
443 ost_pack_niobuf((void **)&remote_nb, lnb->offset, lnb->len, 0,
447 rc = ptlrpc_register_bulk(desc);
/* The reply goes out before the wait for the bulk data. */
452 ptlrpc_reply(obddev->ost_service, req);
/*
 * NOTE(review): the return value of wait_event_interruptible() is not
 * used, so an interrupted wait appears to fall through to the commit below
 * -- confirm this is intended.
 */
454 wait_event_interruptible(desc->b_waitq,
455 desc->b_flags & PTL_BULK_FL_RCVD);
/* tmp1 was reset before obd_preprw() and is still at the first ioobj. */
457 rc = obd_commitrw(cmd, &conn, objcount, tmp1, niocount, local_nb,
458 desc->b_desc_private);
459 ptlrpc_free_bulk(desc);
462 OBD_FREE(local_nb, niocount * sizeof(*local_nb));
/* Error and reply paths; the labels that select between them are not visible. */
466 ptlrpc_error(obddev->ost_service, req);
468 ptlrpc_reply(obddev->ost_service, req);
473 ptlrpc_free_bulk(desc);
475 /* FIXME: how do we undo the preprw? */
/*
 * ost_brw - dispatch a bulk read/write request by direction.
 *
 * obddev: OST state, passed through to the selected handler.
 * req:    request being served; body->data in buffer 0 selects the handler.
 *
 * Returns the result of ost_brw_read() when body->data is OBD_BRW_READ,
 * otherwise the result of ost_brw_write().
 */
479 static int ost_brw(struct ost_obd *obddev, struct ptlrpc_request *req)
481 struct ost_body *body = lustre_msg_buf(req->rq_reqmsg, 0);
483 if (body->data == OBD_BRW_READ)
484 return ost_brw_read(obddev, req);
/* Every other value of body->data is treated as a write. */
486 return ost_brw_write(obddev, req);
/*
 * ost_handle - entry point for every request arriving at the OST service.
 *
 * obddev: device the service belongs to.
 * svc:    service used for the final reply or error.
 * req:    request to process.
 *
 * Visible sequence: unpack and sanity-check the message, resolve the target
 * device for everything except OST_CONNECT, dispatch on the opcode, then
 * send either an error or a reply.
 *
 * NOTE(review): the declarations of `rc` and `ost`, the `case` labels, the
 * `break` statements, the goto labels and the return statement are not
 * visible in this listing; confirm them against the complete source.
 */
489 static int ost_handle(struct obd_device *obddev, struct ptlrpc_service *svc,
490 struct ptlrpc_request *req)
496 rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen);
/*
 * NOTE(review): this is OST code, yet the fail-check constant and both
 * error messages below refer to the MDS ("lustre_mds") -- looks like a
 * copy-paste leftover; confirm before changing the strings.
 */
497 if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_HANDLE_UNPACK)) {
498 CERROR("lustre_mds: Invalid request\n");
502 if (req->rq_reqmsg->type != PTL_RPC_MSG_REQUEST) {
503 CERROR("lustre_mds: wrong packet type sent %d\n",
504 req->rq_reqmsg->type);
505 GOTO(out, rc = -EINVAL);
/*
 * Every request except connect names its target device by index; connect
 * looks the device up itself.
 */
508 if (req->rq_reqmsg->opc != OST_CONNECT) {
509 int id = req->rq_reqmsg->target_id;
/* NOTE(review): this local shadows the `obddev` parameter. */
510 struct obd_device *obddev;
/*
 * NOTE(review): `id > MAX_OBD_DEVICES` lets id == MAX_OBD_DEVICES through.
 * If obd_dev[] has MAX_OBD_DEVICES entries, the index below is one past the
 * end and the test should be `>=` -- confirm the array size.
 */
511 if (id < 0 || id > MAX_OBD_DEVICES)
512 GOTO(out, rc = -ENODEV);
513 obddev = &obd_dev[id];
/* Only devices whose type name is "ost" may be addressed here. */
514 if (strcmp(obddev->obd_type->typ_name, "ost") != 0)
515 GOTO(out, rc = -EINVAL);
516 ost = &obddev->u.ost;
517 req->rq_obd = obddev;
/*
 * Opcode dispatch.  Each arm logs, evaluates an OBD_FAIL_RETURN hook and
 * calls the matching handler.
 */
520 switch (req->rq_reqmsg->opc) {
522 CDEBUG(D_INODE, "connect\n");
523 OBD_FAIL_RETURN(OBD_FAIL_OST_CONNECT_NET, 0);
524 rc = ost_connect(req);
527 CDEBUG(D_INODE, "disconnect\n");
528 OBD_FAIL_RETURN(OBD_FAIL_OST_DISCONNECT_NET, 0);
529 rc = ost_disconnect(ost, req);
532 CDEBUG(D_INODE, "get_info\n");
533 OBD_FAIL_RETURN(OBD_FAIL_OST_GET_INFO_NET, 0);
534 rc = ost_get_info(ost, req);
537 CDEBUG(D_INODE, "create\n");
538 OBD_FAIL_RETURN(OBD_FAIL_OST_CREATE_NET, 0);
539 rc = ost_create(ost, req);
542 CDEBUG(D_INODE, "destroy\n");
543 OBD_FAIL_RETURN(OBD_FAIL_OST_DESTROY_NET, 0);
544 rc = ost_destroy(ost, req);
547 CDEBUG(D_INODE, "getattr\n");
548 OBD_FAIL_RETURN(OBD_FAIL_OST_GETATTR_NET, 0);
549 rc = ost_getattr(ost, req);
552 CDEBUG(D_INODE, "setattr\n");
553 OBD_FAIL_RETURN(OBD_FAIL_OST_SETATTR_NET, 0);
554 rc = ost_setattr(ost, req);
/*
 * NOTE(review): the open arm logs "setattr"; the message looks copied from
 * the arm above.
 */
557 CDEBUG(D_INODE, "setattr\n");
558 OBD_FAIL_RETURN(OBD_FAIL_OST_OPEN_NET, 0);
559 rc = ost_open(ost, req);
/* NOTE(review): the close arm also logs "setattr". */
562 CDEBUG(D_INODE, "setattr\n");
563 OBD_FAIL_RETURN(OBD_FAIL_OST_CLOSE_NET, 0);
564 rc = ost_close(ost, req);
567 CDEBUG(D_INODE, "brw\n");
568 OBD_FAIL_RETURN(OBD_FAIL_OST_BRW_NET, 0);
569 rc = ost_brw(ost, req);
570 /* ost_brw sends its own replies */
573 CDEBUG(D_INODE, "punch\n");
574 OBD_FAIL_RETURN(OBD_FAIL_OST_PUNCH_NET, 0);
575 rc = ost_punch(ost, req);
/* NOTE(review): ost_statfs() is not defined in the visible part of this file. */
579 CDEBUG(D_INODE, "statfs\n");
580 OBD_FAIL_RETURN(OBD_FAIL_OST_STATFS_NET, 0);
581 rc = ost_statfs(ost, req);
/* Unrecognised opcode: report -ENOTSUPP to the client straight away. */
585 req->rq_status = -ENOTSUPP;
586 rc = ptlrpc_error(svc, req);
592 //req->rq_status = rc;
/* Final step: an error packet on failure, otherwise the packed reply. */
594 CERROR("ost: processing error %d\n", rc);
595 ptlrpc_error(svc, req);
597 CDEBUG(D_INODE, "sending reply\n");
598 ptlrpc_reply(svc, req);
/*
 * ost_setup - attach this OST to its target device and start the service.
 *
 * obddev: the OST device being set up.
 * len:    length of buf (not used in the visible lines).
 * buf:    struct obd_ioctl_data; ioc_dev selects the target in obd_dev[].
 *
 * Visible sequence: validate ioc_dev, require the target to be attached and
 * set up, connect to it, create a server lock namespace, create the request
 * service and start six service threads.  On failure after the connect, the
 * connection is dropped again.
 *
 * NOTE(review): the comment below speaks of mounting a file system, but no
 * mount is visible in this function.  The declaration of `err`, the tests
 * of `err`, the goto labels and the return statements are not visible in
 * this listing; confirm them against the complete source.
 */
604 /* mount the file system (secretly) */
605 static int ost_setup(struct obd_device *obddev, obd_count len, void *buf)
607 struct obd_ioctl_data* data = buf;
608 struct ost_obd *ost = &obddev->u.ost;
609 struct obd_device *tgt;
/*
 * NOTE(review): `> MAX_OBD_DEVICES` accepts ioc_dev == MAX_OBD_DEVICES.
 * If obd_dev[] has MAX_OBD_DEVICES entries, the index below is one past the
 * end and the test should be `>=` -- confirm the array size.
 */
613 if (data->ioc_dev < 0 || data->ioc_dev > MAX_OBD_DEVICES)
617 tgt = &obd_dev[data->ioc_dev];
/* The target must be both attached and set up before it can back an OST. */
619 if (!(tgt->obd_flags & OBD_ATTACHED) ||
620 !(tgt->obd_flags & OBD_SET_UP)) {
621 CERROR("device not attached or not set up (%d)\n",
623 GOTO(error_dec, err = -EINVAL);
626 ost->ost_conn.oc_dev = tgt;
627 err = obd_connect(&ost->ost_conn);
/* Whatever obd_connect() returned, the caller is given -EINVAL. */
629 CERROR("fail to connect to device %d\n", data->ioc_dev);
630 GOTO(error_dec, err = -EINVAL);
633 obddev->obd_namespace = ldlm_namespace_new(LDLM_NAMESPACE_SERVER);
634 if (obddev->obd_namespace == NULL)
/* First argument is 128 KiB (presumably a buffer size -- confirm). */
637 ost->ost_service = ptlrpc_init_svc(128 * 1024,
638 OST_REQUEST_PORTAL, OSC_REPLY_PORTAL,
640 if (!ost->ost_service) {
641 CERROR("failed to start service\n");
642 GOTO(error_disc, err = -EINVAL);
/*
 * Six threads are started with identical arguments, all named
 * "lustre_ost".  NOTE(review): these repeated calls are a candidate for a
 * loop over a named thread-count constant.
 */
645 err = ptlrpc_start_thread(obddev, ost->ost_service, "lustre_ost");
647 GOTO(error_disc, err = -EINVAL);
648 err = ptlrpc_start_thread(obddev, ost->ost_service, "lustre_ost");
650 GOTO(error_disc, err = -EINVAL);
651 err = ptlrpc_start_thread(obddev, ost->ost_service, "lustre_ost");
653 GOTO(error_disc, err = -EINVAL);
654 err = ptlrpc_start_thread(obddev, ost->ost_service, "lustre_ost");
656 GOTO(error_disc, err = -EINVAL);
657 err = ptlrpc_start_thread(obddev, ost->ost_service, "lustre_ost");
659 GOTO(error_disc, err = -EINVAL);
660 err = ptlrpc_start_thread(obddev, ost->ost_service, "lustre_ost");
662 GOTO(error_disc, err = -EINVAL);
/* Error unwinding: drop the connection made above. */
667 obd_disconnect(&ost->ost_conn);
/*
 * ost_cleanup - tear down what ost_setup() created.
 *
 * obddev: the OST device being cleaned up.
 *
 * Visible sequence: complain if clients remain, stop the service threads,
 * unregister and free the service, disconnect from the target device and
 * free the lock namespace.
 *
 * NOTE(review): the declaration of `err`, the test of `err` and the return
 * statements are not visible in this listing; confirm them against the
 * complete source.
 */
673 static int ost_cleanup(struct obd_device * obddev)
675 struct ost_obd *ost = &obddev->u.ost;
/* Cleanup is not expected while clients are still attached. */
680 if ( !list_empty(&obddev->obd_gen_clients) ) {
681 CERROR("still has clients!\n");
685 ptlrpc_stop_all_threads(ost->ost_service);
686 rpc_unregister_service(ost->ost_service);
/* Leftover requests are only reported; see the XXX below. */
688 if (!list_empty(&ost->ost_service->srv_reqs)) {
689 // XXX reply with errors and clean up
690 CERROR("Request list not empty!\n");
692 OBD_FREE(ost->ost_service, sizeof(*ost->ost_service));
/* Drop the connection to the target device made in ost_setup(). */
694 err = obd_disconnect(&ost->ost_conn);
696 CERROR("lustre ost: fail to disconnect device\n");
700 ldlm_namespace_free(obddev->obd_namespace);
/*
 * Method table registered for the OST device type.  It uses the old GNU
 * `field: value` initialiser syntax.
 *
 * NOTE(review): only the o_cleanup member is visible in this listing; any
 * other members (for example a setup hook) must be confirmed against the
 * complete source.
 */
706 /* use obd ops to offer management infrastructure */
707 static struct obd_ops ost_obd_ops = {
709 o_cleanup: ost_cleanup,
/*
 * ost_init - module load hook: register the OST method table under
 * LUSTRE_OST_NAME.
 *
 * NOTE(review): the return statement is not visible in this listing, and
 * the result of obd_register_type() is not captured on the visible line.
 */
712 static int __init ost_init(void)
714 obd_register_type(&ost_obd_ops, LUSTRE_OST_NAME);
/*
 * ost_exit - module unload hook: unregister the type that ost_init()
 * registered under LUSTRE_OST_NAME.
 */
718 static void __exit ost_exit(void)
720 obd_unregister_type(LUSTRE_OST_NAME);
/* Module metadata reported by the kernel module tools. */
723 MODULE_AUTHOR("Peter J. Braam <braam@clusterfs.com>");
724 MODULE_DESCRIPTION("Lustre Object Storage Target (OST) v0.01");
725 MODULE_LICENSE("GPL");
/* Bind the load and unload hooks defined above. */
727 module_init(ost_init);
728 module_exit(ost_exit);