Whamcloud - gitweb
- elimininate the system calls from filter obd
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /* 
2  * Copryright (C) 2001 Cluster File Systems, Inc.
3  *
4  *  This code is issued under the GNU General Public License.
5  *  See the file COPYING in this distribution
6  *
7  *  Author Peter Braam <braam@clusterfs.com>
8  * 
9  *  This server is single threaded at present (but can easily be multi
10  *  threaded). For testing and management it is treated as an
11  *  obd_device, although it does not export a full OBD method table
12  *  (the requests are coming in over the wire, so object target
13  *  modules do not have a full method table.)
14  * 
15  */
16
17 #define EXPORT_SYMTAB
18
19 #include <linux/config.h>
20 #include <linux/module.h>
21 #include <linux/kernel.h>
22 #include <linux/mm.h>
23 #include <linux/string.h>
24 #include <linux/stat.h>
25 #include <linux/errno.h>
26 #include <linux/locks.h>
27 #include <linux/unistd.h>
28
29 #include <asm/system.h>
30 #include <asm/uaccess.h>
31
32 #include <linux/fs.h>
33 #include <linux/stat.h>
34 #include <asm/uaccess.h>
35 #include <linux/vmalloc.h>
36 #include <asm/segment.h>
37 #include <linux/miscdevice.h>
38
39 #include <linux/obd_support.h>
40 #include <linux/obd_class.h>
41 #include <linux/lustre_lib.h>
42 #include <linux/lustre_idl.h>
43
44 extern int ost_queue_req(struct obd_device *, struct ptlrpc_request *);
45
46 /* FIXME: this belongs in some sort of service struct */
47 static int osc_xid = 1;
48
49 struct ptlrpc_request *ost_prep_req(int opcode, int buflen1, char *buf1, 
50                                  int buflen2, char *buf2)
51 {
52         struct ptlrpc_request *request;
53         int rc;
54         ENTRY; 
55
56         request = (struct ptlrpc_request *)kmalloc(sizeof(*request), GFP_KERNEL); 
57         if (!request) { 
58                 printk("osc_prep_req: request allocation out of memory\n");
59                 return NULL;
60         }
61
62         memset(request, 0, sizeof(*request));
63         request->rq_xid = osc_xid++;
64
65         rc = ost_pack_req(buf1, buflen1,  buf2, buflen2,
66                           &request->rq_reqhdr, &request->rq_req.ost, 
67                           &request->rq_reqlen, &request->rq_reqbuf);
68         if (rc) { 
69                 printk("llight request: cannot pack request %d\n", rc); 
70                 return NULL;
71         }
72         request->rq_reqhdr->opc = opcode;
73
74         EXIT;
75         return request;
76 }
77
78 /* XXX: unify with mdc_queue_wait */
79 extern int osc_queue_wait(struct obd_conn *conn, struct ptlrpc_request *req)
80 {
81         struct obd_device *client = conn->oc_dev;
82         struct lustre_peer *peer = &conn->oc_dev->u.osc.osc_peer;
83         int rc;
84
85         ENTRY;
86
87         /* set the connection id */
88         req->rq_req.ost->connid = conn->oc_id;
89
90         /* XXX fix the race here (wait_for_event?)*/
91         if (peer == NULL) {
92                 /* Local delivery */
93                 CDEBUG(D_INODE, "\n");
94                 rc = ost_queue_req(client, req); 
95         } else {
96                 /* Remote delivery via portals. */
97                 req->rq_req_portal = OST_REQUEST_PORTAL;
98                 req->rq_reply_portal = OST_REPLY_PORTAL;
99                 rc = ptl_send_rpc(req, peer);
100         }
101         if (rc) { 
102                 printk(__FUNCTION__ ": error %d, opcode %d\n", rc, 
103                        req->rq_reqhdr->opc); 
104                 return -rc;
105         }
106
107         CDEBUG(D_INODE, "tgt at %p, conn id %d, opcode %d request at: %p\n", 
108                &conn->oc_dev->u.osc.osc_tgt->u.ost, 
109                conn->oc_id, req->rq_reqhdr->opc, req);
110
111         /* wait for the reply */
112         init_waitqueue_head(&req->rq_wait_for_rep);
113         CDEBUG(D_INODE, "-- sleeping\n");
114         interruptible_sleep_on(&req->rq_wait_for_rep);
115         CDEBUG(D_INODE, "-- done\n");
116
117         rc = ost_unpack_rep(req->rq_repbuf, req->rq_replen, &req->rq_rephdr, 
118                             &req->rq_rep.ost); 
119         if (rc) {
120                 printk(__FUNCTION__ ": mds_unpack_rep failed: %d\n", rc);
121                 return rc;
122         }
123
124         if ( req->rq_rephdr->status == 0 )
125                 CDEBUG(D_INODE, "buf %p len %d status %d\n", 
126                        req->rq_repbuf, req->rq_replen, 
127                        req->rq_rephdr->status); 
128
129         EXIT;
130         return 0;
131 }
132
133 static void osc_free_req(struct ptlrpc_request *request)
134 {
135         kfree(request);
136 }
137
138 static int osc_connect(struct obd_conn *conn)
139 {
140         struct ptlrpc_request *request;
141         int rc; 
142         ENTRY;
143         
144         request = ost_prep_req(OST_CONNECT, 0, NULL, 0, NULL);
145         if (!request) { 
146                 printk(__FUNCTION__ ": cannot pack req!\n"); 
147                 return -ENOMEM;
148         }
149
150         request->rq_replen = 
151                 sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
152
153         rc = osc_queue_wait(conn, request);
154         if (rc) { 
155                 EXIT;
156                 goto out;
157         }
158       
159         CDEBUG(D_INODE, "received connid %d\n", request->rq_rep.ost->connid); 
160
161         conn->oc_id = request->rq_rep.ost->connid;
162  out:
163         osc_free_req(request);
164         EXIT;
165         return rc;
166 }
167
168 static int osc_disconnect(struct obd_conn *conn)
169 {
170         struct ptlrpc_request *request;
171         int rc; 
172         ENTRY;
173         
174         request = ost_prep_req(OST_DISCONNECT, 0, NULL, 0, NULL);
175         if (!request) { 
176                 printk(__FUNCTION__ ": cannot pack req!\n"); 
177                 return -ENOMEM;
178         }
179
180         request->rq_replen = 
181                 sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
182
183         rc = osc_queue_wait(conn, request);
184         if (rc) { 
185                 EXIT;
186                 goto out;
187         }
188  out:
189         osc_free_req(request);
190         EXIT;
191         return rc;
192 }
193
194
195 static int osc_getattr(struct obd_conn *conn, struct obdo *oa)
196 {
197         struct ptlrpc_request *request;
198         int rc; 
199
200         request = ost_prep_req(OST_GETATTR, 0, NULL, 0, NULL);
201         if (!request) { 
202                 printk(__FUNCTION__ ": cannot pack req!\n"); 
203                 return -ENOMEM;
204         }
205         
206         memcpy(&request->rq_req.ost->oa, oa, sizeof(*oa));
207         request->rq_req.ost->oa.o_valid = ~0;
208         request->rq_replen = 
209                 sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
210         
211         rc = osc_queue_wait(conn, request);
212         if (rc) { 
213                 EXIT;
214                 goto out;
215         }
216
217         CDEBUG(D_INODE, "mode: %o\n", request->rq_rep.ost->oa.o_mode); 
218         if (oa) { 
219                 memcpy(oa, &request->rq_rep.ost->oa, sizeof(*oa));
220         }
221
222  out:
223         osc_free_req(request);
224         return 0;
225 }
226
227 static int osc_setattr(struct obd_conn *conn, struct obdo *oa)
228 {
229         struct ptlrpc_request *request;
230         int rc; 
231
232         request = ost_prep_req(OST_SETATTR, 0, NULL, 0, NULL);
233         if (!request) { 
234                 printk(__FUNCTION__ ": cannot pack req!\n"); 
235                 return -ENOMEM;
236         }
237         
238         memcpy(&request->rq_req.ost->oa, oa, sizeof(*oa));
239         request->rq_replen = 
240                 sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
241         
242         rc = osc_queue_wait(conn, request);
243         if (rc) { 
244                 EXIT;
245                 goto out;
246         }
247
248  out:
249         osc_free_req(request);
250         return 0;
251 }
252
253 static int osc_create(struct obd_conn *conn, struct obdo *oa)
254 {
255         struct ptlrpc_request *request;
256         int rc; 
257
258         if (!oa) { 
259                 printk(__FUNCTION__ ": oa NULL\n"); 
260         }
261         request = ost_prep_req(OST_CREATE, 0, NULL, 0, NULL);
262         if (!request) { 
263                 printk("osc_connect: cannot pack req!\n"); 
264                 return -ENOMEM;
265         }
266         
267         memcpy(&request->rq_req.ost->oa, oa, sizeof(*oa));
268         request->rq_req.ost->oa.o_valid = ~0;
269         request->rq_replen = 
270                 sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
271         
272         rc = osc_queue_wait(conn, request);
273         if (rc) { 
274                 EXIT;
275                 goto out;
276         }
277         memcpy(oa, &request->rq_rep.ost->oa, sizeof(*oa));
278
279  out:
280         osc_free_req(request);
281         return 0;
282 }
283
284 static int osc_destroy(struct obd_conn *conn, struct obdo *oa)
285 {
286         struct ptlrpc_request *request;
287         int rc; 
288
289         if (!oa) { 
290                 printk(__FUNCTION__ ": oa NULL\n"); 
291         }
292         request = ost_prep_req(OST_DESTROY, 0, NULL, 0, NULL);
293         if (!request) { 
294                 printk("osc_connect: cannot pack req!\n"); 
295                 return -ENOMEM;
296         }
297         
298         memcpy(&request->rq_req.ost->oa, oa, sizeof(*oa));
299         request->rq_req.ost->oa.o_valid = ~0;
300         request->rq_replen = 
301                 sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
302         
303         rc = osc_queue_wait(conn, request);
304         if (rc) { 
305                 EXIT;
306                 goto out;
307         }
308         memcpy(oa, &request->rq_rep.ost->oa, sizeof(*oa));
309
310  out:
311         osc_free_req(request);
312         return 0;
313 }
314
315
316 /* mount the file system (secretly) */
317 static int osc_setup(struct obd_device *obddev, obd_count len,
318                         void *buf)
319                         
320 {
321         struct obd_ioctl_data* data = buf;
322         struct osc_obd *osc = &obddev->u.osc;
323         ENTRY;
324
325         if (data->ioc_dev >= 0 && data->ioc_dev < MAX_OBD_DEVICES) {
326                 /* This is a local connection */
327                 osc->osc_tgt = &obd_dev[data->ioc_dev];
328
329                 printk("OSC: tgt %d ost at %p\n", data->ioc_dev,
330                        &osc->osc_tgt->u.ost);
331                 if ( ! (osc->osc_tgt->obd_flags & OBD_ATTACHED) || 
332                      ! (osc->osc_tgt->obd_flags & OBD_SET_UP) ){
333                         printk("device not attached or not set up (%d)\n", 
334                                data->ioc_dev);
335                         EXIT;
336                         return -EINVAL;
337                 }
338         } else {
339                 int err;
340                 /* This is a remote connection using Portals */
341
342                 /* XXX: this should become something like ioc_inlbuf1 */
343                 err = kportal_uuid_to_peer("ost", &osc->osc_peer);
344                 if (err != 0) {
345                         printk("Cannot find 'ost' peer.\n");
346                         EXIT;
347                         return -EINVAL;
348                 }
349         }
350
351         MOD_INC_USE_COUNT;
352         EXIT;
353         return 0;
354
355
356 void osc_sendpage(struct niobuf *dst, struct niobuf *src)
357 {
358         memcpy((char *)(unsigned long)dst->addr,  
359                (char *)(unsigned long)src->addr, 
360                src->len);
361         return;
362 }
363
364
365 int osc_brw(int rw, struct obd_conn *conn, obd_count num_oa,
366               struct obdo **oa, obd_count *oa_bufs, struct page **buf,
367               obd_size *count, obd_off *offset, obd_flag *flags)
368 {
369         struct ptlrpc_request *request;
370         int rc; 
371         struct obd_ioobj ioo;
372         struct niobuf src;
373         int size1, size2 = 0; 
374         void *ptr1, *ptr2;
375         int i, j, n;
376
377         size1 = num_oa * sizeof(ioo); 
378         for (i = 0; i < num_oa; i++) { 
379                 size2 += oa_bufs[i] * sizeof(src);
380         }
381
382         request = ost_prep_req(OST_BRW, size1, NULL, size2, NULL);
383         if (!request) { 
384                 printk("osc_connect: cannot pack req!\n"); 
385                 return -ENOMEM;
386         }
387
388         n = 0;
389         request->rq_req.ost->cmd = rw;
390         ptr1 = ost_req_buf1(request->rq_req.ost);
391         ptr2 = ost_req_buf2(request->rq_req.ost);
392         for (i=0; i < num_oa; i++) { 
393                 ost_pack_ioo(&ptr1, oa[i], oa_bufs[i]); 
394                 for (j = 0 ; j < oa_bufs[i] ; j++) { 
395                         ost_pack_niobuf(&ptr2, kmap(buf[n]), offset[n],
396                                         count[n], flags[n]); 
397                         n++;
398                 }
399         }
400
401         request->rq_replen = 
402                 sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep) + size2;
403
404         rc = osc_queue_wait(conn, request);
405         if (rc) { 
406                 EXIT;
407                 goto out;
408         }
409
410 #if 0
411         ptr2 = ost_rep_buf2(request->rq_rep.ost); 
412         if (request->rq_rep.ost->buflen2 != n * sizeof(struct niobuf)) { 
413                 printk(__FUNCTION__ ": buffer length wrong\n"); 
414                 goto out;
415         }
416
417         if (rw == OBD_BRW_READ)
418                 goto out;
419
420         for (i=0; i < num_oa; i++) { 
421                 for (j = 0 ; j < oa_bufs[i] ; j++) { 
422                         struct niobuf *dst;
423                         src.addr = (__u64)(unsigned long)buf[n];
424                         src.len = count[n];
425                         ost_unpack_niobuf(&ptr2, &dst);
426                         osc_sendpage(dst, &src);
427                         n++;
428                 }
429         }
430 #endif
431
432  out:
433         if (request->rq_rephdr)
434                 kfree(request->rq_rephdr);
435         n = 0;
436         for (i=0; i < num_oa; i++) { 
437                 for (j = 0 ; j < oa_bufs[i] ; j++) { 
438                         kunmap(buf[n]);
439                         n++;
440                 }
441         }
442
443         osc_free_req(request);
444         return 0;
445 }
446
447 static int osc_cleanup(struct obd_device * obddev)
448 {
449         MOD_DEC_USE_COUNT;
450         return 0;
451 }
452
453 struct obd_ops osc_obd_ops = { 
454         o_setup:   osc_setup,
455         o_cleanup: osc_cleanup, 
456         o_create: osc_create,
457         o_destroy: osc_destroy,
458         o_getattr: osc_getattr,
459         o_setattr: osc_setattr,
460         o_connect: osc_connect,
461         o_disconnect: osc_disconnect,
462         o_brw: osc_brw
463 };
464
465 static int __init osc_init(void)
466 {
467         obd_register_type(&osc_obd_ops, LUSTRE_OSC_NAME);
468         return 0;
469 }
470
471 static void __exit osc_exit(void)
472 {
473         obd_unregister_type(LUSTRE_OSC_NAME);
474 }
475
476 MODULE_AUTHOR("Peter J. Braam <braam@clusterfs.com>");
477 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC) v1.0");
478 MODULE_LICENSE("GPL"); 
479
480 module_init(osc_init);
481 module_exit(osc_exit);
482