1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
6 * This code is issued under the GNU General Public License.
7 * See the file COPYING in this distribution
9 * Author Peter Braam <braam@clusterfs.com>
11 * This server is single threaded at present (but can easily be multi
12 * threaded). For testing and management it is treated as an
13 * obd_device, although it does not export a full OBD method table
14 * (the requests are coming in over the wire, so object target
15 * modules do not have a full method table.)
20 #define DEBUG_SUBSYSTEM S_OSC
22 #include <linux/module.h>
23 #include <linux/lustre_dlm.h>
24 #include <linux/lustre_mds.h> /* for mds_objid */
25 #include <linux/obd_ost.h>
26 #include <linux/obd_lov.h>
/* Resolve a connection handle to the OSC request client and connection.
 * Looks up the obd_device behind conn with class_conn2obd() and stores
 * its osc_client in *cl and its osc_conn in *connection. */
28 static void osc_con2cl(struct lustre_handle *conn, struct ptlrpc_client **cl,
29 struct ptlrpc_connection **connection)
31 struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
32 *cl = osc->osc_client;
33 *connection = osc->osc_conn;
/* Same lookup as osc_con2cl(), but hands back the lock-manager client
 * (osc_ldlm_client) in *cl. The connection stored in *connection is the
 * same osc_conn. */
36 static void osc_con2dlmcl(struct lustre_handle *conn, struct ptlrpc_client **cl,
37 struct ptlrpc_connection **connection)
39 struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
40 *cl = osc->osc_ldlm_client;
41 *connection = osc->osc_conn;
/* Connect this OSC to its target. Calls class_connect() for the local
 * handle, then sends an OST_CONNECT request carrying the target UUID and
 * copies the addr/cookie pair from the reply into osc->osc_connh.
 * NOTE(review): this listing skips some original lines (see the gaps in
 * the embedded numbering), so the checks guarding the error paths are not
 * visible here. */
44 static int osc_connect(struct lustre_handle *conn, struct obd_device *obd)
46 struct osc_obd *osc = &obd->u.osc;
47 struct obd_import *import;
48 struct ptlrpc_request *request;
49 char *tmp = osc->osc_target_uuid;
50 int rc, size = sizeof(osc->osc_target_uuid);
/* NOTE(review): import is allocated here, but none of the visible lines
 * stores or frees it -- confirm against the full file that it is not
 * leaked. */
53 OBD_ALLOC(import, sizeof(*import));
58 rc = class_connect(conn, obd);
/* One request buffer: the target UUID (size bytes starting at tmp). */
62 request = ptlrpc_prep_req(osc->osc_client, osc->osc_conn,
63 OST_CONNECT, 1, &size, &tmp);
65 GOTO(out_disco, rc = -ENOMEM);
67 request->rq_level = LUSTRE_CONN_NEW;
/* Reply length is computed for zero buffers. */
68 request->rq_replen = lustre_msg_size(0, NULL);
70 rc = ptlrpc_queue_wait(request);
71 rc = ptlrpc_check_status(request, rc);
73 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
/* Remember the handle returned by the target for later requests. */
77 /* XXX: Make this a handle */
78 osc->osc_connh.addr = request->rq_repmsg->addr;
79 osc->osc_connh.cookie = request->rq_repmsg->cookie;
83 ptlrpc_free_req(request);
/* Undoes class_connect(); presumably the out_disco target (label line is
 * not visible in this listing). */
86 class_disconnect(conn);
/* Disconnect from the target. Sends an OST_DISCONNECT request with no
 * request buffers on the handle stored in osc->osc_connh, then drops the
 * local connection with class_disconnect() and frees the request.
 * NOTE(review): the declaration of rc and the checks between the calls
 * are not visible in this listing. */
92 static int osc_disconnect(struct lustre_handle *conn)
94 struct ptlrpc_request *request;
95 struct ptlrpc_client *cl;
96 struct ptlrpc_connection *connection;
97 struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
101 osc_con2cl(conn, &cl, &connection);
102 request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh,
103 OST_DISCONNECT, 0, NULL, NULL);
/* Reply length is computed for zero buffers. */
106 request->rq_replen = lustre_msg_size(0, NULL);
108 rc = ptlrpc_queue_wait(request);
111 rc = class_disconnect(conn);
116 ptlrpc_free_req(request);
/* Fetch object attributes from the target. Sends an OST_GETATTR request
 * whose single buffer is an ost_body holding a copy of *oa, and on the
 * visible success path copies the obdo from the reply back into *oa.
 * NOTE(review): the error checks between the calls are not visible in
 * this listing. */
120 static int osc_getattr(struct lustre_handle *conn, struct obdo *oa)
122 struct ptlrpc_request *request;
123 struct ptlrpc_client *cl;
124 struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
125 struct ptlrpc_connection *connection;
126 struct ost_body *body;
127 int rc, size = sizeof(*body);
130 osc_con2cl(conn, &cl, &connection);
131 request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh,
132 OST_GETATTR, 1, &size, NULL);
136 body = lustre_msg_buf(request->rq_reqmsg, 0);
137 memcpy(&body->oa, oa, sizeof(*oa));
/* Set every bit of the valid mask in the request copy. */
138 body->oa.o_valid = ~0;
/* The reply is expected to hold one buffer of sizeof(*body) bytes. */
140 request->rq_replen = lustre_msg_size(1, &size);
142 rc = ptlrpc_queue_wait(request);
143 rc = ptlrpc_check_status(request, rc);
145 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
149 body = lustre_msg_buf(request->rq_repmsg, 0);
150 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
152 memcpy(oa, &body->oa, sizeof(*oa));
156 ptlrpc_free_req(request);
/* Open an object on the target. Sends an OST_OPEN request carrying a copy
 * of *oa with the valid mask restricted to mode and id, then copies the
 * obdo from the reply back into *oa.
 * md is not used by any line visible in this listing.
 * NOTE(review): the error checks between the calls are not visible. */
160 static int osc_open(struct lustre_handle *conn, struct obdo *oa,
161 struct lov_stripe_md *md)
163 struct ptlrpc_request *request;
164 struct ptlrpc_client *cl;
165 struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
166 struct ptlrpc_connection *connection;
167 struct ost_body *body;
168 int rc, size = sizeof(*body);
171 osc_con2cl(conn, &cl, &connection);
172 request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh,
173 OST_OPEN, 1, &size, NULL);
177 body = lustre_msg_buf(request->rq_reqmsg, 0);
178 memcpy(&body->oa, oa, sizeof(*oa));
/* Only the request copy is changed here; the caller's oa is untouched
 * until the reply is copied back below. */
179 body->oa.o_valid = (OBD_MD_FLMODE | OBD_MD_FLID);
181 request->rq_replen = lustre_msg_size(1, &size);
183 rc = ptlrpc_queue_wait(request);
184 rc = ptlrpc_check_status(request, rc);
188 body = lustre_msg_buf(request->rq_repmsg, 0);
189 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
191 memcpy(oa, &body->oa, sizeof(*oa));
195 ptlrpc_free_req(request);
/* Close an object on the target with an OST_CLOSE request.
 * Unlike osc_open(), this writes to the caller's *oa before sending:
 * o_id is taken from md->lmd_object_id, o_mode is forced to S_IFREG and
 * o_valid is set to mode and id. The obdo from the reply is then copied
 * back into *oa.
 * NOTE(review): the error checks between the calls are not visible in
 * this listing. */
199 static int osc_close(struct lustre_handle *conn, struct obdo *oa,
200 struct lov_stripe_md *md)
202 struct ptlrpc_request *request;
203 struct ptlrpc_client *cl;
204 struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
205 struct ptlrpc_connection *connection;
206 struct ost_body *body;
207 int rc, size = sizeof(*body);
210 osc_con2cl(conn, &cl, &connection);
211 request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh,
212 OST_CLOSE, 1, &size, NULL);
/* Fill in the caller's obdo first, then copy it into the request. */
216 oa->o_id = md->lmd_object_id;
217 oa->o_mode = S_IFREG;
218 oa->o_valid = (OBD_MD_FLMODE | OBD_MD_FLID);
219 body = lustre_msg_buf(request->rq_reqmsg, 0);
220 memcpy(&body->oa, oa, sizeof(*oa));
222 request->rq_replen = lustre_msg_size(1, &size);
224 rc = ptlrpc_queue_wait(request);
225 rc = ptlrpc_check_status(request, rc);
229 body = lustre_msg_buf(request->rq_repmsg, 0);
230 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
232 memcpy(oa, &body->oa, sizeof(*oa));
236 ptlrpc_free_req(request);
/* Push object attributes to the target. Sends an OST_SETATTR request
 * whose single buffer is an ost_body holding a copy of *oa. None of the
 * visible lines copies the reply back into *oa.
 * NOTE(review): the error checks between the calls are not visible in
 * this listing. */
240 static int osc_setattr(struct lustre_handle *conn, struct obdo *oa)
242 struct ptlrpc_request *request;
243 struct ptlrpc_client *cl;
244 struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
245 struct ptlrpc_connection *connection;
246 struct ost_body *body;
247 int rc, size = sizeof(*body);
250 osc_con2cl(conn, &cl, &connection);
251 request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh,
252 OST_SETATTR, 1, &size, NULL);
256 body = lustre_msg_buf(request->rq_reqmsg, 0);
257 memcpy(&body->oa, oa, sizeof(*oa));
259 request->rq_replen = lustre_msg_size(1, &size);
261 rc = ptlrpc_queue_wait(request);
262 rc = ptlrpc_check_status(request, rc);
266 ptlrpc_free_req(request);
/* Create an object on the target. Allocates a stripe descriptor of
 * oa->o_easize bytes into *ea, sends an OST_CREATE request carrying a
 * copy of *oa, copies the obdo from the reply back into *oa, and records
 * the new object id and a stripe count of 1 in *ea.
 * NOTE(review): about a dozen original lines before the allocation, and
 * the error checks after it, are not visible in this listing. */
270 static int osc_create(struct lustre_handle *conn, struct obdo *oa,
271 struct lov_stripe_md **ea)
273 struct ptlrpc_request *request;
274 struct ptlrpc_client *cl;
275 struct ptlrpc_connection *connection;
276 struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
277 struct ost_body *body;
278 int rc, size = sizeof(*body);
/* NOTE(review): none of the visible lines frees *ea if a later step
 * fails -- confirm against the full file. */
291 OBD_ALLOC(*ea, oa->o_easize);
294 (*ea)->lmd_size = oa->o_easize;
297 osc_con2cl(conn, &cl, &connection);
298 request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh,
299 OST_CREATE, 1, &size, NULL);
303 body = lustre_msg_buf(request->rq_reqmsg, 0);
304 memcpy(&body->oa, oa, sizeof(*oa));
306 request->rq_replen = lustre_msg_size(1, &size);
308 rc = ptlrpc_queue_wait(request);
309 rc = ptlrpc_check_status(request, rc);
313 body = lustre_msg_buf(request->rq_repmsg, 0);
314 memcpy(oa, &body->oa, sizeof(*oa));
/* The id comes from the obdo that was just copied out of the reply. */
316 (*ea)->lmd_object_id = oa->o_id;
317 (*ea)->lmd_stripe_count = 1;
320 ptlrpc_free_req(request);
/* Send an OST_PUNCH request for an object. The request body is a copy of
 * *oa with o_blocks set to count and OBD_MD_FLBLOCKS added to the valid
 * mask; the obdo from the reply is copied back into *oa.
 * NOTE(review): the end of the parameter list and the error checks are
 * not visible in this listing, so any parameters after count are unknown
 * here. */
324 static int osc_punch(struct lustre_handle *conn, struct obdo *oa,
325 struct lov_stripe_md *md, obd_size count,
328 struct ptlrpc_request *request;
329 struct ptlrpc_client *cl;
330 struct ptlrpc_connection *connection;
331 struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
332 struct ost_body *body;
333 int rc, size = sizeof(*body);
340 osc_con2cl(conn, &cl, &connection);
341 request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh,
342 OST_PUNCH, 1, &size, NULL);
346 body = lustre_msg_buf(request->rq_reqmsg, 0);
347 memcpy(&body->oa, oa, sizeof(*oa));
/* count travels in the o_blocks field of the request copy. */
348 body->oa.o_blocks = count;
349 body->oa.o_valid |= OBD_MD_FLBLOCKS;
351 request->rq_replen = lustre_msg_size(1, &size);
353 rc = ptlrpc_queue_wait(request);
354 rc = ptlrpc_check_status(request, rc);
358 body = lustre_msg_buf(request->rq_repmsg, 0);
359 memcpy(oa, &body->oa, sizeof(*oa));
363 ptlrpc_free_req(request);
/* Destroy an object on the target. Sends an OST_DESTROY request carrying
 * a copy of *oa with every bit of the valid mask set, then copies the
 * obdo from the reply back into *oa.
 * ea is not used by any line visible in this listing.
 * NOTE(review): the error checks between the calls are not visible. */
367 static int osc_destroy(struct lustre_handle *conn, struct obdo *oa,
368 struct lov_stripe_md *ea)
370 struct ptlrpc_request *request;
371 struct ptlrpc_client *cl;
372 struct ptlrpc_connection *connection;
373 struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
374 struct ost_body *body;
375 int rc, size = sizeof(*body);
382 osc_con2cl(conn, &cl, &connection);
383 request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh,
384 OST_DESTROY, 1, &size, NULL);
388 body = lustre_msg_buf(request->rq_reqmsg, 0);
389 memcpy(&body->oa, oa, sizeof(*oa));
390 body->oa.o_valid = ~0;
392 request->rq_replen = lustre_msg_size(1, &size);
394 rc = ptlrpc_queue_wait(request);
395 rc = ptlrpc_check_status(request, rc);
399 body = lustre_msg_buf(request->rq_repmsg, 0);
400 memcpy(oa, &body->oa, sizeof(*oa));
404 ptlrpc_free_req(request);
/* Per-transfer state attached to a bulk descriptor through b_cb_data and
 * consumed by brw_finish().
 * NOTE(review): only the callback member is visible in this listing;
 * other code in this file also uses buf, cb_data, obd_data and obd_size
 * members. */
408 struct osc_brw_cb_data {
/* Optional completion callback; brw_finish() skips it when it is NULL. */
410 bulk_callback_t callback;
/* Bulk completion handler installed as desc->b_cb by the brw paths.
 * data is the osc_brw_cb_data attached to the descriptor. Unmaps every
 * page in cb_data->buf, invokes the optional user callback, drops one
 * reference on the descriptor and frees the callback state.
 * NOTE(review): the declaration of i is not visible in this listing. */
416 static void brw_finish(struct ptlrpc_bulk_desc *desc, void *data)
418 struct osc_brw_cb_data *cb_data = data;
/* An interrupted transfer is only logged here. */
422 if (desc->b_flags & PTL_RPC_FL_INTR)
423 CERROR("got signal\n");
/* Undo the kmap() calls made when the transfer was set up. */
425 for (i = 0; i < desc->b_page_count; i++)
426 kunmap(cb_data->buf[i]);
428 if (cb_data->callback)
429 (cb_data->callback)(desc, cb_data->cb_data);
431 ptlrpc_bulk_decref(desc);
/* obd_data is optional scratch memory owned by the callback state. */
432 if (cb_data->obd_data)
433 OBD_FREE(cb_data->obd_data, cb_data->obd_size);
434 OBD_FREE(cb_data, sizeof(*cb_data));
/* Read page_count pages from the target into page_array.
 * Builds an OST_BRW request with three buffers (ost_body, one obd_ioobj
 * and page_count niobuf_remote entries), maps every page and adds it to
 * a bulk descriptor, registers the descriptor, then sends the request.
 * callback is optional: the reference count below is 1 with a callback
 * and 2 without, and the no-callback case waits on desc->b_waitq.
 * Fix: the error cleanup loop now unmaps page_array[j]. It used to name
 * pagearray, which is not declared in any visible line of this function
 * (the parameter is page_array).
 * NOTE(review): this listing skips some original lines (see the gaps in
 * the embedded numbering), so labels, braces and several checks are not
 * visible here. */
438 static int osc_brw_read(struct lustre_handle *conn,
439 struct lov_stripe_md *md, obd_count page_count,
440 struct page **page_array,
441 obd_size *count, obd_off *offset, obd_flag *flags,
442 bulk_callback_t callback)
444 struct ptlrpc_client *cl;
445 struct ptlrpc_connection *connection;
446 struct ptlrpc_request *request = NULL;
447 struct ptlrpc_bulk_desc *desc = NULL;
448 struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
449 struct ost_body *body;
450 int rc, j, size[3] = {sizeof(*body)};
451 void *iooptr, *nioptr;
452 struct osc_brw_cb_data *cb_data = NULL;
456 size[1] = sizeof(struct obd_ioobj);
457 size[2] = page_count * sizeof(struct niobuf_remote);
459 osc_con2cl(conn, &cl, &connection);
460 request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh,
461 OST_BRW, 3, size, NULL);
465 body = lustre_msg_buf(request->rq_reqmsg, 0);
466 body->data = OBD_BRW_READ;
468 desc = ptlrpc_prep_bulk(connection);
470 GOTO(out_free, rc = -ENOMEM);
471 desc->b_portal = OST_BULK_PORTAL;
472 desc->b_cb = brw_finish;
473 OBD_ALLOC(cb_data, sizeof(*cb_data));
475 GOTO(out_free, rc = -ENOMEM);
/* NOTE(review): no visible line here sets cb_data->buf, which
 * brw_finish() reads when it unmaps pages; osc_brw_write() does set it.
 * Confirm against the full file. */
477 cb_data->callback = callback;
478 desc->b_cb_data = cb_data;
479 /* XXX end almost identical to brw_write case */
481 iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
482 nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
483 ost_pack_ioo(&iooptr, md, page_count);
484 for (j = 0; j < page_count; j++) {
485 struct ptlrpc_bulk_page *bulk;
486 bulk = ptlrpc_prep_bulk_page(desc);
488 GOTO(out_unmap, rc = -ENOMEM);
/* Each page gets its own xid, allocated under the connection lock. */
490 spin_lock(&connection->c_lock);
491 bulk->b_xid = ++connection->c_xid_out;
492 spin_unlock(&connection->c_lock);
494 bulk->b_buf = kmap(page_array[j]);
495 bulk->b_page = page_array[j];
496 bulk->b_buflen = PAGE_SIZE;
497 ost_pack_niobuf(&nioptr, offset[j], count[j],
498 flags[j], bulk->b_xid);
502 * Register the bulk first, because the reply could arrive out of order,
503 * and we want to be ready for the bulk data.
505 * One reference is released by the bulk callback, the other when
506 * we finish sleeping on it (if we don't have a callback).
508 atomic_set(&desc->b_refcount, callback ? 1 : 2);
509 rc = ptlrpc_register_bulk(desc);
513 request->rq_replen = lustre_msg_size(1, size);
514 rc = ptlrpc_queue_wait(request);
515 rc = ptlrpc_check_status(request, rc);
517 ptlrpc_bulk_decref(desc);
521 /* Callbacks cause asynchronous handling. */
525 l_wait_event_killable(desc->b_waitq, ptlrpc_check_bulk_received(desc));
/* NOTE(review): desc->b_flags is read after this decref. If the decref
 * can drop the last reference, that read touches freed memory; the fix
 * needs the lines missing from this listing, so it is only flagged. */
526 ptlrpc_bulk_decref(desc);
527 if (desc->b_flags & PTL_RPC_FL_INTR)
532 /* Clean up on error. */
534 for (j = 0; j < desc->b_page_count; j++)
535 kunmap(page_array[j]);
538 OBD_FREE(cb_data, sizeof(*cb_data));
539 ptlrpc_free_bulk(desc);
540 ptlrpc_free_req(request);
/* Write page_count pages from pagearray to the target.
 * Sends an OST_BRW request describing the pages, validates the second
 * reply buffer (it must hold page_count niobuf_remote entries), builds
 * one bulk page per reply entry and sends the bulk data.
 * callback is optional: the reference count below is 1 with a callback
 * and 2 without, and the no-callback case waits on desc->b_waitq.
 * Fix: the expected-length argument of the CERROR below is a size_t
 * expression printed with %d; it is now cast to int so the argument
 * matches the conversion.
 * NOTE(review): this listing skips some original lines (see the gaps in
 * the embedded numbering), so labels, braces and several checks are not
 * visible here. */
544 static int osc_brw_write(struct lustre_handle *conn,
545 struct lov_stripe_md *md, obd_count page_count,
546 struct page **pagearray, obd_size *count,
547 obd_off *offset, obd_flag *flags,
548 bulk_callback_t callback)
550 struct ptlrpc_client *cl;
551 struct ptlrpc_connection *connection;
552 struct ptlrpc_request *request = NULL;
553 struct ptlrpc_bulk_desc *desc = NULL;
554 struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
555 struct ost_body *body;
556 struct niobuf_local *local = NULL;
557 struct niobuf_remote *remote;
558 struct osc_brw_cb_data *cb_data = NULL;
559 int rc, j, size[3] = {sizeof(*body)};
560 void *iooptr, *nioptr;
563 size[1] = sizeof(struct obd_ioobj);
564 size[2] = page_count * sizeof(*remote);
566 osc_con2cl(conn, &cl, &connection);
567 request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh,
568 OST_BRW, 3, size, NULL);
572 body = lustre_msg_buf(request->rq_reqmsg, 0);
573 body->data = OBD_BRW_WRITE;
/* local[] keeps the mapped address, offset and length of every page
 * until the reply tells us which xid to send each one under. */
575 OBD_ALLOC(local, page_count * sizeof(*local));
577 GOTO(out_free, rc = -ENOMEM);
579 desc = ptlrpc_prep_bulk(connection);
581 GOTO(out_free, rc = -ENOMEM);
582 desc->b_portal = OSC_BULK_PORTAL;
583 desc->b_cb = brw_finish;
584 OBD_ALLOC(cb_data, sizeof(*cb_data));
586 GOTO(out_free, rc = -ENOMEM);
587 cb_data->buf = pagearray;
588 cb_data->callback = callback;
589 desc->b_cb_data = cb_data;
590 /* XXX end almost identical to brw_read case */
/* Hand local[] to the callback state so brw_finish() frees it. */
591 cb_data->obd_data = local;
592 cb_data->obd_size = page_count * sizeof(*local);
594 iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
595 nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
596 ost_pack_ioo(&iooptr, md, page_count);
597 for (j = 0; j < page_count; j++) {
598 local[j].addr = kmap(pagearray[j]);
599 local[j].offset = offset[j];
600 local[j].len = count[j];
601 ost_pack_niobuf(&nioptr, offset[j], count[j],
/* size[] is reused for the reply: its second buffer holds one
 * niobuf_remote per page. */
605 size[1] = page_count * sizeof(struct niobuf_remote);
606 request->rq_replen = lustre_msg_size(2, size);
607 rc = ptlrpc_queue_wait(request);
608 rc = ptlrpc_check_status(request, rc);
612 nioptr = lustre_msg_buf(request->rq_repmsg, 1);
614 GOTO(out_unmap, rc = -EINVAL);
616 if (request->rq_repmsg->buflens[1] !=
617 page_count * sizeof(struct niobuf_remote)) {
618 CERROR("buffer length wrong (%d vs. %d)\n",
619 request->rq_repmsg->buflens[1],
620 (int)(page_count * sizeof(struct niobuf_remote)));
621 GOTO(out_unmap, rc = -EINVAL);
625 for (j = 0; j < page_count; j++) {
626 struct ptlrpc_bulk_page *page;
628 ost_unpack_niobuf(&nioptr, &remote);
630 page = ptlrpc_prep_bulk_page(desc);
632 GOTO(out_unmap, rc = -ENOMEM);
634 page->b_buf = (void *)(unsigned long)local[j].addr;
635 page->b_buflen = local[j].len;
636 page->b_xid = remote->xid;
639 if (desc->b_page_count != page_count)
643 * One is released when the bulk is complete, the other when we finish
644 * waiting on it. (Callback cases don't sleep, so only one ref for
647 atomic_set(&desc->b_refcount, callback ? 1 : 2);
648 CDEBUG(D_PAGE, "Set refcount of %p to %d\n", desc,
649 atomic_read(&desc->b_refcount));
650 rc = ptlrpc_send_bulk(desc);
654 /* Callbacks cause asynchronous handling. */
658 /* If there's no callback function, sleep here until complete. */
659 l_wait_event_killable(desc->b_waitq, ptlrpc_check_bulk_sent(desc));
/* NOTE(review): desc->b_flags is read after this decref. If the decref
 * can drop the last reference, that read touches freed memory; the fix
 * needs the lines missing from this listing, so it is only flagged. */
660 ptlrpc_bulk_decref(desc);
661 if (desc->b_flags & PTL_RPC_FL_INTR)
665 /* Clean up on error. */
667 for (j = 0; j < page_count; j++)
668 kunmap(pagearray[j]);
672 OBD_FREE(cb_data, sizeof(*cb_data));
674 OBD_FREE(local, page_count * sizeof(*local));
675 ptlrpc_free_bulk(desc);
/* NOTE(review): the other request paths visible in this file release the
 * request with ptlrpc_free_req(); confirm the difference is intended. */
676 ptlrpc_req_finished(request);
/* Bulk read/write entry point. Forwards its arguments to osc_brw_write()
 * when cmd has OBD_BRW_WRITE set and to osc_brw_read() otherwise, casting
 * callback to bulk_callback_t in both cases.
 * NOTE(review): the end of the parameter list (count, offset, flags and
 * callback are used below) and the line between the two returns are not
 * visible in this listing. */
680 static int osc_brw(int cmd, struct lustre_handle *conn,
681 struct lov_stripe_md *md, obd_count page_count,
682 struct page **page_array,
688 if (cmd & OBD_BRW_WRITE)
689 return osc_brw_write(conn, md, page_count, page_array, count,
690 offset, flags, (bulk_callback_t)callback);
692 return osc_brw_read(conn, md, page_count, page_array, count,
693 offset, flags, (bulk_callback_t)callback);
/* Acquire an extent lock for the resource res_id.
 * Steps visible here: round the requested extent out to page boundaries,
 * look for an existing lock in mode, look for one in mode2 and swap the
 * reference over to mode, optionally convert it, and otherwise enqueue a
 * new lock through ldlm_cli_enqueue(). The resulting handle is stored in
 * lockh.
 * Fix: the extent length passed to ldlm_lock_match() and
 * ldlm_cli_enqueue() was sizeof(extent), the size of a pointer. It is now
 * sizeof(*extent), the size of the struct ldlm_extent being described.
 * The extent_len parameter is not used by any visible line.
 * NOTE(review): this listing skips some original lines (see the gaps in
 * the embedded numbering); the declarations of rc and mode2 and the
 * conditions around each step are not visible here. */
696 static int osc_enqueue(struct lustre_handle *oconn,
697 struct lustre_handle *parent_lock, __u64 *res_id,
698 __u32 type, void *extentp, int extent_len, __u32 mode,
699 int *flags, void *callback, void *data, int datalen,
700 struct lustre_handle *lockh)
702 struct obd_device *obddev = class_conn2obd(oconn);
703 struct osc_obd *osc = &obddev->u.osc;
704 struct ptlrpc_connection *conn;
705 struct ptlrpc_client *cl;
706 struct ldlm_extent *extent = extentp;
710 /* Filesystem locks are given a bit of special treatment: first we
711 * fixup the lock to start and end on page boundaries. */
/* This modifies the caller's extent in place through extentp. */
712 extent->start &= PAGE_MASK;
713 extent->end = (extent->end + PAGE_SIZE - 1) & PAGE_MASK;
715 /* Next, search for already existing extent locks that will cover us */
716 osc_con2dlmcl(oconn, &cl, &conn);
717 rc = ldlm_lock_match(obddev->obd_namespace, res_id, type, extent,
718 sizeof(*extent), mode, lockh);
720 /* We already have a lock, and it's referenced */
724 /* Next, search for locks that we can upgrade (if we're trying to write)
725 * or are more than we need (if we're trying to read). Because the VFS
726 * and page cache already protect us locally, lots of readers/writers
727 * can share a single PW lock. */
733 rc = ldlm_lock_match(obddev->obd_namespace, res_id, type, extent,
734 sizeof(*extent), mode2, lockh);
737 /* FIXME: This is not incredibly elegant, but it might
738 * be more elegant than adding another parameter to
739 * lock_match. I want a second opinion. */
740 ldlm_lock_addref(lockh, mode);
741 ldlm_lock_decref(lockh, mode2);
746 rc = ldlm_cli_convert(cl, lockh, &osc->osc_connh,
754 rc = ldlm_cli_enqueue(cl, conn, &osc->osc_connh,
755 NULL, obddev->obd_namespace,
756 parent_lock, res_id, type, extent, sizeof(*extent),
757 mode, flags, callback, data, datalen, lockh);
/* Release one reference of the given mode on the lock behind lockh by
 * calling ldlm_lock_decref(). oconn is not used by any line visible in
 * this listing.
 * NOTE(review): the return statement is not visible here. */
761 static int osc_cancel(struct lustre_handle *oconn, __u32 mode,
762 struct lustre_handle *lockh)
766 ldlm_lock_decref(lockh, mode);
/* Configure an OSC device from an obd_ioctl_data block passed in buf.
 * inlbuf1 carries the target UUID and inlbuf2 the server UUID; each must
 * be between 1 and 37 bytes long. Sets up the connection to the server,
 * a client-side lock namespace, and two ptlrpc clients (one for OST
 * requests, one for lock-manager requests).
 * NOTE(review): this listing skips some original lines (see the gaps in
 * the embedded numbering), so return statements, cleanup labels and
 * some checks are not visible here. */
771 static int osc_setup(struct obd_device *obddev, obd_count len, void *buf)
773 struct obd_ioctl_data* data = buf;
774 struct osc_obd *osc = &obddev->u.osc;
775 char server_uuid[37];
779 if (data->ioc_inllen1 < 1) {
780 CERROR("osc setup requires a TARGET UUID\n");
784 if (data->ioc_inllen1 > 37) {
785 CERROR("osc TARGET UUID must be less than 38 characters\n");
789 if (data->ioc_inllen2 < 1) {
790 CERROR("osc setup requires a SERVER UUID\n");
794 if (data->ioc_inllen2 > 37) {
795 CERROR("osc SERVER UUID must be less than 38 characters\n");
799 memcpy(osc->osc_target_uuid, data->ioc_inlbuf1, data->ioc_inllen1);
/* NOTE(review): when inllen2 is exactly 37 the copy fills server_uuid
 * completely, so the buffer is NUL-terminated only if the input's last
 * byte is NUL. Confirm what ptlrpc_uuid_to_connection() expects. */
800 memcpy(server_uuid, data->ioc_inlbuf2, MIN(data->ioc_inllen2,
801 sizeof(server_uuid)));
803 osc->osc_conn = ptlrpc_uuid_to_connection(server_uuid);
807 obddev->obd_namespace =
808 ldlm_namespace_new("osc", LDLM_NAMESPACE_CLIENT);
809 if (obddev->obd_namespace == NULL)
810 GOTO(out_conn, rc = -ENOMEM);
812 OBD_ALLOC(osc->osc_client, sizeof(*osc->osc_client));
813 if (osc->osc_client == NULL)
814 GOTO(out_ns, rc = -ENOMEM);
816 OBD_ALLOC(osc->osc_ldlm_client, sizeof(*osc->osc_ldlm_client));
817 if (osc->osc_ldlm_client == NULL)
818 GOTO(out_client, rc = -ENOMEM);
/* Request and reply portals for the two clients. */
820 ptlrpc_init_client(NULL, NULL, OST_REQUEST_PORTAL, OSC_REPLY_PORTAL,
822 ptlrpc_init_client(NULL, NULL, LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL,
823 osc->osc_ldlm_client);
824 osc->osc_client->cli_name = "osc";
825 osc->osc_ldlm_client->cli_name = "ldlm";
/* Error unwinding, in reverse order of setup; presumably the targets of
 * out_client, out_ns and out_conn (label lines are not visible). */
831 OBD_FREE(osc->osc_client, sizeof(*osc->osc_client));
833 ldlm_namespace_free(obddev->obd_namespace);
835 ptlrpc_put_connection(osc->osc_conn);
/* Tear down what osc_setup() created: frees the lock namespace, cleans
 * up and frees both ptlrpc clients, and drops the server connection.
 * NOTE(review): the return statement is not visible in this listing. */
839 static int osc_cleanup(struct obd_device * obddev)
841 struct osc_obd *osc = &obddev->u.osc;
843 ldlm_namespace_free(obddev->obd_namespace);
845 ptlrpc_cleanup_client(osc->osc_client);
846 OBD_FREE(osc->osc_client, sizeof(*osc->osc_client));
847 ptlrpc_cleanup_client(osc->osc_ldlm_client);
848 OBD_FREE(osc->osc_ldlm_client, sizeof(*osc->osc_ldlm_client));
849 ptlrpc_put_connection(osc->osc_conn);
/* Query filesystem statistics from the target. Sends an OST_STATFS
 * request with no request buffers, expects one reply buffer holding a
 * struct obd_statfs, and unpacks it into *sfs with obd_statfs_unpack().
 * NOTE(review): the error checks between the calls are not visible in
 * this listing. */
855 static int osc_statfs(struct lustre_handle *conn, struct statfs *sfs)
857 struct ptlrpc_request *request;
858 struct ptlrpc_client *cl;
859 struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
860 struct ptlrpc_connection *connection;
861 struct obd_statfs *osfs;
862 int rc, size = sizeof(*osfs);
865 osc_con2cl(conn, &cl, &connection);
866 request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh,
867 OST_STATFS, 0, NULL, NULL);
871 request->rq_replen = lustre_msg_size(1, &size);
873 rc = ptlrpc_queue_wait(request);
874 rc = ptlrpc_check_status(request, rc);
876 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
880 osfs = lustre_msg_buf(request->rq_repmsg, 0);
881 obd_statfs_unpack(osfs, sfs);
885 ptlrpc_free_req(request);
/* OBD method table for the OSC type; osc_init() passes it to
 * class_register_type(). Each entry binds one obd_ops slot to the
 * handler of the matching name in this file.
 * NOTE(review): entries for other handlers defined in this file (for
 * example osc_setup, osc_open and osc_brw) are not visible in this
 * listing; confirm against the full file. */
889 struct obd_ops osc_obd_ops = {
891 o_cleanup: osc_cleanup,
892 o_statfs: osc_statfs,
893 o_create: osc_create,
894 o_destroy: osc_destroy,
895 o_getattr: osc_getattr,
896 o_setattr: osc_setattr,
899 o_connect: osc_connect,
900 o_disconnect: osc_disconnect,
903 o_enqueue: osc_enqueue,
/* Module entry point: registers the OSC method table under
 * LUSTRE_OSC_NAME and returns the result of class_register_type(). */
907 static int __init osc_init(void)
909 return class_register_type(&osc_obd_ops, LUSTRE_OSC_NAME);
/* Module exit point: unregisters the type registered by osc_init(). */
912 static void __exit osc_exit(void)
914 class_unregister_type(LUSTRE_OSC_NAME);
/* Module metadata, followed by the hooks that make osc_init() and
 * osc_exit() the load and unload handlers. */
917 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
918 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC) v1.0");
919 MODULE_LICENSE("GPL");
921 module_init(osc_init);
922 module_exit(osc_exit);