1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copryright (C) 2001, 2002 Cluster File Systems, Inc.
6 * This code is issued under the GNU General Public License.
7 * See the file COPYING in this distribution
9 * Author Peter Braam <braam@clusterfs.com>
11 * This server is single threaded at present (but can easily be multi
12 * threaded). For testing and management it is treated as an
13 * obd_device, although it does not export a full OBD method table
14 * (the requests are coming in over the wire, so object target
15 * modules do not have a full method table.)
21 #include <linux/config.h>
22 #include <linux/module.h>
23 #include <linux/kernel.h>
25 #include <linux/string.h>
26 #include <linux/stat.h>
27 #include <linux/errno.h>
28 #include <linux/locks.h>
29 #include <linux/unistd.h>
31 #include <asm/system.h>
32 #include <asm/uaccess.h>
35 #include <linux/stat.h>
36 #include <asm/uaccess.h>
37 #include <asm/segment.h>
38 #include <linux/miscdevice.h>
40 #define DEBUG_SUBSYSTEM S_OSC
42 #include <linux/obd_class.h>
43 #include <linux/lustre_lib.h>
44 #include <linux/lustre_net.h>
45 #include <linux/obd_ost.h>
47 static void osc_con2cl(struct obd_conn *conn, struct ptlrpc_client **cl,
48 struct ptlrpc_connection **connection)
50 struct osc_obd *osc = &conn->oc_dev->u.osc;
51 *cl = osc->osc_client;
52 *connection = osc->osc_conn;
55 static int osc_connect(struct obd_conn *conn)
57 struct ptlrpc_request *request;
58 struct ptlrpc_client *cl;
59 struct ptlrpc_connection *connection;
60 struct ost_body *body;
61 int rc, size = sizeof(*body);
64 osc_con2cl(conn, &cl, &connection);
65 request = ptlrpc_prep_req(cl, connection, OST_CONNECT, 0, NULL, NULL);
69 request->rq_replen = lustre_msg_size(1, &size);
71 rc = ptlrpc_queue_wait(request);
75 body = lustre_msg_buf(request->rq_repmsg, 0);
76 CDEBUG(D_INODE, "received connid %d\n", body->connid);
78 conn->oc_id = body->connid;
81 ptlrpc_free_req(request);
85 static int osc_disconnect(struct obd_conn *conn)
87 struct ptlrpc_request *request;
88 struct ptlrpc_client *cl;
89 struct ptlrpc_connection *connection;
90 struct ost_body *body;
91 int rc, size = sizeof(*body);
94 osc_con2cl(conn, &cl, &connection);
95 request = ptlrpc_prep_req(cl, connection, OST_DISCONNECT, 1, &size, NULL);
99 body = lustre_msg_buf(request->rq_reqmsg, 0);
100 body->connid = conn->oc_id;
102 request->rq_replen = lustre_msg_size(1, &size);
104 rc = ptlrpc_queue_wait(request);
107 ptlrpc_free_req(request);
111 static int osc_getattr(struct obd_conn *conn, struct obdo *oa)
113 struct ptlrpc_request *request;
114 struct ptlrpc_client *cl;
115 struct ptlrpc_connection *connection;
116 struct ost_body *body;
117 int rc, size = sizeof(*body);
120 osc_con2cl(conn, &cl, &connection);
121 request = ptlrpc_prep_req(cl, connection, OST_GETATTR, 1, &size, NULL);
125 body = lustre_msg_buf(request->rq_reqmsg, 0);
126 memcpy(&body->oa, oa, sizeof(*oa));
127 body->connid = conn->oc_id;
128 body->oa.o_valid = ~0;
130 request->rq_replen = lustre_msg_size(1, &size);
132 rc = ptlrpc_queue_wait(request);
136 body = lustre_msg_buf(request->rq_repmsg, 0);
137 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
139 memcpy(oa, &body->oa, sizeof(*oa));
143 ptlrpc_free_req(request);
147 static int osc_open(struct obd_conn *conn, struct obdo *oa)
149 struct ptlrpc_request *request;
150 struct ptlrpc_client *cl;
151 struct ptlrpc_connection *connection;
152 struct ost_body *body;
153 int rc, size = sizeof(*body);
156 osc_con2cl(conn, &cl, &connection);
157 request = ptlrpc_prep_req(cl, connection, OST_OPEN, 1, &size, NULL);
161 body = lustre_msg_buf(request->rq_reqmsg, 0);
162 memcpy(&body->oa, oa, sizeof(*oa));
163 body->connid = conn->oc_id;
164 if (body->oa.o_valid != (OBD_MD_FLMODE | OBD_MD_FLID))
167 request->rq_replen = lustre_msg_size(1, &size);
169 rc = ptlrpc_queue_wait(request);
173 body = lustre_msg_buf(request->rq_repmsg, 0);
174 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
176 memcpy(oa, &body->oa, sizeof(*oa));
180 ptlrpc_free_req(request);
184 static int osc_close(struct obd_conn *conn, struct obdo *oa)
186 struct ptlrpc_request *request;
187 struct ptlrpc_client *cl;
188 struct ptlrpc_connection *connection;
189 struct ost_body *body;
190 int rc, size = sizeof(*body);
193 osc_con2cl(conn, &cl, &connection);
194 request = ptlrpc_prep_req(cl, connection, OST_CLOSE, 1, &size, NULL);
198 body = lustre_msg_buf(request->rq_reqmsg, 0);
199 memcpy(&body->oa, oa, sizeof(*oa));
200 body->connid = conn->oc_id;
202 request->rq_replen = lustre_msg_size(1, &size);
204 rc = ptlrpc_queue_wait(request);
208 body = lustre_msg_buf(request->rq_repmsg, 0);
209 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
211 memcpy(oa, &body->oa, sizeof(*oa));
215 ptlrpc_free_req(request);
219 static int osc_setattr(struct obd_conn *conn, struct obdo *oa)
221 struct ptlrpc_request *request;
222 struct ptlrpc_client *cl;
223 struct ptlrpc_connection *connection;
224 struct ost_body *body;
225 int rc, size = sizeof(*body);
228 osc_con2cl(conn, &cl, &connection);
229 request = ptlrpc_prep_req(cl, connection, OST_SETATTR, 1, &size, NULL);
233 body = lustre_msg_buf(request->rq_reqmsg, 0);
234 memcpy(&body->oa, oa, sizeof(*oa));
235 body->connid = conn->oc_id;
237 request->rq_replen = lustre_msg_size(1, &size);
239 rc = ptlrpc_queue_wait(request);
243 ptlrpc_free_req(request);
247 static int osc_create(struct obd_conn *conn, struct obdo *oa)
249 struct ptlrpc_request *request;
250 struct ptlrpc_client *cl;
251 struct ptlrpc_connection *connection;
252 struct ost_body *body;
253 int rc, size = sizeof(*body);
260 osc_con2cl(conn, &cl, &connection);
261 request = ptlrpc_prep_req(cl, connection, OST_CREATE, 1, &size, NULL);
265 body = lustre_msg_buf(request->rq_reqmsg, 0);
266 memcpy(&body->oa, oa, sizeof(*oa));
267 body->oa.o_valid = ~0;
268 body->connid = conn->oc_id;
270 request->rq_replen = lustre_msg_size(1, &size);
272 rc = ptlrpc_queue_wait(request);
276 body = lustre_msg_buf(request->rq_repmsg, 0);
277 memcpy(oa, &body->oa, sizeof(*oa));
281 ptlrpc_free_req(request);
285 static int osc_punch(struct obd_conn *conn, struct obdo *oa, obd_size count,
288 struct ptlrpc_request *request;
289 struct ptlrpc_client *cl;
290 struct ptlrpc_connection *connection;
291 struct ost_body *body;
292 int rc, size = sizeof(*body);
299 osc_con2cl(conn, &cl, &connection);
300 request = ptlrpc_prep_req(cl, connection, OST_PUNCH, 1, &size, NULL);
304 body = lustre_msg_buf(request->rq_reqmsg, 0);
305 memcpy(&body->oa, oa, sizeof(*oa));
306 body->connid = conn->oc_id;
307 body->oa.o_valid = ~0;
308 body->oa.o_size = offset;
309 body->oa.o_blocks = count;
311 request->rq_replen = lustre_msg_size(1, &size);
313 rc = ptlrpc_queue_wait(request);
317 body = lustre_msg_buf(request->rq_repmsg, 0);
318 memcpy(oa, &body->oa, sizeof(*oa));
322 ptlrpc_free_req(request);
326 static int osc_destroy(struct obd_conn *conn, struct obdo *oa)
328 struct ptlrpc_request *request;
329 struct ptlrpc_client *cl;
330 struct ptlrpc_connection *connection;
331 struct ost_body *body;
332 int rc, size = sizeof(*body);
339 osc_con2cl(conn, &cl, &connection);
340 request = ptlrpc_prep_req(cl, connection, OST_DESTROY, 1, &size, NULL);
344 body = lustre_msg_buf(request->rq_reqmsg, 0);
345 memcpy(&body->oa, oa, sizeof(*oa));
346 body->connid = conn->oc_id;
347 body->oa.o_valid = ~0;
349 request->rq_replen = lustre_msg_size(1, &size);
351 rc = ptlrpc_queue_wait(request);
355 body = lustre_msg_buf(request->rq_repmsg, 0);
356 memcpy(oa, &body->oa, sizeof(*oa));
360 ptlrpc_free_req(request);
364 int osc_sendpage(struct obd_conn *conn, struct ptlrpc_request *req,
365 struct niobuf *dst, struct niobuf *src)
367 struct ptlrpc_client *cl;
368 struct ptlrpc_connection *connection;
370 osc_con2cl(conn, &cl, &connection);
374 memcpy((char *)(unsigned long)dst->addr,
375 (char *)(unsigned long)src->addr, src->len);
377 struct ptlrpc_bulk_desc *bulk;
380 bulk = ptlrpc_prep_bulk(connection);
384 bulk->b_buf = (void *)(unsigned long)src->addr;
385 bulk->b_buflen = src->len;
386 bulk->b_xid = dst->xid;
387 rc = ptlrpc_send_bulk(bulk, OSC_BULK_PORTAL);
389 CERROR("send_bulk failed: %d\n", rc);
393 wait_event_interruptible(bulk->b_waitq,
394 ptlrpc_check_bulk_sent(bulk));
396 if (bulk->b_flags == PTL_RPC_INTR) {
398 /* FIXME: hey hey, we leak here. */
402 OBD_FREE(bulk, sizeof(*bulk));
408 int osc_brw_read(struct obd_conn *conn, obd_count num_oa, struct obdo **oa,
409 obd_count *oa_bufs, struct page **buf, obd_size *count,
410 obd_off *offset, obd_flag *flags)
412 struct ptlrpc_client *cl;
413 struct ptlrpc_connection *connection;
414 struct ptlrpc_request *request;
415 struct ost_body *body;
416 struct obd_ioobj ioo;
418 int pages, rc, i, j, size[3] = {sizeof(*body)};
420 struct ptlrpc_bulk_desc **bulk;
423 size[1] = num_oa * sizeof(ioo);
425 for (i = 0; i < num_oa; i++)
427 size[2] = pages * sizeof(src);
429 OBD_ALLOC(bulk, pages * sizeof(*bulk));
433 osc_con2cl(conn, &cl, &connection);
434 request = ptlrpc_prep_req(cl, connection, OST_BRW, 3, size, NULL);
436 GOTO(out, rc = -ENOMEM);
438 body = lustre_msg_buf(request->rq_reqmsg, 0);
439 body->data = OBD_BRW_READ;
441 ptr1 = lustre_msg_buf(request->rq_reqmsg, 1);
442 ptr2 = lustre_msg_buf(request->rq_reqmsg, 2);
443 for (pages = 0, i = 0; i < num_oa; i++) {
444 ost_pack_ioo(&ptr1, oa[i], oa_bufs[i]);
445 for (j = 0; j < oa_bufs[i]; j++, pages++) {
446 bulk[pages] = ptlrpc_prep_bulk(connection);
447 if (bulk[pages] == NULL)
448 GOTO(out, rc = -ENOMEM);
450 spin_lock(&connection->c_lock);
451 bulk[pages]->b_xid = ++connection->c_xid_out;
452 spin_unlock(&connection->c_lock);
454 bulk[pages]->b_buf = kmap(buf[pages]);
455 bulk[pages]->b_buflen = PAGE_SIZE;
456 bulk[pages]->b_portal = OST_BULK_PORTAL;
457 ost_pack_niobuf(&ptr2, bulk[pages]->b_buf,
458 offset[pages], count[pages],
459 flags[pages], bulk[pages]->b_xid);
461 rc = ptlrpc_register_bulk(bulk[pages]);
467 request->rq_replen = lustre_msg_size(1, size);
468 rc = ptlrpc_queue_wait(request);
472 /* FIXME: if we've called ptlrpc_wait_bulk but rc != 0, we need to
473 * abort those bulk listeners. */
475 for (pages = 0, i = 0; i < num_oa; i++) {
476 for (j = 0; j < oa_bufs[i]; j++, pages++) {
477 if (bulk[pages] == NULL)
480 OBD_FREE(bulk[pages], sizeof(**bulk));
484 OBD_FREE(bulk, pages * sizeof(*bulk));
485 ptlrpc_free_req(request);
489 int osc_brw_write(struct obd_conn *conn, obd_count num_oa, struct obdo **oa,
490 obd_count *oa_bufs, struct page **buf, obd_size *count,
491 obd_off *offset, obd_flag *flags)
493 struct ptlrpc_client *cl;
494 struct ptlrpc_connection *connection;
495 struct ptlrpc_request *request;
496 struct obd_ioobj ioo;
497 struct ost_body *body;
499 int pages, rc, i, j, size[3] = {sizeof(*body)};
503 size[1] = num_oa * sizeof(ioo);
505 for (i = 0; i < num_oa; i++)
507 size[2] = pages * sizeof(*src);
509 OBD_ALLOC(src, size[2]);
513 osc_con2cl(conn, &cl, &connection);
514 request = ptlrpc_prep_req(cl, connection, OST_BRW, 3, size, NULL);
517 body = lustre_msg_buf(request->rq_reqmsg, 0);
518 body->data = OBD_BRW_WRITE;
520 ptr1 = lustre_msg_buf(request->rq_reqmsg, 1);
521 ptr2 = lustre_msg_buf(request->rq_reqmsg, 2);
522 for (pages = 0, i = 0; i < num_oa; i++) {
523 ost_pack_ioo(&ptr1, oa[i], oa_bufs[i]);
524 for (j = 0; j < oa_bufs[i]; j++, pages++) {
525 ost_pack_niobuf(&ptr2, kmap(buf[pages]), offset[pages],
526 count[pages], flags[pages], 0);
529 memcpy(src, lustre_msg_buf(request->rq_reqmsg, 2), size[2]);
531 size[1] = pages * sizeof(struct niobuf);
532 request->rq_replen = lustre_msg_size(2, size);
534 rc = ptlrpc_queue_wait(request);
538 ptr2 = lustre_msg_buf(request->rq_repmsg, 1);
540 GOTO(out, rc = -EINVAL);
542 if (request->rq_repmsg->buflens[1] != pages * sizeof(struct niobuf)) {
543 CERROR("buffer length wrong (%d vs. %d)\n",
544 request->rq_repmsg->buflens[1],
545 pages * sizeof(struct niobuf));
546 GOTO(out, rc = -EINVAL);
549 for (pages = 0, i = 0; i < num_oa; i++) {
550 for (j = 0; j < oa_bufs[i]; j++, pages++) {
552 ost_unpack_niobuf(&ptr2, &dst);
553 osc_sendpage(conn, request, dst, &src[pages]);
556 OBD_FREE(src, size[2]);
558 for (pages = 0, i = 0; i < num_oa; i++)
559 for (j = 0; j < oa_bufs[i]; j++, pages++)
562 ptlrpc_free_req(request);
566 int osc_brw(int rw, struct obd_conn *conn, obd_count num_oa,
567 struct obdo **oa, obd_count *oa_bufs, struct page **buf,
568 obd_size *count, obd_off *offset, obd_flag *flags)
570 if (rw == OBD_BRW_READ)
571 return osc_brw_read(conn, num_oa, oa, oa_bufs, buf, count,
574 return osc_brw_write(conn, num_oa, oa, oa_bufs, buf, count,
578 /* mount the file system (secretly) */
579 static int osc_setup(struct obd_device *obddev, obd_count len, void *buf)
581 struct osc_obd *osc = &obddev->u.osc;
584 OBD_ALLOC(osc->osc_client, sizeof(*osc->osc_client));
585 if (osc->osc_client == NULL)
588 ptlrpc_init_client(NULL, OST_REQUEST_PORTAL, OSC_REPLY_PORTAL,
591 osc->osc_conn = ptlrpc_uuid_to_connection("ost");
599 static int osc_cleanup(struct obd_device * obddev)
601 struct osc_obd *osc = &obddev->u.osc;
603 OBD_FREE(osc->osc_client, sizeof(*osc->osc_client));
604 ptlrpc_put_connection(osc->osc_conn);
610 struct obd_ops osc_obd_ops = {
612 o_cleanup: osc_cleanup,
613 o_create: osc_create,
614 o_destroy: osc_destroy,
615 o_getattr: osc_getattr,
616 o_setattr: osc_setattr,
619 o_connect: osc_connect,
620 o_disconnect: osc_disconnect,
625 static int __init osc_init(void)
627 obd_register_type(&osc_obd_ops, LUSTRE_OSC_NAME);
631 static void __exit osc_exit(void)
633 obd_unregister_type(LUSTRE_OSC_NAME);
636 MODULE_AUTHOR("Peter J. Braam <braam@clusterfs.com>");
637 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC) v1.0");
638 MODULE_LICENSE("GPL");
640 module_init(osc_init);
641 module_exit(osc_exit);