Whamcloud - gitweb
- add open and close calls: initial purpose is to support I/O to open,
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copryright (C) 2001 Cluster File Systems, Inc.
5  *
6  *  This code is issued under the GNU General Public License.
7  *  See the file COPYING in this distribution
8  *
9  *  Author Peter Braam <braam@clusterfs.com>
10  * 
11  *  This server is single threaded at present (but can easily be multi
12  *  threaded). For testing and management it is treated as an
13  *  obd_device, although it does not export a full OBD method table
14  *  (the requests are coming in over the wire, so object target
15  *  modules do not have a full method table.)
16  * 
17  */
18
19 #define EXPORT_SYMTAB
20
21 #include <linux/config.h>
22 #include <linux/module.h>
23 #include <linux/kernel.h>
24 #include <linux/mm.h>
25 #include <linux/string.h>
26 #include <linux/stat.h>
27 #include <linux/errno.h>
28 #include <linux/locks.h>
29 #include <linux/unistd.h>
30
31 #include <asm/system.h>
32 #include <asm/uaccess.h>
33
34 #include <linux/fs.h>
35 #include <linux/stat.h>
36 #include <asm/uaccess.h>
37 #include <asm/segment.h>
38 #include <linux/miscdevice.h>
39
40 #define DEBUG_SUBSYSTEM S_OSC
41
42 #include <linux/obd_support.h>
43 #include <linux/obd_class.h>
44 #include <linux/lustre_lib.h>
45 #include <linux/lustre_idl.h>
46
47 struct ptlrpc_client *osc_con2cl(struct obd_conn *conn)
48 {
49         struct osc_obd *osc = &conn->oc_dev->u.osc;
50         return &osc->osc_peer;
51
52 }
53
54 static int osc_connect(struct obd_conn *conn)
55 {
56         struct ptlrpc_request *request;
57         struct ptlrpc_client *peer = osc_con2cl(conn);
58         int rc; 
59         ENTRY;
60         
61         request = ptlrpc_prep_req(peer, OST_CONNECT, 0, NULL, 0, NULL);
62         if (!request) { 
63                 CERROR("cannot pack req!\n"); 
64                 return -ENOMEM;
65         }
66
67         request->rq_replen = 
68                 sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
69
70         rc = ptlrpc_queue_wait(peer, request);
71         if (rc) { 
72                 EXIT;
73                 goto out;
74         }
75       
76         CDEBUG(D_INODE, "received connid %d\n", request->rq_rep.ost->connid); 
77
78         conn->oc_id = request->rq_rep.ost->connid;
79  out:
80         ptlrpc_free_req(request);
81         EXIT;
82         return rc;
83 }
84
85 static int osc_disconnect(struct obd_conn *conn)
86 {
87         struct ptlrpc_request *request;
88         struct ptlrpc_client *peer = osc_con2cl(conn);
89         int rc; 
90         ENTRY;
91         
92         request = ptlrpc_prep_req(peer, OST_DISCONNECT, 0, NULL, 0, NULL);
93         if (!request) { 
94                 CERROR("cannot pack req!\n"); 
95                 return -ENOMEM;
96         }
97         request->rq_req.ost->connid = conn->oc_id;
98         request->rq_replen = 
99                 sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
100
101         rc = ptlrpc_queue_wait(peer, request);
102         if (rc) { 
103                 EXIT;
104                 goto out;
105         }
106  out:
107         ptlrpc_free_req(request);
108         EXIT;
109         return rc;
110 }
111
112
113 static int osc_getattr(struct obd_conn *conn, struct obdo *oa)
114 {
115         struct ptlrpc_request *request;
116         struct ptlrpc_client *peer = osc_con2cl(conn);
117         int rc; 
118
119         request = ptlrpc_prep_req(peer, OST_GETATTR, 0, NULL, 0, NULL);
120         if (!request) { 
121                 CERROR("cannot pack req!\n"); 
122                 return -ENOMEM;
123         }
124         
125         memcpy(&request->rq_req.ost->oa, oa, sizeof(*oa));
126         request->rq_req.ost->connid = conn->oc_id;
127         request->rq_req.ost->oa.o_valid = ~0;
128         request->rq_replen = 
129                 sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
130         
131         rc = ptlrpc_queue_wait(peer, request);
132         if (rc) { 
133                 EXIT;
134                 goto out;
135         }
136
137         CDEBUG(D_INODE, "mode: %o\n", request->rq_rep.ost->oa.o_mode); 
138         if (oa) { 
139                 memcpy(oa, &request->rq_rep.ost->oa, sizeof(*oa));
140         }
141
142  out:
143         ptlrpc_free_req(request);
144         return 0;
145 }
146
147 static int osc_open(struct obd_conn *conn, struct obdo *oa)
148 {
149         struct ptlrpc_request *request;
150         struct ptlrpc_client *peer = osc_con2cl(conn);
151         int rc; 
152
153         request = ptlrpc_prep_req(peer, OST_OPEN, 0, NULL, 0, NULL);
154         if (!request) { 
155                 CERROR("cannot pack req!\n"); 
156                 return -ENOMEM;
157         }
158         
159         memcpy(&request->rq_req.ost->oa, oa, sizeof(*oa));
160         request->rq_req.ost->connid = conn->oc_id;
161         if (request->rq_req.ost->oa.o_valid != (OBD_MD_FLMODE | OBD_MD_FLID))
162                 BUG();
163         request->rq_replen = 
164                 sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
165         
166         rc = ptlrpc_queue_wait(peer, request);
167         if (rc) { 
168                 EXIT;
169                 goto out;
170         }
171
172         CDEBUG(D_INODE, "mode: %o\n", request->rq_rep.ost->oa.o_mode); 
173         if (oa) { 
174                 memcpy(oa, &request->rq_rep.ost->oa, sizeof(*oa));
175         }
176
177  out:
178         ptlrpc_free_req(request);
179         return 0;
180 }
181
182 static int osc_close(struct obd_conn *conn, struct obdo *oa)
183 {
184         struct ptlrpc_request *request;
185         struct ptlrpc_client *peer = osc_con2cl(conn);
186         int rc; 
187
188         request = ptlrpc_prep_req(peer, OST_CLOSE, 0, NULL, 0, NULL);
189         if (!request) { 
190                 CERROR("cannot pack req!\n"); 
191                 return -ENOMEM;
192         }
193         
194         memcpy(&request->rq_req.ost->oa, oa, sizeof(*oa));
195         request->rq_req.ost->connid = conn->oc_id;
196
197         request->rq_replen = 
198                 sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
199         
200         rc = ptlrpc_queue_wait(peer, request);
201         if (rc) { 
202                 EXIT;
203                 goto out;
204         }
205
206         CDEBUG(D_INODE, "mode: %o\n", request->rq_rep.ost->oa.o_mode); 
207         if (oa) { 
208                 memcpy(oa, &request->rq_rep.ost->oa, sizeof(*oa));
209         }
210
211  out:
212         ptlrpc_free_req(request);
213         return 0;
214 }
215
216 static int osc_setattr(struct obd_conn *conn, struct obdo *oa)
217 {
218         struct ptlrpc_request *request;
219         struct ptlrpc_client *peer = osc_con2cl(conn);
220         int rc; 
221
222         request = ptlrpc_prep_req(peer, OST_SETATTR, 0, NULL, 0, NULL);
223         if (!request) { 
224                 CERROR("cannot pack req!\n"); 
225                 return -ENOMEM;
226         }
227         
228         memcpy(&request->rq_req.ost->oa, oa, sizeof(*oa));
229         request->rq_req.ost->connid = conn->oc_id;
230         request->rq_replen = 
231                 sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
232         
233         rc = ptlrpc_queue_wait(peer, request);
234         if (rc) { 
235                 EXIT;
236                 goto out;
237         }
238
239  out:
240         ptlrpc_free_req(request);
241         return 0;
242 }
243
244 static int osc_create(struct obd_conn *conn, struct obdo *oa)
245 {
246         struct ptlrpc_request *request;
247         struct ptlrpc_client *peer = osc_con2cl(conn);
248         int rc; 
249
250         if (!oa) { 
251                 CERROR("oa NULL\n"); 
252         }
253         request = ptlrpc_prep_req(peer, OST_CREATE, 0, NULL, 0, NULL);
254         if (!request) { 
255                 CERROR("cannot pack req!\n"); 
256                 return -ENOMEM;
257         }
258         
259         memcpy(&request->rq_req.ost->oa, oa, sizeof(*oa));
260         request->rq_req.ost->connid = conn->oc_id;
261         request->rq_req.ost->oa.o_valid = ~0;
262         request->rq_replen = 
263                 sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
264         
265         rc = ptlrpc_queue_wait(peer, request);
266         if (rc) { 
267                 EXIT;
268                 goto out;
269         }
270         memcpy(oa, &request->rq_rep.ost->oa, sizeof(*oa));
271
272  out:
273         ptlrpc_free_req(request);
274         return 0;
275 }
276
277 static int osc_punch(struct obd_conn *conn, struct obdo *oa, obd_size count,
278                      obd_off offset)
279 {
280         struct ptlrpc_request *request;
281         struct ptlrpc_client *peer = osc_con2cl(conn);
282         int rc; 
283
284         if (!oa) { 
285                 CERROR("oa NULL\n"); 
286         }
287         request = ptlrpc_prep_req(peer, OST_PUNCH, 0, NULL, 0, NULL);
288         if (!request) { 
289                 CERROR("cannot pack req!\n"); 
290                 return -ENOMEM;
291         }
292         
293         memcpy(&request->rq_req.ost->oa, oa, sizeof(*oa));
294         request->rq_req.ost->connid = conn->oc_id;
295         request->rq_req.ost->oa.o_valid = ~0;
296         request->rq_req.ost->oa.o_size = offset;
297         request->rq_req.ost->oa.o_blocks = count;
298         request->rq_replen = 
299                 sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
300         
301         rc = ptlrpc_queue_wait(peer, request);
302         if (rc) { 
303                 EXIT;
304                 goto out;
305         }
306         memcpy(oa, &request->rq_rep.ost->oa, sizeof(*oa));
307
308  out:
309         ptlrpc_free_req(request);
310         return 0;
311 }
312
313 static int osc_destroy(struct obd_conn *conn, struct obdo *oa)
314 {
315         struct ptlrpc_request *request;
316         struct ptlrpc_client *peer = osc_con2cl(conn);
317         int rc; 
318
319         if (!oa) { 
320                 CERROR("oa NULL\n"); 
321         }
322         request = ptlrpc_prep_req(peer, OST_DESTROY, 0, NULL, 0, NULL);
323         if (!request) { 
324                 CERROR("cannot pack req!\n"); 
325                 return -ENOMEM;
326         }
327         
328         memcpy(&request->rq_req.ost->oa, oa, sizeof(*oa));
329         request->rq_req.ost->connid = conn->oc_id;
330         request->rq_req.ost->oa.o_valid = ~0;
331         request->rq_replen = 
332                 sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
333         
334         rc = ptlrpc_queue_wait(peer, request);
335         if (rc) { 
336                 EXIT;
337                 goto out;
338         }
339         memcpy(oa, &request->rq_rep.ost->oa, sizeof(*oa));
340
341  out:
342         ptlrpc_free_req(request);
343         return 0;
344 }
345
346 int osc_sendpage(struct obd_conn *conn, struct ptlrpc_request *req,
347                  struct niobuf *dst, struct niobuf *src)
348 {
349         struct ptlrpc_client *cl = osc_con2cl(conn);
350
351         if (cl->cli_obd) {
352                 /* local sendpage */
353                 memcpy((char *)(unsigned long)dst->addr,
354                        (char *)(unsigned long)src->addr, src->len);
355         } else {
356                 struct ptlrpc_bulk_desc *bulk;
357                 int rc;
358
359                 bulk = ptlrpc_prep_bulk(&cl->cli_server);
360                 if (bulk == NULL)
361                         return -ENOMEM;
362
363                 bulk->b_buf = (void *)(unsigned long)src->addr;
364                 bulk->b_buflen = src->len;
365                 bulk->b_xid = dst->xid;
366                 rc = ptlrpc_send_bulk(bulk, OSC_BULK_PORTAL);
367                 if (rc != 0) {
368                         CERROR("send_bulk failed: %d\n", rc);
369                         BUG();
370                         return rc;
371                 }
372                 wait_event_interruptible(bulk->b_waitq,
373                                          ptlrpc_check_bulk_sent(bulk));
374
375                 if (bulk->b_flags == PTL_RPC_INTR) {
376                         EXIT;
377                         /* FIXME: hey hey, we leak here. */
378                         return -EINTR;
379                 }
380
381                 OBD_FREE(bulk, sizeof(*bulk));
382         }
383
384         return 0;
385 }
386
387 int osc_brw_read(struct obd_conn *conn, obd_count num_oa, struct obdo **oa,
388                  obd_count *oa_bufs, struct page **buf, obd_size *count,
389                  obd_off *offset, obd_flag *flags)
390 {
391         struct ptlrpc_client *cl = osc_con2cl(conn);
392         struct ptlrpc_request *request;
393         int pages;
394         int rc; 
395         struct obd_ioobj ioo;
396         struct niobuf src;
397         int size1, size2 = 0; 
398         void *ptr1, *ptr2;
399         int i, j, n;
400         struct ptlrpc_bulk_desc **bulk;
401
402         size1 = num_oa * sizeof(ioo); 
403         pages = 0;
404         for (i = 0; i < num_oa; i++)
405                 pages += oa_bufs[i];
406         size2 = pages * sizeof(src);
407
408         request = ptlrpc_prep_req(cl, OST_BRW, size1, NULL, size2, NULL);
409         if (!request) { 
410                 CERROR("cannot pack req!\n"); 
411                 return -ENOMEM;
412         }
413         request->rq_req.ost->cmd = OBD_BRW_READ;
414
415         OBD_ALLOC(bulk, pages * sizeof(struct ptlrpc_bulk_desc *));
416         if (bulk == NULL) {
417                 CERROR("cannot alloc bulk desc vector\n");
418                 return -ENOMEM;
419         }
420         memset(bulk, 0, pages * sizeof(struct ptlrpc_bulk_desc *));
421
422         n = 0;
423         ptr1 = ost_req_buf1(request->rq_req.ost);
424         ptr2 = ost_req_buf2(request->rq_req.ost);
425         for (i = 0; i < num_oa; i++) {
426                 ost_pack_ioo(&ptr1, oa[i], oa_bufs[i]); 
427                 for (j = 0; j < oa_bufs[i]; j++) {
428                         bulk[n] = ptlrpc_prep_bulk(&cl->cli_server);
429                         if (bulk[n] == NULL) {
430                                 CERROR("cannot alloc bulk desc\n");
431                                 rc = -ENOMEM;
432                                 goto out;
433                         }
434
435                         spin_lock(&cl->cli_lock);
436                         bulk[n]->b_xid = cl->cli_xid++;
437                         spin_unlock(&cl->cli_lock);
438                         bulk[n]->b_buf = kmap(buf[n]);
439                         bulk[n]->b_buflen = PAGE_SIZE;
440                         bulk[n]->b_portal = OST_BULK_PORTAL;
441                         ost_pack_niobuf(&ptr2, bulk[n]->b_buf, offset[n],
442                                         count[n], flags[n], bulk[n]->b_xid);
443
444                         rc = ptlrpc_register_bulk(bulk[n]);
445                         if (rc)
446                                 goto out;
447                         n++;
448                 }
449         }
450
451         request->rq_replen = sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
452         rc = ptlrpc_queue_wait(cl, request);
453
454  out:
455         /* FIXME: if we've called ptlrpc_wait_bulk but rc != 0, we need to
456          * abort those bulk listeners. */
457
458         n = 0;
459         for (i = 0; i < num_oa; i++) {
460                 for (j = 0; j < oa_bufs[i]; j++) {
461                         if (bulk[n] == NULL)
462                                 continue;
463                         kunmap(buf[n]);
464                         OBD_FREE(bulk[n], sizeof(struct ptlrpc_bulk_desc));
465                         n++;
466                 }
467         }
468
469         OBD_FREE(bulk, pages * sizeof(struct ptlrpc_bulk_desc *));
470         ptlrpc_free_req(request);
471         return rc;
472 }
473
474 int osc_brw_write(struct obd_conn *conn, obd_count num_oa, struct obdo **oa,
475                   obd_count *oa_bufs, struct page **buf, obd_size *count,
476                   obd_off *offset, obd_flag *flags)
477 {
478         struct ptlrpc_client *cl = osc_con2cl(conn);
479         struct ptlrpc_request *request;
480         struct obd_ioobj ioo;
481         struct niobuf *src;
482         int pages, rc, i, j, n, size1, size2 = 0; 
483         void *ptr1, *ptr2;
484
485         size1 = num_oa * sizeof(ioo); 
486         pages = 0;
487         for (i = 0; i < num_oa; i++)
488                 pages += oa_bufs[i];
489         size2 = pages * sizeof(*src);
490
491         OBD_ALLOC(src, size2);
492         if (!src) { 
493                 CERROR("no src memory\n");
494                 return -ENOMEM;
495         }
496         memset((char *)src, 0, size2);
497
498         request = ptlrpc_prep_req(cl, OST_BRW, size1, NULL, size2, NULL);
499         if (!request) { 
500                 CERROR("cannot pack req!\n"); 
501                 return -ENOMEM;
502         }
503         request->rq_req.ost->cmd = OBD_BRW_WRITE;
504
505         n = 0;
506         ptr1 = ost_req_buf1(request->rq_req.ost);
507         ptr2 = ost_req_buf2(request->rq_req.ost);
508         for (i = 0; i < num_oa; i++) {
509                 ost_pack_ioo(&ptr1, oa[i], oa_bufs[i]); 
510                 for (j = 0; j < oa_bufs[i]; j++) {
511                         ost_pack_niobuf(&ptr2, kmap(buf[n]), offset[n],
512                                         count[n], flags[n], 0);
513                         n++;
514                 }
515         }
516         memcpy((char *)src, (char *)ost_req_buf2(request->rq_req.ost), size2); 
517
518         request->rq_replen = sizeof(struct ptlrep_hdr) +
519                 sizeof(struct ost_rep) + pages * sizeof(struct niobuf);
520         rc = ptlrpc_queue_wait(cl, request);
521         if (rc) { 
522                 EXIT;
523                 goto out;
524         }
525
526         ptr2 = ost_rep_buf2(request->rq_rep.ost);
527         if (request->rq_rep.ost->buflen2 != n * sizeof(struct niobuf)) {
528                 CERROR("buffer length wrong (%d vs. %d)\n",
529                        request->rq_rep.ost->buflen2, n * sizeof(struct niobuf));
530                 EXIT;
531                 goto out;
532         }
533
534         n = 0;
535         for (i = 0; i < num_oa; i++) {
536                 for (j = 0; j < oa_bufs[i]; j++) {
537                         struct niobuf *dst;
538                         ost_unpack_niobuf(&ptr2, &dst);
539                         osc_sendpage(conn, request, dst, &src[n]);
540                         n++;
541                 }
542         }
543         OBD_FREE(src, size2);
544  out:
545         n = 0;
546         for (i = 0; i < num_oa; i++) {
547                 for (j = 0; j < oa_bufs[i]; j++) {
548                         kunmap(buf[n]);
549                         n++;
550                 }
551         }
552
553         ptlrpc_free_req(request);
554         return 0;
555 }
556
557 int osc_brw(int rw, struct obd_conn *conn, obd_count num_oa,
558               struct obdo **oa, obd_count *oa_bufs, struct page **buf,
559               obd_size *count, obd_off *offset, obd_flag *flags)
560 {
561         if (rw == OBD_BRW_READ) {
562                 return osc_brw_read(conn, num_oa, oa, oa_bufs, buf, count,
563                                     offset, flags);
564         } else {
565                 return osc_brw_write(conn, num_oa, oa, oa_bufs, buf, count,
566                                      offset, flags);
567         }
568 }
569
570 /* mount the file system (secretly) */
571 static int osc_setup(struct obd_device *obddev, obd_count len,
572                         void *buf)
573                         
574 {
575         struct osc_obd *osc = &obddev->u.osc;
576         struct obd_ioctl_data *data = (struct obd_ioctl_data *)buf;
577         int rc;
578         int dev = data->ioc_dev;
579         ENTRY;
580
581         rc = ptlrpc_connect_client(dev, "ost", 
582                                    OST_REQUEST_PORTAL, 
583                                    OSC_REPLY_PORTAL,    
584                                    ost_pack_req, 
585                                    ost_unpack_rep,
586                                    &osc->osc_peer); 
587
588         MOD_INC_USE_COUNT;
589         EXIT;
590         return rc;
591
592
593 static int osc_cleanup(struct obd_device * obddev)
594 {
595         MOD_DEC_USE_COUNT;
596         return 0;
597 }
598
599 struct obd_ops osc_obd_ops = { 
600         o_setup:   osc_setup,
601         o_cleanup: osc_cleanup, 
602         o_create: osc_create,
603         o_destroy: osc_destroy,
604         o_getattr: osc_getattr,
605         o_setattr: osc_setattr,
606         o_open: osc_open,
607         o_close: osc_close,
608         o_connect: osc_connect,
609         o_disconnect: osc_disconnect,
610         o_brw: osc_brw,
611         o_punch: osc_punch
612 };
613
614 static int __init osc_init(void)
615 {
616         obd_register_type(&osc_obd_ops, LUSTRE_OSC_NAME);
617         return 0;
618 }
619
620 static void __exit osc_exit(void)
621 {
622         obd_unregister_type(LUSTRE_OSC_NAME);
623 }
624
625 MODULE_AUTHOR("Peter J. Braam <braam@clusterfs.com>");
626 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC) v1.0");
627 MODULE_LICENSE("GPL"); 
628
629 module_init(osc_init);
630 module_exit(osc_exit);