1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
6 * This code is issued under the GNU General Public License.
7 * See the file COPYING in this distribution
9 * Author Peter Braam <braam@clusterfs.com>
11 * This server is single threaded at present (but can easily be multi
12 * threaded). For testing and management it is treated as an
13 * obd_device, although it does not export a full OBD method table
14 * (the requests are coming in over the wire, so object target
15 * modules do not have a full method table.)
20 #define DEBUG_SUBSYSTEM S_OSC
22 #include <linux/module.h>
23 #include <linux/lustre_dlm.h>
24 #include <linux/obd_ost.h>
/* Fetch the OSC state embedded in the connection's device
 * (conn->oc_dev->u.osc) and hand back, through the out parameters, the
 * client stored in osc_client and the connection stored in osc_conn. */
26 static void osc_con2cl(struct obd_conn *conn, struct ptlrpc_client **cl,
27 struct ptlrpc_connection **connection)
29 struct osc_obd *osc = &conn->oc_dev->u.osc;
30 *cl = osc->osc_client;
31 *connection = osc->osc_conn;
/* Same lookup as osc_con2cl, except that *cl receives the client stored in
 * osc_ldlm_client.  The connection returned is the same osc_conn. */
34 static void osc_con2dlmcl(struct obd_conn *conn, struct ptlrpc_client **cl,
35 struct ptlrpc_connection **connection)
37 struct osc_obd *osc = &conn->oc_dev->u.osc;
38 *cl = osc->osc_ldlm_client;
39 *connection = osc->osc_conn;
/* Issue an OST_CONNECT request and record the connection id carried in the
 * reply body into conn->oc_id.
 *
 * NOTE(review): several lines of this function are not visible here (branch
 * conditions, labels, the return statement).  Presumably rc is returned and
 * the reply is only read when rc == 0 -- confirm against the full file. */
42 static int osc_connect(struct obd_conn *conn)
44 struct ptlrpc_request *request;
45 struct ptlrpc_client *cl;
46 struct ptlrpc_connection *connection;
47 struct ost_body *body;
48 int rc, size = sizeof(*body);
51 osc_con2cl(conn, &cl, &connection);
/* The request is prepared with zero buffers: no ost_body is sent. */
52 request = ptlrpc_prep_req(cl, connection, OST_CONNECT, 0, NULL, NULL);
/* The expected reply holds one buffer of sizeof(struct ost_body). */
56 request->rq_replen = lustre_msg_size(1, &size);
58 rc = ptlrpc_queue_wait(request);
59 rc = ptlrpc_check_status(request, rc);
61 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
65 body = lustre_msg_buf(request->rq_repmsg, 0);
66 CDEBUG(D_INODE, "received connid %d\n", body->connid);
68 conn->oc_id = body->connid;
71 ptlrpc_free_req(request);
/* Issue an OST_DISCONNECT request carrying this connection's id
 * (conn->oc_id) in the request body, wait for it, and free the request.
 *
 * NOTE(review): in the visible lines the result of ptlrpc_queue_wait() is
 * not passed through ptlrpc_check_status() as the other requests in this
 * file do; lines are missing from this listing, so confirm whether that is
 * intentional. */
75 static int osc_disconnect(struct obd_conn *conn)
77 struct ptlrpc_request *request;
78 struct ptlrpc_client *cl;
79 struct ptlrpc_connection *connection;
80 struct ost_body *body;
81 int rc, size = sizeof(*body);
84 osc_con2cl(conn, &cl, &connection);
85 request = ptlrpc_prep_req(cl, connection, OST_DISCONNECT, 1, &size, NULL);
89 body = lustre_msg_buf(request->rq_reqmsg, 0);
90 body->connid = conn->oc_id;
92 request->rq_replen = lustre_msg_size(1, &size);
94 rc = ptlrpc_queue_wait(request);
97 ptlrpc_free_req(request);
/* Issue an OST_GETATTR request for the object described by *oa and copy
 * the obdo from the reply body back over *oa.
 *
 * NOTE(review): branch conditions, labels and the return statement are not
 * visible in this listing; presumably *oa is only overwritten on success
 * and rc is returned -- confirm. */
101 static int osc_getattr(struct obd_conn *conn, struct obdo *oa)
103 struct ptlrpc_request *request;
104 struct ptlrpc_client *cl;
105 struct ptlrpc_connection *connection;
106 struct ost_body *body;
107 int rc, size = sizeof(*body);
110 osc_con2cl(conn, &cl, &connection);
111 request = ptlrpc_prep_req(cl, connection, OST_GETATTR, 1, &size, NULL);
115 body = lustre_msg_buf(request->rq_reqmsg, 0);
116 memcpy(&body->oa, oa, sizeof(*oa));
117 body->connid = conn->oc_id;
/* Every bit of o_valid is set in the request copy; the caller's
 * oa->o_valid is not modified here. */
118 body->oa.o_valid = ~0;
120 request->rq_replen = lustre_msg_size(1, &size);
122 rc = ptlrpc_queue_wait(request);
123 rc = ptlrpc_check_status(request, rc);
125 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
129 body = lustre_msg_buf(request->rq_repmsg, 0);
130 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
132 memcpy(oa, &body->oa, sizeof(*oa));
136 ptlrpc_free_req(request);
/* Issue an OST_OPEN request for the object described by *oa and copy the
 * obdo from the reply body back over *oa.
 *
 * NOTE(review): branch conditions, labels and the return statement are not
 * visible in this listing -- confirm the error paths against the full file. */
140 static int osc_open(struct obd_conn *conn, struct obdo *oa)
142 struct ptlrpc_request *request;
143 struct ptlrpc_client *cl;
144 struct ptlrpc_connection *connection;
145 struct ost_body *body;
146 int rc, size = sizeof(*body);
149 osc_con2cl(conn, &cl, &connection);
150 request = ptlrpc_prep_req(cl, connection, OST_OPEN, 1, &size, NULL);
154 body = lustre_msg_buf(request->rq_reqmsg, 0);
155 memcpy(&body->oa, oa, sizeof(*oa));
156 body->connid = conn->oc_id;
/* The caller must have set exactly the mode and id valid bits.  The
 * statement executed on a mismatch is not visible in this listing
 * (TODO confirm what it is). */
157 if (body->oa.o_valid != (OBD_MD_FLMODE | OBD_MD_FLID))
160 request->rq_replen = lustre_msg_size(1, &size);
162 rc = ptlrpc_queue_wait(request);
163 rc = ptlrpc_check_status(request, rc);
167 body = lustre_msg_buf(request->rq_repmsg, 0);
168 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
170 memcpy(oa, &body->oa, sizeof(*oa));
174 ptlrpc_free_req(request);
/* Issue an OST_CLOSE request for the object described by *oa and copy the
 * obdo from the reply body back over *oa.  Unlike osc_open, no check of
 * o_valid is visible here.
 *
 * NOTE(review): branch conditions, labels and the return statement are not
 * visible in this listing -- confirm the error paths against the full file. */
178 static int osc_close(struct obd_conn *conn, struct obdo *oa)
180 struct ptlrpc_request *request;
181 struct ptlrpc_client *cl;
182 struct ptlrpc_connection *connection;
183 struct ost_body *body;
184 int rc, size = sizeof(*body);
187 osc_con2cl(conn, &cl, &connection);
188 request = ptlrpc_prep_req(cl, connection, OST_CLOSE, 1, &size, NULL);
192 body = lustre_msg_buf(request->rq_reqmsg, 0);
193 memcpy(&body->oa, oa, sizeof(*oa));
194 body->connid = conn->oc_id;
196 request->rq_replen = lustre_msg_size(1, &size);
198 rc = ptlrpc_queue_wait(request);
199 rc = ptlrpc_check_status(request, rc);
203 body = lustre_msg_buf(request->rq_repmsg, 0);
204 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
206 memcpy(oa, &body->oa, sizeof(*oa));
210 ptlrpc_free_req(request);
/* Issue an OST_SETATTR request carrying a copy of *oa.  In the visible
 * lines nothing from the reply is copied back to the caller; only the
 * status is collected in rc.
 *
 * NOTE(review): branch conditions, labels and the return statement are not
 * visible in this listing; presumably rc is returned -- confirm. */
214 static int osc_setattr(struct obd_conn *conn, struct obdo *oa)
216 struct ptlrpc_request *request;
217 struct ptlrpc_client *cl;
218 struct ptlrpc_connection *connection;
219 struct ost_body *body;
220 int rc, size = sizeof(*body);
223 osc_con2cl(conn, &cl, &connection);
224 request = ptlrpc_prep_req(cl, connection, OST_SETATTR, 1, &size, NULL);
228 body = lustre_msg_buf(request->rq_reqmsg, 0);
229 memcpy(&body->oa, oa, sizeof(*oa));
230 body->connid = conn->oc_id;
232 request->rq_replen = lustre_msg_size(1, &size);
234 rc = ptlrpc_queue_wait(request);
235 rc = ptlrpc_check_status(request, rc);
239 ptlrpc_free_req(request);
/* Issue an OST_CREATE request seeded with a copy of *oa and copy the obdo
 * from the reply body back over *oa.
 *
 * NOTE(review): several lines (embedded numbers 250-255 among them) are
 * not visible in this listing, nor are the branch conditions, labels and
 * return statement -- confirm argument checks and error paths against the
 * full file. */
243 static int osc_create(struct obd_conn *conn, struct obdo *oa)
245 struct ptlrpc_request *request;
246 struct ptlrpc_client *cl;
247 struct ptlrpc_connection *connection;
248 struct ost_body *body;
249 int rc, size = sizeof(*body);
256 osc_con2cl(conn, &cl, &connection);
257 request = ptlrpc_prep_req(cl, connection, OST_CREATE, 1, &size, NULL);
261 body = lustre_msg_buf(request->rq_reqmsg, 0);
262 memcpy(&body->oa, oa, sizeof(*oa));
/* Every bit of o_valid is set in the request copy only. */
263 body->oa.o_valid = ~0;
264 body->connid = conn->oc_id;
266 request->rq_replen = lustre_msg_size(1, &size);
268 rc = ptlrpc_queue_wait(request);
269 rc = ptlrpc_check_status(request, rc);
273 body = lustre_msg_buf(request->rq_repmsg, 0);
274 memcpy(oa, &body->oa, sizeof(*oa));
278 ptlrpc_free_req(request);
/* Issue an OST_PUNCH request for the object described by *oa and copy the
 * obdo from the reply body back over *oa.
 *
 * NOTE(review): the remainder of the parameter list is on a line that is
 * not visible here; the body uses a variable named offset, presumably the
 * last parameter -- confirm.  Branch conditions, labels and the return
 * statement are likewise not visible. */
282 static int osc_punch(struct obd_conn *conn, struct obdo *oa, obd_size count,
285 struct ptlrpc_request *request;
286 struct ptlrpc_client *cl;
287 struct ptlrpc_connection *connection;
288 struct ost_body *body;
289 int rc, size = sizeof(*body);
296 osc_con2cl(conn, &cl, &connection);
297 request = ptlrpc_prep_req(cl, connection, OST_PUNCH, 1, &size, NULL);
301 body = lustre_msg_buf(request->rq_reqmsg, 0);
302 memcpy(&body->oa, oa, sizeof(*oa));
303 body->connid = conn->oc_id;
304 body->oa.o_valid = ~0;
/* The request's o_size field carries offset and its o_blocks field
 * carries count, overwriting whatever was copied from *oa above. */
305 body->oa.o_size = offset;
306 body->oa.o_blocks = count;
308 request->rq_replen = lustre_msg_size(1, &size);
310 rc = ptlrpc_queue_wait(request);
311 rc = ptlrpc_check_status(request, rc);
315 body = lustre_msg_buf(request->rq_repmsg, 0);
316 memcpy(oa, &body->oa, sizeof(*oa));
320 ptlrpc_free_req(request);
/* Issue an OST_DESTROY request for the object described by *oa and copy
 * the obdo from the reply body back over *oa.
 *
 * NOTE(review): several lines (embedded numbers 331-336 among them) are
 * not visible in this listing, nor are the branch conditions, labels and
 * return statement -- confirm argument checks and error paths against the
 * full file. */
324 static int osc_destroy(struct obd_conn *conn, struct obdo *oa)
326 struct ptlrpc_request *request;
327 struct ptlrpc_client *cl;
328 struct ptlrpc_connection *connection;
329 struct ost_body *body;
330 int rc, size = sizeof(*body);
337 osc_con2cl(conn, &cl, &connection);
338 request = ptlrpc_prep_req(cl, connection, OST_DESTROY, 1, &size, NULL);
342 body = lustre_msg_buf(request->rq_reqmsg, 0);
343 memcpy(&body->oa, oa, sizeof(*oa));
344 body->connid = conn->oc_id;
/* Every bit of o_valid is set in the request copy only. */
345 body->oa.o_valid = ~0;
347 request->rq_replen = lustre_msg_size(1, &size);
349 rc = ptlrpc_queue_wait(request);
350 rc = ptlrpc_check_status(request, rc);
354 body = lustre_msg_buf(request->rq_repmsg, 0);
355 memcpy(oa, &body->oa, sizeof(*oa));
359 ptlrpc_free_req(request);
/* Add one page to the bulk descriptor desc: the new page's buffer and
 * length come from the local niobuf src, and its xid comes from the
 * remote niobuf dst.
 *
 * NOTE(review): the handling of a failed ptlrpc_prep_bulk_page() and the
 * return statement are not visible in this listing -- confirm. */
363 static int osc_sendpage(struct ptlrpc_bulk_desc *desc,
364 struct niobuf_remote *dst, struct niobuf_local *src)
366 struct ptlrpc_bulk_page *page;
369 page = ptlrpc_prep_bulk_page(desc);
/* src->addr is converted to a pointer by way of unsigned long. */
373 page->b_buf = (void *)(unsigned long)src->addr;
374 page->b_buflen = src->len;
375 page->b_xid = dst->xid;
/* Bulk read: build one OST_BRW request describing num_oa objects and their
 * pages, register a bulk descriptor whose pages are the kmap()ed entries
 * of buf, send the request and wait for it, then unmap the pages and free
 * the descriptor and the request.
 *
 * oa[i] is the i-th object and oa_bufs[i] its page count; buf, count,
 * offset and flags are indexed by a running page counter across all
 * objects.
 *
 * NOTE(review): many lines are not visible in this listing (the ptr1/ptr2
 * declarations, branch conditions, the out/out2/out3 labels, the return).
 * The order of the cleanup steps matters; read the full file before
 * changing anything here. */
380 static int osc_brw_read(struct obd_conn *conn, obd_count num_oa,
381 struct obdo **oa, obd_count *oa_bufs, struct page **buf,
382 obd_size *count, obd_off *offset, obd_flag *flags)
384 struct ptlrpc_client *cl;
385 struct ptlrpc_connection *connection;
386 struct ptlrpc_request *request;
387 struct ost_body *body;
388 struct list_head *tmp;
/* size[0] is the ost_body; size[1] and size[2] start at zero and are
 * filled in below. */
389 int pages, rc, i, j, size[3] = {sizeof(*body)};
391 struct ptlrpc_bulk_desc *desc;
/* Request buffer 1 holds one obd_ioobj per object. */
394 size[1] = num_oa * sizeof(struct obd_ioobj);
/* The loop body is not visible in this listing; presumably it sums
 * oa_bufs[i] into pages -- TODO confirm. */
396 for (i = 0; i < num_oa; i++)
/* Request buffer 2 holds one niobuf_remote per page. */
398 size[2] = pages * sizeof(struct niobuf_remote);
400 osc_con2cl(conn, &cl, &connection);
401 request = ptlrpc_prep_req(cl, connection, OST_BRW, 3, size, NULL);
403 GOTO(out, rc = -ENOMEM);
405 body = lustre_msg_buf(request->rq_reqmsg, 0);
406 body->data = OBD_BRW_READ;
408 desc = ptlrpc_prep_bulk(connection);
410 GOTO(out2, rc = -ENOMEM);
411 desc->b_portal = OST_BULK_PORTAL;
413 ptr1 = lustre_msg_buf(request->rq_reqmsg, 1);
414 ptr2 = lustre_msg_buf(request->rq_reqmsg, 2);
/* pages restarts at zero and advances once per page across all objects. */
415 for (pages = 0, i = 0; i < num_oa; i++) {
416 ost_pack_ioo(&ptr1, oa[i], oa_bufs[i]);
417 for (j = 0; j < oa_bufs[i]; j++, pages++) {
418 struct ptlrpc_bulk_page *bulk;
419 bulk = ptlrpc_prep_bulk_page(desc);
421 GOTO(out3, rc = -ENOMEM);
/* Each page gets its own xid, taken by incrementing the connection's
 * c_xid_out while holding c_lock. */
423 spin_lock(&connection->c_lock);
424 bulk->b_xid = ++connection->c_xid_out;
425 spin_unlock(&connection->c_lock);
427 bulk->b_buf = kmap(buf[pages]);
428 bulk->b_page = buf[pages];
429 bulk->b_buflen = PAGE_SIZE;
/* The same xid is packed into the niobuf sent to the server. */
430 ost_pack_niobuf(&ptr2, offset[pages], count[pages],
431 flags[pages], bulk->b_xid);
435 rc = ptlrpc_register_bulk(desc);
/* Only size[0] is used here: the reply is sized as a single ost_body. */
439 request->rq_replen = lustre_msg_size(1, size);
440 rc = ptlrpc_queue_wait(request);
441 rc = ptlrpc_check_status(request, rc);
/* The condition guarding this abort is not visible; presumably it runs
 * only when rc is non-zero -- TODO confirm. */
443 ptlrpc_abort_bulk(desc);
/* Unmap every page that was mapped above; b_buf is used as the
 * "was mapped" indicator. */
447 list_for_each(tmp, &desc->b_page_list) {
448 struct ptlrpc_bulk_page *bulk;
449 bulk = list_entry(tmp, struct ptlrpc_bulk_page, b_link);
450 if (bulk->b_buf != NULL)
451 kunmap(bulk->b_page);
453 ptlrpc_free_bulk(desc);
455 ptlrpc_free_req(request);
/* Bulk write: build one OST_BRW request describing num_oa objects and
 * their pages, wait for the reply, which carries one niobuf_remote per
 * page, then queue every local page against its remote niobuf on a bulk
 * descriptor and send it.
 *
 * oa[i] is the i-th object and oa_bufs[i] its page count; pagearray,
 * count and offset are indexed by a running page counter across all
 * objects.
 *
 * NOTE(review): many lines are not visible in this listing (the rest of
 * the parameter list, the pages/ptr1/ptr2 declarations, branch conditions,
 * labels, the return).  The order of the cleanup steps matters; read the
 * full file before changing anything here. */
460 static int osc_brw_write(struct obd_conn *conn, obd_count num_oa,
461 struct obdo **oa, obd_count *oa_bufs,
462 struct page **pagearray, obd_size *count, obd_off *offset,
465 struct ptlrpc_client *cl;
466 struct ptlrpc_connection *connection;
467 struct ptlrpc_request *request;
468 struct ptlrpc_bulk_desc *desc;
469 struct obd_ioobj ioo;
470 struct ost_body *body;
471 struct niobuf_local *local;
472 struct niobuf_remote *remote;
474 int rc, i, j, size[3] = {sizeof(*body)};
/* ioo is used in the visible lines only for this sizeof. */
478 size[1] = num_oa * sizeof(ioo);
/* The loop body is not visible in this listing; presumably it sums
 * oa_bufs[i] into pages -- TODO confirm. */
480 for (i = 0; i < num_oa; i++)
482 size[2] = pages * sizeof(*remote);
/* One niobuf_local per page, recording the mapped address, offset and
 * length for the later bulk send. */
484 OBD_ALLOC(local, pages * sizeof(*local));
488 osc_con2cl(conn, &cl, &connection);
489 request = ptlrpc_prep_req(cl, connection, OST_BRW, 3, size, NULL);
491 GOTO(out, rc = -ENOMEM);
492 body = lustre_msg_buf(request->rq_reqmsg, 0);
493 body->data = OBD_BRW_WRITE;
495 ptr1 = lustre_msg_buf(request->rq_reqmsg, 1);
496 ptr2 = lustre_msg_buf(request->rq_reqmsg, 2);
497 for (pages = 0, i = 0; i < num_oa; i++) {
498 ost_pack_ioo(&ptr1, oa[i], oa_bufs[i]);
499 for (j = 0; j < oa_bufs[i]; j++, pages++) {
500 local[pages].addr = kmap(pagearray[pages]);
501 local[pages].offset = offset[pages];
502 local[pages].len = count[pages];
/* The remaining arguments of this call are on a line not visible here. */
503 ost_pack_niobuf(&ptr2, offset[pages], count[pages],
/* size[] is reused to describe the reply: buffer 0 is still the ost_body
 * and buffer 1 now holds one niobuf_remote per page. */
508 size[1] = pages * sizeof(struct niobuf_remote);
509 request->rq_replen = lustre_msg_size(2, size);
511 rc = ptlrpc_queue_wait(request);
512 rc = ptlrpc_check_status(request, rc);
/* ptr2 now walks the remote niobufs in reply buffer 1. */
516 ptr2 = lustre_msg_buf(request->rq_repmsg, 1);
518 GOTO(out2, rc = -EINVAL);
/* Reject a reply whose niobuf buffer is not exactly one entry per page. */
520 if (request->rq_repmsg->buflens[1] !=
521 pages * sizeof(struct niobuf_remote)) {
522 CERROR("buffer length wrong (%d vs. %ld)\n",
523 request->rq_repmsg->buflens[1],
524 pages * sizeof(struct niobuf_remote));
525 GOTO(out2, rc = -EINVAL);
/* NOTE(review): the next two lines carry consecutive embedded numbers
 * (528, 529), so desc is dereferenced with no NULL check in between.
 * osc_brw_read follows the same call with a GOTO(..., -ENOMEM), which
 * suggests ptlrpc_prep_bulk() can fail -- confirm and add a check. */
528 desc = ptlrpc_prep_bulk(connection);
529 desc->b_portal = OSC_BULK_PORTAL;
531 for (pages = 0, i = 0; i < num_oa; i++) {
532 for (j = 0; j < oa_bufs[i]; j++, pages++) {
533 ost_unpack_niobuf(&ptr2, &remote);
534 rc = osc_sendpage(desc, remote, &local[pages]);
540 rc = ptlrpc_send_bulk(desc);
544 ptlrpc_free_bulk(desc);
546 ptlrpc_free_req(request);
/* Unmap every page; this loop also recounts pages, and the recounted
 * value is what sizes the OBD_FREE below. */
547 for (pages = 0, i = 0; i < num_oa; i++)
548 for (j = 0; j < oa_bufs[i]; j++, pages++)
549 kunmap(pagearray[pages]);
551 OBD_FREE(local, pages * sizeof(*local));
/* Bulk I/O entry point: rw == OBD_BRW_READ goes to osc_brw_read; the other
 * visible return goes to osc_brw_write (the line between the two calls,
 * presumably an else, is not visible).  The remaining arguments of both
 * calls are on lines not visible in this listing. */
556 static int osc_brw(int rw, struct obd_conn *conn, obd_count num_oa,
557 struct obdo **oa, obd_count *oa_bufs, struct page **buf,
558 obd_size *count, obd_off *offset, obd_flag *flags)
560 if (rw == OBD_BRW_READ)
561 return osc_brw_read(conn, num_oa, oa, oa_bufs, buf, count,
564 return osc_brw_write(conn, num_oa, oa, oa_bufs, buf, count,
/* Obtain an extent lock for the resource res_id.  The visible lines make
 * three attempts in order: match an existing local lock in the requested
 * mode, match one in the alternate mode mode2 and move the reference over
 * to the requested mode (with a possible ldlm_cli_convert), and finally
 * enqueue a new lock with ldlm_cli_enqueue.
 *
 * Side effect: *extent is modified in place (widened to page boundaries).
 *
 * NOTE(review): the declarations of rc and mode2, every branch condition
 * and every return statement are on lines not visible in this listing --
 * confirm the exact control flow against the full file. */
568 static int osc_enqueue(struct obd_conn *oconn, struct ldlm_namespace *ns,
569 struct ldlm_handle *parent_lock, __u64 *res_id,
570 __u32 type, struct ldlm_extent *extent, __u32 mode,
571 int *flags, void *data, int datalen,
572 struct ldlm_handle *lockh)
574 struct ptlrpc_connection *conn;
575 struct ptlrpc_client *cl;
579 /* Filesystem locks are given a bit of special treatment: first we
580 * fixup the lock to start and end on page boundaries. */
581 extent->start &= PAGE_MASK;
582 extent->end = (extent->end + PAGE_SIZE - 1) & PAGE_MASK;
584 /* Next, search for already existing extent locks that will cover us */
/* Note that the lock-manager client is used here, not the request client. */
585 osc_con2dlmcl(oconn, &cl, &conn);
586 rc = ldlm_local_lock_match(ns, res_id, type, extent, mode, lockh);
588 /* We already have a lock, and it's referenced */
592 /* Next, search for locks that we can upgrade (if we're trying to write)
593 * or are more than we need (if we're trying to read). Because the VFS
594 * and page cache already protect us locally, lots of readers/writers
595 * can share a single PW lock. */
601 rc = ldlm_local_lock_match(ns, res_id, type, extent, mode2, lockh);
604 struct ldlm_lock *lock = ldlm_handle2object(lockh);
605 /* FIXME: This is not incredibly elegant, but it might
606 * be more elegant than adding another parameter to
607 * lock_match. I want a second opinion. */
608 ldlm_lock_addref(lock, mode);
609 ldlm_lock_decref(lock, mode2);
/* NOTE(review): if flags here is the int * parameter, &flags is an
 * int **.  A block-local flags may be declared on a line not visible
 * in this listing -- confirm which object this refers to. */
614 rc = ldlm_cli_convert(cl, lockh, type, &flags);
621 rc = ldlm_cli_enqueue(cl, conn, ns, parent_lock, res_id, type,
622 extent, mode, flags, data, datalen, lockh);
/* Drop one reference of the given mode on the lock that lockh refers to.
 * oconn is not used in the visible lines.
 *
 * NOTE(review): the return statement is not visible in this listing. */
626 static int osc_cancel(struct obd_conn *oconn, __u32 mode,
627 struct ldlm_handle *lockh)
629 struct ldlm_lock *lock;
632 lock = ldlm_handle2object(lockh);
633 ldlm_lock_decref(lock, mode);
/* Device setup: obtain the connection for the uuid "ost", allocate the
 * request client and the lock-manager client, initialise both with their
 * request/reply portals and give them names.  The last two visible lines
 * are the error unwinding: free the request client, then release the
 * connection.
 *
 * len and buf are not used in the visible lines, and the uuid is
 * hard-coded.
 *
 * NOTE(review): the rc declaration, the check of the connection lookup,
 * the labels and the return statements are not visible in this listing. */
638 static int osc_setup(struct obd_device *obddev, obd_count len, void *buf)
640 struct osc_obd *osc = &obddev->u.osc;
644 osc->osc_conn = ptlrpc_uuid_to_connection("ost");
648 OBD_ALLOC(osc->osc_client, sizeof(*osc->osc_client));
649 if (osc->osc_client == NULL)
650 GOTO(out_conn, rc = -ENOMEM);
652 OBD_ALLOC(osc->osc_ldlm_client, sizeof(*osc->osc_ldlm_client));
653 if (osc->osc_ldlm_client == NULL)
654 GOTO(out_client, rc = -ENOMEM);
/* The final argument of this call is on a line not visible here;
 * presumably it is osc->osc_client -- TODO confirm. */
656 ptlrpc_init_client(NULL, NULL, OST_REQUEST_PORTAL, OSC_REPLY_PORTAL,
658 ptlrpc_init_client(NULL, NULL, LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL,
659 osc->osc_ldlm_client);
660 osc->osc_client->cli_name = "osc";
661 osc->osc_ldlm_client->cli_name = "ldlm";
667 OBD_FREE(osc->osc_client, sizeof(*osc->osc_client));
669 ptlrpc_put_connection(osc->osc_conn);
/* Device teardown, undoing osc_setup: clean up and free the request
 * client, clean up and free the lock-manager client, then release the
 * connection.
 *
 * NOTE(review): the return statement is not visible in this listing. */
673 static int osc_cleanup(struct obd_device * obddev)
675 struct osc_obd *osc = &obddev->u.osc;
677 ptlrpc_cleanup_client(osc->osc_client);
678 OBD_FREE(osc->osc_client, sizeof(*osc->osc_client));
679 ptlrpc_cleanup_client(osc->osc_ldlm_client);
680 OBD_FREE(osc->osc_ldlm_client, sizeof(*osc->osc_ldlm_client));
681 ptlrpc_put_connection(osc->osc_conn);
/* Method table handed to obd_register_type() in osc_init.  It uses the old
 * GNU "field: value" initializer syntax.
 *
 * NOTE(review): some entries are on lines not visible in this listing
 * (gaps in the embedded numbering); osc_setup, osc_open, osc_close,
 * osc_punch, osc_brw and osc_cancel are defined above but have no visible
 * entry -- confirm they are wired in. */
687 struct obd_ops osc_obd_ops = {
689 o_cleanup: osc_cleanup,
690 o_create: osc_create,
691 o_destroy: osc_destroy,
692 o_getattr: osc_getattr,
693 o_setattr: osc_setattr,
696 o_connect: osc_connect,
697 o_disconnect: osc_disconnect,
700 o_enqueue: osc_enqueue,
/* Module load hook: register the OSC method table under LUSTRE_OSC_NAME.
 *
 * NOTE(review): the return statement is not visible in this listing;
 * presumably the result of obd_register_type() is what gets returned --
 * confirm. */
704 static int __init osc_init(void)
706 obd_register_type(&osc_obd_ops, LUSTRE_OSC_NAME);
/* Module unload hook: unregister the type registered by osc_init. */
710 static void __exit osc_exit(void)
712 obd_unregister_type(LUSTRE_OSC_NAME);
/* Module metadata, followed by registration of the load and unload hooks. */
715 MODULE_AUTHOR("Peter J. Braam <braam@clusterfs.com>");
716 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC) v1.0");
717 MODULE_LICENSE("GPL");
719 module_init(osc_init);
720 module_exit(osc_exit);