1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
6 * This code is issued under the GNU General Public License.
7 * See the file COPYING in this distribution
9 * Author Peter Braam <braam@clusterfs.com>
11 * This server is single threaded at present (but can easily be multi
12 * threaded). For testing and management it is treated as an
13 * obd_device, although it does not export a full OBD method table
14 * (the requests are coming in over the wire, so object target
15 * modules do not have a full method table.)
20 #define DEBUG_SUBSYSTEM S_OSC
22 #include <linux/module.h>
23 #include <linux/random.h>
24 #include <linux/lustre_dlm.h>
25 #include <linux/obd_ost.h>
/*
 * Fetch the RPC client and connection used by the OSC device behind conn.
 *
 * conn:       connection handle; the OSC state lives in conn->oc_dev->u.osc
 * cl:         out, receives osc->osc_client
 * connection: out, receives osc->osc_conn
 */
27 static void osc_con2cl(struct obd_conn *conn, struct ptlrpc_client **cl,
28 struct ptlrpc_connection **connection)
30 struct osc_obd *osc = &conn->oc_dev->u.osc;
31 *cl = osc->osc_client;
32 *connection = osc->osc_conn;
/*
 * Issue an OST_CONNECT request and store the connection id found in the
 * reply body into conn->oc_id.
 *
 * conn: handle whose device supplies the RPC client/connection and whose
 *       oc_id field is written from the reply.
 */
35 static int osc_connect(struct obd_conn *conn)
37 struct ptlrpc_request *request;
38 struct ptlrpc_client *cl;
39 struct ptlrpc_connection *connection;
40 struct ost_body *body;
41 int rc, size = sizeof(*body);
44 osc_con2cl(conn, &cl, &connection);
/* the connect request is prepared with zero message buffers */
45 request = ptlrpc_prep_req(cl, connection, OST_CONNECT, 0, NULL, NULL);
/* reply length is computed for one buffer of sizeof(*body) bytes */
49 request->rq_replen = lustre_msg_size(1, &size);
51 rc = ptlrpc_queue_wait(request);
/* NOTE(review): the reply is dereferenced below; confirm request and rc
 * are validated before this point. */
55 body = lustre_msg_buf(request->rq_repmsg, 0);
56 CDEBUG(D_INODE, "received connid %d\n", body->connid);
58 conn->oc_id = body->connid;
61 ptlrpc_free_req(request);
/*
 * Issue an OST_DISCONNECT request carrying the connection id held in
 * conn->oc_id, then release the request.
 *
 * conn: handle identifying the connection to drop.
 */
65 static int osc_disconnect(struct obd_conn *conn)
67 struct ptlrpc_request *request;
68 struct ptlrpc_client *cl;
69 struct ptlrpc_connection *connection;
70 struct ost_body *body;
71 int rc, size = sizeof(*body);
74 osc_con2cl(conn, &cl, &connection);
/* one request buffer, sized for a struct ost_body */
75 request = ptlrpc_prep_req(cl, connection, OST_DISCONNECT, 1, &size, NULL);
/* fill the request body with the id of the connection being closed */
79 body = lustre_msg_buf(request->rq_reqmsg, 0);
80 body->connid = conn->oc_id;
82 request->rq_replen = lustre_msg_size(1, &size);
84 rc = ptlrpc_queue_wait(request);
87 ptlrpc_free_req(request);
/*
 * Issue an OST_GETATTR request for the object described by oa and copy the
 * obdo from the reply back over *oa.
 *
 * conn: handle supplying the RPC client/connection and the connection id.
 * oa:   in/out; copied into the request body, then overwritten with the
 *       obdo found in the reply body.
 */
91 static int osc_getattr(struct obd_conn *conn, struct obdo *oa)
93 struct ptlrpc_request *request;
94 struct ptlrpc_client *cl;
95 struct ptlrpc_connection *connection;
96 struct ost_body *body;
97 int rc, size = sizeof(*body);
100 osc_con2cl(conn, &cl, &connection);
101 request = ptlrpc_prep_req(cl, connection, OST_GETATTR, 1, &size, NULL);
/* request body: caller's obdo, connection id, and every o_valid bit set */
105 body = lustre_msg_buf(request->rq_reqmsg, 0);
106 memcpy(&body->oa, oa, sizeof(*oa));
107 body->connid = conn->oc_id;
108 body->oa.o_valid = ~0;
110 request->rq_replen = lustre_msg_size(1, &size);
112 rc = ptlrpc_queue_wait(request);
/* from here on body refers to the reply message, not the request */
116 body = lustre_msg_buf(request->rq_repmsg, 0);
117 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
119 memcpy(oa, &body->oa, sizeof(*oa));
123 ptlrpc_free_req(request);
/*
 * Issue an OST_OPEN request for the object described by oa and copy the
 * obdo from the reply back over *oa.
 *
 * conn: handle supplying the RPC client/connection and the connection id.
 * oa:   in/out; copied into the request body, then overwritten with the
 *       obdo found in the reply body.
 */
127 static int osc_open(struct obd_conn *conn, struct obdo *oa)
129 struct ptlrpc_request *request;
130 struct ptlrpc_client *cl;
131 struct ptlrpc_connection *connection;
132 struct ost_body *body;
133 int rc, size = sizeof(*body);
136 osc_con2cl(conn, &cl, &connection);
137 request = ptlrpc_prep_req(cl, connection, OST_OPEN, 1, &size, NULL);
141 body = lustre_msg_buf(request->rq_reqmsg, 0);
142 memcpy(&body->oa, oa, sizeof(*oa));
143 body->connid = conn->oc_id;
/* o_valid is compared against exactly FLMODE|FLID.
 * NOTE(review): confirm what the mismatch branch is meant to do. */
144 if (body->oa.o_valid != (OBD_MD_FLMODE | OBD_MD_FLID))
147 request->rq_replen = lustre_msg_size(1, &size);
149 rc = ptlrpc_queue_wait(request);
/* from here on body refers to the reply message, not the request */
153 body = lustre_msg_buf(request->rq_repmsg, 0);
154 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
156 memcpy(oa, &body->oa, sizeof(*oa));
160 ptlrpc_free_req(request);
/*
 * Issue an OST_CLOSE request for the object described by oa and copy the
 * obdo from the reply back over *oa.
 *
 * conn: handle supplying the RPC client/connection and the connection id.
 * oa:   in/out; copied into the request body, then overwritten with the
 *       obdo found in the reply body.
 */
164 static int osc_close(struct obd_conn *conn, struct obdo *oa)
166 struct ptlrpc_request *request;
167 struct ptlrpc_client *cl;
168 struct ptlrpc_connection *connection;
169 struct ost_body *body;
170 int rc, size = sizeof(*body);
173 osc_con2cl(conn, &cl, &connection);
174 request = ptlrpc_prep_req(cl, connection, OST_CLOSE, 1, &size, NULL);
/* unlike getattr/create, o_valid is sent exactly as the caller set it */
178 body = lustre_msg_buf(request->rq_reqmsg, 0);
179 memcpy(&body->oa, oa, sizeof(*oa));
180 body->connid = conn->oc_id;
182 request->rq_replen = lustre_msg_size(1, &size);
184 rc = ptlrpc_queue_wait(request);
/* from here on body refers to the reply message, not the request */
188 body = lustre_msg_buf(request->rq_repmsg, 0);
189 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
191 memcpy(oa, &body->oa, sizeof(*oa));
195 ptlrpc_free_req(request);
/*
 * Issue an OST_SETATTR request carrying the attributes in oa.
 *
 * conn: handle supplying the RPC client/connection and the connection id.
 * oa:   attributes to send; copied into the request body. The reply body is
 *       not copied back to the caller.
 */
199 static int osc_setattr(struct obd_conn *conn, struct obdo *oa)
201 struct ptlrpc_request *request;
202 struct ptlrpc_client *cl;
203 struct ptlrpc_connection *connection;
204 struct ost_body *body;
205 int rc, size = sizeof(*body);
208 osc_con2cl(conn, &cl, &connection);
209 request = ptlrpc_prep_req(cl, connection, OST_SETATTR, 1, &size, NULL);
/* o_valid is sent exactly as the caller set it */
213 body = lustre_msg_buf(request->rq_reqmsg, 0);
214 memcpy(&body->oa, oa, sizeof(*oa));
215 body->connid = conn->oc_id;
217 request->rq_replen = lustre_msg_size(1, &size);
219 rc = ptlrpc_queue_wait(request);
223 ptlrpc_free_req(request);
/*
 * Issue an OST_CREATE request and copy the obdo from the reply back
 * over *oa.
 *
 * conn: handle supplying the RPC client/connection and the connection id.
 * oa:   in/out; copied into the request body (with every o_valid bit set),
 *       then overwritten with the obdo found in the reply body.
 */
227 static int osc_create(struct obd_conn *conn, struct obdo *oa)
229 struct ptlrpc_request *request;
230 struct ptlrpc_client *cl;
231 struct ptlrpc_connection *connection;
232 struct ost_body *body;
233 int rc, size = sizeof(*body);
240 osc_con2cl(conn, &cl, &connection);
241 request = ptlrpc_prep_req(cl, connection, OST_CREATE, 1, &size, NULL);
245 body = lustre_msg_buf(request->rq_reqmsg, 0);
246 memcpy(&body->oa, oa, sizeof(*oa));
/* only the copy in the request gets all valid bits; *oa is untouched here */
247 body->oa.o_valid = ~0;
248 body->connid = conn->oc_id;
250 request->rq_replen = lustre_msg_size(1, &size);
252 rc = ptlrpc_queue_wait(request);
/* from here on body refers to the reply message, not the request */
256 body = lustre_msg_buf(request->rq_repmsg, 0);
257 memcpy(oa, &body->oa, sizeof(*oa));
261 ptlrpc_free_req(request);
/*
 * Issue an OST_PUNCH request for the object described by oa and copy the
 * obdo from the reply back over *oa.
 *
 * conn:   handle supplying the RPC client/connection and the connection id.
 * oa:     in/out; copied into the request body, then overwritten with the
 *         obdo found in the reply body.
 * count:  sent to the server in the request's oa.o_blocks field.
 * offset: sent to the server in the request's oa.o_size field.
 */
265 static int osc_punch(struct obd_conn *conn, struct obdo *oa, obd_size count,
268 struct ptlrpc_request *request;
269 struct ptlrpc_client *cl;
270 struct ptlrpc_connection *connection;
271 struct ost_body *body;
272 int rc, size = sizeof(*body);
279 osc_con2cl(conn, &cl, &connection);
280 request = ptlrpc_prep_req(cl, connection, OST_PUNCH, 1, &size, NULL);
284 body = lustre_msg_buf(request->rq_reqmsg, 0);
285 memcpy(&body->oa, oa, sizeof(*oa));
286 body->connid = conn->oc_id;
287 body->oa.o_valid = ~0;
/* o_size and o_blocks are reused to carry the punch range, overriding
 * whatever the caller's obdo held in those fields */
288 body->oa.o_size = offset;
289 body->oa.o_blocks = count;
291 request->rq_replen = lustre_msg_size(1, &size);
293 rc = ptlrpc_queue_wait(request);
/* from here on body refers to the reply message, not the request */
297 body = lustre_msg_buf(request->rq_repmsg, 0);
298 memcpy(oa, &body->oa, sizeof(*oa));
302 ptlrpc_free_req(request);
/*
 * Issue an OST_DESTROY request for the object described by oa and copy the
 * obdo from the reply back over *oa.
 *
 * conn: handle supplying the RPC client/connection and the connection id.
 * oa:   in/out; copied into the request body (with every o_valid bit set),
 *       then overwritten with the obdo found in the reply body.
 */
306 static int osc_destroy(struct obd_conn *conn, struct obdo *oa)
308 struct ptlrpc_request *request;
309 struct ptlrpc_client *cl;
310 struct ptlrpc_connection *connection;
311 struct ost_body *body;
312 int rc, size = sizeof(*body);
319 osc_con2cl(conn, &cl, &connection);
320 request = ptlrpc_prep_req(cl, connection, OST_DESTROY, 1, &size, NULL);
324 body = lustre_msg_buf(request->rq_reqmsg, 0);
325 memcpy(&body->oa, oa, sizeof(*oa));
326 body->connid = conn->oc_id;
327 body->oa.o_valid = ~0;
329 request->rq_replen = lustre_msg_size(1, &size);
331 rc = ptlrpc_queue_wait(request);
/* from here on body refers to the reply message, not the request */
335 body = lustre_msg_buf(request->rq_repmsg, 0);
336 memcpy(oa, &body->oa, sizeof(*oa));
340 ptlrpc_free_req(request);
/*
 * Deliver the data described by src to the destination described by dst.
 *
 * Two paths exist: a direct memcpy of src->len bytes from src->addr to
 * dst->addr, and a bulk transfer of the same buffer tagged with dst->xid and
 * sent on OSC_BULK_PORTAL.
 * NOTE(review): confirm the condition that selects between the two paths.
 *
 * conn: handle supplying the connection used for the bulk descriptor.
 * req:  NOTE(review): appears unused by the statements below - confirm.
 * dst:  destination niobuf (addr for the copy path, xid for the bulk path).
 * src:  source niobuf (addr and len).
 */
344 int osc_sendpage(struct obd_conn *conn, struct ptlrpc_request *req,
345 struct niobuf *dst, struct niobuf *src)
347 struct ptlrpc_client *cl;
348 struct ptlrpc_connection *connection;
350 osc_con2cl(conn, &cl, &connection);
/* copy path: niobuf addresses are integers cast back to pointers */
354 memcpy((char *)(unsigned long)dst->addr,
355 (char *)(unsigned long)src->addr, src->len);
357 struct ptlrpc_bulk_desc *bulk;
360 bulk = ptlrpc_prep_bulk(connection);
/* bulk path: describe the source buffer and tag it with the xid taken
 * from the destination niobuf */
364 bulk->b_buf = (void *)(unsigned long)src->addr;
365 bulk->b_buflen = src->len;
366 bulk->b_xid = dst->xid;
367 rc = ptlrpc_send_bulk(bulk, OSC_BULK_PORTAL);
369 CERROR("send_bulk failed: %d\n", rc);
/* sleep (interruptibly) until ptlrpc_check_bulk_sent() reports true */
373 wait_event_interruptible(bulk->b_waitq,
374 ptlrpc_check_bulk_sent(bulk));
/* interrupted wait: per the FIXME, the descriptor is leaked on this path */
376 if (bulk->b_flags == PTL_RPC_INTR) {
378 /* FIXME: hey hey, we leak here. */
382 OBD_FREE(bulk, sizeof(*bulk));
/*
 * Read pages from the OST: build one OST_BRW request describing every
 * object and page, register one bulk descriptor per page to receive the
 * data, then wait for the request to complete.
 *
 * conn:    handle supplying the RPC client/connection.
 * num_oa:  number of entries in oa[] and oa_bufs[].
 * oa:      objects to read from.
 * oa_bufs: per-object page counts; oa_bufs[i] pages belong to oa[i].
 * buf, count, offset, flags: per-page arrays indexed by a running page
 *          counter across all objects (page, and the values packed into its
 *          niobuf).
 */
388 int osc_brw_read(struct obd_conn *conn, obd_count num_oa, struct obdo **oa,
389 obd_count *oa_bufs, struct page **buf, obd_size *count,
390 obd_off *offset, obd_flag *flags)
392 struct ptlrpc_client *cl;
393 struct ptlrpc_connection *connection;
394 struct ptlrpc_request *request;
395 struct ost_body *body;
396 struct obd_ioobj ioo;
/* size[] holds the three request buffer lengths: body, ioobjs, niobufs */
398 int pages, rc, i, j, size[3] = {sizeof(*body)};
400 struct ptlrpc_bulk_desc **bulk;
403 size[1] = num_oa * sizeof(ioo);
/* presumably totals oa_bufs[] into pages - TODO confirm */
405 for (i = 0; i < num_oa; i++)
/* NOTE(review): osc_brw_write sizes this buffer with sizeof(*src); confirm
 * src is a struct niobuf object here rather than a pointer. */
407 size[2] = pages * sizeof(src);
/* array of per-page bulk descriptor pointers */
409 OBD_ALLOC(bulk, pages * sizeof(*bulk));
413 osc_con2cl(conn, &cl, &connection);
414 request = ptlrpc_prep_req(cl, connection, OST_BRW, 3, size, NULL);
416 GOTO(out, rc = -ENOMEM);
418 body = lustre_msg_buf(request->rq_reqmsg, 0);
419 body->data = OBD_BRW_READ;
/* ptr1/ptr2 are cursors into request buffers 1 (ioobjs) and 2 (niobufs);
 * the pack helpers take their address, presumably to advance them */
421 ptr1 = lustre_msg_buf(request->rq_reqmsg, 1);
422 ptr2 = lustre_msg_buf(request->rq_reqmsg, 2);
423 for (pages = 0, i = 0; i < num_oa; i++) {
424 ost_pack_ioo(&ptr1, oa[i], oa_bufs[i]);
425 for (j = 0; j < oa_bufs[i]; j++, pages++) {
426 bulk[pages] = ptlrpc_prep_bulk(connection);
427 if (bulk[pages] == NULL)
428 GOTO(out, rc = -ENOMEM);
/* take a fresh transfer id from the connection under its lock */
430 spin_lock(&connection->c_lock);
431 bulk[pages]->b_xid = ++connection->c_xid_out;
432 spin_unlock(&connection->c_lock);
/* the page is mapped and handed over as a PAGE_SIZE bulk buffer */
434 bulk[pages]->b_buf = kmap(buf[pages]);
435 bulk[pages]->b_buflen = PAGE_SIZE;
436 bulk[pages]->b_portal = OST_BULK_PORTAL;
/* the same xid goes into the niobuf sent to the server */
437 ost_pack_niobuf(&ptr2, bulk[pages]->b_buf,
438 offset[pages], count[pages],
439 flags[pages], bulk[pages]->b_xid);
441 rc = ptlrpc_register_bulk(bulk[pages]);
/* reply length is computed for one buffer of size[0] bytes */
447 request->rq_replen = lustre_msg_size(1, size);
448 rc = ptlrpc_queue_wait(request);
452 /* FIXME: if we've called ptlrpc_wait_bulk but rc != 0, we need to
453 * abort those bulk listeners. */
/* cleanup: walk the pages again and release each bulk descriptor */
455 for (pages = 0, i = 0; i < num_oa; i++) {
456 for (j = 0; j < oa_bufs[i]; j++, pages++) {
457 if (bulk[pages] == NULL)
460 OBD_FREE(bulk[pages], sizeof(**bulk));
/* NOTE(review): the size passed here uses pages as recomputed by the loop
 * above; confirm it always equals the count used for OBD_ALLOC. */
464 OBD_FREE(bulk, pages * sizeof(*bulk));
465 ptlrpc_free_req(request);
/*
 * Write pages to the OST: send an OST_BRW request describing every object
 * and page, read back one destination niobuf per page from the reply, and
 * push each page to its destination with osc_sendpage().
 *
 * conn:    handle supplying the RPC client/connection.
 * num_oa:  number of entries in oa[] and oa_bufs[].
 * oa:      objects to write to.
 * oa_bufs: per-object page counts; oa_bufs[i] pages belong to oa[i].
 * buf, count, offset, flags: per-page arrays indexed by a running page
 *          counter across all objects.
 */
469 int osc_brw_write(struct obd_conn *conn, obd_count num_oa, struct obdo **oa,
470 obd_count *oa_bufs, struct page **buf, obd_size *count,
471 obd_off *offset, obd_flag *flags)
473 struct ptlrpc_client *cl;
474 struct ptlrpc_connection *connection;
475 struct ptlrpc_request *request;
476 struct obd_ioobj ioo;
477 struct ost_body *body;
/* size[] holds the three request buffer lengths: body, ioobjs, niobufs */
479 int pages, rc, i, j, size[3] = {sizeof(*body)};
483 size[1] = num_oa * sizeof(ioo);
/* presumably totals oa_bufs[] into pages - TODO confirm */
485 for (i = 0; i < num_oa; i++)
487 size[2] = pages * sizeof(*src);
/* src: local array that will hold a copy of the packed source niobufs */
489 OBD_ALLOC(src, size[2]);
493 osc_con2cl(conn, &cl, &connection);
494 request = ptlrpc_prep_req(cl, connection, OST_BRW, 3, size, NULL);
497 body = lustre_msg_buf(request->rq_reqmsg, 0);
498 body->data = OBD_BRW_WRITE;
/* ptr1/ptr2 are cursors into request buffers 1 (ioobjs) and 2 (niobufs) */
500 ptr1 = lustre_msg_buf(request->rq_reqmsg, 1);
501 ptr2 = lustre_msg_buf(request->rq_reqmsg, 2);
502 for (pages = 0, i = 0; i < num_oa; i++) {
503 ost_pack_ioo(&ptr1, oa[i], oa_bufs[i]);
504 for (j = 0; j < oa_bufs[i]; j++, pages++) {
/* each page is mapped and packed with an xid of 0 */
505 ost_pack_niobuf(&ptr2, kmap(buf[pages]), offset[pages],
506 count[pages], flags[pages], 0);
/* keep the packed niobufs so each can be paired with its destination later */
509 memcpy(src, lustre_msg_buf(request->rq_reqmsg, 2), size[2]);
/* size[1] is reused: it now gives the expected length of reply buffer 1
 * (one niobuf per page); size[2] still holds the src allocation size */
511 size[1] = pages * sizeof(struct niobuf);
512 request->rq_replen = lustre_msg_size(2, size);
514 rc = ptlrpc_queue_wait(request);
/* ptr2 now walks the destination niobufs in reply buffer 1 */
518 ptr2 = lustre_msg_buf(request->rq_repmsg, 1);
520 GOTO(out, rc = -EINVAL);
522 if (request->rq_repmsg->buflens[1] != pages * sizeof(struct niobuf)) {
/* NOTE(review): the second %d receives a size_t-typed expression
 * (int * sizeof); confirm the format specifier is right for all targets. */
523 CERROR("buffer length wrong (%d vs. %d)\n",
524 request->rq_repmsg->buflens[1],
525 pages * sizeof(struct niobuf));
526 GOTO(out, rc = -EINVAL);
529 for (pages = 0, i = 0; i < num_oa; i++) {
530 for (j = 0; j < oa_bufs[i]; j++, pages++) {
532 ost_unpack_niobuf(&ptr2, &dst);
/* NOTE(review): the return value of osc_sendpage() is not used here */
533 osc_sendpage(conn, request, dst, &src[pages]);
536 OBD_FREE(src, size[2]);
/* final pass over every page; presumably undoes the kmap() above - TODO confirm */
538 for (pages = 0, i = 0; i < num_oa; i++)
539 for (j = 0; j < oa_bufs[i]; j++, pages++)
542 ptlrpc_free_req(request);
/*
 * Bulk read/write entry point: dispatch to osc_brw_read() when rw is
 * OBD_BRW_READ, otherwise to osc_brw_write(). All remaining arguments are
 * passed through unchanged; the helper's result is returned.
 */
546 int osc_brw(int rw, struct obd_conn *conn, obd_count num_oa,
547 struct obdo **oa, obd_count *oa_bufs, struct page **buf,
548 obd_size *count, obd_off *offset, obd_flag *flags)
550 if (rw == OBD_BRW_READ)
551 return osc_brw_read(conn, num_oa, oa, oa_bufs, buf, count,
554 return osc_brw_write(conn, num_oa, oa, oa_bufs, buf, count,
/*
 * Set up the OSC state in obddev->u.osc: obtain the connection for the
 * "ost" uuid, allocate and initialise the OST and LDLM RPC clients, and
 * create a lock namespace with a random id.
 *
 * obddev:   device being set up.
 * len, buf: NOTE(review): appear unused by the statements below - confirm.
 *
 * On failure the resources acquired so far are released in reverse order.
 */
558 static int osc_setup(struct obd_device *obddev, obd_count len, void *buf)
560 struct osc_obd *osc = &obddev->u.osc;
/* the target is looked up by the fixed uuid string "ost" */
565 osc->osc_conn = ptlrpc_uuid_to_connection("ost");
569 OBD_ALLOC(osc->osc_client, sizeof(*osc->osc_client));
570 if (osc->osc_client == NULL)
571 GOTO(out_conn, rc = -ENOMEM);
573 OBD_ALLOC(osc->osc_ldlm_client, sizeof(*osc->osc_ldlm_client));
574 if (osc->osc_ldlm_client == NULL)
575 GOTO(out_client, rc = -ENOMEM);
/* one client for OST requests, one for lock-manager (LDLM) requests */
577 ptlrpc_init_client(NULL, OST_REQUEST_PORTAL, OSC_REPLY_PORTAL,
579 ptlrpc_init_client(NULL, LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL,
580 osc->osc_ldlm_client);
/* the namespace id is picked at random */
582 get_random_bytes(&ns_id, sizeof(ns_id));
583 rc = ldlm_cli_namespace_new(obddev, osc->osc_ldlm_client, osc->osc_conn,
586 CERROR("Couldn't create new namespace %u: %d\n", ns_id, rc);
587 GOTO(out_ldlm_client, rc);
/* error unwinding: release in reverse order of acquisition */
594 OBD_FREE(osc->osc_ldlm_client, sizeof(*osc->osc_ldlm_client));
596 OBD_FREE(osc->osc_client, sizeof(*osc->osc_client));
598 ptlrpc_put_connection(osc->osc_conn);
/*
 * Release the resources held in obddev->u.osc: both RPC clients and the
 * connection reference.
 *
 * NOTE(review): osc_setup also creates a lock namespace; no matching
 * teardown is visible here - confirm it is released elsewhere.
 */
602 static int osc_cleanup(struct obd_device * obddev)
604 struct osc_obd *osc = &obddev->u.osc;
606 OBD_FREE(osc->osc_client, sizeof(*osc->osc_client));
607 OBD_FREE(osc->osc_ldlm_client, sizeof(*osc->osc_ldlm_client));
608 ptlrpc_put_connection(osc->osc_conn);
/*
 * OBD method table for the OSC; registered under LUSTRE_OSC_NAME by
 * osc_init(). Each entry points at the matching osc_* function in this file.
 */
614 struct obd_ops osc_obd_ops = {
616 o_cleanup: osc_cleanup,
617 o_create: osc_create,
618 o_destroy: osc_destroy,
619 o_getattr: osc_getattr,
620 o_setattr: osc_setattr,
623 o_connect: osc_connect,
624 o_disconnect: osc_disconnect,
/*
 * Module load hook: register the OSC method table under LUSTRE_OSC_NAME.
 *
 * NOTE(review): the result of obd_register_type() does not appear to be
 * used - confirm registration failures are reported to the caller.
 */
629 static int __init osc_init(void)
631 obd_register_type(&osc_obd_ops, LUSTRE_OSC_NAME);
/* Module unload hook: unregister the type registered by osc_init(). */
635 static void __exit osc_exit(void)
637 obd_unregister_type(LUSTRE_OSC_NAME);
/* Module metadata. */
640 MODULE_AUTHOR("Peter J. Braam <braam@clusterfs.com>");
641 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC) v1.0");
642 MODULE_LICENSE("GPL");
/* Hook osc_init()/osc_exit() up as the module's load and unload entry points. */
644 module_init(osc_init);
645 module_exit(osc_exit);