Whamcloud - gitweb
96999f793da29fe128ac6ac1c938e6102901c249
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copryright (C) 2001, 2002 Cluster File Systems, Inc.
5  *
6  *  This code is issued under the GNU General Public License.
7  *  See the file COPYING in this distribution
8  *
9  *  Author Peter Braam <braam@clusterfs.com>
10  * 
11  *  This server is single threaded at present (but can easily be multi
12  *  threaded). For testing and management it is treated as an
13  *  obd_device, although it does not export a full OBD method table
14  *  (the requests are coming in over the wire, so object target
15  *  modules do not have a full method table.)
16  * 
17  */
18
19 #define EXPORT_SYMTAB
20
21 #include <linux/config.h>
22 #include <linux/module.h>
23 #include <linux/kernel.h>
24 #include <linux/mm.h>
25 #include <linux/string.h>
26 #include <linux/stat.h>
27 #include <linux/errno.h>
28 #include <linux/locks.h>
29 #include <linux/unistd.h>
30
31 #include <asm/system.h>
32 #include <asm/uaccess.h>
33
34 #include <linux/fs.h>
35 #include <linux/stat.h>
36 #include <asm/uaccess.h>
37 #include <asm/segment.h>
38 #include <linux/miscdevice.h>
39
40 #define DEBUG_SUBSYSTEM S_OSC
41
42 #include <linux/obd_class.h>
43 #include <linux/lustre_lib.h>
44 #include <linux/lustre_net.h>
45 #include <linux/obd_ost.h>
46
47 struct ptlrpc_client *osc_con2cl(struct obd_conn *conn)
48 {
49         struct osc_obd *osc = &conn->oc_dev->u.osc;
50         return osc->osc_peer;
51 }
52
53 static int osc_connect(struct obd_conn *conn)
54 {
55         struct ptlrpc_request *request;
56         struct ptlrpc_client *peer = osc_con2cl(conn);
57         int rc; 
58         ENTRY;
59         
60         request = ptlrpc_prep_req(peer, OST_CONNECT, 0, NULL, 0, NULL);
61         if (!request) { 
62                 CERROR("cannot pack req!\n"); 
63                 return -ENOMEM;
64         }
65
66         request->rq_replen = 
67                 sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
68
69         rc = ptlrpc_queue_wait(peer, request);
70         if (rc) { 
71                 EXIT;
72                 goto out;
73         }
74       
75         CDEBUG(D_INODE, "received connid %d\n", request->rq_rep.ost->connid); 
76
77         conn->oc_id = request->rq_rep.ost->connid;
78  out:
79         ptlrpc_free_req(request);
80         EXIT;
81         return rc;
82 }
83
84 static int osc_disconnect(struct obd_conn *conn)
85 {
86         struct ptlrpc_request *request;
87         struct ptlrpc_client *peer = osc_con2cl(conn);
88         int rc; 
89         ENTRY;
90         
91         request = ptlrpc_prep_req(peer, OST_DISCONNECT, 0, NULL, 0, NULL);
92         if (!request) { 
93                 CERROR("cannot pack req!\n"); 
94                 return -ENOMEM;
95         }
96         request->rq_req.ost->connid = conn->oc_id;
97         request->rq_replen = 
98                 sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
99
100         rc = ptlrpc_queue_wait(peer, request);
101         if (rc) { 
102                 EXIT;
103                 goto out;
104         }
105  out:
106         ptlrpc_free_req(request);
107         EXIT;
108         return rc;
109 }
110
111
112 static int osc_getattr(struct obd_conn *conn, struct obdo *oa)
113 {
114         struct ptlrpc_request *request;
115         struct ptlrpc_client *peer = osc_con2cl(conn);
116         int rc; 
117
118         request = ptlrpc_prep_req(peer, OST_GETATTR, 0, NULL, 0, NULL);
119         if (!request) { 
120                 CERROR("cannot pack req!\n"); 
121                 return -ENOMEM;
122         }
123         
124         memcpy(&request->rq_req.ost->oa, oa, sizeof(*oa));
125         request->rq_req.ost->connid = conn->oc_id;
126         request->rq_req.ost->oa.o_valid = ~0;
127         request->rq_replen = 
128                 sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
129         
130         rc = ptlrpc_queue_wait(peer, request);
131         if (rc) { 
132                 EXIT;
133                 goto out;
134         }
135
136         CDEBUG(D_INODE, "mode: %o\n", request->rq_rep.ost->oa.o_mode); 
137         if (oa) { 
138                 memcpy(oa, &request->rq_rep.ost->oa, sizeof(*oa));
139         }
140
141  out:
142         ptlrpc_free_req(request);
143         return 0;
144 }
145
146 static int osc_open(struct obd_conn *conn, struct obdo *oa)
147 {
148         struct ptlrpc_request *request;
149         struct ptlrpc_client *peer = osc_con2cl(conn);
150         int rc; 
151
152         request = ptlrpc_prep_req(peer, OST_OPEN, 0, NULL, 0, NULL);
153         if (!request) { 
154                 CERROR("cannot pack req!\n"); 
155                 return -ENOMEM;
156         }
157         
158         memcpy(&request->rq_req.ost->oa, oa, sizeof(*oa));
159         request->rq_req.ost->connid = conn->oc_id;
160         if (request->rq_req.ost->oa.o_valid != (OBD_MD_FLMODE | OBD_MD_FLID))
161                 BUG();
162         request->rq_replen = 
163                 sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
164         
165         rc = ptlrpc_queue_wait(peer, request);
166         if (rc) { 
167                 EXIT;
168                 goto out;
169         }
170
171         CDEBUG(D_INODE, "mode: %o\n", request->rq_rep.ost->oa.o_mode); 
172         if (oa) { 
173                 memcpy(oa, &request->rq_rep.ost->oa, sizeof(*oa));
174         }
175
176  out:
177         ptlrpc_free_req(request);
178         return 0;
179 }
180
181 static int osc_close(struct obd_conn *conn, struct obdo *oa)
182 {
183         struct ptlrpc_request *request;
184         struct ptlrpc_client *peer = osc_con2cl(conn);
185         int rc; 
186
187         request = ptlrpc_prep_req(peer, OST_CLOSE, 0, NULL, 0, NULL);
188         if (!request) { 
189                 CERROR("cannot pack req!\n"); 
190                 return -ENOMEM;
191         }
192         
193         memcpy(&request->rq_req.ost->oa, oa, sizeof(*oa));
194         request->rq_req.ost->connid = conn->oc_id;
195
196         request->rq_replen = 
197                 sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
198         
199         rc = ptlrpc_queue_wait(peer, request);
200         if (rc) { 
201                 EXIT;
202                 goto out;
203         }
204
205         CDEBUG(D_INODE, "mode: %o\n", request->rq_rep.ost->oa.o_mode); 
206         if (oa) { 
207                 memcpy(oa, &request->rq_rep.ost->oa, sizeof(*oa));
208         }
209
210  out:
211         ptlrpc_free_req(request);
212         return 0;
213 }
214
215 static int osc_setattr(struct obd_conn *conn, struct obdo *oa)
216 {
217         struct ptlrpc_request *request;
218         struct ptlrpc_client *peer = osc_con2cl(conn);
219         int rc; 
220
221         request = ptlrpc_prep_req(peer, OST_SETATTR, 0, NULL, 0, NULL);
222         if (!request) { 
223                 CERROR("cannot pack req!\n"); 
224                 return -ENOMEM;
225         }
226         
227         memcpy(&request->rq_req.ost->oa, oa, sizeof(*oa));
228         request->rq_req.ost->connid = conn->oc_id;
229         request->rq_replen = 
230                 sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
231         
232         rc = ptlrpc_queue_wait(peer, request);
233         if (rc) { 
234                 EXIT;
235                 goto out;
236         }
237
238  out:
239         ptlrpc_free_req(request);
240         return 0;
241 }
242
243 static int osc_create(struct obd_conn *conn, struct obdo *oa)
244 {
245         struct ptlrpc_request *request;
246         struct ptlrpc_client *peer = osc_con2cl(conn);
247         int rc; 
248
249         if (!oa) { 
250                 CERROR("oa NULL\n"); 
251         }
252         request = ptlrpc_prep_req(peer, OST_CREATE, 0, NULL, 0, NULL);
253         if (!request) { 
254                 CERROR("cannot pack req!\n"); 
255                 return -ENOMEM;
256         }
257         
258         memcpy(&request->rq_req.ost->oa, oa, sizeof(*oa));
259         request->rq_req.ost->connid = conn->oc_id;
260         request->rq_req.ost->oa.o_valid = ~0;
261         request->rq_replen = 
262                 sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
263         
264         rc = ptlrpc_queue_wait(peer, request);
265         if (rc) { 
266                 EXIT;
267                 goto out;
268         }
269         memcpy(oa, &request->rq_rep.ost->oa, sizeof(*oa));
270
271  out:
272         ptlrpc_free_req(request);
273         return 0;
274 }
275
276 static int osc_punch(struct obd_conn *conn, struct obdo *oa, obd_size count,
277                      obd_off offset)
278 {
279         struct ptlrpc_request *request;
280         struct ptlrpc_client *peer = osc_con2cl(conn);
281         int rc; 
282
283         if (!oa) { 
284                 CERROR("oa NULL\n"); 
285         }
286         request = ptlrpc_prep_req(peer, OST_PUNCH, 0, NULL, 0, NULL);
287         if (!request) { 
288                 CERROR("cannot pack req!\n"); 
289                 return -ENOMEM;
290         }
291         
292         memcpy(&request->rq_req.ost->oa, oa, sizeof(*oa));
293         request->rq_req.ost->connid = conn->oc_id;
294         request->rq_req.ost->oa.o_valid = ~0;
295         request->rq_req.ost->oa.o_size = offset;
296         request->rq_req.ost->oa.o_blocks = count;
297         request->rq_replen = 
298                 sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
299         
300         rc = ptlrpc_queue_wait(peer, request);
301         if (rc) { 
302                 EXIT;
303                 goto out;
304         }
305         memcpy(oa, &request->rq_rep.ost->oa, sizeof(*oa));
306
307  out:
308         ptlrpc_free_req(request);
309         return 0;
310 }
311
312 static int osc_destroy(struct obd_conn *conn, struct obdo *oa)
313 {
314         struct ptlrpc_request *request;
315         struct ptlrpc_client *peer = osc_con2cl(conn);
316         int rc; 
317
318         if (!oa) { 
319                 CERROR("oa NULL\n"); 
320         }
321         request = ptlrpc_prep_req(peer, OST_DESTROY, 0, NULL, 0, NULL);
322         if (!request) { 
323                 CERROR("cannot pack req!\n"); 
324                 return -ENOMEM;
325         }
326         
327         memcpy(&request->rq_req.ost->oa, oa, sizeof(*oa));
328         request->rq_req.ost->connid = conn->oc_id;
329         request->rq_req.ost->oa.o_valid = ~0;
330         request->rq_replen = 
331                 sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
332         
333         rc = ptlrpc_queue_wait(peer, request);
334         if (rc) { 
335                 EXIT;
336                 goto out;
337         }
338         memcpy(oa, &request->rq_rep.ost->oa, sizeof(*oa));
339
340  out:
341         ptlrpc_free_req(request);
342         return 0;
343 }
344
345 int osc_sendpage(struct obd_conn *conn, struct ptlrpc_request *req,
346                  struct niobuf *dst, struct niobuf *src)
347 {
348         struct ptlrpc_client *cl = osc_con2cl(conn);
349
350         if (cl->cli_obd) {
351                 /* local sendpage */
352                 memcpy((char *)(unsigned long)dst->addr,
353                        (char *)(unsigned long)src->addr, src->len);
354         } else {
355                 struct ptlrpc_bulk_desc *bulk;
356                 int rc;
357
358                 bulk = ptlrpc_prep_bulk(&cl->cli_server);
359                 if (bulk == NULL)
360                         return -ENOMEM;
361
362                 bulk->b_buf = (void *)(unsigned long)src->addr;
363                 bulk->b_buflen = src->len;
364                 bulk->b_xid = dst->xid;
365                 rc = ptlrpc_send_bulk(bulk, OSC_BULK_PORTAL);
366                 if (rc != 0) {
367                         CERROR("send_bulk failed: %d\n", rc);
368                         BUG();
369                         return rc;
370                 }
371                 wait_event_interruptible(bulk->b_waitq,
372                                          ptlrpc_check_bulk_sent(bulk));
373
374                 if (bulk->b_flags == PTL_RPC_INTR) {
375                         EXIT;
376                         /* FIXME: hey hey, we leak here. */
377                         return -EINTR;
378                 }
379
380                 OBD_FREE(bulk, sizeof(*bulk));
381         }
382
383         return 0;
384 }
385
386 int osc_brw_read(struct obd_conn *conn, obd_count num_oa, struct obdo **oa,
387                  obd_count *oa_bufs, struct page **buf, obd_size *count,
388                  obd_off *offset, obd_flag *flags)
389 {
390         struct ptlrpc_client *cl = osc_con2cl(conn);
391         struct ptlrpc_request *request;
392         int pages;
393         int rc; 
394         struct obd_ioobj ioo;
395         struct niobuf src;
396         int size1, size2 = 0; 
397         void *ptr1, *ptr2;
398         int i, j, n;
399         struct ptlrpc_bulk_desc **bulk;
400
401         size1 = num_oa * sizeof(ioo); 
402         pages = 0;
403         for (i = 0; i < num_oa; i++)
404                 pages += oa_bufs[i];
405         size2 = pages * sizeof(src);
406
407         request = ptlrpc_prep_req(cl, OST_BRW, size1, NULL, size2, NULL);
408         if (!request) { 
409                 CERROR("cannot pack req!\n"); 
410                 return -ENOMEM;
411         }
412         request->rq_req.ost->cmd = OBD_BRW_READ;
413
414         OBD_ALLOC(bulk, pages * sizeof(struct ptlrpc_bulk_desc *));
415         if (bulk == NULL) {
416                 CERROR("cannot alloc bulk desc vector\n");
417                 return -ENOMEM;
418         }
419         memset(bulk, 0, pages * sizeof(struct ptlrpc_bulk_desc *));
420
421         n = 0;
422         ptr1 = ost_req_buf1(request->rq_req.ost);
423         ptr2 = ost_req_buf2(request->rq_req.ost);
424         for (i = 0; i < num_oa; i++) {
425                 ost_pack_ioo(&ptr1, oa[i], oa_bufs[i]); 
426                 for (j = 0; j < oa_bufs[i]; j++) {
427                         bulk[n] = ptlrpc_prep_bulk(&cl->cli_server);
428                         if (bulk[n] == NULL) {
429                                 CERROR("cannot alloc bulk desc\n");
430                                 rc = -ENOMEM;
431                                 goto out;
432                         }
433
434                         spin_lock(&cl->cli_lock);
435                         bulk[n]->b_xid = cl->cli_xid++;
436                         spin_unlock(&cl->cli_lock);
437                         bulk[n]->b_buf = kmap(buf[n]);
438                         bulk[n]->b_buflen = PAGE_SIZE;
439                         bulk[n]->b_portal = OST_BULK_PORTAL;
440                         ost_pack_niobuf(&ptr2, bulk[n]->b_buf, offset[n],
441                                         count[n], flags[n], bulk[n]->b_xid);
442
443                         rc = ptlrpc_register_bulk(bulk[n]);
444                         if (rc)
445                                 goto out;
446                         n++;
447                 }
448         }
449
450         request->rq_replen = sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
451         rc = ptlrpc_queue_wait(cl, request);
452
453  out:
454         /* FIXME: if we've called ptlrpc_wait_bulk but rc != 0, we need to
455          * abort those bulk listeners. */
456
457         n = 0;
458         for (i = 0; i < num_oa; i++) {
459                 for (j = 0; j < oa_bufs[i]; j++) {
460                         if (bulk[n] == NULL)
461                                 continue;
462                         kunmap(buf[n]);
463                         OBD_FREE(bulk[n], sizeof(struct ptlrpc_bulk_desc));
464                         n++;
465                 }
466         }
467
468         OBD_FREE(bulk, pages * sizeof(struct ptlrpc_bulk_desc *));
469         ptlrpc_free_req(request);
470         return rc;
471 }
472
473 int osc_brw_write(struct obd_conn *conn, obd_count num_oa, struct obdo **oa,
474                   obd_count *oa_bufs, struct page **buf, obd_size *count,
475                   obd_off *offset, obd_flag *flags)
476 {
477         struct ptlrpc_client *cl = osc_con2cl(conn);
478         struct ptlrpc_request *request;
479         struct obd_ioobj ioo;
480         struct niobuf *src;
481         int pages, rc, i, j, n, size1, size2 = 0; 
482         void *ptr1, *ptr2;
483
484         size1 = num_oa * sizeof(ioo); 
485         pages = 0;
486         for (i = 0; i < num_oa; i++)
487                 pages += oa_bufs[i];
488         size2 = pages * sizeof(*src);
489
490         OBD_ALLOC(src, size2);
491         if (!src) { 
492                 CERROR("no src memory\n");
493                 return -ENOMEM;
494         }
495         memset((char *)src, 0, size2);
496
497         request = ptlrpc_prep_req(cl, OST_BRW, size1, NULL, size2, NULL);
498         if (!request) { 
499                 CERROR("cannot pack req!\n"); 
500                 return -ENOMEM;
501         }
502         request->rq_req.ost->cmd = OBD_BRW_WRITE;
503
504         n = 0;
505         ptr1 = ost_req_buf1(request->rq_req.ost);
506         ptr2 = ost_req_buf2(request->rq_req.ost);
507         for (i = 0; i < num_oa; i++) {
508                 ost_pack_ioo(&ptr1, oa[i], oa_bufs[i]); 
509                 for (j = 0; j < oa_bufs[i]; j++) {
510                         ost_pack_niobuf(&ptr2, kmap(buf[n]), offset[n],
511                                         count[n], flags[n], 0);
512                         n++;
513                 }
514         }
515         memcpy((char *)src, (char *)ost_req_buf2(request->rq_req.ost), size2); 
516
517         request->rq_replen = sizeof(struct ptlrep_hdr) +
518                 sizeof(struct ost_rep) + pages * sizeof(struct niobuf);
519         rc = ptlrpc_queue_wait(cl, request);
520         if (rc) { 
521                 EXIT;
522                 goto out;
523         }
524
525         ptr2 = ost_rep_buf2(request->rq_rep.ost);
526         if (request->rq_rep.ost->buflen2 != n * sizeof(struct niobuf)) {
527                 CERROR("buffer length wrong (%d vs. %d)\n",
528                        request->rq_rep.ost->buflen2, n * sizeof(struct niobuf));
529                 EXIT;
530                 goto out;
531         }
532
533         n = 0;
534         for (i = 0; i < num_oa; i++) {
535                 for (j = 0; j < oa_bufs[i]; j++) {
536                         struct niobuf *dst;
537                         ost_unpack_niobuf(&ptr2, &dst);
538                         osc_sendpage(conn, request, dst, &src[n]);
539                         n++;
540                 }
541         }
542         OBD_FREE(src, size2);
543  out:
544         n = 0;
545         for (i = 0; i < num_oa; i++) {
546                 for (j = 0; j < oa_bufs[i]; j++) {
547                         kunmap(buf[n]);
548                         n++;
549                 }
550         }
551
552         ptlrpc_free_req(request);
553         return 0;
554 }
555
556 int osc_brw(int rw, struct obd_conn *conn, obd_count num_oa,
557               struct obdo **oa, obd_count *oa_bufs, struct page **buf,
558               obd_size *count, obd_off *offset, obd_flag *flags)
559 {
560         if (rw == OBD_BRW_READ) {
561                 return osc_brw_read(conn, num_oa, oa, oa_bufs, buf, count,
562                                     offset, flags);
563         } else {
564                 return osc_brw_write(conn, num_oa, oa, oa_bufs, buf, count,
565                                      offset, flags);
566         }
567 }
568
569 /* mount the file system (secretly) */
570 static int osc_setup(struct obd_device *obddev, obd_count len,
571                         void *buf)
572                         
573 {
574         struct osc_obd *osc = &obddev->u.osc;
575         struct obd_ioctl_data *data = (struct obd_ioctl_data *)buf;
576         int rc;
577         int dev = data->ioc_dev;
578         ENTRY;
579
580         OBD_ALLOC(osc->osc_peer, sizeof(*osc->osc_peer));
581         if (osc->osc_peer == NULL)
582                 return -ENOMEM;
583
584         rc = ptlrpc_connect_client(dev, "ost",
585                                    OST_REQUEST_PORTAL,
586                                    OSC_REPLY_PORTAL,  
587                                    ost_pack_req,
588                                    ost_unpack_rep,
589                                    osc->osc_peer);
590
591         MOD_INC_USE_COUNT;
592         EXIT;
593         return rc;
594
595
596 static int osc_cleanup(struct obd_device * obddev)
597 {
598         struct osc_obd *osc = &obddev->u.osc;
599
600         if (osc->osc_peer != NULL)
601                 OBD_FREE(osc->osc_peer, sizeof(*osc->osc_peer));
602
603         MOD_DEC_USE_COUNT;
604         return 0;
605 }
606
607 struct obd_ops osc_obd_ops = { 
608         o_setup:   osc_setup,
609         o_cleanup: osc_cleanup, 
610         o_create: osc_create,
611         o_destroy: osc_destroy,
612         o_getattr: osc_getattr,
613         o_setattr: osc_setattr,
614         o_open: osc_open,
615         o_close: osc_close,
616         o_connect: osc_connect,
617         o_disconnect: osc_disconnect,
618         o_brw: osc_brw,
619         o_punch: osc_punch
620 };
621
622 static int __init osc_init(void)
623 {
624         obd_register_type(&osc_obd_ops, LUSTRE_OSC_NAME);
625         return 0;
626 }
627
628 static void __exit osc_exit(void)
629 {
630         obd_unregister_type(LUSTRE_OSC_NAME);
631 }
632
633 MODULE_AUTHOR("Peter J. Braam <braam@clusterfs.com>");
634 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC) v1.0");
635 MODULE_LICENSE("GPL"); 
636
637 module_init(osc_init);
638 module_exit(osc_exit);