1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
6 * This code is issued under the GNU General Public License.
7 * See the file COPYING in this distribution
9 * Author Peter Braam <braam@clusterfs.com>
11 * This server is single threaded at present (but can easily be multi
12 * threaded). For testing and management it is treated as an
13 * obd_device, although it does not export a full OBD method table
14 * (the requests are coming in over the wire, so object target
15 * modules do not have a full method table.)
20 #define DEBUG_SUBSYSTEM S_OSC
22 #include <linux/module.h>
23 #include <linux/lustre_dlm.h>
24 #include <linux/lustre_mds.h> /* for mds_objid */
25 #include <linux/obd_ost.h>
26 #include <linux/obd_lov.h>
/* Translate a local connection handle into what a request needs: the OSC
 * request client (osc_client), the ptlrpc connection (osc_conn) and a
 * pointer to the export_import handle kept in the export. All three are
 * handed back through the out-parameters; nothing is returned.
 * NOTE(review): the export from class_conn2export() is dereferenced with no
 * NULL check visible here - confirm callers always pass a live handle. */
28 static void osc_con2cl(struct lustre_handle *conn, struct ptlrpc_client **cl,
29 struct ptlrpc_connection **connection,
30 struct lustre_handle **rconn)
32 struct obd_export *export = class_conn2export(conn);
33 struct osc_obd *osc = &export->export_obd->u.osc;
34 *cl = osc->osc_client;
35 *connection = osc->osc_conn;
36 *rconn = &export->export_import;
/* Lock-manager variant of osc_con2cl(): hands back the LDLM client
 * (osc_ldlm_client) and the ptlrpc connection of the OSC device that owns
 * conn, looked up through class_conn2obd().
 * NOTE(review): no store to *rconn is visible in this function, yet
 * osc_enqueue() passes rconn on to the ldlm calls right after calling it -
 * confirm rconn is really set, otherwise it is used uninitialised. */
39 static void osc_con2dlmcl(struct lustre_handle *conn, struct ptlrpc_client **cl,
40 struct ptlrpc_connection **connection,
41 struct lustre_handle **rconn)
43 struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
44 *cl = osc->osc_ldlm_client;
45 *connection = osc->osc_conn;
/* Connect this OSC device to its target.
 * conn: handle passed to class_connect() and later to class_import2export().
 * obd:  the OSC device; its u.osc member supplies the client, the connection
 *       and the target UUID.
 * Steps visible here: register the connection locally with class_connect(),
 * send an OST_CONNECT request whose single buffer is osc_target_uuid, raise
 * the connection level to LUSTRE_CONN_FULL, and record the handle found in
 * the reply. rc carries the status of each step; presumably it is the return
 * value - TODO confirm. */
48 static int osc_connect(struct lustre_handle *conn, struct obd_device *obd)
50 struct osc_obd *osc = &obd->u.osc;
51 struct obd_import *import;
52 struct ptlrpc_request *request;
53 char *tmp = osc->osc_target_uuid;
54 int rc, size = sizeof(osc->osc_target_uuid);
/* NOTE(review): import is allocated here, but no later use or free of it is
 * visible in this function - possible leak, confirm. */
57 OBD_ALLOC(import, sizeof(*import));
62 rc = class_connect(conn, obd);
/* One request buffer of sizeof(osc_target_uuid) bytes, filled from tmp. */
66 request = ptlrpc_prep_req(osc->osc_client, osc->osc_conn,
67 OST_CONNECT, 1, &size, &tmp);
69 GOTO(out_disco, rc = -ENOMEM);
/* The reply length is computed for zero buffers. */
71 request->rq_level = LUSTRE_CONN_NEW;
72 request->rq_replen = lustre_msg_size(0, NULL);
73 request->rq_reqmsg->addr = -1;
74 /* Sending our local connection info breaks for local connections
75 request->rq_reqmsg->addr = conn->addr;
76 request->rq_reqmsg->cookie = conn->cookie;
79 rc = ptlrpc_queue_wait(request);
80 rc = ptlrpc_check_status(request, rc);
82 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
86 /* XXX eventually maybe more refinement */
87 osc->osc_conn->c_level = LUSTRE_CONN_FULL;
/* The start of the reply message is reinterpreted as a lustre_handle and
 * handed to class_import2export() together with conn. */
89 class_import2export(conn, (struct lustre_handle *)request->rq_repmsg);
93 ptlrpc_free_req(request);
/* Presumably the out_disco unwind target used by the GOTO above: it undoes
 * the earlier class_connect() - TODO confirm. */
96 class_disconnect(conn);
/* Disconnect from the target: send OST_DISCONNECT, then drop the local
 * connection with class_disconnect().
 * conn: local connection handle, resolved through osc_con2cl().
 * NOTE(review): the result of ptlrpc_queue_wait() is assigned to rc and rc is
 * assigned again from class_disconnect(); whether the first result is checked
 * in between cannot be seen here - confirm. */
102 static int osc_disconnect(struct lustre_handle *conn)
104 struct ptlrpc_request *request;
105 struct ptlrpc_client *cl;
106 struct ptlrpc_connection *connection;
107 struct lustre_handle *rconn;
111 osc_con2cl(conn, &cl, &connection, &rconn);
/* The request carries no buffers and the reply is sized for none. */
112 request = ptlrpc_prep_req2(cl, connection, rconn,
113 OST_DISCONNECT, 0, NULL, NULL);
116 request->rq_replen = lustre_msg_size(0, NULL);
118 rc = ptlrpc_queue_wait(request);
121 rc = class_disconnect(conn);
126 ptlrpc_free_req(request);
/* Fetch the attributes of an object from the OST.
 * conn: local connection handle, resolved through osc_con2cl().
 * oa:   in/out - copied into the OST_GETATTR request, then overwritten with
 *       the obdo found in the reply.
 * rc holds the status from ptlrpc_queue_wait()/ptlrpc_check_status();
 * presumably that is what the function returns - TODO confirm. */
130 static int osc_getattr(struct lustre_handle *conn, struct obdo *oa)
132 struct ptlrpc_request *request;
133 struct ptlrpc_client *cl;
134 struct ptlrpc_connection *connection;
135 struct lustre_handle *rconn;
136 struct ost_body *body;
137 int rc, size = sizeof(*body);
140 osc_con2cl(conn, &cl, &connection, &rconn);
141 request = ptlrpc_prep_req2(cl, connection, rconn,
142 OST_GETATTR, 1, &size, NULL);
146 body = lustre_msg_buf(request->rq_reqmsg, 0);
147 memcpy(&body->oa, oa, sizeof(*oa));
/* Every bit of o_valid is set in the outgoing copy. */
148 body->oa.o_valid = ~0;
/* The reply is sized for a single ost_body. */
150 request->rq_replen = lustre_msg_size(1, &size);
152 rc = ptlrpc_queue_wait(request);
153 rc = ptlrpc_check_status(request, rc);
155 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
/* From here on body points into the reply message, not the request. */
159 body = lustre_msg_buf(request->rq_repmsg, 0);
160 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
162 memcpy(oa, &body->oa, sizeof(*oa));
166 ptlrpc_free_req(request);
/* Send an OST_OPEN request for the object described by oa.
 * conn: local connection handle, resolved through osc_con2cl().
 * oa:   in/out - copied into the request, then overwritten with the obdo
 *       from the reply.
 * md:   not referenced by any line visible here.
 * rc holds the status from ptlrpc_queue_wait()/ptlrpc_check_status();
 * presumably that is what the function returns - TODO confirm. */
170 static int osc_open(struct lustre_handle *conn, struct obdo *oa,
171 struct lov_stripe_md *md)
173 struct ptlrpc_request *request;
174 struct ptlrpc_client *cl;
175 struct ptlrpc_connection *connection;
176 struct lustre_handle *rconn;
177 struct ost_body *body;
178 int rc, size = sizeof(*body);
181 osc_con2cl(conn, &cl, &connection, &rconn);
182 request = ptlrpc_prep_req2(cl, connection, rconn,
183 OST_OPEN, 1, &size, NULL);
187 body = lustre_msg_buf(request->rq_reqmsg, 0);
188 memcpy(&body->oa, oa, sizeof(*oa));
/* Only the mode and id bits are flagged as valid in the outgoing copy. */
189 body->oa.o_valid = (OBD_MD_FLMODE | OBD_MD_FLID);
191 request->rq_replen = lustre_msg_size(1, &size);
193 rc = ptlrpc_queue_wait(request);
194 rc = ptlrpc_check_status(request, rc);
/* From here on body points into the reply message. */
198 body = lustre_msg_buf(request->rq_repmsg, 0);
199 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
201 memcpy(oa, &body->oa, sizeof(*oa));
205 ptlrpc_free_req(request);
/* Send an OST_CLOSE request for the object named by md.
 * conn: local connection handle, resolved through osc_con2cl().
 * oa:   in/out - its o_id, o_mode and o_valid fields are overwritten before
 *       it is copied into the request, and the whole obdo is later replaced
 *       by the one from the reply.
 * md:   supplies the object id (lmd_object_id).
 * rc holds the status from ptlrpc_queue_wait()/ptlrpc_check_status();
 * presumably that is what the function returns - TODO confirm. */
209 static int osc_close(struct lustre_handle *conn, struct obdo *oa,
210 struct lov_stripe_md *md)
212 struct ptlrpc_request *request;
213 struct ptlrpc_client *cl;
214 struct ptlrpc_connection *connection;
215 struct lustre_handle *rconn;
216 struct ost_body *body;
217 int rc, size = sizeof(*body);
220 osc_con2cl(conn, &cl, &connection, &rconn);
221 request = ptlrpc_prep_req2(cl, connection, rconn,
222 OST_CLOSE, 1, &size, NULL);
/* Unlike osc_open(), the caller obdo itself is modified here, not just the
 * copy placed in the request. */
226 oa->o_id = md->lmd_object_id;
227 oa->o_mode = S_IFREG;
228 oa->o_valid = (OBD_MD_FLMODE | OBD_MD_FLID);
229 body = lustre_msg_buf(request->rq_reqmsg, 0);
230 memcpy(&body->oa, oa, sizeof(*oa));
232 request->rq_replen = lustre_msg_size(1, &size);
234 rc = ptlrpc_queue_wait(request);
235 rc = ptlrpc_check_status(request, rc);
/* From here on body points into the reply message. */
239 body = lustre_msg_buf(request->rq_repmsg, 0);
240 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
242 memcpy(oa, &body->oa, sizeof(*oa));
246 ptlrpc_free_req(request);
/* Send the attributes in oa to the OST with an OST_SETATTR request.
 * conn: local connection handle, resolved through osc_con2cl().
 * oa:   copied unchanged into the request; no copy back from the reply is
 *       visible here, although the reply is sized for one ost_body.
 * rc holds the status from ptlrpc_queue_wait()/ptlrpc_check_status();
 * presumably that is what the function returns - TODO confirm. */
250 static int osc_setattr(struct lustre_handle *conn, struct obdo *oa)
252 struct ptlrpc_request *request;
253 struct ptlrpc_client *cl;
254 struct ptlrpc_connection *connection;
255 struct lustre_handle *rconn;
256 struct ost_body *body;
257 int rc, size = sizeof(*body);
260 osc_con2cl(conn, &cl, &connection, &rconn);
261 request = ptlrpc_prep_req2(cl, connection, rconn,
262 OST_SETATTR, 1, &size, NULL);
266 body = lustre_msg_buf(request->rq_reqmsg, 0);
267 memcpy(&body->oa, oa, sizeof(*oa));
269 request->rq_replen = lustre_msg_size(1, &size);
271 rc = ptlrpc_queue_wait(request);
272 rc = ptlrpc_check_status(request, rc);
276 ptlrpc_free_req(request);
/* Create an object on the OST and describe it in a freshly allocated
 * lov_stripe_md.
 * conn: local connection handle, resolved through osc_con2cl().
 * oa:   in/out - copied into the OST_CREATE request, then overwritten with
 *       the obdo from the reply; o_easize gives the size of the allocation.
 * ea:   out - *ea is allocated here and filled with lmd_size, the new object
 *       id and a stripe count of 1.
 * NOTE(review): no release of *ea on a failed request is visible here -
 * confirm who frees it on the error paths. */
280 static int osc_create(struct lustre_handle *conn, struct obdo *oa,
281 struct lov_stripe_md **ea)
283 struct ptlrpc_request *request;
284 struct ptlrpc_client *cl;
285 struct ptlrpc_connection *connection;
286 struct lustre_handle *rconn;
287 struct ost_body *body;
288 int rc, size = sizeof(*body);
/* The stripe descriptor is sized by the caller through oa->o_easize. */
301 OBD_ALLOC(*ea, oa->o_easize);
304 (*ea)->lmd_size = oa->o_easize;
307 osc_con2cl(conn, &cl, &connection, &rconn);
308 request = ptlrpc_prep_req2(cl, connection, rconn,
309 OST_CREATE, 1, &size, NULL);
313 body = lustre_msg_buf(request->rq_reqmsg, 0);
314 memcpy(&body->oa, oa, sizeof(*oa));
316 request->rq_replen = lustre_msg_size(1, &size);
318 rc = ptlrpc_queue_wait(request);
319 rc = ptlrpc_check_status(request, rc);
/* From here on body points into the reply message. */
323 body = lustre_msg_buf(request->rq_repmsg, 0);
324 memcpy(oa, &body->oa, sizeof(*oa));
/* The object id recorded in the descriptor is the one just copied from the
 * reply. */
326 (*ea)->lmd_object_id = oa->o_id;
327 (*ea)->lmd_stripe_count = 1;
330 ptlrpc_free_req(request);
/* Send an OST_PUNCH request for the object described by oa.
 * conn:  local connection handle, resolved through osc_con2cl().
 * oa:    in/out - copied into the request, then overwritten with the obdo
 *        from the reply.
 * md:    not referenced by any line visible here.
 * count: placed in the o_blocks field of the outgoing obdo.
 * NOTE(review): the parameter list continues past count on a line that is
 * not shown; what the remaining parameter is and how it is used cannot be
 * stated from here. */
334 static int osc_punch(struct lustre_handle *conn, struct obdo *oa,
335 struct lov_stripe_md *md, obd_size count,
338 struct ptlrpc_request *request;
339 struct ptlrpc_client *cl;
340 struct ptlrpc_connection *connection;
341 struct lustre_handle *rconn;
342 struct ost_body *body;
343 int rc, size = sizeof(*body);
350 osc_con2cl(conn, &cl, &connection, &rconn);
351 request = ptlrpc_prep_req2(cl, connection, rconn,
352 OST_PUNCH, 1, &size, NULL);
356 body = lustre_msg_buf(request->rq_reqmsg, 0);
357 memcpy(&body->oa, oa, sizeof(*oa));
/* count travels in o_blocks, and OBD_MD_FLBLOCKS is added to the valid
 * bits copied from the caller. */
358 body->oa.o_blocks = count;
359 body->oa.o_valid |= OBD_MD_FLBLOCKS;
361 request->rq_replen = lustre_msg_size(1, &size);
363 rc = ptlrpc_queue_wait(request);
364 rc = ptlrpc_check_status(request, rc);
/* From here on body points into the reply message. */
368 body = lustre_msg_buf(request->rq_repmsg, 0);
369 memcpy(oa, &body->oa, sizeof(*oa));
373 ptlrpc_free_req(request);
/* Send an OST_DESTROY request for the object described by oa.
 * conn: local connection handle, resolved through osc_con2cl().
 * oa:   in/out - copied into the request, then overwritten with the obdo
 *       from the reply.
 * ea:   not referenced by any line visible here.
 * rc holds the status from ptlrpc_queue_wait()/ptlrpc_check_status();
 * presumably that is what the function returns - TODO confirm. */
377 static int osc_destroy(struct lustre_handle *conn, struct obdo *oa,
378 struct lov_stripe_md *ea)
380 struct ptlrpc_request *request;
381 struct ptlrpc_client *cl;
382 struct ptlrpc_connection *connection;
383 struct lustre_handle *rconn;
384 struct ost_body *body;
385 int rc, size = sizeof(*body);
392 osc_con2cl(conn, &cl, &connection, &rconn);
393 request = ptlrpc_prep_req2(cl, connection, rconn,
394 OST_DESTROY, 1, &size, NULL);
398 body = lustre_msg_buf(request->rq_reqmsg, 0);
399 memcpy(&body->oa, oa, sizeof(*oa));
/* Every bit of o_valid is set in the outgoing copy. */
400 body->oa.o_valid = ~0;
402 request->rq_replen = lustre_msg_size(1, &size);
404 rc = ptlrpc_queue_wait(request);
405 rc = ptlrpc_check_status(request, rc);
/* From here on body points into the reply message. */
409 body = lustre_msg_buf(request->rq_repmsg, 0);
410 memcpy(oa, &body->oa, sizeof(*oa));
414 ptlrpc_free_req(request);
/* Per-transfer context attached to a bulk descriptor through b_cb_data and
 * consumed by brw_finish(). Only the callback member is shown here; code in
 * this file also reads members named cb_data, obd_data and obd_size, so the
 * definition presumably declares those as well - TODO confirm. */
418 struct osc_brw_cb_data {
/* Optional completion hook; brw_finish() skips it when it is NULL. */
419 bulk_callback_t callback;
/* Bulk completion hook installed as desc->b_cb by both brw paths.
 * desc: the bulk descriptor that finished.
 * data: the struct osc_brw_cb_data stored in desc->b_cb_data.
 * In order: report an interrupted transfer, kunmap every page on the
 * descriptor page list, invoke the optional user callback, drop one
 * reference on the descriptor, then free the callback context and the
 * buffer it points to, if any. */
425 static void brw_finish(struct ptlrpc_bulk_desc *desc, void *data)
427 struct list_head *tmp, *next;
428 struct osc_brw_cb_data *cb_data = data;
431 if (desc->b_flags & PTL_RPC_FL_INTR)
432 CERROR("got signal\n");
434 /* This feels wrong to me. */
435 list_for_each_safe(tmp, next, &desc->b_page_list) {
436 struct ptlrpc_bulk_page *bulk;
437 bulk = list_entry(tmp, struct ptlrpc_bulk_page, b_link);
/* NOTE(review): the write path never sets b_page on the pages it queues (in
 * the lines visible there) - confirm this kunmap gets a valid page. */
439 kunmap(bulk->b_page);
442 if (cb_data->callback)
443 (cb_data->callback)(desc, cb_data->cb_data);
445 ptlrpc_bulk_decref(desc);
/* obd_data is the niobuf_local array set up by osc_brw_write(). */
446 if (cb_data->obd_data)
447 OBD_FREE(cb_data->obd_data, cb_data->obd_size);
448 OBD_FREE(cb_data, sizeof(*cb_data));
/* Read page_count pages from the OST into page_array.
 * conn:       local connection handle, resolved through osc_con2cl().
 * md:         stripe descriptor packed into the request by ost_pack_ioo().
 * page_array: destination pages; each one is kmapped here.
 * count, offset, flags: per-page arrays packed into the remote niobufs.
 * callback:   optional completion hook. With a callback the descriptor
 *             starts with one reference, otherwise with two, and this
 *             function waits on desc->b_waitq.
 * The bulk descriptor is registered before the request is queued, so the
 * data can be received even if it arrives ahead of the reply.
 * NOTE(review): near the end of the function desc->b_flags is read right
 * after ptlrpc_bulk_decref(desc); if that drop can release the descriptor
 * this is a use after free - confirm the reference counting.
 * NOTE(review): the error path frees cb_data and calls ptlrpc_free_bulk();
 * check it cannot also run after brw_finish() has freed the same cb_data. */
452 static int osc_brw_read(struct lustre_handle *conn, struct lov_stripe_md *md,
453 obd_count page_count, struct page **page_array,
454 obd_size *count, obd_off *offset, obd_flag *flags,
455 bulk_callback_t callback)
457 struct ptlrpc_client *cl;
458 struct ptlrpc_connection *connection;
459 struct lustre_handle *rconn;
460 struct ptlrpc_request *request = NULL;
461 struct ptlrpc_bulk_desc *desc = NULL;
462 struct ost_body *body;
463 int rc, j, size[3] = {sizeof(*body)};
464 void *iooptr, *nioptr;
465 struct osc_brw_cb_data *cb_data = NULL;
/* Three request buffers: the ost_body, one obd_ioobj, and one
 * niobuf_remote per page. */
468 size[1] = sizeof(struct obd_ioobj);
469 size[2] = page_count * sizeof(struct niobuf_remote);
471 osc_con2cl(conn, &cl, &connection, &rconn);
472 request = ptlrpc_prep_req2(cl, connection, rconn,
473 OST_BRW, 3, size, NULL);
477 body = lustre_msg_buf(request->rq_reqmsg, 0);
478 body->data = OBD_BRW_READ;
480 desc = ptlrpc_prep_bulk(connection);
482 GOTO(out_free, rc = -ENOMEM);
483 desc->b_portal = OST_BULK_PORTAL;
484 desc->b_cb = brw_finish;
485 OBD_ALLOC(cb_data, sizeof(*cb_data));
487 GOTO(out_free, rc = -ENOMEM);
488 cb_data->callback = callback;
489 desc->b_cb_data = cb_data;
490 /* XXX end almost identical to brw_write case */
492 iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
493 nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
494 ost_pack_ioo(&iooptr, md, page_count);
495 for (j = 0; j < page_count; j++) {
496 struct ptlrpc_bulk_page *bulk;
497 bulk = ptlrpc_prep_bulk_page(desc);
499 GOTO(out_unmap, rc = -ENOMEM);
/* Each page gets its own xid, taken from the connection counter under
 * c_lock, and the same xid is packed into the matching niobuf. */
501 spin_lock(&connection->c_lock);
502 bulk->b_xid = ++connection->c_xid_out;
503 spin_unlock(&connection->c_lock);
/* The mapping made here is undone by brw_finish() or by the cleanup loop
 * at the end of this function. */
505 bulk->b_buf = kmap(page_array[j]);
506 bulk->b_page = page_array[j];
507 bulk->b_buflen = PAGE_SIZE;
508 ost_pack_niobuf(&nioptr, offset[j], count[j],
509 flags[j], bulk->b_xid);
513 * Register the bulk first, because the reply could arrive out of order,
514 * and we want to be ready for the bulk data.
516 * One reference is released by the bulk callback, the other when
517 * we finish sleeping on it (if we don't have a callback).
519 atomic_set(&desc->b_refcount, callback ? 1 : 2);
520 rc = ptlrpc_register_bulk(desc);
524 request->rq_replen = lustre_msg_size(1, size);
525 rc = ptlrpc_queue_wait(request);
526 rc = ptlrpc_check_status(request, rc);
528 ptlrpc_bulk_decref(desc);
532 /* Callbacks cause asynchronous handling. */
536 l_wait_event_killable(desc->b_waitq, ptlrpc_check_bulk_received(desc));
537 ptlrpc_bulk_decref(desc);
538 if (desc->b_flags & PTL_RPC_FL_INTR)
543 /* Clean up on error. */
545 for (j = 0; j < desc->b_page_count; j++)
546 kunmap(page_array[j]);
549 OBD_FREE(cb_data, sizeof(*cb_data));
550 ptlrpc_free_bulk(desc);
551 ptlrpc_free_req(request);
/* Write page_count pages from pagearray to the OST.
 * conn:      local connection handle, resolved through osc_con2cl().
 * md:        stripe descriptor packed into the request by ost_pack_ioo().
 * pagearray: source pages; each one is kmapped here.
 * count, offset, flags: per-page arrays packed into the remote niobufs.
 * callback:  optional completion hook. With a callback the descriptor
 *            starts with one reference, otherwise with two, and this
 *            function waits on desc->b_waitq.
 * Two phases are visible. First an OST_BRW request describes the pages
 * (the niobufs are packed with an xid of 0). The reply must carry exactly
 * one niobuf_remote per page in its buffer 1, and the xid of each is copied
 * onto the matching bulk page. Then the data goes out with
 * ptlrpc_send_bulk().
 * NOTE(review): local is stored in cb_data->obd_data, which brw_finish()
 * frees, and is also freed at the end of this function - confirm the two
 * paths are mutually exclusive, otherwise it is a double free.
 * NOTE(review): the CERROR below prints a product involving sizeof with %d,
 * which does not match the type of that argument.
 * NOTE(review): desc->b_flags is read right after ptlrpc_bulk_decref(desc),
 * and this function ends with ptlrpc_req_finished() where osc_brw_read()
 * uses ptlrpc_free_req() - confirm both are intended. */
555 static int osc_brw_write(struct lustre_handle *conn,
556 struct lov_stripe_md *md, obd_count page_count,
557 struct page **pagearray, obd_size *count,
558 obd_off *offset, obd_flag *flags,
559 bulk_callback_t callback)
561 struct ptlrpc_client *cl;
562 struct ptlrpc_connection *connection;
563 struct lustre_handle *rconn;
564 struct ptlrpc_request *request = NULL;
565 struct ptlrpc_bulk_desc *desc = NULL;
566 struct ost_body *body;
567 struct niobuf_local *local = NULL;
568 struct niobuf_remote *remote;
569 struct osc_brw_cb_data *cb_data = NULL;
570 int rc, j, size[3] = {sizeof(*body)};
571 void *iooptr, *nioptr;
/* Three request buffers: the ost_body, one obd_ioobj, and one
 * niobuf_remote per page. */
574 size[1] = sizeof(struct obd_ioobj);
575 size[2] = page_count * sizeof(*remote);
577 osc_con2cl(conn, &cl, &connection, &rconn);
578 request = ptlrpc_prep_req2(cl, connection, rconn,
579 OST_BRW, 3, size, NULL);
583 body = lustre_msg_buf(request->rq_reqmsg, 0);
584 body->data = OBD_BRW_WRITE;
/* One niobuf_local per page remembers the kmapped address, offset and
 * length until the bulk pages are built from the reply. */
586 OBD_ALLOC(local, page_count * sizeof(*local));
588 GOTO(out_free, rc = -ENOMEM);
590 desc = ptlrpc_prep_bulk(connection);
592 GOTO(out_free, rc = -ENOMEM);
/* The write path uses OSC_BULK_PORTAL; the read path uses OST_BULK_PORTAL. */
593 desc->b_portal = OSC_BULK_PORTAL;
594 desc->b_cb = brw_finish;
595 OBD_ALLOC(cb_data, sizeof(*cb_data));
597 GOTO(out_free, rc = -ENOMEM);
598 cb_data->callback = callback;
599 desc->b_cb_data = cb_data;
600 /* XXX end almost identical to brw_read case */
601 cb_data->obd_data = local;
602 cb_data->obd_size = page_count * sizeof(*local);
604 iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
605 nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
606 ost_pack_ioo(&iooptr, md, page_count);
607 for (j = 0; j < page_count; j++) {
608 local[j].addr = kmap(pagearray[j]);
609 local[j].offset = offset[j];
610 local[j].len = count[j];
611 ost_pack_niobuf(&nioptr, offset[j], count[j], flags[j], 0);
/* size[1] is reused for the reply layout: buffer 0 stays the ost_body and
 * buffer 1 becomes the array of remote niobufs. */
614 size[1] = page_count * sizeof(struct niobuf_remote);
615 request->rq_replen = lustre_msg_size(2, size);
616 rc = ptlrpc_queue_wait(request);
617 rc = ptlrpc_check_status(request, rc);
/* From here on nioptr walks the remote niobufs in the reply. */
621 nioptr = lustre_msg_buf(request->rq_repmsg, 1);
623 GOTO(out_unmap, rc = -EINVAL);
625 if (request->rq_repmsg->buflens[1] !=
626 page_count * sizeof(struct niobuf_remote)) {
627 CERROR("buffer length wrong (%d vs. %d)\n",
628 request->rq_repmsg->buflens[1],
629 page_count * sizeof(struct niobuf_remote));
630 GOTO(out_unmap, rc = -EINVAL);
633 for (j = 0; j < page_count; j++) {
634 struct ptlrpc_bulk_page *page;
636 ost_unpack_niobuf(&nioptr, &remote);
638 page = ptlrpc_prep_bulk_page(desc);
640 GOTO(out_unmap, rc = -ENOMEM);
/* Pair the locally mapped buffer with the xid taken from the reply. */
642 page->b_buf = (void *)(unsigned long)local[j].addr;
643 page->b_buflen = local[j].len;
644 page->b_xid = remote->xid;
647 if (desc->b_page_count != page_count)
651 * One is released when the bulk is complete, the other when we finish
652 * waiting on it. (Callback cases don't sleep, so only one ref for
655 atomic_set(&desc->b_refcount, callback ? 1 : 2);
656 CDEBUG(D_PAGE, "Set refcount of %p to %d\n", desc,
657 atomic_read(&desc->b_refcount));
658 rc = ptlrpc_send_bulk(desc);
662 /* Callbacks cause asynchronous handling. */
666 /* If there's no callback function, sleep here until complete. */
667 l_wait_event_killable(desc->b_waitq, ptlrpc_check_bulk_sent(desc));
668 ptlrpc_bulk_decref(desc);
669 if (desc->b_flags & PTL_RPC_FL_INTR)
673 /* Clean up on error. */
675 for (j = 0; j < page_count; j++)
676 kunmap(pagearray[j]);
680 OBD_FREE(cb_data, sizeof(*cb_data));
682 OBD_FREE(local, page_count * sizeof(*local));
683 ptlrpc_free_bulk(desc);
684 ptlrpc_req_finished(request);
/* Bulk read/write entry point: hands the transfer to osc_brw_write() when
 * the OBD_BRW_WRITE bit is set in cmd and to osc_brw_read() otherwise,
 * passing every other argument through unchanged. The callback argument is
 * cast to bulk_callback_t on the way.
 * NOTE(review): the rest of the parameter list (count, offset, flags,
 * callback) is declared on lines that are not shown, so their declared
 * types cannot be stated from here. */
688 static int osc_brw(int cmd, struct lustre_handle *conn,
689 struct lov_stripe_md *md, obd_count page_count,
690 struct page **page_array,
696 if (cmd & OBD_BRW_WRITE)
697 return osc_brw_write(conn, md, page_count, page_array, count,
698 offset, flags, (bulk_callback_t)callback);
700 return osc_brw_read(conn, md, page_count, page_array, count,
701 offset, flags, (bulk_callback_t)callback);
/* Obtain an extent lock on res_id, reusing a lock this client already holds
 * when one matches.
 * conn:    local connection handle; gives the device namespace and, through
 *          osc_con2dlmcl(), the LDLM client and connection.
 * extentp: struct ldlm_extent, modified in place: start is masked with
 *          PAGE_MASK and end is advanced by PAGE_SIZE - 1 before the same
 *          mask is applied.
 * mode:    requested lock mode.
 * lockh:   out - handle of the matched or newly enqueued lock.
 * Order of attempts: match an existing lock in the requested mode, match one
 * in the alternative mode2 and move the reference over to mode (converting
 * with ldlm_cli_convert() where needed), and finally enqueue a new lock with
 * ldlm_cli_enqueue().
 * NOTE(review): sizeof(extent) is the size of a pointer, not of struct
 * ldlm_extent; all three uses below probably want sizeof(*extent) or the
 * extent_len parameter, which no visible line reads - confirm against
 * ldlm_lock_match() and ldlm_cli_enqueue().
 * NOTE(review): rconn is passed on below, but osc_con2dlmcl() shows no
 * store to it - confirm it is initialised.
 * NOTE(review): the declarations of rc and mode2 and the choice of mode2 are
 * not shown. The flags parameter is an int pointer, so the &flags given to
 * ldlm_cli_convert() is either a shadowing local declared on a line not
 * shown or a type mismatch - confirm. */
704 static int osc_enqueue(struct lustre_handle *conn,
705 struct lustre_handle *parent_lock, __u64 *res_id,
706 __u32 type, void *extentp, int extent_len, __u32 mode,
707 int *flags, void *callback, void *data, int datalen,
708 struct lustre_handle *lockh)
710 struct obd_device *obddev = class_conn2obd(conn);
711 struct ptlrpc_connection *connection;
712 struct ptlrpc_client *cl;
713 struct lustre_handle *rconn;
714 struct ldlm_extent *extent = extentp;
718 /* Filesystem locks are given a bit of special treatment: first we
719 * fixup the lock to start and end on page boundaries. */
720 extent->start &= PAGE_MASK;
721 extent->end = (extent->end + PAGE_SIZE - 1) & PAGE_MASK;
723 /* Next, search for already existing extent locks that will cover us */
724 osc_con2dlmcl(conn, &cl, &connection, &rconn);
725 rc = ldlm_lock_match(obddev->obd_namespace, res_id, type, extent,
726 sizeof(extent), mode, lockh);
728 /* We already have a lock, and it's referenced */
732 /* Next, search for locks that we can upgrade (if we're trying to write)
733 * or are more than we need (if we're trying to read). Because the VFS
734 * and page cache already protect us locally, lots of readers/writers
735 * can share a single PW lock. */
741 rc = ldlm_lock_match(obddev->obd_namespace, res_id, type, extent,
742 sizeof(extent), mode2, lockh);
745 /* FIXME: This is not incredibly elegant, but it might
746 * be more elegant than adding another parameter to
747 * lock_match. I want a second opinion. */
748 ldlm_lock_addref(lockh, mode);
749 ldlm_lock_decref(lockh, mode2);
754 rc = ldlm_cli_convert(cl, lockh, rconn, mode, &flags);
761 rc = ldlm_cli_enqueue(cl, connection, rconn, NULL,obddev->obd_namespace,
762 parent_lock, res_id, type, extent, sizeof(extent),
763 mode, flags, callback, data, datalen, lockh);
/* Release one reference of the given mode on the lock named by lockh.
 * oconn: not referenced by any line visible here.
 * No request to the server is visible in this function; only the local
 * reference is dropped through ldlm_lock_decref(). */
767 static int osc_cancel(struct lustre_handle *oconn, __u32 mode,
768 struct lustre_handle *lockh)
772 ldlm_lock_decref(lockh, mode);
/* Configure an OSC device from an ioctl payload.
 * obddev: device being set up; its u.osc member and obd_namespace are
 *         filled in.
 * len:    not referenced by any line visible here.
 * buf:    struct obd_ioctl_data with the target UUID in inline buffer 1 and
 *         the server UUID in inline buffer 2.
 * Both UUID lengths must be between 1 and 37 bytes. After validation the
 * function looks up the server connection, creates a client-side lock
 * namespace, and allocates and initialises the request client and the lock
 * client. Failures unwind in reverse order through the out_client, out_ns
 * and out_conn targets.
 * NOTE(review): server_uuid is 37 bytes and up to 37 bytes are copied into
 * it with no terminator written in the visible lines; unless ioc_inllen2
 * counts a trailing NUL, ptlrpc_uuid_to_connection() may read an
 * unterminated string. The same question applies to osc_target_uuid -
 * confirm.
 * NOTE(review): the unwind lines shown free osc_client but no free of
 * osc_ldlm_client is visible - confirm none is needed on that path. */
777 static int osc_setup(struct obd_device *obddev, obd_count len, void *buf)
779 struct obd_ioctl_data* data = buf;
780 struct osc_obd *osc = &obddev->u.osc;
781 char server_uuid[37];
785 if (data->ioc_inllen1 < 1) {
786 CERROR("osc setup requires a TARGET UUID\n");
790 if (data->ioc_inllen1 > 37) {
791 CERROR("osc TARGET UUID must be less than 38 characters\n");
795 if (data->ioc_inllen2 < 1) {
796 CERROR("osc setup requires a SERVER UUID\n");
800 if (data->ioc_inllen2 > 37) {
801 CERROR("osc SERVER UUID must be less than 38 characters\n");
/* Both lengths were range-checked above; the MIN only repeats the upper
 * bound for the local array. */
805 memcpy(osc->osc_target_uuid, data->ioc_inlbuf1, data->ioc_inllen1);
806 memcpy(server_uuid, data->ioc_inlbuf2, MIN(data->ioc_inllen2,
807 sizeof(server_uuid)));
809 osc->osc_conn = ptlrpc_uuid_to_connection(server_uuid);
813 obddev->obd_namespace =
814 ldlm_namespace_new("osc", LDLM_NAMESPACE_CLIENT);
815 if (obddev->obd_namespace == NULL)
816 GOTO(out_conn, rc = -ENOMEM);
818 OBD_ALLOC(osc->osc_client, sizeof(*osc->osc_client));
819 if (osc->osc_client == NULL)
820 GOTO(out_ns, rc = -ENOMEM);
822 OBD_ALLOC(osc->osc_ldlm_client, sizeof(*osc->osc_ldlm_client));
823 if (osc->osc_ldlm_client == NULL)
824 GOTO(out_client, rc = -ENOMEM);
/* The request client talks on the OST request and OSC reply portals, the
 * lock client on the LDLM request and reply portals. */
826 ptlrpc_init_client(NULL, NULL, OST_REQUEST_PORTAL, OSC_REPLY_PORTAL,
828 ptlrpc_init_client(NULL, NULL, LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL,
829 osc->osc_ldlm_client);
830 osc->osc_client->cli_name = "osc";
831 osc->osc_ldlm_client->cli_name = "ldlm";
/* Unwind in reverse order of acquisition. */
837 OBD_FREE(osc->osc_client, sizeof(*osc->osc_client));
839 ldlm_namespace_free(obddev->obd_namespace);
841 ptlrpc_put_connection(osc->osc_conn);
/* Tear down what osc_setup() built: free the lock namespace, clean up and
 * free the request client and the lock client, then drop the reference on
 * the server connection.
 * obddev: the OSC device being cleaned up. */
845 static int osc_cleanup(struct obd_device * obddev)
847 struct osc_obd *osc = &obddev->u.osc;
849 ldlm_namespace_free(obddev->obd_namespace);
851 ptlrpc_cleanup_client(osc->osc_client);
852 OBD_FREE(osc->osc_client, sizeof(*osc->osc_client));
853 ptlrpc_cleanup_client(osc->osc_ldlm_client);
854 OBD_FREE(osc->osc_ldlm_client, sizeof(*osc->osc_ldlm_client));
855 ptlrpc_put_connection(osc->osc_conn);
/* Query filesystem statistics from the OST.
 * conn: local connection handle, resolved through osc_con2cl().
 * sfs:  out - filled from the reply by obd_statfs_unpack().
 * rc holds the status from ptlrpc_queue_wait()/ptlrpc_check_status();
 * presumably that is what the function returns - TODO confirm. */
861 static int osc_statfs(struct lustre_handle *conn, struct statfs *sfs)
863 struct ptlrpc_request *request;
864 struct ptlrpc_client *cl;
865 struct ptlrpc_connection *connection;
866 struct lustre_handle *rconn;
867 struct obd_statfs *osfs;
868 int rc, size = sizeof(*osfs);
871 osc_con2cl(conn, &cl, &connection, &rconn);
/* The request carries no buffers; the reply is sized for one obd_statfs. */
872 request = ptlrpc_prep_req2(cl, connection, rconn,
873 OST_STATFS, 0, NULL, NULL);
877 request->rq_replen = lustre_msg_size(1, &size);
879 rc = ptlrpc_queue_wait(request);
880 rc = ptlrpc_check_status(request, rc);
882 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
/* Convert the obd_statfs found in the reply into the caller statfs. */
886 osfs = lustre_msg_buf(request->rq_repmsg, 0);
887 obd_statfs_unpack(osfs, sfs);
891 ptlrpc_free_req(request);
/* Method table for the OSC device type, registered in osc_init(). Each
 * member is bound to the function of the same name in this file, using the
 * GNU label-style initialiser syntax.
 * NOTE(review): no entries are shown for osc_setup, osc_open, osc_close,
 * osc_punch, osc_brw or osc_cancel, although all are defined in this file -
 * confirm they are wired up. */
895 struct obd_ops osc_obd_ops = {
897 o_cleanup: osc_cleanup,
898 o_statfs: osc_statfs,
899 o_create: osc_create,
900 o_destroy: osc_destroy,
901 o_getattr: osc_getattr,
902 o_setattr: osc_setattr,
905 o_connect: osc_connect,
906 o_disconnect: osc_disconnect,
909 o_enqueue: osc_enqueue,
/* Module entry point: registers the OSC method table under LUSTRE_OSC_NAME
 * and returns the result of class_register_type() unchanged. */
913 static int __init osc_init(void)
915 return class_register_type(&osc_obd_ops, LUSTRE_OSC_NAME);
/* Module exit point: unregisters the type that osc_init() registered under
 * LUSTRE_OSC_NAME. */
918 static void __exit osc_exit(void)
920 class_unregister_type(LUSTRE_OSC_NAME);
/* Module metadata, followed by the hooks that name osc_init() and
 * osc_exit() as the load and unload routines of this module. */
923 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
924 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC) v1.0");
925 MODULE_LICENSE("GPL");
927 module_init(osc_init);
928 module_exit(osc_exit);