1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
6 * This code is issued under the GNU General Public License.
7 * See the file COPYING in this distribution
9 * Author Peter Braam <braam@clusterfs.com>
11 * This server is single threaded at present (but can easily be multi
12 * threaded). For testing and management it is treated as an
13 * obd_device, although it does not export a full OBD method table
14 * (the requests are coming in over the wire, so object target
15 * modules do not have a full method table.)
20 #define DEBUG_SUBSYSTEM S_OSC
22 #include <linux/module.h>
23 #include <linux/lustre_dlm.h>
24 #include <linux/lustre_mds.h> /* for mds_objid */
25 #include <linux/obd_ost.h>
26 #include <linux/obd_lov.h>
/* Look up the RPC client and connection used for OST requests.
 *
 * conn       - OBD connection whose device holds the osc_obd state
 * cl         - out: set to the device's osc_client
 * connection - out: set to the device's osc_conn
 */
28 static void osc_con2cl(struct obd_conn *conn, struct ptlrpc_client **cl,
29 struct ptlrpc_connection **connection)
31 struct osc_obd *osc = &conn->oc_dev->u.osc;
32 *cl = osc->osc_client;
33 *connection = osc->osc_conn;
/* Look up the RPC client used for lock (LDLM) traffic, plus the connection.
 *
 * Same as osc_con2cl() except that *cl is set to the device's
 * osc_ldlm_client; the connection returned is the same osc_conn.
 */
36 static void osc_con2dlmcl(struct obd_conn *conn, struct ptlrpc_client **cl,
37 struct ptlrpc_connection **connection)
39 struct osc_obd *osc = &conn->oc_dev->u.osc;
40 *cl = osc->osc_ldlm_client;
41 *connection = osc->osc_conn;
/* Send an OST_CONNECT request and record the connection id from the reply.
 *
 * The request is prepared with zero request buffers; the reply is expected
 * to carry one ost_body, whose connid is stored in conn->oc_id.
 *
 * NOTE(review): the NULL check on the prepared request and the condition
 * guarding the CERROR below are not visible in this excerpt -- confirm.
 */
44 static int osc_connect(struct obd_conn *conn)
46 struct ptlrpc_request *request;
47 struct ptlrpc_client *cl;
48 struct ptlrpc_connection *connection;
49 struct ost_body *body;
50 int rc, size = sizeof(*body);
53 osc_con2cl(conn, &cl, &connection);
54 request = ptlrpc_prep_req(cl, connection, OST_CONNECT, 0, NULL, NULL);
/* Reply length: a message with a single buffer of sizeof(struct ost_body). */
58 request->rq_replen = lustre_msg_size(1, &size);
/* The rc from the wait is passed through ptlrpc_check_status(). */
60 rc = ptlrpc_queue_wait(request);
61 rc = ptlrpc_check_status(request, rc);
63 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
/* Buffer 0 of the reply message is the ost_body. */
67 body = lustre_msg_buf(request->rq_repmsg, 0);
68 CDEBUG(D_INODE, "received connid %d\n", body->connid);
70 conn->oc_id = body->connid;
73 ptlrpc_free_req(request);
/* Send an OST_DISCONNECT request for the connection id in conn->oc_id.
 *
 * The request carries one ost_body whose connid identifies the connection
 * being dropped; the reply is sized for one ost_body as well.
 *
 * NOTE(review): the ptlrpc_prep_req() call below is cut off in this excerpt,
 * and no ptlrpc_check_status() call is visible here, unlike the sibling
 * request helpers -- confirm whether the reply status is checked.
 */
77 static int osc_disconnect(struct obd_conn *conn)
79 struct ptlrpc_request *request;
80 struct ptlrpc_client *cl;
81 struct ptlrpc_connection *connection;
82 struct ost_body *body;
83 int rc, size = sizeof(*body);
86 osc_con2cl(conn, &cl, &connection);
87 request = ptlrpc_prep_req(cl, connection, OST_DISCONNECT, 1, &size,
/* Buffer 0 of the request message is the ost_body to fill in. */
92 body = lustre_msg_buf(request->rq_reqmsg, 0);
93 body->connid = conn->oc_id;
95 request->rq_replen = lustre_msg_size(1, &size);
97 rc = ptlrpc_queue_wait(request);
100 ptlrpc_free_req(request);
/* Fetch object attributes from the OST via an OST_GETATTR request.
 *
 * conn - OBD connection; its oc_id is sent as the connection id
 * oa   - in: object descriptor copied into the request;
 *        out: overwritten with the obdo returned in the reply
 *
 * NOTE(review): the guards around the request NULL check and the error
 * message are not visible in this excerpt.
 */
104 static int osc_getattr(struct obd_conn *conn, struct obdo *oa)
106 struct ptlrpc_request *request;
107 struct ptlrpc_client *cl;
108 struct ptlrpc_connection *connection;
109 struct ost_body *body;
110 int rc, size = sizeof(*body);
113 osc_con2cl(conn, &cl, &connection);
114 request = ptlrpc_prep_req(cl, connection, OST_GETATTR, 1, &size, NULL);
118 body = lustre_msg_buf(request->rq_reqmsg, 0);
119 memcpy(&body->oa, oa, sizeof(*oa));
120 body->connid = conn->oc_id;
/* Set every bit of o_valid in the request copy (the caller's oa is untouched
 * at this point). */
121 body->oa.o_valid = ~0;
123 request->rq_replen = lustre_msg_size(1, &size);
125 rc = ptlrpc_queue_wait(request);
126 rc = ptlrpc_check_status(request, rc);
128 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
/* From here on `body` refers to the reply's ost_body, not the request's. */
132 body = lustre_msg_buf(request->rq_repmsg, 0);
133 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
135 memcpy(oa, &body->oa, sizeof(*oa));
139 ptlrpc_free_req(request);
/* Open an object on the OST via an OST_OPEN request.
 *
 * conn - OBD connection; its oc_id is sent as the connection id
 * oa   - in: object descriptor copied into the request;
 *        out: overwritten with the obdo returned in the reply
 */
143 static int osc_open(struct obd_conn *conn, struct obdo *oa)
145 struct ptlrpc_request *request;
146 struct ptlrpc_client *cl;
147 struct ptlrpc_connection *connection;
148 struct ost_body *body;
149 int rc, size = sizeof(*body);
152 osc_con2cl(conn, &cl, &connection);
153 request = ptlrpc_prep_req(cl, connection, OST_OPEN, 1, &size, NULL);
157 body = lustre_msg_buf(request->rq_reqmsg, 0);
158 memcpy(&body->oa, oa, sizeof(*oa));
159 body->connid = conn->oc_id;
/* Only the mode and id flags are set in the request's o_valid; whatever the
 * caller had in oa->o_valid is replaced in the request copy. */
160 body->oa.o_valid = (OBD_MD_FLMODE | OBD_MD_FLID);
162 request->rq_replen = lustre_msg_size(1, &size);
164 rc = ptlrpc_queue_wait(request);
165 rc = ptlrpc_check_status(request, rc);
/* Switch `body` to the reply and hand the returned obdo back to the caller. */
169 body = lustre_msg_buf(request->rq_repmsg, 0);
170 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
172 memcpy(oa, &body->oa, sizeof(*oa));
176 ptlrpc_free_req(request);
/* Close an object on the OST via an OST_CLOSE request.
 *
 * conn - OBD connection; its oc_id is sent as the connection id
 * oa   - in: object descriptor copied verbatim into the request (o_valid is
 *        sent as the caller supplied it);
 *        out: overwritten with the obdo returned in the reply
 */
180 static int osc_close(struct obd_conn *conn, struct obdo *oa)
182 struct ptlrpc_request *request;
183 struct ptlrpc_client *cl;
184 struct ptlrpc_connection *connection;
185 struct ost_body *body;
186 int rc, size = sizeof(*body);
189 osc_con2cl(conn, &cl, &connection);
190 request = ptlrpc_prep_req(cl, connection, OST_CLOSE, 1, &size, NULL);
194 body = lustre_msg_buf(request->rq_reqmsg, 0);
195 memcpy(&body->oa, oa, sizeof(*oa));
196 body->connid = conn->oc_id;
198 request->rq_replen = lustre_msg_size(1, &size);
200 rc = ptlrpc_queue_wait(request);
201 rc = ptlrpc_check_status(request, rc);
/* Switch `body` to the reply and hand the returned obdo back to the caller. */
205 body = lustre_msg_buf(request->rq_repmsg, 0);
206 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
208 memcpy(oa, &body->oa, sizeof(*oa));
212 ptlrpc_free_req(request);
/* Push object attributes to the OST via an OST_SETATTR request.
 *
 * conn - OBD connection; its oc_id is sent as the connection id
 * oa   - object descriptor copied verbatim into the request
 *
 * In the visible lines the reply body is not copied back into *oa; only the
 * status from ptlrpc_check_status() is kept in rc.
 */
216 static int osc_setattr(struct obd_conn *conn, struct obdo *oa)
218 struct ptlrpc_request *request;
219 struct ptlrpc_client *cl;
220 struct ptlrpc_connection *connection;
221 struct ost_body *body;
222 int rc, size = sizeof(*body);
225 osc_con2cl(conn, &cl, &connection);
226 request = ptlrpc_prep_req(cl, connection, OST_SETATTR, 1, &size, NULL);
230 body = lustre_msg_buf(request->rq_reqmsg, 0);
231 memcpy(&body->oa, oa, sizeof(*oa));
232 body->connid = conn->oc_id;
234 request->rq_replen = lustre_msg_size(1, &size);
236 rc = ptlrpc_queue_wait(request);
237 rc = ptlrpc_check_status(request, rc);
241 ptlrpc_free_req(request);
/* Create an object on the OST via an OST_CREATE request.
 *
 * conn - OBD connection; its oc_id is sent as the connection id
 * oa   - in: object descriptor copied into the request (with all o_valid
 *        bits set in the request copy);
 *        out: overwritten with the reply's obdo, after which o_inline is
 *        rebuilt to hold a single-stripe object id description (see below)
 */
245 static int osc_create(struct obd_conn *conn, struct obdo *oa)
247 struct ptlrpc_request *request;
248 struct ptlrpc_client *cl;
249 struct ptlrpc_connection *connection;
250 struct ost_body *body;
251 struct mds_objid *objid;
252 struct lov_object_id *lov_id;
253 int rc, size = sizeof(*body);
260 osc_con2cl(conn, &cl, &connection);
261 request = ptlrpc_prep_req(cl, connection, OST_CREATE, 1, &size, NULL);
265 body = lustre_msg_buf(request->rq_reqmsg, 0);
266 memcpy(&body->oa, oa, sizeof(*oa));
267 body->oa.o_valid = ~0;
268 body->connid = conn->oc_id;
270 request->rq_replen = lustre_msg_size(1, &size);
272 rc = ptlrpc_queue_wait(request);
273 rc = ptlrpc_check_status(request, rc);
277 body = lustre_msg_buf(request->rq_repmsg, 0);
278 memcpy(oa, &body->oa, sizeof(*oa));
/* Overlay o_inline with a struct mds_objid followed immediately by one
 * struct lov_object_id, both referring to the newly returned o_id, with a
 * stripe count of 1 and device id 0.
 * NOTE(review): no check is visible that sizeof(oa->o_inline) is at least
 * sizeof(*objid) + sizeof(*lov_id) -- confirm the buffer is large enough. */
280 memset(oa->o_inline, 0, sizeof(oa->o_inline));
281 objid = (struct mds_objid *)oa->o_inline;
282 objid->mo_lov_md.lmd_object_id = oa->o_id;
283 objid->mo_lov_md.lmd_stripe_count = 1;
284 lov_id = (struct lov_object_id *)(oa->o_inline + sizeof(*objid));
285 lov_id->l_device_id = 0;
286 lov_id->l_object_id = oa->o_id;
290 ptlrpc_free_req(request);
/* Send an OST_PUNCH request for an object.
 *
 * conn  - OBD connection; its oc_id is sent as the connection id
 * oa    - in: object descriptor copied into the request;
 *         out: overwritten with the obdo returned in the reply
 * count - sent to the server in the request's oa.o_blocks field
 *
 * The `offset` value used below is sent in the request's oa.o_size field.
 * NOTE(review): the rest of the parameter list (where `offset` is presumably
 * declared) is cut off in this excerpt.
 */
294 static int osc_punch(struct obd_conn *conn, struct obdo *oa, obd_size count,
297 struct ptlrpc_request *request;
298 struct ptlrpc_client *cl;
299 struct ptlrpc_connection *connection;
300 struct ost_body *body;
301 int rc, size = sizeof(*body);
308 osc_con2cl(conn, &cl, &connection);
309 request = ptlrpc_prep_req(cl, connection, OST_PUNCH, 1, &size, NULL);
313 body = lustre_msg_buf(request->rq_reqmsg, 0);
314 memcpy(&body->oa, oa, sizeof(*oa));
315 body->connid = conn->oc_id;
316 body->oa.o_valid = ~0;
/* o_size and o_blocks are reused here to carry the punch offset and count;
 * they overwrite the values copied from the caller's oa in the request only. */
317 body->oa.o_size = offset;
318 body->oa.o_blocks = count;
320 request->rq_replen = lustre_msg_size(1, &size);
322 rc = ptlrpc_queue_wait(request);
323 rc = ptlrpc_check_status(request, rc);
327 body = lustre_msg_buf(request->rq_repmsg, 0);
328 memcpy(oa, &body->oa, sizeof(*oa));
332 ptlrpc_free_req(request);
/* Destroy an object on the OST via an OST_DESTROY request.
 *
 * conn - OBD connection; its oc_id is sent as the connection id
 * oa   - in: object descriptor copied into the request (with all o_valid
 *        bits set in the request copy);
 *        out: overwritten with the obdo returned in the reply
 */
336 static int osc_destroy(struct obd_conn *conn, struct obdo *oa)
338 struct ptlrpc_request *request;
339 struct ptlrpc_client *cl;
340 struct ptlrpc_connection *connection;
341 struct ost_body *body;
342 int rc, size = sizeof(*body);
349 osc_con2cl(conn, &cl, &connection);
350 request = ptlrpc_prep_req(cl, connection, OST_DESTROY, 1, &size, NULL);
354 body = lustre_msg_buf(request->rq_reqmsg, 0);
355 memcpy(&body->oa, oa, sizeof(*oa));
356 body->connid = conn->oc_id;
357 body->oa.o_valid = ~0;
359 request->rq_replen = lustre_msg_size(1, &size);
361 rc = ptlrpc_queue_wait(request);
362 rc = ptlrpc_check_status(request, rc);
366 body = lustre_msg_buf(request->rq_repmsg, 0);
367 memcpy(oa, &body->oa, sizeof(*oa));
371 ptlrpc_free_req(request);
/* Per-transfer state handed to the bulk completion callbacks
 * (brw_read_finish / brw_write_finish) through the descriptor's callback
 * data pointer.
 *
 * NOTE(review): other code in this file also uses `buf` and `cb_data`
 * members of this struct; their declarations are not visible in this
 * excerpt. */
375 struct osc_brw_cb_data {
/* Request that the completion callback releases with ptlrpc_free_req(). */
377 struct ptlrpc_request *req;
/* Caller-supplied function invoked by the completion callback. */
378 bulk_callback_t callback;
/* Bulk-read completion handler.
 *
 * desc - bulk descriptor that completed
 * data - the struct osc_brw_cb_data associated with the transfer
 *
 * Logs if the descriptor is flagged PTL_RPC_FL_INTR, invokes the stored
 * user callback with (desc, cb_data->cb_data), then frees the bulk
 * descriptor, the request, and the callback data itself.
 *
 * NOTE(review): any guard for a NULL cb_data->callback is not visible in
 * this excerpt.
 */
382 static void brw_read_finish(struct ptlrpc_bulk_desc *desc, void *data)
384 struct osc_brw_cb_data *cb_data = data;
386 if (desc->b_flags & PTL_RPC_FL_INTR)
387 CERROR("got signal\n");
389 (cb_data->callback)(desc, cb_data->cb_data);
/* desc is handed to the user callback above, so it is freed only after that
 * call. */
391 ptlrpc_free_bulk(desc);
392 ptlrpc_free_req(cb_data->req);
394 OBD_FREE(cb_data, sizeof(*cb_data));
/* Read pages from the OST with an OST_BRW request plus a registered bulk
 * descriptor.
 *
 * conn     - OBD connection
 * num_oa   - number of objects in oa[] / oa_bufs[]
 * oa       - object descriptors, one per object
 * oa_bufs  - number of page buffers belonging to each object
 * buf      - flat array of pages, indexed across all objects
 * count, offset, flags - per-page arrays packed into the remote niobufs
 * callback - NOTE(review): not referenced in the visible lines of this
 *            function -- confirm whether the read path is synchronous only
 *
 * The request has three buffers: an ost_body, one obd_ioobj per object, and
 * one niobuf_remote per page.
 *
 * NOTE(review): the body of the page-counting loop, several declarations
 * (ptr1, ptr2), the error-path labels and the guards before each GOTO are
 * not visible in this excerpt.
 */
397 static int osc_brw_read(struct obd_conn *conn, obd_count num_oa,
398 struct obdo **oa, obd_count *oa_bufs, struct page **buf,
399 obd_size *count, obd_off *offset, obd_flag *flags,
400 bulk_callback_t callback)
402 struct ptlrpc_client *cl;
403 struct ptlrpc_connection *connection;
404 struct ptlrpc_request *request;
405 struct ost_body *body;
406 struct list_head *tmp;
/* size[0] is the ost_body; size[1] and size[2] are filled in below. */
407 int pages, rc, i, j, size[3] = {sizeof(*body)};
409 struct ptlrpc_bulk_desc *desc;
412 size[1] = num_oa * sizeof(struct obd_ioobj);
414 for (i = 0; i < num_oa; i++)
416 size[2] = pages * sizeof(struct niobuf_remote);
418 osc_con2cl(conn, &cl, &connection);
419 request = ptlrpc_prep_req(cl, connection, OST_BRW, 3, size, NULL);
421 GOTO(out, rc = -ENOMEM);
423 body = lustre_msg_buf(request->rq_reqmsg, 0);
/* body->data carries the transfer direction. */
424 body->data = OBD_BRW_READ;
426 desc = ptlrpc_prep_bulk(connection);
428 GOTO(out2, rc = -ENOMEM);
429 desc->b_portal = OST_BULK_PORTAL;
/* ptr1 walks the obd_ioobj buffer, ptr2 the niobuf_remote buffer; the pack
 * helpers take their address. */
431 ptr1 = lustre_msg_buf(request->rq_reqmsg, 1);
432 ptr2 = lustre_msg_buf(request->rq_reqmsg, 2);
/* `pages` is reset and used as the running index into the flat per-page
 * arrays while iterating objects (i) and their buffers (j). */
433 for (pages = 0, i = 0; i < num_oa; i++) {
434 ost_pack_ioo(&ptr1, oa[i], oa_bufs[i]);
435 /* FIXME: this inner loop is wrong for multiple OAs */
436 for (j = 0; j < oa_bufs[i]; j++, pages++) {
437 struct ptlrpc_bulk_page *bulk;
438 bulk = ptlrpc_prep_bulk_page(desc);
440 GOTO(out3, rc = -ENOMEM);
/* Each bulk page gets its own xid, taken from the connection's outgoing
 * counter while holding c_lock. */
442 spin_lock(&connection->c_lock);
443 bulk->b_xid = ++connection->c_xid_out;
444 spin_unlock(&connection->c_lock);
/* The page is mapped here and unmapped in the list walk further down. */
446 bulk->b_buf = kmap(buf[pages]);
447 bulk->b_page = buf[pages];
448 bulk->b_buflen = PAGE_SIZE;
449 ost_pack_niobuf(&ptr2, offset[pages], count[pages],
450 flags[pages], bulk->b_xid);
454 rc = ptlrpc_register_bulk(desc);
/* Reply is sized for a single buffer of size[0] bytes (the ost_body). */
458 request->rq_replen = lustre_msg_size(1, size);
459 rc = ptlrpc_queue_wait(request);
460 rc = ptlrpc_check_status(request, rc);
462 ptlrpc_abort_bulk(desc);
/* Unmap every page on the descriptor that was actually mapped. */
466 list_for_each(tmp, &desc->b_page_list) {
467 struct ptlrpc_bulk_page *bulk;
468 bulk = list_entry(tmp, struct ptlrpc_bulk_page, b_link);
469 if (bulk->b_buf != NULL)
470 kunmap(bulk->b_page);
472 ptlrpc_free_bulk(desc);
474 ptlrpc_free_req(request);
/* Bulk-write completion handler.
 *
 * desc - bulk descriptor that completed
 * data - the struct osc_brw_cb_data associated with the transfer
 *
 * Logs if the descriptor is flagged PTL_RPC_FL_INTR, unmaps the first
 * desc->b_page_count pages of cb_data->buf, invokes the stored user callback
 * with (desc, cb_data->cb_data), then frees the bulk descriptor, the
 * request, and the callback data itself.
 *
 * NOTE(review): the declaration of `i` and any guard for a NULL callback
 * are not visible in this excerpt.
 */
479 static void brw_write_finish(struct ptlrpc_bulk_desc *desc, void *data)
481 struct osc_brw_cb_data *cb_data = data;
485 if (desc->b_flags & PTL_RPC_FL_INTR)
486 CERROR("got signal\n");
488 for (i = 0; i < desc->b_page_count; i++)
489 kunmap(cb_data->buf[i]);
491 (cb_data->callback)(desc, cb_data->cb_data);
493 ptlrpc_free_bulk(desc);
494 ptlrpc_free_req(cb_data->req);
496 OBD_FREE(cb_data, sizeof(*cb_data));
/* Write pages to the OST: an OST_BRW request first, then a bulk send of the
 * page data using the per-page xids returned in the reply.
 *
 * conn      - OBD connection
 * num_oa    - number of objects in oa[] / oa_bufs[]
 * oa        - object descriptors, one per object
 * oa_bufs   - number of page buffers belonging to each object
 * pagearray - flat array of pages, indexed across all objects
 * count, offset, flags - per-page arrays packed into the remote niobufs
 * callback  - stored in the callback data for brw_write_finish to invoke
 *
 * NOTE(review): several declarations (pages, ptr1, ptr2), the body of the
 * page-counting loop, the error-path labels and the guards before each GOTO
 * are not visible in this excerpt.
 */
500 static int osc_brw_write(struct obd_conn *conn, obd_count num_oa,
501 struct obdo **oa, obd_count *oa_bufs,
502 struct page **pagearray, obd_size *count,
503 obd_off *offset, obd_flag *flags,
504 bulk_callback_t callback)
506 struct ptlrpc_client *cl;
507 struct ptlrpc_connection *connection;
508 struct ptlrpc_request *request;
509 struct ptlrpc_bulk_desc *desc;
510 struct obd_ioobj ioo;
511 struct ost_body *body;
512 struct niobuf_local *local;
513 struct niobuf_remote *remote;
514 struct osc_brw_cb_data *cb_data;
/* size[0] is the ost_body; size[1] and size[2] are filled in below. */
516 int rc, i, j, size[3] = {sizeof(*body)};
520 size[1] = num_oa * sizeof(ioo);
522 for (i = 0; i < num_oa; i++)
524 size[2] = pages * sizeof(*remote);
/* One niobuf_local per page records the mapped address, offset and length. */
526 OBD_ALLOC(local, pages * sizeof(*local));
530 osc_con2cl(conn, &cl, &connection);
531 request = ptlrpc_prep_req(cl, connection, OST_BRW, 3, size, NULL);
533 GOTO(out, rc = -ENOMEM);
534 body = lustre_msg_buf(request->rq_reqmsg, 0);
/* body->data carries the transfer direction. */
535 body->data = OBD_BRW_WRITE;
537 ptr1 = lustre_msg_buf(request->rq_reqmsg, 1);
538 ptr2 = lustre_msg_buf(request->rq_reqmsg, 2);
/* `pages` is reset and used as the running index into the flat per-page
 * arrays while iterating objects (i) and their buffers (j). */
539 for (pages = 0, i = 0; i < num_oa; i++) {
540 ost_pack_ioo(&ptr1, oa[i], oa_bufs[i]);
541 for (j = 0; j < oa_bufs[i]; j++, pages++) {
542 local[pages].addr = kmap(pagearray[pages]);
543 local[pages].offset = offset[pages];
544 local[pages].len = count[pages];
545 ost_pack_niobuf(&ptr2, offset[pages], count[pages],
/* size[1] is reused for the reply layout: the reply is sized as two buffers,
 * size[0] (ost_body) and one niobuf_remote per page. */
550 size[1] = pages * sizeof(struct niobuf_remote);
551 request->rq_replen = lustre_msg_size(2, size);
553 rc = ptlrpc_queue_wait(request);
554 rc = ptlrpc_check_status(request, rc);
/* ptr2 now walks reply buffer 1, the server's niobuf_remote array. */
558 ptr2 = lustre_msg_buf(request->rq_repmsg, 1);
560 GOTO(out2, rc = -EINVAL);
/* Reject a reply whose niobuf buffer does not hold exactly one entry per
 * page. */
562 if (request->rq_repmsg->buflens[1] !=
563 pages * sizeof(struct niobuf_remote)) {
564 CERROR("buffer length wrong (%d vs. %ld)\n",
565 request->rq_repmsg->buflens[1],
566 pages * sizeof(struct niobuf_remote));
567 GOTO(out2, rc = -EINVAL);
/* NOTE(review): the embedded line numbers show desc being dereferenced on
 * the line right after ptlrpc_prep_bulk(), and cb_data right after
 * OBD_ALLOC(), with no room for a NULL check in between -- confirm whether
 * allocation failure is handled here. */
570 desc = ptlrpc_prep_bulk(connection);
571 desc->b_portal = OSC_BULK_PORTAL;
573 desc->b_cb = brw_write_finish;
574 OBD_ALLOC(cb_data, sizeof(*cb_data));
575 cb_data->buf = pagearray;
576 cb_data->callback = callback;
577 desc->b_cb_data = cb_data;
/* Build one bulk page per local buffer, tagging it with the xid the server
 * returned for that page. */
580 for (pages = 0, i = 0; i < num_oa; i++) {
581 for (j = 0; j < oa_bufs[i]; j++, pages++) {
582 struct ptlrpc_bulk_page *page;
584 ost_unpack_niobuf(&ptr2, &remote);
586 page = ptlrpc_prep_bulk_page(desc);
588 GOTO(out3, rc = -ENOMEM);
590 page->b_buf = (void *)(unsigned long)local[pages].addr;
591 page->b_buflen = local[pages].len;
592 page->b_xid = remote->xid;
596 if (desc->b_page_count != pages)
599 rc = ptlrpc_send_bulk(desc);
605 /* If there's no callback function, sleep here until complete. */
606 wait_event_interruptible(desc->b_waitq, ptlrpc_check_bulk_sent(desc));
607 if (desc->b_flags & PTL_RPC_FL_INTR)
613 ptlrpc_free_bulk(desc);
615 ptlrpc_free_req(request);
/* This loop both unmaps the pages and leaves `pages` equal to the total
 * buffer count, which the OBD_FREE size below depends on.
 * NOTE(review): brw_write_finish() also kunmap()s these pages; which paths
 * reach this loop is not visible here -- confirm pages are not unmapped
 * twice. */
616 for (pages = 0, i = 0; i < num_oa; i++)
617 for (j = 0; j < oa_bufs[i]; j++, pages++)
618 kunmap(pagearray[pages]);
620 OBD_FREE(local, pages * sizeof(*local));
/* Bulk read/write entry point: dispatch on `rw`.
 *
 * OBD_BRW_READ goes to osc_brw_read(); the other visible call is
 * osc_brw_write().  All remaining arguments are forwarded unchanged, with
 * `callback` cast to bulk_callback_t.
 *
 * NOTE(review): the end of the parameter list (where `callback` is
 * presumably declared) and the branch structure around the second return
 * are not visible in this excerpt.
 */
625 static int osc_brw(int rw, struct obd_conn *conn, obd_count num_oa,
626 struct obdo **oa, obd_count *oa_bufs, struct page **buf,
627 obd_size *count, obd_off *offset, obd_flag *flags,
633 if (rw == OBD_BRW_READ)
634 return osc_brw_read(conn, num_oa, oa, oa_bufs, buf, count,
635 offset, flags, (bulk_callback_t)callback);
637 return osc_brw_write(conn, num_oa, oa, oa_bufs, buf, count,
638 offset, flags, (bulk_callback_t)callback);
/* Acquire an extent lock, reusing a matching local lock when possible.
 *
 * Steps visible here:
 *   1. Widen *extent in place to page boundaries.
 *   2. Look for an existing local lock in the requested mode.
 *   3. Look for an existing local lock in a second mode (mode2), move the
 *      reference from mode2 to mode, and call ldlm_cli_convert().
 *   4. Otherwise enqueue a new lock with ldlm_cli_enqueue().
 *
 * NOTE(review): the declarations of rc and mode2, the selection of mode2,
 * and the conditions/returns between the steps are not visible in this
 * excerpt.
 */
641 static int osc_enqueue(struct obd_conn *oconn, struct ldlm_namespace *ns,
642 struct ldlm_handle *parent_lock, __u64 *res_id,
643 __u32 type, struct ldlm_extent *extent, __u32 mode,
644 int *flags, void *data, int datalen,
645 struct ldlm_handle *lockh)
647 struct ptlrpc_connection *conn;
648 struct ptlrpc_client *cl;
652 /* Filesystem locks are given a bit of special treatment: first we
653 * fixup the lock to start and end on page boundaries. */
/* start is masked with PAGE_MASK; end has PAGE_SIZE - 1 added before
 * masking.  The caller's extent is modified. */
654 extent->start &= PAGE_MASK;
655 extent->end = (extent->end + PAGE_SIZE - 1) & PAGE_MASK;
657 /* Next, search for already existing extent locks that will cover us */
/* Lock traffic uses the LDLM client rather than the OST request client. */
658 osc_con2dlmcl(oconn, &cl, &conn);
659 rc = ldlm_local_lock_match(ns, res_id, type, extent, mode, lockh);
661 /* We already have a lock, and it's referenced */
665 /* Next, search for locks that we can upgrade (if we're trying to write)
666 * or are more than we need (if we're trying to read). Because the VFS
667 * and page cache already protect us locally, lots of readers/writers
668 * can share a single PW lock. */
674 rc = ldlm_local_lock_match(ns, res_id, type, extent, mode2, lockh);
677 struct ldlm_lock *lock = ldlm_handle2object(lockh);
678 /* FIXME: This is not incredibly elegant, but it might
679 * be more elegant than adding another parameter to
680 * lock_match. I want a second opinion. */
681 ldlm_lock_addref(lock, mode);
682 ldlm_lock_decref(lock, mode2);
/* NOTE(review): `flags` is an int * parameter, so &flags would be an int **
 * unless a local `flags` declared in a line not visible here shadows it --
 * confirm. */
687 rc = ldlm_cli_convert(cl, lockh, type, &flags);
694 rc = ldlm_cli_enqueue(cl, conn, ns, parent_lock, res_id, type,
695 extent, mode, flags, data, datalen, lockh);
/* Drop one reference of the given mode on the lock named by lockh.
 *
 * oconn - OBD connection (not used in the visible lines)
 * mode  - lock mode whose reference is dropped
 * lockh - handle resolved to a lock with ldlm_handle2object()
 *
 * NOTE(review): no check of the ldlm_handle2object() result is visible in
 * this excerpt.
 */
699 static int osc_cancel(struct obd_conn *oconn, __u32 mode,
700 struct ldlm_handle *lockh)
702 struct ldlm_lock *lock;
705 lock = ldlm_handle2object(lockh);
706 ldlm_lock_decref(lock, mode);
/* Device setup: obtain the connection and allocate the two RPC clients.
 *
 * obddev   - device whose u.osc state is initialized
 * len, buf - not used in the visible lines
 *
 * The connection is looked up by the fixed uuid string "ost".  One client
 * is initialized for OST requests (OST_REQUEST_PORTAL / OSC_REPLY_PORTAL)
 * and one for lock traffic (LDLM_REQUEST_PORTAL / LDLM_REPLY_PORTAL).
 *
 * NOTE(review): the check on the connection lookup, the end of the first
 * ptlrpc_init_client() call, and the error labels / returns are not visible
 * in this excerpt.
 */
711 static int osc_setup(struct obd_device *obddev, obd_count len, void *buf)
713 struct osc_obd *osc = &obddev->u.osc;
717 osc->osc_conn = ptlrpc_uuid_to_connection("ost");
721 OBD_ALLOC(osc->osc_client, sizeof(*osc->osc_client));
722 if (osc->osc_client == NULL)
723 GOTO(out_conn, rc = -ENOMEM);
725 OBD_ALLOC(osc->osc_ldlm_client, sizeof(*osc->osc_ldlm_client));
726 if (osc->osc_ldlm_client == NULL)
727 GOTO(out_client, rc = -ENOMEM);
729 ptlrpc_init_client(NULL, NULL, OST_REQUEST_PORTAL, OSC_REPLY_PORTAL,
731 ptlrpc_init_client(NULL, NULL, LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL,
732 osc->osc_ldlm_client);
733 osc->osc_client->cli_name = "osc";
734 osc->osc_ldlm_client->cli_name = "ldlm";
/* Error unwinding: release what was acquired before the failing step. */
740 OBD_FREE(osc->osc_client, sizeof(*osc->osc_client));
742 ptlrpc_put_connection(osc->osc_conn);
/* Device teardown: undo osc_setup().
 *
 * Cleans up and frees the OST request client, then the LDLM client, and
 * finally drops the connection reference with ptlrpc_put_connection().
 */
746 static int osc_cleanup(struct obd_device * obddev)
748 struct osc_obd *osc = &obddev->u.osc;
750 ptlrpc_cleanup_client(osc->osc_client);
751 OBD_FREE(osc->osc_client, sizeof(*osc->osc_client));
752 ptlrpc_cleanup_client(osc->osc_ldlm_client);
753 OBD_FREE(osc->osc_ldlm_client, sizeof(*osc->osc_ldlm_client));
754 ptlrpc_put_connection(osc->osc_conn);
/* OST_STATFS request helper.
 *
 * NOTE(review): as visible here this does not look like live code: the
 * header line ends in ';' and the body copies to and from `oa`, which is
 * not a parameter (the parameters are conn and statfs).  It is presumably
 * compiled out by a preprocessor guard outside the visible lines -- confirm
 * before relying on it.  `statfs` is never written in the visible lines.
 */
761 static int osc_statfs(struct obd_conn *conn, struct statfs *statfs);
763 struct ptlrpc_request *request;
764 struct ptlrpc_client *cl;
765 struct ptlrpc_connection *connection;
766 struct ost_body *body;
767 int rc, size = sizeof(*body);
770 osc_con2cl(conn, &cl, &connection);
771 request = ptlrpc_prep_req(cl, connection, OST_STATFS, 1, &size, NULL);
775 body = lustre_msg_buf(request->rq_reqmsg, 0);
776 memcpy(&body->oa, oa, sizeof(*oa));
777 body->oa.o_valid = ~0;
778 body->connid = conn->oc_id;
780 request->rq_replen = lustre_msg_size(1, &size);
782 rc = ptlrpc_queue_wait(request);
783 rc = ptlrpc_check_status(request, rc);
787 body = lustre_msg_buf(request->rq_repmsg, 0);
788 memcpy(oa, &body->oa, sizeof(*oa));
792 ptlrpc_free_req(request);
/* OBD method table for the OSC device type; registered by osc_init().
 * Entries use the GNU `field: value` initializer syntax.
 *
 * NOTE(review): only some entries are visible in this excerpt; osc_setup,
 * osc_open, osc_close, osc_brw, osc_punch and osc_cancel are defined in this
 * file but their table entries are not shown here. */
797 struct obd_ops osc_obd_ops = {
799 o_cleanup: osc_cleanup,
800 o_create: osc_create,
801 o_destroy: osc_destroy,
802 o_getattr: osc_getattr,
803 o_setattr: osc_setattr,
806 o_connect: osc_connect,
807 o_disconnect: osc_disconnect,
810 o_enqueue: osc_enqueue,
/* Module init: register the OSC method table under LUSTRE_OSC_NAME.
 * NOTE(review): how the result of obd_register_type() reaches the return
 * value is not visible in this excerpt. */
814 static int __init osc_init(void)
816 obd_register_type(&osc_obd_ops, LUSTRE_OSC_NAME);
/* Module exit: unregister the OBD type registered by osc_init(). */
820 static void __exit osc_exit(void)
822 obd_unregister_type(LUSTRE_OSC_NAME);
/* Kernel module metadata and entry/exit hooks. */
825 MODULE_AUTHOR("Peter J. Braam <braam@clusterfs.com>");
826 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC) v1.0");
827 MODULE_LICENSE("GPL");
/* osc_init runs at module load, osc_exit at unload. */
829 module_init(osc_init);
830 module_exit(osc_exit);