Whamcloud - gitweb
Merged branch 'peter' with the tip. Pre-merge tag is 't_20020302_networking'.
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copryright (C) 2001 Cluster File Systems, Inc.
5  *
6  *  This code is issued under the GNU General Public License.
7  *  See the file COPYING in this distribution
8  *
9  *  Author Peter Braam <braam@clusterfs.com>
10  * 
11  *  This server is single threaded at present (but can easily be multi
12  *  threaded). For testing and management it is treated as an
13  *  obd_device, although it does not export a full OBD method table
14  *  (the requests are coming in over the wire, so object target
15  *  modules do not have a full method table.)
16  * 
17  */
18
19 #define EXPORT_SYMTAB
20
21 #include <linux/config.h>
22 #include <linux/module.h>
23 #include <linux/kernel.h>
24 #include <linux/mm.h>
25 #include <linux/string.h>
26 #include <linux/stat.h>
27 #include <linux/errno.h>
28 #include <linux/locks.h>
29 #include <linux/unistd.h>
30
31 #include <asm/system.h>
32 #include <asm/uaccess.h>
33
34 #include <linux/fs.h>
35 #include <linux/stat.h>
36 #include <asm/uaccess.h>
37 #include <asm/segment.h>
38 #include <linux/miscdevice.h>
39
40 #define DEBUG_SUBSYSTEM S_OSC
41
42 #include <linux/obd_support.h>
43 #include <linux/obd_class.h>
44 #include <linux/lustre_lib.h>
45 #include <linux/lustre_idl.h>
46
47 struct ptlrpc_client *osc_con2cl(struct obd_conn *conn)
48 {
49         struct osc_obd *osc = &conn->oc_dev->u.osc;
50         return &osc->osc_peer;
51
52 }
53
54 static int osc_connect(struct obd_conn *conn)
55 {
56         struct ptlrpc_request *request;
57         struct ptlrpc_client *peer = osc_con2cl(conn);
58         int rc; 
59         ENTRY;
60         
61         request = ptlrpc_prep_req(peer, OST_CONNECT, 0, NULL, 0, NULL);
62         if (!request) { 
63                 CERROR("cannot pack req!\n"); 
64                 return -ENOMEM;
65         }
66
67         request->rq_replen = 
68                 sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
69
70         rc = ptlrpc_queue_wait(peer, request);
71         if (rc) { 
72                 EXIT;
73                 goto out;
74         }
75       
76         CDEBUG(D_INODE, "received connid %d\n", request->rq_rep.ost->connid); 
77
78         conn->oc_id = request->rq_rep.ost->connid;
79  out:
80         ptlrpc_free_req(request);
81         EXIT;
82         return rc;
83 }
84
85 static int osc_disconnect(struct obd_conn *conn)
86 {
87         struct ptlrpc_request *request;
88         struct ptlrpc_client *peer = osc_con2cl(conn);
89         int rc; 
90         ENTRY;
91         
92         request = ptlrpc_prep_req(peer, OST_DISCONNECT, 0, NULL, 0, NULL);
93         if (!request) { 
94                 CERROR("cannot pack req!\n"); 
95                 return -ENOMEM;
96         }
97         request->rq_req.ost->connid = conn->oc_id;
98         request->rq_replen = 
99                 sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
100
101         rc = ptlrpc_queue_wait(peer, request);
102         if (rc) { 
103                 EXIT;
104                 goto out;
105         }
106  out:
107         ptlrpc_free_req(request);
108         EXIT;
109         return rc;
110 }
111
112
113 static int osc_getattr(struct obd_conn *conn, struct obdo *oa)
114 {
115         struct ptlrpc_request *request;
116         struct ptlrpc_client *peer = osc_con2cl(conn);
117         int rc; 
118
119         request = ptlrpc_prep_req(peer, OST_GETATTR, 0, NULL, 0, NULL);
120         if (!request) { 
121                 CERROR("cannot pack req!\n"); 
122                 return -ENOMEM;
123         }
124         
125         memcpy(&request->rq_req.ost->oa, oa, sizeof(*oa));
126         request->rq_req.ost->oa.o_valid = ~0;
127         request->rq_replen = 
128                 sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
129         
130         rc = ptlrpc_queue_wait(peer, request);
131         if (rc) { 
132                 EXIT;
133                 goto out;
134         }
135
136         CDEBUG(D_INODE, "mode: %o\n", request->rq_rep.ost->oa.o_mode); 
137         if (oa) { 
138                 memcpy(oa, &request->rq_rep.ost->oa, sizeof(*oa));
139         }
140
141  out:
142         ptlrpc_free_req(request);
143         return 0;
144 }
145
146 static int osc_setattr(struct obd_conn *conn, struct obdo *oa)
147 {
148         struct ptlrpc_request *request;
149         struct ptlrpc_client *peer = osc_con2cl(conn);
150         int rc; 
151
152         request = ptlrpc_prep_req(peer, OST_SETATTR, 0, NULL, 0, NULL);
153         if (!request) { 
154                 CERROR("cannot pack req!\n"); 
155                 return -ENOMEM;
156         }
157         
158         memcpy(&request->rq_req.ost->oa, oa, sizeof(*oa));
159         request->rq_replen = 
160                 sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
161         
162         rc = ptlrpc_queue_wait(peer, request);
163         if (rc) { 
164                 EXIT;
165                 goto out;
166         }
167
168  out:
169         ptlrpc_free_req(request);
170         return 0;
171 }
172
173 static int osc_create(struct obd_conn *conn, struct obdo *oa)
174 {
175         struct ptlrpc_request *request;
176         struct ptlrpc_client *peer = osc_con2cl(conn);
177         int rc; 
178
179         if (!oa) { 
180                 CERROR("oa NULL\n"); 
181         }
182         request = ptlrpc_prep_req(peer, OST_CREATE, 0, NULL, 0, NULL);
183         if (!request) { 
184                 CERROR("cannot pack req!\n"); 
185                 return -ENOMEM;
186         }
187         
188         memcpy(&request->rq_req.ost->oa, oa, sizeof(*oa));
189         request->rq_req.ost->oa.o_valid = ~0;
190         request->rq_replen = 
191                 sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
192         
193         rc = ptlrpc_queue_wait(peer, request);
194         if (rc) { 
195                 EXIT;
196                 goto out;
197         }
198         memcpy(oa, &request->rq_rep.ost->oa, sizeof(*oa));
199
200  out:
201         ptlrpc_free_req(request);
202         return 0;
203 }
204
205 static int osc_punch(struct obd_conn *conn, struct obdo *oa, obd_size count,
206                      obd_off offset)
207 {
208         struct ptlrpc_request *request;
209         struct ptlrpc_client *peer = osc_con2cl(conn);
210         int rc; 
211
212         if (!oa) { 
213                 CERROR("oa NULL\n"); 
214         }
215         request = ptlrpc_prep_req(peer, OST_PUNCH, 0, NULL, 0, NULL);
216         if (!request) { 
217                 CERROR("cannot pack req!\n"); 
218                 return -ENOMEM;
219         }
220         
221         memcpy(&request->rq_req.ost->oa, oa, sizeof(*oa));
222         request->rq_req.ost->oa.o_valid = ~0;
223         request->rq_req.ost->oa.o_size = offset;
224         request->rq_req.ost->oa.o_blocks = count;
225         request->rq_replen = 
226                 sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
227         
228         rc = ptlrpc_queue_wait(peer, request);
229         if (rc) { 
230                 EXIT;
231                 goto out;
232         }
233         memcpy(oa, &request->rq_rep.ost->oa, sizeof(*oa));
234
235  out:
236         ptlrpc_free_req(request);
237         return 0;
238 }
239
240 static int osc_destroy(struct obd_conn *conn, struct obdo *oa)
241 {
242         struct ptlrpc_request *request;
243         struct ptlrpc_client *peer = osc_con2cl(conn);
244         int rc; 
245
246         if (!oa) { 
247                 CERROR("oa NULL\n"); 
248         }
249         request = ptlrpc_prep_req(peer, OST_DESTROY, 0, NULL, 0, NULL);
250         if (!request) { 
251                 CERROR("cannot pack req!\n"); 
252                 return -ENOMEM;
253         }
254         
255         memcpy(&request->rq_req.ost->oa, oa, sizeof(*oa));
256         request->rq_req.ost->oa.o_valid = ~0;
257         request->rq_replen = 
258                 sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
259         
260         rc = ptlrpc_queue_wait(peer, request);
261         if (rc) { 
262                 EXIT;
263                 goto out;
264         }
265         memcpy(oa, &request->rq_rep.ost->oa, sizeof(*oa));
266
267  out:
268         ptlrpc_free_req(request);
269         return 0;
270 }
271
272 int osc_sendpage(struct obd_conn *conn, struct ptlrpc_request *req,
273                  struct niobuf *dst, struct niobuf *src)
274 {
275         if (conn->oc_id != -1) {
276                 /* local sendpage */
277                 memcpy((char *)(unsigned long)dst->addr,
278                        (char *)(unsigned long)src->addr, src->len);
279         } else {
280                 struct ptlrpc_client *cl = osc_con2cl(conn);
281                 struct ptlrpc_bulk_desc *bulk;
282                 char *buf;
283                 int rc;
284
285                 bulk = ptlrpc_prep_bulk(&cl->cli_server);
286                 if (bulk == NULL)
287                         return -ENOMEM;
288
289                 spin_lock(&cl->cli_lock);
290                 bulk->b_xid = cl->cli_xid++;
291                 spin_unlock(&cl->cli_lock);
292
293                 OBD_ALLOC(buf, src->len);
294                 if (!buf) {
295                         OBD_FREE(bulk, sizeof(*bulk));
296                         return -ENOMEM;
297                 }
298
299                 memcpy(buf, (char *)(unsigned long)src->addr, src->len);
300
301                 bulk->b_buf = buf;
302                 bulk->b_buflen = src->len;
303                 /* FIXME: maybe we should add an XID to struct niobuf? */
304                 bulk->b_xid = (__u32)(unsigned long)src->page;
305
306                 rc = ptlrpc_send_bulk(bulk, OSC_BULK_PORTAL);
307                 if (rc != 0) {
308                         CERROR("send_bulk failed: %d\n", rc);
309                         BUG();
310                         return rc;
311                 }
312                 wait_event_interruptible(bulk->b_waitq,
313                                          ptlrpc_check_bulk_sent(bulk));
314
315                 if (bulk->b_flags == PTL_RPC_INTR) {
316                         EXIT;
317                         /* FIXME: hey hey, we leak here. */
318                         return -EINTR;
319                 }
320
321                 OBD_FREE(bulk, sizeof(*bulk));
322                 OBD_FREE(buf, src->len);
323         }
324
325         return 0;
326 }
327
328 int osc_brw_read(struct obd_conn *conn, obd_count num_oa, struct obdo **oa,
329                  obd_count *oa_bufs, struct page **buf, obd_size *count,
330                  obd_off *offset, obd_flag *flags)
331 {
332         struct ptlrpc_client *cl = osc_con2cl(conn);
333         struct ptlrpc_request *request;
334         int pages;
335         int rc; 
336         struct obd_ioobj ioo;
337         struct niobuf src;
338         int size1, size2 = 0; 
339         void *ptr1, *ptr2;
340         int i, j, n;
341         struct ptlrpc_bulk_desc **bulk;
342
343         size1 = num_oa * sizeof(ioo); 
344         pages = 0;
345         for (i = 0; i < num_oa; i++) { 
346                 size2 += oa_bufs[i] * sizeof(src);
347                 pages += oa_bufs[i];
348         }
349
350         /* We actually pack a _third_ buffer, with XIDs for bulk pages */
351         size2 += pages * sizeof(__u32);
352         request = ptlrpc_prep_req(cl, OST_BRW, size1, NULL, size2, NULL);
353         if (!request) { 
354                 CERROR("cannot pack req!\n"); 
355                 return -ENOMEM;
356         }
357         request->rq_req.ost->cmd = OBD_BRW_READ;
358
359         OBD_ALLOC(bulk, pages * sizeof(struct ptlrpc_bulk_desc *));
360         if (bulk == NULL) {
361                 CERROR("cannot alloc bulk desc vector\n");
362                 return -ENOMEM;
363         }
364         memset(bulk, 0, pages * sizeof(struct ptlrpc_bulk_desc *));
365
366         n = 0;
367         ptr1 = ost_req_buf1(request->rq_req.ost);
368         ptr2 = ost_req_buf2(request->rq_req.ost);
369         for (i = 0; i < num_oa; i++) {
370                 ost_pack_ioo(&ptr1, oa[i], oa_bufs[i]); 
371                 for (j = 0; j < oa_bufs[i]; j++) {
372                         bulk[n] = ptlrpc_prep_bulk(&cl->cli_server);
373                         if (bulk[n] == NULL) {
374                                 CERROR("cannot alloc bulk desc\n");
375                                 rc = -ENOMEM;
376                                 goto out;
377                         }
378
379                         spin_lock(&cl->cli_lock);
380                         bulk[n]->b_xid = cl->cli_xid++;
381                         spin_unlock(&cl->cli_lock);
382                         bulk[n]->b_buf = kmap(buf[n]);
383                         bulk[n]->b_buflen = PAGE_SIZE;
384                         bulk[n]->b_portal = OST_BULK_PORTAL;
385                         ost_pack_niobuf(&ptr2, bulk[n]->b_buf, offset[n],
386                                         count[n], flags[n]);
387                         n++;
388                 }
389         }
390
391         /* This is kinda silly--put the XIDs in the "third" buffer. */
392         for (n = 0; n < pages; n++) {
393                 *(__u32 *)ptr2 = bulk[n]->b_xid;
394                 ptr2 = (char *)ptr2 + sizeof(__u32);
395
396                 rc = ptlrpc_wait_bulk(bulk[n]);
397                 if (rc)
398                         goto out;
399         }
400
401         request->rq_replen = sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
402         rc = ptlrpc_queue_wait(cl, request);
403
404  out:
405         /* FIXME: if we've called ptlrpc_wait_bulk but rc != 0, we need to
406          * abort those bulk listeners. */
407
408         if (request->rq_rephdr)
409                 OBD_FREE(request->rq_rephdr, request->rq_replen);
410         n = 0;
411         for (i = 0; i < num_oa; i++) {
412                 for (j = 0; j < oa_bufs[i]; j++) {
413                         if (bulk[n] == NULL)
414                                 continue;
415                         kunmap(bulk[n]->b_buf);
416                         OBD_FREE(bulk[n], sizeof(struct ptlrpc_bulk_desc));
417                         n++;
418                 }
419         }
420
421         OBD_FREE(bulk, pages * sizeof(struct ptlrpc_bulk_desc *));
422         ptlrpc_free_req(request);
423         return rc;
424 }
425
426 int osc_brw_write(struct obd_conn *conn, obd_count num_oa, struct obdo **oa,
427                   obd_count *oa_bufs, struct page **buf, obd_size *count,
428                   obd_off *offset, obd_flag *flags)
429 {
430         struct ptlrpc_client *cl = osc_con2cl(conn);
431         struct ptlrpc_request *request;
432         struct obd_ioobj ioo;
433         struct niobuf src;
434         int pages, rc, i, j, n, size1, size2 = 0; 
435         void *ptr1, *ptr2;
436
437         size1 = num_oa * sizeof(ioo); 
438         pages = 0;
439         for (i = 0; i < num_oa; i++) { 
440                 size2 += oa_bufs[i] * sizeof(src);
441                 pages += oa_bufs[i];
442         }
443
444         request = ptlrpc_prep_req(cl, OST_BRW, size1, NULL, size2, NULL);
445         if (!request) { 
446                 CERROR("cannot pack req!\n"); 
447                 return -ENOMEM;
448         }
449         request->rq_req.ost->cmd = OBD_BRW_WRITE;
450
451         n = 0;
452         ptr1 = ost_req_buf1(request->rq_req.ost);
453         ptr2 = ost_req_buf2(request->rq_req.ost);
454         for (i = 0; i < num_oa; i++) {
455                 ost_pack_ioo(&ptr1, oa[i], oa_bufs[i]); 
456                 for (j = 0; j < oa_bufs[i]; j++) {
457                         ost_pack_niobuf(&ptr2, kmap(buf[n]), offset[n],
458                                         count[n], flags[n]);
459                         n++;
460                 }
461         }
462
463         request->rq_replen = sizeof(struct ptlrep_hdr) +
464                 sizeof(struct ost_rep) + pages * sizeof(struct niobuf);
465         rc = ptlrpc_queue_wait(cl, request);
466         if (rc) { 
467                 EXIT;
468                 goto out;
469         }
470
471         ptr2 = ost_rep_buf2(request->rq_rep.ost);
472         if (request->rq_rep.ost->buflen2 != n * sizeof(struct niobuf)) {
473                 CERROR("buffer length wrong (%d vs. %d)\n",
474                        request->rq_rep.ost->buflen2, n * sizeof(struct niobuf));
475                 EXIT;
476                 goto out;
477         }
478
479         for (i = 0; i < num_oa; i++) {
480                 for (j = 0; j < oa_bufs[i]; j++) {
481                         struct niobuf *dst;
482                         src.addr = (__u64)(unsigned long)buf[n];
483                         src.len = count[n];
484                         ost_unpack_niobuf(&ptr2, &dst);
485                         osc_sendpage(conn, request, dst, &src);
486                         n++;
487                 }
488         }
489
490         /* Reuse the request structure for the completion request. */
491         OBD_FREE(request->rq_rephdr, request->rq_replen);
492         request->rq_rephdr = NULL;
493         request->rq_repbuf = NULL;
494         request->rq_reqhdr->opc = OST_BRW_COMPLETE;
495         request->rq_replen = sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
496         rc = ptlrpc_queue_wait(cl, request);
497         if (rc) { 
498                 EXIT;
499                 goto out;
500         }
501
502  out:
503         if (request->rq_rephdr)
504                 OBD_FREE(request->rq_rephdr, request->rq_replen);
505         n = 0;
506         for (i = 0; i < num_oa; i++) {
507                 for (j = 0; j < oa_bufs[i]; j++) {
508                         kunmap(buf[n]);
509                         n++;
510                 }
511         }
512
513         ptlrpc_free_req(request);
514         return 0;
515 }
516
517 int osc_brw(int rw, struct obd_conn *conn, obd_count num_oa,
518               struct obdo **oa, obd_count *oa_bufs, struct page **buf,
519               obd_size *count, obd_off *offset, obd_flag *flags)
520 {
521         if (rw == OBD_BRW_READ) {
522                 return osc_brw_read(conn, num_oa, oa, oa_bufs, buf, count,
523                                     offset, flags);
524         } else {
525                 return osc_brw_write(conn, num_oa, oa, oa_bufs, buf, count,
526                                      offset, flags);
527         }
528 }
529
530 /* mount the file system (secretly) */
531 static int osc_setup(struct obd_device *obddev, obd_count len,
532                         void *buf)
533                         
534 {
535         struct osc_obd *osc = &obddev->u.osc;
536         struct obd_ioctl_data *data = (struct obd_ioctl_data *)buf;
537         int rc;
538         int dev = data->ioc_dev;
539         ENTRY;
540
541         rc = ptlrpc_connect_client(dev, "ost", 
542                                    OST_REQUEST_PORTAL, 
543                                    OSC_REPLY_PORTAL,    
544                                    ost_pack_req, 
545                                    ost_unpack_rep,
546                                    &osc->osc_peer); 
547
548         MOD_INC_USE_COUNT;
549         EXIT;
550         return rc;
551
552
553 static int osc_cleanup(struct obd_device * obddev)
554 {
555         MOD_DEC_USE_COUNT;
556         return 0;
557 }
558
559 struct obd_ops osc_obd_ops = { 
560         o_setup:   osc_setup,
561         o_cleanup: osc_cleanup, 
562         o_create: osc_create,
563         o_destroy: osc_destroy,
564         o_getattr: osc_getattr,
565         o_setattr: osc_setattr,
566         o_connect: osc_connect,
567         o_disconnect: osc_disconnect,
568         o_brw: osc_brw,
569         o_punch: osc_punch
570 };
571
572 static int __init osc_init(void)
573 {
574         obd_register_type(&osc_obd_ops, LUSTRE_OSC_NAME);
575         return 0;
576 }
577
578 static void __exit osc_exit(void)
579 {
580         obd_unregister_type(LUSTRE_OSC_NAME);
581 }
582
583 MODULE_AUTHOR("Peter J. Braam <braam@clusterfs.com>");
584 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC) v1.0");
585 MODULE_LICENSE("GPL"); 
586
587 module_init(osc_init);
588 module_exit(osc_exit);