2 * Copyright (C) 2001 Cluster File Systems, Inc.
4 * This code is issued under the GNU General Public License.
5 * See the file COPYING in this distribution
7 * Author Peter Braam <braam@clusterfs.com>
9 * This server is single threaded at present (but can easily be multi
10 * threaded). For testing and management it is treated as an
11 * obd_device, although it does not export a full OBD method table
12 * (the requests are coming in over the wire, so object target
13 * modules do not have a full method table.)
19 #include <linux/config.h>
20 #include <linux/module.h>
21 #include <linux/kernel.h>
23 #include <linux/string.h>
24 #include <linux/stat.h>
25 #include <linux/errno.h>
26 #include <linux/locks.h>
27 #include <linux/unistd.h>
29 #include <asm/system.h>
30 #include <asm/uaccess.h>
33 #include <linux/stat.h>
34 #include <asm/uaccess.h>
35 #include <asm/segment.h>
36 #include <linux/miscdevice.h>
38 #define DEBUG_SUBSYSTEM S_OSC
40 #include <linux/obd_support.h>
41 #include <linux/obd_class.h>
42 #include <linux/lustre_lib.h>
43 #include <linux/lustre_idl.h>
/* Declared here but not defined in the visible part of this file; called
 * from osc_queue_wait() with the client device and the request.
 * Presumably the local (same-node) delivery path -- TODO confirm. */
45 extern int ost_queue_req(struct obd_device *, struct ptlrpc_request *);
47 /* FIXME: this belongs in some sort of service struct */
/* Counter copied into rq_xid and post-incremented by ost_prep_req().
 * No locking around that increment is visible in this file. */
48 static int osc_xid = 1;
/*
 * ost_prep_req - allocate a ptlrpc_request and pack an OST request into it.
 *
 * opcode:          stored in request->rq_reqhdr->opc once packing is done.
 * buflen1, buf1:   first length/buffer pair, handed unchanged to
 *                  ost_pack_req().
 * buflen2, buf2:   second length/buffer pair, handed unchanged to
 *                  ost_pack_req().
 *
 * NOTE(review): this listing is missing lines (braces, the conditions
 * guarding the CERROR calls, and the return statements), so the exact
 * return value and error paths cannot be confirmed from here. Callers in
 * this file assign the result to a struct ptlrpc_request pointer.
 */
50 struct ptlrpc_request *ost_prep_req(int opcode, int buflen1, char *buf1,
51 int buflen2, char *buf2)
53 struct ptlrpc_request *request;
/* Allocate the request descriptor itself. */
57 OBD_ALLOC(request, sizeof(*request));
/* Logged on what is presumably the allocation-failure path (test not shown). */
59 CERROR("request allocation out of memory\n");
/* Zero the descriptor, then take the next transaction id from the
 * file-scope counter. */
63 memset(request, 0, sizeof(*request));
64 request->rq_xid = osc_xid++;
/* Pack the request. The addresses of the header pointer, body pointer,
 * length and buffer fields are passed, presumably so the packer can fill
 * them in -- TODO confirm against ost_pack_req(). */
66 rc = ost_pack_req(buf1, buflen1, buf2, buflen2,
67 &request->rq_reqhdr, &request->rq_req.ost,
68 &request->rq_reqlen, &request->rq_reqbuf);
/* NOTE(review): "llight" in this message looks like a typo. Also, no
 * visible line frees `request` when packing fails -- verify against the
 * full source whether this path leaks the descriptor. */
70 CERROR("llight request: cannot pack request %d\n", rc);
/* rq_reqhdr is dereferenced here, after the pack call that received its
 * address. */
73 request->rq_reqhdr->opc = opcode;
79 /* XXX: unify with mdc_queue_wait */
/*
 * osc_queue_wait - hand a prepared request to the target, sleep until the
 * reply is signalled, then unpack the reply.
 *
 * conn: connection; supplies the client device, the peer and the
 *       connection id stamped into the request.
 * req:  request built by ost_prep_req().
 *
 * NOTE(review): lines are missing from this listing (braces, the branch
 * that selects local vs. remote delivery, the return statements), so the
 * return value and exact control flow cannot be confirmed from here.
 */
80 extern int osc_queue_wait(struct obd_conn *conn, struct ptlrpc_request *req)
82 struct obd_device *client = conn->oc_dev;
83 struct lustre_peer *peer = &conn->oc_dev->u.osc.osc_peer;
88 /* set the connection id */
89 req->rq_req.ost->connid = conn->oc_id;
91 /* XXX fix the race here (wait_for_event?)*/
/* First delivery path: queue the request via ost_queue_req() on the
 * client device. The condition choosing this path is not shown. */
94 CDEBUG(D_INODE, "\n");
95 rc = ost_queue_req(client, req);
97 /* Remote delivery via portals. */
98 req->rq_req_portal = OST_REQUEST_PORTAL;
99 req->rq_reply_portal = OST_REPLY_PORTAL;
100 rc = ptl_send_rpc(req, peer);
/* Logged with the delivery return code; the guarding test is not shown. */
103 CERROR("error %d, opcode %d\n", rc, req->rq_reqhdr->opc);
107 CDEBUG(D_INODE, "tgt at %p, conn id %d, opcode %d request at: %p\n",
108 &conn->oc_dev->u.osc.osc_tgt->u.ost,
109 conn->oc_id, req->rq_reqhdr->opc, req);
111 /* wait for the reply */
/* NOTE(review): the wait queue is initialised only here, after the
 * request has already been handed off above. If the reply path wakes
 * rq_wait_for_rep before this point the wakeup could be missed -- this
 * appears to be the race the XXX comment above refers to. */
112 init_waitqueue_head(&req->rq_wait_for_rep);
113 CDEBUG(D_INODE, "-- sleeping\n");
114 interruptible_sleep_on(&req->rq_wait_for_rep);
115 CDEBUG(D_INODE, "-- done\n");
/* Unpack the reply buffer; the remaining arguments of this call are on a
 * line that is not shown. */
117 rc = ost_unpack_rep(req->rq_repbuf, req->rq_replen, &req->rq_rephdr,
/* NOTE(review): the message names mds_unpack_rep although the call above
 * is ost_unpack_rep. */
120 CERROR("mds_unpack_rep failed: %d\n", rc);
/* Debug trace emitted only when the reply header reports status 0. */
124 if ( req->rq_rephdr->status == 0 )
125 CDEBUG(D_INODE, "buf %p len %d status %d\n",
126 req->rq_repbuf, req->rq_replen,
127 req->rq_rephdr->status);
/*
 * osc_free_req - release a request descriptor.
 *
 * Only the descriptor itself (sizeof(*request)) is freed on the visible
 * line; the request/reply buffers it points at are not released here.
 */
133 static void osc_free_req(struct ptlrpc_request *request)
135 OBD_FREE(request, sizeof(*request));
/*
 * osc_connect - send an OST_CONNECT request and record the connection id
 * returned in the reply.
 *
 * conn: connection to use; conn->oc_id is overwritten with the connid
 *       taken from the reply.
 *
 * NOTE(review): lines are missing from this listing (braces, error tests,
 * return statements); the return value cannot be confirmed from here.
 */
138 static int osc_connect(struct obd_conn *conn)
140 struct ptlrpc_request *request;
/* Connect carries no payload: both buffer pairs are empty. */
144 request = ost_prep_req(OST_CONNECT, 0, NULL, 0, NULL);
146 CERROR("cannot pack req!\n");
/* Size of a reply header plus one ost_rep; the left-hand side of this
 * assignment is on a line that is not shown (presumably the expected
 * reply length -- TODO confirm). */
151 sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
153 rc = osc_queue_wait(conn, request);
159 CDEBUG(D_INODE, "received connid %d\n", request->rq_rep.ost->connid);
/* Remember the connection id handed back in the reply. */
161 conn->oc_id = request->rq_rep.ost->connid;
163 osc_free_req(request);
/*
 * osc_disconnect - send an OST_DISCONNECT request over the connection and
 * wait for the reply.
 *
 * conn: connection to tear down.
 *
 * NOTE(review): lines are missing from this listing (braces, error tests,
 * return statements); the return value cannot be confirmed from here.
 */
168 static int osc_disconnect(struct obd_conn *conn)
170 struct ptlrpc_request *request;
/* Disconnect carries no payload: both buffer pairs are empty. */
174 request = ost_prep_req(OST_DISCONNECT, 0, NULL, 0, NULL);
176 CERROR("cannot pack req!\n");
/* Reply header plus one ost_rep; the assignment target is not shown. */
181 sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
183 rc = osc_queue_wait(conn, request);
189 osc_free_req(request);
/*
 * osc_getattr - send an OST_GETATTR request for an object and copy the
 * attributes from the reply back to the caller.
 *
 * conn: connection to send on.
 * oa:   in/out; copied into the request, then overwritten with the obdo
 *       from the reply.
 *
 * NOTE(review): lines are missing from this listing (braces, error tests,
 * return statements); the return value cannot be confirmed from here.
 */
195 static int osc_getattr(struct obd_conn *conn, struct obdo *oa)
197 struct ptlrpc_request *request;
200 request = ost_prep_req(OST_GETATTR, 0, NULL, 0, NULL);
202 CERROR("cannot pack req!\n");
/* Copy the caller's obdo into the request, then set every bit of o_valid
 * in the request copy (the caller's o_valid is not modified here). */
206 memcpy(&request->rq_req.ost->oa, oa, sizeof(*oa));
207 request->rq_req.ost->oa.o_valid = ~0;
/* Reply header plus one ost_rep; the assignment target is not shown. */
209 sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
211 rc = osc_queue_wait(conn, request);
217 CDEBUG(D_INODE, "mode: %o\n", request->rq_rep.ost->oa.o_mode);
/* Hand the attributes from the reply back to the caller. */
219 memcpy(oa, &request->rq_rep.ost->oa, sizeof(*oa));
223 osc_free_req(request);
/*
 * osc_setattr - send an OST_SETATTR request carrying the caller's obdo.
 *
 * conn: connection to send on.
 * oa:   attributes to send; copied into the request as-is. Unlike
 *       osc_getattr/osc_create/osc_destroy, no visible line overrides
 *       o_valid or copies the reply back into oa.
 *
 * NOTE(review): lines are missing from this listing (braces, error tests,
 * return statements); the return value cannot be confirmed from here.
 */
227 static int osc_setattr(struct obd_conn *conn, struct obdo *oa)
229 struct ptlrpc_request *request;
232 request = ost_prep_req(OST_SETATTR, 0, NULL, 0, NULL);
234 CERROR("cannot pack req!\n");
238 memcpy(&request->rq_req.ost->oa, oa, sizeof(*oa));
/* Reply header plus one ost_rep; the assignment target is not shown. */
240 sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
242 rc = osc_queue_wait(conn, request);
249 osc_free_req(request);
/*
 * osc_create - send an OST_CREATE request and copy the resulting obdo from
 * the reply back to the caller.
 *
 * conn: connection to send on.
 * oa:   in/out; copied into the request, then overwritten with the obdo
 *       from the reply.
 *
 * NOTE(review): lines are missing from this listing (braces, error tests,
 * return statements); the return value cannot be confirmed from here.
 */
253 static int osc_create(struct obd_conn *conn, struct obdo *oa)
255 struct ptlrpc_request *request;
261 request = ost_prep_req(OST_CREATE, 0, NULL, 0, NULL);
263 CERROR("cannot pack req!\n");
/* Copy the caller's obdo into the request and set every bit of o_valid in
 * the request copy. */
267 memcpy(&request->rq_req.ost->oa, oa, sizeof(*oa));
268 request->rq_req.ost->oa.o_valid = ~0;
/* Reply header plus one ost_rep; the assignment target is not shown. */
270 sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
272 rc = osc_queue_wait(conn, request);
/* Return the obdo from the reply to the caller. */
277 memcpy(oa, &request->rq_rep.ost->oa, sizeof(*oa));
280 osc_free_req(request);
/*
 * osc_destroy - send an OST_DESTROY request for an object and copy the
 * obdo from the reply back to the caller.
 *
 * conn: connection to send on.
 * oa:   in/out; copied into the request, then overwritten with the obdo
 *       from the reply.
 *
 * NOTE(review): lines are missing from this listing (braces, error tests,
 * return statements); the return value cannot be confirmed from here.
 */
284 static int osc_destroy(struct obd_conn *conn, struct obdo *oa)
286 struct ptlrpc_request *request;
292 request = ost_prep_req(OST_DESTROY, 0, NULL, 0, NULL);
294 CERROR("cannot pack req!\n");
/* Copy the caller's obdo into the request and set every bit of o_valid in
 * the request copy. */
298 memcpy(&request->rq_req.ost->oa, oa, sizeof(*oa));
299 request->rq_req.ost->oa.o_valid = ~0;
/* Reply header plus one ost_rep; the assignment target is not shown. */
301 sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
303 rc = osc_queue_wait(conn, request);
/* Return the obdo from the reply to the caller. */
308 memcpy(oa, &request->rq_rep.ost->oa, sizeof(*oa));
311 osc_free_req(request);
316 /* mount the file system (secretly) */
/*
 * osc_setup - configure the OSC device with either a local target device
 * or a remote Portals peer.
 *
 * obddev: device being set up; its u.osc member is filled in.
 * len:    not used on any visible line.
 * (The rest of the parameter list is on a line that is not shown; the
 *  body reads a `buf` value as a struct obd_ioctl_data pointer.)
 *
 * NOTE(review): lines are missing from this listing (braces, the else
 * that separates the two cases, return statements); the return value
 * cannot be confirmed from here.
 */
317 static int osc_setup(struct obd_device *obddev, obd_count len,
321 struct obd_ioctl_data* data = buf;
322 struct osc_obd *osc = &obddev->u.osc;
/* A device index inside [0, MAX_OBD_DEVICES) selects a local target. */
325 if (data->ioc_dev >= 0 && data->ioc_dev < MAX_OBD_DEVICES) {
326 /* This is a local connection */
327 osc->osc_tgt = &obd_dev[data->ioc_dev];
/* NOTE(review): this looks informational but is logged through CERROR. */
329 CERROR("OSC: tgt %d ost at %p\n", data->ioc_dev,
330 &osc->osc_tgt->u.ost);
/* The target must have both OBD_ATTACHED and OBD_SET_UP set. */
331 if ( ! (osc->osc_tgt->obd_flags & OBD_ATTACHED) ||
332 ! (osc->osc_tgt->obd_flags & OBD_SET_UP) ){
333 CERROR("device not attached or not set up (%d)\n",
340 /* This is a remote connection using Portals */
342 /* XXX: this should become something like ioc_inlbuf1 */
/* The peer is looked up under the hard-coded uuid "ost". */
343 err = kportal_uuid_to_peer("ost", &osc->osc_peer);
345 CERROR("Cannot find 'ost' peer.\n");
/*
 * osc_sendpage - memcpy from the address held in src to the address held
 * in dst.
 *
 * dst: niobuf whose addr field is the destination address.
 * src: niobuf whose addr field is the source address.
 *
 * Both addr fields are cast through unsigned long to char *. The length
 * argument of the memcpy is on a line that is not shown.
 */
356 void osc_sendpage(struct niobuf *dst, struct niobuf *src)
358 memcpy((char *)(unsigned long)dst->addr,
359 (char *)(unsigned long)src->addr,
/*
 * osc_brw - build and send an OST_BRW request describing a set of objects
 * and their pages, then process the reply.
 *
 * rw:       stored in the request's cmd field and later compared with
 *           OBD_BRW_READ.
 * conn:     connection to send on.
 * num_oa:   number of entries in oa[] and oa_bufs[].
 * oa:       one obdo per object, packed with ost_pack_ioo().
 * oa_bufs:  oa_bufs[i] is the number of pages belonging to object i.
 * buf:      pages, indexed by a running counter n across all objects.
 * offset:   per-page offsets, indexed by the same counter n.
 * count, flags: their use is on lines that are not shown.
 *
 * NOTE(review): many lines are missing from this listing (several local
 * declarations including src/dst/n/ptr1/ptr2, braces, goto/return
 * statements, loop bodies), so the control flow between the phases below
 * cannot be confirmed from here.
 */
365 int osc_brw(int rw, struct obd_conn *conn, obd_count num_oa,
366 struct obdo **oa, obd_count *oa_bufs, struct page **buf,
367 obd_size *count, obd_off *offset, obd_flag *flags)
369 struct ptlrpc_request *request;
371 struct obd_ioobj ioo;
373 int size1, size2 = 0;
/* Buffer 1 holds one obd_ioobj per object; buffer 2 holds one
 * sizeof(src)-sized entry per page, summed over all objects. */
377 size1 = num_oa * sizeof(ioo);
378 for (i = 0; i < num_oa; i++) {
379 size2 += oa_bufs[i] * sizeof(src);
/* Reserve both buffers with NULL source pointers; they are filled in
 * below through ptr1/ptr2. */
382 request = ost_prep_req(OST_BRW, size1, NULL, size2, NULL);
384 CERROR("cannot pack req!\n");
389 request->rq_req.ost->cmd = rw;
390 ptr1 = ost_req_buf1(request->rq_req.ost);
391 ptr2 = ost_req_buf2(request->rq_req.ost);
/* Pack one ioobj per object and one niobuf per page. Each page is
 * kmap()ed here; the remaining arguments of ost_pack_niobuf() are on a
 * line that is not shown. */
392 for (i=0; i < num_oa; i++) {
393 ost_pack_ioo(&ptr1, oa[i], oa_bufs[i]);
394 for (j = 0 ; j < oa_bufs[i] ; j++) {
395 ost_pack_niobuf(&ptr2, kmap(buf[n]), offset[n],
/* Reply header, one ost_rep, plus size2 bytes; the assignment target is
 * not shown. */
402 sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep) + size2;
404 rc = osc_queue_wait(conn, request);
/* The reply's second buffer is expected to hold exactly n niobufs. */
411 ptr2 = ost_rep_buf2(request->rq_rep.ost);
412 if (request->rq_rep.ost->buflen2 != n * sizeof(struct niobuf)) {
413 CERROR("buffer length wrong\n");
/* The statement governed by this test is on a line that is not shown. */
417 if (rw == OBD_BRW_READ)
/* For each page: put the struct page pointer into src.addr, take the
 * next niobuf from the reply as dst, and copy via osc_sendpage().
 * NOTE(review): src.addr is set from buf[n] itself here, whereas the
 * request was packed with kmap(buf[n]) -- confirm which address
 * osc_sendpage() is meant to read from. */
420 for (i=0; i < num_oa; i++) {
421 for (j = 0 ; j < oa_bufs[i] ; j++) {
423 src.addr = (__u64)(unsigned long)buf[n];
425 ost_unpack_niobuf(&ptr2, &dst);
426 osc_sendpage(dst, &src);
/* Free the reply, sized by rq_replen, if a reply header was set. */
433 if (request->rq_rephdr)
434 OBD_FREE(request->rq_rephdr, request->rq_replen);
/* Second walk over every page; the loop body is not shown (presumably it
 * undoes the kmap() above -- TODO confirm). */
436 for (i=0; i < num_oa; i++) {
437 for (j = 0 ; j < oa_bufs[i] ; j++) {
443 osc_free_req(request);
/*
 * osc_cleanup - cleanup method installed as o_cleanup in osc_obd_ops.
 *
 * obddev: device being cleaned up.
 *
 * NOTE(review): the body of this function is not shown in this listing.
 */
447 static int osc_cleanup(struct obd_device * obddev)
/*
 * Method table registered for the OSC device type by osc_init().
 * Entries use the old GCC `field: value` initializer syntax.
 *
 * NOTE(review): at least one line of this initializer is missing from the
 * listing; no entry for osc_setup or osc_brw is visible here.
 */
453 struct obd_ops osc_obd_ops = {
455 o_cleanup: osc_cleanup,
456 o_create: osc_create,
457 o_destroy: osc_destroy,
458 o_getattr: osc_getattr,
459 o_setattr: osc_setattr,
460 o_connect: osc_connect,
461 o_disconnect: osc_disconnect,
/*
 * osc_init - module entry point; registers the OSC method table under
 * LUSTRE_OSC_NAME.
 *
 * NOTE(review): the result of obd_register_type() is not used on the
 * visible line, and the return statement is not shown -- confirm that a
 * registration failure is reported to the module loader.
 */
465 static int __init osc_init(void)
467 obd_register_type(&osc_obd_ops, LUSTRE_OSC_NAME);
/*
 * osc_exit - module exit point; unregisters the type registered by
 * osc_init() under LUSTRE_OSC_NAME.
 */
471 static void __exit osc_exit(void)
473 obd_unregister_type(LUSTRE_OSC_NAME);
/* Module metadata. */
476 MODULE_AUTHOR("Peter J. Braam <braam@clusterfs.com>");
477 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC) v1.0");
478 MODULE_LICENSE("GPL");
/* Hook osc_init/osc_exit up as the module's load and unload handlers. */
480 module_init(osc_init);
481 module_exit(osc_exit);