2 * Copyright (C) 2001 Cluster File Systems, Inc.
4 * This code is issued under the GNU General Public License.
5 * See the file COPYING in this distribution
7 * Author Peter Braam <braam@clusterfs.com>
9 * This server is single threaded at present (but can easily be multi
10 * threaded). For testing and management it is treated as an
11 * obd_device, although it does not export a full OBD method table
12 * (the requests are coming in over the wire, so object target
13 * modules do not have a full method table.)
19 #include <linux/config.h>
20 #include <linux/module.h>
21 #include <linux/kernel.h>
23 #include <linux/string.h>
24 #include <linux/stat.h>
25 #include <linux/errno.h>
26 #include <linux/locks.h>
27 #include <linux/unistd.h>
29 #include <asm/system.h>
30 #include <asm/uaccess.h>
33 #include <linux/stat.h>
34 #include <asm/uaccess.h>
35 #include <asm/segment.h>
36 #include <linux/miscdevice.h>
38 #define DEBUG_SUBSYSTEM S_OSC
40 #include <linux/obd_support.h>
41 #include <linux/obd_class.h>
42 #include <linux/lustre_lib.h>
43 #include <linux/lustre_idl.h>
/* Request-queueing routine defined outside this file; osc_queue_wait() calls it. */
45 extern int ost_queue_req(struct obd_device *, struct ptlrpc_request *);
47 /* FIXME: this belongs in some sort of service struct */
/*
 * Next transaction id.  ost_prep_req() copies it into rq_xid and
 * post-increments it.
 * NOTE(review): the increment is a plain `++` with no locking visible in
 * this file -- confirm before calling ost_prep_req() concurrently.
 */
48 static int osc_xid = 1;
/*
 * Build a new request: allocate a ptlrpc_request, zero it, stamp it with
 * the next osc_xid, pack the two caller buffers through ost_pack_req() and
 * store `opcode` in the packed request header.
 *
 * opcode:        value written to rq_reqhdr->opc
 * buf1/buflen1:  first payload buffer and its length, passed to ost_pack_req()
 * buf2/buflen2:  second payload buffer and its length, passed to ost_pack_req()
 *
 * NOTE(review): the failure checks and return statements are not visible
 * in this view; callers in this file log "cannot pack req!" after calling
 * this function, so presumably a failure value is returned -- confirm.
 */
50 struct ptlrpc_request *ost_prep_req(int opcode, int buflen1, char *buf1,
51 int buflen2, char *buf2)
53 struct ptlrpc_request *request;
57 OBD_ALLOC(request, sizeof(*request));
59 CERROR("request allocation out of memory\n");
/* Start from an all-zero request so unset fields read as 0/NULL. */
63 memset(request, 0, sizeof(*request));
64 request->rq_xid = osc_xid++;
/* ost_pack_req() fills in the header, body, length and buffer fields of the request. */
66 rc = ost_pack_req(buf1, buflen1, buf2, buflen2,
67 &request->rq_reqhdr, &request->rq_req.ost,
68 &request->rq_reqlen, &request->rq_reqbuf);
/* NOTE(review): the "llight request" prefix looks like a leftover from other code -- confirm the intended wording. */
70 CERROR("llight request: cannot pack request %d\n", rc);
/* The opcode is written only after packing, once rq_reqhdr has been set up by ost_pack_req(). */
73 request->rq_reqhdr->opc = opcode;
79 /* XXX: unify with mdc_queue_wait */
/*
 * Send `req` on behalf of connection `conn` and wait for its reply.
 *
 * Steps visible here:
 *   1. copy conn->oc_id into the request body and initialise the
 *      reply wait queue;
 *   2. hand the request either to ost_queue_req() or to ptl_send_rpc()
 *      (the condition choosing between the two is not visible in this view);
 *   3. sleep in TASK_INTERRUPTIBLE until rq_repbuf becomes non-NULL, checking
 *      for a pending SIGKILL or SIGINT on each pass;
 *   4. unpack the reply with ost_unpack_rep().
 *
 * NOTE(review): `extern` on a function definition is redundant in C.
 * NOTE(review): the return statements are not visible here; callers store
 * the result in `rc`.
 */
80 extern int osc_queue_wait(struct obd_conn *conn, struct ptlrpc_request *req)
82 struct obd_device *client = conn->oc_dev;
83 struct lustre_peer *peer = &conn->oc_dev->u.osc.osc_peer;
/* Wait-queue entry for the current task, used for the sleep further down. */
85 DECLARE_WAITQUEUE(wait, current);
89 /* set the connection id */
90 req->rq_req.ost->connid = conn->oc_id;
91 init_waitqueue_head(&req->rq_wait_for_rep);
93 /* XXX fix the race here (wait_for_event?)*/
96 CDEBUG(D_INODE, "\n");
/* One delivery path: queue the request through ost_queue_req() on this connection's device. */
97 rc = ost_queue_req(client, req);
99 /* Remote delivery via portals. */
100 req->rq_req_portal = OST_REQUEST_PORTAL;
101 req->rq_reply_portal = OST_REPLY_PORTAL;
102 rc = ptl_send_rpc(req, peer);
105 CERROR("error %d, opcode %d\n", rc, req->rq_reqhdr->opc);
109 CDEBUG(D_INODE, "tgt at %p, conn id %d, opcode %d request at: %p\n",
110 &conn->oc_dev->u.osc.osc_tgt->u.ost,
111 conn->oc_id, req->rq_reqhdr->opc, req);
113 /* wait for the reply */
114 CDEBUG(D_INODE, "-- sleeping\n");
/*
 * The task is added to the wait queue only here, after the request has
 * already been sent; this appears to be the race the XXX comment above
 * refers to.
 */
115 add_wait_queue(&req->rq_wait_for_rep, &wait);
116 while (req->rq_repbuf == NULL) {
117 set_current_state(TASK_INTERRUPTIBLE);
119 /* if this process really wants to die, let it go */
120 if (sigismember(&(current->pending.signal), SIGKILL) ||
121 sigismember(&(current->pending.signal), SIGINT))
126 remove_wait_queue(&req->rq_wait_for_rep, &wait);
127 set_current_state(TASK_RUNNING);
128 CDEBUG(D_INODE, "-- done\n");
/* A reply buffer that is still NULL after the loop means no reply was received. */
130 if (req->rq_repbuf == NULL) {
131 /* We broke out because of a signal */
136 rc = ost_unpack_rep(req->rq_repbuf, req->rq_replen, &req->rq_rephdr,
/* NOTE(review): the message names mds_unpack_rep, but the call above is ost_unpack_rep -- the log text looks wrong. */
139 CERROR("mds_unpack_rep failed: %d\n", rc);
/* The debug line below is emitted only when the reply status is 0. */
143 if ( req->rq_rephdr->status == 0 )
144 CDEBUG(D_INODE, "buf %p len %d status %d\n",
145 req->rq_repbuf, req->rq_replen,
146 req->rq_rephdr->status);
/*
 * Release a request structure obtained from ost_prep_req().
 * Only the ptlrpc_request itself (sizeof(*request) bytes) is freed here.
 * NOTE(review): rq_reqbuf and the reply buffer are not freed in this
 * function -- confirm who owns them.
 */
152 static void osc_free_req(struct ptlrpc_request *request)
154 OBD_FREE(request, sizeof(*request));
/*
 * Send an OST_CONNECT request with no payload and store the connection id
 * from the reply in conn->oc_id.
 * Registered as o_connect in osc_obd_ops.
 */
157 static int osc_connect(struct obd_conn *conn)
159 struct ptlrpc_request *request;
163 request = ost_prep_req(OST_CONNECT, 0, NULL, 0, NULL);
165 CERROR("cannot pack req!\n");
/* Size of one reply header plus one ost_rep; the left-hand side of this assignment is not visible in this view. */
170 sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
172 rc = osc_queue_wait(conn, request);
178 CDEBUG(D_INODE, "received connid %d\n", request->rq_rep.ost->connid);
/* Remember the id from the reply; osc_queue_wait() copies conn->oc_id into every later request. */
180 conn->oc_id = request->rq_rep.ost->connid;
182 osc_free_req(request);
/*
 * Send an OST_DISCONNECT request with no payload and wait for the reply.
 * Registered as o_disconnect in osc_obd_ops.
 */
187 static int osc_disconnect(struct obd_conn *conn)
189 struct ptlrpc_request *request;
193 request = ost_prep_req(OST_DISCONNECT, 0, NULL, 0, NULL);
195 CERROR("cannot pack req!\n");
/* Size of one reply header plus one ost_rep; the left-hand side of this assignment is not visible in this view. */
200 sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
202 rc = osc_queue_wait(conn, request);
208 osc_free_req(request);
/*
 * Fetch object attributes: send an OST_GETATTR request carrying a copy of
 * *oa and overwrite *oa with the obdo found in the reply.
 * Registered as o_getattr in osc_obd_ops.
 *
 * oa: in/out object descriptor.
 */
214 static int osc_getattr(struct obd_conn *conn, struct obdo *oa)
216 struct ptlrpc_request *request;
219 request = ost_prep_req(OST_GETATTR, 0, NULL, 0, NULL);
221 CERROR("cannot pack req!\n");
225 memcpy(&request->rq_req.ost->oa, oa, sizeof(*oa));
/* Set every bit of o_valid in the outgoing copy; the caller's oa->o_valid is not modified by this line. */
226 request->rq_req.ost->oa.o_valid = ~0;
/* Size of one reply header plus one ost_rep; the left-hand side of this assignment is not visible in this view. */
228 sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
230 rc = osc_queue_wait(conn, request);
236 CDEBUG(D_INODE, "mode: %o\n", request->rq_rep.ost->oa.o_mode);
/* Hand the reply's obdo back to the caller. */
238 memcpy(oa, &request->rq_rep.ost->oa, sizeof(*oa));
242 osc_free_req(request);
/*
 * Send an OST_SETATTR request carrying a copy of *oa and wait for the reply.
 * Registered as o_setattr in osc_obd_ops.
 *
 * No visible line copies the reply back into *oa.
 * NOTE(review): unlike osc_getattr/osc_create/osc_punch/osc_destroy, no
 * `o_valid = ~0` override is visible here, so the caller's o_valid appears
 * to be sent as-is -- confirm that this is intended.
 */
246 static int osc_setattr(struct obd_conn *conn, struct obdo *oa)
248 struct ptlrpc_request *request;
251 request = ost_prep_req(OST_SETATTR, 0, NULL, 0, NULL);
253 CERROR("cannot pack req!\n");
257 memcpy(&request->rq_req.ost->oa, oa, sizeof(*oa));
/* Size of one reply header plus one ost_rep; the left-hand side of this assignment is not visible in this view. */
259 sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
261 rc = osc_queue_wait(conn, request);
268 osc_free_req(request);
/*
 * Send an OST_CREATE request carrying a copy of *oa and overwrite *oa with
 * the obdo found in the reply.
 * Registered as o_create in osc_obd_ops.
 *
 * oa: in/out object descriptor.
 */
272 static int osc_create(struct obd_conn *conn, struct obdo *oa)
274 struct ptlrpc_request *request;
280 request = ost_prep_req(OST_CREATE, 0, NULL, 0, NULL);
282 CERROR("cannot pack req!\n");
286 memcpy(&request->rq_req.ost->oa, oa, sizeof(*oa));
/* Set every bit of o_valid in the outgoing copy only. */
287 request->rq_req.ost->oa.o_valid = ~0;
/* Size of one reply header plus one ost_rep; the left-hand side of this assignment is not visible in this view. */
289 sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
291 rc = osc_queue_wait(conn, request);
/* Hand the reply's obdo back to the caller. */
296 memcpy(oa, &request->rq_rep.ost->oa, sizeof(*oa));
299 osc_free_req(request);
/*
 * Send an OST_PUNCH request for the object described by *oa and overwrite
 * *oa with the obdo found in the reply.
 *
 * count, offset: carried to the server inside the outgoing obdo rather than
 *                in separate request fields: `offset` is stored in o_size
 *                and `count` in o_blocks.
 *
 * NOTE(review): this function does not appear among the visible entries of
 * osc_obd_ops -- confirm whether it is meant to be registered.
 */
303 static int osc_punch(struct obd_conn *conn, struct obdo *oa, obd_size count, obd_off offset)
305 struct ptlrpc_request *request;
311 request = ost_prep_req(OST_PUNCH, 0, NULL, 0, NULL);
313 CERROR("cannot pack req!\n");
317 memcpy(&request->rq_req.ost->oa, oa, sizeof(*oa));
318 request->rq_req.ost->oa.o_valid = ~0;
/* These two fields of the outgoing copy are overwritten with the punch range, replacing whatever the caller's obdo held. */
319 request->rq_req.ost->oa.o_size = offset;
320 request->rq_req.ost->oa.o_blocks = count;
/* Size of one reply header plus one ost_rep; the left-hand side of this assignment is not visible in this view. */
322 sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
324 rc = osc_queue_wait(conn, request);
/* Hand the reply's obdo back to the caller. */
329 memcpy(oa, &request->rq_rep.ost->oa, sizeof(*oa));
332 osc_free_req(request);
/*
 * Send an OST_DESTROY request carrying a copy of *oa and overwrite *oa with
 * the obdo found in the reply.
 * Registered as o_destroy in osc_obd_ops.
 *
 * oa: in/out object descriptor.
 */
336 static int osc_destroy(struct obd_conn *conn, struct obdo *oa)
338 struct ptlrpc_request *request;
344 request = ost_prep_req(OST_DESTROY, 0, NULL, 0, NULL);
346 CERROR("cannot pack req!\n");
350 memcpy(&request->rq_req.ost->oa, oa, sizeof(*oa));
/* Set every bit of o_valid in the outgoing copy only. */
351 request->rq_req.ost->oa.o_valid = ~0;
/* Size of one reply header plus one ost_rep; the left-hand side of this assignment is not visible in this view. */
353 sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
355 rc = osc_queue_wait(conn, request);
/* Hand the reply's obdo back to the caller. */
360 memcpy(oa, &request->rq_rep.ost->oa, sizeof(*oa));
363 osc_free_req(request);
368 /* mount the file system (secretly) */
/*
 * Configure the OSC's target from the ioctl data in `buf`.
 *
 * If data->ioc_dev is a valid index into obd_dev[], the target is that
 * local device, which must carry both the OBD_ATTACHED and OBD_SET_UP flags.
 * Otherwise the peer named "ost" is looked up with kportal_uuid_to_peer()
 * and stored in osc->osc_peer.
 *
 * NOTE(review): the "mount the file system" comment above does not match
 * anything in the visible code -- it looks copied from elsewhere.
 * NOTE(review): the rest of the parameter list and the return statements
 * are not visible in this view.
 */
369 static int osc_setup(struct obd_device *obddev, obd_count len,
373 struct obd_ioctl_data* data = buf;
374 struct osc_obd *osc = &obddev->u.osc;
377 if (data->ioc_dev >= 0 && data->ioc_dev < MAX_OBD_DEVICES) {
378 /* This is a local connection */
379 osc->osc_tgt = &obd_dev[data->ioc_dev];
/* NOTE(review): this looks informational but is emitted through CERROR -- confirm the intended log level. */
381 CERROR("OSC: tgt %d ost at %p\n", data->ioc_dev,
382 &osc->osc_tgt->u.ost);
/* The target is usable only if both flags are set. */
383 if ( ! (osc->osc_tgt->obd_flags & OBD_ATTACHED) ||
384 ! (osc->osc_tgt->obd_flags & OBD_SET_UP) ){
385 CERROR("device not attached or not set up (%d)\n",
392 /* This is a remote connection using Portals */
394 /* XXX: this should become something like ioc_inlbuf1 */
/* The peer uuid is the fixed string "ost"; nothing from `data` is used for the lookup. */
395 err = kportal_uuid_to_peer("ost", &osc->osc_peer);
397 CERROR("Cannot find 'ost' peer.\n");
/*
 * Transfer the bytes described by `src` to the destination `dst`.
 *
 * When req->rq_peer.peer_nid is 0 the data is copied directly with memcpy().
 * Otherwise the data is copied into a freshly allocated bounce buffer, sent
 * with ptl_send_buf() on OST_BULK_PORTAL, and the caller sleeps on
 * rq_wait_for_bulk before the buffer is freed.
 *
 * NOTE(review): the remaining parameter(s) and the return statements are
 * not visible in this view.
 */
408 int osc_sendpage(struct ptlrpc_request *req, struct niobuf *dst,
411 if (req->rq_peer.peer_nid == 0) {
/* The addr fields are converted to pointers through unsigned long before the copy. */
413 memcpy((char *)(unsigned long)dst->addr,
414 (char *)(unsigned long)src->addr, src->len);
/* Bounce buffer for the bulk send. */
419 OBD_ALLOC(buf, src->len);
423 memcpy(buf, (char *)(unsigned long)src->addr, src->len);
425 req->rq_bulkbuf = buf;
426 req->rq_bulklen = src->len;
/*
 * NOTE(review): the wait queue is initialised only after the send, and
 * sleep_on() is called unconditionally without looking at rc.  It looks
 * like a completion that fires before sleep_on() would be lost, and a
 * failed send would sleep with nothing to wake it -- confirm.
 */
427 rc = ptl_send_buf(req, &req->rq_peer, OST_BULK_PORTAL, 0);
428 init_waitqueue_head(&req->rq_wait_for_bulk);
429 sleep_on(&req->rq_wait_for_bulk);
430 OBD_FREE(buf, src->len);
431 req->rq_bulklen = 0; /* FIXME: eek. */
/*
 * Bulk read/write for a set of objects.
 *
 * rw:       command stored in the request (compared with OBD_BRW_READ below)
 * num_oa:   number of objects in oa[]
 * oa:       object descriptors, one per object
 * oa_bufs:  oa_bufs[i] is the number of buffers belonging to oa[i]
 * buf:      pages for all objects, indexed by the running counter `n`
 * count, offset, flags: per-buffer arrays; `offset` is indexed by `n` in the
 *           visible packing call, the use of the others is not visible here
 *
 * Visible flow: size the two request buffers, pack one obd_ioobj per object
 * and one niobuf per page, send the OST_BRW request and wait, check the
 * length of the reply's second buffer, then call osc_sendpage() once per
 * page, and finally release the reply and the request.
 *
 * NOTE(review): declarations and updates of `n`, `src`, `dst`, `ptr1`,
 * `ptr2`, the error paths and the return statements are not visible in
 * this view.
 */
438 int osc_brw(int rw, struct obd_conn *conn, obd_count num_oa,
439 struct obdo **oa, obd_count *oa_bufs, struct page **buf,
440 obd_size *count, obd_off *offset, obd_flag *flags)
442 struct ptlrpc_request *request;
444 struct obd_ioobj ioo;
446 int size1, size2 = 0;
/* Buffer 1 holds one obd_ioobj per object; buffer 2 holds one `src`-sized entry per page. */
450 size1 = num_oa * sizeof(ioo);
451 for (i = 0; i < num_oa; i++) {
452 size2 += oa_bufs[i] * sizeof(src);
/* No payload pointers are passed; the buffers are filled in below through ptr1/ptr2. */
455 request = ost_prep_req(OST_BRW, size1, NULL, size2, NULL);
457 CERROR("cannot pack req!\n");
462 request->rq_req.ost->cmd = rw;
463 ptr1 = ost_req_buf1(request->rq_req.ost);
464 ptr2 = ost_req_buf2(request->rq_req.ost);
465 for (i = 0; i < num_oa; i++) {
466 ost_pack_ioo(&ptr1, oa[i], oa_bufs[i]);
467 for (j = 0; j < oa_bufs[i]; j++) {
/* Each page is kmap()ed here; the matching unmap is presumably in the final loop, whose body is not visible. */
468 ost_pack_niobuf(&ptr2, kmap(buf[n]), offset[n],
474 request->rq_bulk_portal = OST_BULK_PORTAL;
/* Reply header plus ost_rep plus size2 bytes; the left-hand side of this assignment is not visible in this view. */
476 sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep) + size2;
478 rc = osc_queue_wait(conn, request);
485 ptr2 = ost_rep_buf2(request->rq_rep.ost);
/* The reply must carry exactly one niobuf per page that was packed. */
486 if (request->rq_rep.ost->buflen2 != n * sizeof(struct niobuf)) {
487 CERROR("buffer length wrong\n");
/* What happens for reads is not visible here; presumably the page-sending loop below is skipped -- confirm. */
491 if (rw == OBD_BRW_READ)
494 for (i = 0; i < num_oa; i++) {
495 for (j = 0; j < oa_bufs[i]; j++) {
/* NOTE(review): this stores the struct page pointer itself, not the kmap()ed address used when packing -- confirm that osc_sendpage() expects this. */
497 src.addr = (__u64)(unsigned long)buf[n];
499 ost_unpack_niobuf(&ptr2, &dst);
500 osc_sendpage(request, dst, &src);
/* The reply is freed here because osc_free_req() releases only the request structure. */
507 if (request->rq_rephdr)
508 OBD_FREE(request->rq_rephdr, request->rq_replen);
510 for (i = 0; i < num_oa; i++) {
511 for (j = 0; j < oa_bufs[i]; j++) {
517 osc_free_req(request);
/* Cleanup hook registered as o_cleanup in osc_obd_ops; its body is not visible in this view. */
521 static int osc_cleanup(struct obd_device * obddev)
/*
 * Method table handed to obd_register_type() by osc_init().
 * Written with the GNU `field: value` initializer syntax.
 * NOTE(review): osc_setup, osc_brw and osc_punch do not appear among the
 * entries visible here -- confirm whether they are registered.
 */
527 struct obd_ops osc_obd_ops = {
529 o_cleanup: osc_cleanup,
530 o_create: osc_create,
531 o_destroy: osc_destroy,
532 o_getattr: osc_getattr,
533 o_setattr: osc_setattr,
534 o_connect: osc_connect,
535 o_disconnect: osc_disconnect,
/*
 * Module entry point: register the OSC method table under LUSTRE_OSC_NAME.
 * NOTE(review): the result of obd_register_type() is discarded on the
 * visible line; if it returns a status, it is not propagated -- confirm.
 */
540 static int __init osc_init(void)
542 obd_register_type(&osc_obd_ops, LUSTRE_OSC_NAME);
/* Module exit point: unregister the type that osc_init() registered. */
546 static void __exit osc_exit(void)
548 obd_unregister_type(LUSTRE_OSC_NAME);
/* Module metadata and the load/unload hooks for this module. */
551 MODULE_AUTHOR("Peter J. Braam <braam@clusterfs.com>");
552 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC) v1.0");
553 MODULE_LICENSE("GPL");
555 module_init(osc_init);
556 module_exit(osc_exit);