1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copryright (C) 2001, 2002 Cluster File Systems, Inc.
6 * This code is issued under the GNU General Public License.
7 * See the file COPYING in this distribution
9 * Author Peter Braam <braam@clusterfs.com>
11 * This server is single threaded at present (but can easily be multi
12 * threaded). For testing and management it is treated as an
13 * obd_device, although it does not export a full OBD method table
14 * (the requests are coming in over the wire, so object target
15 * modules do not have a full method table.)
20 #define DEBUG_SUBSYSTEM S_OSC
22 #include <linux/module.h>
23 #include <linux/lustre_dlm.h>
24 #include <linux/lustre_mds.h> /* for mds_objid */
25 #include <linux/obd_ost.h>
26 #include <linux/obd_lov.h>
28 static void osc_obd2cl(struct obd_device *obd, struct ptlrpc_client **cl,
29 struct ptlrpc_connection **connection)
31 struct osc_obd *osc = &obd->u.osc;
32 *cl = osc->osc_client;
33 *connection = osc->osc_conn;
36 static void osc_con2cl(struct obd_conn *conn, struct ptlrpc_client **cl,
37 struct ptlrpc_connection **connection)
39 struct osc_obd *osc = &gen_conn2obd(conn)->u.osc;
40 *cl = osc->osc_client;
41 *connection = osc->osc_conn;
44 static void osc_con2dlmcl(struct obd_conn *conn, struct ptlrpc_client **cl,
45 struct ptlrpc_connection **connection)
47 struct osc_obd *osc = &gen_conn2obd(conn)->u.osc;
48 *cl = osc->osc_ldlm_client;
49 *connection = osc->osc_conn;
52 static int osc_connect(struct obd_conn *conn, struct obd_device *obd)
54 struct osc_obd *osc = &obd->u.osc;
55 struct obd_import *import;
56 struct ptlrpc_request *request;
57 struct ptlrpc_client *cl;
58 struct ptlrpc_connection *connection;
59 struct ost_body *body;
60 char *tmp = osc->osc_target_uuid;
61 int rc, size = sizeof(osc->osc_target_uuid);
64 OBD_ALLOC(import, sizeof(*import));
69 rc = gen_connect(conn, obd);
73 osc_obd2cl(obd, &cl, &connection);
74 request = ptlrpc_prep_req(cl, connection, OST_CONNECT, 1, &size, &tmp);
79 request->rq_replen = lustre_msg_size(1, &size);
81 rc = ptlrpc_queue_wait(request);
82 rc = ptlrpc_check_status(request, rc);
84 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
88 body = lustre_msg_buf(request->rq_repmsg, 0);
89 CDEBUG(D_INODE, "received connid %d\n", body->connid);
92 /* XXX: Make this a handle */
93 osc->osc_connh.addr = request->rq_repmsg->addr;
94 osc->osc_connh.cookie = request->rq_repmsg->cookie;
95 /* This might be redundant. */
96 cl->cli_target_devno = request->rq_repmsg->target_id;
97 osc->osc_ldlm_client->cli_target_devno = cl->cli_target_devno;
98 conn->oc_id = body->connid;
101 ptlrpc_free_req(request);
107 static int osc_disconnect(struct obd_conn *conn)
109 struct ptlrpc_request *request;
110 struct ptlrpc_client *cl;
111 struct ptlrpc_connection *connection;
112 struct ost_body *body;
113 struct osc_obd *osc = &gen_conn2obd(conn)->u.osc;
114 int rc, size = sizeof(*body);
117 osc_con2cl(conn, &cl, &connection);
118 request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh,
119 OST_DISCONNECT, 1, &size, NULL);
123 body = lustre_msg_buf(request->rq_reqmsg, 0);
124 body->connid = conn->oc_id;
126 request->rq_replen = lustre_msg_size(0, NULL);
128 rc = ptlrpc_queue_wait(request);
131 rc = gen_disconnect(conn);
136 ptlrpc_free_req(request);
140 static int osc_getattr(struct obd_conn *conn, struct obdo *oa)
142 struct ptlrpc_request *request;
143 struct ptlrpc_client *cl;
144 struct osc_obd *osc = &gen_conn2obd(conn)->u.osc;
145 struct ptlrpc_connection *connection;
146 struct ost_body *body;
147 int rc, size = sizeof(*body);
150 osc_con2cl(conn, &cl, &connection);
151 request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh,
152 OST_GETATTR, 1, &size, NULL);
156 body = lustre_msg_buf(request->rq_reqmsg, 0);
157 memcpy(&body->oa, oa, sizeof(*oa));
158 body->connid = conn->oc_id;
159 body->oa.o_valid = ~0;
161 request->rq_replen = lustre_msg_size(1, &size);
163 rc = ptlrpc_queue_wait(request);
164 rc = ptlrpc_check_status(request, rc);
166 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
170 body = lustre_msg_buf(request->rq_repmsg, 0);
171 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
173 memcpy(oa, &body->oa, sizeof(*oa));
177 ptlrpc_free_req(request);
181 static int osc_open(struct obd_conn *conn, struct obdo *oa)
183 struct ptlrpc_request *request;
184 struct ptlrpc_client *cl;
185 struct osc_obd *osc = &gen_conn2obd(conn)->u.osc;
186 struct ptlrpc_connection *connection;
187 struct ost_body *body;
188 int rc, size = sizeof(*body);
191 osc_con2cl(conn, &cl, &connection);
192 request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh,
193 OST_OPEN, 1, &size, NULL);
197 body = lustre_msg_buf(request->rq_reqmsg, 0);
198 memcpy(&body->oa, oa, sizeof(*oa));
199 body->connid = conn->oc_id;
200 body->oa.o_valid = (OBD_MD_FLMODE | OBD_MD_FLID);
202 request->rq_replen = lustre_msg_size(1, &size);
204 rc = ptlrpc_queue_wait(request);
205 rc = ptlrpc_check_status(request, rc);
209 body = lustre_msg_buf(request->rq_repmsg, 0);
210 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
212 memcpy(oa, &body->oa, sizeof(*oa));
216 ptlrpc_free_req(request);
220 static int osc_close(struct obd_conn *conn, struct obdo *oa)
222 struct ptlrpc_request *request;
223 struct ptlrpc_client *cl;
224 struct osc_obd *osc = &gen_conn2obd(conn)->u.osc;
225 struct ptlrpc_connection *connection;
226 struct ost_body *body;
227 int rc, size = sizeof(*body);
230 osc_con2cl(conn, &cl, &connection);
231 request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh,
232 OST_CLOSE, 1, &size, NULL);
236 body = lustre_msg_buf(request->rq_reqmsg, 0);
237 memcpy(&body->oa, oa, sizeof(*oa));
238 body->connid = conn->oc_id;
240 request->rq_replen = lustre_msg_size(1, &size);
242 rc = ptlrpc_queue_wait(request);
243 rc = ptlrpc_check_status(request, rc);
247 body = lustre_msg_buf(request->rq_repmsg, 0);
248 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
250 memcpy(oa, &body->oa, sizeof(*oa));
254 ptlrpc_free_req(request);
258 static int osc_setattr(struct obd_conn *conn, struct obdo *oa)
260 struct ptlrpc_request *request;
261 struct ptlrpc_client *cl;
262 struct osc_obd *osc = &gen_conn2obd(conn)->u.osc;
263 struct ptlrpc_connection *connection;
264 struct ost_body *body;
265 int rc, size = sizeof(*body);
268 osc_con2cl(conn, &cl, &connection);
269 request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh,
270 OST_SETATTR, 1, &size, NULL);
274 body = lustre_msg_buf(request->rq_reqmsg, 0);
275 memcpy(&body->oa, oa, sizeof(*oa));
276 body->connid = conn->oc_id;
278 request->rq_replen = lustre_msg_size(1, &size);
280 rc = ptlrpc_queue_wait(request);
281 rc = ptlrpc_check_status(request, rc);
285 ptlrpc_free_req(request);
289 static int osc_create(struct obd_conn *conn, struct obdo *oa)
291 struct ptlrpc_request *request;
292 struct ptlrpc_client *cl;
293 struct ptlrpc_connection *connection;
294 struct osc_obd *osc = &gen_conn2obd(conn)->u.osc;
295 struct ost_body *body;
296 struct mds_objid *objid;
297 struct lov_object_id *lov_id;
298 int rc, size = sizeof(*body);
305 osc_con2cl(conn, &cl, &connection);
306 request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh,
307 OST_CREATE, 1, &size, NULL);
311 body = lustre_msg_buf(request->rq_reqmsg, 0);
312 memcpy(&body->oa, oa, sizeof(*oa));
313 body->connid = conn->oc_id;
315 request->rq_replen = lustre_msg_size(1, &size);
317 rc = ptlrpc_queue_wait(request);
318 rc = ptlrpc_check_status(request, rc);
322 body = lustre_msg_buf(request->rq_repmsg, 0);
323 memcpy(oa, &body->oa, sizeof(*oa));
325 memset(oa->o_inline, 0, sizeof(oa->o_inline));
326 objid = (struct mds_objid *)oa->o_inline;
327 objid->mo_lov_md.lmd_object_id = oa->o_id;
328 objid->mo_lov_md.lmd_stripe_count = 1;
329 lov_id = (struct lov_object_id *)(oa->o_inline + sizeof(*objid));
330 lov_id->l_device_id = 0;
331 lov_id->l_object_id = oa->o_id;
335 ptlrpc_free_req(request);
339 static int osc_punch(struct obd_conn *conn, struct obdo *oa, obd_size count,
342 struct ptlrpc_request *request;
343 struct ptlrpc_client *cl;
344 struct ptlrpc_connection *connection;
345 struct osc_obd *osc = &gen_conn2obd(conn)->u.osc;
346 struct ost_body *body;
347 int rc, size = sizeof(*body);
354 osc_con2cl(conn, &cl, &connection);
355 request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh,
356 OST_PUNCH, 1, &size, NULL);
360 body = lustre_msg_buf(request->rq_reqmsg, 0);
361 memcpy(&body->oa, oa, sizeof(*oa));
362 body->connid = conn->oc_id;
363 body->oa.o_blocks = count;
364 body->oa.o_valid |= OBD_MD_FLBLOCKS;
366 request->rq_replen = lustre_msg_size(1, &size);
368 rc = ptlrpc_queue_wait(request);
369 rc = ptlrpc_check_status(request, rc);
373 body = lustre_msg_buf(request->rq_repmsg, 0);
374 memcpy(oa, &body->oa, sizeof(*oa));
378 ptlrpc_free_req(request);
382 static int osc_destroy(struct obd_conn *conn, struct obdo *oa)
384 struct ptlrpc_request *request;
385 struct ptlrpc_client *cl;
386 struct ptlrpc_connection *connection;
387 struct osc_obd *osc = &gen_conn2obd(conn)->u.osc;
388 struct ost_body *body;
389 int rc, size = sizeof(*body);
396 osc_con2cl(conn, &cl, &connection);
397 request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh,
398 OST_DESTROY, 1, &size, NULL);
402 body = lustre_msg_buf(request->rq_reqmsg, 0);
403 memcpy(&body->oa, oa, sizeof(*oa));
404 body->connid = conn->oc_id;
405 body->oa.o_valid = ~0;
407 request->rq_replen = lustre_msg_size(1, &size);
409 rc = ptlrpc_queue_wait(request);
410 rc = ptlrpc_check_status(request, rc);
414 body = lustre_msg_buf(request->rq_repmsg, 0);
415 memcpy(oa, &body->oa, sizeof(*oa));
419 ptlrpc_free_req(request);
423 struct osc_brw_cb_data {
425 struct ptlrpc_request *req;
426 bulk_callback_t callback;
430 static void brw_read_finish(struct ptlrpc_bulk_desc *desc, void *data)
432 struct osc_brw_cb_data *cb_data = data;
434 if (desc->b_flags & PTL_RPC_FL_INTR)
435 CERROR("got signal\n");
437 (cb_data->callback)(desc, cb_data->cb_data);
439 ptlrpc_free_bulk(desc);
440 ptlrpc_free_req(cb_data->req);
442 OBD_FREE(cb_data, sizeof(*cb_data));
445 static int osc_brw_read(struct obd_conn *conn, obd_count num_oa,
446 struct obdo **oa, obd_count *oa_bufs, struct page **buf,
447 obd_size *count, obd_off *offset, obd_flag *flags,
448 bulk_callback_t callback)
450 struct ptlrpc_client *cl;
451 struct ptlrpc_connection *connection;
452 struct osc_obd *osc = &gen_conn2obd(conn)->u.osc;
453 struct ptlrpc_request *request;
454 struct ost_body *body;
455 struct list_head *tmp;
456 int pages, rc, i, j, size[3] = {sizeof(*body)};
458 struct ptlrpc_bulk_desc *desc;
461 size[1] = num_oa * sizeof(struct obd_ioobj);
463 for (i = 0; i < num_oa; i++)
465 size[2] = pages * sizeof(struct niobuf_remote);
467 osc_con2cl(conn, &cl, &connection);
468 request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh,
469 OST_BRW, 3, size, NULL);
471 GOTO(out, rc = -ENOMEM);
473 body = lustre_msg_buf(request->rq_reqmsg, 0);
474 body->data = OBD_BRW_READ;
476 desc = ptlrpc_prep_bulk(connection);
478 GOTO(out2, rc = -ENOMEM);
479 desc->b_portal = OST_BULK_PORTAL;
481 ptr1 = lustre_msg_buf(request->rq_reqmsg, 1);
482 ptr2 = lustre_msg_buf(request->rq_reqmsg, 2);
483 for (pages = 0, i = 0; i < num_oa; i++) {
484 ost_pack_ioo(&ptr1, oa[i], oa_bufs[i]);
485 /* FIXME: this inner loop is wrong for multiple OAs */
486 for (j = 0; j < oa_bufs[i]; j++, pages++) {
487 struct ptlrpc_bulk_page *bulk;
488 bulk = ptlrpc_prep_bulk_page(desc);
490 GOTO(out3, rc = -ENOMEM);
492 spin_lock(&connection->c_lock);
493 bulk->b_xid = ++connection->c_xid_out;
494 spin_unlock(&connection->c_lock);
496 bulk->b_buf = kmap(buf[pages]);
497 bulk->b_page = buf[pages];
498 bulk->b_buflen = PAGE_SIZE;
499 ost_pack_niobuf(&ptr2, offset[pages], count[pages],
500 flags[pages], bulk->b_xid);
504 rc = ptlrpc_register_bulk(desc);
508 request->rq_replen = lustre_msg_size(1, size);
509 rc = ptlrpc_queue_wait(request);
510 rc = ptlrpc_check_status(request, rc);
512 ptlrpc_abort_bulk(desc);
516 list_for_each(tmp, &desc->b_page_list) {
517 struct ptlrpc_bulk_page *bulk;
518 bulk = list_entry(tmp, struct ptlrpc_bulk_page, b_link);
519 if (bulk->b_page != NULL)
520 kunmap(bulk->b_page);
522 ptlrpc_free_bulk(desc);
524 ptlrpc_free_req(request);
529 static void brw_write_finish(struct ptlrpc_bulk_desc *desc, void *data)
531 struct osc_brw_cb_data *cb_data = data;
535 if (desc->b_flags & PTL_RPC_FL_INTR)
536 CERROR("got signal\n");
538 for (i = 0; i < desc->b_page_count; i++)
539 kunmap(cb_data->buf[i]);
541 (cb_data->callback)(desc, cb_data->cb_data);
543 ptlrpc_free_bulk(desc);
544 ptlrpc_free_req(cb_data->req);
546 OBD_FREE(cb_data, sizeof(*cb_data));
550 static int osc_brw_write(struct obd_conn *conn, obd_count num_oa,
551 struct obdo **oa, obd_count *oa_bufs,
552 struct page **pagearray, obd_size *count,
553 obd_off *offset, obd_flag *flags,
554 bulk_callback_t callback)
556 struct ptlrpc_client *cl;
557 struct ptlrpc_connection *connection;
558 struct ptlrpc_request *request;
559 struct ptlrpc_bulk_desc *desc;
560 struct osc_obd *osc = &gen_conn2obd(conn)->u.osc;
561 struct obd_ioobj ioo;
562 struct ost_body *body;
563 struct niobuf_local *local;
564 struct niobuf_remote *remote;
565 struct osc_brw_cb_data *cb_data;
567 int rc, i, j, size[3] = {sizeof(*body)};
571 size[1] = num_oa * sizeof(ioo);
573 for (i = 0; i < num_oa; i++)
575 size[2] = pages * sizeof(*remote);
577 OBD_ALLOC(local, pages * sizeof(*local));
581 osc_con2cl(conn, &cl, &connection);
582 request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh,
583 OST_BRW, 3, size, NULL);
585 GOTO(out, rc = -ENOMEM);
586 body = lustre_msg_buf(request->rq_reqmsg, 0);
587 body->data = OBD_BRW_WRITE;
589 ptr1 = lustre_msg_buf(request->rq_reqmsg, 1);
590 ptr2 = lustre_msg_buf(request->rq_reqmsg, 2);
591 for (pages = 0, i = 0; i < num_oa; i++) {
592 ost_pack_ioo(&ptr1, oa[i], oa_bufs[i]);
593 for (j = 0; j < oa_bufs[i]; j++, pages++) {
594 local[pages].addr = kmap(pagearray[pages]);
595 local[pages].offset = offset[pages];
596 local[pages].len = count[pages];
597 ost_pack_niobuf(&ptr2, offset[pages], count[pages],
602 size[1] = pages * sizeof(struct niobuf_remote);
603 request->rq_replen = lustre_msg_size(2, size);
605 rc = ptlrpc_queue_wait(request);
606 rc = ptlrpc_check_status(request, rc);
610 ptr2 = lustre_msg_buf(request->rq_repmsg, 1);
612 GOTO(out2, rc = -EINVAL);
614 if (request->rq_repmsg->buflens[1] !=
615 pages * sizeof(struct niobuf_remote)) {
616 CERROR("buffer length wrong (%d vs. %ld)\n",
617 request->rq_repmsg->buflens[1],
618 pages * sizeof(struct niobuf_remote));
619 GOTO(out2, rc = -EINVAL);
622 desc = ptlrpc_prep_bulk(connection);
623 desc->b_portal = OSC_BULK_PORTAL;
625 desc->b_cb = brw_write_finish;
626 OBD_ALLOC(cb_data, sizeof(*cb_data));
627 cb_data->buf = pagearray;
628 cb_data->callback = callback;
629 desc->b_cb_data = cb_data;
632 for (pages = 0, i = 0; i < num_oa; i++) {
633 for (j = 0; j < oa_bufs[i]; j++, pages++) {
634 struct ptlrpc_bulk_page *page;
636 ost_unpack_niobuf(&ptr2, &remote);
638 page = ptlrpc_prep_bulk_page(desc);
640 GOTO(out3, rc = -ENOMEM);
642 page->b_buf = (void *)(unsigned long)local[pages].addr;
643 page->b_buflen = local[pages].len;
644 page->b_xid = remote->xid;
648 if (desc->b_page_count != pages)
651 rc = ptlrpc_send_bulk(desc);
657 /* If there's no callback function, sleep here until complete. */
658 wait_event_interruptible(desc->b_waitq, ptlrpc_check_bulk_sent(desc));
659 if (desc->b_flags & PTL_RPC_FL_INTR)
665 ptlrpc_free_bulk(desc);
667 ptlrpc_free_req(request);
668 for (pages = 0, i = 0; i < num_oa; i++)
669 for (j = 0; j < oa_bufs[i]; j++, pages++)
670 kunmap(pagearray[pages]);
672 OBD_FREE(local, pages * sizeof(*local));
677 static int osc_brw(int cmd, struct obd_conn *conn, obd_count num_oa,
678 struct obdo **oa, obd_count *oa_bufs, struct page **buf,
679 obd_size *count, obd_off *offset, obd_flag *flags,
685 if (cmd & OBD_BRW_WRITE)
686 return osc_brw_write(conn, num_oa, oa, oa_bufs, buf, count,
687 offset, flags, (bulk_callback_t)callback);
689 return osc_brw_read(conn, num_oa, oa, oa_bufs, buf, count,
690 offset, flags, (bulk_callback_t)callback);
693 static int osc_enqueue(struct obd_conn *oconn,
694 struct lustre_handle *parent_lock, __u64 *res_id,
695 __u32 type, void *extentp, int extent_len, __u32 mode,
696 int *flags, void *callback, void *data, int datalen,
697 struct lustre_handle *lockh)
699 struct obd_device *obddev = gen_conn2obd(oconn);
700 struct ptlrpc_connection *conn;
701 struct osc_obd *osc = &gen_conn2obd(oconn)->u.osc;
702 struct ptlrpc_client *cl;
703 struct ldlm_extent *extent = extentp;
707 /* Filesystem locks are given a bit of special treatment: first we
708 * fixup the lock to start and end on page boundaries. */
709 extent->start &= PAGE_MASK;
710 extent->end = (extent->end + PAGE_SIZE - 1) & PAGE_MASK;
712 /* Next, search for already existing extent locks that will cover us */
713 osc_con2dlmcl(oconn, &cl, &conn);
714 rc = ldlm_lock_match(obddev->obd_namespace, res_id, type, extent,
715 sizeof(extent), mode, lockh);
717 /* We already have a lock, and it's referenced */
721 /* Next, search for locks that we can upgrade (if we're trying to write)
722 * or are more than we need (if we're trying to read). Because the VFS
723 * and page cache already protect us locally, lots of readers/writers
724 * can share a single PW lock. */
730 rc = ldlm_lock_match(obddev->obd_namespace, res_id, type, extent,
731 sizeof(extent), mode2, lockh);
734 /* FIXME: This is not incredibly elegant, but it might
735 * be more elegant than adding another parameter to
736 * lock_match. I want a second opinion. */
737 ldlm_lock_addref(lockh, mode);
738 ldlm_lock_decref(lockh, mode2);
743 rc = ldlm_cli_convert(cl, lockh, mode, &flags);
750 rc = ldlm_cli_enqueue(cl, conn, NULL, obddev->obd_namespace,
751 parent_lock, res_id, type, extent, sizeof(extent),
752 mode, flags, callback, data, datalen, lockh);
756 static int osc_cancel(struct obd_conn *oconn, __u32 mode,
757 struct lustre_handle *lockh)
761 ldlm_lock_decref(lockh, mode);
766 static int osc_setup(struct obd_device *obddev, obd_count len, void *buf)
768 struct obd_ioctl_data* data = buf;
769 struct osc_obd *osc = &obddev->u.osc;
770 char server_uuid[37];
774 if (data->ioc_inllen1 < 1) {
775 CERROR("osc setup requires a TARGET UUID\n");
779 if (data->ioc_inllen1 > 37) {
780 CERROR("osc TARGET UUID must be less than 38 characters\n");
784 if (data->ioc_inllen2 < 1) {
785 CERROR("osc setup requires a SERVER UUID\n");
789 if (data->ioc_inllen2 > 37) {
790 CERROR("osc SERVER UUID must be less than 38 characters\n");
794 memcpy(osc->osc_target_uuid, data->ioc_inlbuf1, data->ioc_inllen1);
795 memcpy(server_uuid, data->ioc_inlbuf2, MIN(data->ioc_inllen2,
796 sizeof(server_uuid)));
798 osc->osc_conn = ptlrpc_uuid_to_connection(server_uuid);
802 obddev->obd_namespace =
803 ldlm_namespace_new("osc", LDLM_NAMESPACE_CLIENT);
804 if (obddev->obd_namespace == NULL)
805 GOTO(out_conn, rc = -ENOMEM);
807 OBD_ALLOC(osc->osc_client, sizeof(*osc->osc_client));
808 if (osc->osc_client == NULL)
809 GOTO(out_ns, rc = -ENOMEM);
811 OBD_ALLOC(osc->osc_ldlm_client, sizeof(*osc->osc_ldlm_client));
812 if (osc->osc_ldlm_client == NULL)
813 GOTO(out_client, rc = -ENOMEM);
815 ptlrpc_init_client(NULL, NULL, OST_REQUEST_PORTAL, OSC_REPLY_PORTAL,
817 ptlrpc_init_client(NULL, NULL, LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL,
818 osc->osc_ldlm_client);
819 osc->osc_client->cli_name = "osc";
820 osc->osc_ldlm_client->cli_name = "ldlm";
826 OBD_FREE(osc->osc_client, sizeof(*osc->osc_client));
828 ldlm_namespace_free(obddev->obd_namespace);
830 ptlrpc_put_connection(osc->osc_conn);
834 static int osc_cleanup(struct obd_device * obddev)
836 struct osc_obd *osc = &obddev->u.osc;
838 ldlm_namespace_free(obddev->obd_namespace);
840 ptlrpc_cleanup_client(osc->osc_client);
841 OBD_FREE(osc->osc_client, sizeof(*osc->osc_client));
842 ptlrpc_cleanup_client(osc->osc_ldlm_client);
843 OBD_FREE(osc->osc_ldlm_client, sizeof(*osc->osc_ldlm_client));
844 ptlrpc_put_connection(osc->osc_conn);
851 static int osc_statfs(struct obd_conn *conn, struct statfs *sfs);
853 struct ptlrpc_request *request;
854 struct ptlrpc_client *cl;
855 struct ptlrpc_connection *connection;
856 struct obd_statfs *osfs;
857 int rc, size = sizeof(*osfs);
860 osc_con2cl(conn, &cl, &connection);
861 request = ptlrpc_prep_req(cl, connection, OST_STATFS, 0, NULL, NULL);
865 request->rq_replen = lustre_msg_size(1, &size);
867 rc = ptlrpc_queue_wait(request);
868 rc = ptlrpc_check_status(request, rc);
872 osfs = lustre_msg_buf(request->rq_repmsg, 0);
873 obd_statfs_unpack(sfs, osfs);
877 ptlrpc_free_req(request);
882 struct obd_ops osc_obd_ops = {
884 o_cleanup: osc_cleanup,
885 o_create: osc_create,
886 o_destroy: osc_destroy,
887 o_getattr: osc_getattr,
888 o_setattr: osc_setattr,
891 o_connect: osc_connect,
892 o_disconnect: osc_disconnect,
895 o_enqueue: osc_enqueue,
899 static int __init osc_init(void)
901 return obd_register_type(&osc_obd_ops, LUSTRE_OSC_NAME);
904 static void __exit osc_exit(void)
906 obd_unregister_type(LUSTRE_OSC_NAME);
909 MODULE_AUTHOR("Peter J. Braam <braam@clusterfs.com>");
910 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC) v1.0");
911 MODULE_LICENSE("GPL");
913 module_init(osc_init);
914 module_exit(osc_exit);