1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copryright (C) 2001 Cluster File Systems, Inc.
6 * This code is issued under the GNU General Public License.
7 * See the file COPYING in this distribution
9 * Author Peter Braam <braam@clusterfs.com>
11 * This server is single threaded at present (but can easily be multi
12 * threaded). For testing and management it is treated as an
13 * obd_device, although it does not export a full OBD method table
14 * (the requests are coming in over the wire, so object target
15 * modules do not have a full method table.)
21 #include <linux/config.h>
22 #include <linux/module.h>
23 #include <linux/kernel.h>
25 #include <linux/string.h>
26 #include <linux/stat.h>
27 #include <linux/errno.h>
28 #include <linux/locks.h>
29 #include <linux/unistd.h>
31 #include <asm/system.h>
32 #include <asm/uaccess.h>
35 #include <linux/stat.h>
36 #include <asm/uaccess.h>
37 #include <asm/segment.h>
38 #include <linux/miscdevice.h>
40 #define DEBUG_SUBSYSTEM S_OSC
42 #include <linux/obd_support.h>
43 #include <linux/obd_class.h>
44 #include <linux/lustre_lib.h>
45 #include <linux/lustre_idl.h>
47 struct ptlrpc_client *osc_con2cl(struct obd_conn *conn)
49 struct osc_obd *osc = &conn->oc_dev->u.osc;
50 return &osc->osc_peer;
54 static int osc_connect(struct obd_conn *conn)
56 struct ptlrpc_request *request;
57 struct ptlrpc_client *peer = osc_con2cl(conn);
61 request = ptlrpc_prep_req(peer, OST_CONNECT, 0, NULL, 0, NULL);
63 CERROR("cannot pack req!\n");
68 sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
70 rc = ptlrpc_queue_wait(peer, request);
76 CDEBUG(D_INODE, "received connid %d\n", request->rq_rep.ost->connid);
78 conn->oc_id = request->rq_rep.ost->connid;
80 ptlrpc_free_req(request);
85 static int osc_disconnect(struct obd_conn *conn)
87 struct ptlrpc_request *request;
88 struct ptlrpc_client *peer = osc_con2cl(conn);
92 request = ptlrpc_prep_req(peer, OST_DISCONNECT, 0, NULL, 0, NULL);
94 CERROR("cannot pack req!\n");
97 request->rq_req.ost->connid = conn->oc_id;
99 sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
101 rc = ptlrpc_queue_wait(peer, request);
107 ptlrpc_free_req(request);
113 static int osc_getattr(struct obd_conn *conn, struct obdo *oa)
115 struct ptlrpc_request *request;
116 struct ptlrpc_client *peer = osc_con2cl(conn);
119 request = ptlrpc_prep_req(peer, OST_GETATTR, 0, NULL, 0, NULL);
121 CERROR("cannot pack req!\n");
125 memcpy(&request->rq_req.ost->oa, oa, sizeof(*oa));
126 request->rq_req.ost->oa.o_valid = ~0;
128 sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
130 rc = ptlrpc_queue_wait(peer, request);
136 CDEBUG(D_INODE, "mode: %o\n", request->rq_rep.ost->oa.o_mode);
138 memcpy(oa, &request->rq_rep.ost->oa, sizeof(*oa));
142 ptlrpc_free_req(request);
146 static int osc_setattr(struct obd_conn *conn, struct obdo *oa)
148 struct ptlrpc_request *request;
149 struct ptlrpc_client *peer = osc_con2cl(conn);
152 request = ptlrpc_prep_req(peer, OST_SETATTR, 0, NULL, 0, NULL);
154 CERROR("cannot pack req!\n");
158 memcpy(&request->rq_req.ost->oa, oa, sizeof(*oa));
160 sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
162 rc = ptlrpc_queue_wait(peer, request);
169 ptlrpc_free_req(request);
173 static int osc_create(struct obd_conn *conn, struct obdo *oa)
175 struct ptlrpc_request *request;
176 struct ptlrpc_client *peer = osc_con2cl(conn);
182 request = ptlrpc_prep_req(peer, OST_CREATE, 0, NULL, 0, NULL);
184 CERROR("cannot pack req!\n");
188 memcpy(&request->rq_req.ost->oa, oa, sizeof(*oa));
189 request->rq_req.ost->connid = conn->oc_id;
190 request->rq_req.ost->oa.o_valid = ~0;
192 sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
194 rc = ptlrpc_queue_wait(peer, request);
199 memcpy(oa, &request->rq_rep.ost->oa, sizeof(*oa));
202 ptlrpc_free_req(request);
206 static int osc_punch(struct obd_conn *conn, struct obdo *oa, obd_size count,
209 struct ptlrpc_request *request;
210 struct ptlrpc_client *peer = osc_con2cl(conn);
216 request = ptlrpc_prep_req(peer, OST_PUNCH, 0, NULL, 0, NULL);
218 CERROR("cannot pack req!\n");
222 memcpy(&request->rq_req.ost->oa, oa, sizeof(*oa));
223 request->rq_req.ost->oa.o_valid = ~0;
224 request->rq_req.ost->oa.o_size = offset;
225 request->rq_req.ost->oa.o_blocks = count;
227 sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
229 rc = ptlrpc_queue_wait(peer, request);
234 memcpy(oa, &request->rq_rep.ost->oa, sizeof(*oa));
237 ptlrpc_free_req(request);
241 static int osc_destroy(struct obd_conn *conn, struct obdo *oa)
243 struct ptlrpc_request *request;
244 struct ptlrpc_client *peer = osc_con2cl(conn);
250 request = ptlrpc_prep_req(peer, OST_DESTROY, 0, NULL, 0, NULL);
252 CERROR("cannot pack req!\n");
256 memcpy(&request->rq_req.ost->oa, oa, sizeof(*oa));
257 request->rq_req.ost->oa.o_valid = ~0;
259 sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
261 rc = ptlrpc_queue_wait(peer, request);
266 memcpy(oa, &request->rq_rep.ost->oa, sizeof(*oa));
269 ptlrpc_free_req(request);
273 int osc_sendpage(struct obd_conn *conn, struct ptlrpc_request *req,
274 struct niobuf *dst, struct niobuf *src)
276 if (conn->oc_id != -1) {
278 memcpy((char *)(unsigned long)dst->addr,
279 (char *)(unsigned long)src->addr, src->len);
281 struct ptlrpc_client *cl = osc_con2cl(conn);
282 struct ptlrpc_bulk_desc *bulk;
286 bulk = ptlrpc_prep_bulk(&cl->cli_server);
290 spin_lock(&cl->cli_lock);
291 bulk->b_xid = cl->cli_xid++;
292 spin_unlock(&cl->cli_lock);
294 OBD_ALLOC(buf, src->len);
296 OBD_FREE(bulk, sizeof(*bulk));
300 memcpy(buf, (char *)(unsigned long)src->addr, src->len);
303 bulk->b_buflen = src->len;
304 /* FIXME: maybe we should add an XID to struct niobuf? */
305 bulk->b_xid = (__u32)(unsigned long)src->page;
307 rc = ptlrpc_send_bulk(bulk, OSC_BULK_PORTAL);
309 CERROR("send_bulk failed: %d\n", rc);
313 wait_event_interruptible(bulk->b_waitq,
314 ptlrpc_check_bulk_sent(bulk));
316 if (bulk->b_flags == PTL_RPC_INTR) {
318 /* FIXME: hey hey, we leak here. */
322 OBD_FREE(bulk, sizeof(*bulk));
323 OBD_FREE(buf, src->len);
329 int osc_brw_read(struct obd_conn *conn, obd_count num_oa, struct obdo **oa,
330 obd_count *oa_bufs, struct page **buf, obd_size *count,
331 obd_off *offset, obd_flag *flags)
333 struct ptlrpc_client *cl = osc_con2cl(conn);
334 struct ptlrpc_request *request;
337 struct obd_ioobj ioo;
339 int size1, size2 = 0;
342 struct ptlrpc_bulk_desc **bulk;
344 size1 = num_oa * sizeof(ioo);
346 for (i = 0; i < num_oa; i++) {
347 size2 += oa_bufs[i] * sizeof(src);
351 /* We actually pack a _third_ buffer, with XIDs for bulk pages */
352 size2 += pages * sizeof(__u32);
353 request = ptlrpc_prep_req(cl, OST_BRW, size1, NULL, size2, NULL);
355 CERROR("cannot pack req!\n");
358 request->rq_req.ost->cmd = OBD_BRW_READ;
360 OBD_ALLOC(bulk, pages * sizeof(struct ptlrpc_bulk_desc *));
362 CERROR("cannot alloc bulk desc vector\n");
365 memset(bulk, 0, pages * sizeof(struct ptlrpc_bulk_desc *));
368 ptr1 = ost_req_buf1(request->rq_req.ost);
369 ptr2 = ost_req_buf2(request->rq_req.ost);
370 for (i = 0; i < num_oa; i++) {
371 ost_pack_ioo(&ptr1, oa[i], oa_bufs[i]);
372 for (j = 0; j < oa_bufs[i]; j++) {
373 bulk[n] = ptlrpc_prep_bulk(&cl->cli_server);
374 if (bulk[n] == NULL) {
375 CERROR("cannot alloc bulk desc\n");
380 spin_lock(&cl->cli_lock);
381 bulk[n]->b_xid = cl->cli_xid++;
382 spin_unlock(&cl->cli_lock);
383 bulk[n]->b_buf = kmap(buf[n]);
384 bulk[n]->b_buflen = PAGE_SIZE;
385 bulk[n]->b_portal = OST_BULK_PORTAL;
386 ost_pack_niobuf(&ptr2, bulk[n]->b_buf, offset[n],
387 count[n], flags[n], bulk[n]->b_xid);
392 /* This is kinda silly--put the XIDs in the "third" buffer. */
393 for (n = 0; n < pages; n++) {
394 *(__u32 *)ptr2 = bulk[n]->b_xid;
395 ptr2 = (char *)ptr2 + sizeof(__u32);
397 rc = ptlrpc_register_bulk(bulk[n]);
402 request->rq_replen = sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
403 rc = ptlrpc_queue_wait(cl, request);
406 /* FIXME: if we've called ptlrpc_wait_bulk but rc != 0, we need to
407 * abort those bulk listeners. */
409 if (request->rq_rephdr)
410 OBD_FREE(request->rq_rephdr, request->rq_replen);
412 for (i = 0; i < num_oa; i++) {
413 for (j = 0; j < oa_bufs[i]; j++) {
416 kunmap(bulk[n]->b_buf);
417 OBD_FREE(bulk[n], sizeof(struct ptlrpc_bulk_desc));
422 OBD_FREE(bulk, pages * sizeof(struct ptlrpc_bulk_desc *));
423 ptlrpc_free_req(request);
427 int osc_brw_write(struct obd_conn *conn, obd_count num_oa, struct obdo **oa,
428 obd_count *oa_bufs, struct page **buf, obd_size *count,
429 obd_off *offset, obd_flag *flags)
431 struct ptlrpc_client *cl = osc_con2cl(conn);
432 struct ptlrpc_request *request;
433 struct obd_ioobj ioo;
435 int pages, rc, i, j, n, size1, size2 = 0;
438 size1 = num_oa * sizeof(ioo);
440 for (i = 0; i < num_oa; i++) {
441 size2 += oa_bufs[i] * sizeof(src);
445 request = ptlrpc_prep_req(cl, OST_BRW, size1, NULL, size2, NULL);
447 CERROR("cannot pack req!\n");
450 request->rq_req.ost->cmd = OBD_BRW_WRITE;
453 ptr1 = ost_req_buf1(request->rq_req.ost);
454 ptr2 = ost_req_buf2(request->rq_req.ost);
455 for (i = 0; i < num_oa; i++) {
456 ost_pack_ioo(&ptr1, oa[i], oa_bufs[i]);
457 for (j = 0; j < oa_bufs[i]; j++) {
458 ost_pack_niobuf(&ptr2, kmap(buf[n]), offset[n],
459 count[n], flags[n], 0);
464 request->rq_replen = sizeof(struct ptlrep_hdr) +
465 sizeof(struct ost_rep) + pages * sizeof(struct niobuf);
466 rc = ptlrpc_queue_wait(cl, request);
472 ptr2 = ost_rep_buf2(request->rq_rep.ost);
473 if (request->rq_rep.ost->buflen2 != n * sizeof(struct niobuf)) {
474 CERROR("buffer length wrong (%d vs. %d)\n",
475 request->rq_rep.ost->buflen2, n * sizeof(struct niobuf));
480 for (i = 0; i < num_oa; i++) {
481 for (j = 0; j < oa_bufs[i]; j++) {
483 src.addr = (__u64)(unsigned long)buf[n];
485 ost_unpack_niobuf(&ptr2, &dst);
486 osc_sendpage(conn, request, dst, &src);
491 /* Reuse the request structure for the completion request. */
492 OBD_FREE(request->rq_rephdr, request->rq_replen);
493 request->rq_rephdr = NULL;
494 request->rq_repbuf = NULL;
495 request->rq_reqhdr->opc = OST_BRW_COMPLETE;
496 request->rq_replen = sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
497 rc = ptlrpc_queue_wait(cl, request);
504 if (request->rq_rephdr)
505 OBD_FREE(request->rq_rephdr, request->rq_replen);
507 for (i = 0; i < num_oa; i++) {
508 for (j = 0; j < oa_bufs[i]; j++) {
514 ptlrpc_free_req(request);
518 int osc_brw(int rw, struct obd_conn *conn, obd_count num_oa,
519 struct obdo **oa, obd_count *oa_bufs, struct page **buf,
520 obd_size *count, obd_off *offset, obd_flag *flags)
522 if (rw == OBD_BRW_READ) {
523 return osc_brw_read(conn, num_oa, oa, oa_bufs, buf, count,
526 return osc_brw_write(conn, num_oa, oa, oa_bufs, buf, count,
531 /* mount the file system (secretly) */
532 static int osc_setup(struct obd_device *obddev, obd_count len,
536 struct osc_obd *osc = &obddev->u.osc;
537 struct obd_ioctl_data *data = (struct obd_ioctl_data *)buf;
539 int dev = data->ioc_dev;
542 rc = ptlrpc_connect_client(dev, "ost",
554 static int osc_cleanup(struct obd_device * obddev)
560 struct obd_ops osc_obd_ops = {
562 o_cleanup: osc_cleanup,
563 o_create: osc_create,
564 o_destroy: osc_destroy,
565 o_getattr: osc_getattr,
566 o_setattr: osc_setattr,
567 o_connect: osc_connect,
568 o_disconnect: osc_disconnect,
573 static int __init osc_init(void)
575 obd_register_type(&osc_obd_ops, LUSTRE_OSC_NAME);
579 static void __exit osc_exit(void)
581 obd_unregister_type(LUSTRE_OSC_NAME);
584 MODULE_AUTHOR("Peter J. Braam <braam@clusterfs.com>");
585 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC) v1.0");
586 MODULE_LICENSE("GPL");
588 module_init(osc_init);
589 module_exit(osc_exit);