1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
6 * This code is issued under the GNU General Public License.
7 * See the file COPYING in this distribution
9 * Author Peter Braam <braam@clusterfs.com>
11 * This server is single threaded at present (but can easily be multi
12 * threaded). For testing and management it is treated as an
13 * obd_device, although it does not export a full OBD method table
14 * (the requests are coming in over the wire, so object target
15 * modules do not have a full method table.)
20 #define DEBUG_SUBSYSTEM S_OSC
22 #include <linux/module.h>
23 #include <linux/lustre_dlm.h>
24 #include <linux/lustre_mds.h> /* for mds_objid */
25 #include <linux/obd_ost.h>
26 #include <linux/obd_lov.h>
/*
 * Resolve the OSC device behind @conn with class_conn2obd() and return
 * two of its members through the out-parameters:
 *   *cl         <- osc->osc_client
 *   *connection <- osc->osc_conn
 */
28 static void osc_con2cl(struct lustre_handle *conn, struct ptlrpc_client **cl,
29 struct ptlrpc_connection **connection)
31 struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
32 *cl = osc->osc_client;
33 *connection = osc->osc_conn;
/*
 * Same lookup as osc_con2cl(), but returns the lock-manager client:
 *   *cl         <- osc->osc_ldlm_client
 *   *connection <- osc->osc_conn
 */
36 static void osc_con2dlmcl(struct lustre_handle *conn, struct ptlrpc_client **cl,
37 struct ptlrpc_connection **connection)
39 struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
40 *cl = osc->osc_ldlm_client;
41 *connection = osc->osc_conn;
/*
 * Connect @conn to the OSC device @obd.
 *
 * Visible steps:
 *   - allocate an obd_import with OBD_ALLOC();
 *   - register the connection locally with class_connect();
 *   - build an OST_CONNECT request whose single buffer is
 *     osc->osc_target_uuid (size = sizeof of that member) and send it
 *     with ptlrpc_queue_wait(), then fold the reply status in with
 *     ptlrpc_check_status();
 *   - copy addr and cookie from the reply message into osc->osc_connh;
 *   - release the request with ptlrpc_free_req().
 * class_disconnect() is called on the out_disco path.
 *
 * NOTE(review): no use or release of import is visible in this block;
 * confirm it is not leaked.
 */
44 static int osc_connect(struct lustre_handle *conn, struct obd_device *obd)
46 struct osc_obd *osc = &obd->u.osc;
47 struct obd_import *import;
48 struct ptlrpc_request *request;
49 char *tmp = osc->osc_target_uuid;
50 int rc, size = sizeof(osc->osc_target_uuid);
53 OBD_ALLOC(import, sizeof(*import));
58 rc = class_connect(conn, obd);
62 request = ptlrpc_prep_req(osc->osc_client, osc->osc_conn,
63 OST_CONNECT, 1, &size, &tmp);
/* presumably taken when request preparation failed; confirm the guard */
65 GOTO(out_disco, rc = -ENOMEM);
67 request->rq_level = LUSTRE_CONN_NEW;
/* reply length is sized for a message with zero buffers */
68 request->rq_replen = lustre_msg_size(0, NULL);
70 rc = ptlrpc_queue_wait(request);
71 rc = ptlrpc_check_status(request, rc);
73 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
77 /* XXX: Make this a handle */
78 osc->osc_connh.addr = request->rq_repmsg->addr;
79 osc->osc_connh.cookie = request->rq_repmsg->cookie;
83 ptlrpc_free_req(request);
86 class_disconnect(conn);
/*
 * Disconnect @conn from its target.
 *
 * Builds an OST_DISCONNECT request with no request buffers, addressed
 * with the handle stored in osc->osc_connh, and sends it with
 * ptlrpc_queue_wait(). The local connection is then dropped with
 * class_disconnect() and the request is released with ptlrpc_free_req().
 */
92 static int osc_disconnect(struct lustre_handle *conn)
94 struct ptlrpc_request *request;
95 struct ptlrpc_client *cl;
96 struct ptlrpc_connection *connection;
97 struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
101 osc_con2cl(conn, &cl, &connection);
102 request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh,
103 OST_DISCONNECT, 0, NULL, NULL);
/* reply length is sized for a message with zero buffers */
106 request->rq_replen = lustre_msg_size(0, NULL);
108 rc = ptlrpc_queue_wait(request);
111 rc = class_disconnect(conn);
116 ptlrpc_free_req(request);
/*
 * Fetch object attributes from the target.
 *
 * @conn: connection handle identifying the OSC device.
 * @oa:   in/out; copied into the request body, then overwritten with the
 *        obdo found in reply buffer 0.
 *
 * The request is OST_GETATTR with one buffer (struct ost_body). All bits
 * of o_valid are set in the outgoing copy. A failure is logged with
 * CERROR. The request is released with ptlrpc_free_req().
 */
120 static int osc_getattr(struct lustre_handle *conn, struct obdo *oa)
122 struct ptlrpc_request *request;
123 struct ptlrpc_client *cl;
124 struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
125 struct ptlrpc_connection *connection;
126 struct ost_body *body;
127 int rc, size = sizeof(*body);
130 osc_con2cl(conn, &cl, &connection);
131 request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh,
132 OST_GETATTR, 1, &size, NULL);
136 body = lustre_msg_buf(request->rq_reqmsg, 0);
137 memcpy(&body->oa, oa, sizeof(*oa));
/* set every o_valid bit in the copy that goes on the wire */
138 body->oa.o_valid = ~0;
/* the reply is expected to carry one buffer of the same size */
140 request->rq_replen = lustre_msg_size(1, &size);
142 rc = ptlrpc_queue_wait(request);
143 rc = ptlrpc_check_status(request, rc);
145 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
149 body = lustre_msg_buf(request->rq_repmsg, 0);
150 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
152 memcpy(oa, &body->oa, sizeof(*oa));
156 ptlrpc_free_req(request);
/*
 * Send an OST_OPEN request for the object described by @oa.
 *
 * @oa is copied into the single request buffer (struct ost_body) with
 * o_valid restricted to OBD_MD_FLMODE | OBD_MD_FLID. After the exchange
 * the obdo from reply buffer 0 is copied back into @oa. The request is
 * released with ptlrpc_free_req().
 */
160 static int osc_open(struct lustre_handle *conn, struct obdo *oa)
162 struct ptlrpc_request *request;
163 struct ptlrpc_client *cl;
164 struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
165 struct ptlrpc_connection *connection;
166 struct ost_body *body;
167 int rc, size = sizeof(*body);
170 osc_con2cl(conn, &cl, &connection);
171 request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh,
172 OST_OPEN, 1, &size, NULL);
176 body = lustre_msg_buf(request->rq_reqmsg, 0);
177 memcpy(&body->oa, oa, sizeof(*oa));
/* only the mode and id bits are flagged in the outgoing copy */
178 body->oa.o_valid = (OBD_MD_FLMODE | OBD_MD_FLID);
180 request->rq_replen = lustre_msg_size(1, &size);
182 rc = ptlrpc_queue_wait(request);
183 rc = ptlrpc_check_status(request, rc);
187 body = lustre_msg_buf(request->rq_repmsg, 0);
188 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
190 memcpy(oa, &body->oa, sizeof(*oa));
194 ptlrpc_free_req(request);
/*
 * Send an OST_CLOSE request for the object described by @oa.
 *
 * @oa is copied unmodified into the single request buffer
 * (struct ost_body). After the exchange the obdo from reply buffer 0 is
 * copied back into @oa. The request is released with ptlrpc_free_req().
 */
198 static int osc_close(struct lustre_handle *conn, struct obdo *oa)
200 struct ptlrpc_request *request;
201 struct ptlrpc_client *cl;
202 struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
203 struct ptlrpc_connection *connection;
204 struct ost_body *body;
205 int rc, size = sizeof(*body);
208 osc_con2cl(conn, &cl, &connection);
209 request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh,
210 OST_CLOSE, 1, &size, NULL);
214 body = lustre_msg_buf(request->rq_reqmsg, 0);
215 memcpy(&body->oa, oa, sizeof(*oa));
217 request->rq_replen = lustre_msg_size(1, &size);
219 rc = ptlrpc_queue_wait(request);
220 rc = ptlrpc_check_status(request, rc);
224 body = lustre_msg_buf(request->rq_repmsg, 0);
225 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
227 memcpy(oa, &body->oa, sizeof(*oa));
231 ptlrpc_free_req(request);
/*
 * Send an OST_SETATTR request carrying a copy of @oa.
 *
 * @oa is copied unmodified into the single request buffer
 * (struct ost_body). Unlike the getattr/open/close paths, no copy of the
 * reply back into @oa is visible in this block. The request is released
 * with ptlrpc_free_req().
 */
235 static int osc_setattr(struct lustre_handle *conn, struct obdo *oa)
237 struct ptlrpc_request *request;
238 struct ptlrpc_client *cl;
239 struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
240 struct ptlrpc_connection *connection;
241 struct ost_body *body;
242 int rc, size = sizeof(*body);
245 osc_con2cl(conn, &cl, &connection);
246 request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh,
247 OST_SETATTR, 1, &size, NULL);
251 body = lustre_msg_buf(request->rq_reqmsg, 0);
252 memcpy(&body->oa, oa, sizeof(*oa));
254 request->rq_replen = lustre_msg_size(1, &size);
256 rc = ptlrpc_queue_wait(request);
257 rc = ptlrpc_check_status(request, rc);
261 ptlrpc_free_req(request);
/*
 * Create an object on the target via OST_CREATE.
 *
 * @oa is copied into the single request buffer (struct ost_body) and,
 * after the exchange, overwritten with the obdo from reply buffer 0.
 * The inline area of @oa is then rebuilt locally:
 *   - o_inline is zeroed;
 *   - a struct mds_objid is laid at its start, with
 *     mo_lov_md.lmd_object_id = oa->o_id and lmd_stripe_count = 1;
 *   - a struct lov_object_id follows immediately after it, with
 *     l_device_id = 0 and l_object_id = oa->o_id.
 * The request is released with ptlrpc_free_req().
 *
 * NOTE(review): assumes o_inline is large enough to hold both
 * structures back to back; confirm against the obdo definition.
 */
265 static int osc_create(struct lustre_handle *conn, struct obdo *oa)
267 struct ptlrpc_request *request;
268 struct ptlrpc_client *cl;
269 struct ptlrpc_connection *connection;
270 struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
271 struct ost_body *body;
272 struct mds_objid *objid;
273 struct lov_object_id *lov_id;
274 int rc, size = sizeof(*body);
281 osc_con2cl(conn, &cl, &connection);
282 request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh,
283 OST_CREATE, 1, &size, NULL);
287 body = lustre_msg_buf(request->rq_reqmsg, 0);
288 memcpy(&body->oa, oa, sizeof(*oa));
290 request->rq_replen = lustre_msg_size(1, &size);
292 rc = ptlrpc_queue_wait(request);
293 rc = ptlrpc_check_status(request, rc);
297 body = lustre_msg_buf(request->rq_repmsg, 0);
298 memcpy(oa, &body->oa, sizeof(*oa));
/* describe the new object as a single stripe in the inline area */
300 memset(oa->o_inline, 0, sizeof(oa->o_inline));
301 objid = (struct mds_objid *)oa->o_inline;
302 objid->mo_lov_md.lmd_object_id = oa->o_id;
303 objid->mo_lov_md.lmd_stripe_count = 1;
/* the lov_object_id entry sits directly behind the mds_objid */
304 lov_id = (struct lov_object_id *)(oa->o_inline + sizeof(*objid));
305 lov_id->l_device_id = 0;
306 lov_id->l_object_id = oa->o_id;
310 ptlrpc_free_req(request);
/*
 * Send an OST_PUNCH request for the object described by @oa.
 *
 * @oa is copied into the single request buffer (struct ost_body); the
 * outgoing copy carries @count in o_blocks and has OBD_MD_FLBLOCKS added
 * to o_valid. After the exchange the obdo from reply buffer 0 is copied
 * back into @oa. The request is released with ptlrpc_free_req().
 *
 * NOTE(review): the rest of the parameter list is not visible here.
 */
314 static int osc_punch(struct lustre_handle *conn, struct obdo *oa, obd_size count,
317 struct ptlrpc_request *request;
318 struct ptlrpc_client *cl;
319 struct ptlrpc_connection *connection;
320 struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
321 struct ost_body *body;
322 int rc, size = sizeof(*body);
329 osc_con2cl(conn, &cl, &connection);
330 request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh,
331 OST_PUNCH, 1, &size, NULL);
335 body = lustre_msg_buf(request->rq_reqmsg, 0);
336 memcpy(&body->oa, oa, sizeof(*oa));
/* count travels in the o_blocks field, flagged valid for the receiver */
337 body->oa.o_blocks = count;
338 body->oa.o_valid |= OBD_MD_FLBLOCKS;
340 request->rq_replen = lustre_msg_size(1, &size);
342 rc = ptlrpc_queue_wait(request);
343 rc = ptlrpc_check_status(request, rc);
347 body = lustre_msg_buf(request->rq_repmsg, 0);
348 memcpy(oa, &body->oa, sizeof(*oa));
352 ptlrpc_free_req(request);
/*
 * Send an OST_DESTROY request for the object described by @oa.
 *
 * @oa is copied into the single request buffer (struct ost_body) with
 * every o_valid bit set. After the exchange the obdo from reply buffer 0
 * is copied back into @oa. The request is released with
 * ptlrpc_free_req().
 */
356 static int osc_destroy(struct lustre_handle *conn, struct obdo *oa)
358 struct ptlrpc_request *request;
359 struct ptlrpc_client *cl;
360 struct ptlrpc_connection *connection;
361 struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
362 struct ost_body *body;
363 int rc, size = sizeof(*body);
370 osc_con2cl(conn, &cl, &connection);
371 request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh,
372 OST_DESTROY, 1, &size, NULL);
376 body = lustre_msg_buf(request->rq_reqmsg, 0);
377 memcpy(&body->oa, oa, sizeof(*oa));
/* set every o_valid bit in the copy that goes on the wire */
378 body->oa.o_valid = ~0;
380 request->rq_replen = lustre_msg_size(1, &size);
382 rc = ptlrpc_queue_wait(request);
383 rc = ptlrpc_check_status(request, rc);
387 body = lustre_msg_buf(request->rq_repmsg, 0);
388 memcpy(oa, &body->oa, sizeof(*oa));
392 ptlrpc_free_req(request);
/*
 * Context attached to a bulk descriptor (desc->b_cb_data) by the
 * osc_brw_read()/osc_brw_write() paths and consumed by brw_finish().
 *
 * callback: optional completion callback; brw_finish() invokes it when
 *           it is non-NULL.
 *
 * NOTE(review): brw_finish() also reads buf, cb_data, obd_data and
 * obd_size from this structure; their declarations are not visible here.
 */
396 struct osc_brw_cb_data {
398 bulk_callback_t callback;
/*
 * Bulk completion handler; osc_brw_read() and osc_brw_write() install it
 * as desc->b_cb with a struct osc_brw_cb_data as @data.
 *
 * In order, it:
 *   - logs with CERROR when PTL_RPC_FL_INTR is set in desc->b_flags;
 *   - calls kunmap() on cb_data->buf[i] for each of the
 *     desc->b_page_count pages;
 *   - invokes the optional callback with (desc, cb_data->cb_data);
 *   - drops one reference on the descriptor with ptlrpc_bulk_decref();
 *   - frees cb_data->obd_data (obd_size bytes) when it is set;
 *   - frees cb_data itself.
 */
404 static void brw_finish(struct ptlrpc_bulk_desc *desc, void *data)
406 struct osc_brw_cb_data *cb_data = data;
410 if (desc->b_flags & PTL_RPC_FL_INTR)
411 CERROR("got signal\n");
/* undo the kmap() done when the transfer was set up */
413 for (i = 0; i < desc->b_page_count; i++)
414 kunmap(cb_data->buf[i]);
416 if (cb_data->callback)
417 (cb_data->callback)(desc, cb_data->cb_data);
419 ptlrpc_bulk_decref(desc);
420 if (cb_data->obd_data)
421 OBD_FREE(cb_data->obd_data, cb_data->obd_size);
422 OBD_FREE(cb_data, sizeof(*cb_data));
/*
 * Bulk read: ask the target for data and receive it into the pages of
 * @buf.
 *
 * @conn:     connection handle identifying the OSC device.
 * @num_oa:   number of entries in @oa and @oa_bufs.
 * @oa:       objects to read from.
 * @oa_bufs:  number of page buffers belonging to each object.
 * @buf:      pages to receive into, indexed by a running page counter
 *            across all objects.
 * @count, @offset, @flags: per-page values packed into each remote
 *            niobuf, indexed like @buf.
 * @callback: optional completion callback. When set, the descriptor gets
 *            a single reference; otherwise it gets two and this function
 *            sleeps on desc->b_waitq until ptlrpc_check_bulk_received().
 *
 * Request layout (OST_BRW, three buffers): struct ost_body with
 * data = OBD_BRW_READ, then num_oa obd_ioobj entries, then one
 * niobuf_remote per page. Each page is kmap()ed and given a fresh xid
 * taken from connection->c_xid_out under connection->c_lock. The bulk
 * descriptor is registered before the request is sent.
 *
 * The error path kunmap()s the pages and frees cb_data, the bulk
 * descriptor and the request.
 *
 * NOTE(review): the error path indexes pagearray, but the page array
 * parameter of this function is named buf; confirm the identifier.
 * NOTE(review): no assignment of cb_data->buf is visible here although
 * brw_finish() reads it; confirm it is set.
 */
426 static int osc_brw_read(struct lustre_handle *conn, obd_count num_oa,
427 struct obdo **oa, obd_count *oa_bufs, struct page **buf,
428 obd_size *count, obd_off *offset, obd_flag *flags,
429 bulk_callback_t callback)
431 struct ptlrpc_client *cl;
432 struct ptlrpc_connection *connection;
433 struct ptlrpc_request *request = NULL;
434 struct ptlrpc_bulk_desc *desc = NULL;
435 struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
436 struct ost_body *body;
437 struct osc_brw_cb_data *cb_data = NULL;
439 int rc, i, j, size[3] = {sizeof(*body)};
440 void *iooptr, *nioptr;
443 /* XXX almost identical to brw_write case */
444 size[1] = num_oa * sizeof(struct obd_ioobj);
446 for (i = 0; i < num_oa; i++)
448 size[2] = pages * sizeof(struct niobuf_remote);
450 osc_con2cl(conn, &cl, &connection);
451 request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh,
452 OST_BRW, 3, size, NULL);
456 body = lustre_msg_buf(request->rq_reqmsg, 0);
457 body->data = OBD_BRW_READ;
459 desc = ptlrpc_prep_bulk(connection);
461 GOTO(out_free, rc = -ENOMEM);
462 desc->b_portal = OST_BULK_PORTAL;
463 desc->b_cb = brw_finish;
464 OBD_ALLOC(cb_data, sizeof(*cb_data));
466 GOTO(out_free, rc = -ENOMEM);
468 cb_data->callback = callback;
469 desc->b_cb_data = cb_data;
470 /* XXX end almost identical to brw_write case */
472 iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
473 nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
474 for (pages = 0, i = 0; i < num_oa; i++) {
475 ost_pack_ioo(&iooptr, oa[i], oa_bufs[i]);
476 /* FIXME: this inner loop is wrong for multiple OAs */
477 for (j = 0; j < oa_bufs[i]; j++, pages++) {
478 struct ptlrpc_bulk_page *bulk;
479 bulk = ptlrpc_prep_bulk_page(desc);
481 GOTO(out_unmap, rc = -ENOMEM);
/* take a fresh xid for this page under the connection lock */
483 spin_lock(&connection->c_lock);
484 bulk->b_xid = ++connection->c_xid_out;
485 spin_unlock(&connection->c_lock);
487 bulk->b_buf = kmap(buf[pages]);
488 bulk->b_page = buf[pages];
489 bulk->b_buflen = PAGE_SIZE;
490 ost_pack_niobuf(&nioptr, offset[pages], count[pages],
491 flags[pages], bulk->b_xid);
496 * Register the bulk first, because the reply could arrive out of order,
497 * and we want to be ready for the bulk data.
499 * One reference is released by the bulk callback, the other when
500 * we finish sleeping on it (if we don't have a callback).
502 atomic_set(&desc->b_refcount, callback ? 1 : 2);
503 rc = ptlrpc_register_bulk(desc);
507 request->rq_replen = lustre_msg_size(1, size);
508 rc = ptlrpc_queue_wait(request);
509 rc = ptlrpc_check_status(request, rc);
511 ptlrpc_bulk_decref(desc);
515 /* Callbacks cause asynchronous handling. */
519 l_wait_event_killable(desc->b_waitq, ptlrpc_check_bulk_received(desc));
520 ptlrpc_bulk_decref(desc);
521 if (desc->b_flags & PTL_RPC_FL_INTR)
525 /* Clean up on error. */
527 for (pages = 0, i = 0; i < num_oa; i++)
528 for (j = 0; j < oa_bufs[i]; j++, pages++)
529 kunmap(pagearray[pages]);
532 OBD_FREE(cb_data, sizeof(*cb_data));
533 ptlrpc_free_bulk(desc);
534 ptlrpc_free_req(request);
/*
 * Bulk write: announce the pages to the target, then push their contents
 * with a bulk send.
 *
 * @conn:      connection handle identifying the OSC device.
 * @num_oa:    number of entries in @oa and @oa_bufs.
 * @oa:        objects to write to.
 * @oa_bufs:   number of page buffers belonging to each object.
 * @pagearray: pages to send, indexed by a running page counter across
 *             all objects.
 * @count, @offset, @flags: per-page values, indexed like @pagearray.
 * @callback:  optional completion callback. When set, the descriptor
 *             gets a single reference; otherwise it gets two and this
 *             function sleeps on desc->b_waitq until
 *             ptlrpc_check_bulk_sent().
 *
 * Phase 1: an OST_BRW request with three buffers (struct ost_body with
 * data = OBD_BRW_WRITE, num_oa obd_ioobj entries, one niobuf_remote per
 * page) is sent. Each page is kmap()ed and recorded in a niobuf_local
 * array. That array is handed to cb_data (obd_data/obd_size) so that
 * brw_finish() frees it.
 *
 * Phase 2: reply buffer 1 must hold exactly one niobuf_remote per page,
 * otherwise -EINVAL. Each remote niobuf supplies the xid for the matching
 * bulk page, whose buffer and length come from the local array. The
 * descriptor is then sent with ptlrpc_send_bulk().
 *
 * The error path kunmap()s the pages and frees cb_data, the local array,
 * the bulk descriptor and the request.
 *
 * NOTE(review): the request is released here with ptlrpc_req_finished(),
 * whereas osc_brw_read() uses ptlrpc_free_req(); confirm this is
 * intentional.
 */
538 static int osc_brw_write(struct lustre_handle *conn, obd_count num_oa,
539 struct obdo **oa, obd_count *oa_bufs,
540 struct page **pagearray, obd_size *count,
541 obd_off *offset, obd_flag *flags,
542 bulk_callback_t callback)
544 struct ptlrpc_client *cl;
545 struct ptlrpc_connection *connection;
546 struct ptlrpc_request *request = NULL;
547 struct ptlrpc_bulk_desc *desc = NULL;
548 struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
549 struct ost_body *body;
550 struct niobuf_local *local = NULL;
551 struct niobuf_remote *remote;
552 struct osc_brw_cb_data *cb_data = NULL;
554 int rc, i, j, size[3] = {sizeof(*body)};
555 void *iooptr, *nioptr;
558 /* XXX almost identical to brw_read case */
559 size[1] = num_oa * sizeof(struct obd_ioobj);
561 for (i = 0; i < num_oa; i++)
563 size[2] = pages * sizeof(struct niobuf_remote);
565 osc_con2cl(conn, &cl, &connection);
566 request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh,
567 OST_BRW, 3, size, NULL);
571 body = lustre_msg_buf(request->rq_reqmsg, 0);
572 body->data = OBD_BRW_WRITE;
/* one niobuf_local per page, kept until the bulk completes */
574 OBD_ALLOC(local, pages * sizeof(*local));
576 GOTO(out_free, rc = -ENOMEM);
578 desc = ptlrpc_prep_bulk(connection);
580 GOTO(out_free, rc = -ENOMEM);
581 desc->b_portal = OSC_BULK_PORTAL;
582 desc->b_cb = brw_finish;
583 OBD_ALLOC(cb_data, sizeof(*cb_data));
585 GOTO(out_free, rc = -ENOMEM);
586 cb_data->buf = pagearray;
587 cb_data->callback = callback;
588 desc->b_cb_data = cb_data;
589 /* XXX end almost identical to brw_read case */
590 cb_data->obd_data = local;
591 cb_data->obd_size = pages * sizeof(*local);
593 iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
594 nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
595 for (pages = 0, i = 0; i < num_oa; i++) {
596 ost_pack_ioo(&iooptr, oa[i], oa_bufs[i]);
597 for (j = 0; j < oa_bufs[i]; j++, pages++) {
598 local[pages].addr = kmap(pagearray[pages]);
599 local[pages].offset = offset[pages];
600 local[pages].len = count[pages];
601 ost_pack_niobuf(&nioptr, offset[pages], count[pages],
/* the reply is sized for the body plus one remote niobuf per page */
606 size[1] = pages * sizeof(struct niobuf_remote);
607 request->rq_replen = lustre_msg_size(2, size);
608 rc = ptlrpc_queue_wait(request);
609 rc = ptlrpc_check_status(request, rc);
613 nioptr = lustre_msg_buf(request->rq_repmsg, 1);
615 GOTO(out_unmap, rc = -EINVAL);
617 if (request->rq_repmsg->buflens[1] !=
618 pages * sizeof(struct niobuf_remote)) {
619 CERROR("buffer length wrong (%d vs. %ld)\n",
620 request->rq_repmsg->buflens[1],
621 pages * sizeof(struct niobuf_remote));
622 GOTO(out_unmap, rc = -EINVAL);
625 for (pages = 0, i = 0; i < num_oa; i++) {
626 for (j = 0; j < oa_bufs[i]; j++, pages++) {
627 struct ptlrpc_bulk_page *page;
629 ost_unpack_niobuf(&nioptr, &remote);
631 page = ptlrpc_prep_bulk_page(desc);
633 GOTO(out_unmap, rc = -ENOMEM);
635 page->b_buf = (void *)(unsigned long)local[pages].addr;
636 page->b_buflen = local[pages].len;
637 page->b_xid = remote->xid;
641 if (desc->b_page_count != pages)
645 * One is released when the bulk is complete, the other when we finish
646 * waiting on it. (Callback cases don't sleep, so only one ref for
649 atomic_set(&desc->b_refcount, callback ? 1 : 2);
650 CDEBUG(D_PAGE, "Set refcount of %p to %d\n", desc,
651 atomic_read(&desc->b_refcount));
652 rc = ptlrpc_send_bulk(desc);
656 /* Callbacks cause asynchronous handling. */
660 /* If there's no callback function, sleep here until complete. */
661 l_wait_event_killable(desc->b_waitq, ptlrpc_check_bulk_sent(desc));
662 ptlrpc_bulk_decref(desc);
663 if (desc->b_flags & PTL_RPC_FL_INTR)
667 /* Clean up on error. */
669 for (pages = 0, i = 0; i < num_oa; i++)
670 for (j = 0; j < oa_bufs[i]; j++, pages++)
671 kunmap(pagearray[pages]);
675 OBD_FREE(cb_data, sizeof(*cb_data));
677 OBD_FREE(local, pages * sizeof(*local));
678 ptlrpc_free_bulk(desc);
679 ptlrpc_req_finished(request);
/*
 * Bulk I/O entry point: dispatches on @cmd.
 *
 * When OBD_BRW_WRITE is set in @cmd the call is forwarded to
 * osc_brw_write(), otherwise to osc_brw_read(). All array arguments are
 * passed through unchanged and callback is cast to bulk_callback_t.
 *
 * NOTE(review): the rest of the parameter list, including the
 * declaration of callback, is not visible here.
 */
683 static int osc_brw(int cmd, struct lustre_handle *conn, obd_count num_oa,
684 struct obdo **oa, obd_count *oa_bufs, struct page **buf,
685 obd_size *count, obd_off *offset, obd_flag *flags,
691 if (cmd & OBD_BRW_WRITE)
692 return osc_brw_write(conn, num_oa, oa, oa_bufs, buf, count,
693 offset, flags, (bulk_callback_t)callback);
695 return osc_brw_read(conn, num_oa, oa, oa_bufs, buf, count,
696 offset, flags, (bulk_callback_t)callback);
/*
 * Obtain an extent lock, reusing a local one where possible.
 *
 * @oconn:       connection handle identifying the OSC device.
 * @parent_lock: forwarded to ldlm_cli_enqueue().
 * @res_id:      resource id used for matching and enqueueing.
 * @type:        lock type, forwarded unchanged.
 * @extentp:     treated as a struct ldlm_extent and MODIFIED in place:
 *               start is rounded down and end rounded up to a page
 *               boundary.
 * @extent_len:  not referenced in the visible code.
 * @mode:        requested lock mode.
 * @flags, @callback, @data, @datalen: forwarded to ldlm_cli_enqueue().
 * @lockh:       out; handle of the matched or newly enqueued lock.
 *
 * Visible order: (1) ldlm_lock_match() with @mode; (2)
 * ldlm_lock_match() with a second mode, mode2, followed by moving the
 * reference from mode2 to @mode and a call to ldlm_cli_convert(); (3)
 * ldlm_cli_enqueue() for a new lock. The conditions linking these steps
 * are not visible here.
 *
 * NOTE(review): extent is declared as a pointer, so every
 * sizeof(extent) below is the size of a pointer and not
 * sizeof(struct ldlm_extent). Confirm whether sizeof(*extent) or
 * extent_len was intended.
 */
699 static int osc_enqueue(struct lustre_handle *oconn,
700 struct lustre_handle *parent_lock, __u64 *res_id,
701 __u32 type, void *extentp, int extent_len, __u32 mode,
702 int *flags, void *callback, void *data, int datalen,
703 struct lustre_handle *lockh)
705 struct obd_device *obddev = class_conn2obd(oconn);
706 struct osc_obd *osc = &obddev->u.osc;
707 struct ptlrpc_connection *conn;
708 struct ptlrpc_client *cl;
709 struct ldlm_extent *extent = extentp;
713 /* Filesystem locks are given a bit of special treatment: first we
714 * fixup the lock to start and end on page boundaries. */
715 extent->start &= PAGE_MASK;
716 extent->end = (extent->end + PAGE_SIZE - 1) & PAGE_MASK;
718 /* Next, search for already existing extent locks that will cover us */
719 osc_con2dlmcl(oconn, &cl, &conn);
720 rc = ldlm_lock_match(obddev->obd_namespace, res_id, type, extent,
721 sizeof(extent), mode, lockh);
723 /* We already have a lock, and it's referenced */
727 /* Next, search for locks that we can upgrade (if we're trying to write)
728 * or are more than we need (if we're trying to read). Because the VFS
729 * and page cache already protect us locally, lots of readers/writers
730 * can share a single PW lock. */
736 rc = ldlm_lock_match(obddev->obd_namespace, res_id, type, extent,
737 sizeof(extent), mode2, lockh);
740 /* FIXME: This is not incredibly elegant, but it might
741 * be more elegant than adding another parameter to
742 * lock_match. I want a second opinion. */
743 ldlm_lock_addref(lockh, mode);
744 ldlm_lock_decref(lockh, mode2);
749 rc = ldlm_cli_convert(cl, lockh, &osc->osc_connh,
757 rc = ldlm_cli_enqueue(cl, conn, &osc->osc_connh,
758 NULL, obddev->obd_namespace,
759 parent_lock, res_id, type, extent, sizeof(extent),
760 mode, flags, callback, data, datalen, lockh);
/*
 * Release one reference of the given @mode on the lock @lockh by calling
 * ldlm_lock_decref(). @oconn is not used in the visible code.
 */
764 static int osc_cancel(struct lustre_handle *oconn, __u32 mode,
765 struct lustre_handle *lockh)
769 ldlm_lock_decref(lockh, mode);
/*
 * Configure an OSC device from ioctl data.
 *
 * @obddev: device being set up.
 * @len:    not referenced in the visible code.
 * @buf:    struct obd_ioctl_data; inline buffer 1 carries the target
 *          UUID and inline buffer 2 the server UUID.
 *
 * Both inline lengths must lie in the range 1..37, otherwise an error is
 * logged. The target UUID is copied into osc->osc_target_uuid and the
 * server UUID into a 37-byte local buffer, which is resolved to a
 * connection with ptlrpc_uuid_to_connection(). The function then creates
 * a client lock namespace named osc, allocates the request client and
 * the ldlm client, and initialises them on the OST and LDLM portals with
 * cli_name set to osc and ldlm.
 *
 * The failure path unwinds in reverse order: free osc_client, free the
 * namespace, put the connection.
 *
 * NOTE(review): a 37-byte UUID fills server_uuid completely, and no
 * terminating NUL is added in the visible code; confirm callers always
 * include the terminator in the inline length.
 */
774 static int osc_setup(struct obd_device *obddev, obd_count len, void *buf)
776 struct obd_ioctl_data* data = buf;
777 struct osc_obd *osc = &obddev->u.osc;
778 char server_uuid[37];
782 if (data->ioc_inllen1 < 1) {
783 CERROR("osc setup requires a TARGET UUID\n");
787 if (data->ioc_inllen1 > 37) {
788 CERROR("osc TARGET UUID must be less than 38 characters\n");
792 if (data->ioc_inllen2 < 1) {
793 CERROR("osc setup requires a SERVER UUID\n");
797 if (data->ioc_inllen2 > 37) {
798 CERROR("osc SERVER UUID must be less than 38 characters\n");
802 memcpy(osc->osc_target_uuid, data->ioc_inlbuf1, data->ioc_inllen1);
/* the copy length is clamped to the size of the local buffer */
803 memcpy(server_uuid, data->ioc_inlbuf2, MIN(data->ioc_inllen2,
804 sizeof(server_uuid)));
806 osc->osc_conn = ptlrpc_uuid_to_connection(server_uuid);
810 obddev->obd_namespace =
811 ldlm_namespace_new("osc", LDLM_NAMESPACE_CLIENT);
812 if (obddev->obd_namespace == NULL)
813 GOTO(out_conn, rc = -ENOMEM);
815 OBD_ALLOC(osc->osc_client, sizeof(*osc->osc_client));
816 if (osc->osc_client == NULL)
817 GOTO(out_ns, rc = -ENOMEM);
819 OBD_ALLOC(osc->osc_ldlm_client, sizeof(*osc->osc_ldlm_client));
820 if (osc->osc_ldlm_client == NULL)
821 GOTO(out_client, rc = -ENOMEM);
823 ptlrpc_init_client(NULL, NULL, OST_REQUEST_PORTAL, OSC_REPLY_PORTAL,
825 ptlrpc_init_client(NULL, NULL, LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL,
826 osc->osc_ldlm_client);
827 osc->osc_client->cli_name = "osc";
828 osc->osc_ldlm_client->cli_name = "ldlm";
834 OBD_FREE(osc->osc_client, sizeof(*osc->osc_client));
836 ldlm_namespace_free(obddev->obd_namespace);
838 ptlrpc_put_connection(osc->osc_conn);
/*
 * Tear down what osc_setup() created: free the lock namespace, clean up
 * and free the request client and the ldlm client, then drop the
 * reference on the server connection with ptlrpc_put_connection().
 */
842 static int osc_cleanup(struct obd_device * obddev)
844 struct osc_obd *osc = &obddev->u.osc;
846 ldlm_namespace_free(obddev->obd_namespace);
848 ptlrpc_cleanup_client(osc->osc_client);
849 OBD_FREE(osc->osc_client, sizeof(*osc->osc_client));
850 ptlrpc_cleanup_client(osc->osc_ldlm_client);
851 OBD_FREE(osc->osc_ldlm_client, sizeof(*osc->osc_ldlm_client));
852 ptlrpc_put_connection(osc->osc_conn);
/*
 * Query filesystem statistics from the target.
 *
 * @conn: connection handle identifying the OSC device.
 * @sfs:  out; filled from the reply by obd_statfs_unpack().
 *
 * Sends an OST_STATFS request with no request buffers and expects one
 * reply buffer holding a struct obd_statfs. A failure is logged with
 * CERROR. The request is released with ptlrpc_free_req().
 */
858 static int osc_statfs(struct lustre_handle *conn, struct statfs *sfs)
860 struct ptlrpc_request *request;
861 struct ptlrpc_client *cl;
862 struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
863 struct ptlrpc_connection *connection;
864 struct obd_statfs *osfs;
865 int rc, size = sizeof(*osfs);
868 osc_con2cl(conn, &cl, &connection);
869 request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh,
870 OST_STATFS, 0, NULL, NULL);
874 request->rq_replen = lustre_msg_size(1, &size);
876 rc = ptlrpc_queue_wait(request);
877 rc = ptlrpc_check_status(request, rc);
879 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
/* convert the wire structure into the caller-supplied struct statfs */
883 osfs = lustre_msg_buf(request->rq_repmsg, 0);
884 obd_statfs_unpack(osfs, sfs);
888 ptlrpc_free_req(request);
/*
 * OBD method table for the OSC type; osc_init() passes it to
 * class_register_type(). Each visible entry binds an obd_ops slot to the
 * static function of the matching name in this file.
 *
 * NOTE(review): osc_setup, osc_open, osc_close, osc_brw, osc_punch and
 * osc_cancel are defined in this file but have no entry visible here;
 * confirm they are wired up.
 */
892 struct obd_ops osc_obd_ops = {
894 o_cleanup: osc_cleanup,
895 o_statfs: osc_statfs,
896 o_create: osc_create,
897 o_destroy: osc_destroy,
898 o_getattr: osc_getattr,
899 o_setattr: osc_setattr,
902 o_connect: osc_connect,
903 o_disconnect: osc_disconnect,
906 o_enqueue: osc_enqueue,
/*
 * Module entry point: registers the OSC method table under
 * LUSTRE_OSC_NAME and returns the result of class_register_type().
 */
910 static int __init osc_init(void)
912 return class_register_type(&osc_obd_ops, LUSTRE_OSC_NAME);
/* Module exit point: unregisters the type registered by osc_init(). */
915 static void __exit osc_exit(void)
917 class_unregister_type(LUSTRE_OSC_NAME);
/* Module metadata, followed by registration of the init and exit hooks. */
920 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
921 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC) v1.0");
922 MODULE_LICENSE("GPL");
924 module_init(osc_init);
925 module_exit(osc_exit);