1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
6 * This code is issued under the GNU General Public License.
7 * See the file COPYING in this distribution
9 * Author Peter Braam <braam@clusterfs.com>
11 * This server is single threaded at present (but can easily be multi
12 * threaded). For testing and management it is treated as an
13 * obd_device, although it does not export a full OBD method table
14 * (the requests are coming in over the wire, so object target
15 * modules do not have a full method table.)
20 #define DEBUG_SUBSYSTEM S_OSC
22 #include <linux/module.h>
23 #include <linux/lustre_dlm.h>
24 #include <linux/obd_ost.h>
/*
 * Fetch the request client and connection that belong to an OBD connection.
 *
 * Reads the osc_obd stored at conn->oc_dev->u.osc and returns, through the
 * out-parameters, its osc_client (*cl) and its osc_conn (*connection).
 *
 * NOTE(review): the function's braces are not present in this listing.
 */
26 static void osc_con2cl(struct obd_conn *conn, struct ptlrpc_client **cl,
27 struct ptlrpc_connection **connection)
29 struct osc_obd *osc = &conn->oc_dev->u.osc;
30 *cl = osc->osc_client;
31 *connection = osc->osc_conn;
/*
 * Fetch the lock-manager client and connection for an OBD connection.
 *
 * Same shape as osc_con2cl(), but *cl receives osc_ldlm_client instead of
 * osc_client; *connection still receives osc_conn.
 *
 * NOTE(review): the function's braces are not present in this listing.
 */
34 static void osc_con2dlmcl(struct obd_conn *conn, struct ptlrpc_client **cl,
35 struct ptlrpc_connection **connection)
37 struct osc_obd *osc = &conn->oc_dev->u.osc;
38 *cl = osc->osc_ldlm_client;
39 *connection = osc->osc_conn;
/*
 * Send an OST_CONNECT request and record the connection id from the reply.
 *
 * The request is prepared with no request buffers (count 0, NULL sizes) and
 * its expected reply length is computed for one buffer of
 * sizeof(struct ost_body).  On the visible success path the connid found in
 * reply buffer 0 is stored in conn->oc_id.  The request is released with
 * ptlrpc_free_req().
 *
 * NOTE(review): the embedded line numbers skip (52 -> 56, 59 -> 61,
 * 61 -> 65, 68 -> 71), so the check on `request`, the condition guarding
 * the CERROR and the return statement are not visible here -- confirm
 * against the full file.
 */
42 static int osc_connect(struct obd_conn *conn)
44 struct ptlrpc_request *request;
45 struct ptlrpc_client *cl;
46 struct ptlrpc_connection *connection;
47 struct ost_body *body;
48 int rc, size = sizeof(*body);
51 osc_con2cl(conn, &cl, &connection);
52 request = ptlrpc_prep_req(cl, connection, OST_CONNECT, 0, NULL, NULL);
56 request->rq_replen = lustre_msg_size(1, &size);
/* The rc from the wait is handed to ptlrpc_check_status(), whose result
 * then replaces it. */
58 rc = ptlrpc_queue_wait(request);
59 rc = ptlrpc_check_status(request, rc);
61 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
/* Reply buffer 0 is read as an ost_body carrying the new connection id. */
65 body = lustre_msg_buf(request->rq_repmsg, 0);
66 CDEBUG(D_INODE, "received connid %d\n", body->connid);
68 conn->oc_id = body->connid;
71 ptlrpc_free_req(request);
/*
 * Send an OST_DISCONNECT request for the connection id held in conn->oc_id.
 *
 * One request buffer of sizeof(struct ost_body) is prepared and its connid
 * field is filled from conn->oc_id before the request is queued and waited
 * for.  The request is released with ptlrpc_free_req().
 *
 * NOTE(review): the tail of the ptlrpc_prep_req() call (after `&size,`) is
 * not visible in this listing.  No ptlrpc_check_status() call is visible
 * either, unlike the other request helpers in this file -- confirm whether
 * that is intentional.
 */
75 static int osc_disconnect(struct obd_conn *conn)
77 struct ptlrpc_request *request;
78 struct ptlrpc_client *cl;
79 struct ptlrpc_connection *connection;
80 struct ost_body *body;
81 int rc, size = sizeof(*body);
84 osc_con2cl(conn, &cl, &connection);
85 request = ptlrpc_prep_req(cl, connection, OST_DISCONNECT, 1, &size,
/* Request buffer 0 is the ost_body; it only needs the connection id. */
90 body = lustre_msg_buf(request->rq_reqmsg, 0);
91 body->connid = conn->oc_id;
93 request->rq_replen = lustre_msg_size(1, &size);
95 rc = ptlrpc_queue_wait(request);
98 ptlrpc_free_req(request);
/*
 * Send an OST_GETATTR request and copy the returned attributes into *oa.
 *
 * conn: OBD connection; supplies the client/connection pair and oc_id.
 * oa:   in/out object descriptor.  Its contents are copied into the
 *       request body, and on the visible success path the obdo from reply
 *       buffer 0 is copied back over it.
 *
 * NOTE(review): the check on `request`, the condition guarding the CERROR
 * and the return statement fall in gaps of this listing (embedded numbers
 * 112 -> 116, 124 -> 126, 126 -> 130, 133 -> 137).
 */
102 static int osc_getattr(struct obd_conn *conn, struct obdo *oa)
104 struct ptlrpc_request *request;
105 struct ptlrpc_client *cl;
106 struct ptlrpc_connection *connection;
107 struct ost_body *body;
108 int rc, size = sizeof(*body);
111 osc_con2cl(conn, &cl, &connection);
112 request = ptlrpc_prep_req(cl, connection, OST_GETATTR, 1, &size, NULL);
116 body = lustre_msg_buf(request->rq_reqmsg, 0);
117 memcpy(&body->oa, oa, sizeof(*oa));
118 body->connid = conn->oc_id;
/* Every bit of o_valid is set in the outgoing copy, overriding whatever the
 * caller's oa had. */
119 body->oa.o_valid = ~0;
121 request->rq_replen = lustre_msg_size(1, &size);
123 rc = ptlrpc_queue_wait(request);
124 rc = ptlrpc_check_status(request, rc);
126 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
130 body = lustre_msg_buf(request->rq_repmsg, 0);
131 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
/* Overwrite the caller's obdo with the one carried in the reply. */
133 memcpy(oa, &body->oa, sizeof(*oa));
137 ptlrpc_free_req(request);
/*
 * Send an OST_OPEN request for the object described by *oa.
 *
 * conn: OBD connection; supplies the client/connection pair and oc_id.
 * oa:   in/out object descriptor.  It is copied into the request body and,
 *       on the visible success path, replaced by the obdo from reply
 *       buffer 0.
 *
 * NOTE(review): the statement controlled by the o_valid comparison below is
 * not visible in this listing (embedded numbers 158 -> 161), nor are the
 * check on `request`, the rc check and the return statement.
 */
141 static int osc_open(struct obd_conn *conn, struct obdo *oa)
143 struct ptlrpc_request *request;
144 struct ptlrpc_client *cl;
145 struct ptlrpc_connection *connection;
146 struct ost_body *body;
147 int rc, size = sizeof(*body);
150 osc_con2cl(conn, &cl, &connection);
151 request = ptlrpc_prep_req(cl, connection, OST_OPEN, 1, &size, NULL);
155 body = lustre_msg_buf(request->rq_reqmsg, 0);
156 memcpy(&body->oa, oa, sizeof(*oa));
157 body->connid = conn->oc_id;
/* Tests whether o_valid is anything other than exactly
 * OBD_MD_FLMODE | OBD_MD_FLID; the action taken is on a hidden line. */
158 if (body->oa.o_valid != (OBD_MD_FLMODE | OBD_MD_FLID))
161 request->rq_replen = lustre_msg_size(1, &size);
163 rc = ptlrpc_queue_wait(request);
164 rc = ptlrpc_check_status(request, rc);
168 body = lustre_msg_buf(request->rq_repmsg, 0);
169 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
171 memcpy(oa, &body->oa, sizeof(*oa));
175 ptlrpc_free_req(request);
/*
 * Send an OST_CLOSE request for the object described by *oa.
 *
 * conn: OBD connection; supplies the client/connection pair and oc_id.
 * oa:   in/out object descriptor.  It is copied into the request body
 *       unchanged (o_valid is not touched here) and, on the visible success
 *       path, replaced by the obdo from reply buffer 0.
 *
 * NOTE(review): the check on `request`, the rc check and the return
 * statement fall in gaps of this listing.
 */
179 static int osc_close(struct obd_conn *conn, struct obdo *oa)
181 struct ptlrpc_request *request;
182 struct ptlrpc_client *cl;
183 struct ptlrpc_connection *connection;
184 struct ost_body *body;
185 int rc, size = sizeof(*body);
188 osc_con2cl(conn, &cl, &connection);
189 request = ptlrpc_prep_req(cl, connection, OST_CLOSE, 1, &size, NULL);
193 body = lustre_msg_buf(request->rq_reqmsg, 0);
194 memcpy(&body->oa, oa, sizeof(*oa));
195 body->connid = conn->oc_id;
197 request->rq_replen = lustre_msg_size(1, &size);
199 rc = ptlrpc_queue_wait(request);
200 rc = ptlrpc_check_status(request, rc);
204 body = lustre_msg_buf(request->rq_repmsg, 0);
205 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
207 memcpy(oa, &body->oa, sizeof(*oa));
211 ptlrpc_free_req(request);
/*
 * Send an OST_SETATTR request carrying the attributes in *oa.
 *
 * conn: OBD connection; supplies the client/connection pair and oc_id.
 * oa:   object descriptor copied into the request body.  In the visible
 *       lines nothing from the reply is copied back into it.
 *
 * NOTE(review): the check on `request` and the return statement fall in
 * gaps of this listing.
 */
215 static int osc_setattr(struct obd_conn *conn, struct obdo *oa)
217 struct ptlrpc_request *request;
218 struct ptlrpc_client *cl;
219 struct ptlrpc_connection *connection;
220 struct ost_body *body;
221 int rc, size = sizeof(*body);
224 osc_con2cl(conn, &cl, &connection);
225 request = ptlrpc_prep_req(cl, connection, OST_SETATTR, 1, &size, NULL);
229 body = lustre_msg_buf(request->rq_reqmsg, 0);
230 memcpy(&body->oa, oa, sizeof(*oa));
231 body->connid = conn->oc_id;
233 request->rq_replen = lustre_msg_size(1, &size);
235 rc = ptlrpc_queue_wait(request);
236 rc = ptlrpc_check_status(request, rc);
240 ptlrpc_free_req(request);
/*
 * Send an OST_CREATE request and copy the resulting obdo back into *oa.
 *
 * conn: OBD connection; supplies the client/connection pair and oc_id.
 * oa:   in/out object descriptor.  It is copied into the request body with
 *       every o_valid bit set, and on the visible success path replaced by
 *       the obdo from reply buffer 0.
 *
 * NOTE(review): several lines between the declarations and osc_con2cl()
 * (embedded numbers 251-256) are not visible, nor are the check on
 * `request`, the rc check and the return statement.
 */
244 static int osc_create(struct obd_conn *conn, struct obdo *oa)
246 struct ptlrpc_request *request;
247 struct ptlrpc_client *cl;
248 struct ptlrpc_connection *connection;
249 struct ost_body *body;
250 int rc, size = sizeof(*body);
257 osc_con2cl(conn, &cl, &connection);
258 request = ptlrpc_prep_req(cl, connection, OST_CREATE, 1, &size, NULL);
262 body = lustre_msg_buf(request->rq_reqmsg, 0);
263 memcpy(&body->oa, oa, sizeof(*oa));
264 body->oa.o_valid = ~0;
265 body->connid = conn->oc_id;
267 request->rq_replen = lustre_msg_size(1, &size);
269 rc = ptlrpc_queue_wait(request);
270 rc = ptlrpc_check_status(request, rc);
274 body = lustre_msg_buf(request->rq_repmsg, 0);
275 memcpy(oa, &body->oa, sizeof(*oa));
279 ptlrpc_free_req(request);
/*
 * Send an OST_PUNCH request for the object described by *oa.
 *
 * conn:  OBD connection; supplies the client/connection pair and oc_id.
 * oa:    in/out object descriptor; replaced by the obdo from reply buffer 0
 *        on the visible success path.
 * count: written into the outgoing obdo's o_blocks field.
 *
 * The outgoing obdo's o_size field is set from `offset`.
 *
 * NOTE(review): the rest of the parameter list (after `obd_size count,`) is
 * not visible in this listing; `offset` is presumably declared there.  The
 * check on `request`, the rc check and the return statement are hidden too.
 */
283 static int osc_punch(struct obd_conn *conn, struct obdo *oa, obd_size count,
286 struct ptlrpc_request *request;
287 struct ptlrpc_client *cl;
288 struct ptlrpc_connection *connection;
289 struct ost_body *body;
290 int rc, size = sizeof(*body);
297 osc_con2cl(conn, &cl, &connection);
298 request = ptlrpc_prep_req(cl, connection, OST_PUNCH, 1, &size, NULL);
302 body = lustre_msg_buf(request->rq_reqmsg, 0);
303 memcpy(&body->oa, oa, sizeof(*oa));
304 body->connid = conn->oc_id;
305 body->oa.o_valid = ~0;
/* The range travels in existing obdo fields: o_size holds the offset and
 * o_blocks holds the count. */
306 body->oa.o_size = offset;
307 body->oa.o_blocks = count;
309 request->rq_replen = lustre_msg_size(1, &size);
311 rc = ptlrpc_queue_wait(request);
312 rc = ptlrpc_check_status(request, rc);
316 body = lustre_msg_buf(request->rq_repmsg, 0);
317 memcpy(oa, &body->oa, sizeof(*oa));
321 ptlrpc_free_req(request);
/*
 * Send an OST_DESTROY request for the object described by *oa.
 *
 * conn: OBD connection; supplies the client/connection pair and oc_id.
 * oa:   in/out object descriptor.  It is copied into the request body with
 *       every o_valid bit set, and on the visible success path replaced by
 *       the obdo from reply buffer 0.
 *
 * NOTE(review): several lines after the declarations (embedded numbers
 * 332-337) are not visible, nor are the check on `request`, the rc check
 * and the return statement.
 */
325 static int osc_destroy(struct obd_conn *conn, struct obdo *oa)
327 struct ptlrpc_request *request;
328 struct ptlrpc_client *cl;
329 struct ptlrpc_connection *connection;
330 struct ost_body *body;
331 int rc, size = sizeof(*body);
338 osc_con2cl(conn, &cl, &connection);
339 request = ptlrpc_prep_req(cl, connection, OST_DESTROY, 1, &size, NULL);
343 body = lustre_msg_buf(request->rq_reqmsg, 0);
344 memcpy(&body->oa, oa, sizeof(*oa));
345 body->connid = conn->oc_id;
346 body->oa.o_valid = ~0;
348 request->rq_replen = lustre_msg_size(1, &size);
350 rc = ptlrpc_queue_wait(request);
351 rc = ptlrpc_check_status(request, rc);
355 body = lustre_msg_buf(request->rq_repmsg, 0);
356 memcpy(oa, &body->oa, sizeof(*oa));
360 ptlrpc_free_req(request);
/*
 * State handed to the bulk completion handlers (brw_read_finish() and
 * brw_write_finish()) through a void * argument.
 *
 * NOTE(review): only two members are visible in this listing.  The handlers
 * also read `buf` and `cb_data` members, which are presumably declared on
 * the hidden lines (embedded numbers 365 and 368) -- confirm.
 */
364 struct osc_brw_cb_data {
/* Request that the completion handlers release with ptlrpc_free_req(). */
366 struct ptlrpc_request *req;
/* Caller-supplied function the handlers invoke with the bulk descriptor. */
367 bulk_callback_t callback;
/*
 * Completion handler for a bulk read.
 *
 * desc: bulk descriptor that completed; freed here.
 * data: the struct osc_brw_cb_data for this transfer; freed here.
 *
 * Logs "got signal" when PTL_RPC_FL_INTR is set in desc->b_flags, then calls
 * the stored callback with the descriptor and cb_data->cb_data, and finally
 * frees the descriptor, the request stored in cb_data->req, and cb_data.
 *
 * NOTE(review): the callback is invoked without a visible NULL check, and
 * whether it also runs after the "got signal" log depends on braces that
 * are not visible in this listing.
 */
371 static void brw_read_finish(struct ptlrpc_bulk_desc *desc, void *data)
373 struct osc_brw_cb_data *cb_data = data;
375 if (desc->b_flags & PTL_RPC_FL_INTR)
376 CERROR("got signal\n");
378 (cb_data->callback)(desc, cb_data->cb_data);
380 ptlrpc_free_bulk(desc);
381 ptlrpc_free_req(cb_data->req);
383 OBD_FREE(cb_data, sizeof(*cb_data));
/*
 * Issue an OST_BRW request in read mode (body->data = OBD_BRW_READ).
 *
 * conn:     OBD connection; supplies the client/connection pair.
 * num_oa:   number of entries in oa[] and oa_bufs[].
 * oa:       objects to read from.
 * oa_bufs:  per-object page count.
 * buf:      flat array of destination pages, indexed by the running page
 *           counter across all objects.
 * count, offset, flags: per-page values packed into each remote niobuf.
 * callback: not referenced in any line visible in this listing.
 *
 * The request has three buffers: [0] an ost_body, [1] num_oa obd_ioobj
 * entries, [2] one niobuf_remote per page.  A bulk descriptor on
 * OST_BULK_PORTAL gets one bulk page per destination page, each with its
 * own xid, and is registered before the request is queued.  After the wait,
 * every bulk page with a non-NULL b_buf is kunmap()ed and the descriptor
 * and request are freed.
 *
 * NOTE(review): many guard lines, labels (out/out2/out3) and the return
 * statement are hidden by gaps in the embedded numbering, including the
 * body of the page-counting loop and the condition around
 * ptlrpc_abort_bulk().
 */
386 static int osc_brw_read(struct obd_conn *conn, obd_count num_oa,
387 struct obdo **oa, obd_count *oa_bufs, struct page **buf,
388 obd_size *count, obd_off *offset, obd_flag *flags,
389 bulk_callback_t callback)
391 struct ptlrpc_client *cl;
392 struct ptlrpc_connection *connection;
393 struct ptlrpc_request *request;
394 struct ost_body *body;
395 struct list_head *tmp;
/* Only size[0] is initialised here; size[1] and size[2] are filled below. */
396 int pages, rc, i, j, size[3] = {sizeof(*body)};
398 struct ptlrpc_bulk_desc *desc;
401 size[1] = num_oa * sizeof(struct obd_ioobj);
/* Loop body is hidden; presumably it totals oa_bufs[i] into `pages`, which
 * is used right after -- confirm. */
403 for (i = 0; i < num_oa; i++)
405 size[2] = pages * sizeof(struct niobuf_remote);
407 osc_con2cl(conn, &cl, &connection);
408 request = ptlrpc_prep_req(cl, connection, OST_BRW, 3, size, NULL);
410 GOTO(out, rc = -ENOMEM);
412 body = lustre_msg_buf(request->rq_reqmsg, 0);
413 body->data = OBD_BRW_READ;
415 desc = ptlrpc_prep_bulk(connection);
417 GOTO(out2, rc = -ENOMEM);
418 desc->b_portal = OST_BULK_PORTAL;
/* ptr1/ptr2 (declared on a hidden line) are cursors into request buffers 1
 * and 2; their addresses are passed to the pack helpers. */
420 ptr1 = lustre_msg_buf(request->rq_reqmsg, 1);
421 ptr2 = lustre_msg_buf(request->rq_reqmsg, 2);
422 for (pages = 0, i = 0; i < num_oa; i++) {
423 ost_pack_ioo(&ptr1, oa[i], oa_bufs[i]);
424 /* FIXME: this inner loop is wrong for multiple OAs */
425 for (j = 0; j < oa_bufs[i]; j++, pages++) {
426 struct ptlrpc_bulk_page *bulk;
427 bulk = ptlrpc_prep_bulk_page(desc);
429 GOTO(out3, rc = -ENOMEM);
/* The per-page xid is taken from the connection's outgoing counter while
 * holding c_lock. */
431 spin_lock(&connection->c_lock);
432 bulk->b_xid = ++connection->c_xid_out;
433 spin_unlock(&connection->c_lock);
/* kmap() the destination page; the matching kunmap() is in the cleanup
 * loop below. */
435 bulk->b_buf = kmap(buf[pages]);
436 bulk->b_page = buf[pages];
437 bulk->b_buflen = PAGE_SIZE;
438 ost_pack_niobuf(&ptr2, offset[pages], count[pages],
439 flags[pages], bulk->b_xid);
443 rc = ptlrpc_register_bulk(desc);
/* The reply length is computed from size[0] only (one buffer). */
447 request->rq_replen = lustre_msg_size(1, size);
448 rc = ptlrpc_queue_wait(request);
449 rc = ptlrpc_check_status(request, rc);
451 ptlrpc_abort_bulk(desc);
/* Undo the kmap() of every bulk page that actually got a mapping. */
455 list_for_each(tmp, &desc->b_page_list) {
456 struct ptlrpc_bulk_page *bulk;
457 bulk = list_entry(tmp, struct ptlrpc_bulk_page, b_link);
458 if (bulk->b_buf != NULL)
459 kunmap(bulk->b_page);
461 ptlrpc_free_bulk(desc);
463 ptlrpc_free_req(request);
/*
 * Completion handler for a bulk write; installed as desc->b_cb by
 * osc_brw_write().
 *
 * desc: bulk descriptor that completed; freed here.
 * data: the struct osc_brw_cb_data for this transfer; freed here.
 *
 * Logs "got signal" when PTL_RPC_FL_INTR is set, kunmap()s the first
 * desc->b_page_count pages of cb_data->buf, calls the stored callback with
 * the descriptor and cb_data->cb_data, then frees the descriptor, the
 * request stored in cb_data->req, and cb_data.
 *
 * NOTE(review): the declaration of `i` is on a hidden line.  osc_brw_write()
 * does not visibly assign cb_data->req or cb_data->cb_data before
 * installing this handler -- confirm they are valid when it runs.
 */
468 static void brw_write_finish(struct ptlrpc_bulk_desc *desc, void *data)
470 struct osc_brw_cb_data *cb_data = data;
473 if (desc->b_flags & PTL_RPC_FL_INTR)
474 CERROR("got signal\n");
476 for (i = 0; i < desc->b_page_count; i++)
477 kunmap(cb_data->buf[i]);
479 (cb_data->callback)(desc, cb_data->cb_data);
481 ptlrpc_free_bulk(desc);
482 ptlrpc_free_req(cb_data->req);
484 OBD_FREE(cb_data, sizeof(*cb_data));
/*
 * Issue an OST_BRW request in write mode (body->data = OBD_BRW_WRITE) and
 * then push the page data with a bulk send.
 *
 * conn:      OBD connection; supplies the client/connection pair.
 * num_oa:    number of entries in oa[] and oa_bufs[].
 * oa:        objects to write to.
 * oa_bufs:   per-object page count.
 * pagearray: flat array of source pages, indexed by the running page
 *            counter across all objects.
 * count, offset, flags: per-page values; count and offset are recorded in
 *            the local niobuf array and packed into the request.
 * callback:  stored in the completion data for brw_write_finish().
 *
 * Visible flow:
 *   1. Build a 3-buffer request (ost_body, obd_ioobj[], niobuf_remote[]),
 *      kmap()ing each source page into local[].
 *   2. Queue the request and wait; reply buffer 1 must be exactly
 *      pages * sizeof(struct niobuf_remote) bytes or -EINVAL is taken.
 *   3. Build a bulk descriptor on OSC_BULK_PORTAL whose pages take their
 *      xid from the niobufs unpacked out of the reply, and send it.
 *   4. Wait interruptibly until ptlrpc_check_bulk_sent() holds.
 *   5. Free the descriptor and request, kunmap() every page, free local[].
 *
 * NOTE(review): `desc` is dereferenced on the line directly after
 * ptlrpc_prep_bulk() (embedded numbers 557 -> 558) and `cb_data` directly
 * after OBD_ALLOC() (561 -> 562), with no NULL check in between;
 * osc_brw_read() does guard its ptlrpc_prep_bulk() result.
 * NOTE(review): cb_data->req and cb_data->cb_data are never assigned in the
 * visible lines although brw_write_finish() reads both.
 * NOTE(review): guards, labels, the declarations of ptr1/ptr2/pages and the
 * return statement are hidden by gaps in the embedded numbering.
 */
487 static int osc_brw_write(struct obd_conn *conn, obd_count num_oa,
488 struct obdo **oa, obd_count *oa_bufs,
489 struct page **pagearray, obd_size *count,
490 obd_off *offset, obd_flag *flags,
491 bulk_callback_t callback)
493 struct ptlrpc_client *cl;
494 struct ptlrpc_connection *connection;
495 struct ptlrpc_request *request;
496 struct ptlrpc_bulk_desc *desc;
497 struct obd_ioobj ioo;
498 struct ost_body *body;
499 struct niobuf_local *local;
500 struct niobuf_remote *remote;
501 struct osc_brw_cb_data *cb_data;
503 int rc, i, j, size[3] = {sizeof(*body)};
/* `ioo` is only used for its size in the visible lines. */
507 size[1] = num_oa * sizeof(ioo);
/* Loop body is hidden; presumably it totals oa_bufs[i] into `pages`. */
509 for (i = 0; i < num_oa; i++)
511 size[2] = pages * sizeof(*remote);
/* One niobuf_local per page records the mapped address, offset and length
 * that the bulk pages are later built from. */
513 OBD_ALLOC(local, pages * sizeof(*local));
517 osc_con2cl(conn, &cl, &connection);
518 request = ptlrpc_prep_req(cl, connection, OST_BRW, 3, size, NULL);
520 GOTO(out, rc = -ENOMEM);
521 body = lustre_msg_buf(request->rq_reqmsg, 0);
522 body->data = OBD_BRW_WRITE;
524 ptr1 = lustre_msg_buf(request->rq_reqmsg, 1);
525 ptr2 = lustre_msg_buf(request->rq_reqmsg, 2);
526 for (pages = 0, i = 0; i < num_oa; i++) {
527 ost_pack_ioo(&ptr1, oa[i], oa_bufs[i]);
528 for (j = 0; j < oa_bufs[i]; j++, pages++) {
529 local[pages].addr = kmap(pagearray[pages]);
530 local[pages].offset = offset[pages];
531 local[pages].len = count[pages];
532 ost_pack_niobuf(&ptr2, offset[pages], count[pages],
/* size[] is reused to describe the expected reply: buffer 0 stays the
 * ost_body, buffer 1 becomes one niobuf_remote per page. */
537 size[1] = pages * sizeof(struct niobuf_remote);
538 request->rq_replen = lustre_msg_size(2, size);
540 rc = ptlrpc_queue_wait(request);
541 rc = ptlrpc_check_status(request, rc);
/* ptr2 now walks the niobufs in reply buffer 1. */
545 ptr2 = lustre_msg_buf(request->rq_repmsg, 1);
547 GOTO(out2, rc = -EINVAL);
549 if (request->rq_repmsg->buflens[1] !=
550 pages * sizeof(struct niobuf_remote)) {
551 CERROR("buffer length wrong (%d vs. %ld)\n",
552 request->rq_repmsg->buflens[1],
553 pages * sizeof(struct niobuf_remote));
554 GOTO(out2, rc = -EINVAL);
557 desc = ptlrpc_prep_bulk(connection);
558 desc->b_portal = OSC_BULK_PORTAL;
/* Completion handler and its data; whether this is conditional on
 * `callback` depends on a hidden line (embedded number 559). */
560 desc->b_cb = brw_write_finish;
561 OBD_ALLOC(cb_data, sizeof(*cb_data));
562 cb_data->buf = pagearray;
563 cb_data->callback = callback;
564 desc->b_cb_data = cb_data;
567 for (pages = 0, i = 0; i < num_oa; i++) {
568 for (j = 0; j < oa_bufs[i]; j++, pages++) {
569 struct ptlrpc_bulk_page *page;
571 ost_unpack_niobuf(&ptr2, &remote);
573 page = ptlrpc_prep_bulk_page(desc);
575 GOTO(out3, rc = -ENOMEM);
/* Each bulk page pairs the locally mapped buffer with the xid taken from
 * the corresponding reply niobuf. */
577 page->b_buf = (void *)(unsigned long)local[pages].addr;
578 page->b_buflen = local[pages].len;
579 page->b_xid = remote->xid;
/* Consistency check on the page count; the action taken is hidden. */
583 if (desc->b_page_count != pages)
586 rc = ptlrpc_send_bulk(desc);
592 /* If there's no callback function, sleep here until complete. */
593 wait_event_interruptible(desc->b_waitq, ptlrpc_check_bulk_sent(desc));
594 if (desc->b_flags & PTL_RPC_FL_INTR)
600 ptlrpc_free_bulk(desc);
602 ptlrpc_free_req(request);
/* Re-walk the same index space to drop every kmap() taken above; this also
 * leaves `pages` at the total used for the OBD_FREE size. */
603 for (pages = 0, i = 0; i < num_oa; i++)
604 for (j = 0; j < oa_bufs[i]; j++, pages++)
605 kunmap(pagearray[pages]);
607 OBD_FREE(local, pages * sizeof(*local));
/*
 * Bulk read/write entry point: dispatches on `rw`.
 *
 * rw == OBD_BRW_READ goes to osc_brw_read(); the other visible return goes
 * to osc_brw_write().  All remaining arguments are forwarded unchanged,
 * with `callback` cast to bulk_callback_t.
 *
 * NOTE(review): the end of the parameter list (where `callback` is
 * presumably declared) and any `else` between the two returns are not
 * visible in this listing.
 */
612 static int osc_brw(int rw, struct obd_conn *conn, obd_count num_oa,
613 struct obdo **oa, obd_count *oa_bufs, struct page **buf,
614 obd_size *count, obd_off *offset, obd_flag *flags,
620 if (rw == OBD_BRW_READ)
621 return osc_brw_read(conn, num_oa, oa, oa_bufs, buf, count,
622 offset, flags, (bulk_callback_t)callback);
624 return osc_brw_write(conn, num_oa, oa, oa_bufs, buf, count,
625 offset, flags, (bulk_callback_t)callback);
/*
 * Obtain an extent lock, reusing a local lock when one matches.
 *
 * oconn:       OBD connection; supplies the ldlm client and connection.
 * ns:          lock namespace searched and enqueued in.
 * parent_lock, res_id, type, data, datalen: passed through to the lock
 *              calls.
 * extent:      requested range; MODIFIED IN PLACE -- start is masked with
 *              PAGE_MASK and end is advanced by PAGE_SIZE - 1 and masked
 *              the same way.
 * mode:        requested lock mode.
 * flags:       pointer to the flags word.
 * lockh:       receives / identifies the lock handle.
 *
 * Visible steps: (1) ldlm_local_lock_match() with `mode`; (2) a second match
 * with `mode2`, after which the reference is moved from mode2 to mode and
 * ldlm_cli_convert() is called; (3) ldlm_cli_enqueue() as the last call.
 *
 * NOTE(review): the declaration and assignment of `mode2`, every rc check
 * and all returns are hidden by gaps in the embedded numbering.
 * NOTE(review): ldlm_cli_convert() is passed `&flags` although `flags` is
 * already an `int *`, while ldlm_cli_enqueue() is passed `flags` itself --
 * confirm which type ldlm_cli_convert() expects.
 */
628 static int osc_enqueue(struct obd_conn *oconn, struct ldlm_namespace *ns,
629 struct ldlm_handle *parent_lock, __u64 *res_id,
630 __u32 type, struct ldlm_extent *extent, __u32 mode,
631 int *flags, void *data, int datalen,
632 struct ldlm_handle *lockh)
634 struct ptlrpc_connection *conn;
635 struct ptlrpc_client *cl;
639 /* Filesystem locks are given a bit of special treatment: first we
640 * fixup the lock to start and end on page boundaries. */
641 extent->start &= PAGE_MASK;
642 extent->end = (extent->end + PAGE_SIZE - 1) & PAGE_MASK;
644 /* Next, search for already existing extent locks that will cover us */
645 osc_con2dlmcl(oconn, &cl, &conn);
646 rc = ldlm_local_lock_match(ns, res_id, type, extent, mode, lockh);
648 /* We already have a lock, and it's referenced */
652 /* Next, search for locks that we can upgrade (if we're trying to write)
653 * or are more than we need (if we're trying to read). Because the VFS
654 * and page cache already protect us locally, lots of readers/writers
655 * can share a single PW lock. */
661 rc = ldlm_local_lock_match(ns, res_id, type, extent, mode2, lockh);
664 struct ldlm_lock *lock = ldlm_handle2object(lockh);
665 /* FIXME: This is not incredibly elegant, but it might
666 * be more elegant than adding another parameter to
667 * lock_match. I want a second opinion. */
668 ldlm_lock_addref(lock, mode);
669 ldlm_lock_decref(lock, mode2);
674 rc = ldlm_cli_convert(cl, lockh, type, &flags);
681 rc = ldlm_cli_enqueue(cl, conn, ns, parent_lock, res_id, type,
682 extent, mode, flags, data, datalen, lockh);
/*
 * Drop one reference of the given mode on the lock behind `lockh`.
 *
 * oconn: not referenced in the lines visible in this listing.
 * mode:  lock mode whose reference is released via ldlm_lock_decref().
 * lockh: handle converted to a lock with ldlm_handle2object().
 *
 * NOTE(review): the return statement is not visible here, and the result of
 * ldlm_handle2object() is used without a visible NULL check.
 */
686 static int osc_cancel(struct obd_conn *oconn, __u32 mode,
687 struct ldlm_handle *lockh)
689 struct ldlm_lock *lock;
692 lock = ldlm_handle2object(lockh);
693 ldlm_lock_decref(lock, mode);
/*
 * Set up the OSC state stored in obddev->u.osc.
 *
 * obddev:   device whose osc_obd is initialised.
 * len, buf: not referenced in the lines visible in this listing.
 *
 * Obtains a connection via ptlrpc_uuid_to_connection() with the fixed string
 * "ost", allocates the request client and the lock-manager client,
 * initialises them on the OST and LDLM request/reply portals, and names
 * them "osc" and "ldlm".  The two cleanup lines at the end free osc_client
 * and put the connection; the jumps to out_client / out_conn are visible but
 * the labels themselves are not.
 *
 * NOTE(review): the check on osc_conn, the end of the first
 * ptlrpc_init_client() call, the success return and the labels are hidden
 * by gaps in the embedded numbering.
 */
698 static int osc_setup(struct obd_device *obddev, obd_count len, void *buf)
700 struct osc_obd *osc = &obddev->u.osc;
704 osc->osc_conn = ptlrpc_uuid_to_connection("ost");
708 OBD_ALLOC(osc->osc_client, sizeof(*osc->osc_client));
709 if (osc->osc_client == NULL)
710 GOTO(out_conn, rc = -ENOMEM);
712 OBD_ALLOC(osc->osc_ldlm_client, sizeof(*osc->osc_ldlm_client));
713 if (osc->osc_ldlm_client == NULL)
714 GOTO(out_client, rc = -ENOMEM);
716 ptlrpc_init_client(NULL, NULL, OST_REQUEST_PORTAL, OSC_REPLY_PORTAL,
718 ptlrpc_init_client(NULL, NULL, LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL,
719 osc->osc_ldlm_client);
720 osc->osc_client->cli_name = "osc";
721 osc->osc_ldlm_client->cli_name = "ldlm";
/* Error unwinding: release in the reverse order of acquisition. */
727 OBD_FREE(osc->osc_client, sizeof(*osc->osc_client));
729 ptlrpc_put_connection(osc->osc_conn);
/*
 * Tear down what osc_setup() created in obddev->u.osc.
 *
 * Each client is cleaned up with ptlrpc_cleanup_client() and then freed
 * (request client first, lock-manager client second); finally the
 * connection reference is dropped with ptlrpc_put_connection().
 *
 * NOTE(review): the return statement is not visible in this listing.
 */
733 static int osc_cleanup(struct obd_device * obddev)
735 struct osc_obd *osc = &obddev->u.osc;
737 ptlrpc_cleanup_client(osc->osc_client);
738 OBD_FREE(osc->osc_client, sizeof(*osc->osc_client));
739 ptlrpc_cleanup_client(osc->osc_ldlm_client);
740 OBD_FREE(osc->osc_ldlm_client, sizeof(*osc->osc_ldlm_client));
741 ptlrpc_put_connection(osc->osc_conn);
/*
 * OBD method table for the OSC, mapping obd_ops slots to the static
 * functions defined in this file.  Entries use the old GCC `field: value`
 * initialiser syntax.
 *
 * NOTE(review): several entries fall on lines missing from this listing
 * (embedded numbers 748, 754-755, 758-759, 761+), so the visible entries
 * are not the complete table.
 */
747 struct obd_ops osc_obd_ops = {
749 o_cleanup: osc_cleanup,
750 o_create: osc_create,
751 o_destroy: osc_destroy,
752 o_getattr: osc_getattr,
753 o_setattr: osc_setattr,
756 o_connect: osc_connect,
757 o_disconnect: osc_disconnect,
760 o_enqueue: osc_enqueue,
/*
 * Module init hook: registers osc_obd_ops under the type name
 * LUSTRE_OSC_NAME.
 *
 * NOTE(review): the return statement is not visible in this listing, so it
 * cannot be told from here whether obd_register_type()'s result is
 * propagated.
 */
764 static int __init osc_init(void)
766 obd_register_type(&osc_obd_ops, LUSTRE_OSC_NAME);
/*
 * Module exit hook: unregisters the type name LUSTRE_OSC_NAME that
 * osc_init() registered.
 */
770 static void __exit osc_exit(void)
772 obd_unregister_type(LUSTRE_OSC_NAME);
/* Module metadata, followed by the registration of osc_init() and
 * osc_exit() as the module's init and exit handlers. */
775 MODULE_AUTHOR("Peter J. Braam <braam@clusterfs.com>");
776 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC) v1.0");
777 MODULE_LICENSE("GPL");
779 module_init(osc_init);
780 module_exit(osc_exit);