1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
6 * This code is issued under the GNU General Public License.
7 * See the file COPYING in this distribution
9 * Author Peter Braam <braam@clusterfs.com>
11 * This server is single threaded at present (but can easily be multi
12 * threaded). For testing and management it is treated as an
13 * obd_device, although it does not export a full OBD method table
14 * (the requests are coming in over the wire, so object target
15 * modules do not have a full method table.)
20 #define DEBUG_SUBSYSTEM S_OSC
22 #include <linux/module.h>
23 #include <linux/lustre_dlm.h>
24 #include <linux/obd_ost.h>
/*
 * osc_con2cl - fetch the RPC client and connection for an OBD connection.
 *
 * Reads the osc_obd reached through conn->oc_dev->u.osc and stores its
 * osc_client in *cl and its osc_conn in *connection.
 */
26 static void osc_con2cl(struct obd_conn *conn, struct ptlrpc_client **cl,
27 struct ptlrpc_connection **connection)
29 struct osc_obd *osc = &conn->oc_dev->u.osc;
30 *cl = osc->osc_client;
31 *connection = osc->osc_conn;
/*
 * osc_connect - send an OST_CONNECT request and record the returned id.
 *
 * The request is prepared with no request buffers.  The expected reply is
 * sized for one buffer of sizeof(struct ost_body).  The connid found in
 * reply buffer 0 is logged and stored in conn->oc_id.  rc receives the
 * result of ptlrpc_queue_wait().
 */
34 static int osc_connect(struct obd_conn *conn)
36 struct ptlrpc_request *request;
37 struct ptlrpc_client *cl;
38 struct ptlrpc_connection *connection;
39 struct ost_body *body;
40 int rc, size = sizeof(*body);
43 osc_con2cl(conn, &cl, &connection);
44 request = ptlrpc_prep_req(cl, connection, OST_CONNECT, 0, NULL, NULL);
/* Reply length: a single buffer large enough for an ost_body. */
48 request->rq_replen = lustre_msg_size(1, &size);
50 rc = ptlrpc_queue_wait(request);
/* Reply buffer 0 is read as the ost_body carrying the connection id. */
54 body = lustre_msg_buf(request->rq_repmsg, 0);
55 CDEBUG(D_INODE, "received connid %d\n", body->connid);
57 conn->oc_id = body->connid;
60 ptlrpc_free_req(request);
/*
 * osc_disconnect - send an OST_DISCONNECT request for this connection.
 *
 * The request carries one ost_body whose connid is set from conn->oc_id.
 * The expected reply is sized for one ost_body; no reply fields are read
 * in the statements below.  rc receives the result of ptlrpc_queue_wait().
 */
64 static int osc_disconnect(struct obd_conn *conn)
66 struct ptlrpc_request *request;
67 struct ptlrpc_client *cl;
68 struct ptlrpc_connection *connection;
69 struct ost_body *body;
70 int rc, size = sizeof(*body);
73 osc_con2cl(conn, &cl, &connection);
74 request = ptlrpc_prep_req(cl, connection, OST_DISCONNECT, 1, &size, NULL);
/* Identify the connection being torn down in request buffer 0. */
78 body = lustre_msg_buf(request->rq_reqmsg, 0);
79 body->connid = conn->oc_id;
81 request->rq_replen = lustre_msg_size(1, &size);
83 rc = ptlrpc_queue_wait(request);
86 ptlrpc_free_req(request);
/*
 * osc_getattr - send an OST_GETATTR request for the object described by oa.
 *
 * The caller's obdo is copied into the request body, the connid is set
 * from conn->oc_id and o_valid is set to ~0 (every bit set).  After the
 * request has been waited for, the obdo from reply buffer 0 is copied
 * back over *oa.  rc receives the result of ptlrpc_queue_wait().
 */
90 static int osc_getattr(struct obd_conn *conn, struct obdo *oa)
92 struct ptlrpc_request *request;
93 struct ptlrpc_client *cl;
94 struct ptlrpc_connection *connection;
95 struct ost_body *body;
96 int rc, size = sizeof(*body);
99 osc_con2cl(conn, &cl, &connection);
100 request = ptlrpc_prep_req(cl, connection, OST_GETATTR, 1, &size, NULL);
/* Fill request buffer 0: object description plus connection id. */
104 body = lustre_msg_buf(request->rq_reqmsg, 0);
105 memcpy(&body->oa, oa, sizeof(*oa));
106 body->connid = conn->oc_id;
107 body->oa.o_valid = ~0;
109 request->rq_replen = lustre_msg_size(1, &size);
111 rc = ptlrpc_queue_wait(request);
/* From here on body points at the reply, not the request. */
115 body = lustre_msg_buf(request->rq_repmsg, 0);
116 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
118 memcpy(oa, &body->oa, sizeof(*oa));
122 ptlrpc_free_req(request);
/*
 * osc_open - send an OST_OPEN request for the object described by oa.
 *
 * The caller's obdo is copied into the request body together with the
 * connid.  o_valid is compared against (OBD_MD_FLMODE | OBD_MD_FLID).
 * NOTE(review): confirm what is done when that comparison is true.
 * After the request has been waited for, the obdo from reply buffer 0 is
 * copied back over *oa.  rc receives the result of ptlrpc_queue_wait().
 */
126 static int osc_open(struct obd_conn *conn, struct obdo *oa)
128 struct ptlrpc_request *request;
129 struct ptlrpc_client *cl;
130 struct ptlrpc_connection *connection;
131 struct ost_body *body;
132 int rc, size = sizeof(*body);
135 osc_con2cl(conn, &cl, &connection);
136 request = ptlrpc_prep_req(cl, connection, OST_OPEN, 1, &size, NULL);
/* Fill request buffer 0: object description plus connection id. */
140 body = lustre_msg_buf(request->rq_reqmsg, 0);
141 memcpy(&body->oa, oa, sizeof(*oa));
142 body->connid = conn->oc_id;
143 if (body->oa.o_valid != (OBD_MD_FLMODE | OBD_MD_FLID))
146 request->rq_replen = lustre_msg_size(1, &size);
148 rc = ptlrpc_queue_wait(request);
/* From here on body points at the reply, not the request. */
152 body = lustre_msg_buf(request->rq_repmsg, 0);
153 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
155 memcpy(oa, &body->oa, sizeof(*oa));
159 ptlrpc_free_req(request);
/*
 * osc_close - send an OST_CLOSE request for the object described by oa.
 *
 * The caller's obdo is copied unchanged into the request body together
 * with the connid.  After the request has been waited for, the obdo from
 * reply buffer 0 is copied back over *oa.  rc receives the result of
 * ptlrpc_queue_wait().
 */
163 static int osc_close(struct obd_conn *conn, struct obdo *oa)
165 struct ptlrpc_request *request;
166 struct ptlrpc_client *cl;
167 struct ptlrpc_connection *connection;
168 struct ost_body *body;
169 int rc, size = sizeof(*body);
172 osc_con2cl(conn, &cl, &connection);
173 request = ptlrpc_prep_req(cl, connection, OST_CLOSE, 1, &size, NULL);
/* Fill request buffer 0: object description plus connection id. */
177 body = lustre_msg_buf(request->rq_reqmsg, 0);
178 memcpy(&body->oa, oa, sizeof(*oa));
179 body->connid = conn->oc_id;
181 request->rq_replen = lustre_msg_size(1, &size);
183 rc = ptlrpc_queue_wait(request);
/* From here on body points at the reply, not the request. */
187 body = lustre_msg_buf(request->rq_repmsg, 0);
188 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
190 memcpy(oa, &body->oa, sizeof(*oa));
194 ptlrpc_free_req(request);
/*
 * osc_setattr - send an OST_SETATTR request carrying the attributes in oa.
 *
 * The caller's obdo is copied into the request body together with the
 * connid.  The expected reply is sized for one ost_body, but no reply
 * fields are read in the statements below, so *oa is left as passed in.
 * rc receives the result of ptlrpc_queue_wait().
 */
198 static int osc_setattr(struct obd_conn *conn, struct obdo *oa)
200 struct ptlrpc_request *request;
201 struct ptlrpc_client *cl;
202 struct ptlrpc_connection *connection;
203 struct ost_body *body;
204 int rc, size = sizeof(*body);
207 osc_con2cl(conn, &cl, &connection);
208 request = ptlrpc_prep_req(cl, connection, OST_SETATTR, 1, &size, NULL);
/* Fill request buffer 0: new attributes plus connection id. */
212 body = lustre_msg_buf(request->rq_reqmsg, 0);
213 memcpy(&body->oa, oa, sizeof(*oa));
214 body->connid = conn->oc_id;
216 request->rq_replen = lustre_msg_size(1, &size);
218 rc = ptlrpc_queue_wait(request);
222 ptlrpc_free_req(request);
/*
 * osc_create - send an OST_CREATE request described by oa.
 *
 * The caller's obdo is copied into the request body, o_valid is set to ~0
 * (every bit set) and the connid is filled in.  After the request has been
 * waited for, the obdo from reply buffer 0 is copied back over *oa.
 * rc receives the result of ptlrpc_queue_wait().
 */
226 static int osc_create(struct obd_conn *conn, struct obdo *oa)
228 struct ptlrpc_request *request;
229 struct ptlrpc_client *cl;
230 struct ptlrpc_connection *connection;
231 struct ost_body *body;
232 int rc, size = sizeof(*body);
239 osc_con2cl(conn, &cl, &connection);
240 request = ptlrpc_prep_req(cl, connection, OST_CREATE, 1, &size, NULL);
/* Fill request buffer 0: object description plus connection id. */
244 body = lustre_msg_buf(request->rq_reqmsg, 0);
245 memcpy(&body->oa, oa, sizeof(*oa));
246 body->oa.o_valid = ~0;
247 body->connid = conn->oc_id;
249 request->rq_replen = lustre_msg_size(1, &size);
251 rc = ptlrpc_queue_wait(request);
/* From here on body points at the reply, not the request. */
255 body = lustre_msg_buf(request->rq_repmsg, 0);
256 memcpy(oa, &body->oa, sizeof(*oa));
260 ptlrpc_free_req(request);
/*
 * osc_punch - send an OST_PUNCH request for the object described by oa.
 *
 * The caller's obdo is copied into the request body, then o_valid is set
 * to ~0 and the range is carried in two obdo fields: o_size receives
 * offset and o_blocks receives count.  After the request has been waited
 * for, the obdo from reply buffer 0 is copied back over *oa.
 * rc receives the result of ptlrpc_queue_wait().
 */
264 static int osc_punch(struct obd_conn *conn, struct obdo *oa, obd_size count,
267 struct ptlrpc_request *request;
268 struct ptlrpc_client *cl;
269 struct ptlrpc_connection *connection;
270 struct ost_body *body;
271 int rc, size = sizeof(*body);
278 osc_con2cl(conn, &cl, &connection);
279 request = ptlrpc_prep_req(cl, connection, OST_PUNCH, 1, &size, NULL);
/* Fill request buffer 0: object description plus connection id. */
283 body = lustre_msg_buf(request->rq_reqmsg, 0);
284 memcpy(&body->oa, oa, sizeof(*oa));
285 body->connid = conn->oc_id;
286 body->oa.o_valid = ~0;
/* These overwrite the o_size/o_blocks values copied from *oa above. */
287 body->oa.o_size = offset;
288 body->oa.o_blocks = count;
290 request->rq_replen = lustre_msg_size(1, &size);
292 rc = ptlrpc_queue_wait(request);
/* From here on body points at the reply, not the request. */
296 body = lustre_msg_buf(request->rq_repmsg, 0);
297 memcpy(oa, &body->oa, sizeof(*oa));
301 ptlrpc_free_req(request);
/*
 * osc_destroy - send an OST_DESTROY request for the object described by oa.
 *
 * The caller's obdo is copied into the request body, the connid is filled
 * in and o_valid is set to ~0 (every bit set).  After the request has been
 * waited for, the obdo from reply buffer 0 is copied back over *oa.
 * rc receives the result of ptlrpc_queue_wait().
 */
305 static int osc_destroy(struct obd_conn *conn, struct obdo *oa)
307 struct ptlrpc_request *request;
308 struct ptlrpc_client *cl;
309 struct ptlrpc_connection *connection;
310 struct ost_body *body;
311 int rc, size = sizeof(*body);
318 osc_con2cl(conn, &cl, &connection);
319 request = ptlrpc_prep_req(cl, connection, OST_DESTROY, 1, &size, NULL);
/* Fill request buffer 0: object description plus connection id. */
323 body = lustre_msg_buf(request->rq_reqmsg, 0);
324 memcpy(&body->oa, oa, sizeof(*oa));
325 body->connid = conn->oc_id;
326 body->oa.o_valid = ~0;
328 request->rq_replen = lustre_msg_size(1, &size);
330 rc = ptlrpc_queue_wait(request);
/* From here on body points at the reply, not the request. */
334 body = lustre_msg_buf(request->rq_repmsg, 0);
335 memcpy(oa, &body->oa, sizeof(*oa));
339 ptlrpc_free_req(request);
/*
 * osc_sendpage - move the data described by niobuf src to niobuf dst.
 *
 * Two mechanisms appear below:
 *  - a direct memcpy() of src->len bytes from src->addr to dst->addr, both
 *    addresses being cast to pointers;
 *  - a bulk transfer: a descriptor is prepared on the connection, pointed
 *    at src->addr / src->len, tagged with dst->xid and sent on
 *    OSC_BULK_PORTAL, after which the caller sleeps (interruptibly) until
 *    ptlrpc_check_bulk_sent() is true.
 * NOTE(review): confirm which condition selects each mechanism.
 */
343 int osc_sendpage(struct obd_conn *conn, struct ptlrpc_request *req,
344 struct niobuf *dst, struct niobuf *src)
346 struct ptlrpc_client *cl;
347 struct ptlrpc_connection *connection;
349 osc_con2cl(conn, &cl, &connection);
/* Direct copy between the two buffer addresses. */
353 memcpy((char *)(unsigned long)dst->addr,
354 (char *)(unsigned long)src->addr, src->len);
356 struct ptlrpc_bulk_desc *bulk;
359 bulk = ptlrpc_prep_bulk(connection);
/* Describe the source buffer and tag it with the destination's xid. */
363 bulk->b_buf = (void *)(unsigned long)src->addr;
364 bulk->b_buflen = src->len;
365 bulk->b_xid = dst->xid;
366 rc = ptlrpc_send_bulk(bulk, OSC_BULK_PORTAL);
/* Failure handling for the send: log rc and release the descriptor. */
368 CERROR("send_bulk failed: %d\n", rc);
369 ptlrpc_free_bulk(bulk);
/* Sleep until the bulk is reported sent; the sleep is interruptible. */
373 wait_event_interruptible(bulk->b_waitq,
374 ptlrpc_check_bulk_sent(bulk));
/* PTL_RPC_FL_INTR set in b_flags: the descriptor is released here too. */
376 if (bulk->b_flags & PTL_RPC_FL_INTR) {
377 ptlrpc_free_bulk(bulk);
381 ptlrpc_free_bulk(bulk);
/*
 * osc_brw_read - issue an OST_BRW request with body->data = OBD_BRW_READ.
 *
 * Parameters as used below:
 *   num_oa   number of objects; oa[i] and oa_bufs[i] are indexed by object
 *   oa_bufs  oa_bufs[i] is the number of buffers (pages) for object i
 *   buf, count, offset, flags
 *            flat arrays indexed by a running page counter across objects
 *
 * The request has three buffers: an ost_body, the packed per-object
 * obd_ioobj records and the packed per-page niobuf records.  For every
 * page a bulk descriptor is prepared, given a fresh xid, pointed at the
 * kmap()ed page and registered before the request is sent.
 */
387 int osc_brw_read(struct obd_conn *conn, obd_count num_oa, struct obdo **oa,
388 obd_count *oa_bufs, struct page **buf, obd_size *count,
389 obd_off *offset, obd_flag *flags)
391 struct ptlrpc_client *cl;
392 struct ptlrpc_connection *connection;
393 struct ptlrpc_request *request;
394 struct ost_body *body;
395 struct obd_ioobj ioo;
/* size[0] is the ost_body; size[1] and size[2] are filled in below. */
397 int pages, rc, i, j, size[3] = {sizeof(*body)};
399 struct ptlrpc_bulk_desc **bulk;
/* One obd_ioobj per object in request buffer 1. */
402 size[1] = num_oa * sizeof(ioo);
404 for (i = 0; i < num_oa; i++)
/* Request buffer 2 is sized from the total page count. */
406 size[2] = pages * sizeof(src);
/* Array of one bulk descriptor pointer per page. */
408 OBD_ALLOC(bulk, pages * sizeof(*bulk));
412 osc_con2cl(conn, &cl, &connection);
413 request = ptlrpc_prep_req(cl, connection, OST_BRW, 3, size, NULL);
415 GOTO(out, rc = -ENOMEM);
417 body = lustre_msg_buf(request->rq_reqmsg, 0);
418 body->data = OBD_BRW_READ;
/* ptr1/ptr2 are passed by address to the pack helpers for buffers 1 and 2. */
420 ptr1 = lustre_msg_buf(request->rq_reqmsg, 1);
421 ptr2 = lustre_msg_buf(request->rq_reqmsg, 2);
/* pages restarts at 0 and runs across all objects' buffers. */
422 for (pages = 0, i = 0; i < num_oa; i++) {
423 ost_pack_ioo(&ptr1, oa[i], oa_bufs[i]);
424 for (j = 0; j < oa_bufs[i]; j++, pages++) {
425 bulk[pages] = ptlrpc_prep_bulk(connection);
426 if (bulk[pages] == NULL)
427 GOTO(out, rc = -ENOMEM);
/* Take the next outgoing xid from the connection while holding c_lock. */
429 spin_lock(&connection->c_lock);
430 bulk[pages]->b_xid = ++connection->c_xid_out;
431 spin_unlock(&connection->c_lock);
/* The kmap()ed page is the bulk buffer; its length is one PAGE_SIZE. */
433 bulk[pages]->b_buf = kmap(buf[pages]);
434 bulk[pages]->b_buflen = PAGE_SIZE;
435 bulk[pages]->b_portal = OST_BULK_PORTAL;
/* The niobuf carries the same mapping and xid as the bulk descriptor. */
436 ost_pack_niobuf(&ptr2, bulk[pages]->b_buf,
437 offset[pages], count[pages],
438 flags[pages], bulk[pages]->b_xid);
440 rc = ptlrpc_register_bulk(bulk[pages]);
/* Expected reply: a single buffer of size[0] bytes (the ost_body). */
446 request->rq_replen = lustre_msg_size(1, size);
447 rc = ptlrpc_queue_wait(request);
451 /* FIXME: if we've called ptlrpc_wait_bulk but rc != 0, we need to
452 * abort those bulk listeners. */
/*
 * Cleanup pass over the same object/buffer layout: each bulk[] slot is
 * tested against NULL ahead of the ptlrpc_free_bulk() call.
 */
454 for (pages = 0, i = 0; i < num_oa; i++) {
455 for (j = 0; j < oa_bufs[i]; j++, pages++) {
456 if (bulk[pages] == NULL)
459 ptlrpc_free_bulk(bulk[pages]);
/* pages here is the count reached by the cleanup loop above. */
463 OBD_FREE(bulk, pages * sizeof(*bulk));
464 ptlrpc_free_req(request);
/*
 * osc_brw_write - issue an OST_BRW request with body->data = OBD_BRW_WRITE,
 * then push each page with osc_sendpage().
 *
 * Parameters as used below:
 *   num_oa   number of objects; oa[i] and oa_bufs[i] are indexed by object
 *   oa_bufs  oa_bufs[i] is the number of buffers (pages) for object i
 *   buf, count, offset, flags
 *            flat arrays indexed by a running page counter across objects
 *
 * The request has three buffers: an ost_body, the packed per-object
 * obd_ioobj records and the packed per-page niobuf records.  A private
 * copy of the packed niobufs is kept in src.  The reply's buffer 1 is
 * expected to hold one niobuf per page; each is unpacked as the
 * destination for the matching src entry.
 */
468 int osc_brw_write(struct obd_conn *conn, obd_count num_oa, struct obdo **oa,
469 obd_count *oa_bufs, struct page **buf, obd_size *count,
470 obd_off *offset, obd_flag *flags)
472 struct ptlrpc_client *cl;
473 struct ptlrpc_connection *connection;
474 struct ptlrpc_request *request;
475 struct obd_ioobj ioo;
476 struct ost_body *body;
/* size[0] is the ost_body; size[1] and size[2] are filled in below. */
478 int pages, rc, i, j, size[3] = {sizeof(*body)};
/* One obd_ioobj per object in request buffer 1. */
482 size[1] = num_oa * sizeof(ioo);
484 for (i = 0; i < num_oa; i++)
/* One *src element per page in request buffer 2. */
486 size[2] = pages * sizeof(*src);
/* src will hold a local copy of the packed niobufs (see memcpy below). */
488 OBD_ALLOC(src, size[2]);
492 osc_con2cl(conn, &cl, &connection);
493 request = ptlrpc_prep_req(cl, connection, OST_BRW, 3, size, NULL);
496 body = lustre_msg_buf(request->rq_reqmsg, 0);
497 body->data = OBD_BRW_WRITE;
/* ptr1/ptr2 are passed by address to the pack helpers for buffers 1 and 2. */
499 ptr1 = lustre_msg_buf(request->rq_reqmsg, 1);
500 ptr2 = lustre_msg_buf(request->rq_reqmsg, 2);
/* pages restarts at 0 and runs across all objects' buffers. */
501 for (pages = 0, i = 0; i < num_oa; i++) {
502 ost_pack_ioo(&ptr1, oa[i], oa_bufs[i]);
503 for (j = 0; j < oa_bufs[i]; j++, pages++) {
/* Each page is kmap()ed here; the final pack argument is passed as 0. */
504 ost_pack_niobuf(&ptr2, kmap(buf[pages]), offset[pages],
505 count[pages], flags[pages], 0);
/* Snapshot request buffer 2 so the source niobufs remain available. */
508 memcpy(src, lustre_msg_buf(request->rq_reqmsg, 2), size[2]);
/* size[1] is reused: the reply is sized as ost_body + one niobuf per page. */
510 size[1] = pages * sizeof(struct niobuf);
511 request->rq_replen = lustre_msg_size(2, size);
513 rc = ptlrpc_queue_wait(request);
/* Reply buffer 1 holds the niobufs unpacked as destinations below. */
517 ptr2 = lustre_msg_buf(request->rq_repmsg, 1);
519 GOTO(out, rc = -EINVAL);
/* The reply must contain exactly one niobuf per page. */
521 if (request->rq_repmsg->buflens[1] != pages * sizeof(struct niobuf)) {
/*
 * NOTE(review): the second value is int * sizeof, i.e. a size_t, but is
 * formatted with %d - confirm this is correct for the target.
 */
522 CERROR("buffer length wrong (%d vs. %d)\n",
523 request->rq_repmsg->buflens[1],
524 pages * sizeof(struct niobuf));
525 GOTO(out, rc = -EINVAL);
/* Send every page: dst comes from the reply, src from the local copy. */
528 for (pages = 0, i = 0; i < num_oa; i++) {
529 for (j = 0; j < oa_bufs[i]; j++, pages++) {
531 ost_unpack_niobuf(&ptr2, &dst);
532 osc_sendpage(conn, request, dst, &src[pages]);
535 OBD_FREE(src, size[2]);
/* Final pass over every page of every object. */
537 for (pages = 0, i = 0; i < num_oa; i++)
538 for (j = 0; j < oa_bufs[i]; j++, pages++)
541 ptlrpc_free_req(request);
/*
 * osc_brw - bulk read/write entry point; dispatches on rw.
 *
 * rw == OBD_BRW_READ hands the arguments to osc_brw_read(); the other
 * return hands them to osc_brw_write().  The result of the chosen helper
 * is returned unchanged.
 */
545 int osc_brw(int rw, struct obd_conn *conn, obd_count num_oa,
546 struct obdo **oa, obd_count *oa_bufs, struct page **buf,
547 obd_size *count, obd_off *offset, obd_flag *flags)
549 if (rw == OBD_BRW_READ)
550 return osc_brw_read(conn, num_oa, oa, oa_bufs, buf, count,
553 return osc_brw_write(conn, num_oa, oa, oa_bufs, buf, count,
/*
 * osc_setup - initialise the osc_obd state embedded in obddev.
 *
 * Looks up a connection for the uuid "ost", allocates osc_client and
 * osc_ldlm_client (failing with -ENOMEM if either allocation returns
 * NULL) and initialises the clients with their request/reply portals.
 * len and buf are not referenced in the statements below.
 */
557 static int osc_setup(struct obd_device *obddev, obd_count len, void *buf)
559 struct osc_obd *osc = &obddev->u.osc;
563 osc->osc_conn = ptlrpc_uuid_to_connection("ost");
567 OBD_ALLOC(osc->osc_client, sizeof(*osc->osc_client));
568 if (osc->osc_client == NULL)
569 GOTO(out_conn, rc = -ENOMEM);
571 OBD_ALLOC(osc->osc_ldlm_client, sizeof(*osc->osc_ldlm_client));
572 if (osc->osc_ldlm_client == NULL)
573 GOTO(out_client, rc = -ENOMEM);
/* One init call uses the OST request / OSC reply portals ... */
575 ptlrpc_init_client(NULL, NULL, OST_REQUEST_PORTAL, OSC_REPLY_PORTAL,
/* ... the other uses the LDLM portals for osc_ldlm_client. */
577 ptlrpc_init_client(NULL, NULL, LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL,
578 osc->osc_ldlm_client);
/*
 * Unwind steps for the GOTOs above: free osc_client, then call
 * ptlrpc_put_connection() on osc_conn.
 * NOTE(review): presumably reached via out_client and out_conn - confirm.
 */
584 OBD_FREE(osc->osc_client, sizeof(*osc->osc_client));
586 ptlrpc_put_connection(osc->osc_conn);
/*
 * osc_cleanup - tear down the osc_obd state embedded in obddev.
 *
 * For each of osc_client and osc_ldlm_client: ptlrpc_cleanup_client()
 * followed by OBD_FREE().  Finally ptlrpc_put_connection() is called on
 * osc_conn.
 */
590 static int osc_cleanup(struct obd_device * obddev)
592 struct osc_obd *osc = &obddev->u.osc;
594 ptlrpc_cleanup_client(osc->osc_client);
595 OBD_FREE(osc->osc_client, sizeof(*osc->osc_client));
596 ptlrpc_cleanup_client(osc->osc_ldlm_client);
597 OBD_FREE(osc->osc_ldlm_client, sizeof(*osc->osc_ldlm_client));
598 ptlrpc_put_connection(osc->osc_conn);
/*
 * OBD method table for the OSC.  It is passed to obd_register_type() in
 * osc_init().  Entries use the "field: value" initializer form and point
 * at the functions defined in this file.
 */
604 struct obd_ops osc_obd_ops = {
606 o_cleanup: osc_cleanup,
607 o_create: osc_create,
608 o_destroy: osc_destroy,
609 o_getattr: osc_getattr,
610 o_setattr: osc_setattr,
613 o_connect: osc_connect,
614 o_disconnect: osc_disconnect,
/*
 * osc_init - module entry point (see module_init).  Registers osc_obd_ops
 * under the type name LUSTRE_OSC_NAME.
 */
619 static int __init osc_init(void)
621 obd_register_type(&osc_obd_ops, LUSTRE_OSC_NAME);
/*
 * osc_exit - module exit point (see module_exit).  Unregisters the type
 * name LUSTRE_OSC_NAME.
 */
625 static void __exit osc_exit(void)
627 obd_unregister_type(LUSTRE_OSC_NAME);
/* Module metadata, followed by registration of the init/exit handlers. */
630 MODULE_AUTHOR("Peter J. Braam <braam@clusterfs.com>");
631 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC) v1.0");
632 MODULE_LICENSE("GPL");
634 module_init(osc_init);
635 module_exit(osc_exit);