1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
6 * This code is issued under the GNU General Public License.
7 * See the file COPYING in this distribution
9 * Author Peter Braam <braam@clusterfs.com>
11 * This server is single threaded at present (but can easily be multi
12 * threaded). For testing and management it is treated as an
13 * obd_device, although it does not export a full OBD method table
14 * (the requests are coming in over the wire, so object target
15 * modules do not have a full method table.)
21 #include <linux/config.h>
22 #include <linux/module.h>
23 #include <linux/kernel.h>
25 #include <linux/string.h>
26 #include <linux/stat.h>
27 #include <linux/errno.h>
28 #include <linux/locks.h>
29 #include <linux/unistd.h>
31 #include <asm/system.h>
32 #include <asm/uaccess.h>
35 #include <linux/stat.h>
36 #include <asm/uaccess.h>
37 #include <asm/segment.h>
38 #include <linux/miscdevice.h>
40 #define DEBUG_SUBSYSTEM S_OSC
42 #include <linux/obd_class.h>
43 #include <linux/lustre_lib.h>
44 #include <linux/lustre_net.h>
45 #include <linux/obd_ost.h>
/*
 * osc_con2cl - look up the ptlrpc client for an OBD connection.
 *
 * Starts from the osc_obd embedded in the connection's device
 * (conn->oc_dev->u.osc).
 *
 * NOTE(review): the return statement is not visible in this excerpt;
 * presumably it returns osc->osc_peer -- confirm.
 */
47 struct ptlrpc_client *osc_con2cl(struct obd_conn *conn)
49 struct osc_obd *osc = &conn->oc_dev->u.osc;
/*
 * osc_connect - open a connection to the OST.
 *
 * Builds an OST_CONNECT request on the client returned by osc_con2cl(),
 * hands it to ptlrpc_queue_wait(), and records the connection id carried
 * in the reply (rq_rep.ost->connid) in conn->oc_id.  The request is
 * released with ptlrpc_free_req().
 *
 * NOTE(review): the NULL/rc checks, the target of the reply-size
 * assignment and the return statement are not visible in this excerpt;
 * presumably rc from ptlrpc_queue_wait() is returned -- confirm.
 */
53 static int osc_connect(struct obd_conn *conn)
55 struct ptlrpc_request *request;
56 struct ptlrpc_client *peer = osc_con2cl(conn);
/* both size arguments are 0 and both buffer pointers are NULL */
60 request = ptlrpc_prep_req(peer, OST_CONNECT, 0, NULL, 0, NULL);
62 CERROR("cannot pack req!\n");
67 sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
69 rc = ptlrpc_queue_wait(peer, request);
75 CDEBUG(D_INODE, "received connid %d\n", request->rq_rep.ost->connid);
/* keep the id from the reply; the other requests send it back as connid */
77 conn->oc_id = request->rq_rep.ost->connid;
79 ptlrpc_free_req(request);
/*
 * osc_disconnect - tear down a connection to the OST.
 *
 * Builds an OST_DISCONNECT request, stamps it with the connection id
 * stored in conn->oc_id, hands it to ptlrpc_queue_wait() and releases the
 * request with ptlrpc_free_req().
 *
 * NOTE(review): the error checks and the return statement are not visible
 * in this excerpt; presumably rc is returned -- confirm.
 */
84 static int osc_disconnect(struct obd_conn *conn)
86 struct ptlrpc_request *request;
87 struct ptlrpc_client *peer = osc_con2cl(conn);
91 request = ptlrpc_prep_req(peer, OST_DISCONNECT, 0, NULL, 0, NULL);
93 CERROR("cannot pack req!\n");
/* identify which connection is being closed */
96 request->rq_req.ost->connid = conn->oc_id;
98 sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
100 rc = ptlrpc_queue_wait(peer, request);
106 ptlrpc_free_req(request);
/*
 * osc_getattr - fetch object attributes from the OST.
 *
 * @conn: connection whose oc_id is sent as the request's connid
 * @oa:   in/out; copied into the request, then overwritten with the obdo
 *        carried in the reply
 *
 * Sends an OST_GETATTR request via ptlrpc_queue_wait() and frees the
 * request afterwards.
 *
 * NOTE(review): the error checks and the return statement are not visible
 * in this excerpt, so it cannot be confirmed here that the copy-back is
 * skipped when rc != 0 -- confirm.
 */
112 static int osc_getattr(struct obd_conn *conn, struct obdo *oa)
114 struct ptlrpc_request *request;
115 struct ptlrpc_client *peer = osc_con2cl(conn);
118 request = ptlrpc_prep_req(peer, OST_GETATTR, 0, NULL, 0, NULL);
120 CERROR("cannot pack req!\n");
124 memcpy(&request->rq_req.ost->oa, oa, sizeof(*oa));
125 request->rq_req.ost->connid = conn->oc_id;
/* set every bit of o_valid in the request copy (the caller's oa is untouched here) */
126 request->rq_req.ost->oa.o_valid = ~0;
128 sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
130 rc = ptlrpc_queue_wait(peer, request);
136 CDEBUG(D_INODE, "mode: %o\n", request->rq_rep.ost->oa.o_mode);
/* hand the attributes from the reply back to the caller */
138 memcpy(oa, &request->rq_rep.ost->oa, sizeof(*oa));
142 ptlrpc_free_req(request);
/*
 * osc_open - open an object on the OST.
 *
 * @conn: connection whose oc_id is sent as the request's connid
 * @oa:   in/out; copied into the request, then overwritten with the obdo
 *        carried in the reply
 *
 * Sends an OST_OPEN request via ptlrpc_queue_wait() and frees the request
 * afterwards.
 *
 * NOTE(review): the statement guarded by the o_valid comparison, the
 * error checks and the return statement are not visible in this excerpt.
 */
146 static int osc_open(struct obd_conn *conn, struct obdo *oa)
148 struct ptlrpc_request *request;
149 struct ptlrpc_client *peer = osc_con2cl(conn);
152 request = ptlrpc_prep_req(peer, OST_OPEN, 0, NULL, 0, NULL);
154 CERROR("cannot pack req!\n");
158 memcpy(&request->rq_req.ost->oa, oa, sizeof(*oa));
159 request->rq_req.ost->connid = conn->oc_id;
/* true unless o_valid is exactly OBD_MD_FLMODE | OBD_MD_FLID */
160 if (request->rq_req.ost->oa.o_valid != (OBD_MD_FLMODE | OBD_MD_FLID))
163 sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
165 rc = ptlrpc_queue_wait(peer, request);
171 CDEBUG(D_INODE, "mode: %o\n", request->rq_rep.ost->oa.o_mode);
173 memcpy(oa, &request->rq_rep.ost->oa, sizeof(*oa));
177 ptlrpc_free_req(request);
/*
 * osc_close - close an object on the OST.
 *
 * @conn: connection whose oc_id is sent as the request's connid
 * @oa:   in/out; copied into the request, then overwritten with the obdo
 *        carried in the reply
 *
 * Sends an OST_CLOSE request via ptlrpc_queue_wait() and frees the request
 * afterwards.
 *
 * NOTE(review): the error checks and the return statement are not visible
 * in this excerpt.
 */
181 static int osc_close(struct obd_conn *conn, struct obdo *oa)
183 struct ptlrpc_request *request;
184 struct ptlrpc_client *peer = osc_con2cl(conn);
187 request = ptlrpc_prep_req(peer, OST_CLOSE, 0, NULL, 0, NULL);
189 CERROR("cannot pack req!\n");
193 memcpy(&request->rq_req.ost->oa, oa, sizeof(*oa));
194 request->rq_req.ost->connid = conn->oc_id;
197 sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
199 rc = ptlrpc_queue_wait(peer, request);
205 CDEBUG(D_INODE, "mode: %o\n", request->rq_rep.ost->oa.o_mode);
207 memcpy(oa, &request->rq_rep.ost->oa, sizeof(*oa));
211 ptlrpc_free_req(request);
/*
 * osc_setattr - push object attributes to the OST.
 *
 * @conn: connection whose oc_id is sent as the request's connid
 * @oa:   attributes to send; copied into the request.  No visible line
 *        copies the reply back into oa.
 *
 * Sends an OST_SETATTR request via ptlrpc_queue_wait() and frees the
 * request afterwards.
 *
 * NOTE(review): the error checks and the return statement are not visible
 * in this excerpt.
 */
215 static int osc_setattr(struct obd_conn *conn, struct obdo *oa)
217 struct ptlrpc_request *request;
218 struct ptlrpc_client *peer = osc_con2cl(conn);
221 request = ptlrpc_prep_req(peer, OST_SETATTR, 0, NULL, 0, NULL);
223 CERROR("cannot pack req!\n");
227 memcpy(&request->rq_req.ost->oa, oa, sizeof(*oa));
228 request->rq_req.ost->connid = conn->oc_id;
230 sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
232 rc = ptlrpc_queue_wait(peer, request);
239 ptlrpc_free_req(request);
/*
 * osc_create - create an object on the OST.
 *
 * @conn: connection whose oc_id is sent as the request's connid
 * @oa:   in/out; copied into the request, then overwritten with the obdo
 *        carried in the reply
 *
 * Sends an OST_CREATE request via ptlrpc_queue_wait() and frees the
 * request afterwards.
 *
 * NOTE(review): the error checks and the return statement are not visible
 * in this excerpt.
 */
243 static int osc_create(struct obd_conn *conn, struct obdo *oa)
245 struct ptlrpc_request *request;
246 struct ptlrpc_client *peer = osc_con2cl(conn);
252 request = ptlrpc_prep_req(peer, OST_CREATE, 0, NULL, 0, NULL);
254 CERROR("cannot pack req!\n");
258 memcpy(&request->rq_req.ost->oa, oa, sizeof(*oa));
259 request->rq_req.ost->connid = conn->oc_id;
/* set every bit of o_valid in the request copy */
260 request->rq_req.ost->oa.o_valid = ~0;
262 sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
264 rc = ptlrpc_queue_wait(peer, request);
269 memcpy(oa, &request->rq_rep.ost->oa, sizeof(*oa));
272 ptlrpc_free_req(request);
/*
 * osc_punch - send an OST_PUNCH request for an object.
 *
 * @conn:  connection whose oc_id is sent as the request's connid
 * @oa:    in/out; copied into the request, then overwritten with the obdo
 *         carried in the reply
 * @count: stored in the request's oa.o_blocks
 *
 * The request's oa.o_size is loaded from "offset", so the size and blocks
 * fields of the obdo are reused to carry the range for this operation.
 *
 * NOTE(review): the rest of the parameter list (including the declaration
 * of "offset"), the error checks and the return statement are not visible
 * in this excerpt.
 */
276 static int osc_punch(struct obd_conn *conn, struct obdo *oa, obd_size count,
279 struct ptlrpc_request *request;
280 struct ptlrpc_client *peer = osc_con2cl(conn);
286 request = ptlrpc_prep_req(peer, OST_PUNCH, 0, NULL, 0, NULL);
288 CERROR("cannot pack req!\n");
292 memcpy(&request->rq_req.ost->oa, oa, sizeof(*oa));
293 request->rq_req.ost->connid = conn->oc_id;
294 request->rq_req.ost->oa.o_valid = ~0;
/* these two stores overwrite the o_size/o_blocks just copied from the caller's oa */
295 request->rq_req.ost->oa.o_size = offset;
296 request->rq_req.ost->oa.o_blocks = count;
298 sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
300 rc = ptlrpc_queue_wait(peer, request);
305 memcpy(oa, &request->rq_rep.ost->oa, sizeof(*oa));
308 ptlrpc_free_req(request);
/*
 * osc_destroy - destroy an object on the OST.
 *
 * @conn: connection whose oc_id is sent as the request's connid
 * @oa:   in/out; copied into the request, then overwritten with the obdo
 *        carried in the reply
 *
 * Sends an OST_DESTROY request via ptlrpc_queue_wait() and frees the
 * request afterwards.
 *
 * NOTE(review): the error checks and the return statement are not visible
 * in this excerpt.
 */
312 static int osc_destroy(struct obd_conn *conn, struct obdo *oa)
314 struct ptlrpc_request *request;
315 struct ptlrpc_client *peer = osc_con2cl(conn);
321 request = ptlrpc_prep_req(peer, OST_DESTROY, 0, NULL, 0, NULL);
323 CERROR("cannot pack req!\n");
327 memcpy(&request->rq_req.ost->oa, oa, sizeof(*oa));
328 request->rq_req.ost->connid = conn->oc_id;
/* set every bit of o_valid in the request copy */
329 request->rq_req.ost->oa.o_valid = ~0;
331 sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
333 rc = ptlrpc_queue_wait(peer, request);
338 memcpy(oa, &request->rq_rep.ost->oa, sizeof(*oa));
341 ptlrpc_free_req(request);
/*
 * osc_sendpage - move the data described by src to the place described
 * by dst.
 *
 * @conn: connection used to find the ptlrpc client
 * @req:  not referenced in any visible line
 * @dst:  destination niobuf (addr used by the copy path, xid by the bulk
 *        path)
 * @src:  source niobuf (addr and len)
 *
 * Two paths are visible:
 *  - a direct memcpy() of src->len bytes from src->addr to dst->addr;
 *  - a bulk transfer: a descriptor from ptlrpc_prep_bulk() is pointed at
 *    the source buffer, tagged with dst->xid and sent with
 *    ptlrpc_send_bulk() on OSC_BULK_PORTAL; the function then waits on
 *    b_waitq (interruptibly) until ptlrpc_check_bulk_sent() holds and
 *    frees the descriptor with OBD_FREE().
 *
 * NOTE(review): the condition that selects between the two paths, the
 * error returns and the body of the PTL_RPC_INTR branch are not visible
 * in this excerpt.  The existing FIXME states that the interrupted path
 * leaks.
 */
345 int osc_sendpage(struct obd_conn *conn, struct ptlrpc_request *req,
346 struct niobuf *dst, struct niobuf *src)
348 struct ptlrpc_client *cl = osc_con2cl(conn);
/* addr is converted to a pointer through unsigned long */
352 memcpy((char *)(unsigned long)dst->addr,
353 (char *)(unsigned long)src->addr, src->len);
355 struct ptlrpc_bulk_desc *bulk;
358 bulk = ptlrpc_prep_bulk(&cl->cli_server);
362 bulk->b_buf = (void *)(unsigned long)src->addr;
363 bulk->b_buflen = src->len;
/* the xid comes from the destination niobuf */
364 bulk->b_xid = dst->xid;
365 rc = ptlrpc_send_bulk(bulk, OSC_BULK_PORTAL);
367 CERROR("send_bulk failed: %d\n", rc);
371 wait_event_interruptible(bulk->b_waitq,
372 ptlrpc_check_bulk_sent(bulk));
374 if (bulk->b_flags == PTL_RPC_INTR) {
376 /* FIXME: hey hey, we leak here. */
380 OBD_FREE(bulk, sizeof(*bulk));
/*
 * osc_brw_read - read pages from the OST with one OST_BRW request.
 *
 * @conn:    connection used to find the ptlrpc client
 * @num_oa:  number of objects
 * @oa:      per-object obdo pointers (indexed by i)
 * @oa_bufs: per-object page counts (indexed by i)
 * @buf, @count, @offset, @flags: per-page arrays, indexed by n
 *
 * The request carries two buffers: size1 bytes for the obd_ioobj
 * descriptors packed with ost_pack_ioo() and size2 bytes for the niobuf
 * descriptors packed with ost_pack_niobuf().  For each page a bulk
 * descriptor is prepared, given an xid taken from cl->cli_xid under
 * cl->cli_lock, pointed at the kmap()ed page and registered with
 * ptlrpc_register_bulk() before the request is handed to
 * ptlrpc_queue_wait().  Afterwards the per-page descriptors, the
 * descriptor vector and the request are freed.
 *
 * NOTE(review): the computation of "pages", the initialisation and
 * increment of n, the declaration of "src", all error paths, any
 * kunmap() of the pages and the return statement are not visible in this
 * excerpt.
 */
386 int osc_brw_read(struct obd_conn *conn, obd_count num_oa, struct obdo **oa,
387 obd_count *oa_bufs, struct page **buf, obd_size *count,
388 obd_off *offset, obd_flag *flags)
390 struct ptlrpc_client *cl = osc_con2cl(conn);
391 struct ptlrpc_request *request;
394 struct obd_ioobj ioo;
396 int size1, size2 = 0;
399 struct ptlrpc_bulk_desc **bulk;
/* ioo is only used for its size in the visible lines */
401 size1 = num_oa * sizeof(ioo);
403 for (i = 0; i < num_oa; i++)
/*
 * NOTE(review): osc_brw_write uses sizeof(*src) here; if src is a pointer
 * in this function, sizeof(src) is the pointer size -- confirm src's type.
 */
405 size2 = pages * sizeof(src);
407 request = ptlrpc_prep_req(cl, OST_BRW, size1, NULL, size2, NULL);
409 CERROR("cannot pack req!\n");
412 request->rq_req.ost->cmd = OBD_BRW_READ;
/* one bulk descriptor pointer per page */
414 OBD_ALLOC(bulk, pages * sizeof(*bulk));
416 CERROR("cannot alloc bulk desc vector\n");
419 memset(bulk, 0, pages * sizeof(*bulk));
422 ptr1 = ost_req_buf1(request->rq_req.ost);
423 ptr2 = ost_req_buf2(request->rq_req.ost);
424 for (i = 0; i < num_oa; i++) {
425 ost_pack_ioo(&ptr1, oa[i], oa_bufs[i]);
426 for (j = 0; j < oa_bufs[i]; j++) {
427 bulk[n] = ptlrpc_prep_bulk(&cl->cli_server);
428 if (bulk[n] == NULL) {
429 CERROR("cannot alloc bulk desc\n");
/* cli_xid is read and incremented under cli_lock */
434 spin_lock(&cl->cli_lock);
435 bulk[n]->b_xid = cl->cli_xid++;
436 spin_unlock(&cl->cli_lock);
437 bulk[n]->b_buf = kmap(buf[n]);
438 bulk[n]->b_buflen = PAGE_SIZE;
439 bulk[n]->b_portal = OST_BULK_PORTAL;
/* the same xid goes into the niobuf sent to the server */
440 ost_pack_niobuf(&ptr2, bulk[n]->b_buf, offset[n],
441 count[n], flags[n], bulk[n]->b_xid);
443 rc = ptlrpc_register_bulk(bulk[n]);
450 request->rq_replen = sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
451 rc = ptlrpc_queue_wait(cl, request);
454 /* FIXME: if we've called ptlrpc_wait_bulk but rc != 0, we need to
455 * abort those bulk listeners. */
458 for (i = 0; i < num_oa; i++) {
459 for (j = 0; j < oa_bufs[i]; j++) {
463 OBD_FREE(bulk[n], sizeof(**bulk));
468 OBD_FREE(bulk, pages * sizeof(*bulk));
469 ptlrpc_free_req(request);
/*
 * osc_brw_write - write pages to the OST.
 *
 * @conn:    connection used to find the ptlrpc client
 * @num_oa:  number of objects
 * @oa:      per-object obdo pointers (indexed by i)
 * @oa_bufs: per-object page counts (indexed by i)
 * @buf, @count, @offset, @flags: per-page arrays, indexed by n
 *
 * Sends an OST_BRW request with cmd OBD_BRW_WRITE whose two buffers hold
 * the packed obd_ioobj and niobuf descriptors; pages are kmap()ed as their
 * niobufs are packed.  A copy of the packed niobufs is kept in src.  The
 * reply's buflen2 is compared with n * sizeof(struct niobuf); each reply
 * niobuf is then unpacked into dst and passed to osc_sendpage() together
 * with the matching src entry.  src and the request are freed afterwards.
 *
 * NOTE(review): the computation of "pages", the initialisation and
 * increment of n, the declarations of src/dst/ptr1/ptr2, all error paths,
 * the body of the final loop (presumably kunmap() of the pages -- confirm)
 * and the return statement are not visible in this excerpt.
 */
473 int osc_brw_write(struct obd_conn *conn, obd_count num_oa, struct obdo **oa,
474 obd_count *oa_bufs, struct page **buf, obd_size *count,
475 obd_off *offset, obd_flag *flags)
477 struct ptlrpc_client *cl = osc_con2cl(conn);
478 struct ptlrpc_request *request;
479 struct obd_ioobj ioo;
481 int pages, rc, i, j, n, size1, size2 = 0;
/* ioo is only used for its size in the visible lines */
484 size1 = num_oa * sizeof(ioo);
486 for (i = 0; i < num_oa; i++)
488 size2 = pages * sizeof(*src);
490 OBD_ALLOC(src, size2);
492 CERROR("no src memory\n");
495 memset((char *)src, 0, size2);
497 request = ptlrpc_prep_req(cl, OST_BRW, size1, NULL, size2, NULL);
499 CERROR("cannot pack req!\n");
502 request->rq_req.ost->cmd = OBD_BRW_WRITE;
505 ptr1 = ost_req_buf1(request->rq_req.ost);
506 ptr2 = ost_req_buf2(request->rq_req.ost);
507 for (i = 0; i < num_oa; i++) {
508 ost_pack_ioo(&ptr1, oa[i], oa_bufs[i]);
509 for (j = 0; j < oa_bufs[i]; j++) {
/* the last argument (xid) is 0 here; the read path passes the bulk xid */
510 ost_pack_niobuf(&ptr2, kmap(buf[n]), offset[n],
511 count[n], flags[n], 0);
/* keep a private copy of the packed source niobufs for osc_sendpage() */
515 memcpy((char *)src, (char *)ost_req_buf2(request->rq_req.ost), size2);
/* the reply is sized for one niobuf per page in addition to the ost_rep */
517 request->rq_replen = sizeof(struct ptlrep_hdr) +
518 sizeof(struct ost_rep) + pages * sizeof(struct niobuf);
519 rc = ptlrpc_queue_wait(cl, request);
525 ptr2 = ost_rep_buf2(request->rq_rep.ost);
526 if (request->rq_rep.ost->buflen2 != n * sizeof(struct niobuf)) {
/*
 * NOTE(review): n * sizeof(struct niobuf) has type size_t but is printed
 * with %d; if CERROR is printf-like this is a format/argument mismatch.
 */
527 CERROR("buffer length wrong (%d vs. %d)\n",
528 request->rq_rep.ost->buflen2, n * sizeof(struct niobuf));
534 for (i = 0; i < num_oa; i++) {
535 for (j = 0; j < oa_bufs[i]; j++) {
537 ost_unpack_niobuf(&ptr2, &dst);
/* NOTE(review): the return value of osc_sendpage() is discarded here */
538 osc_sendpage(conn, request, dst, &src[n]);
542 OBD_FREE(src, size2);
545 for (i = 0; i < num_oa; i++) {
546 for (j = 0; j < oa_bufs[i]; j++) {
552 ptlrpc_free_req(request);
/*
 * osc_brw - bulk read/write entry point.
 *
 * Dispatches on rw: OBD_BRW_READ goes to osc_brw_read(), the other
 * visible branch goes to osc_brw_write().  The remaining arguments are
 * passed through (the tails of both calls are not visible in this
 * excerpt).
 */
556 int osc_brw(int rw, struct obd_conn *conn, obd_count num_oa,
557 struct obdo **oa, obd_count *oa_bufs, struct page **buf,
558 obd_size *count, obd_off *offset, obd_flag *flags)
560 if (rw == OBD_BRW_READ) {
561 return osc_brw_read(conn, num_oa, oa, oa_bufs, buf, count,
564 return osc_brw_write(conn, num_oa, oa, oa_bufs, buf, count,
569 /* mount the file system (secretly) */
/*
 * osc_setup - set up the OSC device.
 *
 * Interprets buf as a struct obd_ioctl_data, takes the device number from
 * its ioc_dev field, allocates osc->osc_peer and calls
 * ptlrpc_connect_client() for the "ost" service.
 *
 * NOTE(review): the comment above does not match the visible code, which
 * performs no mount.  The rest of the parameter list, any validation of
 * len/buf before data->ioc_dev is read, the remaining arguments of
 * ptlrpc_connect_client(), the handling of rc and the return statement
 * are not visible in this excerpt.
 */
570 static int osc_setup(struct obd_device *obddev, obd_count len,
574 struct osc_obd *osc = &obddev->u.osc;
575 struct obd_ioctl_data *data = (struct obd_ioctl_data *)buf;
577 int dev = data->ioc_dev;
580 OBD_ALLOC(osc->osc_peer, sizeof(*osc->osc_peer));
581 if (osc->osc_peer == NULL)
584 rc = ptlrpc_connect_client(dev, "ost",
/*
 * osc_cleanup - release the resources held by the OSC device.
 *
 * Frees osc->osc_peer if it is non-NULL.
 *
 * NOTE(review): the return statement is not visible in this excerpt, and
 * no visible line resets osc_peer to NULL after the free.
 */
596 static int osc_cleanup(struct obd_device * obddev)
598 struct osc_obd *osc = &obddev->u.osc;
600 if (osc->osc_peer != NULL)
601 OBD_FREE(osc->osc_peer, sizeof(*osc->osc_peer));
/*
 * Method table for the OSC device type; osc_init() passes it to
 * obd_register_type().  Entries on lines not visible in this excerpt may
 * exist between the ones shown.
 *
 * NOTE(review): the "field: value" initializer form is an obsolete GNU
 * extension; the C99 spelling is ".field = value".
 */
607 struct obd_ops osc_obd_ops = {
609 o_cleanup: osc_cleanup,
610 o_create: osc_create,
611 o_destroy: osc_destroy,
612 o_getattr: osc_getattr,
613 o_setattr: osc_setattr,
616 o_connect: osc_connect,
617 o_disconnect: osc_disconnect,
/*
 * osc_init - module entry point; registers osc_obd_ops under
 * LUSTRE_OSC_NAME.
 *
 * NOTE(review): the return statement is not visible in this excerpt, and
 * the visible call does not capture a result from obd_register_type().
 */
622 static int __init osc_init(void)
624 obd_register_type(&osc_obd_ops, LUSTRE_OSC_NAME);
/*
 * osc_exit - module exit point; unregisters the LUSTRE_OSC_NAME type
 * registered by osc_init().
 */
628 static void __exit osc_exit(void)
630 obd_unregister_type(LUSTRE_OSC_NAME);
/* Module metadata, followed by the load/unload hooks (osc_init/osc_exit). */
633 MODULE_AUTHOR("Peter J. Braam <braam@clusterfs.com>");
634 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC) v1.0");
635 MODULE_LICENSE("GPL");
637 module_init(osc_init);
638 module_exit(osc_exit);