1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copryright (C) 2001, 2002 Cluster File Systems, Inc.
6 * This code is issued under the GNU General Public License.
7 * See the file COPYING in this distribution
9 * Author Peter Braam <braam@clusterfs.com>
11 * This server is single threaded at present (but can easily be multi
12 * threaded). For testing and management it is treated as an
13 * obd_device, although it does not export a full OBD method table
14 * (the requests are coming in over the wire, so object target
15 * modules do not have a full method table.)
21 #include <linux/config.h>
22 #include <linux/module.h>
23 #include <linux/kernel.h>
25 #include <linux/string.h>
26 #include <linux/stat.h>
27 #include <linux/errno.h>
28 #include <linux/locks.h>
29 #include <linux/unistd.h>
31 #include <asm/system.h>
32 #include <asm/uaccess.h>
35 #include <linux/stat.h>
36 #include <asm/uaccess.h>
37 #include <asm/segment.h>
38 #include <linux/miscdevice.h>
40 #define DEBUG_SUBSYSTEM S_OSC
42 #include <linux/obd_class.h>
43 #include <linux/lustre_lib.h>
44 #include <linux/lustre_net.h>
45 #include <linux/obd_ost.h>
47 struct ptlrpc_client *osc_con2cl(struct obd_conn *conn)
49 struct osc_obd *osc = &conn->oc_dev->u.osc;
50 return osc->osc_client;
53 static int osc_connect(struct obd_conn *conn)
55 struct ptlrpc_request *request;
56 struct ptlrpc_client *cl = osc_con2cl(conn);
57 struct ost_body *body;
58 int rc, size = sizeof(*body);
61 request = ptlrpc_prep_req(cl, OST_CONNECT, 0, NULL, NULL);
65 request->rq_replen = lustre_msg_size(1, &size);
67 rc = ptlrpc_queue_wait(cl, request);
71 body = lustre_msg_buf(request->rq_repmsg, 0);
72 CDEBUG(D_INODE, "received connid %d\n", body->connid);
74 conn->oc_id = body->connid;
77 ptlrpc_free_req(request);
81 static int osc_disconnect(struct obd_conn *conn)
83 struct ptlrpc_request *request;
84 struct ptlrpc_client *cl = osc_con2cl(conn);
85 struct ost_body *body;
86 int rc, size = sizeof(*body);
89 request = ptlrpc_prep_req(cl, OST_DISCONNECT, 1, &size, NULL);
93 body = lustre_msg_buf(request->rq_reqmsg, 0);
94 body->connid = conn->oc_id;
96 request->rq_replen = lustre_msg_size(1, &size);
98 rc = ptlrpc_queue_wait(cl, request);
101 ptlrpc_free_req(request);
105 static int osc_getattr(struct obd_conn *conn, struct obdo *oa)
107 struct ptlrpc_request *request;
108 struct ptlrpc_client *cl = osc_con2cl(conn);
109 struct ost_body *body;
110 int rc, size = sizeof(*body);
113 request = ptlrpc_prep_req(cl, OST_GETATTR, 1, &size, NULL);
117 body = lustre_msg_buf(request->rq_reqmsg, 0);
118 memcpy(&body->oa, oa, sizeof(*oa));
119 body->connid = conn->oc_id;
120 body->oa.o_valid = ~0;
122 request->rq_replen = lustre_msg_size(1, &size);
124 rc = ptlrpc_queue_wait(cl, request);
128 body = lustre_msg_buf(request->rq_repmsg, 0);
129 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
131 memcpy(oa, &body->oa, sizeof(*oa));
135 ptlrpc_free_req(request);
139 static int osc_open(struct obd_conn *conn, struct obdo *oa)
141 struct ptlrpc_request *request;
142 struct ptlrpc_client *cl = osc_con2cl(conn);
143 struct ost_body *body;
144 int rc, size = sizeof(*body);
147 request = ptlrpc_prep_req(cl, OST_OPEN, 1, &size, NULL);
151 body = lustre_msg_buf(request->rq_reqmsg, 0);
152 memcpy(&body->oa, oa, sizeof(*oa));
153 body->connid = conn->oc_id;
154 if (body->oa.o_valid != (OBD_MD_FLMODE | OBD_MD_FLID))
157 request->rq_replen = lustre_msg_size(1, &size);
159 rc = ptlrpc_queue_wait(cl, request);
163 body = lustre_msg_buf(request->rq_repmsg, 0);
164 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
166 memcpy(oa, &body->oa, sizeof(*oa));
170 ptlrpc_free_req(request);
174 static int osc_close(struct obd_conn *conn, struct obdo *oa)
176 struct ptlrpc_request *request;
177 struct ptlrpc_client *cl = osc_con2cl(conn);
178 struct ost_body *body;
179 int rc, size = sizeof(*body);
182 request = ptlrpc_prep_req(cl, OST_CLOSE, 1, &size, NULL);
186 body = lustre_msg_buf(request->rq_reqmsg, 0);
187 memcpy(&body->oa, oa, sizeof(*oa));
188 body->connid = conn->oc_id;
190 request->rq_replen = lustre_msg_size(1, &size);
192 rc = ptlrpc_queue_wait(cl, request);
196 body = lustre_msg_buf(request->rq_repmsg, 0);
197 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
199 memcpy(oa, &body->oa, sizeof(*oa));
203 ptlrpc_free_req(request);
207 static int osc_setattr(struct obd_conn *conn, struct obdo *oa)
209 struct ptlrpc_request *request;
210 struct ptlrpc_client *cl = osc_con2cl(conn);
211 struct ost_body *body;
212 int rc, size = sizeof(*body);
215 request = ptlrpc_prep_req(cl, OST_SETATTR, 1, &size, NULL);
219 body = lustre_msg_buf(request->rq_reqmsg, 0);
220 memcpy(&body->oa, oa, sizeof(*oa));
221 body->connid = conn->oc_id;
223 request->rq_replen = lustre_msg_size(1, &size);
225 rc = ptlrpc_queue_wait(cl, request);
229 ptlrpc_free_req(request);
233 static int osc_create(struct obd_conn *conn, struct obdo *oa)
235 struct ptlrpc_request *request;
236 struct ptlrpc_client *cl = osc_con2cl(conn);
237 struct ost_body *body;
238 int rc, size = sizeof(*body);
245 request = ptlrpc_prep_req(cl, OST_CREATE, 1, &size, NULL);
249 body = lustre_msg_buf(request->rq_reqmsg, 0);
250 memcpy(&body->oa, oa, sizeof(*oa));
251 body->oa.o_valid = ~0;
252 body->connid = conn->oc_id;
254 request->rq_replen = lustre_msg_size(1, &size);
256 rc = ptlrpc_queue_wait(cl, request);
260 body = lustre_msg_buf(request->rq_repmsg, 0);
261 memcpy(oa, &body->oa, sizeof(*oa));
265 ptlrpc_free_req(request);
269 static int osc_punch(struct obd_conn *conn, struct obdo *oa, obd_size count,
272 struct ptlrpc_request *request;
273 struct ptlrpc_client *cl = osc_con2cl(conn);
274 struct ost_body *body;
275 int rc, size = sizeof(*body);
282 request = ptlrpc_prep_req(cl, OST_PUNCH, 1, &size, NULL);
286 body = lustre_msg_buf(request->rq_reqmsg, 0);
287 memcpy(&body->oa, oa, sizeof(*oa));
288 body->connid = conn->oc_id;
289 body->oa.o_valid = ~0;
290 body->oa.o_size = offset;
291 body->oa.o_blocks = count;
293 request->rq_replen = lustre_msg_size(1, &size);
295 rc = ptlrpc_queue_wait(cl, request);
299 body = lustre_msg_buf(request->rq_repmsg, 0);
300 memcpy(oa, &body->oa, sizeof(*oa));
304 ptlrpc_free_req(request);
308 static int osc_destroy(struct obd_conn *conn, struct obdo *oa)
310 struct ptlrpc_request *request;
311 struct ptlrpc_client *cl = osc_con2cl(conn);
312 struct ost_body *body;
313 int rc, size = sizeof(*body);
320 request = ptlrpc_prep_req(cl, OST_DESTROY, 1, &size, NULL);
324 body = lustre_msg_buf(request->rq_reqmsg, 0);
325 memcpy(&body->oa, oa, sizeof(*oa));
326 body->connid = conn->oc_id;
327 body->oa.o_valid = ~0;
329 request->rq_replen = lustre_msg_size(1, &size);
331 rc = ptlrpc_queue_wait(cl, request);
335 body = lustre_msg_buf(request->rq_repmsg, 0);
336 memcpy(oa, &body->oa, sizeof(*oa));
340 ptlrpc_free_req(request);
344 int osc_sendpage(struct obd_conn *conn, struct ptlrpc_request *req,
345 struct niobuf *dst, struct niobuf *src)
347 struct ptlrpc_client *cl = osc_con2cl(conn);
351 memcpy((char *)(unsigned long)dst->addr,
352 (char *)(unsigned long)src->addr, src->len);
354 struct ptlrpc_bulk_desc *bulk;
357 bulk = ptlrpc_prep_bulk(&cl->cli_server);
361 bulk->b_buf = (void *)(unsigned long)src->addr;
362 bulk->b_buflen = src->len;
363 bulk->b_xid = dst->xid;
364 rc = ptlrpc_send_bulk(bulk, OSC_BULK_PORTAL);
366 CERROR("send_bulk failed: %d\n", rc);
370 wait_event_interruptible(bulk->b_waitq,
371 ptlrpc_check_bulk_sent(bulk));
373 if (bulk->b_flags == PTL_RPC_INTR) {
375 /* FIXME: hey hey, we leak here. */
379 OBD_FREE(bulk, sizeof(*bulk));
385 int osc_brw_read(struct obd_conn *conn, obd_count num_oa, struct obdo **oa,
386 obd_count *oa_bufs, struct page **buf, obd_size *count,
387 obd_off *offset, obd_flag *flags)
389 struct ptlrpc_client *cl = osc_con2cl(conn);
390 struct ptlrpc_request *request;
391 struct ost_body *body;
392 struct obd_ioobj ioo;
394 int pages, rc, i, j, size[3] = {sizeof(*body)};
396 struct ptlrpc_bulk_desc **bulk;
399 size[1] = num_oa * sizeof(ioo);
401 for (i = 0; i < num_oa; i++)
403 size[2] = pages * sizeof(src);
405 OBD_ALLOC(bulk, pages * sizeof(*bulk));
409 request = ptlrpc_prep_req(cl, OST_BRW, 3, size, NULL);
411 GOTO(out, rc = -ENOMEM);
413 body = lustre_msg_buf(request->rq_reqmsg, 0);
414 body->data = OBD_BRW_READ;
416 ptr1 = lustre_msg_buf(request->rq_reqmsg, 1);
417 ptr2 = lustre_msg_buf(request->rq_reqmsg, 2);
418 for (pages = 0, i = 0; i < num_oa; i++) {
419 ost_pack_ioo(&ptr1, oa[i], oa_bufs[i]);
420 for (j = 0; j < oa_bufs[i]; j++, pages++) {
421 bulk[pages] = ptlrpc_prep_bulk(&cl->cli_server);
422 if (bulk[pages] == NULL)
423 GOTO(out, rc = -ENOMEM);
425 spin_lock(&cl->cli_lock);
426 bulk[pages]->b_xid = cl->cli_xid++;
427 spin_unlock(&cl->cli_lock);
429 bulk[pages]->b_buf = kmap(buf[pages]);
430 bulk[pages]->b_buflen = PAGE_SIZE;
431 bulk[pages]->b_portal = OST_BULK_PORTAL;
432 ost_pack_niobuf(&ptr2, bulk[pages]->b_buf,
433 offset[pages], count[pages],
434 flags[pages], bulk[pages]->b_xid);
436 rc = ptlrpc_register_bulk(bulk[pages]);
442 request->rq_replen = lustre_msg_size(1, size);
443 rc = ptlrpc_queue_wait(cl, request);
447 /* FIXME: if we've called ptlrpc_wait_bulk but rc != 0, we need to
448 * abort those bulk listeners. */
450 for (pages = 0, i = 0; i < num_oa; i++) {
451 for (j = 0; j < oa_bufs[i]; j++, pages++) {
452 if (bulk[pages] == NULL)
455 OBD_FREE(bulk[pages], sizeof(**bulk));
459 OBD_FREE(bulk, pages * sizeof(*bulk));
460 ptlrpc_free_req(request);
464 int osc_brw_write(struct obd_conn *conn, obd_count num_oa, struct obdo **oa,
465 obd_count *oa_bufs, struct page **buf, obd_size *count,
466 obd_off *offset, obd_flag *flags)
468 struct ptlrpc_client *cl = osc_con2cl(conn);
469 struct ptlrpc_request *request;
470 struct obd_ioobj ioo;
471 struct ost_body *body;
473 int pages, rc, i, j, size[3] = {sizeof(*body)};
477 size[1] = num_oa * sizeof(ioo);
479 for (i = 0; i < num_oa; i++)
481 size[2] = pages * sizeof(*src);
483 OBD_ALLOC(src, size[2]);
487 request = ptlrpc_prep_req(cl, OST_BRW, 3, size, NULL);
490 body = lustre_msg_buf(request->rq_reqmsg, 0);
491 body->data = OBD_BRW_WRITE;
493 ptr1 = lustre_msg_buf(request->rq_reqmsg, 1);
494 ptr2 = lustre_msg_buf(request->rq_reqmsg, 2);
495 for (pages = 0, i = 0; i < num_oa; i++) {
496 ost_pack_ioo(&ptr1, oa[i], oa_bufs[i]);
497 for (j = 0; j < oa_bufs[i]; j++, pages++) {
498 ost_pack_niobuf(&ptr2, kmap(buf[pages]), offset[pages],
499 count[pages], flags[pages], 0);
502 memcpy(src, lustre_msg_buf(request->rq_reqmsg, 2), size[2]);
504 size[1] = pages * sizeof(struct niobuf);
505 request->rq_replen = lustre_msg_size(2, size);
507 rc = ptlrpc_queue_wait(cl, request);
511 ptr2 = lustre_msg_buf(request->rq_repmsg, 1);
513 GOTO(out, rc = -EINVAL);
515 if (request->rq_repmsg->buflens[1] != pages * sizeof(struct niobuf)) {
516 CERROR("buffer length wrong (%d vs. %d)\n",
517 request->rq_repmsg->buflens[1],
518 pages * sizeof(struct niobuf));
519 GOTO(out, rc = -EINVAL);
522 for (pages = 0, i = 0; i < num_oa; i++) {
523 for (j = 0; j < oa_bufs[i]; j++, pages++) {
525 ost_unpack_niobuf(&ptr2, &dst);
526 osc_sendpage(conn, request, dst, &src[pages]);
529 OBD_FREE(src, size[2]);
531 for (pages = 0, i = 0; i < num_oa; i++)
532 for (j = 0; j < oa_bufs[i]; j++, pages++)
535 ptlrpc_free_req(request);
539 int osc_brw(int rw, struct obd_conn *conn, obd_count num_oa,
540 struct obdo **oa, obd_count *oa_bufs, struct page **buf,
541 obd_size *count, obd_off *offset, obd_flag *flags)
543 if (rw == OBD_BRW_READ)
544 return osc_brw_read(conn, num_oa, oa, oa_bufs, buf, count,
547 return osc_brw_write(conn, num_oa, oa, oa_bufs, buf, count,
551 /* mount the file system (secretly) */
552 static int osc_setup(struct obd_device *obddev, obd_count len,
556 struct osc_obd *osc = &obddev->u.osc;
557 struct obd_ioctl_data *data = (struct obd_ioctl_data *)buf;
559 int dev = data->ioc_dev;
562 OBD_ALLOC(osc->osc_client, sizeof(*osc->osc_client));
563 if (osc->osc_client == NULL)
566 ptlrpc_init_client(dev, OST_REQUEST_PORTAL, OSC_REPLY_PORTAL,
568 rc = ptlrpc_connect_client(dev, "ost", osc->osc_client);
575 static int osc_cleanup(struct obd_device * obddev)
577 struct osc_obd *osc = &obddev->u.osc;
579 if (osc->osc_client != NULL)
580 OBD_FREE(osc->osc_client, sizeof(*osc->osc_client));
586 struct obd_ops osc_obd_ops = {
588 o_cleanup: osc_cleanup,
589 o_create: osc_create,
590 o_destroy: osc_destroy,
591 o_getattr: osc_getattr,
592 o_setattr: osc_setattr,
595 o_connect: osc_connect,
596 o_disconnect: osc_disconnect,
601 static int __init osc_init(void)
603 obd_register_type(&osc_obd_ops, LUSTRE_OSC_NAME);
607 static void __exit osc_exit(void)
609 obd_unregister_type(LUSTRE_OSC_NAME);
612 MODULE_AUTHOR("Peter J. Braam <braam@clusterfs.com>");
613 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC) v1.0");
614 MODULE_LICENSE("GPL");
616 module_init(osc_init);
617 module_exit(osc_exit);