1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
5 * Author: Peter J. Braam <braam@clusterfs.com>
6 * Author: Phil Schwan <phil@clusterfs.com>
8 * This file is part of Lustre, http://www.lustre.org.
10 * Lustre is free software; you can redistribute it and/or
11 * modify it under the terms of version 2 of the GNU General Public
12 * License as published by the Free Software Foundation.
14 * Lustre is distributed in the hope that it will be useful,
15 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 * GNU General Public License for more details.
19 * You should have received a copy of the GNU General Public License
20 * along with Lustre; if not, write to the Free Software
21 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23 * Storage Target Handling functions
24 * Lustre Object Server Module (OST)
26 * This server is single threaded at present (but can easily be multi
27 * threaded). For testing and management it is treated as an
28 * obd_device, although it does not export a full OBD method table
29 * (the requests are coming in over the wire, so object target
30 * modules do not have a full method table.)
34 #define DEBUG_SUBSYSTEM S_OST
36 #include <linux/module.h>
37 #include <linux/obd_ost.h>
38 #include <linux/lustre_net.h>
39 #include <linux/lustre_dlm.h>
/*
 * Handle an OST destroy request.
 *
 * Reads the connection id and object attributes from request buffer 0,
 * packs a one-buffer reply, and records the result of obd_destroy() in
 * req->rq_status.
 */
41 static int ost_destroy(struct ost_obd *ost, struct ptlrpc_request *req)
44 struct ost_body *body;
45 int rc, size = sizeof(*body);
/* Request buffer 0 is interpreted as a struct ost_body. */
48 body = lustre_msg_buf(req->rq_reqmsg, 0);
/* Build the connection handle: id from the wire, device from this OST. */
49 conn.oc_id = body->connid;
50 conn.oc_dev = ost->ost_tgt;
/* Pack a reply with a single buffer of sizeof(struct ost_body) bytes. */
52 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
/* The outcome of the destroy is carried in the request status. */
56 req->rq_status = obd_destroy(&conn, &body->oa);
/*
 * Handle an OST getattr request.
 *
 * Copies the object attributes from the request into the reply body and
 * passes the reply copy to obd_getattr(), whose result is stored in
 * req->rq_status.
 */
60 static int ost_getattr(struct ost_obd *ost, struct ptlrpc_request *req)
63 struct ost_body *body, *repbody;
64 int rc, size = sizeof(*body);
/* Request buffer 0 is interpreted as a struct ost_body. */
67 body = lustre_msg_buf(req->rq_reqmsg, 0);
68 conn.oc_id = body->connid;
69 conn.oc_dev = ost->ost_tgt;
/* Pack a reply with a single buffer of sizeof(struct ost_body) bytes. */
71 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
/* Seed the reply with the request's attributes, then operate on the reply copy. */
75 repbody = lustre_msg_buf(req->rq_repmsg, 0);
76 memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
77 req->rq_status = obd_getattr(&conn, &repbody->oa);
/*
 * Handle an OST open request.
 *
 * Copies the object attributes from the request into the reply body and
 * passes the reply copy to obd_open(), whose result is stored in
 * req->rq_status.
 */
81 static int ost_open(struct ost_obd *ost, struct ptlrpc_request *req)
84 struct ost_body *body, *repbody;
85 int rc, size = sizeof(*body);
/* Request buffer 0 is interpreted as a struct ost_body. */
88 body = lustre_msg_buf(req->rq_reqmsg, 0);
89 conn.oc_id = body->connid;
90 conn.oc_dev = ost->ost_tgt;
/* Pack a reply with a single buffer of sizeof(struct ost_body) bytes. */
92 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
/* Seed the reply with the request's attributes, then operate on the reply copy. */
96 repbody = lustre_msg_buf(req->rq_repmsg, 0);
97 memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
98 req->rq_status = obd_open(&conn, &repbody->oa);
/*
 * Handle an OST close request.
 *
 * Copies the object attributes from the request into the reply body and
 * passes the reply copy to obd_close(), whose result is stored in
 * req->rq_status.
 */
102 static int ost_close(struct ost_obd *ost, struct ptlrpc_request *req)
104 struct obd_conn conn;
105 struct ost_body *body, *repbody;
106 int rc, size = sizeof(*body);
/* Request buffer 0 is interpreted as a struct ost_body. */
109 body = lustre_msg_buf(req->rq_reqmsg, 0);
110 conn.oc_id = body->connid;
111 conn.oc_dev = ost->ost_tgt;
/* Pack a reply with a single buffer of sizeof(struct ost_body) bytes. */
113 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
/* Seed the reply with the request's attributes, then operate on the reply copy. */
117 repbody = lustre_msg_buf(req->rq_repmsg, 0);
118 memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
119 req->rq_status = obd_close(&conn, &repbody->oa);
/*
 * Handle an OST create request.
 *
 * Copies the object attributes from the request into the reply body and
 * passes the reply copy to obd_create(), whose result is stored in
 * req->rq_status.
 */
123 static int ost_create(struct ost_obd *ost, struct ptlrpc_request *req)
125 struct obd_conn conn;
126 struct ost_body *body, *repbody;
127 int rc, size = sizeof(*body);
/* Request buffer 0 is interpreted as a struct ost_body. */
130 body = lustre_msg_buf(req->rq_reqmsg, 0);
131 conn.oc_id = body->connid;
132 conn.oc_dev = ost->ost_tgt;
/* Pack a reply with a single buffer of sizeof(struct ost_body) bytes. */
134 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
/* Seed the reply with the request's attributes, then operate on the reply copy. */
138 repbody = lustre_msg_buf(req->rq_repmsg, 0);
139 memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
140 req->rq_status = obd_create(&conn, &repbody->oa);
/*
 * Handle an OST punch request.
 *
 * Copies the object attributes from the request into the reply body and
 * calls obd_punch() on the reply copy; the result is stored in
 * req->rq_status.
 */
144 static int ost_punch(struct ost_obd *ost, struct ptlrpc_request *req)
146 struct obd_conn conn;
147 struct ost_body *body, *repbody;
148 int rc, size = sizeof(*body);
/* Request buffer 0 is interpreted as a struct ost_body. */
151 body = lustre_msg_buf(req->rq_reqmsg, 0);
152 conn.oc_id = body->connid;
153 conn.oc_dev = ost->ost_tgt;
/* Pack a reply with a single buffer of sizeof(struct ost_body) bytes. */
155 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
159 repbody = lustre_msg_buf(req->rq_repmsg, 0);
160 memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
/*
 * The o_size and o_blocks fields of the copied attributes are passed as
 * the two trailing arguments.
 * NOTE(review): presumably these encode the range to punch -- confirm
 * against the obd_punch() prototype.
 */
161 req->rq_status = obd_punch(&conn, &repbody->oa,
162 repbody->oa.o_size, repbody->oa.o_blocks);
/*
 * Handle an OST setattr request.
 *
 * Copies the object attributes from the request into the reply body and
 * passes the reply copy to obd_setattr(), whose result is stored in
 * req->rq_status.
 */
166 static int ost_setattr(struct ost_obd *ost, struct ptlrpc_request *req)
168 struct obd_conn conn;
169 struct ost_body *body, *repbody;
170 int rc, size = sizeof(*body);
/* Request buffer 0 is interpreted as a struct ost_body. */
173 body = lustre_msg_buf(req->rq_reqmsg, 0);
174 conn.oc_id = body->connid;
175 conn.oc_dev = ost->ost_tgt;
/* Pack a reply with a single buffer of sizeof(struct ost_body) bytes. */
177 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
/* Seed the reply with the request's attributes, then operate on the reply copy. */
181 repbody = lustre_msg_buf(req->rq_repmsg, 0);
182 memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
183 req->rq_status = obd_setattr(&conn, &repbody->oa);
/*
 * Handle an OST connect request.
 *
 * Unlike the other handlers this one takes no ost_obd: the target device
 * is looked up from the name carried in request buffer 0.  On the reply,
 * target_id is set to the device index and the body's connid to the id
 * produced by obd_connect().
 */
187 static int ost_connect(struct ptlrpc_request *req)
189 struct obd_conn conn;
190 struct ost_body *body;
193 int rc, size = sizeof(*body), i;
/* Request buffer 0 holds the name used for the device lookup below. */
196 uuid = lustre_msg_buf(req->rq_reqmsg, 0);
/* A name buffer longer than 37 bytes is rejected with -EINVAL. */
197 if (req->rq_reqmsg->buflens[0] > 37) {
199 req->rq_status = -EINVAL;
/* Translate the name into an index into obd_dev[]. */
203 i = obd_class_name2dev(uuid);
/* NOTE(review): -ENODEV is presumably set when the lookup fails -- confirm the guarding condition. */
205 req->rq_status = -ENODEV;
209 ost = &(obd_dev[i].u.ost);
210 conn.oc_dev = ost->ost_tgt;
/* Pack a reply with a single buffer of sizeof(struct ost_body) bytes. */
212 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
/* Return the device index in the reply's target_id field. */
216 req->rq_repmsg->target_id = i;
217 req->rq_status = obd_connect(&conn);
219 CDEBUG(D_IOCTL, "rep buffer %p, id %d\n", req->rq_repmsg, conn.oc_id);
/* Return the connection id in the reply body. */
220 body = lustre_msg_buf(req->rq_repmsg, 0);
221 body->connid = conn.oc_id;
/*
 * Handle an OST disconnect request.
 *
 * Packs a reply with no buffers and records the result of
 * obd_disconnect() in req->rq_status.
 */
225 static int ost_disconnect(struct ost_obd *ost, struct ptlrpc_request *req)
227 struct obd_conn conn;
228 struct ost_body *body;
/* Request buffer 0 is interpreted as a struct ost_body. */
232 body = lustre_msg_buf(req->rq_reqmsg, 0);
233 conn.oc_id = body->connid;
234 conn.oc_dev = ost->ost_tgt;
/* The reply carries no buffers; only the status is returned. */
236 rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
240 CDEBUG(D_IOCTL, "Disconnecting %d\n", conn.oc_id);
241 req->rq_status = obd_disconnect(&conn);
/*
 * Handle an OST get_info request.
 *
 * Request buffer 1 (with its length) is handed to obd_get_info(), which
 * fills in size[1] and bufs[1].  The reply is then packed with two
 * buffers described by size[] and bufs[].
 */
245 static int ost_get_info(struct ost_obd *ost, struct ptlrpc_request *req)
247 struct obd_conn conn;
248 struct ost_body *body;
/* size[0] is sizeof(struct ost_body); size[1] starts at 0 and is set by obd_get_info(). */
249 int rc, size[2] = {sizeof(*body)};
250 char *bufs[2] = {NULL, NULL}, *ptr;
/* Request buffer 0 is interpreted as a struct ost_body. */
253 body = lustre_msg_buf(req->rq_reqmsg, 0);
254 conn.oc_id = body->connid;
255 conn.oc_dev = ost->ost_tgt;
/* Request buffer 1 is passed through to obd_get_info() together with its length. */
257 ptr = lustre_msg_buf(req->rq_reqmsg, 1);
261 req->rq_status = obd_get_info(&conn, req->rq_reqmsg->buflens[1], ptr,
262 &(size[1]), (void **)&(bufs[1]));
/* Unlike the other handlers, the reply is packed after the OBD call so its result can be included. */
264 rc = lustre_pack_msg(2, size, bufs, &req->rq_replen, &req->rq_repmsg);
266 CERROR("cannot pack reply\n");
/*
 * Handle the read half of an OST bulk read/write request.
 *
 * Request buffer 1 is treated as an array of struct obd_ioobj and request
 * buffer 2 as an array of struct niobuf_remote.  After validating that the
 * per-object buffer counts fit inside buffer 2, the handler packs a
 * one-buffer reply, calls obd_preprw(), sends one bulk page per remote
 * niobuf, waits for the bulk to be sent, and finally calls obd_commitrw().
 * The results of obd_preprw()/obd_commitrw() are stored in req->rq_status.
 */
271 static int ost_brw_read(struct ost_obd *obddev, struct ptlrpc_request *req)
273 struct ptlrpc_bulk_desc *desc;
274 struct obd_conn conn;
275 void *tmp1, *tmp2, *end2;
276 struct niobuf_remote *remote_nb;
277 struct niobuf_local *local_nb = NULL;
278 struct obd_ioobj *ioo;
279 struct ost_body *body;
280 int rc, cmd, i, j, objcount, niocount, size = sizeof(*body);
283 body = lustre_msg_buf(req->rq_reqmsg, 0);
284 tmp1 = lustre_msg_buf(req->rq_reqmsg, 1);
285 tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
/* end2 is one past the last byte of request buffer 2. */
286 end2 = (char *)tmp2 + req->rq_reqmsg->buflens[2];
/* Record counts are derived from the wire buffer lengths. */
287 objcount = req->rq_reqmsg->buflens[1] / sizeof(*ioo);
288 niocount = req->rq_reqmsg->buflens[2] / sizeof(*remote_nb);
291 conn.oc_id = body->connid;
292 conn.oc_dev = req->rq_obd->u.ost.ost_tgt;
/* Walk every object and unpack its niobufs, advancing tmp1/tmp2. */
294 for (i = 0; i < objcount; i++) {
295 ost_unpack_ioo(&tmp1, &ioo);
/*
 * Reject the request if this object claims more niobufs than remain in
 * buffer 2.  ioo_bufcnt counts struct niobuf_remote records, so it must
 * be compared against the remaining space measured in records, not
 * added to a void pointer as a byte offset.  The division form avoids
 * overflow for very large wire-supplied counts.
 */
296 if (ioo->ioo_bufcnt > ((char *)end2 - (char *)tmp2) / sizeof(*remote_nb)) {
298 GOTO(out, rc = -EFAULT);
300 for (j = 0; j < ioo->ioo_bufcnt; j++)
301 ost_unpack_niobuf(&tmp2, &remote_nb);
/* Pack a reply with a single buffer of sizeof(struct ost_body) bytes. */
304 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
/* One local niobuf per remote niobuf. */
307 OBD_ALLOC(local_nb, sizeof(*local_nb) * niocount);
308 if (local_nb == NULL)
311 /* The unpackers move tmp1 and tmp2, so reset them before using */
312 tmp1 = lustre_msg_buf(req->rq_reqmsg, 1);
313 tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
314 req->rq_status = obd_preprw(cmd, &conn, objcount,
315 tmp1, niocount, tmp2, local_nb, NULL);
/* Build the bulk descriptor used to send the data pages. */
320 desc = ptlrpc_prep_bulk(req->rq_connection);
322 GOTO(out_local, rc = -ENOMEM);
323 desc->b_portal = OST_BULK_PORTAL;
/* One bulk page per niobuf: xid from the remote niobuf, buffer from the local one. */
325 for (i = 0; i < niocount; i++) {
326 struct ptlrpc_bulk_page *bulk;
327 bulk = ptlrpc_prep_bulk_page(desc);
329 GOTO(out_bulk, rc = -ENOMEM);
330 remote_nb = &(((struct niobuf_remote *)tmp2)[i]);
331 bulk->b_xid = remote_nb->xid;
332 bulk->b_buf = (void *)(unsigned long)local_nb[i].addr;
333 bulk->b_buflen = PAGE_SIZE;
336 rc = ptlrpc_send_bulk(desc);
/* Sleep until the bulk has been sent or the wait is interrupted. */
340 wait_event_interruptible(desc->b_waitq, ptlrpc_check_bulk_sent(desc));
341 if (desc->b_flags & PTL_RPC_FL_INTR)
344 /* The unpackers move tmp1 and tmp2, so reset them before using */
345 tmp1 = lustre_msg_buf(req->rq_reqmsg, 1);
346 tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
347 req->rq_status = obd_commitrw(cmd, &conn, objcount,
348 tmp1, niocount, local_nb, NULL);
/* Cleanup: release the bulk descriptor, then the local niobuf array. */
351 ptlrpc_free_bulk(desc);
353 OBD_FREE(local_nb, sizeof(*local_nb) * niocount);
/*
 * Per-page bulk callback installed by ost_brw_write().
 *
 * Builds a zeroed obd_ioobj and a niobuf_local populated from the bulk
 * page (page, dentry, flags), then commits that single page with
 * obd_commitrw(OBD_BRW_WRITE, ...) using the connection and private data
 * stored on the owning bulk descriptor.
 */
358 static int ost_brw_write_cb(struct ptlrpc_bulk_page *bulk)
360 struct obd_ioobj obj;
361 struct niobuf_local lnb;
/* Start from zeroed structures; only the fields below are filled in. */
365 memset(&lnb, 0, sizeof(lnb));
366 memset(&obj, 0, sizeof(obj));
368 lnb.page = bulk->b_page;
369 lnb.dentry = bulk->b_dentry;
370 lnb.flags = bulk->b_flags;
/* Commit exactly one object with one niobuf. */
373 rc = obd_commitrw(OBD_BRW_WRITE, &bulk->b_desc->b_conn, 1, &obj, 1,
374 &lnb, bulk->b_desc->b_desc_private);
376 CERROR("ost_commit_page failed: %d\n", rc);
/*
 * Descriptor-level callback installed by ost_brw_write() as desc->b_cb.
 * Releases the bulk descriptor with ptlrpc_free_bulk().
 */
381 static void ost_brw_write_finished_cb(struct ptlrpc_bulk_desc *desc, void *data)
384 ptlrpc_free_bulk(desc);
/*
 * Handle the write half of an OST bulk read/write request.
 *
 * Request buffer 1 is treated as an array of struct obd_ioobj and request
 * buffer 2 as an array of struct niobuf_remote.  After validating that the
 * per-object buffer counts fit inside buffer 2, the handler packs a
 * two-buffer reply (an ost_body plus niocount niobuf_remote records),
 * calls obd_preprw(), and registers a bulk descriptor with one page per
 * local niobuf.  Each page is given a fresh xid from the service and that
 * niobuf description is packed into reply buffer 1.  Committing the pages
 * is left to ost_brw_write_cb().
 */
388 static int ost_brw_write(struct ost_obd *obddev, struct ptlrpc_request *req)
390 struct ptlrpc_bulk_desc *desc;
391 struct obd_conn conn;
392 struct niobuf_remote *remote_nb;
393 struct niobuf_local *local_nb, *lnb;
394 struct obd_ioobj *ioo;
395 struct ost_body *body;
396 int cmd, rc, i, j, objcount, niocount, size[2] = {sizeof(*body)};
397 void *tmp1, *tmp2, *end2;
398 void *desc_priv = NULL;
401 body = lustre_msg_buf(req->rq_reqmsg, 0);
402 tmp1 = lustre_msg_buf(req->rq_reqmsg, 1);
403 tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
/* end2 is one past the last byte of request buffer 2. */
404 end2 = (char *)tmp2 + req->rq_reqmsg->buflens[2];
/* Record counts are derived from the wire buffer lengths. */
405 objcount = req->rq_reqmsg->buflens[1] / sizeof(*ioo);
406 niocount = req->rq_reqmsg->buflens[2] / sizeof(*remote_nb);
409 conn.oc_id = body->connid;
410 conn.oc_dev = req->rq_obd->u.ost.ost_tgt;
/* Walk every object and unpack its niobufs, advancing tmp1/tmp2. */
412 for (i = 0; i < objcount; i++) {
413 ost_unpack_ioo((void *)&tmp1, &ioo);
/*
 * Reject the request if this object claims more niobufs than remain in
 * buffer 2.  ioo_bufcnt counts struct niobuf_remote records, so it must
 * be compared against the remaining space measured in records, not
 * added to a void pointer as a byte offset.  The division form avoids
 * overflow for very large wire-supplied counts.
 */
414 if (ioo->ioo_bufcnt > ((char *)end2 - (char *)tmp2) / sizeof(*remote_nb)) {
418 for (j = 0; j < ioo->ioo_bufcnt; j++)
419 ost_unpack_niobuf((void *)&tmp2, &remote_nb);
/* Reply buffer 1 holds one niobuf_remote per page for the client. */
422 size[1] = niocount * sizeof(*remote_nb);
423 rc = lustre_pack_msg(2, size, NULL, &req->rq_replen, &req->rq_repmsg);
/* From here on remote_nb points into the reply, not the request. */
426 remote_nb = lustre_msg_buf(req->rq_repmsg, 1);
428 OBD_ALLOC(local_nb, niocount * sizeof(*local_nb));
429 if (local_nb == NULL)
430 GOTO(out, rc = -ENOMEM);
432 /* The unpackers move tmp1 and tmp2, so reset them before using */
433 tmp1 = lustre_msg_buf(req->rq_reqmsg, 1);
434 tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
/* desc_priv receives opaque data from obd_preprw(); it is handed to the bulk descriptor below. */
435 req->rq_status = obd_preprw(cmd, &conn, objcount,
436 tmp1, niocount, tmp2, local_nb, &desc_priv);
438 GOTO(out_free, rc = 0); /* XXX is this correct? */
440 desc = ptlrpc_prep_bulk(req->rq_connection);
442 GOTO(fail_preprw, rc = -ENOMEM);
/* The descriptor keeps a copy of the connection and the preprw private data for the callbacks. */
443 desc->b_cb = ost_brw_write_finished_cb;
444 desc->b_portal = OSC_BULK_PORTAL;
445 desc->b_desc_private = desc_priv;
446 memcpy(&(desc->b_conn), &conn, sizeof(conn));
/* One bulk page per local niobuf. */
448 for (i = 0, lnb = local_nb; i < niocount; i++, lnb++) {
449 struct ptlrpc_service *srv = req->rq_obd->u.ost.ost_service;
450 struct ptlrpc_bulk_page *bulk;
452 bulk = ptlrpc_prep_bulk_page(desc);
454 GOTO(fail_bulk, rc = -ENOMEM);
/* Allocate a unique xid for this page under the service lock. */
456 spin_lock(&srv->srv_lock);
457 bulk->b_xid = srv->srv_xid++;
458 spin_unlock(&srv->srv_lock);
460 bulk->b_buf = lnb->addr;
461 bulk->b_page = lnb->page;
462 bulk->b_flags = lnb->flags;
463 bulk->b_dentry = lnb->dentry;
464 bulk->b_buflen = PAGE_SIZE;
465 bulk->b_cb = ost_brw_write_cb;
467 /* this advances remote_nb */
468 ost_pack_niobuf((void **)&remote_nb, lnb->offset, lnb->len, 0,
472 rc = ptlrpc_register_bulk(desc);
/* Cleanup: the local niobuf array is released here; the descriptor is freed on the failure path. */
478 OBD_FREE(local_nb, niocount * sizeof(*local_nb));
483 ptlrpc_free_bulk(desc);
485 /* FIXME: how do we undo the preprw? */
/*
 * Dispatch a bulk read/write request: if the data field of the request's
 * ost_body equals OBD_BRW_READ the request goes to ost_brw_read(),
 * otherwise to ost_brw_write().
 */
489 static int ost_brw(struct ost_obd *obddev, struct ptlrpc_request *req)
491 struct ost_body *body = lustre_msg_buf(req->rq_reqmsg, 0);
493 if (body->data == OBD_BRW_READ)
494 return ost_brw_read(obddev, req);
496 return ost_brw_write(obddev, req);
/*
 * Top-level request handler for the OST service.
 *
 * Unpacks and validates the incoming message, resolves the target OST
 * device from the message's target_id (for every opcode except
 * OST_CONNECT, which looks the device up by name itself), dispatches on
 * the opcode to the matching ost_*() handler, and finally sends either an
 * error or a normal reply.
 */
499 static int ost_handle(struct obd_device *obddev, struct ptlrpc_service *svc,
500 struct ptlrpc_request *req)
506 rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen);
/* NOTE(review): this uses the MDS fail-check id inside the OST handler -- confirm whether an OST-specific id should be used. */
507 if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_HANDLE_UNPACK)) {
508 CERROR("lustre_ost: Invalid request\n");
/* Only request-type messages are accepted. */
512 if (req->rq_reqmsg->type != PTL_RPC_MSG_REQUEST) {
513 CERROR("lustre_ost: wrong packet type sent %d\n",
514 req->rq_reqmsg->type);
515 GOTO(out, rc = -EINVAL);
/* Every opcode except OST_CONNECT addresses an existing device by index. */
518 if (req->rq_reqmsg->opc != OST_CONNECT) {
519 int id = req->rq_reqmsg->target_id;
/* Note: this local shadows the obddev parameter for the rest of this scope. */
520 struct obd_device *obddev;
/* id comes off the wire and indexes obd_dev[], so MAX_OBD_DEVICES itself is out of range. */
521 if (id < 0 || id >= MAX_OBD_DEVICES)
522 GOTO(out, rc = -ENODEV);
523 obddev = &obd_dev[id];
/* NOTE(review): obd_type is dereferenced without a NULL check -- confirm it is always set for every obd_dev[] slot. */
524 if (strcmp(obddev->obd_type->typ_name, "ost") != 0)
525 GOTO(out, rc = -EINVAL);
526 ost = &obddev->u.ost;
527 req->rq_obd = obddev;
/* Dispatch on the opcode; each arm logs, applies its fail-injection hook, then calls its handler. */
530 switch (req->rq_reqmsg->opc) {
532 CDEBUG(D_INODE, "connect\n");
533 OBD_FAIL_RETURN(OBD_FAIL_OST_CONNECT_NET, 0);
534 rc = ost_connect(req);
537 CDEBUG(D_INODE, "disconnect\n");
538 OBD_FAIL_RETURN(OBD_FAIL_OST_DISCONNECT_NET, 0);
539 rc = ost_disconnect(ost, req);
542 CDEBUG(D_INODE, "get_info\n");
543 OBD_FAIL_RETURN(OBD_FAIL_OST_GET_INFO_NET, 0);
544 rc = ost_get_info(ost, req);
547 CDEBUG(D_INODE, "create\n");
548 OBD_FAIL_RETURN(OBD_FAIL_OST_CREATE_NET, 0);
549 rc = ost_create(ost, req);
552 CDEBUG(D_INODE, "destroy\n");
553 OBD_FAIL_RETURN(OBD_FAIL_OST_DESTROY_NET, 0);
554 rc = ost_destroy(ost, req);
557 CDEBUG(D_INODE, "getattr\n");
558 OBD_FAIL_RETURN(OBD_FAIL_OST_GETATTR_NET, 0);
559 rc = ost_getattr(ost, req);
562 CDEBUG(D_INODE, "setattr\n");
563 OBD_FAIL_RETURN(OBD_FAIL_OST_SETATTR_NET, 0);
564 rc = ost_setattr(ost, req);
567 CDEBUG(D_INODE, "open\n");
568 OBD_FAIL_RETURN(OBD_FAIL_OST_OPEN_NET, 0);
569 rc = ost_open(ost, req);
572 CDEBUG(D_INODE, "close\n");
573 OBD_FAIL_RETURN(OBD_FAIL_OST_CLOSE_NET, 0);
574 rc = ost_close(ost, req);
577 CDEBUG(D_INODE, "brw\n");
578 OBD_FAIL_RETURN(OBD_FAIL_OST_BRW_NET, 0);
579 rc = ost_brw(ost, req);
582 CDEBUG(D_INODE, "punch\n");
583 OBD_FAIL_RETURN(OBD_FAIL_OST_PUNCH_NET, 0);
584 rc = ost_punch(ost, req);
588 CDEBUG(D_INODE, "statfs\n");
589 OBD_FAIL_RETURN(OBD_FAIL_OST_STATFS_NET, 0);
590 rc = ost_statfs(ost, req);
/* Unknown opcode: answer with -ENOTSUPP. */
594 req->rq_status = -ENOTSUPP;
595 rc = ptlrpc_error(svc, req);
601 //req->rq_status = rc;
/* Processing failures are answered with an error reply, everything else with a normal reply. */
603 CERROR("ost: processing error %d\n", rc);
604 ptlrpc_error(svc, req);
606 CDEBUG(D_INODE, "sending reply\n");
607 ptlrpc_reply(svc, req);
613 /* Set up the OST: connect to the target OBD device and start the request service */
/*
 * buf is interpreted as a struct obd_ioctl_data whose ioc_dev field is the
 * index of the target device in obd_dev[].  The target must be attached
 * and set up.  On success the OST holds a connection to the target, owns
 * a server-side lock namespace, and has a ptlrpc service with two
 * "lustre_ost" threads started.
 */
614 static int ost_setup(struct obd_device *obddev, obd_count len, void *buf)
616 struct obd_ioctl_data* data = buf;
617 struct ost_obd *ost = &obddev->u.ost;
618 struct obd_device *tgt;
/* ioc_dev indexes obd_dev[], so MAX_OBD_DEVICES itself is out of range. */
622 if (data->ioc_dev < 0 || data->ioc_dev >= MAX_OBD_DEVICES)
626 tgt = &obd_dev[data->ioc_dev];
/* The target device must be both attached and set up before it can be used. */
628 if (!(tgt->obd_flags & OBD_ATTACHED) ||
629 !(tgt->obd_flags & OBD_SET_UP)) {
630 CERROR("device not attached or not set up (%d)\n",
632 GOTO(error_dec, err = -EINVAL);
/* Open this OST's own connection to the target device. */
635 ost->ost_conn.oc_dev = tgt;
636 err = obd_connect(&ost->ost_conn);
638 CERROR("fail to connect to device %d\n", data->ioc_dev);
639 GOTO(error_dec, err = -EINVAL);
/* Create the server-side lock namespace for this device. */
642 obddev->obd_namespace = ldlm_namespace_new(LDLM_NAMESPACE_SERVER);
643 if (obddev->obd_namespace == NULL)
/* Create the request service listening on the OST request portal. */
646 ost->ost_service = ptlrpc_init_svc(128 * 1024,
647 OST_REQUEST_PORTAL, OSC_REPLY_PORTAL,
649 if (!ost->ost_service) {
650 CERROR("failed to start service\n");
651 GOTO(error_disc, err = -EINVAL);
/* Two service threads are started, both named "lustre_ost". */
654 err = ptlrpc_start_thread(obddev, ost->ost_service, "lustre_ost");
656 GOTO(error_disc, err = -EINVAL);
657 err = ptlrpc_start_thread(obddev, ost->ost_service, "lustre_ost");
659 GOTO(error_disc, err = -EINVAL);
/* Error unwinding: drop the connection to the target device. */
664 obd_disconnect(&ost->ost_conn);
/*
 * Tear down an OST device: stop and unregister the request service, free
 * it, disconnect from the target device, and free the lock namespace.
 * An error is logged if clients are still attached.
 */
670 static int ost_cleanup(struct obd_device * obddev)
672 struct ost_obd *ost = &obddev->u.ost;
/* A non-empty client list is reported as an error. */
677 if ( !list_empty(&obddev->obd_gen_clients) ) {
678 CERROR("still has clients!\n");
/* Stop the service threads before unregistering the service. */
682 ptlrpc_stop_all_threads(ost->ost_service);
683 rpc_unregister_service(ost->ost_service);
/* Requests still queued on the service are only reported, not answered. */
685 if (!list_empty(&ost->ost_service->srv_reqs)) {
686 // XXX reply with errors and clean up
687 CERROR("Request list not empty!\n");
689 OBD_FREE(ost->ost_service, sizeof(*ost->ost_service));
/* Drop the connection to the target device that ost_setup() opened. */
691 err = obd_disconnect(&ost->ost_conn);
693 CERROR("lustre ost: fail to disconnect device\n");
697 ldlm_namespace_free(obddev->obd_namespace);
703 /* use obd ops to offer management infrastructure */
/*
 * OBD method table registered for the OST type by ost_init().  It is
 * initialised with GNU-style "field: value" designators; the o_cleanup
 * slot points at ost_cleanup().
 */
704 static struct obd_ops ost_obd_ops = {
706 o_cleanup: ost_cleanup,
/* Module entry point: register the OST method table under LUSTRE_OST_NAME. */
709 static int __init ost_init(void)
711 obd_register_type(&ost_obd_ops, LUSTRE_OST_NAME);
/* Module exit point: unregister the OBD type that ost_init() registered. */
715 static void __exit ost_exit(void)
717 obd_unregister_type(LUSTRE_OST_NAME);
/* Kernel module metadata. */
720 MODULE_AUTHOR("Peter J. Braam <braam@clusterfs.com>");
721 MODULE_DESCRIPTION("Lustre Object Storage Target (OST) v0.01");
722 MODULE_LICENSE("GPL");
/* Hook ost_init()/ost_exit() up as the module's load and unload handlers. */
724 module_init(ost_init);
725 module_exit(ost_exit);